PSv2: Show online / offline status for async processing services#1146
Conversation
✅ Deploy Preview for antenna-ssec canceled.
✅ Deploy Preview for antenna-preview canceled.
📝 Walkthrough

Renamed ProcessingService monitoring fields from last_checked* to last_seen*; added is_async and mark_seen() to record heartbeats for pull-mode services; updated models, serializers, views, tasks, migrations, tests, and UI to use last_seen* and emit heartbeats on pipeline registration and job endpoints.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant JobsView as "Jobs View"
    participant Pipeline as "Pipeline Service"
    participant PSModel as "ProcessingService"
    participant DB as "Database"
    Client->>JobsView: POST /jobs/{id}/tasks or /results
    JobsView->>Pipeline: validate job.pipeline_id
    JobsView->>PSModel: _mark_pipeline_pull_services_seen(job)
    PSModel->>PSModel: for each async service: mark_seen(live=True)
    PSModel->>DB: UPDATE processing_service SET last_seen=now(), last_seen_live=TRUE
    DB-->>PSModel: OK
    PSModel-->>JobsView: done
    JobsView-->>Client: continue tasks/results flow
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~30 minutes

Possibly related PRs
🚥 Pre-merge checks: ❌ Failed checks (1 warning) | ✅ Passed checks (4 passed)
…1122) Rename fields to better reflect the semantic difference between sync and async processing service status tracking:

- last_checked → last_seen
- last_checked_live → last_seen_live
- last_checked_latency → last_seen_latency

For sync services with endpoint URLs, fields are updated by the periodic status checker. For async/pull-mode services, a new mark_seen() method is called when the service registers pipelines, recording that we heard from it. Also updates all references in serializers, pipeline queryset, views, frontend models, columns, dialog, and language strings.

Co-Authored-By: Claude <noreply@anthropic.com>
Force-pushed from faf0a9e to af6e267.
Add structured queryset methods and a heartbeat mechanism so async
(pull-mode) processing services stay in sync with their actual liveness.
ProcessingService:
- New ProcessingServiceQuerySet with async_services() / sync_services()
methods — single canonical filter for endpoint_url null-or-empty, used
everywhere instead of ad-hoc Q expressions
- is_async property (derived from endpoint_url, no DB column)
- Docstrings reference Job.dispatch_mode ASYNC_API / SYNC_API for context
Liveness tracking:
- PROCESSING_SERVICE_LAST_SEEN_MAX = 60s constant (12× the worker's 5s
poll interval) — async services are considered offline after this
- check_processing_services_online task now handles both modes:
sync → active /readyz poll; async → bulk mark stale via async_services()
- _mark_pipeline_pull_services_seen() helper in jobs/views.py: single bulk
UPDATE via job.pipeline.processing_services.async_services(), called at
the top of both /jobs/{id}/tasks/ and /jobs/{id}/result/ so every worker
poll cycle refreshes last_seen without needing a separate registration
Async job cleanup (from carlosg/redisatomic):
- Rename _cleanup_job_if_needed → cleanup_async_job_if_needed and export
it so Job.cancel() can call it directly without a local import
- JobLogHandler: refresh_from_db before appending to avoid last-writer-
wins race across concurrent worker processes
- Job.logger: update existing handler's job reference instead of always
adding a new handler (process-level singleton leak fix)
Co-Authored-By: Claude <noreply@anthropic.com>
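The handler-reuse fix can be sketched with the standard logging module. JobLogHandler and job_logger here are simplified stand-ins (the real handler persists records to the Job row); the point is only the update-in-place-instead-of-stacking pattern:

```python
import logging


class JobLogHandler(logging.Handler):
    """Minimal stand-in: the real handler appends records to a Job's log field."""

    def __init__(self, job):
        super().__init__()
        self.job = job

    def emit(self, record):
        # The real code refreshes the job from the DB before appending,
        # to avoid the last-writer-wins race mentioned above.
        pass


def job_logger(job, name="ami.jobs"):
    """Reuse the process-level handler, retargeting it at the current job."""
    logger = logging.getLogger(name)
    for handler in logger.handlers:
        if isinstance(handler, JobLogHandler):
            handler.job = job  # update in place instead of stacking handlers
            break
    else:
        logger.addHandler(JobLogHandler(job))
    return logger
```

Because logging.getLogger() returns a process-level singleton, adding a fresh handler per Job accumulates handlers forever; retargeting the existing one fixes the leak.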
Force-pushed from 77dd024 to 2c029f8.
- PROCESSING_SERVICE_LAST_SEEN_MAX = 60s constant (12x the worker's 5s poll interval) used by check_processing_services_online to expire stale async service heartbeats
- get_status() pull-mode branch: derives server_live from staleness check, populates pipelines_online from registered pipelines, uses `not self.endpoint_url` to also handle empty-string endpoints
- endpointUrl getter: returns undefined instead of stringified "null" so async services show a blank cell in the endpoint column

Co-Authored-By: Claude <noreply@anthropic.com>
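A rough sketch of the pull-mode branch, with the return shape as an assumption based on the commit message rather than the actual get_status() signature:

```python
from datetime import datetime, timedelta, timezone

PROCESSING_SERVICE_LAST_SEEN_MAX = timedelta(seconds=60)  # 12 x 5s poll interval


def pull_mode_status(endpoint_url, last_seen, registered_pipelines, now=None):
    """Derive server_live from heartbeat staleness for a pull-mode service.

    Hypothetical helper mirroring the get_status() branch described above.
    """
    now = now or datetime.now(timezone.utc)
    # `not endpoint_url` treats both None and "" as pull-mode
    assert not endpoint_url, "sync services are actively polled instead"
    server_live = last_seen is not None and (now - last_seen) <= PROCESSING_SERVICE_LAST_SEEN_MAX
    return {
        "server_live": server_live,
        "pipelines_online": list(registered_pipelines),
    }
```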
Actionable comments posted: 2
🧹 Nitpick comments (3)
ui/src/pages/project/processing-services/processing-services-columns.tsx (1)

54-59: Consider using the translation key for consistency. The status details use a hardcoded 'Last seen ' string, while other parts of the codebase (e.g., processing-service-details-dialog.tsx at line 81) use translate(STRING.FIELD_LABEL_LAST_SEEN). For i18n consistency, consider using the translation function here as well.

Suggested change:

```diff
 renderCell: (item: ProcessingService) => (
   <StatusTableCell
     color={item.status.color}
-    details={'Last seen ' + item.lastSeen}
+    details={translate(STRING.FIELD_LABEL_LAST_SEEN) + ' ' + item.lastSeen}
     label={item.status.label}
   />
 ),
```

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In ui/src/pages/project/processing-services/processing-services-columns.tsx around lines 54-59, replace the hardcoded "Last seen " prefix in the StatusTableCell details with the i18n translation key: change details={'Last seen ' + item.lastSeen} to use translate(STRING.FIELD_LABEL_LAST_SEEN) concatenated with item.lastSeen, so it matches other components like processing-service-details-dialog.tsx and uses the same STRING.FIELD_LABEL_LAST_SEEN key.

ami/ml/models/pipeline.py (1)

1072-1092: Field renames look correct, but note a pre-existing edge case. The field references are correctly updated to last_seen_live and last_seen_latency. However, there's a pre-existing edge case: if all online processing services have last_seen_latency as None, processing_service_lowest_latency will never be assigned, causing an UnboundLocalError at line 1088. This isn't introduced by this PR, but consider addressing it as a follow-up if async/pull-mode services may not have latency values.

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In ami/ml/models/pipeline.py around lines 1072-1092, the loop can leave processing_service_lowest_latency unassigned when every online processing_service has last_seen_latency == None; after iterating, check if processing_service_lowest_latency is still None and, if so, pick a fallback online service (e.g., the first processing_service where last_seen_live is True), set lowest_latency to None or a sentinel, and log via task_logger that latency is unknown for the chosen service (include pipeline_name and the selected processing_service) before returning.

ami/ml/tasks.py (1)

121-123: Use logger.exception() to preserve the traceback when service health checks fail. At line 122, logger.error() logs only the exception message, losing the full stack trace. When debugging intermittent polling failures in production, the traceback is essential. Use logger.exception() instead, which is designed for exception handlers and automatically includes the full context.

♻️ Proposed refactor:

```diff
 except Exception as e:
-    logger.error(f"Error checking service {service}: {e}")
+    logger.exception("Error checking service %s", service)
     continue
```

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In ami/ml/tasks.py around lines 121-123, replace the logger.error call inside the exception handler so the full traceback is preserved: call logger.exception with a clear message (e.g., "Error checking service {service}") so the stack trace is included, then continue as before.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@ami/jobs/views.py`:
- Around line 49-52: The bulk update on
job.pipeline.processing_services.async_services().update(...) incorrectly
updates services across all projects; restrict the update to services linked to
the current job's project by adding an M2M filter on the ProcessingService
projects relation (e.g., replace async_services().update(...) with
async_services().filter(projects=job.project).update(...) or
.filter(projects__in=[job.project]).update(...) as appropriate), ensuring you
reference the existing job.pipeline and job.project objects so only services
associated with this job's project are marked last_seen/last_seen_live.
In `@ui/src/data-services/models/processing-service.ts`:
- Around line 53-54: The endpointUrl getter currently returns
this._processingService.endpoint_url using nullish coalescing, which leaves
empty string values intact; update the endpointUrl getter to normalize both null
and "" (and optionally whitespace-only strings) to undefined by reading const v
= this._processingService.endpoint_url and returning undefined when v is null or
v.trim() === "" (otherwise return v) so the UI treats empty endpoints as
async/pull-mode; modify the endpointUrl getter implementation accordingly.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (16)
- .agents/AGENTS.md
- .agents/DATABASE_SCHEMA.md
- ami/jobs/views.py
- ami/ml/migrations/0027_rename_last_checked_to_last_seen.py
- ami/ml/models/pipeline.py
- ami/ml/models/processing_service.py
- ami/ml/serializers.py
- ami/ml/tasks.py
- ami/ml/tests.py
- ami/ml/views.py
- ui/src/data-services/models/pipeline.ts
- ui/src/data-services/models/processing-service.ts
- ui/src/pages/processing-service-details/processing-service-details-dialog.tsx
- ui/src/pages/project/pipelines/pipelines-columns.tsx
- ui/src/pages/project/processing-services/processing-services-columns.tsx
- ui/src/utils/language.ts
Pull request overview
Renames ProcessingService.last_checked* fields to last_seen* across backend and UI, and adds heartbeat-style liveness tracking for pull-mode (async/no-endpoint) processing services.
Changes:
- Backend: rename model fields + add mark_seen() and async/sync query helpers; update polling task + endpoints to update last_seen.
- Frontend: rename model accessors/columns/labels from "last checked" to "last seen".
- Data migration: add a Django RenameField migration to preserve existing data.
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| ami/ml/models/processing_service.py | Renames fields, adds mark_seen(), and changes get_status() semantics for pull-mode services. |
| ami/ml/tasks.py | Updates periodic liveness checker to poll sync services + mark stale async services offline. |
| ami/ml/views.py | Marks service seen after successful async pipeline registration. |
| ami/jobs/views.py | Adds heartbeat updates on async worker task-fetch/result-submit endpoints. |
| ami/ml/serializers.py | Renames serialized fields to last_seen/last_seen_live. |
| ami/ml/models/pipeline.py | Updates queryset filters and comments to use last_seen*. |
| ami/ml/migrations/0027_rename_last_checked_to_last_seen.py | Data-preserving field renames via RenameField. |
| ami/ml/tests.py | Adds unit/integration tests for mark_seen() and pipeline registration. |
| ui/src/data-services/models/processing-service.ts | Renames getters to lastSeen* and makes endpointUrl nullable-safe. |
| ui/src/data-services/models/pipeline.ts | Renames computed fields to processingServicesOnlineLastSeen. |
| ui/src/pages/project/processing-services/processing-services-columns.tsx | Updates status details display to use lastSeen. |
| ui/src/pages/project/pipelines/pipelines-columns.tsx | Renames "last checked" column to "last seen". |
| ui/src/pages/processing-service-details/processing-service-details-dialog.tsx | Updates label/value bindings to lastSeen. |
| ui/src/utils/language.ts | Adds FIELD_LABEL_LAST_SEEN string and English translation. |
| .agents/DATABASE_SCHEMA.md / .agents/AGENTS.md | Updates documentation references to last_seen_live and pull-mode heartbeat. |
- Fix ImportError: import PROCESSING_SERVICE_LAST_SEEN_MAX directly from ami.ml.models.processing_service (not re-exported from ami.ml.models)
- Fix null last_seen causing epoch timestamp in processingServicesOnlineLastSeen getter — filter out null values before Math.max
- Fix "Last seen undefined" rendered in status column when lastSeen is undefined

Co-Authored-By: Claude <noreply@anthropic.com>
Code review: Found 2 issues; fixed 3 (including the critical one). Responding to existing CodeRabbit/Copilot comments inline below.

Fixed in commit 671da1a:
Needs your input:
Responding to other comments:
Move the async service stale-check to the top of check_processing_services_online so it always runs, even if a slow sync service check hits the time limit. Reduce the per-request timeout for the beat task from 90s (designed for cold-start waits) to 8s — if a sync service is starting up it will recover on the next cycle. Raise soft_time_limit/time_limit accordingly to give the sync loop room to complete (worst case ~30s per service with retries). Co-Authored-By: Claude <noreply@anthropic.com>
Async services now derive liveness from heartbeats rather than returning an error message. Update assertions: server_live=False (not None) when no heartbeat has been received, and remove error message checks. Co-Authored-By: Claude <noreply@anthropic.com>
Filter async services by project when marking them as seen, preventing cross-project contamination when a pipeline is shared across projects. Clarify in the docstring that this still marks all async services on the pipeline within the project, not the individual caller, until application-token auth (PR #1117) is available. Co-Authored-By: Claude <noreply@anthropic.com>
Add is_async to ProcessingServiceSerializer and ProcessingServiceNestedSerializer so the frontend can distinguish pull-mode from push-mode services. Also normalize empty endpoint_url strings to undefined in the FE model for consistency with the backend. Co-Authored-By: Claude <noreply@anthropic.com>
🧹 Nitpick comments (2)

ui/src/data-services/models/processing-service.ts (1)

86-88: Consider guarding against null last_seen_live. The backend model defines last_seen_live = models.BooleanField(null=True), meaning it can be null. The return type boolean doesn't reflect this possibility. While the current status getter handles falsy values gracefully (treating null as OFFLINE), direct consumers of lastSeenLive may expect a definite boolean.

♻️ Optional: coerce to boolean

```diff
 get lastSeenLive(): boolean {
-  return this._processingService.last_seen_live
+  return this._processingService.last_seen_live ?? false
 }
```

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In ui/src/data-services/models/processing-service.ts around lines 86-88, the getter lastSeenLive declares a boolean return but directly returns _processingService.last_seen_live, which can be null; either change the getter signature to boolean | null, or coerce with !!this._processingService.last_seen_live, so consumers and the type system reflect the backend's nullable BooleanField. Update any callers if you choose the nullable variant.

ami/ml/tests.py (1)

192-204: Consider mocking the network call for deterministic tests. This test makes a real network request to a nonexistent host, which depends on DNS resolution and socket timeouts. While it works, mocking requests or the underlying HTTP call would make this test faster and more reliable in CI environments with restricted network access.

Example mock approach:

```python
from unittest.mock import patch

def test_get_status_updates_last_seen_for_sync_service(self):
    """Test that get_status() updates last_seen fields for sync services."""
    service = ProcessingService.objects.create(
        name="Sync Service", endpoint_url="http://nonexistent-host:9999"
    )
    service.projects.add(self.project)
    with patch('requests.Session.get') as mock_get:
        mock_get.side_effect = Exception("Connection refused")
        service.get_status(timeout=1)
    service.refresh_from_db()
    # assertions...
```

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In ami/ml/tests.py around lines 192-204, replace the real network call in test_get_status_updates_last_seen_for_sync_service with a deterministic mock: patch requests.Session.get (or the exact HTTP call used by ProcessingService.get_status) to raise an exception or return a controlled response, then call service.get_status(timeout=1), refresh_from_db, and assert last_seen, last_seen_live, and last_seen_latency; this ensures the test runs fast and reliably without performing DNS/socket I/O.
📒 Files selected for processing (7)

- ami/jobs/views.py
- ami/ml/serializers.py
- ami/ml/tasks.py
- ami/ml/tests.py
- ui/src/data-services/models/pipeline.ts
- ui/src/data-services/models/processing-service.ts
- ui/src/pages/project/processing-services/processing-services-columns.tsx
🚧 Files skipped from review as they are similar to previous changes (3)
- ui/src/pages/project/processing-services/processing-services-columns.tsx
- ami/jobs/views.py
- ui/src/data-services/models/pipeline.ts
I agree that a boolean with a default of null is confusing. I think I chose that to represent "not checked" or "unknown", so the frontend could show a blank instead of Offline. But I think False would also be a fine simplification.
…le copies

- Run async stale-check before the sync loop so it always executes regardless of how long sync checks take
- Reduce per-request timeout for the beat task from 90s (designed for cold-start waits) to 8s — a slow or unreachable service just waits for the next 5-minute cycle
- Add expires=240s so copies queued during a worker outage are discarded when the worker returns; only the most recent firing runs

Co-Authored-By: Claude <noreply@anthropic.com>
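The beat entry described above might look like the following plain dict, sketched from the commit message. The task path, 5-minute cycle, and expires=240 come from the PR; the entry key name and the exact settings location are assumptions:

```python
# Sketch of a Celery beat_schedule entry, not the project's actual settings.
beat_schedule = {
    "check-processing-services-online": {
        "task": "ami.ml.tasks.check_processing_services_online",
        "schedule": 300.0,  # every 5 minutes
        "options": {"expires": 240},  # drop queued copies older than 240s
    },
}
```

Setting expires below the schedule interval means at most one firing is ever pending: copies that piled up while workers were down are discarded instead of running back to back.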
🧹 Nitpick comments (1)

ami/ml/tasks.py (1)

98-101: Minor: comment arithmetic may be slightly off. The comment states the worst case is "8 + 2 + 8 + 4 + 8 = 30s", but with retries=3 and backoff_factor=2 there are 4 total attempts (initial + 3 retries). The sum shown equals 30s, but with 4 attempts at 8s each plus backoff delays (0s, 2s, 4s), the worst case would be closer to 38s. This doesn't affect functionality, just a documentation nit.

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In ami/ml/tasks.py around lines 98-101, the inline comment for _BEAT_STATUS_TIMEOUT miscalculates the worst-case total; update it to state there are 4 attempts (initial + 3 retries), so the total is 4 * 8s = 32s plus backoff delays (0s + 2s + 4s = 6s), i.e. ~38s, and briefly note the calculation (attempts * timeout + backoff delays).
Summary

Closes #1122

- Rename last_checked → last_seen, last_checked_live → last_seen_live, and last_checked_latency → last_seen_latency on the ProcessingService model
- Add a mark_seen() method for async/pull-mode services to record liveness when they register pipelines
- Update the pipeline registration endpoint (POST /api/v2/projects/{id}/pipelines/) to call mark_seen(live=True) after successful registration
- Migrate with RenameField (data-preserving, no data loss)

The naming better reflects the semantic difference between:

- Sync services (with an endpoint_url): Antenna actively checks the service via the periodic Celery Beat task, updating last_seen/last_seen_live from the health check response
- Async services (no endpoint_url): workers report in by registering pipelines, and we record when we last heard from them via mark_seen()

Test plan

- New tests for the mark_seen() method (live=True and live=False)
- New tests covering last_seen/last_seen_live
- TestProcessingServiceAPI tests pass with renamed fields
- TestPipeline and TestProjectPipelinesAPI tests pass

Summary by CodeRabbit
New Features
Refactor
UI
Tests