diff --git a/adit/core/migrations/0018_dicomfolder_suspended.py b/adit/core/migrations/0018_dicomfolder_suspended.py new file mode 100644 index 00000000..d6c42814 --- /dev/null +++ b/adit/core/migrations/0018_dicomfolder_suspended.py @@ -0,0 +1,18 @@ +# Generated by Django 6.0.3 on 2026-04-20 14:32 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0017_review_fixes'), + ] + + operations = [ + migrations.AddField( + model_name='dicomfolder', + name='suspended', + field=models.BooleanField(default=False, help_text='Suspended destinations skip processing (e.g. disk full).'), + ), + ] diff --git a/adit/core/models.py b/adit/core/models.py index 1a1e88d4..6bf4aeaa 100644 --- a/adit/core/models.py +++ b/adit/core/models.py @@ -177,6 +177,10 @@ class DicomFolder(DicomNode): blank=True, help_text="When to warn the admins by Email (used space in GB).", ) + suspended = models.BooleanField( + default=False, + help_text="Suspended destinations skip processing (e.g. disk full).", + ) objects: DicomNodeManager["DicomFolder"] = DicomNodeManager["DicomFolder"]() diff --git a/adit/mass_transfer/processors.py b/adit/mass_transfer/processors.py index d2830090..317e29d6 100644 --- a/adit/mass_transfer/processors.py +++ b/adit/mass_transfer/processors.py @@ -1,5 +1,6 @@ from __future__ import annotations +import errno import json import logging import secrets @@ -71,6 +72,21 @@ def from_dict(cls, d: dict) -> "FilterSpec": _MIN_SPLIT_WINDOW = timedelta(minutes=30) _DELAY_BETWEEN_STUDIES = 0.5 # seconds between studies to avoid overwhelming the PACS +_SYSTEMIC_ERRNOS = (errno.ENOSPC, errno.ESTALE, errno.EROFS, errno.EIO) + + +def _is_systemic_error(err: Exception) -> bool: + """Return True if the error indicates a systemic infrastructure problem. + + Systemic errors (disk full, stale NFS handle, read-only filesystem, I/O error) + affect all remaining series equally, so continuing is pointless. + """ + if isinstance(err, OSError) and err.errno in _SYSTEMIC_ERRNOS: + return True + if isinstance(err, DicomError) and "Out of disk space" in str(err): + return True + return False + # Deterministic pseudonyms use 14 characters. Random pseudonyms use 15 so the # two modes can be distinguished by length. @@ -257,9 +273,19 @@ def process(self): "log": "Task skipped because the mass transfer app is suspended.", } + destination_node = self.mass_task.destination + if ( + destination_node.node_type == DicomNode.NodeType.FOLDER + and destination_node.dicomfolder.suspended + ): + return { + "status": MassTransferTask.Status.WARNING, + "message": f"Destination '{destination_node.name}' is suspended.", + "log": "Task skipped because the destination folder is suspended.", + } + job = self.mass_task.job source_node = self.mass_task.source - destination_node = self.mass_task.destination if source_node.node_type != DicomNode.NodeType.SERVER: raise DicomError("Mass transfer source must be a DICOM server.") @@ -312,18 +338,62 @@ def process(self): grouped_volumes = self._group_volumes(volumes) # Transfer: fetch series grouped by study - return self._transfer_grouped_series( - operator, - grouped_volumes, - job, - pseudonymizer, - output_base, - dest_operator, - ) + try: + return self._transfer_grouped_series( + operator, + grouped_volumes, + job, + pseudonymizer, + output_base, + dest_operator, + ) + except Exception as err: + if _is_systemic_error(err): + self._suspend_destination(destination_node, job, err) + return { + "status": MassTransferTask.Status.FAILURE, + "message": f"Destination suspended: {err}", + "log": ( + "Systemic error detected. Destination folder suspended.\n" + f"{err}" + ), + } + raise finally: if dest_operator: dest_operator.close() + def _suspend_destination( + self, + destination_node: DicomNode, + job: MassTransferJob, + err: Exception, + ) -> None: + """Suspend a folder destination after a systemic error (e.g. disk full).""" + if destination_node.node_type != DicomNode.NodeType.FOLDER: + return + + folder = destination_node.dicomfolder + folder.suspended = True + folder.save(update_fields=["suspended"]) + + logger.critical( + "Destination folder '%s' suspended due to systemic error (job %d): %s", + destination_node.name, + job.pk, + err, + ) + + from adit.core.utils.mail import send_mail_to_admins + + send_mail_to_admins( + f"Mass transfer destination '{destination_node.name}' suspended", + f"Destination folder '{destination_node.name}' was automatically suspended " + f"due to a systemic error during mass transfer job {job.pk}.\n\n" + f"Error: {err}\n\n" + f"Please fix the underlying issue and unsuspend the folder in the admin panel.", + ) + def _create_pending_volumes( self, discovered: list[DiscoveredSeries], @@ -535,6 +605,8 @@ def _transfer_single_series( ) volume.status = MassTransferVolume.Status.ERROR volume.log = str(err) + if _is_systemic_error(err): + raise finally: if volume.status == MassTransferVolume.Status.PENDING: logger.error( diff --git a/adit/mass_transfer/tests/test_processor.py b/adit/mass_transfer/tests/test_processor.py index f0063e88..5046319c 100644 --- a/adit/mass_transfer/tests/test_processor.py +++ b/adit/mass_transfer/tests/test_processor.py @@ -1,3 +1,4 @@ +import errno import json from datetime import date, datetime, timedelta from pathlib import Path @@ -29,6 +30,7 @@ _birth_date_range, _destination_base_dir, _dicom_match, + _is_systemic_error, _parse_int, _series_folder_name, _study_datetime, @@ -520,6 +522,7 @@ def _make_process_env( processor.mass_task.source.dicomserver = mocker.MagicMock() processor.mass_task.destination.node_type = DicomNode.NodeType.FOLDER processor.mass_task.destination.dicomfolder.path = str(tmp_path / "output") + processor.mass_task.destination.dicomfolder.suspended = False processor.mass_task.pk = 42 processor.mass_task.partition_key = "20240101" @@ -1989,3 +1992,138 @@ def fake_export(op, s, path, subject_id, pseudonymizer): # The path should contain the job-identifying folder expected_prefix = f"adit_mass_transfer_{job.pk}_{job.created.strftime('%Y%m%d')}_researcher" assert expected_prefix in str(export_paths[0]) + + +# --------------------------------------------------------------------------- +# Systemic error detection tests +# --------------------------------------------------------------------------- + + +class TestIsSystemicError: + def test_enospc(self): + assert _is_systemic_error(OSError(errno.ENOSPC, "No space left on device")) + + def test_estale(self): + assert _is_systemic_error(OSError(errno.ESTALE, "Stale file handle")) + + def test_erofs(self): + assert _is_systemic_error(OSError(errno.EROFS, "Read-only file system")) + + def test_eio(self): + assert _is_systemic_error(OSError(errno.EIO, "Input/output error")) + + def test_disk_space_dicom_error(self): + assert _is_systemic_error(DicomError("Out of disk space while trying to save 'foo.dcm'.")) + + def test_regular_oserror(self): + assert not _is_systemic_error(OSError(errno.ENOENT, "No such file")) + + def test_regular_exception(self): + assert not _is_systemic_error(ValueError("bad value")) + + def test_regular_dicom_error(self): + assert not _is_systemic_error(DicomError("Something else went wrong")) + + +@pytest.mark.django_db +def test_process_stops_on_systemic_error(mocker: MockerFixture, mass_transfer_env): + """When export raises ENOSPC, the task stops immediately and the destination is suspended.""" + env = mass_transfer_env + series = [ + _make_discovered(patient_id="PAT1", series_uid="series-1"), + _make_discovered(patient_id="PAT1", series_uid="series-2"), + ] + + processor = MassTransferTaskProcessor(env.task) + mocker.patch.object(processor, "_discover_series", return_value=series) + mocker.patch("adit.mass_transfer.processors.DicomOperator") + mocker.patch.object( + processor, + "_export_series", + side_effect=DicomError("Out of disk space while trying to save 'test.dcm'."), + ) + mocker.patch("adit.core.utils.mail.send_mail_to_admins") + + result = processor.process() + + assert result["status"] == MassTransferTask.Status.FAILURE + assert "suspended" in result["message"].lower() or "suspended" in result["log"].lower() + + env.destination.refresh_from_db() + assert env.destination.suspended is True + + # Only the first series should have been attempted (second skipped due to early exit) + vols = MassTransferVolume.objects.filter(job=env.job) + error_vols = vols.filter(status=MassTransferVolume.Status.ERROR) + assert error_vols.count() == 1 + + +@pytest.mark.django_db +def test_process_continues_on_regular_error(mocker: MockerFixture, mass_transfer_env): + """Regular errors (non-systemic) mark the volume as ERROR but continue to the next series.""" + env = mass_transfer_env + series = [ + _make_discovered(patient_id="PAT1", series_uid="series-1"), + _make_discovered(patient_id="PAT1", series_uid="series-2"), + ] + + call_count = 0 + + def export_first_fails(*args, **kwargs): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise DicomError("Bad DICOM data") + return (1, "", "") + + processor = MassTransferTaskProcessor(env.task) + mocker.patch.object(processor, "_discover_series", return_value=series) + mocker.patch("adit.mass_transfer.processors.DicomOperator") + mocker.patch.object(processor, "_export_series", side_effect=export_first_fails) + + result = processor.process() + + # Should be WARNING (one succeeded, one failed) — NOT FAILURE + assert result["status"] == MassTransferTask.Status.WARNING + + env.destination.refresh_from_db() + assert env.destination.suspended is False + + # Both series were attempted + assert call_count == 2 + + +@pytest.mark.django_db +def test_process_skips_when_destination_suspended(mocker: MockerFixture, mass_transfer_env): + """When the destination folder is suspended, process() returns WARNING immediately.""" + env = mass_transfer_env + env.destination.suspended = True + env.destination.save() + + processor = MassTransferTaskProcessor(env.task) + discover_mock = mocker.patch.object(processor, "_discover_series") + + result = processor.process() + + assert result["status"] == MassTransferTask.Status.WARNING + assert "suspended" in result["message"].lower() + discover_mock.assert_not_called() + + +@pytest.mark.django_db +def test_process_does_not_suspend_on_regular_error(mocker: MockerFixture, mass_transfer_env): + """Non-systemic errors do NOT suspend the destination.""" + env = mass_transfer_env + series = [_make_discovered(patient_id="PAT1", series_uid="series-1")] + + processor = MassTransferTaskProcessor(env.task) + mocker.patch.object(processor, "_discover_series", return_value=series) + mocker.patch("adit.mass_transfer.processors.DicomOperator") + mocker.patch.object(processor, "_export_series", side_effect=DicomError("Export failed")) + + result = processor.process() + + assert result["status"] == MassTransferTask.Status.FAILURE + + env.destination.refresh_from_db() + assert env.destination.suspended is False