-
Notifications
You must be signed in to change notification settings - Fork 8
Suspend destination folder on systemic errors (disk full, stale NFS) #331
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| # Generated by Django 6.0.3 on 2026-04-20 14:32 | ||
|
|
||
| from django.db import migrations, models | ||
|
|
||
|
|
||
| class Migration(migrations.Migration): | ||
|
|
||
| dependencies = [ | ||
| ('core', '0017_review_fixes'), | ||
| ] | ||
|
|
||
| operations = [ | ||
| migrations.AddField( | ||
| model_name='dicomfolder', | ||
| name='suspended', | ||
| field=models.BooleanField(default=False, help_text='Suspended destinations skip processing (e.g. disk full).'), | ||
| ), | ||
| ] |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -1,5 +1,6 @@ | ||||||
| from __future__ import annotations | ||||||
|
|
||||||
| import errno | ||||||
| import json | ||||||
| import logging | ||||||
| import secrets | ||||||
|
|
@@ -71,6 +72,21 @@ def from_dict(cls, d: dict) -> "FilterSpec": | |||||
|
|
||||||
| _MIN_SPLIT_WINDOW = timedelta(minutes=30) | ||||||
| _DELAY_BETWEEN_STUDIES = 0.5 # seconds between studies to avoid overwhelming the PACS | ||||||
| _SYSTEMIC_ERRNOS = (errno.ENOSPC, errno.ESTALE, errno.EROFS, errno.EIO) | ||||||
|
|
||||||
|
|
||||||
| def _is_systemic_error(err: Exception) -> bool: | ||||||
| """Return True if the error indicates a systemic infrastructure problem. | ||||||
|
|
||||||
| Systemic errors (disk full, stale NFS handle, read-only filesystem, I/O error) | ||||||
| affect all remaining series equally, so continuing is pointless. | ||||||
| """ | ||||||
| if isinstance(err, OSError) and err.errno in _SYSTEMIC_ERRNOS: | ||||||
| return True | ||||||
| if isinstance(err, DicomError) and "Out of disk space" in str(err): | ||||||
| return True | ||||||
| return False | ||||||
|
|
||||||
|
|
||||||
| # Deterministic pseudonyms use 14 characters. Random pseudonyms use 15 so the | ||||||
| # two modes can be distinguished by length. | ||||||
|
|
@@ -257,9 +273,19 @@ def process(self): | |||||
| "log": "Task skipped because the mass transfer app is suspended.", | ||||||
| } | ||||||
|
|
||||||
| destination_node = self.mass_task.destination | ||||||
| if ( | ||||||
| destination_node.node_type == DicomNode.NodeType.FOLDER | ||||||
| and destination_node.dicomfolder.suspended | ||||||
| ): | ||||||
| return { | ||||||
| "status": MassTransferTask.Status.WARNING, | ||||||
| "message": f"Destination '{destination_node.name}' is suspended.", | ||||||
| "log": "Task skipped because the destination folder is suspended.", | ||||||
| } | ||||||
|
|
||||||
|
Comment on lines
+276
to
+286
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So the first task that encounters the disk full and suspends the destination DicomFolder is marked with status FAILURE. Then the remaining tasks in the job are still processed, hit the suspended check and are marked with status WARNING. I think they should also be marked as FAILURE since they are essentially in the same state as the first task. Also would make it easier to retry all the failed tasks since we have a UI button that does this for a job. I also wonder if it makes more sense to pause the job instead when a task encounters disk full? |
||||||
| job = self.mass_task.job | ||||||
| source_node = self.mass_task.source | ||||||
| destination_node = self.mass_task.destination | ||||||
|
|
||||||
| if source_node.node_type != DicomNode.NodeType.SERVER: | ||||||
| raise DicomError("Mass transfer source must be a DICOM server.") | ||||||
|
|
@@ -312,18 +338,62 @@ def process(self): | |||||
| grouped_volumes = self._group_volumes(volumes) | ||||||
|
|
||||||
| # Transfer: fetch series grouped by study | ||||||
| return self._transfer_grouped_series( | ||||||
| operator, | ||||||
| grouped_volumes, | ||||||
| job, | ||||||
| pseudonymizer, | ||||||
| output_base, | ||||||
| dest_operator, | ||||||
| ) | ||||||
| try: | ||||||
| return self._transfer_grouped_series( | ||||||
| operator, | ||||||
| grouped_volumes, | ||||||
| job, | ||||||
| pseudonymizer, | ||||||
| output_base, | ||||||
| dest_operator, | ||||||
| ) | ||||||
| except Exception as err: | ||||||
| if _is_systemic_error(err): | ||||||
| self._suspend_destination(destination_node, job, err) | ||||||
| return { | ||||||
| "status": MassTransferTask.Status.FAILURE, | ||||||
| "message": f"Destination suspended: {err}", | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The error message "Destination suspended" is misleading when the destination is a
Suggested change
|
||||||
| "log": ( | ||||||
| "Systemic error detected. Destination folder suspended.\n" | ||||||
| f"{err}" | ||||||
| ), | ||||||
| } | ||||||
| raise | ||||||
| finally: | ||||||
| if dest_operator: | ||||||
| dest_operator.close() | ||||||
|
|
||||||
| def _suspend_destination( | ||||||
| self, | ||||||
| destination_node: DicomNode, | ||||||
| job: MassTransferJob, | ||||||
| err: Exception, | ||||||
| ) -> None: | ||||||
| """Suspend a folder destination after a systemic error (e.g. disk full).""" | ||||||
| if destination_node.node_type != DicomNode.NodeType.FOLDER: | ||||||
| return | ||||||
|
|
||||||
| folder = destination_node.dicomfolder | ||||||
| folder.suspended = True | ||||||
| folder.save(update_fields=["suspended"]) | ||||||
|
|
||||||
| logger.critical( | ||||||
| "Destination folder '%s' suspended due to systemic error (job %d): %s", | ||||||
| destination_node.name, | ||||||
| job.pk, | ||||||
| err, | ||||||
| ) | ||||||
|
|
||||||
| from adit.core.utils.mail import send_mail_to_admins | ||||||
|
|
||||||
| send_mail_to_admins( | ||||||
| f"Mass transfer destination '{destination_node.name}' suspended", | ||||||
| f"Destination folder '{destination_node.name}' was automatically suspended " | ||||||
| f"due to a systemic error during mass transfer job {job.pk}.\n\n" | ||||||
| f"Error: {err}\n\n" | ||||||
| f"Please fix the underlying issue and unsuspend the folder in the admin panel.", | ||||||
| ) | ||||||
|
Comment on lines
+389
to
+395
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||||||
|
|
||||||
| def _create_pending_volumes( | ||||||
| self, | ||||||
| discovered: list[DiscoveredSeries], | ||||||
|
|
@@ -535,6 +605,8 @@ def _transfer_single_series( | |||||
| ) | ||||||
| volume.status = MassTransferVolume.Status.ERROR | ||||||
| volume.log = str(err) | ||||||
| if _is_systemic_error(err): | ||||||
| raise | ||||||
| finally: | ||||||
| if volume.status == MassTransferVolume.Status.PENDING: | ||||||
| logger.error( | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The string comparison for "Out of disk space" is case-sensitive. It is safer to use
.lower()to ensure it catches variations in the error message from different DICOM implementations or system locales.