Conversation
📝 Walkthrough
The changes refactor batch processing in the worker to eliminate a memory leak by introducing a modular `_process_batch()` helper.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Extract the batch-processing body into `_process_batch()` so all large intermediates (`image_tensors`, `crops`, `batched_crops`, `image_detections`, `detector_results`) go out of scope after each batch and are freed by reference counting. Replace the `all_detections` accumulator list with a `total_detections` counter; the list grew unboundedly across all batches but was only used for `len()` in a final log message. Add `torch.cuda.empty_cache()` at the end of each batch to free CUDA allocator caches. Add an optional `on_batch_complete` callback to `_process_job()` for memory profiling. Add a memory-leak regression test: RSS growth across 25 batches must be < 150 MB; the test showed 88.0 MB growth (well under the threshold) with the fix. Re-applied on top of main's refactored worker.py, which added:
- `_apply_binary_classification()` helper (called from `_process_batch`)
- `ResultPoster` for async result posting
- `processing_service_name` parameter to `_process_job()`

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
DataLoader batches may yield `image_id` elements as Tensor scalars. Cast to `str` before passing to `AntennaTaskResultError(image_id=...)`, which expects `str | None`.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@trapdata/antenna/tests/test_memory_leak.py`:
- Around line 27-31: The helper _get_rss_mb currently assumes Linux and opens
"/proc/self/statm"; make it robust by detecting availability and raising a clear
SkipTest (or returning None) when "/proc/self/statm" or
os.sysconf("SC_PAGE_SIZE") is not available so tests don't fail on non-Linux
platforms: update the _get_rss_mb function to check
os.path.exists("/proc/self/statm") and handle exceptions around os.sysconf, and
modify any callers in the same test file (the checks around lines where
_get_rss_mb is used, e.g., the blocks referenced at 61-63) to skip the test when
_get_rss_mb indicates unavailability instead of proceeding and failing.
- Around line 51-58: The helper function _make_settings lacks a return type
annotation; add one to its signature (e.g., def _make_settings() -> MagicMock:)
and ensure MagicMock is imported (from unittest.mock import MagicMock) or use
the concrete Settings type if one exists; update the signature in the
_make_settings definition to include the chosen return type so the function is
properly type-hinted.
In `@trapdata/antenna/worker.py`:
- Around line 247-251: The length-check raises a ValueError but subsequent code
still uses zip(..., strict=True) which can re-raise and prevent emitting
AntennaTaskResultError for the batch; update the handler so when the lengths
mismatch (image_ids, reply_subjects, image_urls) you do not rely on zip(...,
strict=True) — instead detect the mismatch and immediately construct and emit an
AntennaTaskResultError for each affected image (or the whole batch) without
using strict zip, or iterate by index using a safe min/len loop or
itertools.zip_longest with explicit None checks; make the same change for the
second occurrence that references zip(..., strict=True) (the block around the
other reported lines) so mismatched payloads always result in
AntennaTaskResultError emissions rather than a secondary exception.
```python
def _get_rss_mb() -> float:
    """Current RSS in MB, read from /proc/self/statm (Linux-only)."""
    with open("/proc/self/statm") as f:
        pages = int(f.read().split()[1])  # resident pages
    return pages * os.sysconf("SC_PAGE_SIZE") / (1024 * 1024)
```
Guard the Linux-specific RSS probe to avoid cross-platform test failures.
/proc/self/statm is Linux-only; skip this test when unavailable.
💡 Proposed fix

```diff
     @pytest.mark.slow
     def test_rss_stable_across_batches(self):
+        if not os.path.exists("/proc/self/statm"):
+            pytest.skip("RSS regression test requires Linux /proc/self/statm")
         """RSS should not grow more than 150 MB across 25+ batches."""
```

Also applies to: 61-63
```python
def _make_settings(self):
    settings = MagicMock()
    settings.antenna_api_base_url = "http://testserver/api/v2"
    settings.antenna_api_auth_token = "test-token"
    settings.antenna_api_batch_size = 2
    settings.num_workers = 0
    settings.localization_batch_size = 2
    return settings
```
Add a return type annotation to _make_settings.
This signature is missing a return type hint.
💡 Proposed fix

```diff
-    def _make_settings(self):
+    def _make_settings(self) -> MagicMock:
         settings = MagicMock()
         settings.antenna_api_base_url = "http://testserver/api/v2"
         settings.antenna_api_auth_token = "test-token"
         settings.antenna_api_batch_size = 2
         settings.num_workers = 0
         settings.localization_batch_size = 2
         return settings
```

As per coding guidelines, trapdata/**/*.py: "Use type hints in function signatures to document expected types without requiring extensive documentation."
🧰 Tools
🪛 Ruff (0.15.2)
[error] 54-54: Possible hardcoded password assigned to: "antenna_api_auth_token"
(S105)
```python
if len(image_ids) != len(reply_subjects) or len(image_ids) != len(image_urls):
    raise ValueError(
        f"Length mismatch: image_ids ({len(image_ids)}), "
        f"reply_subjects ({len(reply_subjects)}), image_urls ({len(image_urls)})"
    )
```
Exception fallback can fail on the same length mismatch it is handling.
If the payload-length checks fail, the handler currently uses zip(..., strict=True) and can raise again, which prevents emitting AntennaTaskResultError results for the batch.
💡 Proposed fix

```diff
-        for reply_subject, image_id in zip(reply_subjects, image_ids, strict=True):
+        max_len = max(len(reply_subjects), len(image_ids))
+        for idx in range(max_len):
+            reply_subject = reply_subjects[idx] if idx < len(reply_subjects) else None
+            image_id = image_ids[idx] if idx < len(image_ids) else None
             batch_results.append(
                 AntennaTaskResult(
                     reply_subject=reply_subject,
                     result=AntennaTaskResultError(
                         error=f"Batch processing error: {e}",
                         image_id=str(image_id) if image_id is not None else None,
                     ),
                 )
             )
```

Also applies to: 373-374
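The same pairing can also be done with `itertools.zip_longest`, which pads the shorter list with `None` instead of raising. The sketch below uses plain dicts as stand-ins for `AntennaTaskResult` / `AntennaTaskResultError`, since those types are not shown here:

```python
from itertools import zip_longest


def build_error_results(reply_subjects, image_ids, error):
    """Emit one error result per reply even when the lists are mismatched."""
    results = []
    for reply_subject, image_id in zip_longest(reply_subjects, image_ids):
        results.append({
            "reply_subject": reply_subject,
            "error": f"Batch processing error: {error}",
            "image_id": str(image_id) if image_id is not None else None,
        })
    return results
```

Either form guarantees the error path never re-raises on the very length mismatch it is reporting.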
🧰 Tools
🪛 Ruff (0.15.2)
[warning] 248-251: Abstract raise to an inner function
(TRY301)
[warning] 248-251: Avoid specifying long messages outside the exception class
(TRY003)
@carlosgjs you may still be interested in this one! I was getting OOM errors in production on jobs over a few hours. No matter the worker type.
Summary
- Extract the batch-processing body into `_process_batch()` so large intermediates (image tensors, crops, batched crops) go out of scope after each batch
- Replace the `all_detections` accumulator list with a `total_detections` counter; the list grew unboundedly but was only used for `len()` in a log message
- Call `torch.cuda.empty_cache()` between batches to free CUDA allocator caches

Measurements (500 tasks / 250 batches, separate processes, RTX 4090)

The remaining ~3.4 MB/batch growth appears to come from PyTorch/CUDA allocator internals and ML model class state, not from `_process_job()` scope. Addressing that would require changes deeper in the ML model classes.

Test plan
- `test_worker.py` tests pass (no behavior change)

Closes #118
🤖 Generated with Claude Code