Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkgs/core/schemas/0050_tables_definitions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ create table pgflow.steps (
opt_start_delay int,
required_input_pattern jsonb, -- JSON pattern for @> containment check (if)
forbidden_input_pattern jsonb, -- JSON pattern for NOT @> containment check (ifNot)
constraint required_input_pattern_is_object check (
required_input_pattern is null or jsonb_typeof(required_input_pattern) = 'object'
),
constraint forbidden_input_pattern_is_object check (
forbidden_input_pattern is null or jsonb_typeof(forbidden_input_pattern) = 'object'
),
when_unmet text not null default 'skip', -- What to do when condition not met (skip is natural default)
when_exhausted text not null default 'fail', -- What to do when handler fails after retries
created_at timestamptz not null default now(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ END) <= 1), ADD CONSTRAINT "skip_reason_matches_status" CHECK (((status = 'skipp
-- Create index "idx_step_states_skipped" to table: "step_states"
CREATE INDEX "idx_step_states_skipped" ON "pgflow"."step_states" ("run_id", "step_slug") WHERE (status = 'skipped'::text);
-- Modify "steps" table
ALTER TABLE "pgflow"."steps" ADD CONSTRAINT "when_exhausted_is_valid" CHECK (when_exhausted = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD CONSTRAINT "when_unmet_is_valid" CHECK (when_unmet = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD COLUMN "required_input_pattern" jsonb NULL, ADD COLUMN "forbidden_input_pattern" jsonb NULL, ADD COLUMN "when_unmet" text NOT NULL DEFAULT 'skip', ADD COLUMN "when_exhausted" text NOT NULL DEFAULT 'fail';
ALTER TABLE "pgflow"."steps" ADD CONSTRAINT "forbidden_input_pattern_is_object" CHECK ((forbidden_input_pattern IS NULL) OR (jsonb_typeof(forbidden_input_pattern) = 'object'::text)), ADD CONSTRAINT "required_input_pattern_is_object" CHECK ((required_input_pattern IS NULL) OR (jsonb_typeof(required_input_pattern) = 'object'::text)), ADD CONSTRAINT "when_exhausted_is_valid" CHECK (when_exhausted = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD CONSTRAINT "when_unmet_is_valid" CHECK (when_unmet = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD COLUMN "required_input_pattern" jsonb NULL, ADD COLUMN "forbidden_input_pattern" jsonb NULL, ADD COLUMN "when_unmet" text NOT NULL DEFAULT 'skip', ADD COLUMN "when_exhausted" text NOT NULL DEFAULT 'fail';
-- Modify "_compare_flow_shapes" function
CREATE OR REPLACE FUNCTION "pgflow"."_compare_flow_shapes" ("p_local" jsonb, "p_db" jsonb) RETURNS text[] LANGUAGE plpgsql STABLE SET "search_path" = '' AS $BODY$
DECLARE
Expand Down
4 changes: 2 additions & 2 deletions pkgs/core/supabase/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
h1:Hk7WDNVqZP9iYz/vW2Dqe/G3qKdw6i2FVIYl05jn6Kk=
h1:ZZJEI67KUViUzd0rVHGMZPpbUXU2MFSXdTIe/yyJqyE=
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
Expand All @@ -18,4 +18,4 @@ h1:Hk7WDNVqZP9iYz/vW2Dqe/G3qKdw6i2FVIYl05jn6Kk=
20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o=
20260120205547_pgflow_requeue_stalled_tasks.sql h1:4wCBBvjtETCgJf1eXmlH5wCTKDUhiLi0uzsFG1V528E=
20260124113408_pgflow_auth_secret_support.sql h1:i/s1JkBqRElN6FOYFQviJt685W08SuSo30aP25lNlLc=
20260214181656_pgflow_step_conditions.sql h1:uLPoOD/hPrerMACS6CThb7t7T5LKLpMMrdFXXi4ZQ5s=
20260214181656_pgflow_step_conditions.sql h1:nG52qhydTJMeLTd4AoI4buATJNHdEN2C1ZJdKp+i7wE=
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,28 @@ select pgflow.add_step('order_test', 'step_a');
select pgflow.add_step('order_test', 'step_b', ARRAY['step_a']);
select pgflow.add_step('order_test', 'step_c', ARRAY['step_b']);

-- Setup capture table for reliable insertion order tracking
create temporary table skip_event_log (
seq bigserial primary key,
run_id uuid not null,
step_slug text not null
);

create or replace function pg_temp.capture_skip_event()
returns trigger language plpgsql as $$
begin
if new.payload->>'event_type' = 'step:skipped' then
insert into skip_event_log(run_id, step_slug)
values ((new.payload->>'run_id')::uuid, new.payload->>'step_slug');
end if;
return new;
end;
$$;

create trigger capture_skip_event_trigger
after insert on realtime.messages
for each row execute function pg_temp.capture_skip_event();

-- Start flow
with flow as (
select * from pgflow.start_flow('order_test', '{}'::jsonb)
Expand All @@ -33,28 +55,14 @@ select is(
);

-- Test 2: Events should be in dependency order (A before B before C)
with ordered_events as (
select
inserted_at,
payload->>'step_slug' as step_slug,
row_number() over (order by inserted_at, payload->>'step_slug') as event_order
from realtime.messages
where payload->>'event_type' = 'step:skipped'
and payload->>'run_id' = (select run_id::text from run_ids)
),
step_a_event as (
select event_order from ordered_events where step_slug = 'step_a'
),
step_b_event as (
select event_order from ordered_events where step_slug = 'step_b'
),
step_c_event as (
select event_order from ordered_events where step_slug = 'step_c'
)
select ok(
(select event_order from step_a_event) < (select event_order from step_b_event)
AND (select event_order from step_b_event) < (select event_order from step_c_event),
'Events must be in dependency order (A -> B -> C)'
-- Uses trigger-based capture for reliable ordering (no timestamp tie-break issues)
select results_eq(
$$ select step_slug
from skip_event_log
where run_id = (select run_id from run_ids)
order by seq $$,
$$ values ('step_a'), ('step_b'), ('step_c') $$,
'Should broadcast step:skipped in dependency order (step_a before step_b before step_c)'
);

-- Clean up
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-- Test: _cascade_force_skip_steps - Cascade through multiple DAG levels
-- Verifies skipping A cascades through A -> B -> C chain
begin;
select plan(8);
select plan(9);

-- Reset database and create a flow: A -> B -> C
select pgflow_tests.reset_db();
Expand All @@ -17,10 +17,14 @@ with flow as (
select run_id into temporary run_ids from flow;

-- Skip step_a (should cascade to step_b and step_c)
select pgflow._cascade_force_skip_steps(
(select run_id from run_ids),
'step_a',
'handler_failed'
select is(
(select pgflow._cascade_force_skip_steps(
(select run_id from run_ids),
'step_a',
'handler_failed'
)),
3::int,
'Should return count of 3 skipped steps (step_a + step_b + step_c)'
);

-- Test 1: step_a should be skipped with handler_failed reason
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-- Test: _cascade_force_skip_steps - Cascade to single dependent
-- Verifies skipping a step cascades to its direct dependent
begin;
select plan(7);
select plan(8);

-- Reset database and create a flow: A -> B
select pgflow_tests.reset_db();
Expand All @@ -16,10 +16,14 @@ with flow as (
select run_id into temporary run_ids from flow;

-- Skip step_a (should cascade to step_b)
select pgflow._cascade_force_skip_steps(
(select run_id from run_ids),
'step_a',
'condition_unmet'
select is(
(select pgflow._cascade_force_skip_steps(
(select run_id from run_ids),
'step_a',
'condition_unmet'
)),
2::int,
'Should return count of 2 skipped steps (step_a + step_b)'
);

-- Test 1: step_a should be skipped
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
\set ON_ERROR_STOP on
\set QUIET on

-- Test: _cascade_force_skip_steps - Idempotency
-- Verifies that invoking the cascade skip a second time with identical
-- arguments is a no-op: it must report 0 newly skipped steps and leave the
-- run's remaining_steps counter, the step:skipped broadcast count, and the
-- pgmq archive count exactly as the first call left them.
begin;
select plan(5);

select pgflow_tests.reset_db();

-- Flow under test: a map step with no retries (max_attempts => 0) that skips
-- when exhausted, feeding a single dependent step.
select pgflow.create_flow('idempotent_test');
select pgflow.add_step('idempotent_test', 'map_step', '{}', max_attempts => 0, step_type => 'map', when_exhausted => 'skip');
select pgflow.add_step('idempotent_test', 'dependent_step', ARRAY['map_step']);

-- Start the flow with a 3-element array input for the map step.
select pgflow.start_flow('idempotent_test', '[1, 2, 3]'::jsonb);

-- Start only the map step's first task (task_index = 0) so the cascade skip
-- runs while the step has in-flight work; presumably this exercises the
-- archive path for a started message -- confirm against the implementation.
with tasks as (
select message_id, task_index
from pgflow.step_tasks
where flow_slug = 'idempotent_test' and step_slug = 'map_step'
order by task_index
)
select pgflow.start_tasks('idempotent_test', array[(select message_id from tasks where task_index = 0)::bigint], pgflow_tests.ensure_worker('idempotent_test'));

-- Capture the run_id once; all later assertions reference this temp table.
create temporary table test_run as
select run_id from pgflow.runs where flow_slug = 'idempotent_test';

-- Test 1: first call skips map_step and cascades to dependent_step (count 2).
select is(
(select pgflow._cascade_force_skip_steps(
(select run_id from test_run),
'map_step',
'condition_unmet'
)),
2::int,
'First call should skip 2 steps (map_step + dependent_step)'
);

-- Snapshot observable state after the first call so Tests 3-5 can prove the
-- second call changed nothing:
--   remaining_steps: run-level counter of unfinished steps
--   event_count:     step:skipped broadcasts for this run in realtime.messages
--   archive_count:   rows in this flow's pgmq archive table
create temporary table after_first as
select
(select remaining_steps from pgflow.runs where run_id = (select run_id from test_run)) as remaining_steps,
(select count(*) from realtime.messages
where payload->>'event_type' = 'step:skipped'
and (payload->>'run_id')::uuid = (select run_id from test_run)) as event_count,
(select count(*) from pgmq.a_idempotent_test) as archive_count;

-- Test 2: repeating the identical call must skip nothing.
select is(
(select pgflow._cascade_force_skip_steps(
(select run_id from test_run),
'map_step',
'condition_unmet'
)),
0::int,
'Second call should return 0 (no new skips)'
);

-- Test 3: run counter untouched by the second call.
select is(
(select remaining_steps from pgflow.runs where run_id = (select run_id from test_run)),
(select remaining_steps from after_first),
'remaining_steps should be unchanged after second call'
);

-- Test 4: no duplicate step:skipped broadcasts were emitted.
select is(
(select count(*) from realtime.messages
where payload->>'event_type' = 'step:skipped'
and (payload->>'run_id')::uuid = (select run_id from test_run)),
(select event_count from after_first),
'step:skipped event count should be unchanged after second call'
);

-- Test 5: no messages were re-archived.
select is(
(select count(*) from pgmq.a_idempotent_test),
(select archive_count from after_first),
'Archive count should be unchanged after second call'
);

select * from finish();
rollback;
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-- Test: _cascade_force_skip_steps - Single step skip (base case)
-- Verifies the function can skip a single step without dependencies
begin;
select plan(5);
select plan(6);

-- Reset database and create a simple flow with no dependencies
select pgflow_tests.reset_db();
Expand All @@ -24,10 +24,14 @@ select is(
);

-- Skip step_a
select pgflow._cascade_force_skip_steps(
(select run_id from run_ids),
'step_a',
'condition_unmet'
select is(
(select pgflow._cascade_force_skip_steps(
(select run_id from run_ids),
'step_a',
'condition_unmet'
)),
1::int,
'Should return count of 1 skipped step'
);

-- Test 2: step_a should now have status 'skipped'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
\set ON_ERROR_STOP on
\set QUIET on

-- Test: steps table pattern CHECK constraints
-- Verifies that the required_input_pattern_is_object and
-- forbidden_input_pattern_is_object constraints reject any jsonb value whose
-- jsonb_typeof is not 'object' (arrays and scalar strings are tried here).
-- The expected strings are Postgres's default check-violation message text
-- and must match it exactly for throws_ok to pass.
begin;
select plan(4);
select pgflow_tests.reset_db();

select pgflow.create_flow('invalid_pattern_test');

-- Test 1: required_input_pattern as array should fail
select throws_ok(
$$ select pgflow.add_step('invalid_pattern_test', 'step_array', required_input_pattern => '[]'::jsonb) $$,
'new row for relation "steps" violates check constraint "required_input_pattern_is_object"',
'Should reject array for required_input_pattern'
);

-- Test 2: required_input_pattern as string should fail
select throws_ok(
$$ select pgflow.add_step('invalid_pattern_test', 'step_string', required_input_pattern => '"invalid"'::jsonb) $$,
'new row for relation "steps" violates check constraint "required_input_pattern_is_object"',
'Should reject string for required_input_pattern'
);

-- Test 3: forbidden_input_pattern as array should fail
select throws_ok(
$$ select pgflow.add_step('invalid_pattern_test', 'step_forbidden_array', forbidden_input_pattern => '[]'::jsonb) $$,
'new row for relation "steps" violates check constraint "forbidden_input_pattern_is_object"',
'Should reject array for forbidden_input_pattern'
);

-- Test 4: forbidden_input_pattern as string should fail
select throws_ok(
$$ select pgflow.add_step('invalid_pattern_test', 'step_forbidden_string', forbidden_input_pattern => '"invalid"'::jsonb) $$,
'new row for relation "steps" violates check constraint "forbidden_input_pattern_is_object"',
'Should reject string for forbidden_input_pattern'
);

select finish();
rollback;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
begin;
select plan(2);
select plan(5);
select pgflow_tests.reset_db();

-- Test: Patterns with same value should match
Expand Down Expand Up @@ -38,5 +38,59 @@ select is(
'Different requiredInputPattern values should be detected'
);

-- Test: Different forbiddenInputPattern should be detected
select is(
pgflow._compare_flow_shapes(
'{
"steps": [
{"slug": "step1", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenExhausted": "fail", "requiredInputPattern": {"defined": true, "value": {"role": "user"}}, "forbiddenInputPattern": {"defined": true, "value": {"role": "admin"}}}
]
}'::jsonb,
'{
"steps": [
{"slug": "step1", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenExhausted": "fail", "requiredInputPattern": {"defined": true, "value": {"role": "user"}}, "forbiddenInputPattern": {"defined": true, "value": {"role": "banned"}}}
]
}'::jsonb
),
ARRAY['Step at index 0: forbiddenInputPattern differs ''{"value": {"role": "admin"}, "defined": true}'' vs ''{"value": {"role": "banned"}, "defined": true}''']::text[],
'Different forbiddenInputPattern values should be detected'
);

-- Test: forbiddenInputPattern defined transition should be detected
select is(
pgflow._compare_flow_shapes(
'{
"steps": [
{"slug": "step1", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenExhausted": "fail", "requiredInputPattern": {"defined": false}, "forbiddenInputPattern": {"defined": false}}
]
}'::jsonb,
'{
"steps": [
{"slug": "step1", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenExhausted": "fail", "requiredInputPattern": {"defined": false}, "forbiddenInputPattern": {"defined": true, "value": {"admin": true}}}
]
}'::jsonb
),
ARRAY['Step at index 0: forbiddenInputPattern differs ''{"defined": false}'' vs ''{"value": {"admin": true}, "defined": true}''']::text[],
'forbiddenInputPattern defined transition should be detected'
);

-- Test: requiredInputPattern defined transition should be detected
select is(
pgflow._compare_flow_shapes(
'{
"steps": [
{"slug": "step1", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenExhausted": "fail", "requiredInputPattern": {"defined": false}, "forbiddenInputPattern": {"defined": false}}
]
}'::jsonb,
'{
"steps": [
{"slug": "step1", "stepType": "single", "dependencies": [], "whenUnmet": "skip", "whenExhausted": "fail", "requiredInputPattern": {"defined": true, "value": {"status": "ready"}}, "forbiddenInputPattern": {"defined": false}}
]
}'::jsonb
),
ARRAY['Step at index 0: requiredInputPattern differs ''{"defined": false}'' vs ''{"value": {"status": "ready"}, "defined": true}''']::text[],
'requiredInputPattern defined transition should be detected'
);

select finish();
rollback;
Loading
Loading