@@ -441,6 +441,283 @@ SCENARIO("Program handles Pipeline operators and resets", "[program][pipeline]")
441441 }
442442}
443443
444+ SCENARIO (" Program handles complex Pipeline operators and resets" , " [program][pipeline]" ) {
445+ GIVEN (" A complex program with a Pipeline" ) {
446+ std::string program_json = R"( {
447+ "apiVersion": "v1",
448+ "operators": [
449+ {
450+ "id": "input",
451+ "type": "Input",
452+ "portTypes": [
453+ "number",
454+ "number"
455+ ]
456+ },
457+ {
458+ "id": "hi_input_cutoff",
459+ "type": "LessThanOrEqualToReplace",
460+ "value": 0.5,
461+ "replaceBy": 0.0
462+ },
463+ {
464+ "id": "lo_input_cutoff",
465+ "type": "LessThanOrEqualToReplace",
466+ "value": 0.5,
467+ "replaceBy": 0.0
468+ },
469+ {
470+ "id": "hiresampler",
471+ "type": "ResamplerConstant",
472+ "interval": 5000
473+ },
474+ {
475+ "id": "loresampler",
476+ "type": "ResamplerConstant",
477+ "interval": 5000
478+ },
479+ {
480+ "id": "hiresampler_cutoff",
481+ "type": "LessThanOrEqualToReplace",
482+ "value": 0.5,
483+ "replaceBy": 0.0
484+ },
485+ {
486+ "id": "loresampler_cutoff",
487+ "type": "LessThanOrEqualToReplace",
488+ "value": 0.5,
489+ "replaceBy": 0.0
490+ },
491+ {
492+ "id": "power1",
493+ "type": "Scale",
494+ "value": 220
495+ },
496+ {
497+ "id": "power2",
498+ "type": "Scale",
499+ "value": 220
500+ },
501+ {
502+ "id": "power1_cutoff",
503+ "type": "LessThanOrEqualToReplace",
504+ "value": 0.5,
505+ "replaceBy": 0.0
506+ },
507+ {
508+ "id": "power2_cutoff",
509+ "type": "LessThanOrEqualToReplace",
510+ "value": 0.5,
511+ "replaceBy": 0.0
512+ },
513+ {
514+ "id": "total_power",
515+ "type": "Addition"
516+ },
517+ {
518+ "id": "total_power_cutoff",
519+ "type": "LessThanOrEqualToReplace",
520+ "value": 0.5,
521+ "replaceBy": 0.0
522+ },
523+ {
524+ "id": "hourly",
525+ "type": "Pipeline",
526+ "input_port_types": [
527+ "number"
528+ ],
529+ "output_port_types": [
530+ "number"
531+ ],
532+ "operators": [
533+ {
534+ "id": "trapezoid",
535+ "type": "MovingAverage",
536+ "window_size": 2
537+ },
538+ {
539+ "id": "trapezoid_cutoff",
540+ "type": "LessThanOrEqualToReplace",
541+ "value": 0.5,
542+ "replaceBy": 0.0
543+ },
544+ {
545+ "id": "wh",
546+ "type": "Scale",
547+ "value": 0.00138888888
548+ },
549+ {
550+ "id": "wh_cutoff",
551+ "type": "LessThanOrEqualToReplace",
552+ "value": 0.5,
553+ "replaceBy": 0.0
554+ },
555+ {
556+ "id": "hourly_ms",
557+ "type": "MovingSum",
558+ "window_size": 720
559+ }
560+ ],
561+ "connections": [
562+ {
563+ "from": "trapezoid",
564+ "to": "trapezoid_cutoff"
565+ },
566+ {
567+ "from": "trapezoid_cutoff",
568+ "to": "wh"
569+ },
570+ {
571+ "from": "wh",
572+ "to": "wh_cutoff"
573+ },
574+ {
575+ "from": "wh_cutoff",
576+ "to": "hourly_ms"
577+ }
578+ ],
579+ "entryOperator": "trapezoid",
580+ "outputMappings": {
581+ "hourly_ms": {
582+ "o1": "o1"
583+ }
584+ }
585+ },
586+ {
587+ "id": "output",
588+ "type": "Output",
589+ "portTypes": [
590+ "number"
591+ ]
592+ }
593+ ],
594+ "connections": [
595+ {
596+ "from": "input",
597+ "to": "hi_input_cutoff",
598+ "fromPort": "o1",
599+ "toPort": "i1"
600+ },
601+ {
602+ "from": "input",
603+ "to": "lo_input_cutoff",
604+ "fromPort": "o2",
605+ "toPort": "i1"
606+ },
607+ {
608+ "from": "hi_input_cutoff",
609+ "to": "hiresampler"
610+ },
611+ {
612+ "from": "lo_input_cutoff",
613+ "to": "loresampler"
614+ },
615+ {
616+ "from": "hiresampler",
617+ "to": "hiresampler_cutoff"
618+ },
619+ {
620+ "from": "hiresampler_cutoff",
621+ "to": "power1"
622+ },
623+ {
624+ "from": "loresampler",
625+ "to": "loresampler_cutoff"
626+ },
627+ {
628+ "from": "loresampler_cutoff",
629+ "to": "power2"
630+ },
631+ {
632+ "from": "power1",
633+ "to": "power1_cutoff"
634+ },
635+ {
636+ "from": "power2",
637+ "to": "power2_cutoff"
638+ },
639+ {
640+ "from": "power1_cutoff",
641+ "to": "total_power",
642+ "fromPort": "o1",
643+ "toPort": "i1"
644+ },
645+ {
646+ "from": "power2_cutoff",
647+ "to": "total_power",
648+ "fromPort": "o1",
649+ "toPort": "i2"
650+ },
651+ {
652+ "from": "total_power",
653+ "to": "total_power_cutoff"
654+ },
655+ {
656+ "from": "total_power_cutoff",
657+ "to": "hourly"
658+ },
659+ {
660+ "from": "hourly",
661+ "to": "output",
662+ "fromPort": "o1",
663+ "toPort": "i1"
664+ }
665+ ],
666+ "entryOperator": "input",
667+ "output": {
668+ "output": [
669+ "o1"
670+ ]
671+ },
672+ "title": "Power Consumption Monitor with Multiple Averages",
673+ "description": "Calculates hourly power consumption from two current inputs"
674+ })" ;
675+
676+ Program program (program_json);
677+
678+ WHEN (" Processing messages" ) {
679+ int t = 5000 ;
680+ for (int h = 1 ; h <= 20 ; h++) {
681+ ProgramMsgBatch final_batch;
682+ // int iterations = 0;
683+ if (h % 2 == 1 ) {
684+ while (final_batch.size () == 0 ) {
685+ program.receive ({t, NumberData{30.23 }}, " i1" );
686+ final_batch = program.receive ({t, NumberData{10.5802 }}, " i2" );
687+ t = t + 5000 ;
688+ // iterations++;
689+ }
690+ } else {
691+ while (final_batch.size () == 0 ) {
692+ program.receive ({t, NumberData{0.01 }}, " i1" );
693+ final_batch = program.receive ({t, NumberData{0.01 }}, " i2" );
694+ t = t + 5000 ;
695+ // iterations++;
696+ }
697+ }
698+
699+ if (final_batch.size () == 1 && h % 2 == 1 ) {
700+ const auto * out_msg = dynamic_cast <const Message<NumberData>*>(final_batch[" output" ][" o1" ].back ().get ());
701+ REQUIRE (out_msg->data .value > 0.0 );
702+ /* std::cout << " time " << out_msg->time << std::endl;
703+ std::cout << " value " << out_msg->data.value << std::endl;
704+ std::cout << " iterations " << iterations << std::endl;
705+ std::cout << " ----------------------------- " << std::endl;*/
706+ } else if (final_batch.size () == 1 && h % 2 == 0 ) {
707+ const auto * out_msg = dynamic_cast <const Message<NumberData>*>(final_batch[" output" ][" o1" ].back ().get ());
708+ REQUIRE (out_msg->data .value == 0.0 );
709+ /* std::cout << " time " << out_msg->time << std::endl;
710+ std::cout << " value " << out_msg->data.value << std::endl;
711+ std::cout << " iterations " << iterations << std::endl;
712+ std::cout << " ----------------------------- " << std::endl;*/
713+ } else {
714+ FAIL (true );
715+ }
716+ }
717+ }
718+ }
719+ }
720+
444721SCENARIO (" Program handles Pipeline serialization" , " [program][pipeline]" ) {
445722 GIVEN (" A program with a stateful Pipeline" ) {
446723 std::string program_json = R"( {
0 commit comments