Skip to content

feat(control-plane): AG-UI protocol adapter (POC)#554

Draft
AbirAbbas wants to merge 23 commits into
mainfrom
feat/agui-protocol-poc
Draft

feat(control-plane): AG-UI protocol adapter (POC)#554
AbirAbbas wants to merge 23 commits into
mainfrom
feat/agui-protocol-poc

Conversation

@AbirAbbas
Copy link
Copy Markdown
Contributor

@AbirAbbas AbirAbbas commented May 8, 2026

Summary

Adds a complete AG-UI protocol adapter so AgentField is the agent backend for any AG-UI-based frontend, including CopilotKit's <CopilotChat>, useCopilotAction (Generative UI + HITL), and useCoAgent (shared state). Buffered and live-streaming reasoner contracts both supported across all three SDKs (Python, Go, TypeScript); the canonical @ag-ui/client HttpAgent works zero-config; CopilotRuntime proxies through unmodified.

What's in this PR

Protocol surface — every AG-UI event type

internal/agui/events.go defines and WriteSSE-encodes every AG-UI event type:

  • Lifecycle: RUN_STARTED, RUN_FINISHED, RUN_ERROR
  • Text: TEXT_MESSAGE_START / _CONTENT / _END / _CHUNK
  • Tool calls: TOOL_CALL_START / _ARGS / _END / _RESULT / _CHUNK
  • State: STATE_SNAPSHOT, STATE_DELTA (RFC 6902)
  • Reasoning: REASONING_START / _MESSAGE_START / _MESSAGE_CONTENT / _MESSAGE_END / REASONING_END — drives CopilotKit's "Thinking…" pane
  • Steps: STEP_STARTED / STEP_FINISHED
  • Snapshots: MESSAGES_SNAPSHOT
  • Passthrough: RAW, CUSTOM

Wire format byte-perfect: data: <json>\n\n (no event: line) — matches the reference TS EventEncoder.encodeSSE and the Python _encode_sse.

Endpoint

POST /api/v1/agui/runs/:node_id/:reasoner_name

Accepts the canonical RunAgentInputSchema body. The handler picks one of two paths based on the reasoner's response Content-Type:

  • Buffered (application/json): reasoner returns a single dict; handler emits the canonical event sequence with reasoning → tool calls → text (auto-chunked) → state → snapshot → finished.
  • Streaming (application/x-ndjson): reasoner streams NDJSON tagged chunks; handler dispatches each into its AG-UI counterpart in real time, lazily managing text/reasoning open/close lifecycle, then closes with MESSAGES_SNAPSHOT + RUN_FINISHED.

Sits behind the same DID/VC permission middleware as /execute when authorization is enabled.

Reasoner authoring SDKs (full parity across Python / Go / TypeScript)

Concept Python Go TypeScript
Streaming content type agui.STREAMING_CONTENT_TYPE agui.StreamingContentType agui.STREAMING_CONTENT_TYPE
14 streaming chunk builders text_chunk, reasoning_chunk, tool_call_*_chunk, … TextChunk, ReasoningChunk, ToolCall*Chunk, … textChunk, reasoningChunk, toolCall*Chunk, …
Tool-call from AI trace tool_calls_from_trace ToolCallsFromTrace toolCallsFromTrace
State delta replace state_delta_replace StateDeltaReplace stateDeltaReplace
Reasoning helpers reasoning_segment / reasoning ReasoningSegment / Reasoning reasoningSegment / reasoning
Stream serializer serialize_stream(gen) (FastAPI) SerializeStream(ctx, w, ch) serializeStream(iter) (Express/Fastify/Hono/Web)
.harness() relay relay_harness_stream(iter) RelayHarnessResult(*Result) relayHarnessStream(iter)

Full doc + examples per SDK: docs/integrations/copilotkit.md.

Performance

Load tests in internal/handlers/agui_runs_load_test.go (CI-gated, skipped in -short):

  • Buffered at 50× concurrent: 200 requests in ~90ms wall, p50 ≈ 4ms, p95 ≈ 75ms, p99 ≈ 77ms
  • Streaming dispatcher at 25× concurrent: 100 requests in ~18ms wall
  • Goroutine leak guard: < 50 goroutines remaining after load (actual delta = 6, runtime noise)
  • Benchmark (go test -bench=BenchmarkAGUI): ~389µs/op, 26KB/op, 214 allocs/op

Verification

Control plane (Go)

  • go test ./internal/agui/... ./internal/handlers/... — green
  • 18 event types, all Type() + MarshalJSON() paths covered (events_test.go)
  • Buffered handler: full happy path, ID generation, agent failure, slow heartbeat, cancellation, malformed JSON, validation 4xx, non-JSON agent body, tool-call triad, tool-call result, tool-call auto-id, tool-message passthrough, state snapshot ordering, state delta, reasoning string + list forms, chunked text streaming, branch coverage on every helper.
  • Streaming handler: full sequence (20 frames), error chunk terminates, malformed line surfaces as RAW.
  • Load tests: 200×50 buffered, 100×25 streaming, no goroutine leaks, p95 within budget.

Python SDK

  • pytest tests/test_agui_helpers.py — 43 tests, all pass
  • ruff check — clean
  • Coverage: 14 chunk builders, serialize_stream (3 modes), relay_harness_stream (8 message-shape branches), FastAPI ASGI round-trip via httpx ASGITransport, tool_calls_from_trace from .ai() traces, state_delta_*, reasoning_segment / reasoning.

Go SDK

  • go test ./agent/agui/... — 8 test functions, all pass
  • ToolCall, ToolCallsFromTrace, StateDeltaReplace, 13 streaming chunk builders, SerializeStream (happy path + context cancellation), ReasoningSegment + Reasoning (new), RelayHarnessResult (new — buffered Claude Agent harness messages → AG-UI chunks: text, reasoning, tool_use → start+end, tool_result string + list, system, unknowns → raw).

TypeScript SDK

  • npx vitest run tests/agui.test.ts — 31 tests, all pass
  • npx tsc --noEmit — clean
  • New @agentfield/sdk agui namespace mirrors Python 1:1: 14 streaming chunk builders, serializeStream (async + sync iterables), relayHarnessStream (full Claude Agent SDK message-shape matrix), reasoningSegment / reasoning, toolCall / toolCallsFromTrace, stateDeltaReplace / stateDeltaFromDiff.

End-to-end protocol parity (live round-trip per SDK)

Live local rig (control plane + agent registration + curl + @ag-ui/client Zod-schema validation, torn down after) against each SDK's streaming reasoner. All three produced bit-identical 23-frame canonical sequences:

RUN_STARTED → STEP_STARTED → REASONING_START → REASONING_MESSAGE_START
→ REASONING_MESSAGE_CONTENT × 2 → REASONING_MESSAGE_END
→ TOOL_CALL_START → TOOL_CALL_ARGS → TOOL_CALL_END → TOOL_CALL_RESULT
→ REASONING_END → TEXT_MESSAGE_START → TEXT_MESSAGE_CONTENT × 2
→ STATE_SNAPSHOT → STATE_DELTA → STEP_FINISHED → CUSTOM
→ TEXT_MESSAGE_CONTENT (final-chunk synthesized) → TEXT_MESSAGE_END
→ MESSAGES_SNAPSHOT → RUN_FINISHED

Every event passes @ag-ui/core EventSchemas.safeParse(...) — same library CopilotRuntime delegates to — for all three SDKs.

Real CopilotKit React UI (Playwright, off-CI)

Browser → <CopilotChat>CopilotRuntimeHttpAgent → AgentField, exercised end to end:

  • chat reply renders in <CopilotChat> from a synthetic agent
  • <FlightCard> React component painted via useCopilotAction({render}) driven by a backend TOOL_CALL_* triad
  • useCoAgent({state}) updated 41 → 42 from STATE_SNAPSHOT
  • click-confirm round-trip: agent emits confirmBooking → React renderAndWaitForResponse Yes/No → user clicks → result POSTs back as role:"tool" → next agent turn replies "Booking confirmed"

Out of scope (truly deferred to follow-ups)

  • Per-token streaming via the buffered @app.reasoner() decorator. The buffered contract returns a synchronous dict; for live UX, reasoners use the FastAPI / chunk-channel pattern shown above. Buffered responses are auto-chunked on emission so the UX is acceptable; the source-of-truth is still synchronous.
  • Bidirectional cancellation propagation into a streaming reasoner. Client disconnect aborts the streaming HTTP read on our end (ctx.Done propagates through httpAgentInvoker), but the reasoner's own context plumbing has to actually stop the work.

Test plan

  • go test ./internal/agui/... ./internal/handlers/... — buffered + streaming + load + benchmarks green
  • gofmt, go vet clean on touched files
  • pytest tests/test_agui_helpers.py — 43/43 pass; ruff check clean
  • go test ./agent/agui/... — green (Go SDK helpers, including new RelayHarnessResult + Reasoning)
  • vitest run tests/agui.test.ts — 31/31 pass; tsc --noEmit clean (new TS SDK agui namespace)
  • Live round-trip per SDK against a real running control plane — Python, Go, and TypeScript reasoners all produce identical canonical event sequences
  • @ag-ui/client + @ag-ui/core Zod schema validation green for all three SDKs
  • Real CopilotKit React UI with Playwright: chat, generative UI, useCoAgent state, click-confirm round-trip

AbirAbbas and others added 2 commits May 8, 2026 13:37
Introduces an internal/agui package with typed structs for the lifecycle
and text-message subset of the AG-UI protocol
(https://docs.ag-ui.com/concepts/events): RunStarted, RunFinished,
RunError, TextMessageStart, TextMessageContent, TextMessageEnd. Each
event MarshalJSON-injects the AG-UI `type` discriminator so callers
construct events without setting it manually.

WriteSSE emits one event in canonical AG-UI wire format
(`event: <Type>\ndata: <json>\n\n`). Unit tests pin the wire shape and
the discriminator agreement between the event line and JSON body.

ToolCall and State events are deliberately omitted from this first
slice; they belong with the reasoner-streaming work that this package
will later support.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Wires AgentField as an AG-UI-compatible backend so frontends like
CopilotKit can consume runs from the control plane without a custom
adapter layer.

Behavior:
  - POST body {reasoner, input, threadId?, runId?}; reasoner takes the
    usual node_id.reasoner_name form.
  - Pre-stream validation (missing/malformed reasoner, unknown node,
    unknown reasoner) returns plain JSON 4xx so clients can detect
    "stream never opened" before reading the first frame.
  - On success, emits RunStarted -> TextMessageStart ->
    TextMessageContent (one chunk carrying the reasoner's `result`) ->
    TextMessageEnd -> RunFinished, with auto-generated thread/run/message
    IDs when the client omits them.
  - On agent failure after stream open, emits RunError as the terminal
    frame; never a partial happy-path-shaped sequence.

The agent invocation path is abstracted through an agentInvoker
interface so this slice does not yet plumb token-level streaming; the
handler currently does a synchronous POST + io.ReadAll mirroring
ExecuteReasonerHandler. Replacing that with chunk-relay is the next
iteration and will allow per-token TextMessageContent emission.

Tests cover: canonical event sequence + ID propagation + result
surfacing, ID auto-generation, post-stream agent failure, and the four
pre-stream validation paths.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 8, 2026

📊 Coverage gate

Thresholds from .coverage-gate.toml: per-surface ≥ 86%, aggregate ≥ 88%, max per-surface regression ≤ 1.0 pp, max aggregate regression ≤ 0.50 pp.

Surface Current Baseline Δ
control-plane 87.50% 87.30% ↑ +0.20 pp 🟡
sdk-go 91.80% 90.70% ↑ +1.10 pp 🟢
sdk-python 93.68% 93.63% ↑ +0.05 pp 🟢
sdk-typescript 92.76% 92.56% ↑ +0.20 pp 🟢
web-ui 89.91% 90.01% ↓ -0.10 pp 🟡
aggregate 89.02% 89.01% ↑ +0.01 pp 🟡

✅ Gate passed

No surface regressed past the allowed threshold and the aggregate stayed above the floor.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 8, 2026

📐 Patch coverage gate

Threshold: 80% on lines this PR touches vs origin/main (from .coverage-gate.toml:thresholds.min_patch).

Surface Touched lines Patch coverage Status
control-plane 1059 88.00%
sdk-go 244 84.00%
sdk-python 0 ➖ no changes
sdk-typescript 247 94.00%
web-ui 0 ➖ no changes

✅ Patch gate passed

Every surface whose lines were touched by this PR has patch coverage at or above the threshold.

AbirAbbas and others added 9 commits May 8, 2026 14:01
Cross-checked the live POC stream against the reference @ag-ui/core (TS) and
ag_ui (Python) SDKs at github.com/ag-ui-protocol/ag-ui and ran our endpoint
end-to-end through the canonical @ag-ui/client HttpAgent. Several deviations
from the spec needed fixing before the wire was actually compatible:

  - Event type discriminator must be UPPER_SNAKE_CASE (RUN_STARTED,
    TEXT_MESSAGE_CONTENT, RUN_FINISHED, …) — the values of the EventType enum
    that both reference SDKs validate against. Was emitting PascalCase per
    the human-readable docs page, which clients reject.
  - SSE wire format is `data: <json>\n\n` only. Both reference encoders
    (encoder.ts and encoder.py) emit just the data line; the `event:` line
    was non-canonical. Removed.
  - RUN_FINISHED requires both `threadId` and `runId` per the schema; was
    omitting them. Now propagated from the request through the lifecycle.
  - `timestamp` is `Optional[int]` (Unix milliseconds), not an RFC3339
    string. Renamed agui.Now → agui.NowMillis returning int64.
  - Dropped `input` from RUN_STARTED. The schema types it as RunAgentInput
    (threadId/runId/state/messages/tools/context/forwardedProps), not a
    freeform map, so emitting our reasoner input under that field would
    fail strict validation. Re-added once we plumb the structured shape.

Also added an SSE comment heartbeat (`: keep-alive` every 15s while waiting
for a slow reasoner). AG-UI has no heartbeat event, but `:`-prefixed comment
lines are valid SSE that the canonical parsers silently drop, while
intermediaries (nginx, ALBs) see traffic and don't idle out the connection.
The agent invocation now runs in a goroutine while the handler's main loop
selects on a ticker, the result channel, and the request context. Exposed
AGUIHeartbeatInterval as a package var so tests can override.

Validation:
  - Unit: TestWriteSSE_FrameShape pins UPPER_SNAKE types, the data-only
    wire format, and timestamp typing.
  - Unit: TestAGUIRunHandler_HappyPath_EmitsCanonicalEventSequence asserts
    threadId/runId on RUN_FINISHED and that `input` is absent from
    RUN_STARTED until the structured shape lands.
  - Unit: TestAGUIRunHandler_EmitsHeartbeatWhileReasonerIsSlow drives a
    50ms-heartbeat / 250ms-blocking-reasoner case and asserts the
    `: keep-alive` line appears.
  - Live (slow_task duration_seconds=3): every frame validated against
    @ag-ui/core Zod schemas via /tmp/agui-validate/validate.mjs.
  - Live (slow_task duration_seconds=25): heartbeat fires at t=15s, full
    lifecycle still completes correctly.
  - Live (agent killed mid-flight): RUN_STARTED → RUN_ERROR terminal
    sequence, both frames pass schemas.
  - Live (HttpAgent end-to-end): @ag-ui/client HttpAgent successfully
    consumes our endpoint, fires onRunStartedEvent / onTextMessage*Event /
    onRunFinishedEvent in order, and synthesizes an assistant message with
    the reasoner result as content.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CI patch-coverage gate flagged 75% on 260 touched lines (floor 80%).
Existing tests drove the handler through an `agentInvoker` interface stub,
so the concrete `httpAgentInvoker` body and several defensive error branches
were unreachable.

Added:
  - Direct httpAgentInvoker tests: happy path, 5xx → callError-shaped error,
    dial failure (closed listener), invalid URL → request-construction error.
  - WriteSSE marshal-error and write-error branches via a stub Event whose
    MarshalJSON returns an error and a failingWriter.
  - Handler with malformed JSON body → 400 (covers c.ShouldBindJSON branch).
  - Agent body without a `result` key → fallthrough that stringifies the
    whole map; agent body that isn't JSON at all → string(body) fallthrough.
  - stringifyResult branch coverage: string passthrough, nil, slice, map.
  - Mid-flight context cancellation: cancel after RUN_STARTED, assert no
    happy-path frames follow and the handler returns within 2s.

Per-function coverage on the touched files now:
  internal/agui/events.go         every function 100%
  AGUIRunHandler                  100%
  aguiRunHandler                  89.2%
  httpAgentInvoker.Invoke         94.4%
  reasonerExists                  100%
  stringifyResult                 87.5%

Remaining uncovered lines are defensive (json.Marshal of a map[string]any
that came from JSON deserialization can't fail in practice, io.ReadAll on
an httptest body is reliable). All files comfortably above the 80% floor.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…events

Adds the type surface needed for full AG-UI protocol coverage so a
@ag-ui/client HttpAgent (and the CopilotKit runtime that wraps it) can
talk to AgentField with no custom adapter:

- internal/agui/types.go: RunAgentInput, Message, Tool, ToolCall,
  ContextItem mirroring RunAgentInputSchema from the reference TS SDK
  (sdks/typescript/packages/core/src/types.ts). Permissive on sub-fields
  so unrecognized props pass through.
- internal/agui/events.go: TOOL_CALL_START/_ARGS/_END/_RESULT,
  STATE_SNAPSHOT, STATE_DELTA, and MESSAGES_SNAPSHOT, each with a
  type-injecting MarshalJSON, matching the canonical UPPER_SNAKE wire
  shape.
- Direct tests in events_test.go and types_test.go cover every Type/
  MarshalJSON branch and the LastUserMessageText extractor.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…/snapshot

Switches POST /api/v1/agui/runs to /api/v1/agui/runs/:node_id/:reasoner_name
and accepts the canonical RunAgentInputSchema body, so the vanilla
@ag-ui/client HttpAgent (and the CopilotKit runtime that wraps it)
plug into AgentField with zero custom adapter code. Verified end-to-end
against @ag-ui/core EventSchemas with the unmodified HttpAgent.

Output stream now carries the full Generative-UI surface:

  - lifecycle: RUN_STARTED / RUN_FINISHED / RUN_ERROR (unchanged)
  - text: TEXT_MESSAGE_START / _CONTENT / _END (unchanged)
  - tool calls: when the reasoner returns a `toolCalls` array, emits
    TOOL_CALL_START -> _ARGS -> _END before the text turn closes, and
    attaches the calls to the assistant message in MESSAGES_SNAPSHOT.
    Frontend pattern-matches `toolCallName` against useCopilotAction
    registrations to render custom React components.
  - shared state: when the reasoner returns a top-level `state` field,
    emits STATE_SNAPSHOT before MESSAGES_SNAPSHOT, the value useCoAgent
    reads on the client. Inbound `state` (and forwardedProps) plumbed
    through to the reasoner input.
  - MESSAGES_SNAPSHOT closes every successful run with inbound history
    plus the assistant turn, so multi-turn clients can persist it.

Tests cover happy path, ID auto-gen, agent failure, slow-reasoner
heartbeat, mid-flight context cancel, malformed JSON, validation 4xx,
non-JSON agent body, tool-call triad emission + parentMessageId
stitching, tool-call auto-id and malformed-entry skipping, inbound
tool-role message passthrough, state snapshot ordering and
opt-in-only emission, and the four httpAgentInvoker error branches.
extractAssistantText / extractToolCalls / extractState branch tests
cover the non-map, content-key, filtered-empty, and absent-state paths.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Closes the gap where defined event types had no producer in the
handler. Reasoners can now opt into:

- TOOL_CALL_RESULT — when a `toolCalls` entry includes a `result`
  field, the handler emits TOOL_CALL_RESULT after TOOL_CALL_END so
  server-side tool traces (e.g. from app.ai(tools=...)) render as
  completed in the UI instead of pending placeholders.
- STATE_DELTA — when the reasoner returns a `stateDelta` array of
  RFC 6902 patch ops, the handler emits STATE_DELTA after the
  snapshot, giving useCoAgent fine-grained updates without re-sending
  the full state every turn.
- Chunked text streaming — long `result` values are auto-split into
  multiple TEXT_MESSAGE_CONTENT deltas (default 256 chars) on rune
  boundaries, so the frontend paints progressively even though the
  reasoner is synchronous. Exact-byte concatenation reproduces the
  full text; the start/end frames stay singletons.

Also adds an httptest-driven integration test that exercises the full
AG-UI sequence end to end with a real reasoner stub — the previous
unit tests stubbed the agentInvoker interface and never asserted the
cross-process input/output contract, so a future SDK envelope rename
would have silently broken Generative UI without failing CI. The new
test asserts the reasoner receives the full RunAgentInput envelope
(prompt/messages/tools/state/context/forwardedProps) and the wire
output carries the canonical 12-event sequence with all fields stitched
correctly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirrors the /execute group: when AGENTFIELD_FEATURES_DID_AUTHORIZATION_ENABLED
is true and the access-policy/DID services are wired, calls to
/api/v1/agui/runs/* must carry a valid DID-signed request. This closes
the auth-middleware gap on the AG-UI endpoint without affecting local
development (the middleware is a no-op when authorization is disabled).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Reasoner authors who want Generative UI / shared state through the
control plane's AG-UI adapter have to return specific fields in their
response — until now that contract was implicit and undocumented. This
module makes it the canonical, tested public surface:

- agui.tool_call(name, arguments, id?, result?) builds one tool-call
  entry in the shape the control plane translates into TOOL_CALL_*
  frames. Setting `result` opts into TOOL_CALL_RESULT so server-side
  traces show as completed.
- agui.tool_calls_from_trace(trace) converts a ToolCallTrace from
  app.ai(tools=...) into the toolCalls list — errors land as
  {"error": "..."} on the result field so the UI can show a final
  state instead of a pending placeholder.
- agui.state_delta_replace(path, value) and state_delta_from_diff(...)
  build RFC 6902 patches for the `stateDelta` field, the wire shape
  the handler turns into STATE_DELTA events.

Re-exported as `agentfield.agui` and covered by direct unit tests.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Go SDK reasoners receive the AG-UI envelope as a flat map[string]any
input today (no SDK changes needed there). The new agui package gives
authors typed helpers for the structured response fields the control
plane consumes:

- agui.ToolCall(id, name, arguments, result) builds an AG-UI tool-call
  entry; an empty name returns nil so authors surface the misuse
  eagerly instead of having the control plane silently drop it.
- agui.ToolCallsFromTrace(*ai.ToolCallTrace) converts the trace from
  Client.ExecuteToolCallLoopResult into the toolCalls list shape;
  errored calls surface as {"error":"..."} on the result field.
- agui.StateDeltaReplace(path, value) builds a single RFC 6902 op,
  validating the leading slash so callers can't ship malformed paths.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Documents the canonical topology, the URL/body contract, the reasoner
response fields that drive each AG-UI event type, end-to-end Python
and Go reasoner examples (including the app.ai(tools=...) trace bridge
and HITL frontend tools via renderAndWaitForResponse), and the
deliberately-out-of-scope items (live token streaming, live tool-arg
streaming, per-provider .harness() relay, STEP_*/RAW/CUSTOM events).

The "reasoner contract" table is the load-bearing piece — without it,
"AgentField speaks AG-UI" is not actionable for someone trying to ship
Generative UI on top of an existing reasoner.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 10, 2026

Performance

SDK Memory Δ Latency Δ Tests Status
Python 9.4 KB +4% 0.24 µs -31%
Go 234 B -16% 0.63 µs -37%
TS 379 B +8% 3.52 µs +76%

✓ No regressions detected

AbirAbbas and others added 12 commits May 10, 2026 09:58
Defines every remaining event type from the canonical AG-UI schema so
the protocol surface is complete:

- STEP_STARTED / STEP_FINISHED — named-step boundaries inside a run.
  CopilotKit's chat UI ignores these, but other AG-UI consumers (trace
  viewers, debuggers) render them as a hierarchical activity log.
- RAW — passes a foreign-system event through verbatim, with a `source`
  tag. Frontends that subscribed via onRawEvent see it; others ignore.
- CUSTOM — application-defined events with a name + freeform value.
- REASONING_START / _MESSAGE_START / _MESSAGE_CONTENT / _MESSAGE_END /
  REASONING_END — chain-of-thought events. CopilotKit renders these in
  a collapsible "Thinking…" pane, surfacing extended-thinking from
  Claude / o-series models.
- TEXT_MESSAGE_CHUNK — compact form of TEXT_MESSAGE_START → _CONTENT →
  _END (one chunk opens an implicit message, attaches a delta, an
  empty delta closes).
- TOOL_CALL_CHUNK — same compact shape for tool-call args.

Each gets a Type() method and a MarshalJSON that auto-injects the AG-UI
type discriminator. Direct table-driven tests cover every Type +
MarshalJSON branch (events_test.go now exercises 18 event types).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Buffered reasoners surface chain-of-thought to CopilotKit's "Thinking…"
pane by returning a new `reasoning` field — either a plain string (one
thinking block) or a list (multiple segments, each with optional
explicit id):

  return {
    "result": "Booked AA-12.",
    "reasoning": [
      "Looking up flights for SFO->JFK...",
      "AA-12 is the cheapest non-stop.",
    ],
  }

The handler emits REASONING_START → one REASONING_MESSAGE_*/CONTENT/END
triad per segment → REASONING_END, before the tool-call/text turn (so
the frontend renders thinking above the answer, matching the typical UX
flow). Long segments are auto-chunked across multiple
REASONING_MESSAGE_CONTENT deltas.

Tests cover all input shapes (string, list of strings, list of
{id,content} dicts), the reject paths (non-map parsed value, empty
content, missing key, wrong type), and ordering (REASONING_END must
land before TEXT_MESSAGE_START).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds the live-streaming code path: when a reasoner returns
Content-Type: application/x-ndjson, the handler skips the buffered
JSON-decode flow and instead reads the body line-by-line, dispatching
each tagged chunk into its AG-UI counterpart and flushing live.

agentInvoker now returns an *agentInvocation that holds either a
buffered Body or a live Stream (io.ReadCloser) — chosen by the
reasoner's Content-Type. The handler picks the right path; existing
buffered reasoners are unaffected.

Recognized streaming chunk types (matches the Python and Go SDK
helpers in the companion commits):

  text / reasoning / reasoning_end                  -> TEXT_MESSAGE_* / REASONING_*
  tool_call_start / tool_call_args / tool_call_end  -> TOOL_CALL_*
  tool_call_result                                  -> TOOL_CALL_RESULT
  state / state_delta                               -> STATE_SNAPSHOT / STATE_DELTA
  step_started / step_finished                      -> STEP_*
  raw / custom                                      -> RAW / CUSTOM
  final                                             -> applies a buffered envelope
  error                                             -> RUN_ERROR (terminal)

The dispatcher manages text and reasoning open/close lifecycle
automatically (lazy START on first delta, END on last delta or stream
end), so reasoner authors don't track session state. Malformed NDJSON
lines surface as RAW + a decode_error tag and the stream continues —
one bad chunk shouldn't kill the run.

Stream end emits MESSAGES_SNAPSHOT (history + accumulated assistant
turn with tool calls stitched on) + RUN_FINISHED, matching the
buffered path's close shape so frontends don't have to branch on
streaming-vs-buffered.

Three integration tests cover the full streaming sequence (with
chunked-delay reasoner asserting all 20 frames in canonical order),
mid-stream error termination (RUN_ERROR is terminal; subsequent chunks
ignored), and malformed-line resilience.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ss relay

Extends agentfield.agui with:

- 14 chunk builders matching the control plane's streaming dispatcher:
  text_chunk, reasoning_chunk, reasoning_end_chunk, tool_call_*_chunk
  (start/args/end/result), state_chunk, state_delta_chunk,
  step_started/finished_chunk, raw_chunk, custom_chunk, final_chunk,
  error_chunk. Each produces the exact wire shape the handler expects.
- serialize_stream(generator) — turns an async generator yielding
  chunk dicts (or bare strings, auto-wrapped as text) into an async
  iterator of NDJSON-encoded bytes suitable for FastAPI's
  StreamingResponse. Reasoners get live AG-UI events with three lines
  of glue.
- relay_harness_stream(harness_iter) — bridge for Claude Agent SDK
  (.harness()) async iterators. Translates harness message blocks
  into AG-UI chunks per type: text -> text_chunk, thinking ->
  reasoning_chunk, tool_use -> tool_call_start+end, tool_result ->
  tool_call_result_chunk. Anything unrecognized becomes a raw_chunk so
  traces never silently drop. The Claude harness streams per-message
  rather than per-token, so this delivers message-level streaming;
  true per-token requires the raw Anthropic API.
- reasoning(*segments) and reasoning_segment(content, id=...) helpers
  for the buffered REASONING_* path, mirroring the streaming shape.
- STREAMING_CONTENT_TYPE constant ("application/x-ndjson") so callers
  don't typo it.

23 new tests cover every chunk builder, serialize_stream's three
acceptance modes (chunk dict, bare string, type error), an
ASGITransport+httpx round-trip that asserts NDJSON output through a
real FastAPI route, and the harness relay's seven message-shape
branches.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirrors the Python agentfield.agui streaming helpers so Go reasoners
have the same opt-in path to live AG-UI events. Includes:

- 13 chunk builders (TextChunk, ReasoningChunk, ToolCallStart/Args/
  End/Result, StateChunk/StateDeltaChunk, StepStarted/Finished, Raw,
  Custom, Final, Error) producing the wire shapes the handler's
  streaming dispatcher consumes.
- SerializeStream(ctx, w, chunks) — drains a `chan map[string]any`
  into NDJSON lines on an io.Writer, flushing after each, honoring
  context cancellation. Producer is responsible for closing the
  channel when done.
- StreamingContentType constant.

Tests cover every chunk builder's optional-field branches (e.g. omit
parentMessageId / source / value when zero) and SerializeStream's
context-cancellation path (returns ctx.Err() on cancel even when no
chunks are flowing).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds three CI-gated load tests that backfill the production-readiness
signal the earlier 5×concurrent test couldn't provide:

- TestAGUI_Load_ConcurrentBuffered — fires 200 buffered requests at
  50× concurrency against a fast in-process httptest reasoner.
  Asserts every request returns the canonical event sequence,
  goroutines settle back to baseline (no leaks, < 50 delta), and
  p95 latency stays under 250ms. Logs p50/p95/p99 for visibility.
- TestAGUI_Load_ConcurrentStreaming — same shape against a streaming
  NDJSON reasoner so the streaming dispatch path is also load-tested
  and leak-checked.
- BenchmarkAGUI_BufferedHandler — per-request cost baseline for
  regression detection (~389µs/op, 26KB/op on a quiet box).

All three tests skip in -short mode so they don't slow down `go test
-short`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Extends the CopilotKit integration guide with:

- Live streaming reasoner pattern in Python (FastAPI +
  agui.serialize_stream) and Go (chunk channel + SerializeStream).
- .harness() relay example using agui.relay_harness_stream to pipe a
  Claude Agent SDK iterator straight to AG-UI.
- Full chunk-type reference table — every NDJSON `type` and the AG-UI
  event(s) it maps to, so reasoner authors don't have to read the
  dispatcher source.
- Performance section with measured load-test numbers (50× concurrent
  buffered: p99 77ms; benchmark: 389µs/op, 26KB/op).
- Trimmed the "what we don't yet do" section to the actually-deferred
  items (per-token streaming through the buffered contract; reasoner-
  side cancellation propagation).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds a new @agentfield/sdk agui namespace exposing the same surface as
sdk/python/agentfield/agui.py:

  - Buffered helpers: toolCall, toolCallsFromTrace, stateDeltaReplace,
    stateDeltaFromDiff, reasoning, reasoningSegment.
  - 14 streaming chunk builders (textChunk, reasoningChunk,
    toolCallStartChunk, etc.) producing the same wire shape the control
    plane's NDJSON dispatcher consumes.
  - serializeStream(asyncIter) → AsyncIterable<Uint8Array> for
    Express/Fastify/Hono/native http response bodies.
  - relayHarnessStream(asyncIter) translating @anthropic-ai/claude-agent-sdk
    messages into AG-UI chunks (text, thinking, tool_use, tool_result).

31 vitest cases covering every chunk builder, serializer auto-wrap and
context behavior, and the full harness relay translation matrix.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Closes the parity gap with the Python and TypeScript SDKs:

  - ReasoningSegment(content, id) / Reasoning(...segments) — buffered
    REASONING_* emission builders matching reasoning_segment / reasoning
    in the Python SDK.
  - RelayHarnessResult(*harness.Result) — translates the buffered
    Claude Agent harness messages slice into AG-UI streaming chunks
    (text, reasoning, tool_use → start+end, tool_result, system,
    unknowns → raw). Mirrors relay_harness_stream in Python.

Tests cover the reasoning helpers' string + segment branches and the
harness relay's full message-shape matrix (system, assistant text +
thinking + tool_use + unknown blocks, tool_result string + list
content, terminal result envelope, unknown top-level messages).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds the missing TypeScript surface to the CopilotKit integration doc:

  - Buffered TypeScript reasoner example using agui.toolCall and
    agui.toolCallsFromTrace.
  - TypeScript streaming reasoner example using agui.serializeStream
    against an Express response (the same chunks plug into
    Fastify/Hono/Web Response).
  - Note in the .harness() relay section that
    agui.relayHarnessStream is the TS equivalent of the Python helper,
    and agui.RelayHarnessResult is the buffered Go equivalent.
  - SDK parity matrix mapping every helper concept (chunk builders,
    serializer, harness relay) across Python, Go and TypeScript.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…pers

The patch-coverage gate flagged the streaming dispatcher because the
existing integration tests drive the happy path but skip many of the
guard branches (empty deltas, missing IDs, write failures) and several
chunk types only used in advanced flows.

Adds focused unit coverage that drives dispatchChunk and applyFinal
directly with a captureWriter, lifting:

  - dispatchChunk:         50.6%  -> 88.6%
  - applyFinal:            0.0%   -> 97.6%
  - closeTextSession:      75.0%  -> 100.0%
  - closeReasoningSession: 77.8%  -> 88.9%

Covered branches:
  - All early-return guards (empty deltas, missing IDs/names, empty ops).
  - tool_call lifecycle: start (with and without inline arguments),
    args appending to the in-flight call, end, result with default and
    explicit role.
  - state / state_delta / step_started / step_finished / raw / custom
    happy paths plus unknown-chunk-type fallback to RAW.
  - Error chunk emits RUN_ERROR and short-circuits the loop.
  - reasoning_end is idempotent (no-op without an open segment).
  - text chunk auto-closes any open reasoning context first.
  - applyFinal: full envelope (reasoning + toolCalls + state +
    stateDelta + result), nil-data no-op, reuses the open reasoning
    context instead of opening a new one.
  - closeTextSession / closeReasoningSession write-failure short-circuit
    paths for the rare client-disconnect-mid-close case.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@santoshkumarradha
Copy link
Copy Markdown
Member

@AbirAbbas , on this. Will post few archi comments soon.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants