feat(agents): paged, multi-fidelity context memory for McpClient #1915

iamMihirT wants to merge 4 commits into dimensionalOS:dev
Conversation
Introduces a new ``dimos.agents.memory`` package that manages agent conversation state as a set of typed, multi-fidelity ``Page`` objects rather than an unbounded list of LangChain messages. Each page stores four fidelity rungs eagerly (POINTER, STRUCTURED, COMPRESSED, FULL), and a two-phase greedy selector picks per-page rungs to assemble a prompt that fits the active model's token budget.

Core components:
- ``pages.py``: ``Page`` dataclass + fidelity / page-type enums, with invariants enforced in ``__post_init__``.
- ``page_table.py``: thread-safe store with explicit and auto pins; ``pin_recent_evidence`` keeps the N most recent EVIDENCE pages at FULL fidelity.
- ``tokens.py``: ``TokenCounter`` protocol, tiktoken-backed counter for OpenAI models and a heuristic fallback for others. Counters return content-only tokens; per-message ChatML overhead lives on the budget.
- ``budget.py``: ``ModelBudget`` with per-model context windows, output reserve, and per-message overhead; longest-prefix model name resolution.
- ``faults.py``: structured fault events (PAGE_EVICTED, PAGE_DEGRADED, REFETCH_FAULT, PHYSICAL_INSUFFICIENCY, PIN_REBALANCE) emitted to structlog plus an optional ``Out[FaultEvent]`` stream for live UI.
- ``ingestion.py``: turns a LangChain ``BaseMessage`` into one or more pages. Multimodal HumanMessages split into a CONVERSATION page plus one EVIDENCE page per image; AIMessage ``tool_calls`` payloads are cost-counted uniformly across all four fidelity rungs.
- ``selector.py``: two-phase greedy ``assemble_prompt`` that respects pin invariants and the effective (per-message-overhead-aware) budget.
- ``engine.py``: ``MemoryEngine`` façade coordinating ingestion, assembly, pin rebalancing, fault emission, and artefact rehydration.
- ``artefact_tool.py``: ``build_get_artefact_tool`` returns a ``StructuredTool`` an LLM can call to rehydrate a degraded EVIDENCE page back to FULL.

Adds ``tiktoken>=0.8.0`` to the ``agents`` extra in ``pyproject.toml``. 168 unit tests ship alongside the package covering fidelity invariants, pin policy, token accounting, budget resolution, fault emission, multimodal split rules, selector edge cases, and the rehydration path.
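For orientation, a minimal sketch of the page shapes this commit describes — the enum members come from the commit message, but every field name on ``Page`` beyond those is illustrative, not the actual ``pages.py`` API:

```python
# Illustrative sketch only — mirrors the commit description, not the real pages.py.
from dataclasses import dataclass, field
from enum import Enum, auto


class Fidelity(Enum):
    POINTER = auto()      # cheapest rung: a reference/summary stub
    STRUCTURED = auto()   # structured extract of the content
    COMPRESSED = auto()   # lossy text compression
    FULL = auto()         # verbatim original


class PageType(Enum):
    SYSTEM = auto()
    CONVERSATION = auto()
    EVIDENCE = auto()     # e.g. image artefacts


@dataclass
class Page:
    page_type: PageType
    # All four rungs are stored eagerly at ingestion time.
    renderings: dict[Fidelity, str] = field(default_factory=dict)
    current: Fidelity = Fidelity.FULL
    pinned: bool = False

    def __post_init__(self) -> None:
        # Invariant from the commit message: every fidelity rung is present.
        missing = [f for f in Fidelity if f not in self.renderings]
        if missing:
            raise ValueError(f"page missing fidelity rungs: {missing}")
```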
Replaces the unbounded ``self._history: list[BaseMessage]`` with a
``MemoryEngine`` that ingests every inbound message, hands the LLM an
assembled prompt that fits the active model's context window, and
emits fault events whenever pages are degraded or evicted to make room.
Changes:
- New ``McpClientConfig`` fields (all optional, default-equivalent to
the model's full budget):
- ``token_budget``: override the per-model context window.
- ``pin_recent_evidence`` (default 3): keep the N most recent image
artefacts at FULL fidelity.
- ``output_reserve_tokens`` (default 4096): carved out of the
context window for the model's response.
- New ``faults: Out[FaultEvent]`` stream on the module so operators
can observe memory-layer events alongside the existing ``agent``
and ``agent_idle`` streams.
- ``on_system_modules`` now appends the engine's ``get_artefact``
StructuredTool to the agent's tool list so the LLM can rehydrate a
degraded EVIDENCE page back to FULL on demand.
- ``_process_message`` ingests the inbound message through the engine
and streams the assembled ``list[BaseMessage]`` into the state graph
instead of the raw history.
- ``_thread_loop`` wraps ``_process_message`` in ``try/except`` so a
turn-level failure no longer kills the worker thread silently; the
exception is forwarded to the fault stream via
``emit_physical_insufficiency`` and the loop continues draining
queued messages.
Fixes the original motivating bug: once the worker thread died on a
``context_length_exceeded`` or any other turn-level error, subsequent
messages piled up in the queue unprocessed and ``agent_idle`` stayed
at ``False`` forever.
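For orientation, a structural sketch of that recovery contract (not the PR's literal code: the queue handling and shutdown flag are assumptions, and ``emit_physical_insufficiency`` is assumed to hang off the memory engine):

```python
# Structural sketch of the _thread_loop recovery contract described above.
import queue


def _thread_loop(self) -> None:
    while not self._shutdown.is_set():          # hypothetical stop flag
        try:
            message = self._message_queue.get(timeout=0.1)
        except queue.Empty:
            continue
        try:
            self._process_message(self._state_graph, message)
        except Exception as exc:                # turn-level failure
            # Forward to the fault stream instead of killing the worker;
            # the loop keeps draining queued messages.
            self._engine.emit_physical_insufficiency(exc)
```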
…hread recovery

Adds two integration tests that drive a real ``MemoryEngine`` from a bypass-constructed ``McpClient`` (no MCP server needed).
- ``test_history_compaction_keeps_recent_images_at_full_under_pressure``: ingests a system bootstrap + three camera-snapshot images interleaved with 100 chatty text turns under a tight 8k token budget. Asserts that (1) the assembled prompt respects ``budget.effective_budget_for_messages(n_surviving)``, (2) the three most recent EVIDENCE pages stay at FULL fidelity, (3) older CONVERSATION pages get degraded and a PAGE_DEGRADED fault is emitted, and (4) ``physical_insufficient`` stays False.
- ``test_thread_loop_recovers_from_process_message_exception``: regression guard for the motivating bug. Enqueues three messages and monkeypatches ``_process_message`` to raise on the second. Asserts that all three messages are processed (the thread survived), a PHYSICAL_INSUFFICIENCY fault carrying ``details["exception"]`` was emitted, ``agent_idle`` returns to True, and the worker exits cleanly on shutdown.
Adds a new ``Context Memory`` subsection under the Agent System block covering: the original unbounded-history problem, the paged multi-fidelity memory layer, the four fidelity rungs, the pin policy, the ``McpClient`` configuration knobs, the fault event taxonomy, the ``get_artefact`` tool, and the ``_thread_loop`` recovery contract. Also adds a Further Reading bullet pointing at the ``dimos.agents.memory`` package.
Greptile Summary

Confidence Score: 3/5 — not safe to merge as-is; one P1 defect present (see the review comments below).
Sequence Diagram

```mermaid
sequenceDiagram
  participant Q as MessageQueue
  participant TL as _thread_loop
  participant PM as _process_message
  participant ME as MemoryEngine
  participant PT as PageTable
  participant SEL as assemble_prompt
  participant LG as LangGraph
  Q->>TL: message (dequeued)
  TL->>PM: _process_message(state_graph, message)
  PM->>ME: ingest(message)
  ME->>PT: add(page(s))
  ME->>PT: rebalance_evidence_pins()
  PT-->>ME: PinRebalance diff
  ME-->>PM: list[Page]
  PM->>ME: assemble()
  ME->>PT: ordered()
  ME->>SEL: assemble_prompt(pages, budget)
  SEL-->>ME: AssembledPrompt
  ME-->>PM: AssembledPrompt
  PM->>LG: state_graph.stream(messages)
  LG-->>PM: streamed node outputs
  PM->>ME: ingest(response_msg)
  PM->>PM: finally → agent_idle.publish(True)
  TL->>TL: except → emit_physical_insufficiency()
```
```diff
     def _process_message(
         self,
         state_graph: CompiledStateGraph[Any, Any, Any, Any],
         message: BaseMessage,
     ) -> None:
         self.agent_idle.publish(False)
-        self._history.append(message)
+        # Ingest the inbound message into the memory engine; this becomes
+        # a Page (or several, for multimodal) and drives the next
+        # assemble. Edge case: under a physically-insufficient budget
+        # the fresh input itself may be degraded. The engine emits a
+        # PHYSICAL_INSUFFICIENCY fault in that case.
+        self._engine.ingest(message)
         pretty_print_langchain_message(message)
         self.agent.publish(message)

-        for update in state_graph.stream({"messages": self._history}, stream_mode="updates"):
-            for node_output in update.values():
-                for msg in node_output.get("messages", []):
-                    self._history.append(msg)
-                    pretty_print_langchain_message(msg)
-                    self.agent.publish(msg)
-
-        if self._message_queue.empty():
-            self.agent_idle.publish(True)
+        assembled = self._engine.assemble()
+        try:
+            for update in state_graph.stream(
+                {"messages": assembled.messages}, stream_mode="updates"
+            ):
+                for node_output in update.values():
+                    for msg in node_output.get("messages", []):
+                        self._engine.ingest(msg)
+                        pretty_print_langchain_message(msg)
+                        self.agent.publish(msg)
+        finally:
+            if self._message_queue.empty():
+                self.agent_idle.publish(True)

-def _append_image_to_history(
+def _queue_image_artefact_message(
+    mcp_client: McpClient, func_name: str, uuid_: str, result: Any
+) -> None:
+    """Queue a HumanMessage containing an image artefact returned by a
+    non-text tool result.
+
+    Ingestion splits this multimodal message into a CONVERSATION page
+    (with the preamble text) plus one EVIDENCE page per image artefact
+    automatically — see ``dimos.agents.memory.ingestion``. The caller
+    doesn't need to set a page type hint.
+    """
+    mcp_client.add_message(
+        HumanMessage(
+            content=[
```
agent_idle stuck at False when ingest raises
The try/finally that resets agent_idle only wraps the state_graph.stream(...) call. If self._engine.ingest(message) or self._engine.assemble() raises — for example, ingestion._role_of raises TypeError for any BaseMessage subclass beyond the four explicit cases (FunctionMessage, etc.) — the exception propagates out of _process_message before the try block is ever entered. _thread_loop catches it (preventing thread death), but agent_idle remains False forever. This is the same stuck-idle symptom the PR intends to cure, re-introduced for a narrower but still real failure mode.
Move the finally to cover the entire post-publish(False) body:
```python
def _process_message(
    self,
    state_graph: CompiledStateGraph[Any, Any, Any, Any],
    message: BaseMessage,
) -> None:
    self.agent_idle.publish(False)
    try:
        self._engine.ingest(message)
        pretty_print_langchain_message(message)
        self.agent.publish(message)
        assembled = self._engine.assemble()
        for update in state_graph.stream(
            {"messages": assembled.messages}, stream_mode="updates"
        ):
            for node_output in update.values():
                for msg in node_output.get("messages", []):
                    self._engine.ingest(msg)
                    pretty_print_langchain_message(msg)
                    self.agent.publish(msg)
    finally:
        if self._message_queue.empty():
            self.agent_idle.publish(True)
```

```python
def _default_provenance(msg: BaseMessage) -> str:
    if isinstance(msg, SystemMessage):
        return "system"
    if isinstance(msg, HumanMessage):
        return "user_input"
    if isinstance(msg, AIMessage):
        return "llm_response"
    if isinstance(msg, ToolMessage):
        return "tool_response"
```
_role_of raises TypeError for unrecognised message types
_role_of has an explicit raise TypeError(...) fall-through for any BaseMessage subclass beyond System/Human/AI/Tool. LangChain occasionally surfaces FunctionMessage or vendor-specific subclasses. When that happens, engine.ingest() raises, escaping _process_message before the try/finally that resets agent_idle. Combined with the companion comment on _process_message, this path is the concrete trigger for the stuck-idle bug.
Consider adding a fallback that maps unknown subclasses to "human" (or emits a structured warning and skips ingestion) rather than raising, so a single unexpected message type doesn't stall the agent.
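One possible shape for that fallback (a sketch, not the PR's code — the role strings follow the review's suggestion, and the structlog logger wiring is an assumption):

```python
# Sketch of the suggested fallback for _role_of (not the PR's implementation).
import structlog
from langchain_core.messages import (
    AIMessage, BaseMessage, HumanMessage, SystemMessage, ToolMessage,
)

logger = structlog.get_logger(__name__)


def _role_of(msg: BaseMessage) -> str:
    if isinstance(msg, SystemMessage):
        return "system"
    if isinstance(msg, HumanMessage):
        return "human"
    if isinstance(msg, AIMessage):
        return "ai"
    if isinstance(msg, ToolMessage):
        return "tool"
    # Unknown subclass (FunctionMessage, vendor-specific types, ...):
    # warn and fall back instead of raising, so one odd message type
    # cannot stall the agent.
    logger.warning("unrecognised_message_type", type=type(msg).__name__)
    return "human"
```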
```python
    system_overhead: int = DEFAULT_SYSTEM_OVERHEAD,
    per_message_overhead: int = DEFAULT_PER_MESSAGE_OVERHEAD,
) -> ModelBudget:
    """Resolve a :class:`ModelBudget` for *model_name*.

    All arguments after ``model_name`` are keyword-only; callers must pass
    ``override=...`` etc. explicitly.

    Args:
        model_name: Provider-qualified model string (e.g. ``"gpt-4o"`` or
            ``"ollama:llama3.2:latest"``).
        override: If set, use this as the ``context_window`` instead of the
            table lookup. Useful when operators want to keep prompts
            artificially small for cost / latency control.
        output_reserve: Tokens held back for the response.
        system_overhead: Formatting / tool-schema slack.
        per_message_overhead: Per-message role/framing cost. See
            :class:`ModelBudget`.
```
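For instance, a call shaped like that docstring (``resolve_budget`` and the argument names come from ``budget.py``; the concrete values here are made up):

```python
# Keyword-only usage per the docstring above.
budget = resolve_budget(
    "gpt-4o",
    override=32_000,       # operator-capped context window
    output_reserve=2_048,  # tokens held back for the response
)
```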
_normalize_model comment overstates what it strips
The docstring says it drops "OpenAI -YYYY-MM-DD date suffixes", but the implementation only strips the :tag suffix. Date-stamped strings like "gpt-4o-2024-08-06" pass through unchanged and are resolved by the longest-prefix fallback in resolve_budget. The fallback works correctly, but the mismatch makes the intended contract ambiguous for future editors. Either update the comment or add the date-suffix strip here.
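If the date-suffix strip is added, one possible shape (the helper name is hypothetical; the regex is illustrative):

```python
import re

# Sketch only: strip an OpenAI-style "-YYYY-MM-DD" date suffix so
# "gpt-4o-2024-08-06" normalises to "gpt-4o" before table lookup.
_DATE_SUFFIX = re.compile(r"-\d{4}-\d{2}-\d{2}$")


def _strip_date_suffix(name: str) -> str:
    return _DATE_SUFFIX.sub("", name)


assert _strip_date_suffix("gpt-4o-2024-08-06") == "gpt-4o"
```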
TL;DR
Replaces the unbounded `list[BaseMessage]` history in `McpClient` with a paged, multi-fidelity memory layer that keeps the assembled prompt inside the active model's context window. The worker thread also recovers from turn-level exceptions instead of dying silently, which was the original user-visible symptom (messages piling up in the queue with `agent_idle` stuck at `False`).

Closes #1899.
Before / after
**Before:** every `BaseMessage` the agent saw was appended to `self._history` and replayed in full on every turn. As the conversation grew — especially with image artefacts — the request eventually exceeded the model's context window. OpenAI returned `context_length_exceeded`, the exception bubbled out of `_process_message`, the worker thread died, and every subsequent enqueued message sat unprocessed forever.
**After:** `McpClient` owns a `MemoryEngine` that converts each inbound message into one or more typed `Page` objects, each storing four fidelity representations (`POINTER`, `STRUCTURED`, `COMPRESSED`, `FULL`). A two-phase greedy selector chooses per-page fidelities to fit the active model's token budget. Recent image artefacts are pinned at `FULL`; older pages gracefully degrade. The worker thread wraps each turn in `try/except`, emits a `PHYSICAL_INSUFFICIENCY` fault, and keeps draining the queue.
Usage
Existing callers get the new behaviour automatically with sensible defaults (full model window, 3 recent images pinned at `FULL`, 4096-token output reserve). Three new optional knobs on `McpClientConfig`:

| Knob | Default | Effect |
| --- | --- | --- |
| `token_budget` | `None` | Override the per-model context window. |
| `pin_recent_evidence` | `3` | Keep the N most recent image artefacts at `FULL`. |
| `output_reserve_tokens` | `4096` | Tokens carved out of the window for the model's response. |

Plus a new `faults: Out[FaultEvent]` stream on the module for observability (eviction, fidelity-drop, rehydrate, and physical-insufficiency events flow through it alongside `structlog`).
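A minimal configuration sketch (the keyword-constructor style and client wiring are assumptions; only the three field names and defaults come from this PR):

```python
# Sketch: the three new knobs on McpClientConfig.
config = McpClientConfig(
    token_budget=32_000,         # cap prompts below the model's full window
    pin_recent_evidence=3,       # keep the 3 newest images at FULL fidelity
    output_reserve_tokens=4096,  # held back for the model's response
)
client = McpClient(config)       # hypothetical wiring
```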
Component walkthrough

Everything new lives under `dimos/agents/memory/`. Each file is a single responsibility; the public surface is deliberately small.

| File | Contents |
| --- | --- |
| `pages.py` | `Page` dataclass, fidelity enum, page-type enum. Invariants enforced in `__post_init__`. |
| `page_table.py` | Thread-safe page store with explicit and auto pins. |
| `tokens.py` | `TokenCounter` protocol + `TiktokenCounter` (OpenAI) and `HeuristicCounter` (fallback). |
| `budget.py` | `ModelBudget`: per-model context window, output reserve, per-message overhead. |
| `faults.py` | `FaultEvent` + `FaultObserver` (routes to `structlog` and an optional `Out` stream). |
| `ingestion.py` | `ingest_message`: turns a `BaseMessage` into `list[Page]`, with multimodal split rules. |
| `selector.py` | `assemble_prompt` respecting pins and the effective token budget. |
| `engine.py` | `MemoryEngine` facade: ingestion + assembly + pin rebalancing + fault emission + rehydration. |
| `artefact_tool.py` | `build_get_artefact_tool`: LLM-callable rehydration of a degraded `EVIDENCE` page back to `FULL`. |
Integration is a single-file change in `dimos/agents/mcp/mcp_client.py`: replace `self._history` with `self._engine`, feed `state_graph.stream` from `engine.assemble()`, add the `get_artefact` tool to the agent's tool list, and wrap `_process_message` in `try/except` inside `_thread_loop`.

Testing
170 tests ship with this PR (all green):

- `dimos/agents/memory/test_*.py` — cover fidelity invariants, pin policy, token accounting contracts, budget resolution, fault emission, multimodal split rules, selector edge cases, and the rehydration path.
- `dimos/agents/mcp/test_mcp_client_history_compaction.py`:
  - `test_history_compaction_keeps_recent_images_at_full_under_pressure` drives the real `MemoryEngine` from a bypass-constructed `McpClient` under an 8k-token budget and verifies the effective-cap, recent-image-pinning, and fault-emission contracts.
  - `test_thread_loop_recovers_from_process_message_exception` is the regression guard for the motivating bug: makes the second of three turns raise and verifies all three are processed, a `PHYSICAL_INSUFFICIENCY` fault is emitted, and the worker exits cleanly (a rough sketch follows).
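As a rough shape only (fixture and helper names are hypothetical; just the three-messages/raise-on-second structure and the fault assertion come from this PR):

```python
# Rough shape of the recovery test described above.
from langchain_core.messages import HumanMessage


def test_thread_loop_recovers(monkeypatch, client, fault_log):  # hypothetical fixtures
    processed = []
    original = client._process_message

    def flaky(graph, message):
        processed.append(message)
        if len(processed) == 2:
            raise RuntimeError("boom")          # second turn fails
        return original(graph, message)

    monkeypatch.setattr(client, "_process_message", flaky)
    for text in ("one", "two", "three"):
        client.add_message(HumanMessage(content=text))

    client.wait_until_idle()                    # hypothetical helper
    assert len(processed) == 3                  # the worker thread survived
    assert any(f.kind == "PHYSICAL_INSUFFICIENCY" for f in fault_log)
```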
Run them with:
```sh
uv run pytest dimos/agents/memory/ \
    dimos/agents/mcp/test_mcp_client_history_compaction.py
```

Not included in this PR
- `dimos.agents.agent.Agent` was removed upstream before this work landed. Only `McpClient` is wired up.
- The integration tests bypass-construct `McpClient` and drive the engine directly, which is fast and deterministic but does not exercise the full request round-trip.
- The `ModelBudget` table is hardcoded for current OpenAI/Anthropic/Gemini/Ollama model names (with longest-prefix matching). Add new entries in `budget.py` as models ship.

Reviewer test plan
- `dimos/agents/memory/engine.py` and `dimos/agents/memory/selector.py` — these are the two most architecturally load-bearing files.
- `dimos/agents/memory/ingestion.py` multimodal split rules (the `ingest_message` docstring lists the full matrix).
- `McpClientConfig` knobs are optional and default-equivalent to the old full-history behaviour for small conversations.
- (`unitree-go2-agentic`) against a long conversation with image artefacts to watch the fault stream in action.
Files changed
- `dimos/agents/memory/` — 10 production files + 9 unit test files (new)
- `dimos/agents/mcp/mcp_client.py` — modified
- `dimos/agents/mcp/test_mcp_client_history_compaction.py` — new
- `AGENTS.md` — new `Context Memory` subsection
- `pyproject.toml` — `tiktoken>=0.8.0` added to the `agents` extra
- `uv.lock` — lockfile update for `tiktoken`

Total: 24 files, ~6.3k insertions / ~18 deletions across 4 commits.
Made with Cursor