
Add Long-Running Responses API Agent Template#146

Open
david-tempelmann wants to merge 9 commits into databricks:main from david-tempelmann:long-running-agent

Conversation


@david-tempelmann david-tempelmann commented Mar 3, 2026

  • Adds agent-openai-agents-sdk-long-running-agent template for long-running agent queries (minutes instead of seconds).
  • Background mode: Two flows: (1) Background + Poll – POST with background: true returns immediately; client polls GET until completion. (2) Background + Stream – POST with stream: true, background: true returns an SSE stream; if the connection drops, client resumes via GET /responses/{id}?stream=true&starting_after=N to receive remaining events from sequence N+1.
  • Persistence: Lakebase (PostgreSQL) stores stream events so clients can resume or poll results.
  • LongRunningAgentServer: Extends MLflow AgentServer with background mode and retrieve endpoints.
  • Compatible with the Responses API's background mode (except for cancelling a background response).
  • demo_long_running_agent.py script demonstrates how to interact with the agent using the OpenAI Agents SDK. The script uses a short and a long dummy query for demo purposes; the long query is meant to run beyond the 120-second timeout to demonstrate stream resumption.
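The two client flows above can be sketched as follows. This is illustrative only: the `retrieve` callable and the response dict shape stand in for the real OpenAI client, and the terminal-status set is an assumption.

```python
import time
from typing import Any, Callable

# Statuses assumed terminal for a Response (not taken from the template itself).
TERMINAL = {"completed", "failed", "incomplete", "cancelled"}


def poll_until_done(retrieve: Callable[[], dict[str, Any]],
                    interval_s: float = 0.0) -> dict[str, Any]:
    """Flow 1 (background + poll): GET the response until it reaches a terminal status."""
    while True:
        resp = retrieve()
        if resp["status"] in TERMINAL:
            return resp
        time.sleep(interval_s)


def resume_path(response_id: str, last_seq: int) -> str:
    """Flow 2 (background + stream): after a dropped connection, resume from the
    last sequence_number the client saw; the server replays events from N+1."""
    return f"/responses/{response_id}?stream=true&starting_after={last_seq}"
```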

@david-tempelmann
Author

@bbqiu This is my PR. The current e2e-chatbot-app-next won't work with this agent and would require some changes. The corresponding client contract is defined in the README.md

@bbqiu bbqiu self-requested a review March 3, 2026 09:46
Contributor

@bbqiu bbqiu left a comment


this looks great! i'll go over this again tomorrow to fix some small things after comments are addressed!



@invoke()
async def invoke(request: ResponsesAgentRequest) -> ResponsesAgentResponse:
Contributor


small nit to rename to invoke_handler / stream_handler

Contributor


should be able to just steal this file from the openai agents SDK from main btw

Author




@stream()
async def stream(request: dict) -> AsyncGenerator[ResponsesAgentStreamEvent, None]:
Contributor


small nit to fix this type hint

Author


def _sse_event(event_type: str, data: dict[str, Any] | str) -> str:
"""Format an SSE event per Open Responses spec: event must match type in body."""
payload = data if isinstance(data, str) else json.dumps(data)
return f"event: {event_type}\ndata: {payload}\n\n"
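For round-trip checking, a small parser for the wire format that `_sse_event` emits can look like this (a sketch; it only handles the single-`event:`/`data:` frame shape produced above, not the full SSE grammar):

```python
def parse_sse_event(raw: str) -> tuple[str, str]:
    """Parse one 'event: ...\\ndata: ...\\n\\n' frame into (event_type, payload)."""
    lines = raw.strip().split("\n")
    event_type = lines[0].removeprefix("event: ")
    payload = lines[1].removeprefix("data: ")
    return event_type, payload
```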
Contributor


ooc, how did the frontend client handle this?

Author


I didn't change anything beyond what I initially implemented to make background mode work. It still worked, but I'd need to check in detail how the frontend handles these events.

last_output_index: int = -1


def _normalize_stream_event(
Contributor


ah were these the restrictions we had to get around to make it work with the .stream from the responses client? if so, we can maybe drop these requirements for now, as this seems a tad brittle

needing to remap output_index etc. is quite unfortunate, and it's a bit confusing that the openai-agents sdk doesn't produce output that is compatible w/ the client itself

Author


Yes, I agree. I'll revert ...

Output of some digging I did on this:

Summary: Hosted Tools, Background Mode, and .stream() Compatibility

What is documented

Background mode (official guide):

  • Setting background=true runs a Response asynchronously; the API returns immediately with status: "queued"
  • You poll via GET /v1/responses/{id} or stream via GET /v1/responses/{id}?stream=true with a starting_after cursor to resume
  • It's for long-running single model calls (the docs only show simple text generation examples)
  • background requires store=true
  • You can cancel in-flight responses

Hosted MCP tools (official guide):

  • One responses.create call returns one Response object
  • The API server connects to the remote MCP server, executes tools, feeds results back to the model -- all within that single Response
  • The output array contains mcp_list_tools, mcp_call items (with output field populated), and the final assistant message
  • "All models can choose to make multiple MCP tool calls, so you may see several of these items generated in a single API request"
  • The model can "chain another tool or return a final answer" within one request
  • During streaming, events like response.mcp_call.in_progress and response.mcp_call.completed fire within the single response.created -> response.completed lifecycle
  • The same pattern applies to other hosted tools (web search, file search, code interpreter, image generation)

The OpenAI Python client's .stream() method (from source at openai/lib/streaming/responses/_responses.py):

  • ResponseStreamState expects exactly one response.created as the first event (hard crash otherwise)
  • Accumulates a single ParsedResponseSnapshot -- no reset mechanism for a second response
  • Terminates on response.completed
  • One .stream() call = one Response lifecycle
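The single-lifecycle constraint above can be sketched as a tiny state machine. This is an illustration of the described behavior, not the client's actual code; class and field names are made up.

```python
class StreamState:
    """Mimics the singleton-snapshot design: exactly one Response lifecycle per stream."""

    def __init__(self) -> None:
        self.snapshot = None  # accumulated ParsedResponseSnapshot stand-in
        self.done = False

    def handle(self, event_type: str) -> None:
        if event_type == "response.created":
            # A second response.created in the same stream has nowhere to go:
            # there is no reset mechanism, so the real client hard-crashes here.
            if self.snapshot is not None:
                raise RuntimeError("second response.created in one stream")
            self.snapshot = {}
        elif event_type == "response.completed":
            self.done = True  # stream terminates
```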

Function calling (official guide):

  • Explicitly a multi-turn, client-driven loop: call API -> get tool calls -> execute locally -> call API again
  • Each API call is a separate Response object
  • previous_response_id is documented for chaining these separate Responses

What is NOT documented (inferred or unspecified)

  • Whether background mode works with hosted MCP tools: The background mode docs don't mention hosted tools. The MCP docs don't mention background mode. It's reasonable to infer they compose (since both operate at the Response level), but there's no explicit documentation of background=true + MCP tools together.

  • The internal mechanics of multi-step execution within one Response: The docs say tools execute and results feed back to the model, but don't specify:

    • How many internal LLM inference passes happen
    • How output_index values are assigned across chained internal tool calls
    • How sequence_number values progress during streaming of multi-step execution
    • How usage is aggregated across internal passes
    • How errors mid-chain affect the Response status
    • Whether there's a limit on internal chaining depth (beyond max_tool_calls)
  • That this is the only way to make .stream() work with multi-step tool execution: This is an inference from the client code constraints -- no documentation says "if you want server-side tool execution compatible with .stream(), you must use this pattern." It follows logically from the ResponseStreamState singleton-snapshot design, but it's not stated.

  • How to build a compatible server: There is no "Responses API server specification" or protocol doc. The API is documented from the client's perspective only.

What this means for your server

To support clients using .stream() with server-side tool execution, your server must:

Protocol contract (documented, must match):

  1. Each POST /v1/responses returns exactly one Response object with a unique id
  2. The Response has status progressing through queued -> in_progress -> completed (or failed/incomplete)
  3. The output array contains all items from the entire execution: tool list items, tool call items (with results), and the final message
  4. When stream=true, emit SSE events starting with response.created and ending with response.completed
  5. Support GET /v1/responses/{id} for polling
  6. Support GET /v1/responses/{id}?stream=true&starting_after={cursor} for stream resume
  7. Every SSE event must have a sequence_number for cursor-based resume

Internal execution (inferred, must implement but no spec to follow):

  1. Run your agent loop (LLM call -> tool execution -> LLM call -> ...) entirely within the scope of one Response
  2. Assign output_index values sequentially as items are produced across internal steps
  3. Assign sequence_number values sequentially across all streaming events
  4. Stream intermediate events (response.mcp_call.in_progress, response.output_item.added, etc.) as execution progresses
  5. Aggregate usage across all internal LLM passes into one usage object on the final Response
  6. If a mid-chain tool call fails, populate the error field on that item and let the model continue or fail the Response

Key risk: The internal event ordering and structure (items 1-6 above) is reverse-engineerable by calling the real OpenAI API with hosted MCP tools and stream=true, then recording the exact event sequence. But you'd be building against observed behavior, not a published contract. If OpenAI changes the event ordering or adds new event types, your server could drift out of compatibility without warning.

Author


"""
super()._setup_routes()

# TODO: check because I don't think we need pghost ... just the LAKEBASE_INSTANCE_NAME
Contributor


as an FYI the frontend template requires pghost for the stateful chats

Author


Ack. But that requirement shouldn't be handled/checked in the agent server, I think? I just simplified the warning message and removed the TODO.

f41fa1d

}

if is_streaming:
asyncio.create_task(
Contributor


nit: should we have a configurable default timeout of 30 min? just so stuff doesn't run forever

Author


724cd90

This adds some complexity unfortunately. What this implements:

  1. asyncio.timeout in _task_scope cancels the background task after task_timeout_seconds.
  2. _deferred_mark_failed — a fire-and-forget asyncio.Task that waits a short delay, then appends an error SSE event and sets status = "failed". Its own DB work is bounded by asyncio.timeout(cleanup_timeout_seconds).
  3. Stale-run check in _handle_retrieve_request — on every client poll, if a response is still in_progress but older than task_timeout_seconds, it's marked as failed on the spot.

Supporting infrastructure:

  • statement_timeout set via a SQLAlchemy checkout event listener, so Postgres kills any query exceeding 5s server-side.
  • created_at column on the Response model to enable stale-run detection.
  • Pydantic BaseSettings for centralized, validated configuration.
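The timeout wrapper in point 1 can be sketched like this. Names are illustrative; the sketch uses `asyncio.wait_for` (portable below Python 3.11) rather than the `asyncio.timeout` context manager the PR actually uses, and the "failed" branch stands in for appending the error SSE event and updating the DB:

```python
import asyncio


async def run_with_timeout(coro, timeout_s: float) -> str:
    """Run a background task under a budget; mark the response failed on overrun."""
    try:
        await asyncio.wait_for(coro, timeout=timeout_s)
        return "completed"
    except asyncio.TimeoutError:
        # In the real server this is where _deferred_mark_failed appends an
        # error SSE event and sets status = "failed" in Lakebase.
        return "failed"


async def slow() -> None:
    await asyncio.sleep(10)  # simulates an agent run that exceeds the budget
```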

Author


and e68a21e


#### Implementing with the OpenAI SDK

```mermaid
Contributor


i think there's a syntax error with this mermaid diagram

Author


@bbqiu bbqiu force-pushed the long-running-agent branch from 66719b9 to 3ff9504 Compare March 6, 2026 00:32
@bbqiu bbqiu self-requested a review March 6, 2026 00:50
david-tempelmann and others added 9 commits March 5, 2026 22:04
Signed-off-by: Bryan Qiu <bryan.qiu@databricks.com>
@bbqiu bbqiu force-pushed the long-running-agent branch from e1c8e59 to ebb81f8 Compare March 6, 2026 06:16