Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions adit/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,15 +273,17 @@ def post_process(self, suppress_email=False) -> bool:
self.save()
return False

if self.status == DicomJob.Status.CANCELING:
self.status = DicomJob.Status.CANCELED
self.save()
if self.status in [DicomJob.Status.CANCELING, DicomJob.Status.CANCELED]:
if self.status == DicomJob.Status.CANCELING:
self.status = DicomJob.Status.CANCELED
self.save()
return False
Comment on lines +276 to 280
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This condition correctly includes DicomJob.Status.CANCELED, but it leads to an unnecessary database write if the job status is already CANCELED. You can avoid this by checking the status before saving.

Suggested change
if self.status in [DicomJob.Status.CANCELING, DicomJob.Status.CANCELED]:
self.status = DicomJob.Status.CANCELED
self.save()
return False
if self.status in (DicomJob.Status.CANCELING, DicomJob.Status.CANCELED):
if self.status == DicomJob.Status.CANCELING:
self.status = DicomJob.Status.CANCELED
self.save()
return False


# Job is finished and we evaluate its final status
has_success = self.tasks.filter(status=DicomTask.Status.SUCCESS).exists()
has_warning = self.tasks.filter(status=DicomTask.Status.WARNING).exists()
has_failure = self.tasks.filter(status=DicomTask.Status.FAILURE).exists()
has_canceled = self.tasks.filter(status=DicomTask.Status.CANCELED).exists()

if has_success and not has_warning and not has_failure:
self.status = DicomJob.Status.SUCCESS
Expand All @@ -298,6 +300,9 @@ def post_process(self, suppress_email=False) -> bool:
elif has_failure:
self.status = DicomJob.Status.FAILURE
self.message = "All tasks failed."
elif has_canceled:
self.status = DicomJob.Status.CANCELED
self.message = "All tasks were canceled."
else:
# at least one of success, warnings or failures must be > 0
raise AssertionError(f"Invalid task status list of {self}.")
Expand Down
6 changes: 6 additions & 0 deletions adit/core/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ def _monitor_task(context: JobContext, future: ProcessFuture) -> None:
dicom_task.status = DicomTask.Status.FAILURE
ensure_db_connection()

except futures.CancelledError:
logger.info("Processing of %s was canceled.", dicom_task)
dicom_task.status = DicomTask.Status.CANCELED
dicom_task.message = "Task was canceled."
ensure_db_connection()

except RetriableDicomError as err:
logger.exception("Retriable error occurred during %s.", dicom_task)

Expand Down
15 changes: 15 additions & 0 deletions adit/core/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,21 @@ def test_job_post_process_canceling_status(self):
assert job.status == DicomJob.Status.CANCELED
assert job.end is None

@pytest.mark.django_db
def test_job_post_process_all_tasks_canceled(self):
job = ExampleTransferJobFactory.create(status=DicomJob.Status.PENDING)

ExampleTransferTaskFactory.create(job=job, status=DicomTask.Status.CANCELED)
ExampleTransferTaskFactory.create(job=job, status=DicomTask.Status.CANCELED)

result = job.post_process()
job.refresh_from_db()

assert result is True
assert job.status == DicomJob.Status.CANCELED
assert job.message == "All tasks were canceled."
assert job.end is not None

@pytest.mark.django_db
@time_machine.travel("2025-01-15 14:30:00+00:00")
def test_job_timezone_correctness(self):
Expand Down
46 changes: 46 additions & 0 deletions adit/core/tests/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
from time import sleep
from typing import cast

import pytest
from adit_radis_shared.common.utils.testing_helpers import run_worker_once
from procrastinate import JobContext
from pytest_mock import MockerFixture

from adit.core.errors import RetriableDicomError
from adit.core.models import DicomJob, DicomTask
from adit.core.processors import DicomTaskProcessor
from adit.core.tasks import process_dicom_task
from adit.core.types import ProcessingResult
from adit.core.utils.model_utils import get_model_label

Expand Down Expand Up @@ -157,6 +162,47 @@ def process(self):
assert dicom_task.attempts == 1


@pytest.mark.django_db(transaction=True)
def test_process_dicom_task_that_gets_canceled_via_abort_context(mocker: MockerFixture, settings):
settings.DICOM_TASK_CANCELED_MONITOR_INTERVAL = 0.01
settings.DICOM_TASK_PROCESS_TIMEOUT = 2

dicom_job = ExampleTransferJobFactory.create(status=DicomJob.Status.PENDING)
dicom_task = ExampleTransferTaskFactory.create(
status=DicomTask.Status.PENDING,
job=dicom_job,
)

def process(self):
sleep(10)
return {
"status": DicomTask.Status.SUCCESS,
"message": "Success!",
"log": "",
}

class FakeContext:
def __init__(self):
self.job = object()

def should_abort(self):
return True

mocker.patch.object(ExampleProcessor, "process", process)

model_label = get_model_label(ExampleTransferTask)
process_dicom_task(cast(JobContext, FakeContext()), model_label, dicom_task.pk)

dicom_job.refresh_from_db()
assert dicom_job.status == DicomJob.Status.CANCELED
assert dicom_job.message == "All tasks were canceled."

dicom_task.refresh_from_db()
assert dicom_task.status == DicomTask.Status.CANCELED
assert dicom_task.message == "Task was canceled."
assert dicom_task.attempts == 1


@pytest.mark.django_db(transaction=True)
def test_process_dicom_task_transitions_to_failure_after_max_retries(mocker: MockerFixture):
"""Test that a task correctly transitions to FAILURE after exhausting all retries.
Expand Down
Loading