diff --git a/pkgs/core/schemas/0050_tables_definitions.sql b/pkgs/core/schemas/0050_tables_definitions.sql index 68a6d2756..f39446e54 100644 --- a/pkgs/core/schemas/0050_tables_definitions.sql +++ b/pkgs/core/schemas/0050_tables_definitions.sql @@ -26,6 +26,12 @@ create table pgflow.steps ( opt_start_delay int, required_input_pattern jsonb, -- JSON pattern for @> containment check (if) forbidden_input_pattern jsonb, -- JSON pattern for NOT @> containment check (ifNot) + constraint required_input_pattern_is_object check ( + required_input_pattern is null or jsonb_typeof(required_input_pattern) = 'object' + ), + constraint forbidden_input_pattern_is_object check ( + forbidden_input_pattern is null or jsonb_typeof(forbidden_input_pattern) = 'object' + ), when_unmet text not null default 'skip', -- What to do when condition not met (skip is natural default) when_exhausted text not null default 'fail', -- What to do when handler fails after retries created_at timestamptz not null default now(), diff --git a/pkgs/core/supabase/migrations/20260214181656_pgflow_step_conditions.sql b/pkgs/core/supabase/migrations/20260214181656_pgflow_step_conditions.sql index 1214dbeeb..4334ea2ab 100644 --- a/pkgs/core/supabase/migrations/20260214181656_pgflow_step_conditions.sql +++ b/pkgs/core/supabase/migrations/20260214181656_pgflow_step_conditions.sql @@ -15,7 +15,7 @@ END) <= 1), ADD CONSTRAINT "skip_reason_matches_status" CHECK (((status = 'skipp -- Create index "idx_step_states_skipped" to table: "step_states" CREATE INDEX "idx_step_states_skipped" ON "pgflow"."step_states" ("run_id", "step_slug") WHERE (status = 'skipped'::text); -- Modify "steps" table -ALTER TABLE "pgflow"."steps" ADD CONSTRAINT "when_exhausted_is_valid" CHECK (when_exhausted = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD CONSTRAINT "when_unmet_is_valid" CHECK (when_unmet = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD COLUMN "required_input_pattern" jsonb NULL, ADD COLUMN "forbidden_input_pattern" jsonb NULL, ADD COLUMN "when_unmet" text NOT NULL DEFAULT 'skip', ADD COLUMN "when_exhausted" text NOT NULL DEFAULT 'fail'; +ALTER TABLE "pgflow"."steps" ADD CONSTRAINT "forbidden_input_pattern_is_object" CHECK ((forbidden_input_pattern IS NULL) OR (jsonb_typeof(forbidden_input_pattern) = 'object'::text)), ADD CONSTRAINT "required_input_pattern_is_object" CHECK ((required_input_pattern IS NULL) OR (jsonb_typeof(required_input_pattern) = 'object'::text)), ADD CONSTRAINT "when_exhausted_is_valid" CHECK (when_exhausted = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD CONSTRAINT "when_unmet_is_valid" CHECK (when_unmet = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD COLUMN "required_input_pattern" jsonb NULL, ADD COLUMN "forbidden_input_pattern" jsonb NULL, ADD COLUMN "when_unmet" text NOT NULL DEFAULT 'skip', ADD COLUMN "when_exhausted" text NOT NULL DEFAULT 'fail'; -- Modify "_compare_flow_shapes" function CREATE OR REPLACE FUNCTION "pgflow"."_compare_flow_shapes" ("p_local" jsonb, "p_db" jsonb) RETURNS text[] LANGUAGE plpgsql STABLE SET "search_path" = '' AS $BODY$ DECLARE diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index 685d7c457..e9707016b 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:Hk7WDNVqZP9iYz/vW2Dqe/G3qKdw6i2FVIYl05jn6Kk= +h1:ZZJEI67KUViUzd0rVHGMZPpbUXU2MFSXdTIe/yyJqyE= 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg= @@ -18,4 +18,4 @@ h1:Hk7WDNVqZP9iYz/vW2Dqe/G3qKdw6i2FVIYl05jn6Kk= 20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o= 20260120205547_pgflow_requeue_stalled_tasks.sql h1:4wCBBvjtETCgJf1eXmlH5wCTKDUhiLi0uzsFG1V528E= 20260124113408_pgflow_auth_secret_support.sql h1:i/s1JkBqRElN6FOYFQviJt685W08SuSo30aP25lNlLc= -20260214181656_pgflow_step_conditions.sql h1:uLPoOD/hPrerMACS6CThb7t7T5LKLpMMrdFXXi4ZQ5s= +20260214181656_pgflow_step_conditions.sql h1:nG52qhydTJMeLTd4AoI4buATJNHdEN2C1ZJdKp+i7wE= diff --git a/pkgs/core/supabase/tests/_cascade_force_skip_steps/broadcast_order.test.sql b/pkgs/core/supabase/tests/_cascade_force_skip_steps/broadcast_order.test.sql index 06083db2b..e9506faa4 100644 --- a/pkgs/core/supabase/tests/_cascade_force_skip_steps/broadcast_order.test.sql +++ b/pkgs/core/supabase/tests/_cascade_force_skip_steps/broadcast_order.test.sql @@ -10,6 +10,28 @@ select pgflow.add_step('order_test', 'step_a'); select pgflow.add_step('order_test', 'step_b', ARRAY['step_a']); select pgflow.add_step('order_test', 'step_c', ARRAY['step_b']); +-- Setup capture table for reliable insertion order tracking +create temporary table skip_event_log ( + seq bigserial primary key, + run_id uuid not null, + step_slug text not null +); + +create or replace function pg_temp.capture_skip_event() +returns trigger language plpgsql as $$ +begin + if new.payload->>'event_type' = 'step:skipped' then + insert into skip_event_log(run_id, step_slug) + values ((new.payload->>'run_id')::uuid, new.payload->>'step_slug'); + end if; + return new; +end; +$$; + +create trigger capture_skip_event_trigger +after insert on realtime.messages +for each row execute function pg_temp.capture_skip_event(); + -- Start flow with flow as ( select * from pgflow.start_flow('order_test', '{}'::jsonb) @@ -33,28 +55,14 @@ select is( ); -- Test 2: Events should be in dependency order (A before B before C) -with ordered_events as ( - select - inserted_at, - payload->>'step_slug' as step_slug, - row_number() over (order by inserted_at, payload->>'step_slug') as event_order - from realtime.messages - where payload->>'event_type' = 'step:skipped' - and payload->>'run_id' = (select run_id::text from run_ids) -), -step_a_event as ( - select event_order from ordered_events where step_slug = 'step_a' -), -step_b_event as ( - select event_order from ordered_events where step_slug = 'step_b' -), -step_c_event as ( - select event_order from ordered_events where step_slug = 'step_c' -) -select ok( - (select event_order from step_a_event) < (select event_order from step_b_event) - AND (select event_order from step_b_event) < (select event_order from step_c_event), - 'Events must be in dependency order (A -> B -> C)' +-- Uses trigger-based capture for reliable ordering (no timestamp tie-break issues) +select results_eq( + $$ select step_slug + from skip_event_log + where run_id = (select run_id from run_ids) + order by seq $$, + $$ values ('step_a'), ('step_b'), ('step_c') $$, + 'Should broadcast step:skipped in dependency order (step_a before step_b before step_c)' ); -- Clean up diff --git a/pkgs/core/supabase/tests/_cascade_force_skip_steps/cascade_through_multiple_levels.test.sql b/pkgs/core/supabase/tests/_cascade_force_skip_steps/cascade_through_multiple_levels.test.sql index 5faa21cff..5d0696b75 100644 --- a/pkgs/core/supabase/tests/_cascade_force_skip_steps/cascade_through_multiple_levels.test.sql +++ b/pkgs/core/supabase/tests/_cascade_force_skip_steps/cascade_through_multiple_levels.test.sql @@ -1,7 +1,7 @@ -- Test: _cascade_force_skip_steps - Cascade through multiple DAG levels -- Verifies skipping A cascades through A -> B -> C chain begin; -select plan(8); +select plan(9); -- Reset database and create a flow: A -> B -> C select pgflow_tests.reset_db(); @@ -17,10 +17,14 @@ with flow as ( select run_id into temporary run_ids from flow; -- Skip step_a (should cascade to step_b and step_c) -select pgflow._cascade_force_skip_steps( - (select run_id from run_ids), - 'step_a', - 'handler_failed' +select is( + (select pgflow._cascade_force_skip_steps( + (select run_id from run_ids), + 'step_a', + 'handler_failed' + )), + 3::int, + 'Should return count of 3 skipped steps (step_a + step_b + step_c)' ); -- Test 1: step_a should be skipped with handler_failed reason diff --git a/pkgs/core/supabase/tests/_cascade_force_skip_steps/cascade_to_single_dependent.test.sql b/pkgs/core/supabase/tests/_cascade_force_skip_steps/cascade_to_single_dependent.test.sql index e6be5c41f..d7ae02f7c 100644 --- a/pkgs/core/supabase/tests/_cascade_force_skip_steps/cascade_to_single_dependent.test.sql +++ b/pkgs/core/supabase/tests/_cascade_force_skip_steps/cascade_to_single_dependent.test.sql @@ -1,7 +1,7 @@ -- Test: _cascade_force_skip_steps - Cascade to single dependent -- Verifies skipping a step cascades to its direct dependent begin; -select plan(7); +select plan(8); -- Reset database and create a flow: A -> B select pgflow_tests.reset_db(); @@ -16,10 +16,14 @@ with flow as ( select run_id into temporary run_ids from flow; -- Skip step_a (should cascade to step_b) -select pgflow._cascade_force_skip_steps( - (select run_id from run_ids), - 'step_a', - 'condition_unmet' +select is( + (select pgflow._cascade_force_skip_steps( + (select run_id from run_ids), + 'step_a', + 'condition_unmet' + )), + 2::int, + 'Should return count of 2 skipped steps (step_a + step_b)' ); -- Test 1: step_a should be skipped diff --git a/pkgs/core/supabase/tests/_cascade_force_skip_steps/idempotent_second_call.test.sql b/pkgs/core/supabase/tests/_cascade_force_skip_steps/idempotent_second_call.test.sql new file mode 100644 index 000000000..a509c00ba --- /dev/null +++ b/pkgs/core/supabase/tests/_cascade_force_skip_steps/idempotent_second_call.test.sql @@ -0,0 +1,75 @@ +\set ON_ERROR_STOP on +\set QUIET on + +begin; +select plan(5); + +select pgflow_tests.reset_db(); + +select pgflow.create_flow('idempotent_test'); +select pgflow.add_step('idempotent_test', 'map_step', '{}', max_attempts => 0, step_type => 'map', when_exhausted => 'skip'); +select pgflow.add_step('idempotent_test', 'dependent_step', ARRAY['map_step']); + +select pgflow.start_flow('idempotent_test', '[1, 2, 3]'::jsonb); + +with tasks as ( + select message_id, task_index + from pgflow.step_tasks + where flow_slug = 'idempotent_test' and step_slug = 'map_step' + order by task_index +) +select pgflow.start_tasks('idempotent_test', array[(select message_id from tasks where task_index = 0)::bigint], pgflow_tests.ensure_worker('idempotent_test')); + +create temporary table test_run as +select run_id from pgflow.runs where flow_slug = 'idempotent_test'; + +select is( + (select pgflow._cascade_force_skip_steps( + (select run_id from test_run), + 'map_step', + 'condition_unmet' + )), + 2::int, + 'First call should skip 2 steps (map_step + dependent_step)' +); + +create temporary table after_first as +select + (select remaining_steps from pgflow.runs where run_id = (select run_id from test_run)) as remaining_steps, + (select count(*) from realtime.messages + where payload->>'event_type' = 'step:skipped' + and (payload->>'run_id')::uuid = (select run_id from test_run)) as event_count, + (select count(*) from pgmq.a_idempotent_test) as archive_count; + +select is( + (select pgflow._cascade_force_skip_steps( + (select run_id from test_run), + 'map_step', + 'condition_unmet' + )), + 0::int, + 'Second call should return 0 (no new skips)' +); + +select is( + (select remaining_steps from pgflow.runs where run_id = (select run_id from test_run)), + (select remaining_steps from after_first), + 'remaining_steps should be unchanged after second call' +); + +select is( + (select count(*) from realtime.messages + where payload->>'event_type' = 'step:skipped' + and (payload->>'run_id')::uuid = (select run_id from test_run)), + (select event_count from after_first), + 'step:skipped event count should be unchanged after second call' +); + +select is( + (select count(*) from pgmq.a_idempotent_test), + (select archive_count from after_first), + 'Archive count should be unchanged after second call' +); + +select * from finish(); +rollback; diff --git a/pkgs/core/supabase/tests/_cascade_force_skip_steps/single_step_skip.test.sql b/pkgs/core/supabase/tests/_cascade_force_skip_steps/single_step_skip.test.sql index e892ef21b..b18c04c71 100644 --- a/pkgs/core/supabase/tests/_cascade_force_skip_steps/single_step_skip.test.sql +++ b/pkgs/core/supabase/tests/_cascade_force_skip_steps/single_step_skip.test.sql @@ -1,7 +1,7 @@ -- Test: _cascade_force_skip_steps - Single step skip (base case) -- Verifies the function can skip a single step without dependencies begin; -select plan(5); +select plan(6); -- Reset database and create a simple flow with no dependencies select pgflow_tests.reset_db(); @@ -24,10 +24,14 @@ select is( ); -- Skip step_a -select pgflow._cascade_force_skip_steps( - (select run_id from run_ids), - 'step_a', - 'condition_unmet' +select is( + (select pgflow._cascade_force_skip_steps( + (select run_id from run_ids), + 'step_a', + 'condition_unmet' + )), + 1::int, + 'Should return count of 1 skipped step' ); -- Test 2: step_a should now have status 'skipped' diff --git a/pkgs/core/supabase/tests/add_step/condition_pattern_invalid_types.test.sql b/pkgs/core/supabase/tests/add_step/condition_pattern_invalid_types.test.sql new file mode 100644 index 000000000..3f75a2054 --- /dev/null +++ b/pkgs/core/supabase/tests/add_step/condition_pattern_invalid_types.test.sql @@ -0,0 +1,39 @@ +\set ON_ERROR_STOP on +\set QUIET on + +begin; +select plan(4); +select pgflow_tests.reset_db(); + +select pgflow.create_flow('invalid_pattern_test'); + +-- Test 1: required_input_pattern as array should fail +select throws_ok( + $$ select pgflow.add_step('invalid_pattern_test', 'step_array', required_input_pattern => '[]'::jsonb) $$, + 'new row for relation "steps" violates check constraint "required_input_pattern_is_object"', + 'Should reject array for required_input_pattern' +); + +-- Test 2: required_input_pattern as string should fail +select throws_ok( + $$ select pgflow.add_step('invalid_pattern_test', 'step_string', required_input_pattern => '"invalid"'::jsonb) $$, + 'new row for relation "steps" violates check constraint "required_input_pattern_is_object"', + 'Should reject string for required_input_pattern' +); + +-- Test 3: forbidden_input_pattern as array should fail +select throws_ok( + $$ select pgflow.add_step('invalid_pattern_test', 'step_forbidden_array', forbidden_input_pattern => '[]'::jsonb) $$, + 'new row for relation "steps" violates check constraint "forbidden_input_pattern_is_object"', + 'Should reject array for forbidden_input_pattern' +); + +-- Test 4: forbidden_input_pattern as string should fail +select throws_ok( + $$ select pgflow.add_step('invalid_pattern_test', 'step_forbidden_string', forbidden_input_pattern => '"invalid"'::jsonb) $$, + 'new row for relation "steps" violates check constraint "forbidden_input_pattern_is_object"', + 'Should reject string for forbidden_input_pattern' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/compare_flow_shapes/pattern_differences.test.sql b/pkgs/core/supabase/tests/compare_flow_shapes/pattern_differences.test.sql index 3d17e9115..ae5ac1ca1 100644 --- a/pkgs/core/supabase/tests/compare_flow_shapes/pattern_differences.test.sql +++ b/pkgs/core/supabase/tests/compare_flow_shapes/pattern_differences.test.sql @@ -1,5 +1,5 @@ begin; -select plan(2); +select plan(5); select pgflow_tests.reset_db(); -- Test: Patterns with same value should match @@ -38,5 +38,59 @@ select is( 'Different requiredInputPattern values should be detected' ); +-- Test: Different forbiddenInputPattern should be detected +select is( + pgflow._compare_flow_shapes( + '{ + "steps": [ + {"slug": "step1", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenExhausted": "fail", "requiredInputPattern": {"defined": true, "value": {"role": "user"}}, "forbiddenInputPattern": {"defined": true, "value": {"role": "admin"}}} + ] + }'::jsonb, + '{ + "steps": [ + {"slug": "step1", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenExhausted": "fail", "requiredInputPattern": {"defined": true, "value": {"role": "user"}}, "forbiddenInputPattern": {"defined": true, "value": {"role": "banned"}}} + ] + }'::jsonb + ), + ARRAY['Step at index 0: forbiddenInputPattern differs ''{"value": {"role": "admin"}, "defined": true}'' vs ''{"value": {"role": "banned"}, "defined": true}''']::text[], + 'Different forbiddenInputPattern values should be detected' +); + +-- Test: forbiddenInputPattern defined transition should be detected +select is( + pgflow._compare_flow_shapes( + '{ + "steps": [ + {"slug": "step1", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenExhausted": "fail", "requiredInputPattern": {"defined": false}, "forbiddenInputPattern": {"defined": false}} + ] + }'::jsonb, + '{ + "steps": [ + {"slug": "step1", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenExhausted": "fail", "requiredInputPattern": {"defined": false}, "forbiddenInputPattern": {"defined": true, "value": {"admin": true}}} + ] + }'::jsonb + ), + ARRAY['Step at index 0: forbiddenInputPattern differs ''{"defined": false}'' vs ''{"value": {"admin": true}, "defined": true}''']::text[], + 'forbiddenInputPattern defined transition should be detected' +); + +-- Test: requiredInputPattern defined transition should be detected +select is( + pgflow._compare_flow_shapes( + '{ + "steps": [ + {"slug": "step1", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenExhausted": "fail", "requiredInputPattern": {"defined": false}, "forbiddenInputPattern": {"defined": false}} + ] + }'::jsonb, + '{ + "steps": [ + {"slug": "step1", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenExhausted": "fail", "requiredInputPattern": {"defined": true, "value": {"status": "ready"}}, "forbiddenInputPattern": {"defined": false}} + ] + }'::jsonb + ), + ARRAY['Step at index 0: requiredInputPattern differs ''{"defined": false}'' vs ''{"value": {"status": "ready"}, "defined": true}''']::text[], + 'requiredInputPattern defined transition should be detected' +); + select finish(); rollback; diff --git a/pkgs/core/supabase/tests/condition_evaluation/dependent_unmet_fail_archives_active_messages.test.sql b/pkgs/core/supabase/tests/condition_evaluation/dependent_unmet_fail_archives_active_messages.test.sql index 03944f9fa..1b4502b85 100644 --- a/pkgs/core/supabase/tests/condition_evaluation/dependent_unmet_fail_archives_active_messages.test.sql +++ b/pkgs/core/supabase/tests/condition_evaluation/dependent_unmet_fail_archives_active_messages.test.sql @@ -1,5 +1,5 @@ begin; -select plan(4); +select plan(5); select pgflow_tests.reset_db(); @@ -26,6 +26,10 @@ with run as ( ) select run_id into temporary run_ids from run; +-- Capture active message IDs before failure +create temporary table pre_failure_msgs as +select message_id from pgmq.q_dependent_fail_archive; + with started as ( select * from pgflow_tests.read_and_start('dependent_fail_archive', qty => 10) ), @@ -72,15 +76,19 @@ select is( 'run failure should archive all active queue messages' ); -select ok( +-- Verify specific messages were archived +select is( ( select count(*) - from pgmq.a_dependent_fail_archive - ) >= 2, - 'archive queue should contain completed and run-failure archived messages' + from pgmq.a_dependent_fail_archive a + join pre_failure_msgs p on a.message_id = p.message_id + ), + (select count(*)::int from pre_failure_msgs), + 'previously active messages should be in archive' ); drop table if exists run_ids; +drop table if exists pre_failure_msgs; select finish(); rollback;