Skip to content

syv-ai/conductor

Repository files navigation

Conductor logo

Conductor

A reusable, host-agnostic graph execution engine for building DAG-based workflow systems. Register nodes as plain Python functions with type annotations, compile them into a validated execution plan, and run them with streaming events.

Built to be the shared core behind visual flow builders — define nodes once, get backend execution, input validation, and frontend UI metadata for free.

Need a short tour to share with a colleague? See docs/OVERVIEW.md for a one-page architecture summary.

Features

  • Decorator-based node registration@registry.node() turns any function into a validated, UI-renderable node
  • Widget annotationsAnnotated[str, Text(label="Input")] is the single source of truth for validation, execution, and frontend rendering
  • Union-aware type checkingstr | int on either side of an edge is checked alternative-by-alternative; any matching pair passes
  • Display hints — optional node_label / output_labels on GraphNode (mirrored on the FastAPI provider) feed ConnectionList aggregation and host UIs without affecting execution
  • Friendly validation errorsNodeValidationError renders one line per failed field; the original pydantic exception stays on .original
  • Compile-then-execute — structural errors caught before any node runs
  • Eager parallel scheduling — nodes start as soon as their dependencies finish; independent branches run concurrently
  • Retry — per-node max_retries/retry_delay with exponential backoff, or a global RetryConfig
  • Structured error hierarchyNodeValidationError, NodeExecutionError, NodeConnectionError, NodeTimeoutError, and more, all carrying node_id/node_type context
  • Streaming execution — async generator yields events (node_start, node_complete, node_retry, flow_complete, etc.)
  • Shared references — per-instance produce/consume bindings let any node feed any other without drawing an edge — including across for-each region boundaries
  • Conditional branching — SKIPPED sentinel propagates through inactive branches
  • For-each loops — compound node regions with sequential or parallel execution
  • Human-in-the-loopHumanInputRequired pauses to a JSON-serializable checkpoint; resume later
  • Class-based nodesBaseNode ABC for complex nodes needing state
  • FlowStore — side-channel key-value cache for cross-node data sharing
  • Auto-discovery — scan a package to register all @node-decorated functions
  • Extension resolver — protocol for host-app-specific node types (sub-flows, etc.)
  • Decision nodes + edge guards — first-class branching driven by CEL expressions on outgoing edges
  • CEL expression language — sandboxed mini-evaluator (conductor.expr) for guards, loop conditions, idempotency keys, signal correlation
  • Actor metadata — declare who performs each step (system/human/agent/external_service) for auditability and rendering
  • Top-level flow metadatadependencies, triggers, and on_error_default on Flow objects for portable process definitions
  • Per-node timeout and idempotency keytimeout= wraps execution with asyncio.wait_for; idempotency_key= evaluates a CEL expression once and injects a stable key across retries
  • While / until compound — condition-based loops with max_iterations safety; raises LoopRunawayError on cap
  • Subprocess compound — call another flow by id+version, with a SubprocessRegistry and runtime depth cap
  • Compensation / saga — per-node compensation= cascades on failure in reverse topological order
  • Signal / event nodesSignalRequired pauses on a named event with optional correlation and timeout, mirroring HITL
  • YAML / JSON flow formatconductor.flow_format round-trips Flow ↔ YAML/JSON/dict
  • Zero app dependencies — no FastAPI, no database, no auth in the core
  • Standard node libraryconductor-nodes ships text, math, logic, json, regex, decision, while, subprocess, signal, and canonical for-each markers
  • Framework adaptersconductor-providers.react translates conductor graphs to/from ReactFlow JSON; conductor-providers.fastapi mounts an /execute, /execute-stream, /compile, /nodes API

Quick start

Prerequisites

  • Python 3.12+
  • uv package manager

Install

From PyPI (Apache-2.0):

pip install syv-conductor                # core engine — import as `conductor`
pip install syv-conductor-nodes          # standard node library — import as `conductor_nodes`
pip install syv-conductor-providers      # framework adapters — import as `conductor_providers`
pip install "syv-conductor[yaml]"        # optional: YAML/JSON flow format

The PyPI distribution names are prefixed with syv-; Python imports are unchanged.

For local development (uv workspace):

git clone <repo-url> conductor
cd conductor
uv sync
uv run pre-commit install   # strip notebook outputs on commit

Run tests

uv run pytest tests/ -v

Run the demo playground

The demo is split into a FastAPI backend and a Next.js frontend — run them in two terminals.

# terminal 1 — backend (FastAPI, port 8765)
uv sync --group demo
uv run uvicorn demo.app:app --port 8765 --reload

# terminal 2 — frontend (Next.js 15 + shadcn + @xyflow/react, port 3000)
cd demo/web
npm install
npm run dev

Open http://localhost:3000 — drag nodes onto the canvas, connect them, and hit "Run". The frontend proxies /api/* to the backend via next.config.ts.

Usage

1. Create a registry and register nodes

from typing import Annotated
from conductor import NodeRegistry
from conductor.widgets import Text, Textarea, Dropdown, Range, Output

registry = NodeRegistry()

@registry.node("echo", version=1, name="Echo", description="Returns input unchanged")
def echo(
    text: Annotated[str, Text(label="Input", description="Text to echo")],
) -> Annotated[str, Output(label="Output")]:
    return text

@registry.node("uppercase", version=1, name="Uppercase", description="Converts to uppercase")
def uppercase(
    text: Annotated[str, Text(label="Input")],
) -> Annotated[str, Output(label="Result")]:
    return text.upper()

2. Build and execute a flow

from conductor import GraphNode, GraphEdge, compile
from conductor.execution.engine import execute_sync

compiled = compile(
    nodes=[
        GraphNode("n1", "echo@1", {"text": "hello world"}),
        GraphNode("n2", "uppercase@1", None),
    ],
    edges=[
        GraphEdge("e1", "n1", "n2", "result", "text"),
    ],
    registry=registry,
)

results = execute_sync(compiled)
print(results["n2"]["result"])  # "HELLO WORLD"

3. Stream execution events

from conductor.execution.engine import execute

async for event in execute(compiled):
    match event["type"]:
        case "node_start":
            print(f"Starting {event['node_id']}")
        case "node_complete":
            print(f"Done {event['node_id']}: {event['result']}")
        case "node_retry":
            print(f"Retry {event['node_id']} ({event['attempt']}/{event['max_retries']}): {event['error']}")
        case "flow_complete":
            print(f"Flow done: {event['results']}")

Project structure

conductor/
├── packages/
│   └── conductor/                 # Core library
│       ├── pyproject.toml          # pip install syv-conductor
│       └── src/conductor/
│           ├── types.py            # Enums: WidgetType, ResultFormat, NodeCategory
│           ├── widgets.py          # Widget ABC + Text, Dropdown, Range, Output, etc.
│           ├── metadata.py         # InputMetadata, OutputMetadata
│           ├── validation.py       # Pydantic model generation from signatures
│           ├── errors.py           # Exception hierarchy (ConductorError, NodeError, ...)
│           ├── node.py             # BaseNode ABC for class-based nodes
│           ├── _sentinel.py        # SKIPPED singleton
│           ├── registry/
│           │   ├── __init__.py     # NodeRegistry + @node decorator
│           │   ├── definition.py   # NodeDefinition dataclass
│           │   ├── discovery.py    # Auto-discovery via importlib
│           │   └── schema.py       # JSON serialization for frontends
│           ├── graph/
│           │   ├── model.py        # GraphNode (with produces/consumes), GraphEdge
│           │   ├── topology.py     # Topological sort, cycle detection
│           │   ├── compiler.py     # compile() -> CompiledGraph
│           │   ├── type_check.py   # Edge + consume type compatibility
│           │   ├── shared_refs.py  # produce/consume validation, consume_map build
│           │   └── regions.py      # Compound node region discovery
│           ├── execution/
│           │   ├── engine.py       # execute(), execute_sync(), eager scheduler, retry loop
│           │   ├── retry.py        # RetryConfig
│           │   ├── state.py        # FlowRunState
│           │   ├── store.py        # FlowStore (cross-node cache)
│           │   ├── request.py      # NodeExecRequest DTO
│           │   ├── resolver.py     # Input resolution from edges
│           │   ├── results.py      # Result normalization
│           │   ├── events.py       # Event TypedDicts + EventSink
│           │   ├── skip.py         # Skip propagation
│           │   └── checkpoint.py   # FlowCheckpoint for human-in-the-loop
│           └── compound/
│               ├── protocol.py     # CompoundNodeType, Region
│               └── for_each.py     # ForEachNode + FOR_EACH constant
├── packages/
│   ├── conductor/                  # Core library
│   ├── conductor-nodes/            # Standard node library (text, math, logic, json, regex, loop)
│   └── conductor-providers/        # Framework adapters — react today, more later
├── examples/                       # Usage notebooks (7 examples)
├── demo/                           # Interactive playground (FastAPI + browser UI)
├── tests/                          # pytest test suite (430 tests across core, nodes, providers)
├── .github/workflows/              # ci.yml (PR lint + test), docs-audit.yml (weekly)
└── docs/                           # Design specs + MkDocs site (llms.txt ships inside the package)

Concepts

Nodes

Nodes are the building blocks. Register them as decorated functions or BaseNode subclasses:

# Function-based (most nodes)
@registry.node("add", version=1, name="Add", description="Adds two numbers")
def add(
    a: Annotated[float, Text(label="A")],
    b: Annotated[float, Text(label="B")],
) -> Annotated[float, Output(label="Sum")]:
    return a + b

# Multi-output
@registry.node("split", version=1, name="Split", description="Splits text")
def split(
    text: Annotated[str, Text(label="Input")],
) -> tuple[
    Annotated[str, Output(label="First half")],
    Annotated[str, Output(label="Second half")],
]:
    mid = len(text) // 2
    return text[:mid], text[mid:]

# Class-based (complex nodes)
class MyNode(BaseNode):
    node_id = "my-node"
    node_name = "My Node"
    node_description = "A complex node"

    def execute(self, req: NodeExecRequest) -> str:
        return req.inputs["text"].upper()

registry.register_class(MyNode)

Versioning

Nodes are versioned as base_id@version. When you register a new version, the old one becomes deprecated but continues to work for existing flows:

@registry.node("echo", version=2, name="Echo v2", description="Echo with prefix")
def echo_v2(text: Annotated[str, Text(label="Input")], prefix: Annotated[str, Text(label="Prefix")] = "") -> ...:
    return f"{prefix}{text}"

registry.get("echo@1")          # Old version (deprecated)
registry.get("echo@2")          # Current version
registry.get_latest("echo")     # Returns echo@2
registry.is_deprecated("echo@1") # True

FlowStore

Side-channel key-value store for sharing data between nodes outside of edges:

from conductor.execution.store import FlowStore

@registry.node("cache-doc", version=1, name="Cache Document", description="Parses and caches")
def cache_doc(
    file: Annotated[str, Text(label="File")],
    store: FlowStore,  # Auto-injected by the engine
) -> Annotated[str, Output(label="Text")]:
    parsed = expensive_parse(file)
    store.set("parsed_doc", parsed)  # Available to downstream nodes
    return parsed.text

Auto-discovery

Scan a Python package to register all @node-decorated functions:

registry.discover("myapp.nodes")  # Imports all modules, triggering decorators

Extension resolver

Let host applications handle custom node types:

class AppNodeResolver:
    def is_known_type(self, node_type: str) -> bool:
        return node_type.startswith("app:")

    def create_executor(self, node_type: str):
        return MyAppNodeExecutor(node_type)

compiled = compile(nodes, edges, registry, extension_resolver=AppNodeResolver())

Shared references

An alternative to explicit edges for two cases they handle awkwardly: fan-out (one producer feeding many consumers) and cross-region binding (feeding a value into a for-each body from outside the loop). Every shared reference is opt-in per node instance; no changes to the node function are required.

A producer marks an output as shared. Any other node — anywhere in the graph, including inside a for-each body — can bind one of its inputs to that output. Reference identity is (producer_node_id, output_handle); the label is for UI only.

compiled = compile(
    nodes=[
        GraphNode("mapper", "build-map@1", {"seed": "x"},
                  produces={"result": "pseudonym map"}),
        GraphNode("redactor", "redact@1", {"text": "Alice met Bob."},
                  consumes={"mapping": ("mapper", "result")}),
    ],
    edges=[],           # no edge needed
    registry=registry,
)
results = execute_sync(compiled)
print(results["redactor"]["result"])   # "P001-x met P002-x."

Inside a for-each loop a consumer reads the same producer value on every iteration (broadcast, not per-iteration). This is how you inject a system prompt defined once at the top of a flow into an LLM node inside a loop over 1,000 records.

Validated at compile time: the producer must declare the handle in produces, the consumer's input handle must exist, an input cannot be both a consume target and the target of an explicit edge, and cycles through consume bindings are caught alongside edge cycles. Type checking uses the same rules as edges.

In v1, producers must be top-level (cannot sit inside a for-each or other compound region). Consumers can be anywhere.

Resolver precedence, first match wins:

  1. Explicit edge targeting the input
  2. Consume binding (shared reference)
  3. Static data on the node (GraphNode.data)
  4. Widget default (Pydantic)

Full design and rules: docs/shared-references.md. Walkthrough: examples/07_shared_references.ipynb.

Eager parallel execution

The engine schedules nodes eagerly: as soon as all of a node's dependencies finish, its task is dispatched. Independent branches run concurrently without any configuration. Sync node functions are offloaded to asyncio.to_thread, so they don't block the event loop.

  A (0.3s) ──> C (0.3s) ──┐
                           ├──> E (0.3s)
  B (0.3s) ──> D (0.3s) ──┘

Sequential would be 5 × 0.3 s = 1.5 s. Eager execution: A + B in parallel (0.3 s), C + D in parallel (0.3 s), E (0.3 s) = ~0.9 s.

No flag is needed — this is the default and only execution mode.

Retry

Nodes can retry automatically on failure. Configure retries at the node level (preferred) or the flow level:

from conductor.execution.retry import RetryConfig

# Node-level — wins over any global config
@registry.node("fetch-url", version=1, name="Fetch", description="HTTP GET",
               max_retries=3, retry_delay=0.5)
def fetch_url(url: Annotated[str, Text(label="URL")]) -> Annotated[str, Output(label="Body")]:
    resp = requests.get(url, timeout=10)
    resp.raise_for_status()
    return resp.text

# Or flow-level — applies to every node that doesn't set its own max_retries
results = execute_sync(
    compiled,
    retry=RetryConfig(max_retries=2, delay=1.0, backoff_factor=2.0),
)

Delay between attempts is retry_delay * backoff_factor ** (attempt - 1) — e.g., 1s, 2s, 4s, ... with defaults. Node-level retry uses a backoff factor of 2.0.

What gets retried:

  • NodeExecutionError (anything raised from a node function)
  • NodeConnectionError (raise this from nodes for transient network/API failures)

What never gets retried:

  • NodeValidationError — pydantic rejected the inputs; retrying with the same inputs is pointless
  • HumanInputRequired — pauses immediately

Each retry emits a node_retry streaming event with {attempt, max_retries, error, delay}.

async for event in execute(compiled, retry=RetryConfig(max_retries=2, delay=0.5)):
    if event["type"] == "node_retry":
        print(f"Retrying {event['node_id']} in {event['delay']}s — {event['error']}")

Error types

All exceptions inherit from ConductorError and are importable from conductor.errors. Node-level errors carry node_id, node_type, and the original exception so they propagate with enough context to log, display, or route to an error handler.

ConductorError                     # Base — catch-all for any engine error
├── CompilationError                # Graph structure is invalid
│   ├── CycleDetectionError         # Graph contains a cycle
│   └── TypeCheckError              # Edge type mismatch (strict mode)
├── NodeError                       # Something went wrong with a specific node
│   ├── NodeValidationError         # Input validation failed (Pydantic) — never retried
│   ├── NodeExecutionError          # Node function raised — retried if configured
│   ├── NodeTimeoutError            # Node exceeded its timeout
│   └── NodeConnectionError         # External service / network failure inside a node
├── InputResolutionError            # Could not resolve inputs from edges
├── FlowExecutionError              # Flow-level failure (raised by execute_sync)
├── FlowPausedError                 # Flow paused for human input (carries checkpoint)
└── HumanInputRequired              # Signal raised by nodes to request human input

Use NodeConnectionError from your node code to mark a failure as transient and retry-worthy:

from conductor.errors import NodeConnectionError

@registry.node("fetch-api", version=1, name="Fetch", description="HTTP GET",
               max_retries=3, retry_delay=1.0)
def fetch_api(url: Annotated[str, Text(label="URL")]) -> Annotated[str, Output(label="Body")]:
    try:
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()
        return resp.text
    except requests.RequestException as e:
        raise NodeConnectionError(f"API call failed: {e}") from e

Legacy aliases (NodeValidationException, NodeExecutionException, FlowExecutionException, FlowPausedException) remain importable from conductor.errors and map to the new *Error names.

Human-in-the-loop

Nodes can pause execution to request human input. The engine checkpoints state (JSON-serializable), and execution resumes later with the human's response:

from conductor.errors import HumanInputRequired, FlowPausedException
from conductor.execution.engine import execute_sync, resume_sync

# A node that needs approval
@registry.node("approve", version=1, name="Approval", description="Needs approval")
def approve(text: Annotated[str, Text(label="Content")]) -> Annotated[str, Output(label="Approved")]:
    raise HumanInputRequired(
        prompt=f"Please approve: {text}",
        schema={"approved": "bool", "comment": "str"},
    )

# Execute — pauses at the approval node
try:
    results = execute_sync(compiled)
except FlowPausedException as e:
    checkpoint = e.checkpoint  # JSON-serializable dict, store in DB

# Resume later with the human's response
results = resume_sync(compiled, checkpoint, response="Approved!")

Key features:

  • Checkpoints are plain dicts — serialize to JSON, store in a database, resume hours/days later
  • FlowStore data survives the checkpoint/resume cycle
  • A flow can pause multiple times (sequential approval gates)
  • Works with both streaming (flow_paused event) and sync (FlowPausedException) APIs
  • Both function-based and class-based nodes can raise HumanInputRequired

Process-standard features

Conductor ships a full set of process-orchestration primitives designed for portable "process-as-code":

Decision nodes + edge guards — first-class branching driven by CEL expressions on outgoing edges. Compile-time validation: exactly one else edge, ≥1 guarded edge. Priority-ordered evaluation; non-taken edges are marked skipped so the SKIPPED sentinel propagates through normally.

@registry.node("decision", ..., is_decision=True)
def decision(value): return value

GraphEdge("e1", "d", "high", "result", None, when="amount > 1000", priority=10),
GraphEdge("e2", "d", "low",  "result", None),  # else

CEL expression language — sandboxed mini-evaluator (conductor.expr) for guards, loop conditions, idempotency keys, signal correlation, and subprocess input mapping. Literals, arithmetic/comparison/logical ops, ternary, in, dotted/indexed identifiers, $ root, built-in functions (size, has, contains, startsWith, matches, string/int/double, etc.).

Actor metadata@registry.node(actor={"kind": "human", "role": "manager"}). Kinds: system, human, agent, external_service. Pure metadata; surfaces in the registry JSON schema for UI rendering and audit trails.

Per-node timeout and idempotency keytimeout=0.2 (seconds) or timeout="PT30S" (ISO 8601) wraps execution with asyncio.wait_for. idempotency_key='"charge-" + string(amount)' evaluates a CEL expression against the node's inputs, surfaces the result on node_start, and injects it into the function if it declares an idempotency_key parameter. Stable across retries.

While / until compound region — CEL-driven loops, with a max_iterations safety cap that raises LoopRunawayError. Register the while-start / while-end markers and pass compound_types=[WHILE] to compile(). negate=True turns while into until.

Subprocess compound — call another flow by (flow_id, version). Register target flows in a SubprocessRegistry and pass it to compile(subprocess_registry=...). Runtime depth cap catches recursion; errors bubble as SubprocessFailedError.

Compensation / saga — per-node compensation= points at another node. On flow failure the engine walks completed nodes in reverse and dispatches each one's compensation with (target_node_id, original_inputs, original_output). Events: compensation_start/_complete/_failed. Per-node on_error policy (fail / continue / compensate) controls triggering.

Signal / event nodes — nodes raise SignalRequired(name, correlation=..., timeout_seconds=...) to pause on an external event, mirroring HITL. Host uses FlowCheckpoint.matches_signal(name, payload) to route incoming signals; correlation is evaluated as CEL on the host side.

Flow-level metadataFlow(dependencies=..., triggers=..., on_error_default=...). Node uses=[...] lists are validated against dependencies; triggers are stored for hosts to wire external machinery (cron, webhook, queue consumer).

YAML / JSON flow formatconductor.flow_format round-trips Flow ↔ YAML/JSON/dict. load_flow(dict), yaml_to_flow(str), load_flow_from_path(path), flow_to_yaml(flow). Optional PyYAML dependency (conductor[yaml]).

See spec.md for the design motivation and PROGRESS.md for the full feature-by-feature implementation log.

Custom data types

Define custom types using NewType — at runtime they're their base type, but the type string surfaces in the frontend JSON schema:

from typing import NewType, TypedDict

# Simple alias — shows as "base64str" in the schema
Base64Str = NewType("Base64Str", str)

# Structured type
class NamedFile(TypedDict):
    content: str   # Base64-encoded
    filename: str

# Use in node signatures
@registry.node("upload", version=1, name="Upload", description="Accepts a file")
def upload(file: Annotated[Base64Str, FileUpload(label="File")]) -> ...:
    ...

Built-in types: Base64Str, Date, NamedFile, MultiNamedFile. Host apps can define additional types following the same pattern.

Widgets

Widgets define how inputs render in the frontend and what validation to apply. Every WidgetType enum value has a concrete Python class so a generic frontend can render by reading the registry — no bespoke backend code per widget.

Widget Best for Key options
Text Single-line string min_length, max_length, pattern
Textarea Multi-line string rows, min_length, max_length
TemplateTextarea String with interpolation hints variables, rows
CodeEditor Code blob with syntax highlighting language, min_length, max_length
Dropdown Pick one from a fixed set choices
DependentDropdown Choices depend on another field depends_on, choices_map
Multiselect Pick many from a fixed set choices, min_selected, max_selected
EntityDropdown Host-loaded async choices entity_kind, multiple
Number Free numeric input min_val, max_val, step, integer_only
Range Numeric slider min_val, max_val, step
Checkbox Boolean toggle
Switch Boolean toggle (sibling of Checkbox)
DatePicker ISO date input min_date, max_date
FileUpload Base64 file accept, max_size_mb, multiple
List User-authored array item_widget, min_items, max_items
SchemaBuilder Structured dict / object editor schema, allow_additional
IfElseBuilder Conditional expression editor variables
ConnectionList Aggregates N upstream edges into a labeled dict
Output Return-value marker download, filename

Widgets for common types — the default mapping

When a parameter has no Annotated[T, Widget(...)] (or has Annotated[T, ...] without a widget instance), the registry infers a sensible default widget from the type:

Python type Default widget
str Text
int Number(integer_only=True)
float Number
bool Checkbox
Date DatePicker
Base64Str, NamedFile, MultiNamedFile FileUpload
list[str] List(item_widget=Text())
list[int] List(item_widget=Number(integer_only=True))
list[T] (bare or other) List (no item_widget)
dict, dict[str, T] SchemaBuilder
anything else no widget (rare — annotate one explicitly)

Explicit Annotated[T, Widget(...)] always wins. So def f(x: int) gets Number, but def f(x: Annotated[int, Range(min_val=0, max_val=100)]) still gets Range. Use defaults for plain "just a field"; annotate when you want constraints, a different widget, or a human-friendly label.

Full widget guide: docs/widgets.md — catalog, the default dispatch, and a four-step recipe for adding a new widget. Hands-on tour: examples/08_widgets.ipynb.

Execution events

The execute() async generator yields these events:

Event When
node_start Node begins execution
node_complete Node finished (includes result)
node_skipped Node skipped (all inputs SKIPPED)
node_error Node raised an unretryable (or final) exception
node_retry Node failed and will be retried (includes attempt, max_retries, error, delay)
node_progress Loop iteration progress
flow_complete All nodes done (includes all results)
flow_paused Node requested human input (includes checkpoint)
flow_error Unrecoverable error
flow_timeout Execution exceeded timeout
flow_cancelled Execution was cancelled

Using in other projects

AI context (llms.txt)

The canonical AI-readable library reference lives inside the package at packages/conductor/src/conductor/about/llms.txt. It ships as package data in the wheel, so any project that depends on conductor can pull it at runtime with no repo access — preferred for downstream projects:

python -m conductor.about                 # full reference
python -m conductor.about sections        # list section slugs
python -m conductor.about shared          # just the shared-references section (prefix match)

Useful when an agent in a downstream project needs to learn the library without you having to paste docs into its context.

Keeping docs in sync

Two channels guard against doc drift:

  • /docs-audit Claude Code slash command — run it at the end of a session that added public API or changed default behavior. It diffs the last N commits against CLAUDE.md, README.md, packages/conductor/src/conductor/about/llms.txt, docs/shared-references.md, and docs/index.md, and applies edits in place. Does not commit; you review the diff.
  • Weekly CI audit.github/workflows/docs-audit.yml runs the same audit every Monday and opens a PR if anything drifted. Requires ANTHROPIC_API_KEY as a repo secret.

Documentation

For full documentation, we recommend MkDocs Material. To set it up:

uv add --group docs mkdocs-material mkdocstrings[python]
uv run mkdocs serve  # Local preview at http://localhost:8000
uv run mkdocs gh-deploy  # Deploy to GitHub Pages

Standard node library (conductor-nodes)

A workspace sibling to conductor that ships common nodes so downstream flows don't have to re-author them. Distributed on PyPI as syv-conductor-nodes; the Python import path is conductor_nodes. Pick categories you want:

from conductor import NodeRegistry
from conductor_nodes import register_all, text, math

reg = NodeRegistry()
register_all(reg)                                   # everything
register_all(reg, categories=["text", "math"])      # a subset
# or per-module:
text.register(reg)
math.register(reg)

Alternatively, grab a pre-populated registry and merge it into yours — useful when composing multiple sources:

from conductor_nodes import get_default_registry

mine = NodeRegistry()
# ... register your own nodes ...
mine.merge(get_default_registry())              # raises on full-id collisions
mine.merge(other_registry, on_conflict="skip")  # or tolerate existing wins

Categories and highlights:

Module Node IDs
text text-uppercase, text-lowercase, text-trim, text-length, text-concat, text-replace, text-contains, text-split, text-join, text-reverse
math math-add, math-subtract, math-multiply, math-divide, math-modulo, math-round, math-min, math-max, math-abs
logic logic-if-empty, logic-if-equals, logic-not (branch via SKIPPED sentinel)
loop for-each-start, for-each-end — canonical markers for the FOR_EACH compound
json_ops json-parse, json-stringify, json-get (dotted path)
regex_ops regex-match, regex-replace, regex-extract

Node IDs are category-prefixed to avoid colliding with application-level IDs. Registering twice with the same ID raises — pick one source.

Frontend providers (conductor-providers)

Framework adapters. Each provider is a subpackage translating between conductor's Python objects and the framework's wire format. Distributed on PyPI as syv-conductor-providers; the Python import path is conductor_providers. The initial provider is conductor_providers.react:

from conductor_providers import react

# Registry → node palette JSON for a sidebar
palette = react.palette_from_registry(registry)

# GraphNode/GraphEdge → ReactFlow JSON (positions auto-assigned if omitted)
flow_json = react.graph_to_react(nodes, edges)

# ReactFlow JSON → GraphNode/GraphEdge (tuples restored from JSON lists)
nodes2, edges2 = react.react_to_graph(flow_json)

Shared references survive the round-trip: produces and consumes ride on each node's data payload and come back as the same dicts. Unknown keys in the wire format are ignored, so hosts can decorate without breaking compatibility.

New providers (Svelte, Vue, Gradio, …) go in sibling subpackages under conductor_providers. — no abstract base class to satisfy; each provider picks the shape that matches its framework.

Examples

The examples are Jupyter notebooks under examples/ — open them in VS Code, JupyterLab, or any notebook UI and run the cells interactively.

Notebook What it covers
01_basic_nodes.ipynb Widgets, multi-output, optional params
02_build_and_run_flow.ipynb Graph building, collecting results, streaming
03_class_nodes_and_store.ipynb BaseNode ABC, FlowStore injection
04_control_flow.ipynb Conditionals (SKIPPED), for-each loops
05_auto_discovery.ipynb Package scanning, JSON schema for frontends
06_human_in_the_loop.ipynb Pause, checkpoint, resume
07_shared_references.ipynb Producers, consumers, fan-out, broadcast into loop bodies
08_widgets.ipynb Type defaults, every widget, inspecting the schema, writing a custom widget
uv sync                       # includes the ipykernel used by the notebooks
uv run jupyter lab examples/  # or open the .ipynb files in VS Code

The notebooks use await collect(execute(compiled)) because the kernel already owns an event loop. From a plain .py script, use execute_sync(compiled) instead.

Stability and versioning

From 1.0.0 onward, conductor follows Semantic Versioning. This is the contract host applications can rely on:

Public API. A name is part of the public API if and only if it is listed in a module's __all__, or it is documented in this README / docs/. Anything else — private helpers, _-prefixed names, modules not re-exported from a public surface — is internal and may change in any release without warning. The audited public surface lives in:

  • Top-level: conductor.__init__
  • Widgets / metadata / types: conductor.widgets, conductor.metadata, conductor.types
  • Errors: conductor.errors
  • Registry: conductor.registry (NodeRegistry, NodeDefinition, Actor) and conductor.registry.dynamic_outputs (ComputeOutputsContext, IncomingBinding, ComputeOutputsFn, strip_sub_output_prefix)
  • Graph: conductor.graph.compiler (compile, CompiledGraph, ExtensionResolver, DecisionGuard)
  • Compound nodes: conductor.compound (ForEachNode, WhileNode, SubprocessNode, the matching *_TYPE constants, plus Region, NodeExecutor, CompoundNodeType)
  • Execution: conductor.execution.engine (execute, execute_sync, resume, resume_sync, collect), conductor.execution.events (the *Event TypedDicts, ExecutionEvent, EventSink), conductor.execution.results (normalize_result, extract_output, filter_skipped, filter_all_skipped), conductor.execution.state (FlowRunState)

Compatibility guarantees from 1.0.0.

  • No breaking changes without a major bump. If a 1.x release removes a public name, changes a public signature in a way that breaks callers, or alters documented behavior, the version that ships that change is 2.0.0 (or later).
  • Deprecation policy. When a public name is scheduled for removal it stays live for at least one minor release after deprecation, with a DeprecationWarning raised at import or call time. The CHANGELOG.md entry that introduces the deprecation lists the target removal version.
  • Internal modules are fair game. conductor.expr.engine, conductor.execution.resolver, conductor.graph.topology, conductor.graph.regions, conductor.graph.shared_refs, conductor.graph.type_check, conductor.graph.dynamic_outputs, conductor.validation, conductor._sentinel, and any other module not on the list above are internal. They may be renamed, restructured, or removed in any release.
  • The three workspace packages release in lockstep. syv-conductor, syv-conductor-nodes, syv-conductor-providers share a single version. The syv-conductor[nodes] / [providers] / [all] extras pin sibling packages with == to prevent resolver skew.

Pre-1.0 (0.x) iteration. The 0.1.x series was iterated freely without semver guarantees; consumers were advised to pin exact versions (==0.1.6, etc.). The CHANGELOG.md carries the full 0.1.0 → 0.1.7 history and flags any wart that is a candidate for a future major bump under "Future deprecation candidates".

License

Apache-2.0. See LICENSE at the repo root; each PyPI wheel ships the same license file.

About

Compile-then-execute DAG engine for building visual workflow systems

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages