Add Long-Running Responses API Agent Template #146

david-tempelmann wants to merge 9 commits into databricks:main
Conversation
@bbqiu This is my PR. The current `e2e-chatbot-app-next` won't work with this agent and would require some changes. The corresponding client contract is defined in the README.md.
bbqiu left a comment:
this looks great! i'll go over this again tmrw to fix some small things after comments are addressed!
```python
@invoke()
async def invoke(request: ResponsesAgentRequest) -> ResponsesAgentResponse:
```
small nit to rename to invoke_handler / stream_handler
should be able to just steal this file from the openai agents SDK from main btw
```python
@stream()
async def stream(request: dict) -> AsyncGenerator[ResponsesAgentStreamEvent, None]:
```
small nit to fix this type hint
```python
def _sse_event(event_type: str, data: dict[str, Any] | str) -> str:
    """Format an SSE event per Open Responses spec: event must match type in body."""
    payload = data if isinstance(data, str) else json.dumps(data)
    return f"event: {event_type}\ndata: {payload}\n\n"
```
ooc, how did the frontend client handle this?
I did not change anything in addition to what I initially implemented to make background mode work. It still worked but I would need to check in detail how the frontend handles them.
```python
last_output_index: int = -1
# ...
def _normalize_stream_event(
```
ah, were these the restrictions we had to get around to make it work with the `.stream` from the responses client? if so, we can maybe drop these requirements for now, as this seems a tad brittle
needing to remap `output_index` etc. is quite unfortunate, and it's a bit confusing that the openai-agents sdk doesn't produce output that is compatible w/ the client itself
Yes, I agree. I'll revert ...
Output of some digging I did on this:
**Summary: Hosted Tools, Background Mode, and `.stream()` Compatibility**

**What is documented**

Background mode (official guide):

- Setting `background=true` runs a Response asynchronously; the API returns immediately with `status: "queued"`
- You poll via `GET /v1/responses/{id}`, or stream via `GET /v1/responses/{id}?stream=true` with a `starting_after` cursor to resume
- It's for long-running single model calls (the docs only show simple text generation examples)
- `background` requires `store=true`
- You can cancel in-flight responses
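The documented poll loop can be sketched as follows. Here `retrieve` is a stand-in for `GET /v1/responses/{id}` (e.g. `client.responses.retrieve` in the OpenAI Python client); responses are plain dicts for illustration:

```python
import time

def poll_until_terminal(retrieve, response_id: str, interval: float = 2.0) -> dict:
    """Poll a background Response until it leaves queued/in_progress.

    `retrieve` is an injected callable standing in for the GET endpoint,
    so this sketch stays self-contained and network-free.
    """
    while True:
        resp = retrieve(response_id)
        if resp["status"] not in ("queued", "in_progress"):
            return resp  # completed, failed, or incomplete
        time.sleep(interval)
```

A real client would also bound the loop with a deadline rather than polling forever.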
Hosted MCP tools (official guide):

- One `responses.create` call returns one `Response` object
- The API server connects to the remote MCP server, executes tools, and feeds results back to the model -- all within that single Response
- The `output` array contains `mcp_list_tools` and `mcp_call` items (with the `output` field populated), and the final assistant message
- "All models can choose to make multiple MCP tool calls, so you may see several of these items generated in a single API request"
- The model can "chain another tool or return a final answer" within one request
- During streaming, events like `response.mcp_call.in_progress` and `response.mcp_call.completed` fire within the single `response.created` -> `response.completed` lifecycle
- The same pattern applies to other hosted tools (web search, file search, code interpreter, image generation)
The OpenAI Python client's `.stream()` method (from source at `openai/lib/streaming/responses/_responses.py`):

- `ResponseStreamState` expects exactly one `response.created` as the first event (hard crash otherwise)
- Accumulates a single `ParsedResponseSnapshot` -- no reset mechanism for a second response
- Terminates on `response.completed`
- One `.stream()` call = one Response lifecycle

Function calling (official guide):
- Explicitly a multi-turn, client-driven loop: call API -> get tool calls -> execute locally -> call API again
- Each API call is a separate Response object
- `previous_response_id` is documented for chaining these separate Responses

**What is NOT documented (inferred or unspecified)**

- Whether background mode works with hosted MCP tools: the background mode docs don't mention hosted tools, and the MCP docs don't mention background mode. It's reasonable to infer they compose (since both operate at the Response level), but there's no explicit documentation of `background=true` + MCP tools together.
- The internal mechanics of multi-step execution within one Response: the docs say tools execute and results feed back to the model, but don't specify:
  - How many internal LLM inference passes happen
  - How `output_index` values are assigned across chained internal tool calls
  - How `sequence_number` values progress during streaming of multi-step execution
  - How `usage` is aggregated across internal passes
  - How errors mid-chain affect the Response status
  - Whether there's a limit on internal chaining depth (beyond `max_tool_calls`)
- That this is the only way to make `.stream()` work with multi-step tool execution: this is an inference from the client code constraints -- no documentation says "if you want server-side tool execution compatible with `.stream()`, you must use this pattern." It follows logically from the `ResponseStreamState` singleton-snapshot design, but it's not stated.
- How to build a compatible server: there is no "Responses API server specification" or protocol doc. The API is documented from the client's perspective only.
**What this means for your server**

To support clients using `.stream()` with server-side tool execution, your server must:

Protocol contract (documented, must match):

- Each `POST /v1/responses` returns exactly one Response object with a unique `id`
- The Response has `status` progressing through `queued` -> `in_progress` -> `completed` (or `failed`/`incomplete`)
- The `output` array contains all items from the entire execution: tool list items, tool call items (with results), and the final message
- When `stream=true`, emit SSE events starting with `response.created` and ending with `response.completed`
- Support `GET /v1/responses/{id}` for polling
- Support `GET /v1/responses/{id}?stream=true&starting_after={cursor}` for stream resume
- Every SSE event must have a `sequence_number` for cursor-based resume

Internal execution (inferred, must implement but no spec to follow):

- Run your agent loop (LLM call -> tool execution -> LLM call -> ...) entirely within the scope of one Response
- Assign `output_index` values sequentially as items are produced across internal steps
- Assign `sequence_number` values sequentially across all streaming events
- Stream intermediate events (`response.mcp_call.in_progress`, `response.output_item.added`, etc.) as execution progresses
- Aggregate `usage` across all internal LLM passes into one `usage` object on the final Response
- If a mid-chain tool call fails, populate the `error` field on that item and let the model continue or fail the Response

Key risk: the internal event ordering and structure (items 1-6 above) is reverse-engineerable by calling the real OpenAI API with hosted MCP tools and `stream=true`, then recording the exact event sequence. But you'd be building against observed behavior, not a published contract. If OpenAI changes the event ordering or adds new event types, your server could drift out of compatibility without warning.
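The two inferred numbering rules (sequential `sequence_number` assignment, replay after a cursor) can be sketched with a toy in-memory event log. This is an assumption about how a server might back cursor-based resume; `EventLog` is illustrative and not from the PR:

```python
from dataclasses import dataclass, field

@dataclass
class EventLog:
    """Per-Response event log: assigns sequence numbers and supports resume."""
    events: list = field(default_factory=list)
    _next_seq: int = 0

    def append(self, event: dict) -> dict:
        # Assign sequence_number once, monotonically, as events are produced.
        event = {**event, "sequence_number": self._next_seq}
        self._next_seq += 1
        self.events.append(event)
        return event

    def replay_after(self, starting_after: int) -> list:
        # GET /responses/{id}?stream=true&starting_after=N resumes from N+1.
        return [e for e in self.events if e["sequence_number"] > starting_after]
```

Persisting this log (rather than keeping it in memory) is what makes resume survive a dropped connection or a server restart.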
| """ | ||
| super()._setup_routes() | ||
|
|
||
```python
# TODO: check because I don't think we need pghost ... just the LAKEBASE_INSTANCE_NAME
```
as an FYI, the frontend template requires `pghost` for the stateful chats
ack. But that requirement shouldn't be handled/checked in the agent server, I guess? I just simplified the warning message and removed the TODO.
```python
}

if is_streaming:
    asyncio.create_task(
```
nit: should we have a configurable default timeout of 30 min? just so stuff doesn't run forever
This adds some complexity unfortunately. What this implements:

- `asyncio.timeout` in `_task_scope` cancels the background task after `task_timeout_seconds`.
- `_deferred_mark_failed`: a fire-and-forget `asyncio.Task` that waits a short delay, then appends an error SSE event and sets `status = "failed"`. Its own DB work is bounded by `asyncio.timeout(cleanup_timeout_seconds)`.
- Stale-run check in `_handle_retrieve_request`: on every client poll, if a response is still `in_progress` but older than `task_timeout_seconds`, it's marked as failed on the spot.

Supporting infrastructure:

- `statement_timeout` set via a SQLAlchemy checkout event listener, so Postgres kills any query exceeding 5s server-side.
- `created_at` column on the Response model to enable stale-run detection.
- Pydantic `BaseSettings` for centralized, validated configuration.
~~~markdown
#### Implementing with the OpenAI SDK

```mermaid
~~~
i think there's a syntax error with this mermaid diagram
Signed-off-by: Bryan Qiu <bryan.qiu@databricks.com>
Adds `agent-openai-agents-sdk-long-running-agent`, a template for long-running agent queries (minutes instead of seconds). Two client interaction patterns: (1) Background: POST with `background: true` returns immediately; the client polls GET until completion. (2) Background + Stream: POST with `stream: true, background: true` returns an SSE stream; if the connection drops, the client resumes via `GET /responses/{id}?stream=true&starting_after=N` to receive the remaining events from sequence N+1. Includes a `demo_long_running_agent.py` script to demonstrate how to interact with the agent using the OpenAI Agents SDK. The script uses a short and a long dummy query for demo purposes; the long query is supposed to run beyond the 120-second timeout to demonstrate stream resumption.
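The second pattern's client side can be sketched as a resumable stream consumer. `open_stream` is a hypothetical stand-in for `GET /responses/{id}?stream=true&starting_after=N`, yielding event dicts and possibly raising `ConnectionError` mid-stream:

```python
def stream_with_resume(open_stream, response_id: str, max_retries: int = 3):
    """Consume an SSE event stream, resuming after drops via starting_after.

    Tracks the last sequence_number seen; on a dropped connection, reopens
    the stream with that cursor so only the remaining events are replayed.
    """
    cursor = -1
    for _ in range(max_retries + 1):
        try:
            for event in open_stream(response_id, cursor):
                cursor = event["sequence_number"]
                yield event
            return  # stream ended normally
        except ConnectionError:
            continue  # reconnect, resuming from the last seen cursor
    raise ConnectionError("stream did not complete after retries")
```

This mirrors what `demo_long_running_agent.py` is described as exercising with its deliberately long query.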