1 change: 1 addition & 0 deletions CLAUDE.md
@@ -44,6 +44,7 @@ All API routes follow the pattern: `/v3/{resource}/{id}/{action}`. Most "list/se
- **Peers**: Create, list, update, chat (dialectic), messages, representation
- **Sessions**: Create, list, update, delete, clone, manage peers, get context
- **Messages**: Create (batch up to 100), upload (file), list, get, update
- **Queue introspection**: `/queue/status` aggregate counts (incl. `pending_stalled_work_units` / `pending_ready_work_units` split for representation batches below `DERIVER_REPRESENTATION_BATCH_MAX_TOKENS`) and `/queue/work-units` per-work-unit detail (cursor-paginated via `CursorPage` from `fastapi-pagination`)
- **Conclusions**: Create, list, query (semantic search), delete — the API-facing name for observations stored in `(observer, observed)` collections
- **Keys**: Create scoped JWTs
- **Webhooks**: Register endpoint, list, delete, test
146 changes: 146 additions & 0 deletions docs/v3/documentation/features/advanced/queue-status.mdx
@@ -43,6 +43,17 @@ class QueueStatus(BaseModel):
    pending_work_units: int
    """Work units waiting to be processed"""

    pending_stalled_work_units: int
    """Pending representation work units waiting to accumulate enough tokens to
    hit DERIVER_REPRESENTATION_BATCH_MAX_TOKENS. Always 0 when
    DERIVER_FLUSH_ENABLED is true."""

    pending_ready_work_units: int
    """Pending work units eligible to be claimed: non-representation task types,
    plus representation work units whose pending tokens are at/above the batch
    threshold (or when flush is enabled).
    pending_stalled_work_units + pending_ready_work_units == pending_work_units."""

    total_work_units: int
    """Total work units"""

@@ -55,12 +66,19 @@ Promise<{
completedWorkUnits: number
inProgressWorkUnits: number
pendingWorkUnits: number
pendingStalledWorkUnits: number
pendingReadyWorkUnits: number
sessions?: Record<string, QueueStatus.Sessions>
}>

```
</CodeGroup>

The `pending_stalled_work_units` / `pending_ready_work_units` split tells you
*why* pending work isn't moving: stalled items are sitting below the deriver's
batch token threshold waiting for more messages, while ready items are eligible
to be picked up by a worker. The two always sum to `pending_work_units`.
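
As a rough illustration of how the split might be read on the client side, the sketch below checks the documented invariant and turns the three counts into a short diagnosis. `QueueCounts` and `describe_pending` are hypothetical names for this example, not part of the SDK; only the three field names come from the schema above.

```python
from dataclasses import dataclass


@dataclass
class QueueCounts:
    """Hypothetical stand-in for the pending-related fields of a status response."""

    pending_work_units: int
    pending_stalled_work_units: int
    pending_ready_work_units: int


def describe_pending(counts: QueueCounts) -> str:
    """Summarize why pending work is (or is not) moving."""
    split_total = (
        counts.pending_stalled_work_units + counts.pending_ready_work_units
    )
    # Documented invariant: stalled + ready == pending.
    if split_total != counts.pending_work_units:
        raise ValueError("stalled + ready does not equal pending")
    if counts.pending_work_units == 0:
        return "idle"
    if counts.pending_ready_work_units == 0:
        return "all pending work is waiting for more tokens"
    if counts.pending_stalled_work_units == 0:
        return "all pending work is ready for a worker"
    return "mixed"
```

A response with three stalled and zero ready work units would then read as "all pending work is waiting for more tokens", which points at the batch threshold rather than at worker capacity.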

Sending a message generates several tasks, such as generating insights,
cleaning up a representation, or summarizing a conversation.
These tasks are defined based on who is sending the
@@ -147,7 +165,135 @@ async queueStatus(
completedWorkUnits: number
inProgressWorkUnits: number
pendingWorkUnits: number
pendingStalledWorkUnits: number
pendingReadyWorkUnits: number
sessions?: Record<string, QueueStatus.Sessions>
}>
```
</CodeGroup>

## Inspecting individual work units

When the aggregate counts tell you "something is stalled" but not *which*
work units are stalled, use `queue_work_units` / `queueWorkUnits`. It returns
one row per unprocessed work unit with the token totals, in-progress flag,
and threshold classification needed to debug "why isn't this advancing?".

<Note>
The two endpoints count at different granularities. `queue/status` counts
individual queue items (one per message awaiting processing), while
`queue/work-units` returns one row per work unit — items sharing a
`work_unit_key` collapse into a single row. So a status response reporting
`pending_work_units: 3` can correspond to a single row (`total: 1`) here when
those three items belong to the same work unit.
Comment on lines +183 to +188

Contributor

🎯 Functional Correctness | 🟡 Minor | ⚡ Quick win

This granularity note contradicts the queue-status contract.

The backend schema text for `pending_work_units`, `pending_stalled_work_units`, and `pending_ready_work_units` describes them as work-unit counts, not per-message queue-item counts. With the current wording, the example `pending_work_units: 3` vs `total: 1` reads as if `/queue/status` and `/queue/work-units` are counting different entities entirely.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@docs/v3/documentation/features/advanced/queue-status.mdx` around lines 183 -
188, The granularity note in the queue status docs conflicts with the contract
for pending_work_units, pending_stalled_work_units, and
pending_ready_work_units. Update the explanation in queue-status.mdx to match
the backend schema and the behavior of the queue/status and queue/work-units
endpoints by clarifying that both are work-unit based, and adjust the example
wording so it no longer implies queue/status counts individual messages while
queue/work-units counts rows.

</Note>

<CodeGroup>
```python Python
from honcho import Honcho
honcho = Honcho()

page = honcho.queue_work_units()

# Inspect just the current page
for wu in page.items:
    print(wu.work_unit_key, wu.pending_tokens, wu.hit_threshold)

# Threshold context from the envelope
print(page.representation_batch_max_tokens, page.flush_enabled)

# Walk the full queue (auto-fetches subsequent pages)
for wu in page:
    ...
```

```typescript TypeScript
import { Honcho } from '@honcho-ai/sdk';

const honcho = new Honcho({});

const page = await honcho.queueWorkUnits();

// Current page only
for (const wu of page.items) {
  console.log(wu.workUnitKey, wu.pendingTokens, wu.hitThreshold);
}

// Threshold context from the envelope
console.log(page.representationBatchMaxTokens, page.flushEnabled);

// Walk the full queue (auto-fetches subsequent pages)
for await (const wu of page) {
  // ...
}
```
</CodeGroup>

### Cursor pagination

The endpoint is **cursor-paginated**, not offset-paginated. The queue mutates
rapidly (workers claim and complete items continuously), and offset pagination
would skip rows that were processed between fetches. Cursor pagination uses
opaque tokens (`next_page` / `previous_page`) that are stable across these
mutations.

Pass `cursor` and optionally `size` to fetch a specific page, or use the
helpers on the returned page object to navigate.

<CodeGroup>
```python Python
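# Explicit cursor navigation (process() stands in for your own handler)
page = honcho.queue_work_units(size=50)
process(page.items)  # handle the first page before advancing
while page.has_next_page():
    page = page.get_next_page()
    process(page.items)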

# Or pass a cursor token directly
page = honcho.queue_work_units(cursor="<token-from-previous-page>")
```

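```typescript TypeScript
// handleItems() stands in for your own handler; the name avoids Node's global `process`
let page = await honcho.queueWorkUnits({ size: 50 });
handleItems(page.items); // handle the first page before advancing
while (page.hasNextPage) {
  const next = await page.getNextPage();
  if (!next) break;
  page = next;
  handleItems(page.items);
}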
```
</CodeGroup>
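
To make the skipping problem concrete, here is a toy in-memory simulation, not the server's implementation. It assumes the queue is a list of ascending integer ids and uses the last id seen as the cursor; the real cursor is an opaque token whose encoding is not described here. `walk_offset` and `walk_cursor` are hypothetical helper names.

```python
def walk_offset(queue: list[int], size: int, completed: set[int]) -> list[int]:
    """Walk the queue with offset pagination; `completed` ids vanish after page 1."""
    seen: list[int] = []
    offset = 0
    first = True
    while True:
        page = queue[offset:offset + size]
        if not page:
            break
        seen.extend(page)
        offset += size
        if first:
            # Simulate workers finishing items between the first and second fetch.
            queue = [item for item in queue if item not in completed]
            first = False
    return seen


def walk_cursor(queue: list[int], size: int, completed: set[int]) -> list[int]:
    """Same walk, but each fetch resumes strictly after the last id returned."""
    seen: list[int] = []
    cursor = None
    first = True
    while True:
        remaining = [item for item in queue if cursor is None or item > cursor]
        page = remaining[:size]
        if not page:
            break
        seen.extend(page)
        cursor = page[-1]
        if first:
            queue = [item for item in queue if item not in completed]
            first = False
    return seen


ids = [1, 2, 3, 4, 5, 6]
print(walk_offset(ids, 2, {1, 2}))  # [1, 2, 5, 6]: ids 3 and 4 are never returned
print(walk_cursor(ids, 2, {1, 2}))  # [1, 2, 3, 4, 5, 6]
```

With offset pagination, removing the two processed rows shifts every later row forward by two positions, so the second fetch starts past rows 3 and 4. Resuming from a position in the ordering, rather than a row count, avoids that shift.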

### Per-work-unit fields

| Field | Type | Description |
|---|---|---|
| `work_unit_key` | `str` | Full key, e.g. `representation:ws_abc:sess_xyz:peer_observed` |
| `task_type` | `str` | `"representation"`, `"summary"`, or `"dream"` |
| `session_id` | `str \| null` | FK to the session row; null for task types without a session |
| `session_name` | `str \| null` | Human-readable session name |
| `observer` | `str \| null` | Observer peer (from queue payload) |
| `observed` | `str \| null` | Observed peer (from queue payload) |
| `pending_items` | `int` | Unprocessed queue items in this work unit |
| `pending_tokens` | `int` | Sum of `messages.token_count` across the pending items |
| `tokens_until_threshold` | `int` | Tokens still needed to fire the batch (0 for non-representation task types or when flush is enabled) |
| `hit_threshold` | `bool` | True if eligible to be claimed; false means stalled |
| `in_progress` | `bool` | True if a deriver worker has claimed this work unit |
| `oldest_item_at` | `datetime` | Oldest pending queue-item creation timestamp |
| `newest_item_at` | `datetime` | Newest pending queue-item creation timestamp |

Each page also carries the deriver's threshold configuration so you can
interpret per-row classification without re-querying server settings:

| Field | Type | Description |
|---|---|---|
| `representation_batch_max_tokens` | `int` | `DERIVER_REPRESENTATION_BATCH_MAX_TOKENS` at request time |
| `flush_enabled` | `bool` | `DERIVER_FLUSH_ENABLED` at request time |
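
Read together, the two tables suggest how a row's classification follows from the envelope values. The sketch below restates that relationship as a client-side check. It is inferred from the field descriptions only, and the server's actual logic may differ; `classify` is a hypothetical helper, not an SDK function.

```python
def classify(
    task_type: str,
    pending_tokens: int,
    representation_batch_max_tokens: int,
    flush_enabled: bool,
) -> tuple[int, bool]:
    """Return (tokens_until_threshold, hit_threshold) as the tables describe them."""
    # Non-representation tasks, and everything when flush is on, are always ready.
    if task_type != "representation" or flush_enabled:
        return 0, True
    remaining = max(0, representation_batch_max_tokens - pending_tokens)
    # A representation work unit is ready once pending tokens reach the threshold.
    return remaining, remaining == 0


print(classify("representation", 300, 1000, False))   # (700, False): stalled
print(classify("representation", 1000, 1000, False))  # (0, True): ready
print(classify("summary", 10, 1000, False))           # (0, True): no threshold applies
```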

<Warning>
**Cursor pagination is stable, but the underlying data is not.** Items can be
processed between pages, and new items can be enqueued. Each page is a
snapshot of what the server saw at request time, but pages may be
inconsistent with each other under concurrent processing. Use `page.items`
for a stable per-page view; iterate the page object only when an approximate
walk is acceptable.
</Warning>
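
One way to tolerate that cross-page inconsistency is to key rows by `work_unit_key` while walking and keep the most recent sighting. The sketch below assumes rows are plain dicts purely for illustration (the SDK returns objects), and `merge_pages` is a hypothetical helper.

```python
def merge_pages(pages: list[list[dict]]) -> list[dict]:
    """Collapse rows from several pages, keeping the latest row per work_unit_key."""
    latest: dict[str, dict] = {}
    for page in pages:
        for row in page:
            # A later page may repeat a key with fresher counts; the last one wins.
            latest[row["work_unit_key"]] = row
    return list(latest.values())


pages = [
    [
        {"work_unit_key": "a", "pending_tokens": 10},
        {"work_unit_key": "b", "pending_tokens": 5},
    ],
    [{"work_unit_key": "a", "pending_tokens": 40}],
]
print(merge_pages(pages))
# [{'work_unit_key': 'a', 'pending_tokens': 40}, {'work_unit_key': 'b', 'pending_tokens': 5}]
```

This removes duplicates but cannot recover work units that were processed before their page was fetched, so the result is still an approximate view.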