From 8500a342da5ff2960bb5e386a23749cad25fa379 Mon Sep 17 00:00:00 2001 From: Vineeth Voruganti <13438633+VVoruganti@users.noreply.github.com> Date: Wed, 20 May 2026 22:42:52 -0400 Subject: [PATCH 1/5] feat: Add queue work unit endpoint --- src/crud/__init__.py | 3 +- src/crud/deriver.py | 278 ++++++++++++++++++++++++++---- src/routers/workspaces.py | 37 ++++ src/schemas/__init__.py | 4 + src/schemas/api.py | 87 ++++++++++ src/schemas/internal.py | 8 + tests/routes/test_queue_status.py | 82 +++++++++ 7 files changed, 467 insertions(+), 32 deletions(-) diff --git a/src/crud/__init__.py b/src/crud/__init__.py index 58bcd9f01..011b2a0b8 100644 --- a/src/crud/__init__.py +++ b/src/crud/__init__.py @@ -3,7 +3,7 @@ get_or_create_collection, update_collection_internal_metadata, ) -from .deriver import get_deriver_status, get_queue_status +from .deriver import get_deriver_status, get_queue_status, get_queue_work_units from .document import ( create_documents, create_observations, @@ -82,6 +82,7 @@ # Deriver "get_deriver_status", "get_queue_status", + "get_queue_work_units", # Document "create_documents", "create_observations", diff --git a/src/crud/deriver.py b/src/crud/deriver.py index 0852a4792..898e6ae2a 100644 --- a/src/crud/deriver.py +++ b/src/crud/deriver.py @@ -2,15 +2,23 @@ from logging import getLogger from typing import Any -from sqlalchemy import Select, case, func, or_, select +from sqlalchemy import Select, and_, case, false, func, or_, select from sqlalchemy.engine import Row from sqlalchemy.ext.asyncio import AsyncSession from src import models, schemas +from src.config import settings logger = getLogger(__name__) +# Task types surfaced by the queue status endpoint. +_TRACKED_TASK_TYPES = ("representation", "summary", "dream") + +# Only this task type is gated by DERIVER_REPRESENTATION_BATCH_MAX_TOKENS. +_THRESHOLD_GATED_TASK_TYPE = "representation" + + async def get_queue_status( db: AsyncSession, workspace_name: str, @@ -25,6 +33,11 @@ async def get_queue_status( Only tracks user-facing task types: representation, summary, and dream. Internal infrastructure tasks (reconciler, webhook, deletion) are excluded. + Pending work units are further split into "stalled" (representation work + units below DERIVER_REPRESENTATION_BATCH_MAX_TOKENS) vs "ready" (everything + else). When DERIVER_FLUSH_ENABLED is true, the threshold is bypassed and + nothing is stalled. + Note: completed_work_units reflects items since the last periodic queue cleanup, not lifetime totals. @@ -75,8 +88,78 @@ async def get_deriver_status( ) -# Task types surfaced by the queue status endpoint. -_TRACKED_TASK_TYPES = ("representation", "summary", "dream") +async def get_queue_work_units( + db: AsyncSession, + workspace_name: str, + session_name: str | None = None, + *, + observer: str | None = None, + observed: str | None = None, +) -> schemas.QueueWorkUnitsResponse: + """ + Return one row per unprocessed work unit in the queue, with token totals, + in-progress flag, and threshold classification. + + Useful for debugging "why isn't this work unit advancing?" — distinguishes + work units stalled below DERIVER_REPRESENTATION_BATCH_MAX_TOKENS from those + that are claimed by a worker or eligible to be claimed. + + Same filter semantics as get_queue_status: observer/observed match the + queue item payload, session_name filters via the sessions table. + """ + normalized_observer = observer if observer else None + normalized_observed = observed if observed else None + normalized_session_name = session_name if session_name else None + + batch_max_tokens = settings.DERIVER.REPRESENTATION_BATCH_MAX_TOKENS + flush_enabled = settings.DERIVER.FLUSH_ENABLED + + stmt = _build_queue_work_units_query( + workspace_name, + normalized_session_name, + observer=normalized_observer, + observed=normalized_observed, + ) + result = await db.execute(stmt) + rows = result.fetchall() + + work_units: list[schemas.QueueWorkUnit] = [] + for row in rows: + threshold_gated = ( + row.task_type == _THRESHOLD_GATED_TASK_TYPE + and not flush_enabled + and batch_max_tokens > 0 + ) + if threshold_gated: + hit_threshold = row.pending_tokens >= batch_max_tokens + tokens_until_threshold = max(batch_max_tokens - row.pending_tokens, 0) + else: + hit_threshold = True + tokens_until_threshold = 0 + + work_units.append( + schemas.QueueWorkUnit( + work_unit_key=row.work_unit_key, + task_type=row.task_type, + session_id=row.session_id, + session_name=row.session_name, + observer=row.observer, + observed=row.observed, + pending_items=row.pending_items, + pending_tokens=row.pending_tokens, + tokens_until_threshold=tokens_until_threshold, + hit_threshold=hit_threshold, + in_progress=bool(row.in_progress), + oldest_item_at=row.oldest_item_at, + newest_item_at=row.newest_item_at, + ) + ) + + return schemas.QueueWorkUnitsResponse( + representation_batch_max_tokens=batch_max_tokens, + flush_enabled=flush_enabled, + work_units=work_units, + ) def _build_queue_status_query( @@ -86,56 +169,177 @@ def _build_queue_status_query( observer: str | None = None, observed: str | None = None, ) -> Select[Any]: - """Build SQL query for queue status with validation and aggregation.""" + """Build SQL query for queue status with validation and aggregation. + + Two-layer structure: an inner per-queue-item subquery joins messages to + compute the per-work_unit_key pending-token sum (a window function), then + the outer query classifies each row and tallies overall + per-session + counts via additional window functions. + """ observer_name_expr = models.QueueItem.payload["observer"].astext observed_name_expr = models.QueueItem.payload["observed"].astext - # Define conditions for cleaner window functions - is_completed = models.QueueItem.processed - is_in_progress = (~models.QueueItem.processed) & ( - models.ActiveQueueSession.id.isnot(None) - ) - is_pending = (~models.QueueItem.processed) & ( + inner_is_in_progress = models.ActiveQueueSession.id.isnot(None) + inner_is_pending = (~models.QueueItem.processed) & ( models.ActiveQueueSession.id.is_(None) ) - # Use window functions to calculate totals and per-session counts in SQL + # Per-work_unit_key sum of token_count, restricted to pending items. + # Computed in the inner subquery because window functions cannot reference + # each other directly in the outer SELECT. + pending_tokens_per_wuk = func.sum( + case( + (inner_is_pending, func.coalesce(models.Message.token_count, 0)), + else_=0, + ) + ).over(partition_by=models.QueueItem.work_unit_key) + + inner = ( + select( + models.QueueItem.session_id.label("session_id"), + models.QueueItem.task_type.label("task_type"), + models.QueueItem.processed.label("processed"), + inner_is_in_progress.label("is_in_progress_flag"), + pending_tokens_per_wuk.label("wuk_pending_tokens"), + ) + .select_from(models.QueueItem) + .outerjoin( + models.ActiveQueueSession, + models.QueueItem.work_unit_key == models.ActiveQueueSession.work_unit_key, + ) + .outerjoin( + models.Message, + models.QueueItem.message_id == models.Message.id, + ) + ) + + inner = inner.where(models.QueueItem.workspace_name == workspace_name) + inner = inner.where(models.QueueItem.task_type.in_(_TRACKED_TASK_TYPES)) + + if session_name is not None: + inner = inner.join( + models.Session, models.QueueItem.session_id == models.Session.id + ) + inner = inner.where(models.Session.name == session_name) + + peer_conditions = [] + if observer is not None: + peer_conditions.append(observer_name_expr == observer) # pyright: ignore + if observed is not None: + peer_conditions.append(observed_name_expr == observed) # pyright: ignore + if peer_conditions: + inner = inner.where(or_(*peer_conditions)) # pyright: ignore + + inner_subq = inner.subquery() + + # Outer classification + is_completed = inner_subq.c.processed + is_in_progress = (~inner_subq.c.processed) & inner_subq.c.is_in_progress_flag + is_pending = (~inner_subq.c.processed) & ~inner_subq.c.is_in_progress_flag + + batch_max_tokens = settings.DERIVER.REPRESENTATION_BATCH_MAX_TOKENS + flush_enabled = settings.DERIVER.FLUSH_ENABLED + + if not flush_enabled and batch_max_tokens > 0: + is_stalled = and_( + is_pending, + inner_subq.c.task_type == _THRESHOLD_GATED_TASK_TYPE, + inner_subq.c.wuk_pending_tokens < batch_max_tokens, + ) + else: + is_stalled = false() + + is_pending_ready = and_(is_pending, ~is_stalled) + stmt = select( - models.QueueItem.session_id, - # Overall totals using window functions + inner_subq.c.session_id, + # Overall totals func.count().over().label("total"), func.count(case((is_completed, 1))).over().label("completed"), func.count(case((is_in_progress, 1))).over().label("in_progress"), func.count(case((is_pending, 1))).over().label("pending"), - # Per-session totals using partitioned window functions - func.count() - .over(partition_by=models.QueueItem.session_id) - .label("session_total"), + func.count(case((is_stalled, 1))).over().label("pending_stalled"), + func.count(case((is_pending_ready, 1))).over().label("pending_ready"), + # Per-session totals + func.count().over(partition_by=inner_subq.c.session_id).label("session_total"), func.count(case((is_completed, 1))) - .over(partition_by=models.QueueItem.session_id) + .over(partition_by=inner_subq.c.session_id) .label("session_completed"), func.count(case((is_in_progress, 1))) - .over(partition_by=models.QueueItem.session_id) + .over(partition_by=inner_subq.c.session_id) .label("session_in_progress"), func.count(case((is_pending, 1))) - .over(partition_by=models.QueueItem.session_id) + .over(partition_by=inner_subq.c.session_id) .label("session_pending"), - ).select_from(models.QueueItem) - - stmt = stmt.outerjoin( - models.ActiveQueueSession, - models.QueueItem.work_unit_key == models.ActiveQueueSession.work_unit_key, + func.count(case((is_stalled, 1))) + .over(partition_by=inner_subq.c.session_id) + .label("session_pending_stalled"), + func.count(case((is_pending_ready, 1))) + .over(partition_by=inner_subq.c.session_id) + .label("session_pending_ready"), ) - stmt = stmt.where(models.QueueItem.workspace_name == workspace_name) + return stmt - # Only include user-facing task types - stmt = stmt.where(models.QueueItem.task_type.in_(_TRACKED_TASK_TYPES)) - if session_name is not None: - stmt = stmt.join( - models.Session, models.QueueItem.session_id == models.Session.id +def _build_queue_work_units_query( + workspace_name: str, + session_name: str | None, + *, + observer: str | None = None, + observed: str | None = None, +) -> Select[Any]: + """One row per unprocessed work_unit_key, aggregating queue items + tokens.""" + observer_name_expr = models.QueueItem.payload["observer"].astext + observed_name_expr = models.QueueItem.payload["observed"].astext + + stmt = ( + select( + models.QueueItem.work_unit_key.label("work_unit_key"), + models.QueueItem.task_type.label("task_type"), + models.QueueItem.session_id.label("session_id"), + models.Session.name.label("session_name"), + func.min(observer_name_expr).label("observer"), + func.min(observed_name_expr).label("observed"), + func.count().label("pending_items"), + func.coalesce( + func.sum(func.coalesce(models.Message.token_count, 0)), 0 + ).label("pending_tokens"), + func.min(models.QueueItem.created_at).label("oldest_item_at"), + func.max(models.QueueItem.created_at).label("newest_item_at"), + func.bool_or(models.ActiveQueueSession.id.isnot(None)).label("in_progress"), + ) + .select_from(models.QueueItem) + .outerjoin( + models.ActiveQueueSession, + models.QueueItem.work_unit_key == models.ActiveQueueSession.work_unit_key, + ) + .outerjoin( + models.Message, + models.QueueItem.message_id == models.Message.id, + ) + .outerjoin( + models.Session, + models.QueueItem.session_id == models.Session.id, ) + .where(models.QueueItem.workspace_name == workspace_name) + .where(models.QueueItem.task_type.in_(_TRACKED_TASK_TYPES)) + .where(~models.QueueItem.processed) + .group_by( + models.QueueItem.work_unit_key, + models.QueueItem.task_type, + models.QueueItem.session_id, + models.Session.name, + ) + .order_by( + # Stalled-first ordering would require a HAVING-style classification; + # instead order by oldest pending so debugging surfaces oldest stuck + # work first. + func.min(models.QueueItem.created_at) + ) + ) + + if session_name is not None: stmt = stmt.where(models.Session.name == session_name) peer_conditions = [] @@ -157,6 +361,8 @@ def _process_queue_rows(rows: Sequence[Row[Any]]) -> schemas.QueueCounts: completed=0, in_progress=0, pending=0, + pending_stalled=0, + pending_ready=0, sessions={}, ) @@ -174,6 +380,8 @@ def _process_queue_rows(rows: Sequence[Row[Any]]) -> schemas.QueueCounts: completed=row.session_completed, in_progress=row.session_in_progress, pending=row.session_pending, + pending_stalled=row.session_pending_stalled, + pending_ready=row.session_pending_ready, ) seen_sessions.add(row.session_id) @@ -182,6 +390,8 @@ def _process_queue_rows(rows: Sequence[Row[Any]]) -> schemas.QueueCounts: completed=first_row.completed, in_progress=first_row.in_progress, pending=first_row.pending, + pending_stalled=first_row.pending_stalled, + pending_ready=first_row.pending_ready, sessions=sessions, ) @@ -198,6 +408,8 @@ def _build_status_response( completed_work_units=counts.completed, in_progress_work_units=counts.in_progress, pending_work_units=counts.pending, + pending_stalled_work_units=counts.pending_stalled, + pending_ready_work_units=counts.pending_ready, ) sessions: dict[str, schemas.SessionQueueStatus] = {} @@ -209,6 +421,8 @@ def _build_status_response( completed_work_units=data.completed, in_progress_work_units=data.in_progress, pending_work_units=data.pending, + pending_stalled_work_units=data.pending_stalled, + pending_ready_work_units=data.pending_ready, ) return schemas.QueueStatus( @@ -217,4 +431,6 @@ def _build_status_response( completed_work_units=counts.completed, in_progress_work_units=counts.in_progress, pending_work_units=counts.pending, + pending_stalled_work_units=counts.pending_stalled, + pending_ready_work_units=counts.pending_ready, ) diff --git a/src/routers/workspaces.py b/src/routers/workspaces.py index 2ce7cdda4..46b91cbc3 100644 --- a/src/routers/workspaces.py +++ b/src/routers/workspaces.py @@ -190,6 +190,43 @@ async def get_queue_status( raise HTTPException(status_code=400, detail=str(e)) from e +@router.get( + "/{workspace_id}/queue/work-units", + response_model=schemas.QueueWorkUnitsResponse, + dependencies=[Depends(require_auth(workspace_name="workspace_id"))], +) +async def get_queue_work_units( + workspace_id: str = Path(...), + observer_id: str | None = Query( + None, description="Optional observer ID to filter by" + ), + sender_id: str | None = Query(None, description="Optional sender ID to filter by"), + session_id: str | None = Query( + None, description="Optional session ID to filter by" + ), + db: AsyncSession = db, +): + """ + Return one row per unprocessed work unit in the Workspace's queue, with + token totals, in-progress flag, and threshold classification. + + Useful for debugging "why isn't this work unit advancing?" — distinguishes + work units stalled below the batch token threshold from those claimed by a + worker or eligible to be claimed. Same filter semantics as /queue/status. + """ + try: + return await crud.get_queue_work_units( + db, + workspace_name=workspace_id, + session_name=session_id, + observer=observer_id, + observed=sender_id, + ) + except ValueError as e: + logger.warning(f"Invalid request parameters: {str(e)}") + raise HTTPException(status_code=400, detail=str(e)) from e + + @router.post( "/{workspace_id}/schedule_dream", status_code=204, diff --git a/src/schemas/__init__.py b/src/schemas/__init__.py index 85d5da44d..b86675f2f 100644 --- a/src/schemas/__init__.py +++ b/src/schemas/__init__.py @@ -33,6 +33,8 @@ PeerRepresentationGet, PeerUpdate, QueueStatus, + QueueWorkUnit, + QueueWorkUnitsResponse, RepresentationResponse, ScheduleDreamRequest, Session, @@ -128,6 +130,8 @@ "PeerRepresentationGet", "PeerUpdate", "QueueStatus", + "QueueWorkUnit", + "QueueWorkUnitsResponse", "RESOURCE_NAME_PATTERN", "RepresentationResponse", "ScheduleDreamRequest", diff --git a/src/schemas/api.py b/src/schemas/api.py index 237c4e143..b9deb3998 100644 --- a/src/schemas/api.py +++ b/src/schemas/api.py @@ -611,6 +611,23 @@ class SessionQueueStatus(BaseModel): description="Work units currently being processed" ) pending_work_units: int = Field(description="Work units waiting to be processed") + pending_stalled_work_units: int = Field( + default=0, + description=( + "Pending representation work units waiting to accumulate enough " + "tokens to hit DERIVER_REPRESENTATION_BATCH_MAX_TOKENS. Always 0 " + "when DERIVER_FLUSH_ENABLED is true." + ), + ) + pending_ready_work_units: int = Field( + default=0, + description=( + "Pending work units eligible to be claimed: non-representation " + "task types, plus representation work units whose pending tokens " + "are at/above the batch threshold (or when flush is enabled). " + "pending_stalled_work_units + pending_ready_work_units == pending_work_units." + ), + ) class QueueStatus(BaseModel): @@ -631,12 +648,82 @@ class QueueStatus(BaseModel): description="Work units currently being processed" ) pending_work_units: int = Field(description="Work units waiting to be processed") + pending_stalled_work_units: int = Field( + default=0, + description=( + "Pending representation work units waiting to accumulate enough " + "tokens to hit DERIVER_REPRESENTATION_BATCH_MAX_TOKENS. Always 0 " + "when DERIVER_FLUSH_ENABLED is true." + ), + ) + pending_ready_work_units: int = Field( + default=0, + description=( + "Pending work units eligible to be claimed: non-representation " + "task types, plus representation work units whose pending tokens " + "are at/above the batch threshold (or when flush is enabled). " + "pending_stalled_work_units + pending_ready_work_units == pending_work_units." + ), + ) sessions: dict[str, SessionQueueStatus] | None = Field( default=None, description="Per-session status when not filtered by session", ) +class QueueWorkUnit(BaseModel): + """Per-work-unit breakdown returned by /queue/work-units. + + `hit_threshold` and `tokens_until_threshold` apply only to representation + work units (the only task type gated by DERIVER_REPRESENTATION_BATCH_MAX_TOKENS). + For other task types, `hit_threshold` is always True and + `tokens_until_threshold` is 0. + """ + + work_unit_key: str + task_type: str + session_id: str | None = None + session_name: str | None = None + observer: str | None = None + observed: str | None = None + pending_items: int = Field(description="Unprocessed queue items in this work unit") + pending_tokens: int = Field( + description="Sum of token_count across messages on unprocessed queue items" + ) + tokens_until_threshold: int = Field( + description=( + "Tokens still needed before the deriver will claim this batch. " + "0 for non-representation task types and when flush is enabled." + ) + ) + hit_threshold: bool = Field( + description=( + "True if this work unit is eligible to be claimed (not threshold-" + "gated, threshold met, or flush enabled). False means stalled." + ) + ) + in_progress: bool = Field( + description="True if a deriver worker has claimed this work unit" + ) + oldest_item_at: datetime.datetime + newest_item_at: datetime.datetime + + +class QueueWorkUnitsResponse(BaseModel): + """Response for /queue/work-units — per-work-unit listing of unprocessed queue items.""" + + representation_batch_max_tokens: int = Field( + description="DERIVER_REPRESENTATION_BATCH_MAX_TOKENS at the time of the request" + ) + flush_enabled: bool = Field( + description=( + "DERIVER_FLUSH_ENABLED. When true, the batch threshold is bypassed " + "and all pending representation work units are eligible to be claimed." + ) + ) + work_units: list[QueueWorkUnit] + + # --------------------------------------------------------------------------- # Dream scheduling schemas # --------------------------------------------------------------------------- diff --git a/src/schemas/internal.py b/src/schemas/internal.py index e014431f1..e3fbeea08 100644 --- a/src/schemas/internal.py +++ b/src/schemas/internal.py @@ -132,6 +132,8 @@ class SessionCounts(BaseModel): completed: int in_progress: int pending: int + pending_stalled: int = 0 + pending_ready: int = 0 class QueueCounts(BaseModel): @@ -141,6 +143,8 @@ class QueueCounts(BaseModel): completed: int in_progress: int pending: int + pending_stalled: int = 0 + pending_ready: int = 0 sessions: dict[str, SessionCounts] @@ -152,10 +156,14 @@ class QueueStatusRow(BaseModel): completed: int in_progress: int pending: int + pending_stalled: int + pending_ready: int session_total: int session_completed: int session_in_progress: int session_pending: int + session_pending_stalled: int + session_pending_ready: int # --------------------------------------------------------------------------- diff --git a/tests/routes/test_queue_status.py b/tests/routes/test_queue_status.py index 279ad913d..c52a137fe 100644 --- a/tests/routes/test_queue_status.py +++ b/tests/routes/test_queue_status.py @@ -356,3 +356,85 @@ async def test_get_deriver_status_response_consistency( responses.append(response.json()) # pyright: ignore # Check consistency assert all(r == responses[0] for r in responses) # pyright: ignore + + +@pytest.mark.asyncio +class TestQueueWorkUnitsEndpoint: + """Test suite for the /queue/work-units endpoint""" + + async def test_empty_workspace_returns_empty_work_units( + self, + client: TestClient, + sample_data: tuple[models.Workspace, models.Peer], + ): + workspace, _ = sample_data + response = client.get(f"/v3/workspaces/{workspace.name}/queue/work-units") + assert response.status_code == 200 + body = response.json() + assert body["work_units"] == [] + assert body["representation_batch_max_tokens"] > 0 + assert "flush_enabled" in body + + async def test_pending_representation_below_threshold_is_stalled( + self, + client: TestClient, + db_session: AsyncSession, + sample_data: tuple[models.Workspace, models.Peer], + ): + """Default token_count is 0 → below threshold → stalled (when flush disabled).""" + workspace, peer = sample_data + session = models.Session(workspace_name=workspace.name, name="wu_session") + db_session.add(session) + await db_session.commit() + await db_session.refresh(session) + + payload = { + "observed": peer.name, + "observer": peer.name, + "task_type": "representation", + "workspace_name": workspace.name, + "session_name": session.name, + } + for _ in range(3): + db_session.add( + models.QueueItem( + session_id=session.id, + task_type="representation", + work_unit_key=construct_work_unit_key(workspace.name, payload), + payload=payload, + processed=False, + workspace_name=workspace.name, + ) + ) + await db_session.commit() + + # Check the new aggregate fields on /queue/status + status_resp = client.get(f"/v3/workspaces/{workspace.name}/queue/status") + assert status_resp.status_code == 200 + status_body = status_resp.json() + assert status_body["pending_work_units"] == 3 + # With default settings (FLUSH_ENABLED=False, threshold=1024) and 0 tokens, + # all 3 are stalled below the threshold + assert status_body["pending_stalled_work_units"] == 3 + assert status_body["pending_ready_work_units"] == 0 + assert ( + status_body["pending_stalled_work_units"] + + status_body["pending_ready_work_units"] + == status_body["pending_work_units"] + ) + + # Check per-work-unit endpoint + wu_resp = client.get(f"/v3/workspaces/{workspace.name}/queue/work-units") + assert wu_resp.status_code == 200 + wu_body = wu_resp.json() + assert len(wu_body["work_units"]) == 1 # 3 items collapse into 1 work unit + work_unit = wu_body["work_units"][0] + assert work_unit["task_type"] == "representation" + assert work_unit["pending_items"] == 3 + assert work_unit["pending_tokens"] == 0 + assert work_unit["hit_threshold"] is False + assert work_unit["in_progress"] is False + assert ( + work_unit["tokens_until_threshold"] + == wu_body["representation_batch_max_tokens"] + ) From 147edc91a10ad381f3a6e113ed291b376e39d7f1 Mon Sep 17 00:00:00 2001 From: Vineeth Voruganti <13438633+VVoruganti@users.noreply.github.com> Date: Wed, 20 May 2026 23:47:29 -0400 Subject: [PATCH 2/5] feat(queue): cursor-paginate /queue/work-units endpoint Swap the per-work-unit listing to cursor pagination (CursorPage[T] via fastapi-pagination + sqlakeyset). Offset pagination would skip or duplicate rows as workers continuously claim/process items between page fetches; cursor pagination on (oldest_item_at, work_unit_key) is stable across these mutations. Threshold config (representation_batch_max_tokens, flush_enabled) is carried in the page envelope via additional_data. --- pyproject.toml | 1 + src/crud/__init__.py | 10 +++- src/crud/deriver.py | 85 ++++++++++++------------------- src/routers/workspaces.py | 46 +++++++++++++++-- src/schemas/__init__.py | 4 +- src/schemas/api.py | 16 ++++-- tests/routes/test_queue_status.py | 75 +++++++++++++++++++++++++-- uv.lock | 19 ++++++- 8 files changed, 187 insertions(+), 69 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index a2913ea52..2041902dd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,7 @@ dependencies = [ "python-dotenv>=1.0.0", "sqlalchemy>=2.0.30", "fastapi-pagination>=0.14.2", + "sqlakeyset>=2.0.1745022416", "pgvector>=0.2.5", "sentry-sdk[anthropic,fastapi,sqlalchemy]>=2.3.1", "greenlet>=3.0.3", diff --git a/src/crud/__init__.py b/src/crud/__init__.py index 011b2a0b8..bc5172bc8 100644 --- a/src/crud/__init__.py +++ b/src/crud/__init__.py @@ -3,7 +3,12 @@ get_or_create_collection, update_collection_internal_metadata, ) -from .deriver import get_deriver_status, get_queue_status, get_queue_work_units +from .deriver import ( + classify_work_unit_row, + get_deriver_status, + get_queue_status, + get_queue_work_units_query, +) from .document import ( create_documents, create_observations, @@ -80,9 +85,10 @@ "get_or_create_collection", "update_collection_internal_metadata", # Deriver + "classify_work_unit_row", "get_deriver_status", "get_queue_status", - "get_queue_work_units", + "get_queue_work_units_query", # Document "create_documents", "create_observations", diff --git a/src/crud/deriver.py b/src/crud/deriver.py index 898e6ae2a..e623c8dac 100644 --- a/src/crud/deriver.py +++ b/src/crud/deriver.py @@ -88,21 +88,20 @@ async def get_deriver_status( ) -async def get_queue_work_units( - db: AsyncSession, +async def get_queue_work_units_query( workspace_name: str, session_name: str | None = None, *, observer: str | None = None, observed: str | None = None, -) -> schemas.QueueWorkUnitsResponse: +) -> Select[Any]: """ - Return one row per unprocessed work unit in the queue, with token totals, - in-progress flag, and threshold classification. + Build the Select statement for one row per unprocessed work unit, with + token totals, in-progress flag, observer/observed, and timestamps. - Useful for debugging "why isn't this work unit advancing?" — distinguishes - work units stalled below DERIVER_REPRESENTATION_BATCH_MAX_TOKENS from those - that are claimed by a worker or eligible to be claimed. + Returns a SQLAlchemy Select for cursor pagination via apaginate. The + router is responsible for computing per-row threshold classification + (hit_threshold, tokens_until_threshold) and wrapping the envelope. Same filter semantics as get_queue_status: observer/observed match the queue item payload, session_name filters via the sessions table. @@ -111,55 +110,34 @@ async def get_queue_work_units( normalized_observed = observed if observed else None normalized_session_name = session_name if session_name else None - batch_max_tokens = settings.DERIVER.REPRESENTATION_BATCH_MAX_TOKENS - flush_enabled = settings.DERIVER.FLUSH_ENABLED - - stmt = _build_queue_work_units_query( + return _build_queue_work_units_query( workspace_name, normalized_session_name, observer=normalized_observer, observed=normalized_observed, ) - result = await db.execute(stmt) - rows = result.fetchall() - work_units: list[schemas.QueueWorkUnit] = [] - for row in rows: - threshold_gated = ( - row.task_type == _THRESHOLD_GATED_TASK_TYPE - and not flush_enabled - and batch_max_tokens > 0 - ) - if threshold_gated: - hit_threshold = row.pending_tokens >= batch_max_tokens - tokens_until_threshold = max(batch_max_tokens - row.pending_tokens, 0) - else: - hit_threshold = True - tokens_until_threshold = 0 - - work_units.append( - schemas.QueueWorkUnit( - work_unit_key=row.work_unit_key, - task_type=row.task_type, - session_id=row.session_id, - session_name=row.session_name, - observer=row.observer, - observed=row.observed, - pending_items=row.pending_items, - pending_tokens=row.pending_tokens, - tokens_until_threshold=tokens_until_threshold, - hit_threshold=hit_threshold, - in_progress=bool(row.in_progress), - oldest_item_at=row.oldest_item_at, - newest_item_at=row.newest_item_at, - ) - ) - return schemas.QueueWorkUnitsResponse( - representation_batch_max_tokens=batch_max_tokens, - flush_enabled=flush_enabled, - work_units=work_units, +def classify_work_unit_row(row: Any) -> tuple[bool, int]: + """Classify a work-units query row against current threshold/flush settings. + + Returns (hit_threshold, tokens_until_threshold). Pure function so it + can be applied in a pagination transformer. + """ + batch_max_tokens = settings.DERIVER.REPRESENTATION_BATCH_MAX_TOKENS + flush_enabled = settings.DERIVER.FLUSH_ENABLED + + threshold_gated = ( + row.task_type == _THRESHOLD_GATED_TASK_TYPE + and not flush_enabled + and batch_max_tokens > 0 ) + if threshold_gated: + return ( + row.pending_tokens >= batch_max_tokens, + max(batch_max_tokens - row.pending_tokens, 0), + ) + return True, 0 def _build_queue_status_query( @@ -332,10 +310,11 @@ def _build_queue_work_units_query( models.Session.name, ) .order_by( - # Stalled-first ordering would require a HAVING-style classification; - # instead order by oldest pending so debugging surfaces oldest stuck - # work first. - func.min(models.QueueItem.created_at) + # Oldest-pending first surfaces the most-stuck work for debugging. + # work_unit_key is a unique tiebreaker required by sqlakeyset for + # stable cursor pagination when timestamps collide. + func.min(models.QueueItem.created_at), + models.QueueItem.work_unit_key, ) ) diff --git a/src/routers/workspaces.py b/src/routers/workspaces.py index 46b91cbc3..177b22e25 100644 --- a/src/routers/workspaces.py +++ b/src/routers/workspaces.py @@ -1,4 +1,6 @@ import logging +from collections.abc import Sequence +from typing import Any from fastapi import APIRouter, Body, Depends, HTTPException, Path, Query, Response from fastapi_pagination import Page @@ -192,7 +194,7 @@ async def get_queue_status( @router.get( "/{workspace_id}/queue/work-units", - response_model=schemas.QueueWorkUnitsResponse, + response_model=schemas.QueueWorkUnitsPage, dependencies=[Depends(require_auth(workspace_name="workspace_id"))], ) async def get_queue_work_units( @@ -208,20 +210,56 @@ async def get_queue_work_units( ): """ Return one row per unprocessed work unit in the Workspace's queue, with - token totals, in-progress flag, and threshold classification. + token totals, in-progress flag, and threshold classification. Cursor- + paginated (the queue mutates rapidly; offset pagination would skip rows + as workers process items between page fetches). Useful for debugging "why isn't this work unit advancing?" — distinguishes work units stalled below the batch token threshold from those claimed by a worker or eligible to be claimed. Same filter semantics as /queue/status. """ try: - return await crud.get_queue_work_units( - db, + stmt = await crud.get_queue_work_units_query( workspace_name=workspace_id, session_name=session_id, observer=observer_id, observed=sender_id, ) + + def _transform(rows: Sequence[Any]) -> list[schemas.QueueWorkUnit]: + items: list[schemas.QueueWorkUnit] = [] + for row in rows: + hit_threshold, tokens_until_threshold = crud.classify_work_unit_row(row) + items.append( + schemas.QueueWorkUnit( + work_unit_key=row.work_unit_key, + task_type=row.task_type, + session_id=row.session_id, + session_name=row.session_name, + observer=row.observer, + observed=row.observed, + pending_items=row.pending_items, + pending_tokens=row.pending_tokens, + tokens_until_threshold=tokens_until_threshold, + hit_threshold=hit_threshold, + in_progress=bool(row.in_progress), + oldest_item_at=row.oldest_item_at, + newest_item_at=row.newest_item_at, + ) + ) + return items + + return await apaginate( + db, + stmt, + transformer=_transform, + additional_data={ + "representation_batch_max_tokens": ( + settings.DERIVER.REPRESENTATION_BATCH_MAX_TOKENS + ), + "flush_enabled": settings.DERIVER.FLUSH_ENABLED, + }, + ) except ValueError as e: logger.warning(f"Invalid request parameters: {str(e)}") raise HTTPException(status_code=400, detail=str(e)) from e diff --git a/src/schemas/__init__.py b/src/schemas/__init__.py index b86675f2f..593f9e2e7 100644 --- a/src/schemas/__init__.py +++ b/src/schemas/__init__.py @@ -34,7 +34,7 @@ PeerUpdate, QueueStatus, QueueWorkUnit, - QueueWorkUnitsResponse, + QueueWorkUnitsPage, RepresentationResponse, ScheduleDreamRequest, Session, @@ -131,7 +131,7 @@ "PeerUpdate", "QueueStatus", "QueueWorkUnit", - "QueueWorkUnitsResponse", + "QueueWorkUnitsPage", "RESOURCE_NAME_PATTERN", "RepresentationResponse", "ScheduleDreamRequest", diff --git a/src/schemas/api.py b/src/schemas/api.py index b9deb3998..53d682126 100644 --- a/src/schemas/api.py +++ b/src/schemas/api.py @@ -10,6 +10,7 @@ from urllib.parse import urlparse import tiktoken +from fastapi_pagination.cursor import CursorPage from pydantic import ( AliasChoices, BaseModel, @@ -709,8 +710,18 @@ class QueueWorkUnit(BaseModel): newest_item_at: datetime.datetime -class QueueWorkUnitsResponse(BaseModel): - """Response for /queue/work-units — per-work-unit listing of unprocessed queue items.""" +class QueueWorkUnitsPage(CursorPage[QueueWorkUnit]): + """Cursor-paginated /queue/work-units response. + + Cursor pagination is used because the queue mutates rapidly (workers claim + and complete items continuously) — offset pagination would skip rows when + items are processed between page fetches. + + Standard CursorPage envelope (`items`, `total`, `current_page`, + `next_page`, `previous_page`) plus two additional fields describing the + deriver's threshold configuration, needed to interpret per-row + `hit_threshold` and `tokens_until_threshold`. + """ representation_batch_max_tokens: int = Field( description="DERIVER_REPRESENTATION_BATCH_MAX_TOKENS at the time of the request" @@ -721,7 +732,6 @@ class QueueWorkUnitsResponse(BaseModel): "and all pending representation work units are eligible to be claimed." ) ) - work_units: list[QueueWorkUnit] # --------------------------------------------------------------------------- diff --git a/tests/routes/test_queue_status.py b/tests/routes/test_queue_status.py index c52a137fe..e4eca386d 100644 --- a/tests/routes/test_queue_status.py +++ b/tests/routes/test_queue_status.py @@ -371,7 +371,12 @@ async def test_empty_workspace_returns_empty_work_units( response = client.get(f"/v3/workspaces/{workspace.name}/queue/work-units") assert response.status_code == 200 body = response.json() - assert body["work_units"] == [] + # CursorPage envelope: items, total, current_page, next_page, previous_page + assert body["items"] == [] + assert body["total"] == 0 + assert body["next_page"] is None + assert body["previous_page"] is None + # Envelope additions assert body["representation_batch_max_tokens"] > 0 assert "flush_enabled" in body @@ -423,12 +428,13 @@ async def test_pending_representation_below_threshold_is_stalled( == status_body["pending_work_units"] ) - # Check per-work-unit endpoint + # Check per-work-unit endpoint (cursor-paginated) wu_resp = client.get(f"/v3/workspaces/{workspace.name}/queue/work-units") assert wu_resp.status_code == 200 wu_body = wu_resp.json() - assert len(wu_body["work_units"]) == 1 # 3 items collapse into 1 work unit - work_unit = wu_body["work_units"][0] + assert len(wu_body["items"]) == 1 # 3 items collapse into 1 work unit + assert wu_body["total"] == 1 + work_unit = wu_body["items"][0] assert work_unit["task_type"] == "representation" assert work_unit["pending_items"] == 3 assert work_unit["pending_tokens"] == 0 @@ -438,3 +444,64 @@ async def test_pending_representation_below_threshold_is_stalled( work_unit["tokens_until_threshold"] == wu_body["representation_batch_max_tokens"] ) + + async def test_cursor_pagination_navigation( + self, + client: TestClient, + db_session: AsyncSession, + sample_data: tuple[models.Workspace, models.Peer], + ): + """Verify cursor tokens enable forward navigation across pages.""" + workspace, peer = sample_data + # Create 3 distinct work units (one per session) so we have 3 rows + sessions = [ + models.Session(workspace_name=workspace.name, name=f"cursor_sess_{i}") + for i in range(3) + ] + db_session.add_all(sessions) + await db_session.commit() + for s in sessions: + await db_session.refresh(s) + for session in sessions: + payload = { + "observed": peer.name, + "observer": peer.name, + "task_type": "representation", + "workspace_name": workspace.name, + "session_name": session.name, + } + db_session.add( + models.QueueItem( + session_id=session.id, + task_type="representation", + work_unit_key=construct_work_unit_key(workspace.name, payload), + payload=payload, + processed=False, + workspace_name=workspace.name, + ) + ) + await db_session.commit() + + # Page 1 with size=2 + page1 = client.get( + f"/v3/workspaces/{workspace.name}/queue/work-units", + params={"size": 2}, + ) + assert page1.status_code == 200 + body1 = page1.json() + assert len(body1["items"]) == 2 + assert body1["next_page"] is not None # has more + + # Page 2 via the cursor — fetch remaining row + page2 = client.get( + f"/v3/workspaces/{workspace.name}/queue/work-units", + params={"size": 2, "cursor": body1["next_page"]}, + ) + assert page2.status_code == 200 + body2 = page2.json() + assert len(body2["items"]) == 1 + assert body2["next_page"] is None # end of results + + # No duplicates / skips across pages + seen = {item["work_unit_key"] for item in body1["items"] + body2["items"]} + assert len(seen) == 3 diff --git a/uv.lock b/uv.lock index 75a59f808..ce4889187 100644 --- a/uv.lock +++ b/uv.lock @@ -8,7 +8,7 @@ resolution-markers = [ ] [options] -exclude-newer = "2026-05-09T19:09:35.818254Z" +exclude-newer = "2026-05-16T02:58:20.64737Z" exclude-newer-span = "P5D" [manifest] @@ -1188,6 +1188,7 @@ dependencies = [ { name = "rich" }, { name = "scikit-learn" }, { name = "sentry-sdk", extra = ["anthropic", "fastapi", "sqlalchemy"] }, + { name = "sqlakeyset" }, { name = "sqlalchemy" }, { name = "tenacity" }, { name = "tiktoken" }, @@ -1242,6 +1243,7 @@ requires-dist = [ { name = "rich", specifier = ">=13.7.1" }, { name = "scikit-learn", specifier = ">=1.6.0" }, { name = "sentry-sdk", extras = ["anthropic", "fastapi", "sqlalchemy"], specifier = ">=2.3.1" }, + { name = "sqlakeyset", specifier = ">=2.0.1745022416" }, { name = "sqlalchemy", specifier = ">=2.0.30" }, { name = "tenacity", specifier = ">=9.1.2" }, { name = "tiktoken", specifier = ">=0.9.0" }, @@ -3618,6 +3620,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/32/46/9cb0e58b2deb7f82b84065f37f3bffeb12413f947f9388e4cac22c4621ce/sortedcontainers-2.4.0-py2.py3-none-any.whl", hash = "sha256:a163dcaede0f1c021485e957a39245190e74249897e2ae4b2aa38595db237ee0", size = 29575, upload-time = "2021-05-16T22:03:41.177Z" }, ] +[[package]] +name = "sqlakeyset" +version = "2.0.1775222100" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "packaging" }, + { name = "python-dateutil" }, + { name = "sqlalchemy" }, + { name = "typing-extensions", marker = "python_full_version < '3.13'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/75/18/0a06e22a66df13debde3eb799f7f31f17bf02242836f47a20e2c8c45e44c/sqlakeyset-2.0.1775222100.tar.gz", hash = "sha256:c2988c289181fa3615f3c091308a06001d83fa6ebc0591e7d18e08d8b92ee012", size = 114403, upload-time = "2026-04-03T13:15:03.428Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d0/f7/ff211cc8bc2968459b78a63c8b62b3b6819dff0abdfd260aa5ef012a73ec/sqlakeyset-2.0.1775222100-py3-none-any.whl", hash = "sha256:5004e671ddc7d9ecf105bcbd46c079b314cc6d413db3159e63a3567e51e28fa3", size = 27369, upload-time = "2026-04-03T13:15:02.219Z" }, +] + [[package]] name = "sqlalchemy" version = "2.0.49" From b4282420d48e36eb4fbd2d7feceebc4721047f87 Mon Sep 17 00:00:00 2001 From: Vineeth Voruganti <13438633+VVoruganti@users.noreply.github.com> Date: Thu, 21 May 2026 00:26:48 -0400 Subject: [PATCH 3/5] feat(sdk): add queue_work_units + backfill QueueStatus fields Wire the cursor-paginated /queue/work-units endpoint through both SDKs: new SyncCursorPage/AsyncCursorPage in Python and CursorPage in TS, plus typed QueueWorkUnitsPage* subclasses carrying the deriver-threshold envelope extras (representation_batch_max_tokens, flush_enabled). Adds queue_work_units() / queueWorkUnits() on both client and session levels (sync + async for Python). Also backfills pending_stalled_work_units + pending_ready_work_units on the existing QueueStatus types so SDK consumers can read them. --- sdks/python/src/honcho/aio.py | 112 +++++++++++ sdks/python/src/honcho/api_types.py | 52 +++++ sdks/python/src/honcho/client.py | 75 +++++++ sdks/python/src/honcho/http/routes.py | 4 + sdks/python/src/honcho/pagination.py | 242 ++++++++++++++++++++++- sdks/python/src/honcho/session.py | 55 ++++++ sdks/typescript/__tests__/client.test.ts | 59 ++++++ sdks/typescript/src/client.ts | 74 ++++++- sdks/typescript/src/index.ts | 7 +- sdks/typescript/src/pagination.ts | 211 +++++++++++++++++++- sdks/typescript/src/session.ts | 68 ++++++- sdks/typescript/src/types/api.ts | 67 +++++++ sdks/typescript/src/utils.ts | 29 +++ sdks/typescript/src/validation.ts | 18 ++ tests/sdk/test_client.py | 101 +++++++++- tests/sdk/test_session.py | 51 ++++- 16 files changed, 1215 insertions(+), 10 deletions(-) diff --git a/sdks/python/src/honcho/aio.py b/sdks/python/src/honcho/aio.py index 0edcc60f4..e0dafc698 100644 --- a/sdks/python/src/honcho/aio.py +++ b/sdks/python/src/honcho/aio.py @@ -39,6 +39,8 @@ PeerContextResponse, PeerResponse, QueueStatusResponse, + QueueWorkUnit, + QueueWorkUnitsPageAsync, RepresentationResponse, SessionConfiguration, SessionPeerConfig, @@ -401,6 +403,64 @@ async def queue_status( ) return QueueStatusResponse.model_validate(data) + async def queue_work_units( + self, + observer: str | PeerBase | None = None, + sender: str | PeerBase | None = None, + session: str | SessionBase | None = None, + *, + cursor: str | None = None, + size: int | None = None, + ) -> QueueWorkUnitsPageAsync: + """List unprocessed queue work units asynchronously, cursor-paginated. + + Useful for debugging "why isn't this work unit advancing?" — distinguishes + work units stalled below the batch token threshold from those claimed by + a worker or eligible to be claimed. See ``Honcho.queue_work_units`` for + details. + """ + await self._honcho._ensure_workspace_async() + resolved_observer_id = resolve_id(observer) + resolved_sender_id = resolve_id(sender) + resolved_session_id = resolve_id(session) + + def build_query(cursor_token: str | None) -> dict[str, Any]: + q: dict[str, Any] = {} + if resolved_observer_id: + q["observer_id"] = resolved_observer_id + if resolved_sender_id: + q["sender_id"] = resolved_sender_id + if resolved_session_id: + q["session_id"] = resolved_session_id + if cursor_token is not None: + q["cursor"] = cursor_token + if size is not None: + q["size"] = size + return q + + async def fetch_at(cursor_token: str) -> QueueWorkUnitsPageAsync: + next_data = await self._honcho._async_http_client.get( + routes.workspace_queue_work_units(self._honcho.workspace_id), + query=build_query(cursor_token), + ) + return QueueWorkUnitsPageAsync( + next_data, + QueueWorkUnit, + fetch_next=fetch_at, + fetch_previous=fetch_at, + ) + + data = await self._honcho._async_http_client.get( + routes.workspace_queue_work_units(self._honcho.workspace_id), + query=build_query(cursor) or None, + ) + return QueueWorkUnitsPageAsync( + data, + QueueWorkUnit, + fetch_next=fetch_at, + fetch_previous=fetch_at, + ) + @validate_call(config=ConfigDict(arbitrary_types_allowed=True)) async def schedule_dream( self, @@ -1362,6 +1422,58 @@ async def queue_status( ) return QueueStatusResponse.model_validate(data) + async def queue_work_units( + self, + observer: str | PeerBase | None = None, + sender: str | PeerBase | None = None, + *, + cursor: str | None = None, + size: int | None = None, + ) -> QueueWorkUnitsPageAsync: + """List unprocessed queue work units scoped to this session. + + Cursor-paginated. ``session_id`` is hard-set to this session and cannot + be overridden. + """ + await self._session._honcho._ensure_workspace_async() + resolved_observer_id = resolve_id(observer) + resolved_sender_id = resolve_id(sender) + + def build_query(cursor_token: str | None) -> dict[str, Any]: + q: dict[str, Any] = {"session_id": self._session.id} + if resolved_observer_id: + q["observer_id"] = resolved_observer_id + if resolved_sender_id: + q["sender_id"] = resolved_sender_id + if cursor_token is not None: + q["cursor"] = cursor_token + if size is not None: + q["size"] = size + return q + + async def fetch_at(cursor_token: str) -> QueueWorkUnitsPageAsync: + next_data = await self._session._honcho._async_http_client.get( + routes.workspace_queue_work_units(self._session.workspace_id), + query=build_query(cursor_token), + ) + return QueueWorkUnitsPageAsync( + next_data, + QueueWorkUnit, + fetch_next=fetch_at, + fetch_previous=fetch_at, + ) + + data = await self._session._honcho._async_http_client.get( + routes.workspace_queue_work_units(self._session.workspace_id), + query=build_query(cursor), + ) + return QueueWorkUnitsPageAsync( + data, + QueueWorkUnit, + fetch_next=fetch_at, + fetch_previous=fetch_at, + ) + async def get_message(self, message_id: str) -> Message: """Get a single message by ID from this session asynchronously. diff --git a/sdks/python/src/honcho/api_types.py b/sdks/python/src/honcho/api_types.py index 897a86feb..f8a50b1ca 100644 --- a/sdks/python/src/honcho/api_types.py +++ b/sdks/python/src/honcho/api_types.py @@ -10,6 +10,8 @@ from pydantic import BaseModel, ConfigDict, Field +from .pagination import AsyncCursorPage, SyncCursorPage + # ============================================================================== # Configuration Types # ============================================================================== @@ -468,6 +470,8 @@ class SessionQueueStatus(BaseModel): completed_work_units: int in_progress_work_units: int pending_work_units: int + pending_stalled_work_units: int = 0 + pending_ready_work_units: int = 0 class QueueStatusResponse(BaseModel): @@ -477,9 +481,57 @@ class QueueStatusResponse(BaseModel): completed_work_units: int in_progress_work_units: int pending_work_units: int + pending_stalled_work_units: int = 0 + pending_ready_work_units: int = 0 sessions: dict[str, SessionQueueStatus] | None = None +class QueueWorkUnit(BaseModel): + """One row from ``/queue/work-units`` — a single unprocessed work unit.""" + + work_unit_key: str + task_type: str + session_id: str | None = None + session_name: str | None = None + observer: str | None = None + observed: str | None = None + pending_items: int + pending_tokens: int + tokens_until_threshold: int + hit_threshold: bool + in_progress: bool + oldest_item_at: datetime.datetime + newest_item_at: datetime.datetime + + +class _QueueWorkUnitsEnvelopeMixin: + """Shared accessors for the /queue/work-units envelope extras.""" + + _data: dict[str, Any] # pyright: ignore[reportUninitializedInstanceVariable] + + @property + def representation_batch_max_tokens(self) -> int: + """DERIVER_REPRESENTATION_BATCH_MAX_TOKENS at the time of the request.""" + return int(self._data["representation_batch_max_tokens"]) + + @property + def flush_enabled(self) -> bool: + """True when the batch threshold gating is bypassed server-side.""" + return bool(self._data["flush_enabled"]) + + +class QueueWorkUnitsPageSync( + _QueueWorkUnitsEnvelopeMixin, SyncCursorPage[QueueWorkUnit, QueueWorkUnit] +): + """Cursor-paginated page of ``QueueWorkUnit`` with envelope extras.""" + + +class QueueWorkUnitsPageAsync( + _QueueWorkUnitsEnvelopeMixin, AsyncCursorPage[QueueWorkUnit, QueueWorkUnit] +): + """Async cursor-paginated page of ``QueueWorkUnit`` with envelope extras.""" + + # ============================================================================== # Dialectic (Chat) Types # ============================================================================== diff --git a/sdks/python/src/honcho/client.py b/sdks/python/src/honcho/client.py index 969ff741c..89181bf3c 100644 --- a/sdks/python/src/honcho/client.py +++ b/sdks/python/src/honcho/client.py @@ -16,6 +16,8 @@ PeerConfig, PeerResponse, QueueStatusResponse, + QueueWorkUnit, + QueueWorkUnitsPageSync, SessionConfiguration, SessionResponse, WorkspaceConfiguration, @@ -604,6 +606,79 @@ def queue_status( ) return QueueStatusResponse.model_validate(data) + @validate_call(config=ConfigDict(arbitrary_types_allowed=True)) + def queue_work_units( + self, + observer: str | PeerBase | None = None, + sender: str | PeerBase | None = None, + session: str | SessionBase | None = None, + *, + cursor: str | None = None, + size: int | None = None, + ) -> QueueWorkUnitsPageSync: + """ + List unprocessed queue work units, cursor-paginated. + + Useful for debugging "why isn't this work unit advancing?" — distinguishes + work units stalled below the batch token threshold from those claimed by + a worker or eligible to be claimed. + + Args: + observer: Optional observer (ID string or Peer object) to filter by + sender: Optional sender (ID string or Peer object) to filter by + session: Optional session (ID string or Session object) to filter by + cursor: Optional cursor token from a previous page's ``next_page`` + or ``previous_page`` + size: Optional page size + + Returns: + A ``QueueWorkUnitsPageSync`` — iterate items via ``.items`` (current + page only) or by iterating the page object (auto-walks subsequent + pages until exhausted). ``.representation_batch_max_tokens`` and + ``.flush_enabled`` expose the server's threshold configuration. + """ + self._ensure_workspace() + resolved_observer_id = resolve_id(observer) + resolved_sender_id = resolve_id(sender) + resolved_session_id = resolve_id(session) + + def build_query(cursor_token: str | None) -> dict[str, Any]: + q: dict[str, Any] = {} + if resolved_observer_id: + q["observer_id"] = resolved_observer_id + if resolved_sender_id: + q["sender_id"] = resolved_sender_id + if resolved_session_id: + q["session_id"] = resolved_session_id + if cursor_token is not None: + q["cursor"] = cursor_token + if size is not None: + q["size"] = size + return q + + def fetch_at(cursor_token: str) -> QueueWorkUnitsPageSync: + next_data = self._http.get( + routes.workspace_queue_work_units(self.workspace_id), + query=build_query(cursor_token), + ) + return QueueWorkUnitsPageSync( + next_data, + QueueWorkUnit, + fetch_next=fetch_at, + fetch_previous=fetch_at, + ) + + data = self._http.get( + routes.workspace_queue_work_units(self.workspace_id), + query=build_query(cursor) or None, + ) + return QueueWorkUnitsPageSync( + data, + QueueWorkUnit, + fetch_next=fetch_at, + fetch_previous=fetch_at, + ) + @validate_call(config=ConfigDict(arbitrary_types_allowed=True)) def schedule_dream( self, diff --git a/sdks/python/src/honcho/http/routes.py b/sdks/python/src/honcho/http/routes.py index 3fdbd677d..cceda20f4 100644 --- a/sdks/python/src/honcho/http/routes.py +++ b/sdks/python/src/honcho/http/routes.py @@ -24,6 +24,10 @@ def workspace_queue_status(workspace_id: str) -> str: return f"/{API_VERSION}/workspaces/{workspace_id}/queue/status" +def workspace_queue_work_units(workspace_id: str) -> str: + return f"/{API_VERSION}/workspaces/{workspace_id}/queue/work-units" + + def workspace_schedule_dream(workspace_id: str) -> str: return f"/{API_VERSION}/workspaces/{workspace_id}/schedule_dream" diff --git a/sdks/python/src/honcho/pagination.py b/sdks/python/src/honcho/pagination.py index 32c2c05a9..933aa8c42 100644 --- a/sdks/python/src/honcho/pagination.py +++ b/sdks/python/src/honcho/pagination.py @@ -1,4 +1,11 @@ -"""Pagination wrapper for Honcho SDK.""" +"""Pagination wrappers for Honcho SDK. + +Two flavors live here: + - ``SyncPage`` / ``AsyncPage`` — offset/page-number pagination + - ``SyncCursorPage`` / ``AsyncCursorPage`` — opaque-cursor pagination + +Use whichever matches the endpoint's response envelope. +""" from __future__ import annotations @@ -11,7 +18,7 @@ T = TypeVar("T", bound=BaseModel) U = TypeVar("U", default=T) -__all__ = ["SyncPage", "AsyncPage"] +__all__ = ["SyncPage", "AsyncPage", "SyncCursorPage", "AsyncCursorPage"] class SyncPage(Generic[T, U]): @@ -248,3 +255,234 @@ async def get_next_page(self) -> "AsyncPage[T, U] | None": return None return await self._fetch_next(current_page + 1) + + +class SyncCursorPage(Generic[T, U]): + """ + Cursor-paginated result wrapper that transforms objects from type T to type U. + + Cursor pagination uses opaque tokens (``next_page`` / ``previous_page``) instead + of page numbers; the navigation is stable across concurrent mutations of the + underlying data, unlike offset pagination. + + Note: + The underlying server data may still mutate between page fetches (rows + appearing or being processed away). Iterating across pages snapshots + what the server returns per page, but pages may be inconsistent with + each other under concurrent processing. Use ``.items`` to read just + the current page if you need a stable view. + """ + + def __init__( + self, + data: dict[str, Any], + item_type: type[T], + transform_func: Callable[[T], U] | None = None, + fetch_next: Callable[[str], "SyncCursorPage[T, U]"] | None = None, + fetch_previous: Callable[[str], "SyncCursorPage[T, U]"] | None = None, + ) -> None: + """ + Initialize a cursor page. + + Args: + data: Raw paginated response — ``items``, ``total``, ``current_page``, + ``current_page_backwards``, ``next_page``, ``previous_page``. + item_type: Pydantic type to parse each item as. + transform_func: Optional mapper from raw item type to a user-facing type. + fetch_next: Optional callback that takes a cursor token and returns + the page at that cursor. + fetch_previous: Optional callback for backwards navigation. + """ + self._data: dict[str, Any] = data + self._item_type: type[T] = item_type + self._transform_func: Callable[[T], U] | None = transform_func + self._fetch_next: Callable[[str], "SyncCursorPage[T, U]"] | None = fetch_next + self._fetch_previous: Callable[[str], "SyncCursorPage[T, U]"] | None = ( + fetch_previous + ) + + raw_items = data.get("items", []) + self._raw_items: list[T] = [ + item_type.model_validate(item) for item in raw_items + ] + + def __iter__(self) -> Iterator[U] | Iterator[T]: + """ + Iterate over all transformed items across all pages. + + Warning: + Automatically chases ``next_page`` tokens until exhausted. For + a mutating queue this may yield a different set of items than + a single-shot snapshot. Use ``items`` for the current page only. + """ + page: SyncCursorPage[T, U] | None = self + while page is not None: + for item in page._raw_items: + if self._transform_func is not None: + yield self._transform_func(item) + else: + yield item + page = page.get_next_page() + + def __getitem__(self, index: int) -> U | T: + """Get a transformed item by index on the current page.""" + item = self._raw_items[index] + if self._transform_func is not None: + return self._transform_func(item) + return item + + def __len__(self) -> int: + """Get the number of items on the current page.""" + return len(self._raw_items) + + @property + def items(self) -> list[U] | list[T]: + """Get all transformed items on the current page (snapshot only).""" + if self._transform_func is not None: + return [self._transform_func(item) for item in self._raw_items] + return list(self._raw_items) + + @property + def total(self) -> int | None: + """Total items across all pages, when the server populates it.""" + return self._data.get("total") + + @property + def current_page(self) -> str | None: + """Cursor token that re-fetches the current page.""" + return self._data.get("current_page") + + @property + def current_page_backwards(self) -> str | None: + """Cursor token to re-fetch the current page starting from the last item.""" + return self._data.get("current_page_backwards") + + @property + def next_page(self) -> str | None: + """Cursor token for the next page, or None if no more pages.""" + return self._data.get("next_page") + + @property + def previous_page(self) -> str | None: + """Cursor token for the previous page, or None if at the start.""" + return self._data.get("previous_page") + + def has_next_page(self) -> bool: + """True if there's a cursor for the next page.""" + return self.next_page is not None + + def has_previous_page(self) -> bool: + """True if there's a cursor for the previous page.""" + return self.previous_page is not None + + def get_next_page(self) -> "SyncCursorPage[T, U] | None": + """Fetch the next page; returns None if at the end or no fetch callback.""" + if self.next_page is None or self._fetch_next is None: + return None + return self._fetch_next(self.next_page) + + def get_previous_page(self) -> "SyncCursorPage[T, U] | None": + """Fetch the previous page; returns None if at the start or no callback.""" + if self.previous_page is None or self._fetch_previous is None: + return None + return self._fetch_previous(self.previous_page) + + +class AsyncCursorPage(Generic[T, U]): + """ + Async cursor-paginated result wrapper. See ``SyncCursorPage`` for semantics. + """ + + def __init__( + self, + data: dict[str, Any], + item_type: type[T], + transform_func: Callable[[T], U] | None = None, + fetch_next: (Callable[[str], Awaitable["AsyncCursorPage[T, U]"]] | None) = None, + fetch_previous: ( + Callable[[str], Awaitable["AsyncCursorPage[T, U]"]] | None + ) = None, + ) -> None: + self._data: dict[str, Any] = data + self._item_type: type[T] = item_type + self._transform_func: Callable[[T], U] | None = transform_func + self._fetch_next: Callable[[str], Awaitable["AsyncCursorPage[T, U]"]] | None = ( + fetch_next + ) + self._fetch_previous: ( + Callable[[str], Awaitable["AsyncCursorPage[T, U]"]] | None + ) = fetch_previous + + raw_items = data.get("items", []) + self._raw_items: list[T] = [ + item_type.model_validate(item) for item in raw_items + ] + + async def __aiter__(self) -> AsyncIterator[U] | AsyncIterator[T]: + """ + Async iterate over all transformed items across all pages. + + Warning: + Automatically chases ``next_page`` tokens until exhausted. For + a mutating queue this may yield a different set of items than + a single-shot snapshot. Use ``items`` for the current page only. + """ + page: AsyncCursorPage[T, U] | None = self + while page is not None: + for item in page._raw_items: + if self._transform_func is not None: + yield self._transform_func(item) + else: + yield item + page = await page.get_next_page() + + def __getitem__(self, index: int) -> U | T: + item = self._raw_items[index] + if self._transform_func is not None: + return self._transform_func(item) + return item + + def __len__(self) -> int: + return len(self._raw_items) + + @property + def items(self) -> list[U] | list[T]: + if self._transform_func is not None: + return [self._transform_func(item) for item in self._raw_items] + return list(self._raw_items) + + @property + def total(self) -> int | None: + return self._data.get("total") + + @property + def current_page(self) -> str | None: + return self._data.get("current_page") + + @property + def current_page_backwards(self) -> str | None: + return self._data.get("current_page_backwards") + + @property + def next_page(self) -> str | None: + return self._data.get("next_page") + + @property + def previous_page(self) -> str | None: + return self._data.get("previous_page") + + def has_next_page(self) -> bool: + return self.next_page is not None + + def has_previous_page(self) -> bool: + return self.previous_page is not None + + async def get_next_page(self) -> "AsyncCursorPage[T, U] | None": + if self.next_page is None or self._fetch_next is None: + return None + return await self._fetch_next(self.next_page) + + async def get_previous_page(self) -> "AsyncCursorPage[T, U] | None": + if self.previous_page is None or self._fetch_previous is None: + return None + return await self._fetch_previous(self.previous_page) diff --git a/sdks/python/src/honcho/session.py b/sdks/python/src/honcho/session.py index 3b1b0f140..2a49bbbee 100644 --- a/sdks/python/src/honcho/session.py +++ b/sdks/python/src/honcho/session.py @@ -15,6 +15,8 @@ MessageResponse, PeerResponse, QueueStatusResponse, + QueueWorkUnit, + QueueWorkUnitsPageSync, RepresentationResponse, SessionConfiguration, SessionPeerConfig, @@ -968,6 +970,59 @@ def queue_status( ) return QueueStatusResponse.model_validate(data) + @validate_call(config=ConfigDict(arbitrary_types_allowed=True)) + def queue_work_units( + self, + observer: str | PeerBase | None = None, + sender: str | PeerBase | None = None, + *, + cursor: str | None = None, + size: int | None = None, + ) -> QueueWorkUnitsPageSync: + """List unprocessed queue work units scoped to this session. + + Cursor-paginated. ``session_id`` is hard-set to this session and cannot + be overridden. + """ + self._honcho._ensure_workspace() + resolved_observer_id = resolve_id(observer) + resolved_sender_id = resolve_id(sender) + + def build_query(cursor_token: str | None) -> dict[str, Any]: + q: dict[str, Any] = {"session_id": self.id} + if resolved_observer_id: + q["observer_id"] = resolved_observer_id + if resolved_sender_id: + q["sender_id"] = resolved_sender_id + if cursor_token is not None: + q["cursor"] = cursor_token + if size is not None: + q["size"] = size + return q + + def fetch_at(cursor_token: str) -> QueueWorkUnitsPageSync: + next_data = self._honcho._http.get( + routes.workspace_queue_work_units(self.workspace_id), + query=build_query(cursor_token), + ) + return QueueWorkUnitsPageSync( + next_data, + QueueWorkUnit, + fetch_next=fetch_at, + fetch_previous=fetch_at, + ) + + data = self._honcho._http.get( + routes.workspace_queue_work_units(self.workspace_id), + query=build_query(cursor), + ) + return QueueWorkUnitsPageSync( + data, + QueueWorkUnit, + fetch_next=fetch_at, + fetch_previous=fetch_at, + ) + def get_message(self, message_id: str) -> Message: """Get a single message by ID from this session. diff --git a/sdks/typescript/__tests__/client.test.ts b/sdks/typescript/__tests__/client.test.ts index 715d55e46..7e024a4b5 100644 --- a/sdks/typescript/__tests__/client.test.ts +++ b/sdks/typescript/__tests__/client.test.ts @@ -321,6 +321,12 @@ describe('Honcho Client', () => { expect(typeof status.completedWorkUnits).toBe('number') expect(typeof status.inProgressWorkUnits).toBe('number') expect(typeof status.pendingWorkUnits).toBe('number') + // Phase 1 fields: stalled + ready partition pending + expect(typeof status.pendingStalledWorkUnits).toBe('number') + expect(typeof status.pendingReadyWorkUnits).toBe('number') + expect( + status.pendingStalledWorkUnits + status.pendingReadyWorkUnits + ).toBe(status.pendingWorkUnits) }) test('queueStatus with observer filter', async () => { @@ -344,6 +350,59 @@ describe('Honcho Client', () => { }) }) + // =========================================================================== + // Queue Work Units (cursor-paginated) + // =========================================================================== + + describe('GET /workspaces/:id/queue/work-units', () => { + test('queueWorkUnits returns cursor-paginated page with envelope extras', async () => { + const page = await client.queueWorkUnits() + + // Envelope extras (server's deriver threshold config) + expect(typeof page.representationBatchMaxTokens).toBe('number') + expect(page.representationBatchMaxTokens).toBeGreaterThan(0) + expect(typeof page.flushEnabled).toBe('boolean') + + // Cursor page surface + expect(Array.isArray(page.items)).toBe(true) + expect(page.hasNextPage).toBe(false) // empty workspace + expect(page.nextPage).toBeNull() + }) + + test('queueWorkUnits with filter combos still returns a page', async () => { + const peer = await client.peer('queue-wu-observer') + const session = await client.session('queue-wu-session', { + metadata: {}, + }) + + const byObserver = await client.queueWorkUnits({ observer: peer }) + expect(Array.isArray(byObserver.items)).toBe(true) + + const bySession = await client.queueWorkUnits({ session: session }) + expect(Array.isArray(bySession.items)).toBe(true) + + const bySender = await client.queueWorkUnits({ sender: peer }) + expect(Array.isArray(bySender.items)).toBe(true) + }) + + test('queueWorkUnits session-scoped variant hard-sets session_id', async () => { + const session = await client.session('queue-wu-session-scoped', { + metadata: {}, + }) + const page = await session.queueWorkUnits() + expect(typeof page.representationBatchMaxTokens).toBe('number') + expect(Array.isArray(page.items)).toBe(true) + }) + + test('queueWorkUnits with size param does not exceed page size', async () => { + const page = await client.queueWorkUnits({ size: 10 }) + expect(page.items.length).toBeLessThanOrEqual(10) + // Empty queue: no next page, getNextPage returns null + const next = await page.getNextPage() + expect(next).toBeNull() + }) + }) + // =========================================================================== // Client Configuration // =========================================================================== diff --git a/sdks/typescript/src/client.ts b/sdks/typescript/src/client.ts index 1270a3196..d24b310ae 100644 --- a/sdks/typescript/src/client.ts +++ b/sdks/typescript/src/client.ts @@ -1,7 +1,7 @@ import { API_VERSION } from './api-version' import { HonchoHTTPClient } from './http/client' import { Message } from './message' -import { Page } from './pagination' +import { CursorPage, Page, QueueWorkUnitsPage } from './pagination' import { Peer } from './peer' import { Session } from './session' import type { @@ -11,10 +11,17 @@ import type { QueueStatus, QueueStatusParams, QueueStatusResponse, + QueueWorkUnit, + QueueWorkUnitResponse, + QueueWorkUnitsResponse, SessionResponse, WorkspaceResponse, } from './types/api' -import { resolveId, transformQueueStatus } from './utils' +import { + resolveId, + transformQueueStatus, + transformQueueWorkUnit, +} from './utils' import { FilterSchema, type Filters, @@ -30,6 +37,7 @@ import { peerConfigFromApi, peerConfigToApi, type QueueStatusOptions, + type QueueWorkUnitsOptions, SearchQuerySchema, type SessionConfig, SessionConfigSchema, @@ -818,6 +826,68 @@ export class Honcho { return transformQueueStatus(status) } + /** + * List unprocessed queue work units, cursor-paginated. + * + * Useful for debugging "why isn't this work unit advancing?" — distinguishes + * work units stalled below the batch token threshold from those claimed by + * a worker or eligible to be claimed. + * + * @returns A QueueWorkUnitsPage iterable across pages. Read `.items` for the + * current page only; iterate the page object to walk subsequent pages. + * `.representationBatchMaxTokens` and `.flushEnabled` expose the server's + * threshold configuration. + */ + async queueWorkUnits( + options?: Omit< + QueueWorkUnitsOptions, + 'observerId' | 'senderId' | 'sessionId' + > & { + observer?: string | Peer + sender?: string | Peer + session?: string | Session + } + ): Promise> { + await this._ensureWorkspace() + const observerId = options?.observer + ? resolveId(options.observer) + : undefined + const senderId = options?.sender ? resolveId(options.sender) : undefined + const sessionId = options?.session ? resolveId(options.session) : undefined + const size = options?.size + + const buildQuery = ( + cursor?: string + ): Record => { + const q: Record = {} + if (observerId) q.observer_id = observerId + if (senderId) q.sender_id = senderId + if (sessionId) q.session_id = sessionId + if (cursor) q.cursor = cursor + if (size != null) q.size = size + return q + } + + const fetchAt = async (cursor: string): Promise => { + return this._http.get( + `/${API_VERSION}/workspaces/${this.workspaceId}/queue/work-units`, + { query: buildQuery(cursor) } + ) + } + + const data = await this._http.get( + `/${API_VERSION}/workspaces/${this.workspaceId}/queue/work-units`, + { query: buildQuery(options?.cursor) } + ) + + return new QueueWorkUnitsPage( + data, + transformQueueWorkUnit, + fetchAt, + fetchAt + ) + } + /** * Schedule a dream task for memory consolidation. * diff --git a/sdks/typescript/src/index.ts b/sdks/typescript/src/index.ts index f6b11d26e..6de84a94f 100644 --- a/sdks/typescript/src/index.ts +++ b/sdks/typescript/src/index.ts @@ -28,7 +28,7 @@ export { DialecticStreamResponse, } from './http/streaming' export { Message, type MessageInput } from './message' -export { Page } from './pagination' +export { CursorPage, Page, QueueWorkUnitsPage } from './pagination' export { Peer, PeerContext } from './peer' export { Session } from './session' export { @@ -42,12 +42,16 @@ export { export type { ConclusionQueryParams, ConclusionResponse, + CursorPageResponse, MessageResponse, PageResponse, PeerContextResponse, PeerResponse, QueueStatus, QueueStatusResponse, + QueueWorkUnit, + QueueWorkUnitResponse, + QueueWorkUnitsResponse, RepresentationOptions, SessionContextResponse, SessionQueueStatus, @@ -72,6 +76,7 @@ export type { PeerMetadata, PeerRemoval, QueueStatusOptions, + QueueWorkUnitsOptions, SessionConfig, SessionMetadata, SessionPeerConfig, diff --git a/sdks/typescript/src/pagination.ts b/sdks/typescript/src/pagination.ts index 8c5b68574..65bf30794 100644 --- a/sdks/typescript/src/pagination.ts +++ b/sdks/typescript/src/pagination.ts @@ -1,4 +1,8 @@ -import type { PageResponse } from './types/api' +import type { + CursorPageResponse, + PageResponse, + QueueWorkUnitsResponse, +} from './types/api' /** * Function type for fetching the next page of results. @@ -8,6 +12,14 @@ export type NextPageFetcher = ( size: number ) => Promise> +/** + * Function type for fetching a page by cursor token. + */ +export type CursorPageFetcher = ( + cursor: string, + size?: number +) => Promise> + /** * Generic paginated result wrapper for Honcho SDK. * Provides async iteration and transformation capabilities. @@ -181,3 +193,200 @@ export class Page implements AsyncIterable { return allItems } } + +/** + * Cursor-paginated result wrapper. Uses opaque tokens (`nextPage`/`previousPage`) + * instead of page numbers. Stable across concurrent mutations of the + * underlying data, unlike offset pagination. + * + * **Warning:** The underlying server data may still mutate between page fetches. + * Iterating across pages snapshots what the server returns per page, but pages + * may be inconsistent with each other under concurrent processing. Use + * `.items` to read just the current page if you need a stable view. + */ +export class CursorPage implements AsyncIterable { + protected _data: CursorPageResponse + protected _transformFunc?: (item: TOriginal) => T + protected _fetchNext?: CursorPageFetcher + protected _fetchPrevious?: CursorPageFetcher + + constructor( + data: CursorPageResponse, + transformFunc?: (item: TOriginal) => T, + fetchNext?: CursorPageFetcher, + fetchPrevious?: CursorPageFetcher + ) { + this._data = data + this._transformFunc = transformFunc + this._fetchNext = fetchNext + this._fetchPrevious = fetchPrevious + } + + /** + * Async iterator across all pages, auto-following `nextPage` until exhausted. + * Use `.items` for the current page only. + */ + async *[Symbol.asyncIterator](): AsyncIterator { + for (const item of this._data.items) { + yield this._transformFunc + ? this._transformFunc(item) + : (item as unknown as T) + } + + let currentPage: CursorPage | null = this + while (currentPage.hasNextPage) { + const next: CursorPage | null = + await currentPage.getNextPage() + if (!next) break + currentPage = next + for (const item of next._data.items) { + yield next._transformFunc + ? next._transformFunc(item) + : (item as unknown as T) + } + } + } + + get items(): T[] { + const items = this._data.items || [] + return this._transformFunc + ? items.map(this._transformFunc) + : (items as unknown as T[]) + } + + get length(): number { + return this._data.items?.length ?? 0 + } + + /** Total items across all pages, when the server populates it. */ + get total(): number | null { + return this._data.total ?? null + } + + /** Cursor token that re-fetches the current page. */ + get currentPage(): string | null { + return this._data.current_page ?? null + } + + /** Cursor token to re-fetch the current page from the last item. */ + get currentPageBackwards(): string | null { + return this._data.current_page_backwards ?? null + } + + /** Cursor token for the next page, or null at the end. */ + get nextPage(): string | null { + return this._data.next_page ?? null + } + + /** Cursor token for the previous page, or null at the start. */ + get previousPage(): string | null { + return this._data.previous_page ?? null + } + + get hasNextPage(): boolean { + return this._data.next_page != null + } + + get hasPreviousPage(): boolean { + return this._data.previous_page != null + } + + /** Fetch the next page; null if at end or no fetch callback. */ + async getNextPage(): Promise | null> { + if (!this._data.next_page || !this._fetchNext) return null + const data = await this._fetchNext(this._data.next_page) + return new CursorPage( + data, + this._transformFunc, + this._fetchNext, + this._fetchPrevious + ) + } + + /** Fetch the previous page; null if at start or no fetch callback. */ + async getPreviousPage(): Promise | null> { + if (!this._data.previous_page || !this._fetchPrevious) return null + const data = await this._fetchPrevious(this._data.previous_page) + return new CursorPage( + data, + this._transformFunc, + this._fetchNext, + this._fetchPrevious + ) + } + + /** Collect items from all pages into an array. */ + async toArray(): Promise { + const allItems: T[] = [] + for await (const item of this) { + allItems.push(item) + } + return allItems + } +} + +/** + * Cursor page for the /queue/work-units endpoint with envelope extras + * (`representationBatchMaxTokens`, `flushEnabled`) carrying the server-side + * deriver threshold configuration. + */ +export class QueueWorkUnitsPage extends CursorPage< + T, + TOriginal +> { + protected override _data: QueueWorkUnitsResponse & { + items: TOriginal[] + } + + constructor( + data: QueueWorkUnitsResponse & { items: TOriginal[] }, + transformFunc?: (item: TOriginal) => T, + fetchNext?: CursorPageFetcher, + fetchPrevious?: CursorPageFetcher + ) { + super(data, transformFunc, fetchNext, fetchPrevious) + this._data = data + } + + /** DERIVER_REPRESENTATION_BATCH_MAX_TOKENS at the time of the request. */ + get representationBatchMaxTokens(): number { + return this._data.representation_batch_max_tokens + } + + /** True when the batch threshold gating is bypassed server-side. */ + get flushEnabled(): boolean { + return this._data.flush_enabled + } + + override async getNextPage(): Promise | null> { + if (!this._data.next_page || !this._fetchNext) return null + const data = (await this._fetchNext( + this._data.next_page + )) as QueueWorkUnitsResponse & { items: TOriginal[] } + return new QueueWorkUnitsPage( + data, + this._transformFunc, + this._fetchNext, + this._fetchPrevious + ) + } + + override async getPreviousPage(): Promise | null> { + if (!this._data.previous_page || !this._fetchPrevious) return null + const data = (await this._fetchPrevious( + this._data.previous_page + )) as QueueWorkUnitsResponse & { items: TOriginal[] } + return new QueueWorkUnitsPage( + data, + this._transformFunc, + this._fetchNext, + this._fetchPrevious + ) + } +} diff --git a/sdks/typescript/src/session.ts b/sdks/typescript/src/session.ts index 0f9cf4aa9..45a8086fa 100644 --- a/sdks/typescript/src/session.ts +++ b/sdks/typescript/src/session.ts @@ -1,7 +1,7 @@ import { API_VERSION } from './api-version' import type { HonchoHTTPClient } from './http/client' import { Message } from './message' -import { Page } from './pagination' +import { Page, QueueWorkUnitsPage } from './pagination' import { Peer } from './peer' import { SessionContext, SessionSummaries } from './session_context' import type { @@ -11,13 +11,16 @@ import type { QueueStatus, QueueStatusParams, QueueStatusResponse, + QueueWorkUnit, + QueueWorkUnitResponse, + QueueWorkUnitsResponse, RepresentationOptions, RepresentationResponse, SessionContextResponse, SessionResponse, SessionSummariesResponse, } from './types/api' -import { transformQueueStatus } from './utils' +import { transformQueueStatus, transformQueueWorkUnit } from './utils' import { ContextParamsSchema, FileUploadSchema, @@ -37,6 +40,7 @@ import { PeerRemovalSchema, peerConfigFromApi, type QueueStatusOptions, + type QueueWorkUnitsOptions, SearchQuerySchema, type SessionConfig, SessionConfigSchema, @@ -906,6 +910,66 @@ export class Session { return transformQueueStatus(status) } + /** + * List unprocessed queue work units scoped to this session, cursor-paginated. + * + * @returns A QueueWorkUnitsPage iterable across pages. session_id is hard-set + * to this session and cannot be overridden. + */ + async queueWorkUnits( + options?: Omit< + QueueWorkUnitsOptions, + 'sessionId' | 'observerId' | 'senderId' | 'session' + > & { + observer?: string | Peer + sender?: string | Peer + } + ): Promise> { + const resolvedObserverId = options?.observer + ? typeof options.observer === 'string' + ? options.observer + : options.observer.id + : undefined + const resolvedSenderId = options?.sender + ? typeof options.sender === 'string' + ? options.sender + : options.sender.id + : undefined + const size = options?.size + + const buildQuery = ( + cursor?: string + ): Record => { + const q: Record = { + session_id: this.id, + } + if (resolvedObserverId) q.observer_id = resolvedObserverId + if (resolvedSenderId) q.sender_id = resolvedSenderId + if (cursor) q.cursor = cursor + if (size != null) q.size = size + return q + } + + const fetchAt = async (cursor: string): Promise => { + return this._http.get( + `/${API_VERSION}/workspaces/${this.workspaceId}/queue/work-units`, + { query: buildQuery(cursor) } + ) + } + + const data = await this._http.get( + `/${API_VERSION}/workspaces/${this.workspaceId}/queue/work-units`, + { query: buildQuery(options?.cursor) } + ) + + return new QueueWorkUnitsPage( + data, + transformQueueWorkUnit, + fetchAt, + fetchAt + ) + } + /** * Upload a file to this session as a message. * diff --git a/sdks/typescript/src/types/api.ts b/sdks/typescript/src/types/api.ts index 1622c99cf..a35423ea5 100644 --- a/sdks/typescript/src/types/api.ts +++ b/sdks/typescript/src/types/api.ts @@ -320,6 +320,8 @@ export interface SessionQueueStatusResponse { completed_work_units: number in_progress_work_units: number pending_work_units: number + pending_stalled_work_units?: number + pending_ready_work_units?: number } export interface QueueStatusResponse { @@ -327,6 +329,8 @@ export interface QueueStatusResponse { completed_work_units: number in_progress_work_units: number pending_work_units: number + pending_stalled_work_units?: number + pending_ready_work_units?: number sessions?: Record } @@ -336,6 +340,14 @@ export interface QueueStatusParams { session_id?: string } +export interface QueueWorkUnitsParams { + observer_id?: string + sender_id?: string + session_id?: string + cursor?: string + size?: number +} + /** * Queue status scoped to a single session. */ @@ -345,6 +357,8 @@ export interface SessionQueueStatus { completedWorkUnits: number inProgressWorkUnits: number pendingWorkUnits: number + pendingStalledWorkUnits: number + pendingReadyWorkUnits: number } /** @@ -355,9 +369,45 @@ export interface QueueStatus { completedWorkUnits: number inProgressWorkUnits: number pendingWorkUnits: number + pendingStalledWorkUnits: number + pendingReadyWorkUnits: number sessions?: Record } +/** Raw per-work-unit row from /queue/work-units. */ +export interface QueueWorkUnitResponse { + work_unit_key: string + task_type: string + session_id: string | null + session_name: string | null + observer: string | null + observed: string | null + pending_items: number + pending_tokens: number + tokens_until_threshold: number + hit_threshold: boolean + in_progress: boolean + oldest_item_at: string + newest_item_at: string +} + +/** Transformed per-work-unit row, camelCase. */ +export interface QueueWorkUnit { + workUnitKey: string + taskType: string + sessionId: string | null + sessionName: string | null + observer: string | null + observed: string | null + pendingItems: number + pendingTokens: number + tokensUntilThreshold: number + hitThreshold: boolean + inProgress: boolean + oldestItemAt: string + newestItemAt: string +} + // ============================================================================= // Pagination Types // ============================================================================= @@ -369,3 +419,20 @@ export interface PageResponse { total: number pages: number } + +/** Cursor-paginated response envelope. */ +export interface CursorPageResponse { + items: T[] + total?: number + current_page?: string | null + current_page_backwards?: string | null + next_page?: string | null + previous_page?: string | null +} + +/** /queue/work-units response: a CursorPage of QueueWorkUnit plus envelope extras. */ +export interface QueueWorkUnitsResponse + extends CursorPageResponse { + representation_batch_max_tokens: number + flush_enabled: boolean +} diff --git a/sdks/typescript/src/utils.ts b/sdks/typescript/src/utils.ts index 0092c25bd..e2424d74d 100644 --- a/sdks/typescript/src/utils.ts +++ b/sdks/typescript/src/utils.ts @@ -1,6 +1,8 @@ import type { QueueStatus, QueueStatusResponse, + QueueWorkUnit, + QueueWorkUnitResponse, SessionQueueStatus, SessionQueueStatusResponse, } from './types/api' @@ -32,6 +34,8 @@ function transformSessionQueueStatus( completedWorkUnits: status.completed_work_units, inProgressWorkUnits: status.in_progress_work_units, pendingWorkUnits: status.pending_work_units, + pendingStalledWorkUnits: status.pending_stalled_work_units ?? 0, + pendingReadyWorkUnits: status.pending_ready_work_units ?? 0, } } @@ -53,6 +57,31 @@ export function transformQueueStatus(status: QueueStatusResponse): QueueStatus { completedWorkUnits: status.completed_work_units, inProgressWorkUnits: status.in_progress_work_units, pendingWorkUnits: status.pending_work_units, + pendingStalledWorkUnits: status.pending_stalled_work_units ?? 0, + pendingReadyWorkUnits: status.pending_ready_work_units ?? 0, sessions, } } + +/** + * Transform a QueueWorkUnitResponse to QueueWorkUnit (snake_case to camelCase). + */ +export function transformQueueWorkUnit( + row: QueueWorkUnitResponse +): QueueWorkUnit { + return { + workUnitKey: row.work_unit_key, + taskType: row.task_type, + sessionId: row.session_id, + sessionName: row.session_name, + observer: row.observer, + observed: row.observed, + pendingItems: row.pending_items, + pendingTokens: row.pending_tokens, + tokensUntilThreshold: row.tokens_until_threshold, + hitThreshold: row.hit_threshold, + inProgress: row.in_progress, + oldestItemAt: row.oldest_item_at, + newestItemAt: row.newest_item_at, + } +} diff --git a/sdks/typescript/src/validation.ts b/sdks/typescript/src/validation.ts index fed1b10b7..9d50db7fb 100644 --- a/sdks/typescript/src/validation.ts +++ b/sdks/typescript/src/validation.ts @@ -388,6 +388,23 @@ export const QueueStatusOptionsSchema = z }) .strict() +/** + * Schema for queue work units options (cursor-paginated). + */ +export const QueueWorkUnitsOptionsSchema = z + .object({ + observer: z.union([PeerIdSchema, PeerIdObjectSchema]).optional(), + sender: z.union([PeerIdSchema, PeerIdObjectSchema]).optional(), + session: z.union([SessionIdSchema, SessionIdObjectSchema]).optional(), + cursor: z.string().optional(), + size: z + .number() + .int('Page size must be an integer') + .positive('Page size must be positive') + .optional(), + }) + .strict() + /** * Schema for file upload parameters. * Supports Blob/File objects and custom uploadable objects with binary content. @@ -960,6 +977,7 @@ export type ChatQuery = z.infer export type ContextParams = z.infer export type SearchQueryLike = z.infer export type QueueStatusOptions = z.infer +export type QueueWorkUnitsOptions = z.infer export type FileUpload = z.infer export type GetRepresentationParams = z.infer< typeof GetRepresentationParamsSchema diff --git a/tests/sdk/test_client.py b/tests/sdk/test_client.py index 893711498..04694766b 100644 --- a/tests/sdk/test_client.py +++ b/tests/sdk/test_client.py @@ -1,7 +1,12 @@ import pytest from fastapi.testclient import TestClient -from sdks.python.src.honcho.api_types import QueueStatusResponse +from sdks.python.src.honcho.api_types import ( + QueueStatusResponse, + QueueWorkUnit, + QueueWorkUnitsPageAsync, + QueueWorkUnitsPageSync, +) from sdks.python.src.honcho.client import Honcho from sdks.python.src.honcho.message import Message from sdks.python.src.honcho.pagination import AsyncPage, SyncPage @@ -197,6 +202,13 @@ def assert_queue_status(status: QueueStatusResponse) -> None: + status.in_progress_work_units + status.pending_work_units ) + # Phase 1 fields: stalled + ready partition pending + assert status.pending_stalled_work_units >= 0 + assert status.pending_ready_work_units >= 0 + assert ( + status.pending_stalled_work_units + status.pending_ready_work_units + == status.pending_work_units + ) if status.sessions is not None: for session_status in status.sessions.values(): @@ -205,6 +217,11 @@ def assert_queue_status(status: QueueStatusResponse) -> None: + session_status.in_progress_work_units + session_status.pending_work_units ) + assert ( + session_status.pending_stalled_work_units + + session_status.pending_ready_work_units + == session_status.pending_work_units + ) if client_type == "async": # Test with no parameters - this should work in the SDK even though API requires at least one @@ -268,6 +285,88 @@ def assert_queue_status(status: QueueStatusResponse) -> None: assert_queue_status(status) +@pytest.mark.asyncio +async def test_get_queue_work_units(client_fixture: tuple[Honcho, str]): + """ + Tests the cursor-paginated /queue/work-units SDK method. + + Verifies envelope extras are populated, items parse into QueueWorkUnit, + and forward cursor traversal works without duplicates or skips. + """ + honcho_client, client_type = client_fixture + + def assert_envelope( + page: QueueWorkUnitsPageSync | QueueWorkUnitsPageAsync, + ) -> None: + # Envelope extras present and typed + assert isinstance(page.representation_batch_max_tokens, int) + assert isinstance(page.flush_enabled, bool) + assert page.representation_batch_max_tokens > 0 + + if client_type == "async": + # Empty workspace path + page_async = await honcho_client.aio.queue_work_units() + assert isinstance(page_async, QueueWorkUnitsPageAsync) + assert_envelope(page_async) + for wu in page_async.items: + assert isinstance(wu, QueueWorkUnit) + + # Filter combos still parse + peer = await honcho_client.aio.peer(id="test-peer-wu-async") + await peer.aio.get_metadata() + session = await honcho_client.aio.session(id="test-session-wu-async") + await session.aio.get_metadata() + + page_async = await honcho_client.aio.queue_work_units(observer=peer.id) + assert isinstance(page_async, QueueWorkUnitsPageAsync) + + page_async = await honcho_client.aio.queue_work_units(session=session.id) + assert isinstance(page_async, QueueWorkUnitsPageAsync) + + # Session-scoped variant + page_async = await session.aio.queue_work_units() + assert isinstance(page_async, QueueWorkUnitsPageAsync) + assert_envelope(page_async) + + # Size param is plumbed through (small empty page is still valid) + page_async = await honcho_client.aio.queue_work_units(size=10) + assert isinstance(page_async, QueueWorkUnitsPageAsync) + assert len(page_async.items) <= 10 + # Cursor traversal terminates on empty queue + assert page_async.has_next_page() is False + assert page_async.get_next_page is not None + nxt = await page_async.get_next_page() + assert nxt is None + else: + page_sync = honcho_client.queue_work_units() + assert isinstance(page_sync, QueueWorkUnitsPageSync) + assert_envelope(page_sync) + for wu in page_sync.items: + assert isinstance(wu, QueueWorkUnit) + + peer = honcho_client.peer(id="test-peer-wu-sync") + peer.get_metadata() + session = honcho_client.session(id="test-session-wu-sync") + session.get_metadata() + + page_sync = honcho_client.queue_work_units(observer=peer.id) + assert isinstance(page_sync, QueueWorkUnitsPageSync) + + page_sync = honcho_client.queue_work_units(session=session.id) + assert isinstance(page_sync, QueueWorkUnitsPageSync) + + # Session-scoped variant + page_sync = session.queue_work_units() + assert isinstance(page_sync, QueueWorkUnitsPageSync) + assert_envelope(page_sync) + + page_sync = honcho_client.queue_work_units(size=10) + assert isinstance(page_sync, QueueWorkUnitsPageSync) + assert len(page_sync.items) <= 10 + assert page_sync.has_next_page() is False + assert page_sync.get_next_page() is None + + @pytest.mark.asyncio async def test_update_message_with_message_object( client_fixture: tuple[Honcho, str], diff --git a/tests/sdk/test_session.py b/tests/sdk/test_session.py index 3a618891c..9bd52e60f 100644 --- a/tests/sdk/test_session.py +++ b/tests/sdk/test_session.py @@ -1,6 +1,11 @@ import pytest -from sdks.python.src.honcho.api_types import QueueStatusResponse +from sdks.python.src.honcho.api_types import ( + QueueStatusResponse, + QueueWorkUnit, + QueueWorkUnitsPageAsync, + QueueWorkUnitsPageSync, +) from sdks.python.src.honcho.client import Honcho from sdks.python.src.honcho.message import Message from sdks.python.src.honcho.peer import Peer @@ -550,6 +555,50 @@ def assert_session_queue_status(status: QueueStatusResponse) -> None: assert_session_queue_status(status) +@pytest.mark.asyncio +async def test_session_queue_work_units(client_fixture: tuple[Honcho, str]): + """ + Tests cursor-paginated /queue/work-units scoped to a session. + """ + honcho_client, client_type = client_fixture + + if client_type == "async": + session = await honcho_client.aio.session(id="test-session-wu") + await session.aio.get_metadata() + page_async = await session.aio.queue_work_units() + assert isinstance(page_async, QueueWorkUnitsPageAsync) + assert isinstance(page_async.representation_batch_max_tokens, int) + assert isinstance(page_async.flush_enabled, bool) + for wu in page_async.items: + assert isinstance(wu, QueueWorkUnit) + + # Filter combinations still parse + peer = await honcho_client.aio.peer(id="test-peer-session-wu") + await peer.aio.get_metadata() + page_async = await session.aio.queue_work_units(observer=peer.id) + assert isinstance(page_async, QueueWorkUnitsPageAsync) + page_async = await session.aio.queue_work_units( + observer=peer.id, sender=peer.id + ) + assert isinstance(page_async, QueueWorkUnitsPageAsync) + else: + session = honcho_client.session(id="test-session-wu") + session.get_metadata() + page_sync = session.queue_work_units() + assert isinstance(page_sync, QueueWorkUnitsPageSync) + assert isinstance(page_sync.representation_batch_max_tokens, int) + assert isinstance(page_sync.flush_enabled, bool) + for wu in page_sync.items: + assert isinstance(wu, QueueWorkUnit) + + peer = honcho_client.peer(id="test-peer-session-wu") + peer.get_metadata() + page_sync = session.queue_work_units(observer=peer.id) + assert isinstance(page_sync, QueueWorkUnitsPageSync) + page_sync = session.queue_work_units(observer=peer.id, sender=peer.id) + assert isinstance(page_sync, QueueWorkUnitsPageSync) + + @pytest.mark.asyncio async def test_session_clone(client_fixture: tuple[Honcho, str]): """ From 31c2149b9e6618a3142f78af0729921c0a57e1ee Mon Sep 17 00:00:00 2001 From: Vineeth Voruganti <13438633+VVoruganti@users.noreply.github.com> Date: Thu, 21 May 2026 11:11:01 -0400 Subject: [PATCH 4/5] chore(docs): Update CLAUDE.md and add docs page on advanced queue monitoring --- CLAUDE.md | 1 + .../features/advanced/queue-status.mdx | 137 +++ docs/v3/openapi.json | 1071 +++++++++++------ 3 files changed, 808 insertions(+), 401 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 6fa9ce2fd..e47f6ccad 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -44,6 +44,7 @@ All API routes follow the pattern: `/v3/{resource}/{id}/{action}`. Most "list/se - **Peers**: Create, list, update, chat (dialectic), messages, representation - **Sessions**: Create, list, update, delete, clone, manage peers, get context - **Messages**: Create (batch up to 100), upload (file), list, get, update +- **Queue introspection**: `/queue/status` aggregate counts (incl. `pending_stalled_work_units` / `pending_ready_work_units` split for representation batches below `DERIVER_REPRESENTATION_BATCH_MAX_TOKENS`) and `/queue/work-units` per-work-unit detail (cursor-paginated via `fastapi-pagination.CursorPage`) - **Conclusions**: Create, list, query (semantic search), delete — the API-facing name for observations stored in `(observer, observed)` collections - **Keys**: Create scoped JWTs - **Webhooks**: Register endpoint, list, delete, test diff --git a/docs/v3/documentation/features/advanced/queue-status.mdx b/docs/v3/documentation/features/advanced/queue-status.mdx index 7c4d363cf..dcbaa950d 100644 --- a/docs/v3/documentation/features/advanced/queue-status.mdx +++ b/docs/v3/documentation/features/advanced/queue-status.mdx @@ -43,6 +43,17 @@ class QueueStatus(BaseModel): pending_work_units: int """Work units waiting to be processed""" + pending_stalled_work_units: int + """Pending representation work units waiting to accumulate enough tokens to + hit DERIVER_REPRESENTATION_BATCH_MAX_TOKENS. Always 0 when + DERIVER_FLUSH_ENABLED is true.""" + + pending_ready_work_units: int + """Pending work units eligible to be claimed: non-representation task types, + plus representation work units whose pending tokens are at/above the batch + threshold (or when flush is enabled). + pending_stalled_work_units + pending_ready_work_units == pending_work_units.""" + total_work_units: int """Total work units""" @@ -55,12 +66,19 @@ Promise<{ completedWorkUnits: number inProgressWorkUnits: number pendingWorkUnits: number + pendingStalledWorkUnits: number + pendingReadyWorkUnits: number sessions?: Record }> ``` +The `pending_stalled_work_units` / `pending_ready_work_units` split tells you +*why* pending work isn't moving: stalled items are sitting below the deriver's +batch token threshold waiting for more messages, while ready items are eligible +to be picked up by a worker. The two always sum to `pending_work_units`. + Whenever a message is sent it will generate several tasks. These could be tasks such as generating insights, cleaning up a representation, summarizing a conversation etc. These tasks are defined based on who is sending the @@ -147,7 +165,126 @@ async queueStatus( completedWorkUnits: number inProgressWorkUnits: number pendingWorkUnits: number + pendingStalledWorkUnits: number + pendingReadyWorkUnits: number sessions?: Record }> ``` + +## Inspecting individual work units + +When the aggregate counts tell you "something is stalled" but not *which* +work units are stalled, use `queue_work_units` / `queueWorkUnits`. It returns +one row per unprocessed work unit with the token totals, in-progress flag, +and threshold classification needed to debug "why isn't this advancing?". + + +```python Python +from honcho import Honcho +honcho = Honcho() + +page = honcho.queue_work_units() + +# Inspect just the current page +for wu in page.items: + print(wu.work_unit_key, wu.pending_tokens, wu.hit_threshold) + +# Threshold context from the envelope +print(page.representation_batch_max_tokens, page.flush_enabled) + +# Walk the full queue (auto-fetches subsequent pages) +for wu in page: + ... +``` + +```typescript typescript +import { Honcho } from '@honcho-ai/sdk'; + +const honcho = new Honcho({}); + +const page = await honcho.queueWorkUnits(); + +// Current page only +for (const wu of page.items) { + console.log(wu.workUnitKey, wu.pendingTokens, wu.hitThreshold); +} + +// Threshold context from the envelope +console.log(page.representationBatchMaxTokens, page.flushEnabled); + +// Walk the full queue (auto-fetches subsequent pages) +for await (const wu of page) { + // ... +} +``` + + +### Cursor pagination + +The endpoint is **cursor-paginated**, not offset-paginated. The queue mutates +rapidly (workers claim and complete items continuously), and offset pagination +would skip rows that were processed between fetches. Cursor pagination uses +opaque tokens (`next_page` / `previous_page`) that are stable across these +mutations. + +Pass `cursor` and optionally `size` to fetch a specific page, or use the +helpers on the returned page object to navigate. + + +```python Python +# Explicit cursor navigation +page = honcho.queue_work_units(size=50) +while page.has_next_page(): + page = page.get_next_page() + process(page.items) + +# Or pass a cursor token directly +page = honcho.queue_work_units(cursor="") +``` + +```typescript typescript +let page = await honcho.queueWorkUnits({ size: 50 }); +while (page.hasNextPage) { + const next = await page.getNextPage(); + if (!next) break; + page = next; + process(page.items); +} +``` + + +### Per-work-unit fields + +| Field | Type | Description | +|---|---|---| +| `work_unit_key` | `str` | Full key, e.g. `representation:ws_abc:sess_xyz:peer_observed` | +| `task_type` | `str` | `"representation"`, `"summary"`, or `"dream"` | +| `session_id` | `str \| null` | FK to the session row; null for task types without a session | +| `session_name` | `str \| null` | Human-readable session name | +| `observer` | `str \| null` | Observer peer (from queue payload) | +| `observed` | `str \| null` | Observed peer (from queue payload) | +| `pending_items` | `int` | Unprocessed queue items in this work unit | +| `pending_tokens` | `int` | Sum of `messages.token_count` across the pending items | +| `tokens_until_threshold` | `int` | Tokens still needed to fire the batch (0 for non-representation task types or when flush is enabled) | +| `hit_threshold` | `bool` | True if eligible to be claimed; false means stalled | +| `in_progress` | `bool` | True if a deriver worker has claimed this work unit | +| `oldest_item_at` | `datetime` | Oldest pending queue-item creation timestamp | +| `newest_item_at` | `datetime` | Newest pending queue-item creation timestamp | + +Each page also carries the deriver's threshold configuration so you can +interpret per-row classification without re-querying server settings: + +| Field | Type | Description | +|---|---|---| +| `representation_batch_max_tokens` | `int` | `DERIVER_REPRESENTATION_BATCH_MAX_TOKENS` at request time | +| `flush_enabled` | `bool` | `DERIVER_FLUSH_ENABLED` at request time | + + +**Cursor pagination is stable, but the underlying data is not.** Items can be +processed between pages, and new items can be enqueued. Each page is a +snapshot of what the server saw at request time, but pages may be +inconsistent with each other under concurrent processing. Use `page.items` +for a stable per-page view; iterate the page object only when an approximate +walk is acceptable. + diff --git a/docs/v3/openapi.json b/docs/v3/openapi.json index a2e46b2c2..ff033364d 100644 --- a/docs/v3/openapi.json +++ b/docs/v3/openapi.json @@ -11,9 +11,10 @@ }, "license": { "name": "GNU Affero General Public License v3.0", + "identifier": "AGPL-3.0-only", "url": "https://github.com/plastic-labs/honcho/blob/main/LICENSE" }, - "version": "3.0.3" + "version": "3.0.6" }, "servers": [ { @@ -28,7 +29,9 @@ "paths": { "/v3/workspaces": { "post": { - "tags": ["workspaces"], + "tags": [ + "workspaces" + ], "summary": "Get Or Create Workspace", "description": "Get a Workspace by ID.\n\nIf workspace_id is provided as a query parameter, it uses that (must match JWT workspace_id).\nOtherwise, it uses the workspace_id from the JWT.", "operationId": "get_or_create_workspace_v3_workspaces_post", @@ -68,52 +71,18 @@ "security": [ { "HTTPBearer": [] - }, - {} + } ] } }, "/v3/workspaces/list": { "post": { - "tags": ["workspaces"], + "tags": [ + "workspaces" + ], "summary": "Get All Workspaces", "description": "Get all Workspaces, paginated with optional filters.", "operationId": "get_all_workspaces_v3_workspaces_list_post", - "security": [ - { - "HTTPBearer": [] - }, - {} - ], - "parameters": [ - { - "name": "page", - "in": "query", - "required": false, - "schema": { - "type": "integer", - "minimum": 1, - "description": "Page number", - "default": 1, - "title": "Page" - }, - "description": "Page number" - }, - { - "name": "size", - "in": "query", - "required": false, - "schema": { - "type": "integer", - "maximum": 100, - "minimum": 1, - "description": "Page size", - "default": 50, - "title": "Size" - }, - "description": "Page size" - } - ], "requestBody": { "content": { "application/json": { @@ -126,8 +95,8 @@ "type": "null" } ], - "description": "Filtering and pagination options for the workspaces list", - "title": "Options" + "title": "Options", + "description": "Filtering and pagination options for the workspaces list" } } } @@ -153,20 +122,26 @@ } } } - } + }, + "security": [ + { + "HTTPBearer": [] + } + ] } }, "/v3/workspaces/{workspace_id}": { "put": { - "tags": ["workspaces"], + "tags": [ + "workspaces" + ], "summary": "Update Workspace", "description": "Update Workspace metadata and/or configuration.", "operationId": "update_workspace_v3_workspaces__workspace_id__put", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -214,15 +189,16 @@ } }, "delete": { - "tags": ["workspaces"], + "tags": [ + "workspaces" + ], "summary": "Delete Workspace", "description": "Delete a Workspace. This accepts the deletion request and processes it in the background,\npermanently deleting all peers, messages, conclusions, and other resources associated\nwith the workspace.\n\nReturns 409 Conflict if the workspace contains active sessions.\nDelete all sessions first, then delete the workspace.\n\nThis action cannot be undone.", "operationId": "delete_workspace_v3_workspaces__workspace_id__delete", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -259,15 +235,16 @@ }, "/v3/workspaces/{workspace_id}/search": { "post": { - "tags": ["workspaces"], + "tags": [ + "workspaces" + ], "summary": "Search Workspace", "description": "Search messages in a Workspace using optional filters. Use `limit` to control the number of\nresults returned.", "operationId": "search_workspace_v3_workspaces__workspace_id__search_post", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -321,15 +298,16 @@ }, "/v3/workspaces/{workspace_id}/queue/status": { "get": { - "tags": ["workspaces"], + "tags": [ + "workspaces" + ], "summary": "Get Queue Status", "description": "Get the processing queue status for a Workspace, optionally scoped to an observer, sender,\nand/or session.\n\nOnly tracks user-facing task types (representation, summary, dream).\nInternal infrastructure tasks (reconciler, webhook, deletion) are excluded.\nNote: completed counts reflect items since the last periodic queue cleanup,\nnot lifetime totals.", "operationId": "get_queue_status_v3_workspaces__workspace_id__queue_status_get", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -420,17 +398,120 @@ } } }, + "/v3/workspaces/{workspace_id}/queue/work-units": { + "get": { + "tags": [ + "workspaces" + ], + "summary": "Get Queue Work Units", + "description": "Return one row per unprocessed work unit in the Workspace's queue, with\ntoken totals, in-progress flag, and threshold classification. Cursor-\npaginated (the queue mutates rapidly; offset pagination would skip rows\nas workers process items between page fetches).\n\nUseful for debugging \"why isn't this work unit advancing?\" \u2014 distinguishes\nwork units stalled below the batch token threshold from those claimed by a\nworker or eligible to be claimed. Same filter semantics as /queue/status.", + "operationId": "get_queue_work_units_v3_workspaces__workspace_id__queue_work_units_get", + "security": [ + { + "HTTPBearer": [] + } + ], + "parameters": [ + { + "name": "workspace_id", + "in": "path", + "required": true, + "schema": { + "type": "string", + "title": "Workspace Id" + } + }, + { + "name": "observer_id", + "in": "query", + "required": false, + "schema": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "description": "Optional observer ID to filter by", + "title": "Observer Id" + }, + "description": "Optional observer ID to filter by" + }, + { + "name": "sender_id", + "in": "query", + "required": false, + "schema": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "description": "Optional sender ID to filter by", + "title": "Sender Id" + }, + "description": "Optional sender ID to filter by" + }, + { + "name": "session_id", + "in": "query", + "required": false, + "schema": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "description": "Optional session ID to filter by", + "title": "Session Id" + }, + "description": "Optional session ID to filter by" + } + ], + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/QueueWorkUnitsPage" + } + } + } + }, + "422": { + "description": "Validation Error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/HTTPValidationError" + } + } + } + } + } + } + }, "/v3/workspaces/{workspace_id}/schedule_dream": { "post": { - "tags": ["workspaces"], + "tags": [ + "workspaces" + ], "summary": "Schedule Dream", "description": "Manually schedule a dream task for a specific collection.\n\nThis endpoint bypasses all automatic dream conditions (document threshold,\nminimum hours between dreams) and schedules the dream task for a future execution.\n\nCurrently this endpoint only supports scheduling immediate dreams. In the future,\nusers may pass a cron-style expression to schedule dreams at specific times.", "operationId": "schedule_dream_v3_workspaces__workspace_id__schedule_dream_post", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -473,15 +554,16 @@ }, "/v3/workspaces/{workspace_id}/peers/list": { "post": { - "tags": ["peers"], + "tags": [ + "peers" + ], "summary": "Get Peers", "description": "Get all Peers for a Workspace, paginated with optional filters.", "operationId": "get_peers_v3_workspaces__workspace_id__peers_list_post", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -492,33 +574,6 @@ "type": "string", "title": "Workspace Id" } - }, - { - "name": "page", - "in": "query", - "required": false, - "schema": { - "type": "integer", - "minimum": 1, - "description": "Page number", - "default": 1, - "title": "Page" - }, - "description": "Page number" - }, - { - "name": "size", - "in": "query", - "required": false, - "schema": { - "type": "integer", - "maximum": 100, - "minimum": 1, - "description": "Page size", - "default": 50, - "title": "Size" - }, - "description": "Page size" } ], "requestBody": { @@ -565,15 +620,16 @@ }, "/v3/workspaces/{workspace_id}/peers": { "post": { - "tags": ["peers"], + "tags": [ + "peers" + ], "summary": "Get Or Create Peer", "description": "Get a Peer by ID or create a new Peer with the given ID.\n\nIf peer_id is provided as a query parameter, it uses that (must match JWT workspace_id).\nOtherwise, it uses the peer_id from the JWT.", "operationId": "get_or_create_peer_v3_workspaces__workspace_id__peers_post", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -623,15 +679,16 @@ }, "/v3/workspaces/{workspace_id}/peers/{peer_id}": { "put": { - "tags": ["peers"], + "tags": [ + "peers" + ], "summary": "Update Peer", "description": "Update a Peer's metadata and/or configuration.", "operationId": "update_peer_v3_workspaces__workspace_id__peers__peer_id__put", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -690,15 +747,16 @@ }, "/v3/workspaces/{workspace_id}/peers/{peer_id}/sessions": { "post": { - "tags": ["peers"], + "tags": [ + "peers" + ], "summary": "Get Sessions For Peer", "description": "Get all Sessions for a Peer, paginated with optional filters.", "operationId": "get_sessions_for_peer_v3_workspaces__workspace_id__peers__peer_id__sessions_post", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -718,33 +776,6 @@ "type": "string", "title": "Peer Id" } - }, - { - "name": "page", - "in": "query", - "required": false, - "schema": { - "type": "integer", - "minimum": 1, - "description": "Page number", - "default": 1, - "title": "Page" - }, - "description": "Page number" - }, - { - "name": "size", - "in": "query", - "required": false, - "schema": { - "type": "integer", - "maximum": 100, - "minimum": 1, - "description": "Page size", - "default": 50, - "title": "Size" - }, - "description": "Page size" } ], "requestBody": { @@ -791,15 +822,16 @@ }, "/v3/workspaces/{workspace_id}/peers/{peer_id}/chat": { "post": { - "tags": ["peers"], - "summary": "Query a Peer's representation using natural language", + "tags": [ + "peers" + ], + "summary": "Chat", "description": "Query a Peer's representation using natural language. Performs agentic search and reasoning to comprehensively\nanswer the query based on all latent knowledge gathered about the peer from their messages and conclusions.", "operationId": "chat_v3_workspaces__workspace_id__peers__peer_id__chat_post", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -850,7 +882,9 @@ "title": "Content" } }, - "required": ["content"], + "required": [ + "content" + ], "title": "DialecticResponse", "type": "object" } @@ -873,15 +907,16 @@ }, "/v3/workspaces/{workspace_id}/peers/{peer_id}/representation": { "post": { - "tags": ["peers"], + "tags": [ + "peers" + ], "summary": "Get Representation", "description": "Get a curated subset of a Peer's Representation. A Representation is always a subset of the total\nknowledge about the Peer. The subset can be scoped and filtered in various ways.\n\n\nIf a session_id is provided in the body, we get the Representation of the Peer scoped to that Session.\nIf a target is provided, we get the Representation of the target from the perspective of the Peer.\nIf no target is provided, we get the omniscient Honcho Representation of the Peer.", "operationId": "get_representation_v3_workspaces__workspace_id__peers__peer_id__representation_post", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -940,15 +975,16 @@ }, "/v3/workspaces/{workspace_id}/peers/{peer_id}/card": { "get": { - "tags": ["peers"], + "tags": [ + "peers" + ], "summary": "Get Peer Card", "description": "Get a peer card for a specific peer relationship.\n\nReturns the peer card that the observer peer has for the target peer if it exists.\nIf no target is specified, returns the observer's own peer card.", "operationId": "get_peer_card_v3_workspaces__workspace_id__peers__peer_id__card_get", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -1014,15 +1050,16 @@ } }, "put": { - "tags": ["peers"], + "tags": [ + "peers" + ], "summary": "Set Peer Card", "description": "Set a peer card for a specific peer relationship.\n\nSets the peer card that the observer peer has for the target peer.\nIf no target is specified, sets the observer's own peer card.", "operationId": "set_peer_card_v3_workspaces__workspace_id__peers__peer_id__card_put", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -1101,15 +1138,16 @@ }, "/v3/workspaces/{workspace_id}/peers/{peer_id}/context": { "get": { - "tags": ["peers"], + "tags": [ + "peers" + ], "summary": "Get Peer Context", "description": "Get context for a peer, including their representation and peer card.\n\nThis endpoint returns a curated subset of the representation and peer card for a peer.\nIf a target is specified, returns the context for the target from the\nobserver peer's perspective. If no target is specified, returns the\npeer's own context (self-observation).\n\nThis is useful for getting all the context needed about a peer without\nmaking multiple API calls.", "operationId": "get_peer_context_v3_workspaces__workspace_id__peers__peer_id__context_get", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -1267,15 +1305,16 @@ }, "/v3/workspaces/{workspace_id}/peers/{peer_id}/search": { "post": { - "tags": ["peers"], + "tags": [ + "peers" + ], "summary": "Search Peer", "description": "Search a Peer's messages, optionally filtered by various criteria.", "operationId": "search_peer_v3_workspaces__workspace_id__peers__peer_id__search_post", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -1338,15 +1377,16 @@ }, "/v3/workspaces/{workspace_id}/sessions/list": { "post": { - "tags": ["sessions"], + "tags": [ + "sessions" + ], "summary": "Get Sessions", "description": "Get all Sessions for a Workspace, paginated with optional filters.", "operationId": "get_sessions_v3_workspaces__workspace_id__sessions_list_post", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -1357,33 +1397,6 @@ "type": "string", "title": "Workspace Id" } - }, - { - "name": "page", - "in": "query", - "required": false, - "schema": { - "type": "integer", - "minimum": 1, - "description": "Page number", - "default": 1, - "title": "Page" - }, - "description": "Page number" - }, - { - "name": "size", - "in": "query", - "required": false, - "schema": { - "type": "integer", - "maximum": 100, - "minimum": 1, - "description": "Page size", - "default": 50, - "title": "Size" - }, - "description": "Page size" } ], "requestBody": { @@ -1430,15 +1443,16 @@ }, "/v3/workspaces/{workspace_id}/sessions": { "post": { - "tags": ["sessions"], + "tags": [ + "sessions" + ], "summary": "Get Or Create Session", "description": "Get a Session by ID or create a new Session with the given ID.\n\nIf Session ID is provided as a parameter, it verifies the Session is in the Workspace.\nOtherwise, it uses the session_id from the JWT for verification.", "operationId": "get_or_create_session_v3_workspaces__workspace_id__sessions_post", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -1488,15 +1502,16 @@ }, "/v3/workspaces/{workspace_id}/sessions/{session_id}": { "put": { - "tags": ["sessions"], + "tags": [ + "sessions" + ], "summary": "Update Session", "description": "Update a Session's metadata and/or configuration.", "operationId": "update_session_v3_workspaces__workspace_id__sessions__session_id__put", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -1553,15 +1568,16 @@ } }, "delete": { - "tags": ["sessions"], + "tags": [ + "sessions" + ], "summary": "Delete Session", "description": "Delete a Session and all associated messages.\n\nThe Session is marked as inactive immediately and returns 202 Accepted. The actual\ndeletion of all related data happens asynchronously via the queue with retry support.\n\nThis action cannot be undone.", "operationId": "delete_session_v3_workspaces__workspace_id__sessions__session_id__delete", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -1607,15 +1623,16 @@ }, "/v3/workspaces/{workspace_id}/sessions/{session_id}/clone": { "post": { - "tags": ["sessions"], + "tags": [ + "sessions" + ], "summary": "Clone Session", "description": "Clone a Session, optionally up to a specific message ID.", "operationId": "clone_session_v3_workspaces__workspace_id__sessions__session_id__clone_post", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -1681,15 +1698,16 @@ }, "/v3/workspaces/{workspace_id}/sessions/{session_id}/peers": { "post": { - "tags": ["sessions"], + "tags": [ + "sessions" + ], "summary": "Add Peers To Session", "description": "Add Peers to a Session. If a Peer does not yet exist, it will be created automatically.", "operationId": "add_peers_to_session_v3_workspaces__workspace_id__sessions__session_id__peers_post", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -1750,15 +1768,16 @@ } }, "put": { - "tags": ["sessions"], + "tags": [ + "sessions" + ], "summary": "Set Session Peers", "description": "Set the Peers in a Session. If a Peer does not yet exist, it will be created automatically.\n\nThis will fully replace the current set of Peers in the Session.", "operationId": "set_session_peers_v3_workspaces__workspace_id__sessions__session_id__peers_put", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -1819,15 +1838,16 @@ } }, "delete": { - "tags": ["sessions"], + "tags": [ + "sessions" + ], "summary": "Remove Peers From Session", "description": "Remove Peers by ID from a Session.", "operationId": "remove_peers_from_session_v3_workspaces__workspace_id__sessions__session_id__peers_delete", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -1888,15 +1908,16 @@ } }, "get": { - "tags": ["sessions"], + "tags": [ + "sessions" + ], "summary": "Get Session Peers", "description": "Get all Peers in a Session. Results are paginated.", "operationId": "get_session_peers_v3_workspaces__workspace_id__sessions__session_id__peers_get", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -1916,33 +1937,6 @@ "type": "string", "title": "Session Id" } - }, - { - "name": "page", - "in": "query", - "required": false, - "schema": { - "type": "integer", - "minimum": 1, - "description": "Page number", - "default": 1, - "title": "Page" - }, - "description": "Page number" - }, - { - "name": "size", - "in": "query", - "required": false, - "schema": { - "type": "integer", - "maximum": 100, - "minimum": 1, - "description": "Page size", - "default": 50, - "title": "Size" - }, - "description": "Page size" } ], "responses": { @@ -1971,15 +1965,16 @@ }, "/v3/workspaces/{workspace_id}/sessions/{session_id}/peers/{peer_id}/config": { "get": { - "tags": ["sessions"], + "tags": [ + "sessions" + ], "summary": "Get Peer Config", "description": "Get the configuration for a Peer in a Session.", "operationId": "get_peer_config_v3_workspaces__workspace_id__sessions__session_id__peers__peer_id__config_get", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -2034,15 +2029,16 @@ } }, "put": { - "tags": ["sessions"], + "tags": [ + "sessions" + ], "summary": "Set Peer Config", "description": "Set the configuration for a Peer in a Session.", "operationId": "set_peer_config_v3_workspaces__workspace_id__sessions__session_id__peers__peer_id__config_put", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -2103,15 +2099,16 @@ }, "/v3/workspaces/{workspace_id}/sessions/{session_id}/context": { "get": { - "tags": ["sessions"], + "tags": [ + "sessions" + ], "summary": "Get Session Context", "description": "Produce a context object from the Session. The caller provides an optional token limit which the entire context must fit into.\nIf not provided, the context will be exhaustive (within configured max tokens). To do this, we allocate 40% of the token limit\nto the summary, and 60% to recent messages -- as many as can fit. Note that the summary will usually take up less space than\nthis. If the caller does not want a summary, we allocate all the tokens to recent messages.", "operationId": "get_session_context_v3_workspaces__workspace_id__sessions__session_id__context_get", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -2328,15 +2325,16 @@ }, "/v3/workspaces/{workspace_id}/sessions/{session_id}/summaries": { "get": { - "tags": ["sessions"], + "tags": [ + "sessions" + ], "summary": "Get Session Summaries", "description": "Get available summaries for a Session.\n\nReturns both short and long summaries if available, including metadata like\nthe message ID they cover up to, creation timestamp, and token count.", "operationId": "get_session_summaries_v3_workspaces__workspace_id__sessions__session_id__summaries_get", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -2384,15 +2382,16 @@ }, "/v3/workspaces/{workspace_id}/sessions/{session_id}/search": { "post": { - "tags": ["sessions"], + "tags": [ + "sessions" + ], "summary": "Search Session", "description": "Search a Session with optional filters. Use `limit` to control the number of results returned.", "operationId": "search_session_v3_workspaces__workspace_id__sessions__session_id__search_post", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -2455,15 +2454,16 @@ }, "/v3/workspaces/{workspace_id}/sessions/{session_id}/messages": { "post": { - "tags": ["messages"], + "tags": [ + "messages" + ], "summary": "Create Messages For Session", "description": "Add new message(s) to a session.", "operationId": "create_messages_for_session_v3_workspaces__workspace_id__sessions__session_id__messages_post", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -2525,15 +2525,16 @@ }, "/v3/workspaces/{workspace_id}/sessions/{session_id}/messages/upload": { "post": { - "tags": ["messages"], + "tags": [ + "messages" + ], "summary": "Create Messages With File", "description": "Create messages from uploaded files. Files are converted to text and split into multiple messages.", "operationId": "create_messages_with_file_v3_workspaces__workspace_id__sessions__session_id__messages_upload_post", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -2595,15 +2596,16 @@ }, "/v3/workspaces/{workspace_id}/sessions/{session_id}/messages/list": { "post": { - "tags": ["messages"], + "tags": [ + "messages" + ], "summary": "Get Messages", "description": "Get all messages for a Session with optional filters. Results are paginated.", "operationId": "get_messages_v3_workspaces__workspace_id__sessions__session_id__messages_list_post", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -2642,33 +2644,6 @@ "title": "Reverse" }, "description": "Whether to reverse the order of results" - }, - { - "name": "page", - "in": "query", - "required": false, - "schema": { - "type": "integer", - "minimum": 1, - "description": "Page number", - "default": 1, - "title": "Page" - }, - "description": "Page number" - }, - { - "name": "size", - "in": "query", - "required": false, - "schema": { - "type": "integer", - "maximum": 100, - "minimum": 1, - "description": "Page size", - "default": 50, - "title": "Size" - }, - "description": "Page size" } ], "requestBody": { @@ -2715,15 +2690,16 @@ }, "/v3/workspaces/{workspace_id}/sessions/{session_id}/messages/{message_id}": { "get": { - "tags": ["messages"], + "tags": [ + "messages" + ], "summary": "Get Message", "description": "Get a single message by ID from a Session.", "operationId": "get_message_v3_workspaces__workspace_id__sessions__session_id__messages__message_id__get", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -2778,15 +2754,16 @@ } }, "put": { - "tags": ["messages"], + "tags": [ + "messages" + ], "summary": "Update Message", "description": "Update the metadata of a message.\n\nThis will overwrite any existing metadata for the message.", "operationId": "update_message_v3_workspaces__workspace_id__sessions__session_id__messages__message_id__put", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -2854,15 +2831,16 @@ }, "/v3/workspaces/{workspace_id}/conclusions": { "post": { - "tags": ["conclusions"], + "tags": [ + "conclusions" + ], "summary": "Create Conclusions", "description": "Create one or more Conclusions.\n\nConclusions are logical certainties derived from interactions between Peers. They form the basis of a Peer's Representation.", "operationId": "create_conclusions_v3_workspaces__workspace_id__conclusions_post", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -2916,15 +2894,16 @@ }, "/v3/workspaces/{workspace_id}/conclusions/list": { "post": { - "tags": ["conclusions"], + "tags": [ + "conclusions" + ], "summary": "List Conclusions", "description": "List Conclusions using optional filters, ordered by recency unless `reverse` is true. Results are paginated.", "operationId": "list_conclusions_v3_workspaces__workspace_id__conclusions_list_post", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -2950,37 +2929,10 @@ } ], "description": "Whether to reverse the order of results", - "default": false, - "title": "Reverse" - }, - "description": "Whether to reverse the order of results" - }, - { - "name": "page", - "in": "query", - "required": false, - "schema": { - "type": "integer", - "minimum": 1, - "description": "Page number", - "default": 1, - "title": "Page" - }, - "description": "Page number" - }, - { - "name": "size", - "in": "query", - "required": false, - "schema": { - "type": "integer", - "maximum": 100, - "minimum": 1, - "description": "Page size", - "default": 50, - "title": "Size" + "default": false, + "title": "Reverse" }, - "description": "Page size" + "description": "Whether to reverse the order of results" } ], "requestBody": { @@ -3027,15 +2979,16 @@ }, "/v3/workspaces/{workspace_id}/conclusions/query": { "post": { - "tags": ["conclusions"], + "tags": [ + "conclusions" + ], "summary": "Query Conclusions", "description": "Query Conclusions using semantic search. Use `top_k` to control the number of results returned.", "operationId": "query_conclusions_v3_workspaces__workspace_id__conclusions_query_post", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -3089,15 +3042,16 @@ }, "/v3/workspaces/{workspace_id}/conclusions/{conclusion_id}": { "delete": { - "tags": ["conclusions"], + "tags": [ + "conclusions" + ], "summary": "Delete Conclusion", "description": "Delete a single Conclusion by ID.\n\nThis action cannot be undone.", "operationId": "delete_conclusion_v3_workspaces__workspace_id__conclusions__conclusion_id__delete", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -3138,15 +3092,16 @@ }, "/v3/keys": { "post": { - "tags": ["keys"], + "tags": [ + "keys" + ], "summary": "Create Key", "description": "Create a new Key", "operationId": "create_key_v3_keys_post", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -3245,15 +3200,16 @@ }, "/v3/workspaces/{workspace_id}/webhooks": { "post": { - "tags": ["webhooks"], + "tags": [ + "webhooks" + ], "summary": "Get Or Create Webhook Endpoint", "description": "Get or create a webhook endpoint URL.", "operationId": "get_or_create_webhook_endpoint_v3_workspaces__workspace_id__webhooks_post", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -3303,15 +3259,16 @@ } }, "get": { - "tags": ["webhooks"], + "tags": [ + "webhooks" + ], "summary": "List Webhook Endpoints", "description": "List all webhook endpoints, optionally filtered by workspace.", "operationId": "list_webhook_endpoints_v3_workspaces__workspace_id__webhooks_get", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -3324,33 +3281,6 @@ "title": "Workspace Id" }, "description": "Workspace ID" - }, - { - "name": "page", - "in": "query", - "required": false, - "schema": { - "type": "integer", - "minimum": 1, - "description": "Page number", - "default": 1, - "title": "Page" - }, - "description": "Page number" - }, - { - "name": "size", - "in": "query", - "required": false, - "schema": { - "type": "integer", - "maximum": 100, - "minimum": 1, - "description": "Page size", - "default": 50, - "title": "Size" - }, - "description": "Page size" } ], "responses": { @@ -3379,15 +3309,16 @@ }, "/v3/workspaces/{workspace_id}/webhooks/{endpoint_id}": { "delete": { - "tags": ["webhooks"], + "tags": [ + "webhooks" + ], "summary": "Delete Webhook Endpoint", "description": "Delete a specific webhook endpoint.", "operationId": "delete_webhook_endpoint_v3_workspaces__workspace_id__webhooks__endpoint_id__delete", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -3432,15 +3363,16 @@ }, "/v3/workspaces/{workspace_id}/webhooks/test": { "get": { - "tags": ["webhooks"], + "tags": [ + "webhooks" + ], "summary": "Test Emit", "description": "Test publishing a webhook event.", "operationId": "test_emit_v3_workspaces__workspace_id__webhooks_test_get", "security": [ { "HTTPBearer": [] - }, - {} + } ], "parameters": [ { @@ -3476,6 +3408,23 @@ } } } + }, + "/health": { + "get": { + "summary": "Health Check", + "description": "Health check endpoint for monitoring and container orchestration.", + "operationId": "health_check_health_get", + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": {} + } + } + } + } + } } }, "components": { @@ -3526,7 +3475,10 @@ } }, "type": "object", - "required": ["file", "peer_id"], + "required": [ + "file", + "peer_id" + ], "title": "Body_create_messages_with_file_v3_workspaces__workspace_id__sessions__session_id__messages_upload_post" }, "Conclusion": { @@ -3590,7 +3542,9 @@ } }, "type": "object", - "required": ["conclusions"], + "required": [ + "conclusions" + ], "title": "ConclusionBatchCreate", "description": "Schema for batch conclusion creation with a max of 100 conclusions." }, @@ -3626,7 +3580,11 @@ } }, "type": "object", - "required": ["content", "observer_id", "observed_id"], + "required": [ + "content", + "observer_id", + "observed_id" + ], "title": "ConclusionCreate", "description": "Schema for creating a single conclusion." }, @@ -3693,7 +3651,9 @@ } }, "type": "object", - "required": ["query"], + "required": [ + "query" + ], "title": "ConclusionQuery", "description": "Query parameters for semantic search of conclusions." }, @@ -3737,14 +3697,22 @@ }, "reasoning_level": { "type": "string", - "enum": ["minimal", "low", "medium", "high", "max"], + "enum": [ + "minimal", + "low", + "medium", + "high", + "max" + ], "title": "Reasoning Level", "description": "Level of reasoning to apply: minimal, low, medium, high, or max", "default": "low" } }, "type": "object", - "required": ["query"], + "required": [ + "query" + ], "title": "DialecticOptions" }, "DreamConfiguration": { @@ -3767,7 +3735,9 @@ }, "DreamType": { "type": "string", - "enum": ["omni"], + "enum": [ + "omni" + ], "title": "DreamType", "description": "Types of dreams that can be triggered." }, @@ -3846,7 +3816,9 @@ } }, "type": "object", - "required": ["messages"], + "required": [ + "messages" + ], "title": "MessageBatchCreate", "description": "Schema for batch message creation with a max of 100 messages" }, @@ -3916,7 +3888,10 @@ } }, "type": "object", - "required": ["content", "peer_id"], + "required": [ + "content", + "peer_id" + ], "title": "MessageCreate" }, "MessageGet": { @@ -3967,7 +3942,9 @@ } }, "type": "object", - "required": ["query"], + "required": [ + "query" + ], "title": "MessageSearchOptions" }, "MessageUpdate": { @@ -4019,7 +3996,13 @@ } }, "type": "object", - "required": ["items", "total", "page", "size", "pages"], + "required": [ + "items", + "total", + "page", + "size", + "pages" + ], "title": "Page[Conclusion]" }, "Page_Message_": { @@ -4053,7 +4036,13 @@ } }, "type": "object", - "required": ["items", "total", "page", "size", "pages"], + "required": [ + "items", + "total", + "page", + "size", + "pages" + ], "title": "Page[Message]" }, "Page_Peer_": { @@ -4087,7 +4076,13 @@ } }, "type": "object", - "required": ["items", "total", "page", "size", "pages"], + "required": [ + "items", + "total", + "page", + "size", + "pages" + ], "title": "Page[Peer]" }, "Page_Session_": { @@ -4121,7 +4116,13 @@ } }, "type": "object", - "required": ["items", "total", "page", "size", "pages"], + "required": [ + "items", + "total", + "page", + "size", + "pages" + ], "title": "Page[Session]" }, "Page_WebhookEndpoint_": { @@ -4155,7 +4156,13 @@ } }, "type": "object", - "required": ["items", "total", "page", "size", "pages"], + "required": [ + "items", + "total", + "page", + "size", + "pages" + ], "title": "Page[WebhookEndpoint]" }, "Page_Workspace_": { @@ -4189,7 +4196,13 @@ } }, "type": "object", - "required": ["items", "total", "page", "size", "pages"], + "required": [ + "items", + "total", + "page", + "size", + "pages" + ], "title": "Page[Workspace]" }, "Peer": { @@ -4219,7 +4232,11 @@ } }, "type": "object", - "required": ["id", "workspace_id", "created_at"], + "required": [ + "id", + "workspace_id", + "created_at" + ], "title": "Peer" }, "PeerCardConfiguration": { @@ -4285,7 +4302,9 @@ } }, "type": "object", - "required": ["peer_card"], + "required": [ + "peer_card" + ], "title": "PeerCardSet" }, "PeerContext": { @@ -4329,7 +4348,10 @@ } }, "type": "object", - "required": ["peer_id", "target_id"], + "required": [ + "peer_id", + "target_id" + ], "title": "PeerContext", "description": "Context for a peer, including representation and peer card." }, @@ -4337,7 +4359,7 @@ "properties": { "id": { "type": "string", - "maxLength": 100, + "maxLength": 512, "minLength": 1, "pattern": "^[a-zA-Z0-9_-]+$", "title": "Id" @@ -4368,7 +4390,9 @@ } }, "type": "object", - "required": ["id"], + "required": [ + "id" + ], "title": "PeerCreate" }, "PeerGet": { @@ -4538,6 +4562,18 @@ "title": "Pending Work Units", "description": "Work units waiting to be processed" }, + "pending_stalled_work_units": { + "type": "integer", + "title": "Pending Stalled Work Units", + "description": "Pending representation work units waiting to accumulate enough tokens to hit DERIVER_REPRESENTATION_BATCH_MAX_TOKENS. Always 0 when DERIVER_FLUSH_ENABLED is true.", + "default": 0 + }, + "pending_ready_work_units": { + "type": "integer", + "title": "Pending Ready Work Units", + "description": "Pending work units eligible to be claimed: non-representation task types, plus representation work units whose pending tokens are at/above the batch threshold (or when flush is enabled). pending_stalled_work_units + pending_ready_work_units == pending_work_units.", + "default": 0 + }, "sessions": { "anyOf": [ { @@ -4564,6 +4600,194 @@ "title": "QueueStatus", "description": "Aggregated processing queue status.\n\nTracks user-facing task types only: representation, summary, and dream.\nInternal infrastructure tasks (reconciler, webhook, deletion) are excluded.\n\nNote: completed_work_units reflects items since the last periodic queue\ncleanup, not lifetime totals." }, + "QueueWorkUnit": { + "properties": { + "work_unit_key": { + "type": "string", + "title": "Work Unit Key" + }, + "task_type": { + "type": "string", + "title": "Task Type" + }, + "session_id": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "title": "Session Id" + }, + "session_name": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "title": "Session Name" + }, + "observer": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "title": "Observer" + }, + "observed": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "title": "Observed" + }, + "pending_items": { + "type": "integer", + "title": "Pending Items", + "description": "Unprocessed queue items in this work unit" + }, + "pending_tokens": { + "type": "integer", + "title": "Pending Tokens", + "description": "Sum of token_count across messages on unprocessed queue items" + }, + "tokens_until_threshold": { + "type": "integer", + "title": "Tokens Until Threshold", + "description": "Tokens still needed before the deriver will claim this batch. 0 for non-representation task types and when flush is enabled." + }, + "hit_threshold": { + "type": "boolean", + "title": "Hit Threshold", + "description": "True if this work unit is eligible to be claimed (not threshold-gated, threshold met, or flush enabled). False means stalled." + }, + "in_progress": { + "type": "boolean", + "title": "In Progress", + "description": "True if a deriver worker has claimed this work unit" + }, + "oldest_item_at": { + "type": "string", + "format": "date-time", + "title": "Oldest Item At" + }, + "newest_item_at": { + "type": "string", + "format": "date-time", + "title": "Newest Item At" + } + }, + "type": "object", + "required": [ + "work_unit_key", + "task_type", + "pending_items", + "pending_tokens", + "tokens_until_threshold", + "hit_threshold", + "in_progress", + "oldest_item_at", + "newest_item_at" + ], + "title": "QueueWorkUnit", + "description": "Per-work-unit breakdown returned by /queue/work-units.\n\n`hit_threshold` and `tokens_until_threshold` apply only to representation\nwork units (the only task type gated by DERIVER_REPRESENTATION_BATCH_MAX_TOKENS).\nFor other task types, `hit_threshold` is always True and\n`tokens_until_threshold` is 0." + }, + "QueueWorkUnitsPage": { + "properties": { + "items": { + "items": { + "$ref": "#/components/schemas/QueueWorkUnit" + }, + "type": "array", + "title": "Items" + }, + "total": { + "type": "integer", + "minimum": 0.0, + "title": "Total" + }, + "current_page": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "title": "Current Page", + "description": "Cursor to refetch the current page" + }, + "current_page_backwards": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "title": "Current Page Backwards", + "description": "Cursor to refetch the current page starting from the last item" + }, + "previous_page": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "title": "Previous Page", + "description": "Cursor for the previous page" + }, + "next_page": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "title": "Next Page", + "description": "Cursor for the next page" + }, + "representation_batch_max_tokens": { + "type": "integer", + "title": "Representation Batch Max Tokens", + "description": "DERIVER_REPRESENTATION_BATCH_MAX_TOKENS at the time of the request" + }, + "flush_enabled": { + "type": "boolean", + "title": "Flush Enabled", + "description": "DERIVER_FLUSH_ENABLED. When true, the batch threshold is bypassed and all pending representation work units are eligible to be claimed." + } + }, + "type": "object", + "required": [ + "items", + "total", + "representation_batch_max_tokens", + "flush_enabled" + ], + "title": "QueueWorkUnitsPage", + "description": "Cursor-paginated /queue/work-units response.\n\nCursor pagination is used because the queue mutates rapidly (workers claim\nand complete items continuously) \u2014 offset pagination would skip rows when\nitems are processed between page fetches.\n\nStandard CursorPage envelope (`items`, `total`, `current_page`,\n`next_page`, `previous_page`) plus two additional fields describing the\nderiver's threshold configuration, needed to interpret per-row\n`hit_threshold` and `tokens_until_threshold`." + }, "ReasoningConfiguration": { "properties": { "enabled": { @@ -4588,7 +4812,7 @@ } ], "title": "Custom Instructions", - "description": "TODO: currently unused. Custom instructions to use for the reasoning system on this workspace/session/message." + "description": "Optional custom instructions for the reasoning system on this workspace/session/message. Rejected if they exceed the deriver custom-instruction token cap." } }, "type": "object", @@ -4602,7 +4826,9 @@ } }, "type": "object", - "required": ["representation"], + "required": [ + "representation" + ], "title": "RepresentationResponse" }, "ScheduleDreamRequest": { @@ -4642,7 +4868,10 @@ } }, "type": "object", - "required": ["observer", "dream_type"], + "required": [ + "observer", + "dream_type" + ], "title": "ScheduleDreamRequest" }, "Session": { @@ -4676,7 +4905,12 @@ } }, "type": "object", - "required": ["id", "is_active", "workspace_id", "created_at"], + "required": [ + "id", + "is_active", + "workspace_id", + "created_at" + ], "title": "Session" }, "SessionConfiguration": { @@ -4784,14 +5018,17 @@ } }, "type": "object", - "required": ["id", "messages"], + "required": [ + "id", + "messages" + ], "title": "SessionContext" }, "SessionCreate": { "properties": { "id": { "type": "string", - "maxLength": 100, + "maxLength": 512, "minLength": 1, "pattern": "^[a-zA-Z0-9_-]+$", "title": "Id" @@ -4834,7 +5071,9 @@ } }, "type": "object", - "required": ["id"], + "required": [ + "id" + ], "title": "SessionCreate" }, "SessionGet": { @@ -4918,6 +5157,18 @@ "type": "integer", "title": "Pending Work Units", "description": "Work units waiting to be processed" + }, + "pending_stalled_work_units": { + "type": "integer", + "title": "Pending Stalled Work Units", + "description": "Pending representation work units waiting to accumulate enough tokens to hit DERIVER_REPRESENTATION_BATCH_MAX_TOKENS. Always 0 when DERIVER_FLUSH_ENABLED is true.", + "default": 0 + }, + "pending_ready_work_units": { + "type": "integer", + "title": "Pending Ready Work Units", + "description": "Pending work units eligible to be claimed: non-representation task types, plus representation work units whose pending tokens are at/above the batch threshold (or when flush is enabled). pending_stalled_work_units + pending_ready_work_units == pending_work_units.", + "default": 0 } }, "type": "object", @@ -4960,7 +5211,9 @@ } }, "type": "object", - "required": ["id"], + "required": [ + "id" + ], "title": "SessionSummaries" }, "SessionUpdate": { @@ -5106,7 +5359,11 @@ } }, "type": "object", - "required": ["loc", "msg", "type"], + "required": [ + "loc", + "msg", + "type" + ], "title": "ValidationError" }, "WebhookEndpoint": { @@ -5137,7 +5394,12 @@ } }, "type": "object", - "required": ["id", "workspace_id", "url", "created_at"], + "required": [ + "id", + "workspace_id", + "url", + "created_at" + ], "title": "WebhookEndpoint" }, "WebhookEndpointCreate": { @@ -5148,7 +5410,9 @@ } }, "type": "object", - "required": ["url"], + "required": [ + "url" + ], "title": "WebhookEndpointCreate" }, "Workspace": { @@ -5174,7 +5438,10 @@ } }, "type": "object", - "required": ["id", "created_at"], + "required": [ + "id", + "created_at" + ], "title": "Workspace" }, "WorkspaceConfiguration": { @@ -5233,7 +5500,7 @@ "properties": { "id": { "type": "string", - "maxLength": 100, + "maxLength": 512, "minLength": 1, "pattern": "^[a-zA-Z0-9_-]+$", "title": "Id" @@ -5249,7 +5516,9 @@ } }, "type": "object", - "required": ["id"], + "required": [ + "id" + ], "title": "WorkspaceCreate" }, "WorkspaceGet": { From e98ae24a691d7daa7c843c349a267e2efae3db64 Mon Sep 17 00:00:00 2001 From: Vineeth Voruganti <13438633+VVoruganti@users.noreply.github.com> Date: Mon, 22 Jun 2026 18:02:44 -0400 Subject: [PATCH 5/5] chore(queue): review cleanups for work-units endpoint - Order work-units cursor query by labeled oldest_item_at column - Document item-vs-work-unit counting difference between queue endpoints - Remove unused QueueWorkUnitsParams type from TS SDK --- docs/v3/documentation/features/advanced/queue-status.mdx | 9 +++++++++ sdks/typescript/src/types/api.ts | 8 -------- src/crud/deriver.py | 8 ++++++-- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/docs/v3/documentation/features/advanced/queue-status.mdx b/docs/v3/documentation/features/advanced/queue-status.mdx index dcbaa950d..bcb67e723 100644 --- a/docs/v3/documentation/features/advanced/queue-status.mdx +++ b/docs/v3/documentation/features/advanced/queue-status.mdx @@ -179,6 +179,15 @@ work units are stalled, use `queue_work_units` / `queueWorkUnits`. It returns one row per unprocessed work unit with the token totals, in-progress flag, and threshold classification needed to debug "why isn't this advancing?". + +The two endpoints count at different granularities. `queue/status` counts +individual queue items (one per message awaiting processing), while +`queue/work-units` returns one row per work unit — items sharing a +`work_unit_key` collapse into a single row. So a status response reporting +`pending_work_units: 3` can correspond to a single row (`total: 1`) here when +those three items belong to the same work unit. + + ```python Python from honcho import Honcho diff --git a/sdks/typescript/src/types/api.ts b/sdks/typescript/src/types/api.ts index a35423ea5..fdb7b5c49 100644 --- a/sdks/typescript/src/types/api.ts +++ b/sdks/typescript/src/types/api.ts @@ -340,14 +340,6 @@ export interface QueueStatusParams { session_id?: string } -export interface QueueWorkUnitsParams { - observer_id?: string - sender_id?: string - session_id?: string - cursor?: string - size?: number -} - /** * Queue status scoped to a single session. */ diff --git a/src/crud/deriver.py b/src/crud/deriver.py index e623c8dac..55e992146 100644 --- a/src/crud/deriver.py +++ b/src/crud/deriver.py @@ -271,6 +271,10 @@ def _build_queue_work_units_query( observer_name_expr = models.QueueItem.payload["observer"].astext observed_name_expr = models.QueueItem.payload["observed"].astext + # Labeled so the cursor orders by (and binds the keyset to) a named SELECT + # column rather than re-deriving the bare aggregate expression. + oldest_item_at = func.min(models.QueueItem.created_at).label("oldest_item_at") + stmt = ( select( models.QueueItem.work_unit_key.label("work_unit_key"), @@ -283,7 +287,7 @@ def _build_queue_work_units_query( func.coalesce( func.sum(func.coalesce(models.Message.token_count, 0)), 0 ).label("pending_tokens"), - func.min(models.QueueItem.created_at).label("oldest_item_at"), + oldest_item_at, func.max(models.QueueItem.created_at).label("newest_item_at"), func.bool_or(models.ActiveQueueSession.id.isnot(None)).label("in_progress"), ) @@ -313,7 +317,7 @@ def _build_queue_work_units_query( # Oldest-pending first surfaces the most-stuck work for debugging. # work_unit_key is a unique tiebreaker required by sqlakeyset for # stable cursor pagination when timestamps collide. - func.min(models.QueueItem.created_at), + oldest_item_at, models.QueueItem.work_unit_key, ) )