Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions adit/core/migrations/0018_dicomfolder_suspended.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 6.0.3 on 2026-04-20 14:32

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add the boolean ``suspended`` field to the ``dicomfolder`` model."""

    dependencies = [("core", "0017_review_fixes")]

    operations = [
        migrations.AddField(
            model_name="dicomfolder",
            name="suspended",
            field=models.BooleanField(
                default=False,
                help_text="Suspended destinations skip processing (e.g. disk full).",
            ),
        ),
    ]
4 changes: 4 additions & 0 deletions adit/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ class DicomFolder(DicomNode):
blank=True,
help_text="When to warn the admins by Email (used space in GB).",
)
suspended = models.BooleanField(
default=False,
help_text="Suspended destinations skip processing (e.g. disk full).",
)

objects: DicomNodeManager["DicomFolder"] = DicomNodeManager["DicomFolder"]()

Expand Down
90 changes: 81 additions & 9 deletions adit/mass_transfer/processors.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import errno
import json
import logging
import secrets
Expand Down Expand Up @@ -71,6 +72,21 @@ def from_dict(cls, d: dict) -> "FilterSpec":

_MIN_SPLIT_WINDOW = timedelta(minutes=30)
_DELAY_BETWEEN_STUDIES = 0.5 # seconds between studies to avoid overwhelming the PACS
_SYSTEMIC_ERRNOS = (errno.ENOSPC, errno.ESTALE, errno.EROFS, errno.EIO)


def _is_systemic_error(err: Exception) -> bool:
"""Return True if the error indicates a systemic infrastructure problem.

Systemic errors (disk full, stale NFS handle, read-only filesystem, I/O error)
affect all remaining series equally, so continuing is pointless.
"""
if isinstance(err, OSError) and err.errno in _SYSTEMIC_ERRNOS:
return True
if isinstance(err, DicomError) and "Out of disk space" in str(err):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The string comparison for "Out of disk space" is case-sensitive. It is safer to use .lower() to ensure it catches variations in the error message from different DICOM implementations or system locales.

Suggested change
if isinstance(err, DicomError) and "Out of disk space" in str(err):
if isinstance(err, DicomError) and "out of disk space" in str(err).lower():

return True
return False


# Deterministic pseudonyms use 14 characters. Random pseudonyms use 15 so the
# two modes can be distinguished by length.
Expand Down Expand Up @@ -257,9 +273,19 @@ def process(self):
"log": "Task skipped because the mass transfer app is suspended.",
}

destination_node = self.mass_task.destination
if (
destination_node.node_type == DicomNode.NodeType.FOLDER
and destination_node.dicomfolder.suspended
):
return {
"status": MassTransferTask.Status.WARNING,
"message": f"Destination '{destination_node.name}' is suspended.",
"log": "Task skipped because the destination folder is suspended.",
}

Comment on lines +276 to +286
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first task that hits the full disk suspends the destination DicomFolder and is marked with status FAILURE. The remaining tasks in the job are still processed, hit the suspended check, and are marked with status WARNING. I think they should also be marked as FAILURE, since they are essentially in the same state as the first task. That would also make it easier to retry all the failed tasks, since we have a UI button that does this for a job.

I also wonder whether it would make more sense to pause the job instead when a task encounters a full disk?

job = self.mass_task.job
source_node = self.mass_task.source
destination_node = self.mass_task.destination

if source_node.node_type != DicomNode.NodeType.SERVER:
raise DicomError("Mass transfer source must be a DICOM server.")
Expand Down Expand Up @@ -312,18 +338,62 @@ def process(self):
grouped_volumes = self._group_volumes(volumes)

# Transfer: fetch series grouped by study
return self._transfer_grouped_series(
operator,
grouped_volumes,
job,
pseudonymizer,
output_base,
dest_operator,
)
try:
return self._transfer_grouped_series(
operator,
grouped_volumes,
job,
pseudonymizer,
output_base,
dest_operator,
)
except Exception as err:
if _is_systemic_error(err):
self._suspend_destination(destination_node, job, err)
return {
"status": MassTransferTask.Status.FAILURE,
"message": f"Destination suspended: {err}",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The error message "Destination suspended" is misleading when the destination is a SERVER. In that case, _suspend_destination returns early without taking any action (as intended for server nodes), but the task summary still claims the destination was suspended. The message should accurately reflect whether suspension occurred.

Suggested change
"message": f"Destination suspended: {err}",
"message": f"Destination suspended: {err}" if destination_node.node_type == DicomNode.NodeType.FOLDER else f"Systemic error: {err}",

"log": (
"Systemic error detected. Destination folder suspended.\n"
f"{err}"
),
}
raise
finally:
if dest_operator:
dest_operator.close()

def _suspend_destination(
    self,
    destination_node: DicomNode,
    job: MassTransferJob,
    err: Exception,
) -> None:
    """Mark a folder destination as suspended after a systemic error.

    Destinations whose node type is not FOLDER are left untouched. For a
    folder, the ``suspended`` flag is set and saved, a critical log entry is
    written, and ``send_mail_to_admins`` is called with a description of the
    error.

    Args:
        destination_node: The destination node of the current task.
        job: The mass transfer job the task belongs to (its pk is reported).
        err: The systemic error that triggered the suspension.
    """
    is_folder = destination_node.node_type == DicomNode.NodeType.FOLDER
    if not is_folder:
        return

    dicom_folder = destination_node.dicomfolder
    dicom_folder.suspended = True
    # Only write the changed column.
    dicom_folder.save(update_fields=["suspended"])

    logger.critical(
        "Destination folder '%s' suspended due to systemic error (job %d): %s",
        destination_node.name,
        job.pk,
        err,
    )

    # Kept as a function-local import, exactly as in the original code.
    from adit.core.utils.mail import send_mail_to_admins

    subject = f"Mass transfer destination '{destination_node.name}' suspended"
    body = (
        f"Destination folder '{destination_node.name}' was automatically suspended "
        f"due to a systemic error during mass transfer job {job.pk}.\n\n"
        f"Error: {err}\n\n"
        f"Please fix the underlying issue and unsuspend the folder in the admin panel."
    )
    send_mail_to_admins(subject, body)
Comment on lines +389 to +395
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The send_mail_to_admins helper (imported from adit.core.utils.mail) appears to be designed for HTML content, as seen in its other usages where it is called with the html_content keyword argument. Passing a plain text string with \n as a positional argument will likely result in an email where line breaks are not rendered in most email clients. Consider using the html_content keyword argument and replacing newlines with <br> tags, or verify if the helper supports a plain text message argument.


def _create_pending_volumes(
self,
discovered: list[DiscoveredSeries],
Expand Down Expand Up @@ -535,6 +605,8 @@ def _transfer_single_series(
)
volume.status = MassTransferVolume.Status.ERROR
volume.log = str(err)
if _is_systemic_error(err):
raise
finally:
if volume.status == MassTransferVolume.Status.PENDING:
logger.error(
Expand Down
138 changes: 138 additions & 0 deletions adit/mass_transfer/tests/test_processor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import errno
import json
from datetime import date, datetime, timedelta
from pathlib import Path
Expand Down Expand Up @@ -29,6 +30,7 @@
_birth_date_range,
_destination_base_dir,
_dicom_match,
_is_systemic_error,
_parse_int,
_series_folder_name,
_study_datetime,
Expand Down Expand Up @@ -520,6 +522,7 @@ def _make_process_env(
processor.mass_task.source.dicomserver = mocker.MagicMock()
processor.mass_task.destination.node_type = DicomNode.NodeType.FOLDER
processor.mass_task.destination.dicomfolder.path = str(tmp_path / "output")
processor.mass_task.destination.dicomfolder.suspended = False

processor.mass_task.pk = 42
processor.mass_task.partition_key = "20240101"
Expand Down Expand Up @@ -1989,3 +1992,138 @@ def fake_export(op, s, path, subject_id, pseudonymizer):
# The path should contain the job-identifying folder
expected_prefix = f"adit_mass_transfer_{job.pk}_{job.created.strftime('%Y%m%d')}_researcher"
assert expected_prefix in str(export_paths[0])


# ---------------------------------------------------------------------------
# Systemic error detection tests
# ---------------------------------------------------------------------------


class TestIsSystemicError:
    """Checks which exceptions ``_is_systemic_error`` classifies as systemic."""

    def test_enospc(self):
        err = OSError(errno.ENOSPC, "No space left on device")
        assert _is_systemic_error(err)

    def test_estale(self):
        err = OSError(errno.ESTALE, "Stale file handle")
        assert _is_systemic_error(err)

    def test_erofs(self):
        err = OSError(errno.EROFS, "Read-only file system")
        assert _is_systemic_error(err)

    def test_eio(self):
        err = OSError(errno.EIO, "Input/output error")
        assert _is_systemic_error(err)

    def test_disk_space_dicom_error(self):
        err = DicomError("Out of disk space while trying to save 'foo.dcm'.")
        assert _is_systemic_error(err)

    def test_regular_oserror(self):
        # An errno outside the systemic set must not be flagged.
        err = OSError(errno.ENOENT, "No such file")
        assert not _is_systemic_error(err)

    def test_regular_exception(self):
        err = ValueError("bad value")
        assert not _is_systemic_error(err)

    def test_regular_dicom_error(self):
        # A DicomError without the disk-space message is an ordinary failure.
        err = DicomError("Something else went wrong")
        assert not _is_systemic_error(err)


@pytest.mark.django_db
def test_process_stops_on_systemic_error(mocker: MockerFixture, mass_transfer_env):
    """A disk-full export error fails the task and suspends the destination."""
    env = mass_transfer_env
    discovered = [
        _make_discovered(patient_id="PAT1", series_uid=uid)
        for uid in ("series-1", "series-2")
    ]
    disk_full = DicomError("Out of disk space while trying to save 'test.dcm'.")

    processor = MassTransferTaskProcessor(env.task)
    mocker.patch.object(processor, "_discover_series", return_value=discovered)
    mocker.patch("adit.mass_transfer.processors.DicomOperator")
    mocker.patch.object(processor, "_export_series", side_effect=disk_full)
    mocker.patch("adit.core.utils.mail.send_mail_to_admins")

    result = processor.process()

    assert result["status"] == MassTransferTask.Status.FAILURE
    # The suspension may be reported in either the message or the log.
    assert any("suspended" in result[key].lower() for key in ("message", "log"))

    env.destination.refresh_from_db()
    assert env.destination.suspended is True

    # Only the first series should have been attempted (second skipped due to early exit)
    errored = MassTransferVolume.objects.filter(job=env.job).filter(
        status=MassTransferVolume.Status.ERROR
    )
    assert errored.count() == 1


@pytest.mark.django_db
def test_process_continues_on_regular_error(mocker: MockerFixture, mass_transfer_env):
    """A non-systemic export error does not stop processing of the next series."""
    env = mass_transfer_env
    discovered = [
        _make_discovered(patient_id="PAT1", series_uid=uid)
        for uid in ("series-1", "series-2")
    ]

    # One entry is appended per export attempt; its length is the call count.
    attempts = []

    def export_first_fails(*args, **kwargs):
        attempts.append(None)
        if len(attempts) == 1:
            raise DicomError("Bad DICOM data")
        return (1, "", "")

    processor = MassTransferTaskProcessor(env.task)
    mocker.patch.object(processor, "_discover_series", return_value=discovered)
    mocker.patch("adit.mass_transfer.processors.DicomOperator")
    mocker.patch.object(processor, "_export_series", side_effect=export_first_fails)

    result = processor.process()

    # Should be WARNING (one succeeded, one failed) — NOT FAILURE
    assert result["status"] == MassTransferTask.Status.WARNING

    env.destination.refresh_from_db()
    assert env.destination.suspended is False

    # Both series were attempted
    assert len(attempts) == 2


@pytest.mark.django_db
def test_process_skips_when_destination_suspended(mocker: MockerFixture, mass_transfer_env):
    """With a suspended destination folder, process() returns WARNING without discovery."""
    env = mass_transfer_env
    destination = env.destination
    destination.suspended = True
    destination.save()

    processor = MassTransferTaskProcessor(env.task)
    discover_spy = mocker.patch.object(processor, "_discover_series")

    outcome = processor.process()

    assert outcome["status"] == MassTransferTask.Status.WARNING
    assert "suspended" in outcome["message"].lower()
    # Series discovery must never have been started.
    discover_spy.assert_not_called()


@pytest.mark.django_db
def test_process_does_not_suspend_on_regular_error(mocker: MockerFixture, mass_transfer_env):
    """An ordinary export failure fails the task but leaves the destination active."""
    env = mass_transfer_env
    discovered = [_make_discovered(patient_id="PAT1", series_uid="series-1")]
    export_error = DicomError("Export failed")

    processor = MassTransferTaskProcessor(env.task)
    mocker.patch.object(processor, "_discover_series", return_value=discovered)
    mocker.patch("adit.mass_transfer.processors.DicomOperator")
    mocker.patch.object(processor, "_export_series", side_effect=export_error)

    outcome = processor.process()

    assert outcome["status"] == MassTransferTask.Status.FAILURE

    # Re-read the row so the assertion reflects what is stored in the database.
    env.destination.refresh_from_db()
    assert env.destination.suspended is False
Loading