diff --git a/libs/api/include/rtbot/OperatorJson.h b/libs/api/include/rtbot/OperatorJson.h index e9d186cb..81487c34 100644 --- a/libs/api/include/rtbot/OperatorJson.h +++ b/libs/api/include/rtbot/OperatorJson.h @@ -31,7 +31,9 @@ #include "rtbot/std/InfiniteImpulseResponse.h" #include "rtbot/std/Linear.h" #include "rtbot/std/MovingAverage.h" +#include "rtbot/std/MovingSum.h" #include "rtbot/std/PeakDetector.h" +#include "rtbot/std/Replace.h" #include "rtbot/std/ResamplerConstant.h" #include "rtbot/std/ResamplerHermite.h" #include "rtbot/std/StandardDeviation.h" @@ -73,6 +75,8 @@ class OperatorJson { return make_output(id, parsed["portTypes"].get>()); } else if (type == "MovingAverage") { return make_moving_average(id, parsed["window_size"].get()); + } else if (type == "MovingSum") { + return make_moving_sum(id, parsed["window_size"].get()); } else if (type == "StandardDeviation") { return make_std_dev(id, parsed["window_size"].get()); } else if (type == "FiniteImpulseResponse") { @@ -131,6 +135,8 @@ class OperatorJson { return make_constant_boolean_to_number(id, parsed["value"].get()); } else if (type == "LessThan") { return make_less_than(id, parsed["value"].get()); + } else if (type == "LessThanOrEqualToReplace") { + return make_less_than_or_equal_to_replace(id, parsed["value"].get(), parsed["replaceBy"].get()); } else if (type == "EqualTo") { return make_equal_to(id, parsed["value"].get(), parsed.value("epsilon", 1e-10)); } else if (type == "NotEqualTo") { @@ -264,6 +270,8 @@ class OperatorJson { j["portTypes"] = std::dynamic_pointer_cast(op)->get_port_types(); } else if (type == "MovingAverage") { j["window_size"] = std::dynamic_pointer_cast(op)->window_size(); + } else if (type == "MovingSum") { + j["window_size"] = std::dynamic_pointer_cast(op)->window_size(); } else if (type == "StandardDeviation") { j["window_size"] = std::dynamic_pointer_cast(op)->window_size(); } else if (type == "FiniteImpulseResponse") { @@ -305,8 +313,9 @@ class OperatorJson { j["epsilon"] = std::dynamic_pointer_cast(op)->get_epsilon(); } else if (type == "GreaterThan") { j["value"] = std::dynamic_pointer_cast(op)->get_threshold(); - } else if (type == "LessThan") { - j["value"] = std::dynamic_pointer_cast(op)->get_threshold(); + } else if (type == "LessThanOrEqualToReplace") { + j["value"] = std::dynamic_pointer_cast(op)->get_threshold(); + j["replaceBy"] = std::dynamic_pointer_cast(op)->get_replace_by(); } else if (type == "LogicalAnd" || type == "LogicalOr" || type == "LogicalXor" || type == "LogicalNand" || type == "LogicalNor" || type == "LogicalImplication") { j["numPorts"] = std::dynamic_pointer_cast(op)->get_num_ports(); @@ -369,6 +378,7 @@ class OperatorJson { } else { throw std::runtime_error("Unknown operator type: " + type); } + return j.dump(); } }; diff --git a/libs/api/test/test_program.cpp b/libs/api/test/test_program.cpp index 3f0df777..397a0734 100644 --- a/libs/api/test/test_program.cpp +++ b/libs/api/test/test_program.cpp @@ -441,6 +441,283 @@ SCENARIO("Program handles Pipeline operators and resets", "[program][pipeline]") } } +SCENARIO("Program handles complex Pipeline operators and resets", "[program][pipeline]") { + GIVEN("A complex program with a Pipeline") { + std::string program_json = R"({ + "apiVersion": "v1", + "operators": [ + { + "id": "input", + "type": "Input", + "portTypes": [ + "number", + "number" + ] + }, + { + "id": "hi_input_cutoff", + "type": "LessThanOrEqualToReplace", + "value": 0.5, + "replaceBy": 0.0 + }, + { + "id": "lo_input_cutoff", + "type": "LessThanOrEqualToReplace", + "value": 0.5, + "replaceBy": 0.0 + }, + { + "id": "hiresampler", + "type": "ResamplerConstant", + "interval": 5000 + }, + { + "id": "loresampler", + "type": "ResamplerConstant", + "interval": 5000 + }, + { + "id": "hiresampler_cutoff", + "type": "LessThanOrEqualToReplace", + "value": 0.5, + "replaceBy": 0.0 + }, + { + "id": "loresampler_cutoff", + "type": "LessThanOrEqualToReplace", + "value": 0.5, + "replaceBy": 0.0 + }, + { + "id": "power1", + "type": "Scale", + "value": 220 + }, + { + "id": "power2", + "type": "Scale", + "value": 220 + }, + { + "id": "power1_cutoff", + "type": "LessThanOrEqualToReplace", + "value": 0.5, + "replaceBy": 0.0 + }, + { + "id": "power2_cutoff", + "type": "LessThanOrEqualToReplace", + "value": 0.5, + "replaceBy": 0.0 + }, + { + "id": "total_power", + "type": "Addition" + }, + { + "id": "total_power_cutoff", + "type": "LessThanOrEqualToReplace", + "value": 0.5, + "replaceBy": 0.0 + }, + { + "id": "hourly", + "type": "Pipeline", + "input_port_types": [ + "number" + ], + "output_port_types": [ + "number" + ], + "operators": [ + { + "id": "trapezoid", + "type": "MovingAverage", + "window_size": 2 + }, + { + "id": "trapezoid_cutoff", + "type": "LessThanOrEqualToReplace", + "value": 0.5, + "replaceBy": 0.0 + }, + { + "id": "wh", + "type": "Scale", + "value": 0.00138888888 + }, + { + "id": "wh_cutoff", + "type": "LessThanOrEqualToReplace", + "value": 0.5, + "replaceBy": 0.0 + }, + { + "id": "hourly_ms", + "type": "MovingSum", + "window_size": 720 + } + ], + "connections": [ + { + "from": "trapezoid", + "to": "trapezoid_cutoff" + }, + { + "from": "trapezoid_cutoff", + "to": "wh" + }, + { + "from": "wh", + "to": "wh_cutoff" + }, + { + "from": "wh_cutoff", + "to": "hourly_ms" + } + ], + "entryOperator": "trapezoid", + "outputMappings": { + "hourly_ms": { + "o1": "o1" + } + } + }, + { + "id": "output", + "type": "Output", + "portTypes": [ + "number" + ] + } + ], + "connections": [ + { + "from": "input", + "to": "hi_input_cutoff", + "fromPort": "o1", + "toPort": "i1" + }, + { + "from": "input", + "to": "lo_input_cutoff", + "fromPort": "o2", + "toPort": "i1" + }, + { + "from": "hi_input_cutoff", + "to": "hiresampler" + }, + { + "from": "lo_input_cutoff", + "to": "loresampler" + }, + { + "from": "hiresampler", + "to": "hiresampler_cutoff" + }, + { + "from": "hiresampler_cutoff", + "to": "power1" + }, + { + "from": "loresampler", + "to": "loresampler_cutoff" + }, + { + "from": "loresampler_cutoff", + "to": "power2" + }, + { + "from": "power1", + "to": "power1_cutoff" + }, + { + "from": "power2", + "to": "power2_cutoff" + }, + { + "from": "power1_cutoff", + "to": "total_power", + "fromPort": "o1", + "toPort": "i1" + }, + { + "from": "power2_cutoff", + "to": "total_power", + "fromPort": "o1", + "toPort": "i2" + }, + { + "from": "total_power", + "to": "total_power_cutoff" + }, + { + "from": "total_power_cutoff", + "to": "hourly" + }, + { + "from": "hourly", + "to": "output", + "fromPort": "o1", + "toPort": "i1" + } + ], + "entryOperator": "input", + "output": { + "output": [ + "o1" + ] + }, + "title": "Power Consumption Monitor with Multiple Averages", + "description": "Calculates hourly power consumption from two current inputs" +})"; + + Program program(program_json); + + WHEN("Processing messages") { + int t = 5000; + for (int h = 1; h <= 20; h++) { + ProgramMsgBatch final_batch; + // int iterations = 0; + if (h % 2 == 1) { + while (final_batch.size() == 0) { + program.receive({t, NumberData{30.23}}, "i1"); + final_batch = program.receive({t, NumberData{10.5802}}, "i2"); + t = t + 5000; + // iterations++; + } + } else { + while (final_batch.size() == 0) { + program.receive({t, NumberData{0.01}}, "i1"); + final_batch = program.receive({t, NumberData{0.01}}, "i2"); + t = t + 5000; + // iterations++; + } + } + + if (final_batch.size() == 1 && h % 2 == 1) { + const auto* out_msg = dynamic_cast*>(final_batch["output"]["o1"].back().get()); + REQUIRE(out_msg->data.value > 0.0); + /*std::cout << " time " << out_msg->time << std::endl; + std::cout << " value " << out_msg->data.value << std::endl; + std::cout << " iterations " << iterations << std::endl; + std::cout << " ----------------------------- " << std::endl;*/ + } else if (final_batch.size() == 1 && h % 2 == 0) { + const auto* out_msg = dynamic_cast*>(final_batch["output"]["o1"].back().get()); + REQUIRE(out_msg->data.value == 0.0); + /*std::cout << " time " << out_msg->time << std::endl; + std::cout << " value " << out_msg->data.value << std::endl; + std::cout << " iterations " << iterations << std::endl; + std::cout << " ----------------------------- " << std::endl;*/ + } else { + FAIL(true); + } + } + } + } +} + SCENARIO("Program handles Pipeline serialization", "[program][pipeline]") { GIVEN("A program with a stateful Pipeline") { std::string program_json = R"({ diff --git a/libs/core/include/rtbot/Pipeline.h b/libs/core/include/rtbot/Pipeline.h index 404a6418..fdc0d2c7 100644 --- a/libs/core/include/rtbot/Pipeline.h +++ b/libs/core/include/rtbot/Pipeline.h @@ -129,7 +129,7 @@ class Pipeline : public Operator { entry_operator_->execute(); input_queue.pop_front(); // Process output mappings - bool was_reseted = false; + bool was_reset = false; for (const auto& [op_id, mappings] : output_mappings_) { auto it = operators_.find(op_id); if (it != operators_.end()) { @@ -139,23 +139,22 @@ class Pipeline : public Operator { const auto& source_queue = op->get_output_queue(operator_port); // Only forward if source operator has produced output on the mapped port if (!source_queue.empty()) { - was_reseted = false; auto& target_queue = get_output_queue(pipeline_port); for (const auto& msg : source_queue) { RTBOT_LOG_DEBUG("Forwarding message ", msg->to_string(), " from ", op_id, " -> ", pipeline_port); target_queue.push_back(msg->clone()); reset(); - was_reseted = true; + was_reset = true; break; } } } - if (was_reseted) { + if (was_reset) { break; } } } - if (was_reseted) { + if (was_reset) { break; } } diff --git a/libs/std/include/rtbot/std/Collector.h b/libs/std/include/rtbot/std/Collector.h deleted file mode 100644 index ba70e46f..00000000 --- a/libs/std/include/rtbot/std/Collector.h +++ /dev/null @@ -1,20 +0,0 @@ -#ifndef COLLECTOR_H -#define COLLECTOR_H - -#include "Operator.h" - -namespace rtbot { - -template -struct Collector : public Operator { - Collector() = default; - Collector(string const& id, size_t n = 1) : Operator(id) { this->addDataInput("i1", n); } - - string typeName() const override { return "Collector"; } - - OperatorMessage processData() override { return {}; } -}; - -} // end namespace rtbot - -#endif // COLLECTOR_H diff --git a/libs/std/include/rtbot/std/CosineResampler.h b/libs/std/include/rtbot/std/CosineResampler.h index 3b095711..ea9fd2b6 100644 --- a/libs/std/include/rtbot/std/CosineResampler.h +++ b/libs/std/include/rtbot/std/CosineResampler.h @@ -12,4 +12,6 @@ namespace rtbot { using namespace std; // TODO +} // namespace rtbot + #endif // COSINERESAMPLER_H diff --git a/libs/std/include/rtbot/std/MovingSum.h b/libs/std/include/rtbot/std/MovingSum.h new file mode 100644 index 00000000..271616db --- /dev/null +++ b/libs/std/include/rtbot/std/MovingSum.h @@ -0,0 +1,46 @@ +#ifndef MOVING_SUM_H +#define MOVING_SUM_H + +#include + +#include "rtbot/Buffer.h" +#include "rtbot/Message.h" +#include "rtbot/Operator.h" +#include "rtbot/PortType.h" + +namespace rtbot { + +// MovingSum only needs sum tracking +struct MovingSumFeatures { + static constexpr bool TRACK_SUM = true; + static constexpr bool TRACK_VARIANCE = false; +}; + +class MovingSum : public Buffer { + public: + MovingSum(std::string id, size_t window_size) : Buffer(std::move(id), window_size) {} + + std::string type_name() const override { return "MovingSum"; } + + protected: + std::vector>> process_message(const Message *msg) override { + // Only emit messages when the buffer is full to ensure + // we have enough data for a proper moving sum + if (!this->buffer_full()) { + return {}; + } + + // Create output message with same timestamp but sum value + std::vector>> v; + v.push_back(create_message(msg->time, NumberData{this->sum()})); + return v; + } +}; + +inline std::shared_ptr make_moving_sum(std::string id, size_t window_size) { + return std::make_shared(std::move(id), window_size); +} + +} // namespace rtbot + +#endif // MOVING_SUM_H \ No newline at end of file diff --git a/libs/std/include/rtbot/std/MovingSum.md b/libs/std/include/rtbot/std/MovingSum.md new file mode 100644 index 00000000..fa6e5c5a --- /dev/null +++ b/libs/std/include/rtbot/std/MovingSum.md @@ -0,0 +1,130 @@ +--- +behavior: + buffered: true + throughput: constant +view: + shape: circle + latex: + template: | + MS({{window_size}}) +jsonschema: + type: object + properties: + id: + type: string + description: The id of the operator + examples: ["ms1"] + window_size: + type: integer + description: The size of the moving sum window + minimum: 1 + examples: [20] + required: ["id", "window_size"] +--- + +# MovingSum + +The MovingSum operator calculates a rolling sum over a sliding window of numeric values. Each time a new value arrives, it's added to the buffer and included in the sum calculation. When the buffer reaches its specified size, the oldest value is removed before adding new ones. + +## Configuration + +### Required Parameters + +- `id`: Unique identifier for the operator +- `window_size`: Number of values to include in the moving sum window (must be >= 1) + +### Example Configuration + +```json +{ + "id": "ms1", + "window_size": 20 +} +``` + +## Port Configuration + +### Inputs + +- `i1`: Single input port accepting NumberData messages + +### Outputs + +- `o1`: Single output port emitting NumberData messages with added values + +## Operation + +1. Incoming messages are added to a sliding window buffer +2. The sum is calculated as the sum of all values when the buffer gets full +3. Output messages contain: + - Same timestamp as input message + - Value field contains the current sum +4. Initial output begins as soon as the window size length of the buffer is reached +5. The buffer automatically manages the window size by removing oldest values when full + +## Implementation Details + +The operator: + +- Uses a simple sum calculation +- Maintains a running sum for efficiency +- Automatically handles buffer overflow +- Processes messages in order of arrival +- Preserves input message timestamps + +### Memory Usage + +- O(N) where N is the window_size +- Fixed memory footprint once buffer fills +- No dynamic allocation during normal operation + +### Performance Characteristics + +- Message insertion: O(1) +- Average calculation: O(1) +- Memory cleanup: O(1) +- Output generation: O(1) + +## Error Handling + +The operator will throw exceptions for: + +- Invalid window size (must be >= 1) +- Invalid message types +- Type mismatches on input port + +## Use Cases + +Ideal for: + +- Area below a curve +- Data preprocessing +- Real-time monitoring + +## Examples + +### Basic Usage + +```cpp +// Create operator with window size 5 +auto ms = std::make_unique("ms1", 2); + +// Process some values +ms->receive_data(create_message(1, NumberData{10.0}), 0); +ms->receive_data(create_message(2, NumberData{20.0}), 0); +ms->execute(); + +// Access current average +double sum = ms->sum(); +``` + +### Pipeline Integration + +```cpp +auto input = make_number_input("in1"); +auto ms = std::make_unique("ms1", 10); +auto output = make_number_output("out1"); + +input->connect(ms.get()); +ms->connect(output.get()); +``` diff --git a/libs/std/include/rtbot/std/Replace.h b/libs/std/include/rtbot/std/Replace.h new file mode 100644 index 00000000..4204d430 --- /dev/null +++ b/libs/std/include/rtbot/std/Replace.h @@ -0,0 +1,72 @@ +#ifndef REPLACE_H +#define REPLACE_H + +#include +#include +#include + +#include "rtbot/Message.h" +#include "rtbot/Operator.h" + +namespace rtbot { + +class Replace : public Operator { + public: + Replace(std::string id) : Operator(std::move(id)) { + // Add single input and output port for numeric data + add_data_port(); + add_output_port(); + } + + virtual ~Replace() = default; + + // Pure virtual method that derived classes must implement + virtual double replace(double value) const = 0; + + protected: + void process_data() override { + auto& input_queue = get_data_queue(0); + auto& output_queue = get_output_queue(0); + + while (!input_queue.empty()) { + const auto* msg = dynamic_cast*>(input_queue.front().get()); + if (!msg) { + throw std::runtime_error("Invalid message type in Replace"); + } + + if (!std::isnan(msg->data.value) && std::isfinite(msg->data.value)) { + double replacement = replace(msg->data.value); + output_queue.push_back(create_message(msg->time, NumberData{replacement})); + } + + input_queue.pop_front(); + } + } +}; + +// Concrete implementations for various filter operations + +class LessThanOrEqualToReplace : public Replace { + public: + LessThanOrEqualToReplace(std::string id, double threshold, double replaceBy) + : Replace(std::move(id)), threshold_(threshold), replaceBy_(replaceBy) {} + std::string type_name() const override { return "LessThanOrEqualToReplace"; } + + double get_threshold() const { return threshold_; } + double get_replace_by() const { return replaceBy_; } + double replace(double x) const override { return (x <= threshold_) ? replaceBy_ : x; } + + private: + double threshold_; + double replaceBy_; +}; + +// Factory functions +inline std::shared_ptr make_less_than_or_equal_to_replace(std::string id, double threshold, + double replaceBy) { + return std::make_shared(std::move(id), threshold, replaceBy); +} + +} // namespace rtbot + +#endif // REPLACE_H \ No newline at end of file diff --git a/libs/std/include/rtbot/std/Replace.md b/libs/std/include/rtbot/std/Replace.md new file mode 100644 index 00000000..5b5cfc70 --- /dev/null +++ b/libs/std/include/rtbot/std/Replace.md @@ -0,0 +1,66 @@ +--- +behavior: + buffered: true + throughput: variable +view: + shape: circle +operators: + LessThanOrEqualToReplace: + latex: + template: | + <= {{value}} +jsonschemas: + - type: object + title: LessThanOrEqualToReplace + properties: + type: + type: string + enum: ["LessThanOrEqualToReplace"] + id: + type: string + description: The id of the operator + value: + type: number + description: The threshold value to compare against + examples: [5.0] + replaceBy: + type: number + description: The replacement value + examples: [0.0] + required: ["id", "value", "replaceBy"] +--- + +# LessThanOrEqualToReplace + +The LessThanOrEqualToReplace operator replace the values of certain messages and replace those by {replaceBy} if the values of those messages are less than or Equal to the threshold value + +## Configuration + +- `value`: Reference value to compare against +- `replaceBy`: The value used to replace + +## Port Configuration + +Inputs: + +- Port 0: Accepts NumberData messages + +Outputs: + +- Port 0: Emits matching NumberData messages + +## Operation + +The operator compares each input value against the reference: + +- If input_value <= reference_value then the input values is replaced by {replaceBy} +- Otherwise: Message is forwarded without changes + +## Error Handling + +- Throws if receiving invalid message types + +## Performance + +- O(1) processing per message +- No message buffering diff --git a/libs/std/test/test_moving_sum.cpp b/libs/std/test/test_moving_sum.cpp new file mode 100644 index 00000000..04b30053 --- /dev/null +++ b/libs/std/test/test_moving_sum.cpp @@ -0,0 +1,175 @@ +#include +#include + +#include "rtbot/std/MovingSum.h" + +using namespace rtbot; + +SCENARIO("MovingSum operator handles basic calculations", "[moving_sum]") { + GIVEN("A MovingSum operator with window size 3") { + auto ms = MovingSum("test_ms", 3); + + WHEN("Receiving less messages than window size") { + ms.receive_data(create_message(1, NumberData{2.0}), 0); + ms.execute(); + + THEN("No output is produced") { + const auto& output = ms.get_output_queue(0); + REQUIRE(output.empty()); + REQUIRE(ms.sum() == 2.0); + } + + AND_WHEN("Second message arrives") { + ms.receive_data(create_message(2, NumberData{4.0}), 0); + ms.execute(); + + THEN("Still no output but average is updated") { + const auto& output = ms.get_output_queue(0); + REQUIRE(output.empty()); + REQUIRE(ms.sum() == 6.0); // (2 + 4) + } + } + } + + WHEN("Buffer becomes full") { + // Fill buffer with values [2.0, 4.0, 6.0] + ms.receive_data(create_message(1, NumberData{2.0}), 0); + ms.execute(); + ms.receive_data(create_message(2, NumberData{4.0}), 0); + ms.execute(); + ms.receive_data(create_message(3, NumberData{6.0}), 0); + ms.execute(); + + THEN("Output is produced with correct average") { + const auto& output = ms.get_output_queue(0); + REQUIRE(output.size() == 1); + + const auto* msg = dynamic_cast*>(output.front().get()); + REQUIRE(msg != nullptr); + REQUIRE(msg->time == 3); + REQUIRE(msg->data.value == 12.0); // (2 + 4 + 6) + } + + AND_WHEN("New value arrives") { + ms.clear_all_output_ports(); + ms.receive_data(create_message(4, NumberData{8.0}), 0); + ms.execute(); + + THEN("Window slides and new average is calculated") { + const auto& output = ms.get_output_queue(0); + REQUIRE(output.size() == 1); + + const auto* msg = dynamic_cast*>(output.front().get()); + REQUIRE(msg != nullptr); + REQUIRE(msg->time == 4); + REQUIRE(msg->data.value == 18); // (4 + 6 + 8) + } + } + } + } +} + +SCENARIO("MovingSum operator handles state serialization", "[moving_sum]") { + GIVEN("A MovingSum operator with some data") { + auto ms = MovingSum("test_ms", 3); + + // Add some data + ms.receive_data(create_message(1, NumberData{2.0}), 0); + ms.execute(); + ms.receive_data(create_message(2, NumberData{4.0}), 0); + ms.execute(); + + WHEN("State is serialized and restored") { + // Serialize state + Bytes state = ms.collect(); + + // Create new operator + auto restored = MovingSum("test_ms", 3); + + // Restore state + Bytes::const_iterator it = state.cbegin(); + restored.restore(it); + + THEN("State is correctly preserved") { + REQUIRE(restored.sum() == ms.sum()); + + AND_WHEN("New data is added to both") { + ms.receive_data(create_message(3, NumberData{6.0}), 0); + restored.receive_data(create_message(3, NumberData{6.0}), 0); + + ms.execute(); + restored.execute(); + + THEN("Both produce identical output") { + const auto& orig_output = ms.get_output_queue(0); + const auto& rest_output = restored.get_output_queue(0); + + REQUIRE(orig_output.size() == rest_output.size()); + + if (!orig_output.empty()) { + const auto* orig_msg = dynamic_cast*>(orig_output.front().get()); + const auto* rest_msg = dynamic_cast*>(rest_output.front().get()); + + REQUIRE(orig_msg->time == rest_msg->time); + REQUIRE(orig_msg->data.value == rest_msg->data.value); + } + } + } + } + } + } +} + +SCENARIO("MovingSum operator handles edge cases", "[moving_sum]") { + SECTION("Invalid window size") { REQUIRE_THROWS_AS(MovingSum("test_ms", 0), std::runtime_error); } + + SECTION("Window size of 1") { + auto ms = MovingSum("test_ms", 1); + + ms.receive_data(create_message(1, NumberData{42.0}), 0); + ms.execute(); + + const auto& output = ms.get_output_queue(0); + REQUIRE(output.size() == 1); + + const auto* msg = dynamic_cast*>(output.front().get()); + REQUIRE(msg->data.value == 42.0); + } + + SECTION("Large numbers") { + auto ms = MovingSum("test_ms", 3); + double large_value = 1e15; + + ms.receive_data(create_message(1, NumberData{large_value}), 0); + ms.execute(); + ms.receive_data(create_message(2, NumberData{large_value + 3}), 0); + ms.execute(); + ms.receive_data(create_message(3, NumberData{large_value + 6}), 0); + ms.execute(); + + const auto& output = ms.get_output_queue(0); + REQUIRE(output.size() == 1); + + const auto* msg = dynamic_cast*>(output.front().get()); + REQUIRE(msg->data.value == 3 * large_value + 9); // Sum of the three values + } + + SECTION("Numerical stability") { + auto ms = MovingSum("test_ms", 3); + + // Add sequence of very small and very large numbers + ms.receive_data(create_message(1, NumberData{1e-10}), 0); + ms.execute(); + ms.receive_data(create_message(2, NumberData{1e10}), 0); + ms.execute(); + ms.receive_data(create_message(3, NumberData{1e-10}), 0); + ms.execute(); + + const auto& output = ms.get_output_queue(0); + REQUIRE(output.size() == 1); + + const auto* msg = dynamic_cast*>(output.front().get()); + double expected = (1e-10 + 1e10 + 1e-10); + REQUIRE(msg->data.value == expected); + } +} \ No newline at end of file diff --git a/libs/std/test/test_replace.cpp b/libs/std/test/test_replace.cpp new file mode 100644 index 00000000..5b1c476c --- /dev/null +++ b/libs/std/test/test_replace.cpp @@ -0,0 +1,114 @@ +#include +#include +#include +#include + +#include "rtbot/std/Replace.h" + +using namespace rtbot; + +SCENARIO("Replace derived classes handle basic filtering", "[replace_op]") { + SECTION("LessThanOrEqualToReplace operator") { + auto ltR = make_less_than_or_equal_to_replace("ltR", 3.0, 1.0); + + REQUIRE(ltR->type_name() == "LessThanOrEqualToReplace"); + REQUIRE(dynamic_cast(ltR.get())->get_threshold() == 3.0); + REQUIRE(dynamic_cast(ltR.get())->get_replace_by() == 1.0); + + std::vector> inputs = { + {1, 1.0}, // Should be replaced + {2, 4.0}, // maintain + {4, 2.5}, // Should be replaced + {5, 3.0} // Should be replaced + }; + + std::vector> expected = {{1, 1.0}, {2, 4.0}, {4, 1.0}, {5, 1.0}}; + + for (const auto& input : inputs) { + ltR->receive_data(create_message(input.first, NumberData{input.second}), 0); + } + ltR->execute(); + + auto& output = ltR->get_output_queue(0); + REQUIRE(output.size() == expected.size()); + + for (size_t i = 0; i < output.size(); ++i) { + auto* msg = dynamic_cast*>(output[i].get()); + REQUIRE(msg->time == expected[i].first); + REQUIRE(msg->data.value == expected[i].second); + } + } +} + +SCENARIO("LessThanOrEqualToReplace handles edge cases", "[replace_op]") { + SECTION("NaN values") { + auto ltR = make_less_than_or_equal_to_replace("ltR", 1.0, 0.0); + + ltR->receive_data(create_message(1, NumberData{std::numeric_limits::quiet_NaN()}), 0); + ltR->execute(); + + auto& output = ltR->get_output_queue(0); + REQUIRE(output.empty()); // NaN comparisons should fail + } + + SECTION("Infinity values") { + auto ltR = make_less_than_or_equal_to_replace("ltR", std::numeric_limits::infinity(), 0.0); + + ltR->receive_data(create_message(1, NumberData{1.0}), 0); + ltR->execute(); + + auto& output = ltR->get_output_queue(0); + REQUIRE(!output.empty()); // Finite values should be less than infinity + } +} + +SCENARIO("LessThanOrEqualToReplace handles error cases", "[replace_op]") { + SECTION("Invalid message type") { + auto ltR = make_less_than_or_equal_to_replace("ltR", 3.0, 1.9); + + REQUIRE_THROWS_AS(ltR->receive_data(create_message(1, BooleanData{true}), 0), std::runtime_error); + } + + SECTION("Invalid port index") { + auto ltR = make_less_than_or_equal_to_replace("ltR", 3.0, 9.0); + + REQUIRE_THROWS_AS(ltR->receive_data(create_message(1, NumberData{1.0}), 1), std::runtime_error); + } +} + +SCENARIO("ReplaceOp handles serialization", "[replace_op]") { + SECTION("LessThanOrEqualToReplace operator serialization") { + auto ltR = make_less_than_or_equal_to_replace("ltR", 3.0, 2.0); + + // Fill with some data + ltR->receive_data(create_message(1, NumberData{1.0}), 0); + ltR->receive_data(create_message(2, NumberData{4.0}), 0); + ltR->execute(); + + // Serialize state + Bytes state = ltR->collect(); + + // Create new operator and restore state + auto restored = make_less_than_or_equal_to_replace("ltR", 3.0, 2.0); + auto it = state.cbegin(); + restored->restore(it); + + // Verify restored state + REQUIRE(restored->type_name() == ltR->type_name()); + REQUIRE(dynamic_cast(restored.get())->get_threshold() == + dynamic_cast(ltR.get())->get_threshold()); + REQUIRE(dynamic_cast(restored.get())->get_replace_by() == + dynamic_cast(ltR.get())->get_replace_by()); + + // Process new data and verify behavior + restored->clear_all_output_ports(); + restored->receive_data(create_message(3, NumberData{1.0}), 0); + restored->execute(); + + auto& output = restored->get_output_queue(0); + REQUIRE(!output.empty()); + auto* msg = dynamic_cast*>(output[0].get()); + REQUIRE(msg->time == 3); + REQUIRE(msg->data.value == 2.0); + } +} \ No newline at end of file