feat: add queue purge endpoint for stranded work units (#799) #801
zsxh1990 wants to merge 3 commits
When sessions are soft-deleted, their work units remain in the queue
permanently. This adds a DELETE /v3/{workspace_id}/queue endpoint
to clean up stranded items.
Changes:
- Add src/routers/queue.py with purge endpoint
- Register queue router in main.py
Closes plastic-labs#799
Signed-off-by: zsxh1990 <zsxh1990@users.noreply.github.com>
Walkthrough
This PR introduces a new DELETE endpoint (
Changes
Queue Purge Endpoint
Sequence Diagram(s)
sequenceDiagram
participant Client
participant Auth as require_auth
participant Handler as purge_stranded_work_units
participant DB as AsyncSession
participant Log as logger
Client->>Auth: DELETE /v3/{workspace_id}/queue?session_id=?&status=?
Auth->>Auth: Verify workspace authorization
Auth->>Handler: Request authorized
Handler->>DB: Build WHERE conditions (workspace, session, status)
Handler->>DB: COUNT(*) matching QueueItem rows
alt Count > 0
Handler->>DB: DELETE QueueItem rows
Handler->>DB: COMMIT transaction
Handler->>Log: Log purge summary
Handler->>Client: 200 OK {purged_count, workspace_id, session_id}
else Count = 0
Handler->>Client: 200 OK {message: "No stranded work units found", purged_count: 0}
end
Note over Handler,DB: On exception: ROLLBACK & re-raise
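The flow in the diagram (build conditions, count, delete and commit only when something matches, roll back on error) can be sketched as follows. This is a minimal illustration, not the PR's code: it uses the standard-library `sqlite3` module as a stand-in for the async SQLAlchemy session, and the table name `queue`, the column `workspace_name`, and the function name `purge_stranded` are assumptions made for the example.

```python
import sqlite3
from typing import Optional


def purge_stranded(
    conn: sqlite3.Connection,
    workspace: str,
    session_id: Optional[str] = None,
    status: str = "unprocessed",
) -> dict:
    """Count matching rows, delete them if any exist, commit; roll back on error."""
    # Build the WHERE clause from the workspace plus the optional filters.
    # Only fixed strings are interpolated; values go through bound parameters.
    clauses, params = ["workspace_name = ?"], [workspace]
    if session_id is not None:
        clauses.append("session_id = ?")
        params.append(session_id)
    if status == "unprocessed":
        clauses.append("processed = 0")
    where = " AND ".join(clauses)

    try:
        # COUNT(*) first, so no rows are loaded just to learn how many match.
        (count,) = conn.execute(
            f"SELECT COUNT(*) FROM queue WHERE {where}", params
        ).fetchone()
        if count == 0:
            return {"message": "No stranded work units found", "purged_count": 0}
        conn.execute(f"DELETE FROM queue WHERE {where}", params)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    return {"purged_count": count, "workspace_id": workspace, "session_id": session_id}


# Hypothetical data: two workspaces, a mix of processed and unprocessed rows.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE queue (workspace_name TEXT, session_id TEXT, processed INTEGER)"
)
conn.executemany(
    "INSERT INTO queue VALUES (?, ?, ?)",
    [("ws_a", "s1", 0), ("ws_a", "s1", 1), ("ws_a", "s2", 0), ("ws_b", "s3", 0)],
)
conn.commit()

first = purge_stranded(conn, "ws_a")
second = purge_stranded(conn, "ws_a")
remaining = conn.execute("SELECT COUNT(*) FROM queue").fetchone()[0]
```

With this data the first call removes the two unprocessed `ws_a` rows, the second call takes the empty branch, and the processed `ws_a` row and the `ws_b` row are left untouched.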
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/main.py (1)
24-40: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Duplicated imports from merge error.
All router imports except queue appear twice. Lines 33-39 duplicate lines 25-32. Remove the duplicate block:
🧹 Proposed fix to remove duplicates

     from src.routers import (
         conclusions,
         keys,
         messages,
         peers,
         queue,
         sessions,
         webhooks,
         workspaces,
    -    conclusions,
    -    keys,
    -    messages,
    -    peers,
    -    sessions,
    -    webhooks,
    -    workspaces,
     )

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/main.py` around lines 24 - 40, The import block in main.py contains duplicated router imports (conclusions, keys, messages, peers, sessions, webhooks, workspaces) caused by a merge error; remove the repeated occurrences so each module is imported only once and keep the single import of queue intact—look for the duplicated names (conclusions, keys, messages, peers, sessions, webhooks, workspaces) in the from src.routers import (...) statement and delete the second repeated group.
🧹 Nitpick comments (1)
src/routers/queue.py (1)
67-70: 💤 Low value
Consider adding a Pydantic response schema.
Per coding guidelines, FastAPI routes should use Pydantic schemas for response validation. This would provide better API documentation and type safety.
Also applies to: 84-89
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/routers/queue.py` around lines 67 - 70, Create a Pydantic response schema (e.g., class PurgeResultSchema(BaseModel) with fields message: str and purged_count: int), import it in src/routers/queue.py, and update the FastAPI route functions that currently return plain dicts (the one returning {"message": "No stranded work units found", "purged_count": 0} and the other block at lines 84-89) to declare response_model=PurgeResultSchema on the route decorator and return an instance (or dict conforming to the schema). Ensure you add the import from pydantic and update both return sites to match the schema.
Source: Coding guidelines
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/routers/queue.py`:
- Around line 60-64: The current code builds count_query selecting
models.QueueItem and then calls db.execute to fetch all ORM rows into
items_to_purge and uses len(items_to_purge); change this to run a SQL COUNT
instead to avoid loading objects: replace the select(models.QueueItem) usage in
the count path with a select(func.count()).where(*conditions) (importing
sqlalchemy.func as needed), execute it via db.execute and retrieve the numeric
result (e.g., scalar or scalar_one) to set count; keep the original conditions
and only change how count_query is constructed and consumed (references:
count_query, models.QueueItem, db.execute, items_to_purge, count).
- Line 35: The route/function purge_stranded_work_units is using the
module-level db symbol (db: AsyncSession = db) from src.db instead of a FastAPI
dependency, so FastAPI won't inject an AsyncSession; replace that parameter to
use the dependency exported from src.dependencies (e.g., import the dependency
symbol from src.dependencies or annotate with Depends(get_db)) so the function
signature receives an injected AsyncSession; update the parameter declaration in
purge_stranded_work_units and its import to reference the dependency name
(get_db or db from src.dependencies) rather than the module-level src.db symbol.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 392c5b6e-570d-4f59-a5ba-73326b1fc8f8
📒 Files selected for processing (2)
src/main.py
src/routers/queue.py
- Use Depends(get_db) from src.dependencies instead of the module-level src.db symbol, matching the pattern in workspaces.py / sessions.py. The previous default value was FastAPI-invisible, so `await db.execute(...)` would have failed at runtime with no injected AsyncSession.
- Move require_auth import from src.dependencies to src.security where the function actually lives (also matches sibling routers).
- Replace the `select(QueueItem) → .scalars().all() → len()` pattern with a SQL `func.count()` query so we don't load every ORM row into memory just to compute a count.
- Use `~models.QueueItem.processed` (ruff E712) instead of `== False`.

Verified with:
    uv run ruff check src/routers/queue.py
    uv run ruff format --check src/routers/queue.py
    uv run basedpyright src/routers/queue.py
All pass.

Signed-off-by: zsxh1990 <zsxh1990@users.noreply.github.com>
Add tests/routes/test_queue_purge.py with five scenarios:
- empty workspace returns purged_count=0
- status='unprocessed' (default) keeps processed rows
- status='all' purges both processed and unprocessed
- session_id filter narrows the purge to one session
- cross-workspace isolation: purging A never touches B

Reuses the existing client / db_session / sample_data fixtures from tests/conftest.py (same as test_queue_status.py).

Local pytest run was skipped (no postgres/redis in this env), but basedpyright / ruff check / ruff format all pass on the new file and the patterns mirror the existing tests/routes/ files.

Signed-off-by: zsxh1990 <zsxh1990@users.noreply.github.com>
Addressed both CodeRabbit findings from the initial review. Thanks for the catch, they were real bugs.

🔴 Critical (DB session injection): fixed in
The default value
Switched to the project-wide pattern used by

    from src import models
    from src.dependencies import db  # AsyncSession = Depends(get_db)
    from src.security import require_auth  # lives in security, not dependencies
    …
    async def purge_stranded_work_units(…, db: AsyncSession = db):

🟠 Major (count via row load): fixed in
Replaced

    count_query = (
        select(func.count()).select_from(models.QueueItem).where(*conditions)
    )
    count = (await db.execute(count_query)).scalar() or 0

Tests: added in
Local pytest run was skipped here (no postgres in this env), but
🧩 Analysis chain

🏁 Script executed:

    #!/bin/bash
    cat src/routers/queue.py
    echo "---"
    cat tests/routes/test_queue_purge.py 2>/dev/null || echo "FILE NOT FOUND"

Length of output: 9192

🏁 Script executed:

    #!/bin/bash
    # Find QueueItem model definition
    ast-grep --pattern 'class QueueItem($_):
      $$$'
    echo "---"
    # Also check for unique constraints on work_unit_key
    rg -n "work_unit_key" src/

Length of output: 17465
🔴 DB session injection ✅
🟠 COUNT via SQL ✅
Tests ✅

One minor nit left for consideration: the early-return on

    if count == 0:
        return {
            "message": "No stranded work units found",
            "purged_count": 0,
            "workspace_id": workspace_id,
            "session_id": session_id,
        }

Not a blocker, just a shape inconsistency. Otherwise the implementation looks solid. (◕‿◕✿)
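One way to avoid the shape inconsistency raised in this nit is to build both responses through a single helper, so the empty and non-empty branches cannot drift apart. The sketch below is an assumption-laden illustration: `PurgeResult`, `build_purge_result`, and the wording of the success message are hypothetical names and text, not taken from the PR.

```python
from typing import Optional, TypedDict


class PurgeResult(TypedDict):
    """Hypothetical response shape shared by both branches."""

    message: str
    purged_count: int
    workspace_id: str
    session_id: Optional[str]


def build_purge_result(
    count: int, workspace_id: str, session_id: Optional[str]
) -> PurgeResult:
    """Return the same four keys whether or not anything was purged."""
    message = (
        f"Purged {count} stranded work units"  # hypothetical wording
        if count > 0
        else "No stranded work units found"
    )
    return {
        "message": message,
        "purged_count": count,
        "workspace_id": workspace_id,
        "session_id": session_id,
    }


empty = build_purge_result(0, "ws_a", None)
purged = build_purge_result(3, "ws_a", "s1")
```

A Pydantic model declared as the route's `response_model`, as suggested in the earlier review, would enforce the same invariant at the framework level; the `TypedDict` here only documents it for a type checker.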
Actionable comments posted: 1
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 639c4145-b9c7-4e50-b8e3-e23949761121
📒 Files selected for processing (2)
src/routers/queue.py
tests/routes/test_queue_purge.py
🚧 Files skipped from review as they are similar to previous changes (1)
- src/routers/queue.py
    sess_a = models.Session(workspace_name=workspace.name, name="sess_a")
    sess_b = models.Session(workspace_name=workspace.name, name="sess_b")
    db_session.add_all([sess_a, sess_b])
    await db_session.flush()

    for _ in range(2):
        db_session.add(_make_queue_item(workspace.name, session_id="sess_a"))
    for _ in range(3):
        db_session.add(_make_queue_item(workspace.name, session_id="sess_b"))
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Description: Check Session model definition for ID field type and assignment
# Look for Session class definition and id field
ast-grep --pattern $'class Session($$$):
$$$
id: $$$
$$$'
# Also check for any custom ID assignment logic in Session __init__ or events
rg -nP --type=py -A5 -B2 'class Session\(' src/models.py
Repository: plastic-labs/honcho
Length of output: 2769
🏁 Script executed:
# Find _make_queue_item function definition in the test file
rg -n "_make_queue_item" tests/routes/test_queue_purge.py -B2 -A10
# Also check the full test function context around the problematic lines
sed -n '100,144p' tests/routes/test_queue_purge.py
Repository: plastic-labs/honcho
Length of output: 4581
🏁 Script executed:
# Check QueueItem model definition to verify FK relationship
rg -n "class QueueItem" src/models.py -A30 | head -50
# Also check if there are any FK constraints defined for session_id
rg -n "session_id" src/models.py -B2 -A2
Repository: plastic-labs/honcho
Length of output: 1959
🏁 Script executed:
# Check if there are FK constraints or if they're deferred in the test setup
rg -n "ForeignKey" src/models.py | grep -i "ondelete\|onupdate\|defer"
# Check if the test file has any database constraints disabled or deferred
rg -n "PRAGMA\|constraint\|defer" tests/routes/test_queue_purge.py
# Look at how the QueueItem FK is defined more carefully
rg -n "ForeignKey.*sessions" src/models.py -B2 -A2
Repository: plastic-labs/honcho
Length of output: 370
🏁 Script executed:
# Check if FK constraints are disabled in test configuration or conftest
rg -n "ForeignKey\|constraint" tests/conftest.py -B2 -A2
# Check test setup or any pytest fixtures that might disable FKs
find tests -name "conftest.py" -type f
# Check if there's any test configuration or database setup that disables constraints
rg -n "sqlite_synchronous\|echo\|pragma" tests/ -i
# Let's also check if Session model has a custom __init__ that might override the id
rg -n "def __init__" src/models.py -A10 | grep -A10 "class Session"
Repository: plastic-labs/honcho
Length of output: 391
🏁 Script executed:
# Check the main conftest for database setup
cat tests/conftest.py | head -100
# Also check for any FK constraint handling
rg -n "PRAGMA\|ForeignKey\|constraint" tests/conftest.py
Repository: plastic-labs/honcho
Length of output: 3237
🏁 Script executed:
# Continue reading conftest to find database engine creation
tail -n +100 tests/conftest.py | head -200
# Look for async engine creation and any settings related to FK constraints
rg -n "create_async_engine\|echo\|pool" tests/conftest.py -A5
Repository: plastic-labs/honcho
Length of output: 7022
🏁 Script executed:
# Check if this test file actually runs and passes
# Look for any test execution or CI configuration
find . -name "pytest.ini" -o -name "pyproject.toml" -o -name "tox.ini" -o -name ".github" -type d
# Check if there's any setup in the test file itself that might handle IDs
head -50 tests/routes/test_queue_purge.py
# Also check the imports and fixtures in the test file
rg -n "import\|@pytest" tests/routes/test_queue_purge.py | head -30
Repository: plastic-labs/honcho
Length of output: 1811
🏁 Script executed:
# Check how the purge endpoint actually uses session_id
rg -n "session_id" src/routes/ -B3 -A3 | grep -i "queue\|purge" -A10 -B10
# Look for the queue purge implementation
find src/routes -name "*queue*" -type f
# Also check if there's any route handler for the queue purge endpoint
rg -n "def.*purge\|async def.*purge" src/ -A20
Repository: plastic-labs/honcho
Length of output: 161
🏁 Script executed:
# Find all Python files in src directory
find src -name "*.py" -type f | head -20
# Search for the queue purge endpoint implementation
rg -n "purge\|/queue" src/ -l
# Look for the actual route definition
rg -n "def.*purge" src/ -A15
Repository: plastic-labs/honcho
Length of output: 1364
🏁 Script executed:
# Get the full implementation of the purge endpoint
sed -n '30,100p' src/routers/queue.py
Repository: plastic-labs/honcho
Length of output: 2208
🏁 Script executed:
# Check if the test file is in the test suite and actually runs
# Look for pytest configuration to see if this test is skipped
rg -n "skip\|xfail" tests/routes/test_queue_purge.py
# Check if there's any database configuration that might disable FKs
rg -n "disable.*foreign\|foreign.*key\|PRAGMA" tests/ -i
# Let's verify the exact flow: when Session is created without id, does it get assigned?
# Check if there's any test that actually uses the session ID
rg -n "sess_a\.id\|sess_b\.id" tests/routes/test_queue_purge.py
Repository: plastic-labs/honcho
Length of output: 45
🏁 Script executed:
# Check if QueueItem.session_id is actually used as an FK in the code
# or if it's just stored as a string for filtering purposes
rg -n "relationship.*session\|back_populates.*session" src/models.py
# Check if there's any actual relationship between QueueItem and Session
grep -n "session" src/models.py | grep -i "queueitem\|queue"
# Let's also check the actual database schema or migrations to confirm FK exists
find . -name "*.py" -path "*/alembic/*" -type f | head -10
# Check what the actual error would be if we try to insert a QueueItem with non-existent session_id
rg -n "FOREIGN KEY\|CHECK\|CONSTRAINT" src/models.py | head -20
Repository: plastic-labs/honcho
Length of output: 992
🏁 Script executed:
# Check if there's a relationship defined between QueueItem and Session
rg -n "class QueueItem" src/models.py -A50 | grep -i "relationship\|session"
# Look at the full QueueItem model to see if it has a relationship
sed -n '477,535p' src/models.py
# Check if PostgreSQL FK constraints are actually enforced in test setup
rg -n "SET\|PRAGMA\|constraint" tests/conftest.py -i | grep -i "check\|constraint\|foreign"
Repository: plastic-labs/honcho
Length of output: 2451
🏁 Script executed:
# Let's verify: are Session objects created with explicit id or auto-generated?
# And does the test actually pass? Let's check if there's a way to run this test
rg -n "Session(" tests/routes/test_queue_purge.py -B2 -A2
# Check if there's any custom Session initialization that might override the id
rg -n "class Session" src/models.py -A20 | grep -i "init\|__post"
# Let's also verify the exact constraint definition one more time
rg -n "ForeignKey.*sessions.id\|OnDelete" src/models.py
Repository: plastic-labs/honcho
Length of output: 130
🏁 Script executed:
# Let's verify: are Session objects created with explicit id or auto-generated?
rg -n "models.Session\(" tests/routes/test_queue_purge.py -B2 -A2
# Check if there's any custom Session initialization that might override the id
rg -n "def __init__" src/models.py -A10
# Look at the Session model one more time to see if there's anything special about ID
sed -n '167,200p' src/models.py
Repository: plastic-labs/honcho
Length of output: 1906
Use actual auto-generated Session IDs instead of string literals.

After await db_session.flush() on line 118, sess_a.id and sess_b.id are assigned auto-generated nanoids (21 characters, not "sess_a"/"sess_b"). Lines 121 and 123 create queue items with session_id="sess_a" and session_id="sess_b" (string literals), which violate the foreign key constraint to sessions.id. Replace the session_id values with the actual auto-generated IDs:

    for _ in range(2):
        db_session.add(_make_queue_item(workspace.name, session_id=sess_a.id))
    for _ in range(3):
        db_session.add(_make_queue_item(workspace.name, session_id=sess_b.id))

Also update the purge request on line 128 to use the actual session ID:

    response = client.delete(
        f"/v3/workspaces/{workspace.name}/queue",
        params={"session_id": sess_a.id},
    )

And update the assertion on line 133:

    assert body["session_id"] == sess_a.id

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/routes/test_queue_purge.py` around lines 115 - 123, After calling await
db_session.flush() on the Session objects, sess_a and sess_b have auto-generated
nanoid IDs assigned to their id attributes. However, when calling
_make_queue_item in both loops, the code passes hardcoded string literals
"sess_a" and "sess_b" instead of the actual auto-generated IDs, which violates
the foreign key constraint. Replace session_id="sess_a" with
session_id=sess_a.id in the first loop and session_id="sess_b" with
session_id=sess_b.id in the second loop. Additionally, update the session_id
parameter in the purge request (around line 128) to use sess_a.id instead of a
string literal, and update any assertion that checks session_id (around line
133) to compare against sess_a.id instead of a string literal.
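The failure mode described in this comment (a session's name passed where its generated ID is expected) can be reproduced in isolation. The sketch below is only an analogy under stated assumptions: it uses the standard-library `sqlite3` module instead of the project's Postgres test database, `secrets.token_urlsafe` as a stand-in for the nanoid generator, and a simplified two-table schema whose column names are invented for the example.

```python
import secrets
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when asked to
conn.execute("CREATE TABLE sessions (id TEXT PRIMARY KEY, name TEXT NOT NULL)")
conn.execute(
    "CREATE TABLE queue ("
    "id INTEGER PRIMARY KEY, "
    "session_id TEXT REFERENCES sessions(id))"
)

# Stand-in for the model's auto-generated ID: the row's id is not its name.
generated_id = secrets.token_urlsafe(16)
conn.execute(
    "INSERT INTO sessions (id, name) VALUES (?, ?)", (generated_id, "sess_a")
)

# Passing the *name* as session_id points at a sessions row that does not exist.
try:
    conn.execute("INSERT INTO queue (session_id) VALUES (?)", ("sess_a",))
    literal_rejected = False
except sqlite3.IntegrityError:
    literal_rejected = True

# Passing the generated ID satisfies the constraint.
conn.execute("INSERT INTO queue (session_id) VALUES (?)", (generated_id,))
queued = conn.execute("SELECT COUNT(*) FROM queue").fetchone()[0]
```

The insert with the literal name is rejected and only the row that uses the generated ID is stored, which is the same reason the review asks the test to pass `sess_a.id` and `sess_b.id`.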
What
Add DELETE /v3/{workspace_id}/queue endpoint to purge stranded work units from the queue.

Why
From #799: When sessions are soft-deleted via DELETE /v3/workspaces/{workspace_id}/sessions/{session_id}, any pending work units queued for the deriver remain permanently in the queue. The deriver does not process them (the session no longer exists), and there is no API endpoint to purge or cancel these stranded work units.

How
New file: src/routers/queue.py

Features
- session_id parameter to purge only for a specific session
- unprocessed (default) or all to include processed items

Usage

Testing
    pytest tests/ -v -k "queue"

Closes #799
Signed-off-by: zsxh1990 <zsxh1990@users.noreply.github.com>

Summary by CodeRabbit
/v3 namespace. session_id and status (defaulting to unprocessed-only, with an option to purge all).