Skip to content

Commit 6630477

Browse files
author
James N.
committed
add workflow demo
1 parent 12d694d commit 6630477

28 files changed

+13040
-1274
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ mcp.json
99
*.py[cod]
1010
*$py.class
1111
videos
12+
**/checkpoints
13+
**/reference
14+
**/examples
1215
**/fastmcp
1316
# C extensions
1417
*.so

agentic_ai/applications/uv.lock

Lines changed: 1274 additions & 1274 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

agentic_ai/workflow/.env.sample

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Azure OpenAI Configuration
2+
AZURE_OPENAI_API_KEY=your-azure-openai-api-key-here
3+
AZURE_OPENAI_CHAT_DEPLOYMENT=your-deployment-name
4+
AZURE_OPENAI_ENDPOINT=https://your-resource-name.openai.azure.com/
5+
AZURE_OPENAI_API_VERSION=2024-10-01-preview
6+
7+
# MCP Server Configuration
8+
MCP_SERVER_URI=http://localhost:8000
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
# Workflow Architecture
2+
3+
The Agent Framework workflow system is a **directed-graph execution engine** modeled after Google's [Pregel](https://research.google/pubs/pub36726/) distributed graph computation model, adapted for orchestrating AI agents, tools, and arbitrary compute steps in a type-safe, checkpointable, and observable manner.
4+
5+
## Core abstractions
6+
7+
| Component | Purpose |
8+
|-----------|---------|
9+
| **Executor** (`_executor.py`) | A unit of work with typed handlers that process messages. Can be a class (subclassing `Executor`) or a decorated function (`@executor`). Executors define what input types they accept and what they emit. |
10+
| **Edge / EdgeGroup** (`_edge.py`) | Defines how messages flow between executors. Supports single, fan-out (1→N), fan-in (N→1 aggregation), and switch/case routing patterns. |
11+
| **WorkflowContext** (`_workflow_context.py`) | Injected into each executor handler; provides `send_message()`, `yield_output()`, state persistence APIs (`set_state`, `get_state`, `set_shared_state`). Enforces type safety through generic parameters. |
12+
| **Runner** (`_runner.py`) | Orchestrates execution in synchronized **supersteps**: delivers messages, invokes executors concurrently, drains events, creates checkpoints. Runs until the graph becomes idle (no pending messages). |
13+
| **Workflow** (`_workflow.py`) | The user-facing API that wraps the Runner and provides entry points (`run()`, `run_stream()`, `run_from_checkpoint()`). Built via `WorkflowBuilder`. |
14+
15+
---
16+
17+
## Execution model: Pregel-style supersteps
18+
19+
### 1. Initialization phase
20+
21+
- User calls `workflow.run(initial_message)`.
22+
- The starting executor receives the message and runs its handler.
23+
- Handler can emit messages via `ctx.send_message()` or final outputs via `ctx.yield_output()`.
24+
- All emitted messages are queued in the `RunnerContext`.
25+
26+
### 2. Superstep iteration
27+
28+
- The Runner **drains** all pending messages from the queue.
29+
- Messages are routed through `EdgeRunner` implementations based on edge topology:
30+
- **SingleEdgeRunner**: Delivers to one target if type and condition match.
31+
- **FanOutEdgeRunner**: Broadcasts to multiple targets or selects a subset dynamically.
32+
- **FanInEdgeRunner**: Buffers messages from multiple sources; delivers aggregated list when all sources have sent.
33+
- **SwitchCaseEdgeRunner**: Evaluates predicates and routes to the first matching case.
34+
- All deliverable messages invoke their target executors **concurrently** (via `asyncio.gather`).
35+
- Each executor processes its messages and may emit new messages or outputs.
36+
- At the end of the superstep:
37+
- Events (outputs, custom events) are streamed to the caller.
38+
- A checkpoint is optionally created (if `CheckpointStorage` is configured).
39+
- The Runner checks if new messages are pending; if yes, starts the next superstep.
40+
41+
### 3. Convergence / termination
42+
43+
- The workflow runs until **no messages remain** or the **max iteration limit** is hit.
44+
- Final state is emitted as a `WorkflowStatusEvent`:
45+
- `IDLE`: Clean completion, no pending requests.
46+
- `IDLE_WITH_PENDING_REQUESTS`: Waiting for external input (via `RequestInfoExecutor`).
47+
- `FAILED`: An executor raised an exception.
48+
49+
---
50+
51+
## Message routing and type safety
52+
53+
- Each executor declares **input types** via handler parameter annotations (`text: str`, `data: MyModel`, etc.).
54+
- `WorkflowContext[T_Out]` declares the **output message type** the executor can emit.
55+
- `WorkflowContext[T_Out, T_W_Out]` adds workflow-level output types (for `yield_output`).
56+
- Edge runners use `executor.can_handle(message_data)` to enforce type compatibility at runtime.
57+
- Routing predicates (`edge.should_route(data)`) and selection functions (`selection_func(data, targets)`) allow dynamic control flow.
58+
59+
---
60+
61+
## State and persistence
62+
63+
| Layer | Mechanism |
64+
|-------|-----------|
65+
| **Executor-local state** | `ctx.set_state(key, value)` / `ctx.get_state(key)` stores per-executor JSON blobs in the `RunnerContext`. Executors can override `snapshot_state()` / `restore_state()` for custom serialization. |
66+
| **Shared state** | `WorkflowContext.set_shared_state(key, value)` writes to a `SharedState` dictionary visible to all executors. Protected by an async lock to prevent race conditions. |
67+
| **Checkpoints** | After each superstep, the Runner calls `_auto_snapshot_executor_states()`, then serializes: <br> - Pending messages per executor <br> - Shared state dictionary <br> - Executor state snapshots <br> - Iteration counter / metadata <br><br> `CheckpointStorage` (in-memory, file, Redis, Cosmos DB) persists `WorkflowCheckpoint` objects. |
68+
| **Restoration** | `workflow.run_from_checkpoint(checkpoint_id)` rehydrates the full runner context, re-injects shared state, restores iteration count, and validates graph topology (via a hash of the executor/edge structure). |
69+
70+
Checkpoints are **delta-neutral**: the graph structure itself is not serialized, only the runtime state. You must rebuild the workflow with the same topology before restoring.
71+
72+
---
73+
74+
## Observability and tracing
75+
76+
- **OpenTelemetry integration**: The workflow creates a root span (`workflow_run`) that encompasses all supersteps. Each executor invocation and edge delivery gets nested spans.
77+
- **Trace context propagation**: Messages carry `trace_contexts` and `source_span_ids` to link spans across async boundaries (following W3C Trace Context).
78+
- **Event streaming**: The Runner emits `WorkflowEvent` subclasses:
79+
- `WorkflowStartedEvent`, `WorkflowStatusEvent` (lifecycle).
80+
- `WorkflowOutputEvent` (from `yield_output`).
81+
- `RequestInfoEvent` (external input requests).
82+
- Custom events via `ctx.add_event()`.
83+
- Events are streamed live via `run_stream()` or collected in `WorkflowRunResult` for batch runs.
84+
85+
---
86+
87+
## Composition patterns
88+
89+
1. **Nested workflows**: `WorkflowExecutor` wraps a child workflow as an executor. When invoked, it runs the child to completion and processes outputs.
90+
2. **Human-in-the-loop**: `RequestInfoExecutor` emits `RequestInfoEvent`, transitions the workflow to `IDLE_WITH_PENDING_REQUESTS`, and waits for external responses via `send_responses()`.
91+
3. **Multi-agent teams**: `MagenticOrchestratorExecutor` (in `_magentic.py`) wraps multiple agents, manages broadcast/targeted communication, and snapshots each participant's conversation history.
92+
93+
---
94+
95+
## Key design decisions
96+
97+
- **Type-driven routing**: Edge runners and executors use Python type annotations to enforce contracts at runtime, providing early feedback for wiring errors.
98+
- **Separation of data/control planes**: Executor invocations and message passing happen "under the hood"; only workflow-level events (outputs, requests) are exposed to callers. This keeps the event stream clean and hides internal coordination.
99+
- **Checkpointing by convention**: Executors opt into persistence by implementing `snapshot_state()` or exposing a `state` attribute. The framework handles serialization (including Pydantic models and dataclasses) transparently.
100+
- **Graph immutability**: Once built, workflows are immutable. This enables safe checkpoint restoration and parallel invocations (if you construct separate `Workflow` instances).
101+
- **Concurrency within supersteps**: All deliverable messages in a superstep execute concurrently. This parallelizes work but requires shared state to be protected (via `SharedState`'s async lock).
102+
103+
---
104+
105+
## Validation and safety
106+
107+
- **Graph validation**: `validate_workflow_graph()` (in `_validation.py`) checks for unreachable executors, missing start nodes, and cycles (for non-cyclic workflows).
108+
- **Concurrent execution guard**: The `Workflow` class prevents multiple `run()` calls on the same instance to avoid state corruption.
109+
- **Max iterations**: Prevents infinite loops by bounding superstep counts (default 100, configurable).
110+
- **Graph signature hashing**: Before restoring a checkpoint, the Runner compares a hash of the workflow topology to the checkpoint metadata to detect structural changes.
111+
112+
---
113+
114+
## Sample execution trace
115+
116+
```
117+
User calls workflow.run("hello world")
118+
119+
Workflow emits WorkflowStartedEvent, WorkflowStatusEvent(IN_PROGRESS)
120+
121+
Executor "upper_case_executor" receives "hello world"
122+
→ Handler: to_upper_case(text: str, ctx: WorkflowContext[str])
123+
→ Calls ctx.send_message("HELLO WORLD")
124+
→ Message queued
125+
126+
Runner drains messages → SingleEdgeRunner delivers to "reverse_text_executor"
127+
128+
Executor "reverse_text_executor" receives "HELLO WORLD"
129+
→ Handler: reverse_text(text: str, ctx: WorkflowContext[Never, str])
130+
→ Calls ctx.yield_output("DLROW OLLEH")
131+
→ WorkflowOutputEvent emitted
132+
133+
No more messages → Workflow emits WorkflowStatusEvent(IDLE)
134+
135+
workflow.run() returns WorkflowRunResult([WorkflowOutputEvent("DLROW OLLEH"), ...])
136+
```
137+
138+
---
139+
140+
## Additional references
141+
142+
- Full workflow builder API: `WorkflowBuilder` in `_workflow.py`.
143+
- Edge runner implementations: `_edge_runner.py`.
144+
- Checkpoint encoding: `_runner_context.py` (`_encode_checkpoint_value`, `_decode_checkpoint_value`).
145+
- Magentic multi-agent orchestration: `_magentic.py`.
146+
147+
This architecture balances **expressiveness** (flexible routing, composition), **type safety** (runtime contract enforcement), **observability** (OpenTelemetry spans, event streams), and **durability** (checkpointing for long-running workflows), making it suitable for both simple pipelines and complex multi-agent systems.

0 commit comments

Comments
 (0)