Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api-contracts/dispatcher/dispatcher.proto
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ enum ActionType {
START_GET_GROUP_KEY = 2;
}

message WorkflowGraphLayer {
repeated string task_names = 1;
}


message AssignedAction {
// the tenant id
string tenant_id = 1;
Expand Down Expand Up @@ -195,6 +200,8 @@ message AssignedAction {

// (optional) the key of the event that triggered this workflow run (if any)
optional string triggering_event_key = 23;

repeated WorkflowGraphLayer workflow_graph = 24;
}

message WorkerListenRequest {
Expand Down
3 changes: 3 additions & 0 deletions api-contracts/v1/shared/trigger.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,7 @@ message TriggerWorkflowRequest {

// (optional) the desired worker labels for the workflow run, which will be used to determine which workers can pick up the workflow's tasks. if not set, defaults to an empty set of labels, which means any worker can pick up the tasks.
map<string, DesiredWorkerLabels> desired_worker_labels = 10;

// (optional) workflow run ids of parent tasks whose outputs should be injected into this task's parents map. used by durable dag orchestration.
repeated string dag_parent_workflow_run_ids = 11;
}
4 changes: 4 additions & 0 deletions api-contracts/v1/workflows.proto
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ enum RunStatus {
EVICTED = 5;
}

message ChildTaskList {
repeated string child_task_name = 1;
}

// CreateWorkflowVersionRequest represents options to create a workflow version.
message CreateWorkflowVersionRequest {
Expand All @@ -111,6 +114,7 @@ message CreateWorkflowVersionRequest {
repeated Concurrency concurrency_arr = 12; // (optional) the workflow concurrency options
repeated DefaultFilter default_filters = 13; // (optional) the default filters for the workflow
optional bytes input_json_schema = 14; // (optional) the JSON schema for the workflow input
map<string, ChildTaskList> task_name_to_child_names = 15;
}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE "WorkflowVersion" ADD COLUMN "taskNameToChildTaskNames" JSONB;
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
ALTER TABLE "WorkflowVersion" DROP COLUMN "taskNameToChildTaskNames";
-- +goose StatementEnd
39 changes: 27 additions & 12 deletions internal/services/admin/v1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,19 +897,34 @@ func getCreateWorkflowOpts(req *contracts.CreateWorkflowVersionRequest) (*v1.Cre
})
}

taskNameToChildTaskNames := make(map[string][]string)

for taskName, childNames := range req.TaskNameToChildNames {
if childNames == nil {
continue
}

if len(childNames.ChildTaskName) == 0 {
continue
}

taskNameToChildTaskNames[taskName] = childNames.ChildTaskName
}

return &v1.CreateWorkflowVersionOpts{
Name: req.Name,
Concurrency: concurrency,
Description: &req.Description,
EventTriggers: req.EventTriggers,
CronTriggers: req.CronTriggers,
CronInput: cronInput,
Tasks: tasks,
OnFailure: onFailureTask,
Sticky: sticky,
DefaultPriority: req.DefaultPriority,
DefaultFilters: defaultFilters,
InputJsonSchema: req.InputJsonSchema,
Name: req.Name,
Concurrency: concurrency,
Description: &req.Description,
EventTriggers: req.EventTriggers,
CronTriggers: req.CronTriggers,
CronInput: cronInput,
Tasks: tasks,
OnFailure: onFailureTask,
Sticky: sticky,
DefaultPriority: req.DefaultPriority,
DefaultFilters: defaultFilters,
InputJsonSchema: req.InputJsonSchema,
TaskNameToChildTaskNames: taskNameToChildTaskNames,
}, nil
}

Expand Down
Loading
Loading