feat(control-plane): AG-UI protocol adapter (POC) #554
Draft
AbirAbbas wants to merge 23 commits into
Introduces an internal/agui package with typed structs for the lifecycle
and text-message subset of the AG-UI protocol
(https://docs.ag-ui.com/concepts/events): RunStarted, RunFinished,
RunError, TextMessageStart, TextMessageContent, TextMessageEnd. Each
event MarshalJSON-injects the AG-UI `type` discriminator so callers
construct events without setting it manually. WriteSSE emits one event in
canonical AG-UI wire format (`event: <Type>\ndata: <json>\n\n`). Unit
tests pin the wire shape and the discriminator agreement between the
event line and JSON body.
ToolCall and State events are deliberately omitted from this first slice;
they belong with the reasoner-streaming work that this package will later
support.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Wires AgentField as an AG-UI-compatible backend so frontends like
CopilotKit can consume runs from the control plane without a custom
adapter layer.
Behavior:
- POST body {reasoner, input, threadId?, runId?}; reasoner takes the
usual node_id.reasoner_name form.
- Pre-stream validation (missing/malformed reasoner, unknown node,
unknown reasoner) returns plain JSON 4xx so clients can detect
"stream never opened" before reading the first frame.
- On success, emits RunStarted -> TextMessageStart ->
TextMessageContent (one chunk carrying the reasoner's `result`) ->
TextMessageEnd -> RunFinished, with auto-generated thread/run/message
IDs when the client omits them.
- On agent failure after stream open, emits RunError as the terminal
frame; never a partial happy-path-shaped sequence.
The agent invocation path is abstracted through an agentInvoker
interface so this slice does not yet plumb token-level streaming; the
handler currently does a synchronous POST + io.ReadAll mirroring
ExecuteReasonerHandler. Replacing that with chunk-relay is the next
iteration and will allow per-token TextMessageContent emission.
Tests cover: canonical event sequence + ID propagation + result
surfacing, ID auto-generation, post-stream agent failure, and the four
pre-stream validation paths.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Contributor
📊 Coverage gate
Thresholds from
✅ Gate passed
No surface regressed past the allowed threshold and the aggregate stayed above the floor.
Contributor
📐 Patch coverage gate
Threshold: 80% on lines this PR touches vs
✅ Patch gate passed
Every surface whose lines were touched by this PR has patch coverage at or above the threshold.
Cross-checked the live POC stream against the reference @ag-ui/core (TS) and
ag_ui (Python) SDKs at github.com/ag-ui-protocol/ag-ui and ran our endpoint
end-to-end through the canonical @ag-ui/client HttpAgent. Several deviations
from the spec needed fixing before the wire was actually compatible:
- Event type discriminator must be UPPER_SNAKE_CASE (RUN_STARTED,
TEXT_MESSAGE_CONTENT, RUN_FINISHED, …) — the values of the EventType enum
that both reference SDKs validate against. Was emitting PascalCase per
the human-readable docs page, which clients reject.
- SSE wire format is `data: <json>\n\n` only. Both reference encoders
(encoder.ts and encoder.py) emit just the data line; the `event:` line
was non-canonical. Removed.
- RUN_FINISHED requires both `threadId` and `runId` per the schema; was
omitting them. Now propagated from the request through the lifecycle.
- `timestamp` is `Optional[int]` (Unix milliseconds), not an RFC3339
string. Renamed agui.Now → agui.NowMillis returning int64.
- Dropped `input` from RUN_STARTED. The schema types it as RunAgentInput
(threadId/runId/state/messages/tools/context/forwardedProps), not a
freeform map, so emitting our reasoner input under that field would
fail strict validation. It will be re-added once we plumb the structured
shape.
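The corrected wire shape can be sketched as below. This is a minimal illustration, not the package's actual code: the `Event` interface and the `EncodeSSE` helper are assumptions introduced for the example, while `RunFinished`, `WriteSSE`, the UPPER_SNAKE discriminator, the data-only frame, and the millisecond timestamp follow the points listed above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
)

// Event is a hypothetical stand-in for the package's event interface.
type Event interface {
	Type() string
}

// RunFinished carries threadId and runId, which the schema requires.
type RunFinished struct {
	ThreadID  string `json:"threadId"`
	RunID     string `json:"runId"`
	Timestamp int64  `json:"timestamp,omitempty"` // Unix milliseconds
}

func (RunFinished) Type() string { return "RUN_FINISHED" }

// MarshalJSON injects the `type` discriminator so callers never set it by hand.
func (e RunFinished) MarshalJSON() ([]byte, error) {
	type alias RunFinished // alias has no methods, which avoids infinite recursion
	return json.Marshal(struct {
		Type string `json:"type"`
		alias
	}{Type: e.Type(), alias: alias(e)})
}

// EncodeSSE (hypothetical helper) builds a data-only SSE frame: no `event:` line.
func EncodeSSE(e Event) ([]byte, error) {
	body, err := json.Marshal(e)
	if err != nil {
		return nil, fmt.Errorf("marshal %s: %w", e.Type(), err)
	}
	return []byte("data: " + string(body) + "\n\n"), nil
}

// WriteSSE writes one encoded frame to w.
func WriteSSE(w io.Writer, e Event) error {
	frame, err := EncodeSSE(e)
	if err != nil {
		return err
	}
	_, err = w.Write(frame)
	return err
}

func main() {
	_ = WriteSSE(os.Stdout, RunFinished{ThreadID: "t1", RunID: "r1", Timestamp: 1700000000000})
}
```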
Also added an SSE comment heartbeat (`: keep-alive` every 15s while waiting
for a slow reasoner). AG-UI has no heartbeat event, but `:`-prefixed comment
lines are valid SSE that the canonical parsers silently drop, while
intermediaries (nginx, ALBs) see traffic and don't idle out the connection.
The agent invocation now runs in a goroutine while the handler's main loop
selects on a ticker, the result channel, and the request context. Exposed
AGUIHeartbeatInterval as a package var so tests can override.
Validation:
- Unit: TestWriteSSE_FrameShape pins UPPER_SNAKE types, the data-only
wire format, and timestamp typing.
- Unit: TestAGUIRunHandler_HappyPath_EmitsCanonicalEventSequence asserts
threadId/runId on RUN_FINISHED and that `input` is absent from
RUN_STARTED until the structured shape lands.
- Unit: TestAGUIRunHandler_EmitsHeartbeatWhileReasonerIsSlow drives a
50ms-heartbeat / 250ms-blocking-reasoner case and asserts the
`: keep-alive` line appears.
- Live (slow_task duration_seconds=3): every frame validated against
@ag-ui/core Zod schemas via /tmp/agui-validate/validate.mjs.
- Live (slow_task duration_seconds=25): heartbeat fires at t=15s, full
lifecycle still completes correctly.
- Live (agent killed mid-flight): RUN_STARTED → RUN_ERROR terminal
sequence, both frames pass schemas.
- Live (HttpAgent end-to-end): @ag-ui/client HttpAgent successfully
consumes our endpoint, fires onRunStartedEvent / onTextMessage*Event /
onRunFinishedEvent in order, and synthesizes an assistant message with
the reasoner result as content.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CI patch-coverage gate flagged 75% on 260 touched lines (floor 80%).
Existing tests drove the handler through an `agentInvoker` interface stub,
so the concrete `httpAgentInvoker` body and several defensive error branches
were unreachable.
Added:
- Direct httpAgentInvoker tests: happy path, 5xx → callError-shaped error,
dial failure (closed listener), invalid URL → request-construction error.
- WriteSSE marshal-error and write-error branches via a stub Event whose
MarshalJSON returns an error and a failingWriter.
- Handler with malformed JSON body → 400 (covers c.ShouldBindJSON branch).
- Agent body without a `result` key → fallthrough that stringifies the
whole map; agent body that isn't JSON at all → string(body) fallthrough.
- stringifyResult branch coverage: string passthrough, nil, slice, map.
- Mid-flight context cancellation: cancel after RUN_STARTED, assert no
happy-path frames follow and the handler returns within 2s.
Per-function coverage on the touched files now:
  internal/agui/events.go (every function)   100%
  AGUIRunHandler                             100%
  aguiRunHandler                             89.2%
  httpAgentInvoker.Invoke                    94.4%
  reasonerExists                             100%
  stringifyResult                            87.5%
Remaining uncovered lines are defensive (json.Marshal of a map[string]any
that came from JSON deserialization can't fail in practice, io.ReadAll on
an httptest body is reliable). All files comfortably above the 80% floor.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…events
Adds the type surface needed for full AG-UI protocol coverage so a
@ag-ui/client HttpAgent (and the CopilotKit runtime that wraps it) can
talk to AgentField with no custom adapter:
- internal/agui/types.go: RunAgentInput, Message, Tool, ToolCall,
ContextItem mirroring RunAgentInputSchema from the reference TS SDK
(sdks/typescript/packages/core/src/types.ts). Permissive on sub-fields
so unrecognized props pass through.
- internal/agui/events.go: TOOL_CALL_START/_ARGS/_END/_RESULT,
STATE_SNAPSHOT, STATE_DELTA, and MESSAGES_SNAPSHOT, each with a
type-injecting MarshalJSON, matching the canonical UPPER_SNAKE wire
shape.
- Direct tests in events_test.go and types_test.go cover every
Type/MarshalJSON branch and the LastUserMessageText extractor.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…/snapshot
Switches POST /api/v1/agui/runs to /api/v1/agui/runs/:node_id/:reasoner_name
and accepts the canonical RunAgentInputSchema body, so the vanilla
@ag-ui/client HttpAgent (and the CopilotKit runtime that wraps it)
plug into AgentField with zero custom adapter code. Verified end-to-end
against @ag-ui/core EventSchemas with the unmodified HttpAgent.
Output stream now carries the full Generative-UI surface:
- lifecycle: RUN_STARTED / RUN_FINISHED / RUN_ERROR (unchanged)
- text: TEXT_MESSAGE_START / _CONTENT / _END (unchanged)
- tool calls: when the reasoner returns a `toolCalls` array, emits
TOOL_CALL_START -> _ARGS -> _END before the text turn closes, and
attaches the calls to the assistant message in MESSAGES_SNAPSHOT.
Frontend pattern-matches `toolCallName` against useCopilotAction
registrations to render custom React components.
- shared state: when the reasoner returns a top-level `state` field,
emits STATE_SNAPSHOT before MESSAGES_SNAPSHOT, the value useCoAgent
reads on the client. Inbound `state` (and forwardedProps) plumbed
through to the reasoner input.
- MESSAGES_SNAPSHOT closes every successful run with inbound history
plus the assistant turn, so multi-turn clients can persist it.
Tests cover happy path, ID auto-gen, agent failure, slow-reasoner
heartbeat, mid-flight context cancel, malformed JSON, validation 4xx,
non-JSON agent body, tool-call triad emission + parentMessageId
stitching, tool-call auto-id and malformed-entry skipping, inbound
tool-role message passthrough, state snapshot ordering and
opt-in-only emission, and the four httpAgentInvoker error branches.
extractAssistantText / extractToolCalls / extractState branch tests
cover the non-map, content-key, filtered-empty, and absent-state paths.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Closes the gap where defined event types had no producer in the handler.
Reasoners can now opt into:
- TOOL_CALL_RESULT: when a `toolCalls` entry includes a `result` field,
the handler emits TOOL_CALL_RESULT after TOOL_CALL_END so server-side
tool traces (e.g. from app.ai(tools=...)) render as completed in the UI
instead of pending placeholders.
- STATE_DELTA: when the reasoner returns a `stateDelta` array of RFC 6902
patch ops, the handler emits STATE_DELTA after the snapshot, giving
useCoAgent fine-grained updates without re-sending the full state every
turn.
- Chunked text streaming: long `result` values are auto-split into
multiple TEXT_MESSAGE_CONTENT deltas (default 256 chars) on rune
boundaries, so the frontend paints progressively even though the
reasoner is synchronous. Exact-byte concatenation reproduces the full
text; the start/end frames stay singletons.
Also adds an httptest-driven integration test that exercises the full
AG-UI sequence end to end with a real reasoner stub. The previous unit
tests stubbed the agentInvoker interface and never asserted the
cross-process input/output contract, so a future SDK envelope rename
would have silently broken Generative UI without failing CI. The new test
asserts the reasoner receives the full RunAgentInput envelope
(prompt/messages/tools/state/context/forwardedProps) and the wire output
carries the canonical 12-event sequence with all fields stitched
correctly.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
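The rune-boundary splitting described above might look like the following sketch. The function name `chunkText` is hypothetical, and it assumes the "256 chars" default counts runes rather than bytes; the handler's real implementation may differ.

```go
package main

import "fmt"

// chunkText (hypothetical name) splits s into pieces of at most size runes.
// Cuts always fall on rune boundaries, so joining the pieces reproduces s
// byte for byte.
func chunkText(s string, size int) []string {
	if s == "" {
		return nil
	}
	if size <= 0 {
		return []string{s}
	}
	var chunks []string
	start, count := 0, 0
	for i := range s { // i is the byte offset where each rune starts
		if count == size {
			chunks = append(chunks, s[start:i])
			start, count = i, 0
		}
		count++
	}
	return append(chunks, s[start:])
}

func main() {
	for _, delta := range chunkText("héllo wörld", 4) {
		fmt.Printf("TEXT_MESSAGE_CONTENT delta=%q\n", delta)
	}
}
```

Each returned piece would become one TEXT_MESSAGE_CONTENT delta between a single TEXT_MESSAGE_START and TEXT_MESSAGE_END.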
Mirrors the /execute group: when
AGENTFIELD_FEATURES_DID_AUTHORIZATION_ENABLED is true and the
access-policy/DID services are wired, calls to /api/v1/agui/runs/* must
carry a valid DID-signed request. This closes the auth-middleware gap on
the AG-UI endpoint without affecting local development (the middleware is
a no-op when authorization is disabled).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Reasoner authors who want Generative UI / shared state through the
control plane's AG-UI adapter have to return specific fields in their
response — until now that contract was implicit and undocumented. This
module makes it the canonical, tested public surface:
- agui.tool_call(name, arguments, id?, result?) builds one tool-call
entry in the shape the control plane translates into TOOL_CALL_*
frames. Setting `result` opts into TOOL_CALL_RESULT so server-side
traces show as completed.
- agui.tool_calls_from_trace(trace) converts a ToolCallTrace from
app.ai(tools=...) into the toolCalls list — errors land as
{"error": "..."} on the result field so the UI can show a final
state instead of a pending placeholder.
- agui.state_delta_replace(path, value) and state_delta_from_diff(...)
build RFC 6902 patches for the `stateDelta` field, the wire shape
the handler turns into STATE_DELTA events.
Re-exported as `agentfield.agui` and covered by direct unit tests.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Go SDK reasoners receive the AG-UI envelope as a flat map[string]any
input today (no SDK changes needed there). The new agui package gives
authors typed helpers for the structured response fields the control
plane consumes:
- agui.ToolCall(id, name, arguments, result) builds an AG-UI tool-call
entry; an empty name returns nil so authors surface the misuse
eagerly instead of having the control plane silently drop it.
- agui.ToolCallsFromTrace(*ai.ToolCallTrace) converts the trace from
Client.ExecuteToolCallLoopResult into the toolCalls list shape;
errored calls surface as {"error":"..."} on the result field.
- agui.StateDeltaReplace(path, value) builds a single RFC 6902 op,
validating the leading slash so callers can't ship malformed paths.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Documents the canonical topology, the URL/body contract, the reasoner
response fields that drive each AG-UI event type, end-to-end Python and
Go reasoner examples (including the app.ai(tools=...) trace bridge and
HITL frontend tools via renderAndWaitForResponse), and the
deliberately-out-of-scope items (live token streaming, live tool-arg
streaming, per-provider .harness() relay, STEP_*/RAW/CUSTOM events).
The "reasoner contract" table is the load-bearing piece: without it,
"AgentField speaks AG-UI" is not actionable for someone trying to ship
Generative UI on top of an existing reasoner.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Contributor
Performance
✓ No regressions detected
Defines every remaining event type from the canonical AG-UI schema so the
protocol surface is complete:
- STEP_STARTED / STEP_FINISHED: named-step boundaries inside a run.
CopilotKit's chat UI ignores these, but other AG-UI consumers (trace
viewers, debuggers) render them as a hierarchical activity log.
- RAW: passes a foreign-system event through verbatim, with a `source`
tag. Frontends that subscribed via onRawEvent see it; others ignore.
- CUSTOM: application-defined events with a name + freeform value.
- REASONING_START / _MESSAGE_START / _MESSAGE_CONTENT / _MESSAGE_END /
REASONING_END: chain-of-thought events. CopilotKit renders these in a
collapsible "Thinking…" pane, surfacing extended-thinking from Claude /
o-series models.
- TEXT_MESSAGE_CHUNK: compact form of TEXT_MESSAGE_START → _CONTENT →
_END (one chunk opens an implicit message, attaches a delta, an empty
delta closes).
- TOOL_CALL_CHUNK: same compact shape for tool-call args.
Each gets a Type() method and a MarshalJSON that auto-injects the AG-UI
type discriminator. Direct table-driven tests cover every Type +
MarshalJSON branch (events_test.go now exercises 18 event types).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Buffered reasoners surface chain-of-thought to CopilotKit's "Thinking…"
pane by returning a new `reasoning` field — either a plain string (one
thinking block) or a list (multiple segments, each with optional
explicit id):
return {
"result": "Booked AA-12.",
"reasoning": [
"Looking up flights for SFO->JFK...",
"AA-12 is the cheapest non-stop.",
],
}
The handler emits REASONING_START → one
REASONING_MESSAGE_START/_CONTENT/_END triad per segment → REASONING_END,
before the tool-call/text turn (so the frontend renders thinking above
the answer, matching the typical UX flow). Long segments are auto-chunked
across multiple REASONING_MESSAGE_CONTENT deltas.
Tests cover all input shapes (string, list of strings, list of
{id,content} dicts), the reject paths (non-map parsed value, empty
content, missing key, wrong type), and ordering (REASONING_END must
land before TEXT_MESSAGE_START).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds the live-streaming code path: when a reasoner returns Content-Type:
application/x-ndjson, the handler skips the buffered JSON-decode flow and
instead reads the body line-by-line, dispatching each tagged chunk into
its AG-UI counterpart and flushing live.
agentInvoker now returns an *agentInvocation that holds either a buffered
Body or a live Stream (io.ReadCloser), chosen by the reasoner's
Content-Type. The handler picks the right path; existing buffered
reasoners are unaffected.
Recognized streaming chunk types (matches the Python and Go SDK helpers
in the companion commits):
  text / reasoning / reasoning_end                  -> TEXT_MESSAGE_* / REASONING_*
  tool_call_start / tool_call_args / tool_call_end  -> TOOL_CALL_*
  tool_call_result                                  -> TOOL_CALL_RESULT
  state / state_delta                               -> STATE_SNAPSHOT / STATE_DELTA
  step_started / step_finished                      -> STEP_*
  raw / custom                                      -> RAW / CUSTOM
  final                                             -> applies a buffered envelope
  error                                             -> RUN_ERROR (terminal)
The dispatcher manages text and reasoning open/close lifecycle
automatically (lazy START on first delta, END on last delta or stream
end), so reasoner authors don't track session state. Malformed NDJSON
lines surface as RAW + a decode_error tag and the stream continues; one
bad chunk shouldn't kill the run.
Stream end emits MESSAGES_SNAPSHOT (history + accumulated assistant turn
with tool calls stitched on) + RUN_FINISHED, matching the buffered path's
close shape so frontends don't have to branch on streaming-vs-buffered.
Three integration tests cover the full streaming sequence (with
chunked-delay reasoner asserting all 20 frames in canonical order),
mid-stream error termination (RUN_ERROR is terminal; subsequent chunks
ignored), and malformed-line resilience.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ss relay
Extends agentfield.agui with:
- 14 chunk builders matching the control plane's streaming dispatcher:
text_chunk, reasoning_chunk, reasoning_end_chunk, tool_call_*_chunk
(start/args/end/result), state_chunk, state_delta_chunk,
step_started/finished_chunk, raw_chunk, custom_chunk, final_chunk,
error_chunk. Each produces the exact wire shape the handler expects.
- serialize_stream(generator) — turns an async generator yielding
chunk dicts (or bare strings, auto-wrapped as text) into an async
iterator of NDJSON-encoded bytes suitable for FastAPI's
StreamingResponse. Reasoners get live AG-UI events with three lines
of glue.
- relay_harness_stream(harness_iter) — bridge for Claude Agent SDK
(.harness()) async iterators. Translates harness message blocks
into AG-UI chunks per type: text -> text_chunk, thinking ->
reasoning_chunk, tool_use -> tool_call_start+end, tool_result ->
tool_call_result_chunk. Anything unrecognized becomes a raw_chunk so
traces never silently drop. The Claude harness streams per-message
rather than per-token, so this delivers message-level streaming;
true per-token requires the raw Anthropic API.
- reasoning(*segments) and reasoning_segment(content, id=...) helpers
for the buffered REASONING_* path, mirroring the streaming shape.
- STREAMING_CONTENT_TYPE constant ("application/x-ndjson") so callers
don't typo it.
23 new tests cover every chunk builder, serialize_stream's three
acceptance modes (chunk dict, bare string, type error), an
ASGITransport+httpx round-trip that asserts NDJSON output through a
real FastAPI route, and the harness relay's seven message-shape
branches.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Mirrors the Python agentfield.agui streaming helpers so Go reasoners have
the same opt-in path to live AG-UI events. Includes:
- 13 chunk builders (TextChunk, ReasoningChunk,
ToolCallStart/Args/End/Result, StateChunk/StateDeltaChunk,
StepStarted/Finished, Raw, Custom, Final, Error) producing the wire
shapes the handler's streaming dispatcher consumes.
- SerializeStream(ctx, w, chunks): drains a `chan map[string]any` into
NDJSON lines on an io.Writer, flushing after each, honoring context
cancellation. Producer is responsible for closing the channel when done.
- StreamingContentType constant.
Tests cover every chunk builder's optional-field branches (e.g. omit
parentMessageId / source / value when zero) and SerializeStream's
context-cancellation path (returns ctx.Err() on cancel even when no
chunks are flowing).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds three CI-gated load tests that backfill the production-readiness
signal the earlier 5×concurrent test couldn't provide:
- TestAGUI_Load_ConcurrentBuffered: fires 200 buffered requests at 50×
concurrency against a fast in-process httptest reasoner. Asserts every
request returns the canonical event sequence, goroutines settle back to
baseline (no leaks, < 50 delta), and p95 latency stays under 250ms. Logs
p50/p95/p99 for visibility.
- TestAGUI_Load_ConcurrentStreaming: same shape against a streaming
NDJSON reasoner so the streaming dispatch path is also load-tested and
leak-checked.
- BenchmarkAGUI_BufferedHandler: per-request cost baseline for regression
detection (~389µs/op, 26KB/op on a quiet box).
All three tests skip in -short mode so they don't slow down
`go test -short`.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Extends the CopilotKit integration guide with:
- Live streaming reasoner pattern in Python (FastAPI +
agui.serialize_stream) and Go (chunk channel + SerializeStream).
- .harness() relay example using agui.relay_harness_stream to pipe a
Claude Agent SDK iterator straight to AG-UI.
- Full chunk-type reference table: every NDJSON `type` and the AG-UI
event(s) it maps to, so reasoner authors don't have to read the
dispatcher source.
- Performance section with measured load-test numbers (50× concurrent
buffered: p99 77ms; benchmark: 389µs/op, 26KB/op).
- Trimmed the "what we don't yet do" section to the actually-deferred
items (per-token streaming through the buffered contract; reasoner-side
cancellation propagation).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds a new @agentfield/sdk agui namespace exposing the same surface as
sdk/python/agentfield/agui.py:
- Buffered helpers: toolCall, toolCallsFromTrace, stateDeltaReplace,
stateDeltaFromDiff, reasoning, reasoningSegment.
- 14 streaming chunk builders (textChunk, reasoningChunk,
toolCallStartChunk, etc.) producing the same wire shape the control
plane's NDJSON dispatcher consumes.
- serializeStream(asyncIter) → AsyncIterable<Uint8Array> for
Express/Fastify/Hono/native http response bodies.
- relayHarnessStream(asyncIter) translating @anthropic-ai/claude-agent-sdk
messages into AG-UI chunks (text, thinking, tool_use, tool_result).
31 vitest cases covering every chunk builder, serializer auto-wrap and
context behavior, and the full harness relay translation matrix.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Closes the parity gap with the Python and TypeScript SDKs:
- ReasoningSegment(content, id) / Reasoning(...segments) — buffered
REASONING_* emission builders matching reasoning_segment / reasoning
in the Python SDK.
- RelayHarnessResult(*harness.Result) — translates the buffered
Claude Agent harness messages slice into AG-UI streaming chunks
(text, reasoning, tool_use → start+end, tool_result, system,
unknowns → raw). Mirrors relay_harness_stream in Python.
Tests cover the reasoning helpers' string + segment branches and the
harness relay's full message-shape matrix (system, assistant text +
thinking + tool_use + unknown blocks, tool_result string + list
content, terminal result envelope, unknown top-level messages).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds the missing TypeScript surface to the CopilotKit integration doc:
- Buffered TypeScript reasoner example using agui.toolCall and
agui.toolCallsFromTrace.
- TypeScript streaming reasoner example using agui.serializeStream
against an Express response (the same chunks plug into
Fastify/Hono/Web Response).
- Note in the .harness() relay section that
agui.relayHarnessStream is the TS equivalent of the Python helper,
and agui.RelayHarnessResult is the buffered Go equivalent.
- SDK parity matrix mapping every helper concept (chunk builders,
serializer, harness relay) across Python, Go and TypeScript.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…pers
The patch-coverage gate flagged the streaming dispatcher because the
existing integration tests drive the happy path but skip many of the
guard branches (empty deltas, missing IDs, write failures) and several
chunk types only used in advanced flows.
Adds focused unit coverage that drives dispatchChunk and applyFinal
directly with a captureWriter, lifting:
- dispatchChunk: 50.6% -> 88.6%
- applyFinal: 0.0% -> 97.6%
- closeTextSession: 75.0% -> 100.0%
- closeReasoningSession: 77.8% -> 88.9%
Covered branches:
- All early-return guards (empty deltas, missing IDs/names, empty ops).
- tool_call lifecycle: start (with and without inline arguments),
args appending to the in-flight call, end, result with default and
explicit role.
- state / state_delta / step_started / step_finished / raw / custom
happy paths plus unknown-chunk-type fallback to RAW.
- Error chunk emits RUN_ERROR and short-circuits the loop.
- reasoning_end is idempotent (no-op without an open segment).
- text chunk auto-closes any open reasoning context first.
- applyFinal: full envelope (reasoning + toolCalls + state +
stateDelta + result), nil-data no-op, reuses the open reasoning
context instead of opening a new one.
- closeTextSession / closeReasoningSession write-failure short-circuit
paths for the rare client-disconnect-mid-close case.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Member
@AbirAbbas, I'm on this. Will post a few architecture comments soon.
Summary
Adds a complete AG-UI protocol adapter so AgentField is the agent backend for any AG-UI-based frontend, including CopilotKit's `<CopilotChat>`, `useCopilotAction` (Generative UI + HITL), and `useCoAgent` (shared state). Buffered and live-streaming reasoner contracts both supported across all three SDKs (Python, Go, TypeScript); the canonical `@ag-ui/client` HttpAgent works zero-config; `CopilotRuntime` proxies through unmodified.
What's in this PR
Protocol surface: every AG-UI event type
`internal/agui/events.go` defines and `WriteSSE`-encodes every AG-UI event type:
- `RUN_STARTED`, `RUN_FINISHED`, `RUN_ERROR`
- `TEXT_MESSAGE_START` / `_CONTENT` / `_END` / `_CHUNK`
- `TOOL_CALL_START` / `_ARGS` / `_END` / `_RESULT` / `_CHUNK`
- `STATE_SNAPSHOT`, `STATE_DELTA` (RFC 6902)
- `REASONING_START` / `_MESSAGE_START` / `_MESSAGE_CONTENT` / `_MESSAGE_END` / `REASONING_END`: drives CopilotKit's "Thinking…" pane
- `STEP_STARTED` / `STEP_FINISHED`
- `MESSAGES_SNAPSHOT`
- `RAW`, `CUSTOM`
Wire format byte-perfect: `data: <json>\n\n` (no `event:` line), matching the reference TS `EventEncoder.encodeSSE` and the Python `_encode_sse`.
Endpoint
`POST /api/v1/agui/runs/:node_id/:reasoner_name`
Accepts the canonical `RunAgentInputSchema` body. The handler picks one of two paths based on the reasoner's response Content-Type:
- Buffered (`application/json`): reasoner returns a single dict; handler emits the canonical event sequence with reasoning → tool calls → text (auto-chunked) → state → snapshot → finished.
- Streaming (`application/x-ndjson`): reasoner streams NDJSON tagged chunks; handler dispatches each into its AG-UI counterpart in real time, lazily managing text/reasoning open/close lifecycle, then closes with `MESSAGES_SNAPSHOT` + `RUN_FINISHED`.
Sits behind the same DID/VC permission middleware as `/execute` when authorization is enabled.
Reasoner authoring SDKs (full parity across Python / Go / TypeScript)
| Python | Go | TypeScript |
| --- | --- | --- |
| `agui.STREAMING_CONTENT_TYPE` | `agui.StreamingContentType` | `agui.STREAMING_CONTENT_TYPE` |
| `text_chunk`, `reasoning_chunk`, `tool_call_*_chunk`, … | `TextChunk`, `ReasoningChunk`, `ToolCall*Chunk`, … | `textChunk`, `reasoningChunk`, `toolCall*Chunk`, … |
| `tool_calls_from_trace` | `ToolCallsFromTrace` | `toolCallsFromTrace` |
| `state_delta_replace` | `StateDeltaReplace` | `stateDeltaReplace` |
| `reasoning_segment` / `reasoning` | `ReasoningSegment` / `Reasoning` | `reasoningSegment` / `reasoning` |
| `serialize_stream(gen)` (FastAPI) | `SerializeStream(ctx, w, ch)` | `serializeStream(iter)` (Express/Fastify/Hono/Web) |
| `relay_harness_stream(iter)` | `RelayHarnessResult(*Result)` | `relayHarnessStream(iter)` |

The last row lists the `.harness()` relay helpers.
Full doc + examples per SDK: `docs/integrations/copilotkit.md`.
Performance
Load tests in `internal/handlers/agui_runs_load_test.go` (CI-gated, skipped in `-short`):
- Benchmark (`go test -bench=BenchmarkAGUI`): ~389µs/op, 26KB/op, 214 allocs/op
Verification
Control plane (Go)
- `go test ./internal/agui/... ./internal/handlers/...`: green
- `Type()` + `MarshalJSON()` paths covered (`events_test.go`)
Python SDK
- `pytest tests/test_agui_helpers.py`: 43 tests, all pass
- `ruff check`: clean
- `serialize_stream` (3 modes), `relay_harness_stream` (8 message-shape branches), FastAPI ASGI round-trip via httpx ASGITransport, `tool_calls_from_trace` from `.ai()` traces, `state_delta_*`, `reasoning_segment` / `reasoning`.
Go SDK
- `go test ./agent/agui/...`: 8 test functions, all pass
- `ToolCall`, `ToolCallsFromTrace`, `StateDeltaReplace`, 13 streaming chunk builders, `SerializeStream` (happy path + context cancellation), `ReasoningSegment` + `Reasoning` (new), `RelayHarnessResult` (new; buffered Claude Agent harness messages → AG-UI chunks: text, reasoning, tool_use → start+end, tool_result string + list, system, unknowns → raw).
TypeScript SDK
- `npx vitest run tests/agui.test.ts`: 31 tests, all pass
- `npx tsc --noEmit`: clean
- `@agentfield/sdk` `agui` namespace mirrors Python 1:1: 14 streaming chunk builders, `serializeStream` (async + sync iterables), `relayHarnessStream` (full Claude Agent SDK message-shape matrix), `reasoningSegment` / `reasoning`, `toolCall` / `toolCallsFromTrace`, `stateDeltaReplace` / `stateDeltaFromDiff`.
End-to-end protocol parity (live round-trip per SDK)
Live local rig (control plane + agent registration + curl + `@ag-ui/client` Zod-schema validation, torn down after) against each SDK's streaming reasoner. All three produced bit-identical 23-frame canonical sequences.
Every event passes `@ag-ui/core` `EventSchemas.safeParse(...)` (the same library `CopilotRuntime` delegates to) for all three SDKs.
Real CopilotKit React UI (Playwright, off-CI)
Browser → `<CopilotChat>` → `CopilotRuntime` → `HttpAgent` → AgentField, exercised end to end:
- `<CopilotChat>` from a synthetic agent
- `<FlightCard>` React component painted via `useCopilotAction({render})` driven by a backend `TOOL_CALL_*` triad
- `useCoAgent({state})` updated `41 → 42` from `STATE_SNAPSHOT`
- `confirmBooking` → React `renderAndWaitForResponse` Yes/No → user clicks → result POSTs back as `role:"tool"` → next agent turn replies "Booking confirmed"
Out of scope (truly deferred to follow-ups)
- Per-token streaming through the buffered contract (the `@app.reasoner()` decorator). The buffered contract returns a synchronous dict; for live UX, reasoners use the FastAPI / chunk-channel pattern shown above. Buffered responses are auto-chunked on emission so the UX is acceptable; the source-of-truth is still synchronous.
- Reasoner-side cancellation propagation: `ctx.Done` propagates through `httpAgentInvoker`, but the reasoner's own context plumbing has to actually stop the work.
Test plan
- `go test ./internal/agui/... ./internal/handlers/...`: buffered + streaming + load + benchmarks green
- `gofmt`, `go vet` clean on touched files
- `pytest tests/test_agui_helpers.py`: 43/43 pass; `ruff check` clean
- `go test ./agent/agui/...`: green (Go SDK helpers, including new `RelayHarnessResult` + `Reasoning`)
- `vitest run tests/agui.test.ts`: 31/31 pass; `tsc --noEmit` clean (new TS SDK `agui` namespace)
- `@ag-ui/client` + `@ag-ui/core` Zod schema validation green for all three SDKs