From 6e19df6287478cefecd24c965d1c91eb54f8eca3 Mon Sep 17 00:00:00 2001 From: lucius1274 Date: Mon, 16 Mar 2026 09:07:06 +0000 Subject: [PATCH 1/3] Added fix for task kill --- adit/core/models.py | 6 ++++- adit/core/tasks.py | 7 ++++++ adit/core/tests/test_models.py | 15 +++++++++++ adit/core/tests/test_tasks.py | 46 ++++++++++++++++++++++++++++++++++ 4 files changed, 73 insertions(+), 1 deletion(-) diff --git a/adit/core/models.py b/adit/core/models.py index 75e0d11ab..3bca6cd8a 100644 --- a/adit/core/models.py +++ b/adit/core/models.py @@ -273,7 +273,7 @@ def post_process(self, suppress_email=False) -> bool: self.save() return False - if self.status == DicomJob.Status.CANCELING: + if self.status in [DicomJob.Status.CANCELING, DicomJob.Status.CANCELED]: self.status = DicomJob.Status.CANCELED self.save() return False @@ -282,6 +282,7 @@ def post_process(self, suppress_email=False) -> bool: has_success = self.tasks.filter(status=DicomTask.Status.SUCCESS).exists() has_warning = self.tasks.filter(status=DicomTask.Status.WARNING).exists() has_failure = self.tasks.filter(status=DicomTask.Status.FAILURE).exists() + has_canceled = self.tasks.filter(status=DicomTask.Status.CANCELED).exists() if has_success and not has_warning and not has_failure: self.status = DicomJob.Status.SUCCESS @@ -298,6 +299,9 @@ def post_process(self, suppress_email=False) -> bool: elif has_failure: self.status = DicomJob.Status.FAILURE self.message = "All tasks failed." + elif has_canceled: + self.status = DicomJob.Status.CANCELED + self.message = "All tasks were canceled." else: # at least one of success, warnings or failures must be > 0 raise AssertionError(f"Invalid task status list of {self}.") diff --git a/adit/core/tasks.py b/adit/core/tasks.py index de7cb0d5d..354614263 100644 --- a/adit/core/tasks.py +++ b/adit/core/tasks.py @@ -95,6 +95,7 @@ def process_dicom_task(context: JobContext, model_label: str, task_id: int): dicom_task.save() logger.info(f"Processing of {dicom_task} started.") + # sleep(30) @concurrent.process(timeout=settings.DICOM_TASK_PROCESS_TIMEOUT, daemon=True) def _process_dicom_task(model_label: str, task_id: int) -> ProcessingResult: @@ -126,6 +127,12 @@ def _monitor_task(context: JobContext, future: ProcessFuture) -> None: dicom_task.status = DicomTask.Status.FAILURE ensure_db_connection() + except futures.CancelledError: + logger.info("Processing of %s was canceled.", dicom_task) + dicom_task.status = DicomTask.Status.CANCELED + dicom_task.message = "Task was canceled." + ensure_db_connection() + except RetriableDicomError as err: logger.exception("Retriable error occurred during %s.", dicom_task) diff --git a/adit/core/tests/test_models.py b/adit/core/tests/test_models.py index 39f16b04d..1c9b008e9 100644 --- a/adit/core/tests/test_models.py +++ b/adit/core/tests/test_models.py @@ -168,6 +168,21 @@ def test_job_post_process_canceling_status(self): assert job.status == DicomJob.Status.CANCELED assert job.end is None + @pytest.mark.django_db + def test_job_post_process_all_tasks_canceled(self): + job = ExampleTransferJobFactory.create(status=DicomJob.Status.PENDING) + + ExampleTransferTaskFactory.create(job=job, status=DicomTask.Status.CANCELED) + ExampleTransferTaskFactory.create(job=job, status=DicomTask.Status.CANCELED) + + result = job.post_process() + job.refresh_from_db() + + assert result is True + assert job.status == DicomJob.Status.CANCELED + assert job.message == "All tasks were canceled." + assert job.end is not None + @pytest.mark.django_db @time_machine.travel("2025-01-15 14:30:00+00:00") def test_job_timezone_correctness(self): diff --git a/adit/core/tests/test_tasks.py b/adit/core/tests/test_tasks.py index bd87c3bbc..787da19ca 100644 --- a/adit/core/tests/test_tasks.py +++ b/adit/core/tests/test_tasks.py @@ -1,10 +1,15 @@ +from time import sleep +from typing import cast + import pytest from adit_radis_shared.common.utils.testing_helpers import run_worker_once +from procrastinate import JobContext from pytest_mock import MockerFixture from adit.core.errors import RetriableDicomError from adit.core.models import DicomJob, DicomTask from adit.core.processors import DicomTaskProcessor +from adit.core.tasks import process_dicom_task from adit.core.types import ProcessingResult from adit.core.utils.model_utils import get_model_label @@ -157,6 +162,47 @@ def process(self): assert dicom_task.attempts == 1 +@pytest.mark.django_db(transaction=True) +def test_process_dicom_task_that_gets_canceled_via_abort_context(mocker: MockerFixture, settings): + settings.DICOM_TASK_CANCELED_MONITOR_INTERVAL = 0.01 + settings.DICOM_TASK_PROCESS_TIMEOUT = 2 + + dicom_job = ExampleTransferJobFactory.create(status=DicomJob.Status.PENDING) + dicom_task = ExampleTransferTaskFactory.create( + status=DicomTask.Status.PENDING, + job=dicom_job, + ) + + def process(self): + sleep(10) + return { + "status": DicomTask.Status.SUCCESS, + "message": "Success!", + "log": "", + } + + class FakeContext: + def __init__(self): + self.job = object() + + def should_abort(self): + return True + + mocker.patch.object(ExampleProcessor, "process", process) + + model_label = get_model_label(ExampleTransferTask) + process_dicom_task(cast(JobContext, FakeContext()), model_label, dicom_task.pk) + + dicom_job.refresh_from_db() + assert dicom_job.status == DicomJob.Status.CANCELED + assert dicom_job.message == "All tasks were canceled." + + dicom_task.refresh_from_db() + assert dicom_task.status == DicomTask.Status.CANCELED + assert dicom_task.message == "Task was canceled." + assert dicom_task.attempts == 1 + + @pytest.mark.django_db(transaction=True) def test_process_dicom_task_transitions_to_failure_after_max_retries(mocker: MockerFixture): """Test that a task correctly transitions to FAILURE after exhausting all retries. From 1563c0c0f875505bfb842665b5637bb0075f6495 Mon Sep 17 00:00:00 2001 From: lucius1274 Date: Mon, 16 Mar 2026 09:23:38 +0000 Subject: [PATCH 2/3] removed old comment --- adit/core/tasks.py | 1 - 1 file changed, 1 deletion(-) diff --git a/adit/core/tasks.py b/adit/core/tasks.py index 354614263..876f6ecc2 100644 --- a/adit/core/tasks.py +++ b/adit/core/tasks.py @@ -95,7 +95,6 @@ def process_dicom_task(context: JobContext, model_label: str, task_id: int): dicom_task.save() logger.info(f"Processing of {dicom_task} started.") - # sleep(30) @concurrent.process(timeout=settings.DICOM_TASK_PROCESS_TIMEOUT, daemon=True) def _process_dicom_task(model_label: str, task_id: int) -> ProcessingResult: From fcff70fea8adb1bf56dc1f77b55a1f5057662b8b Mon Sep 17 00:00:00 2001 From: lucius1274 Date: Mon, 16 Mar 2026 09:31:50 +0000 Subject: [PATCH 3/3] made database write only on change --- adit/core/models.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/adit/core/models.py b/adit/core/models.py index 3bca6cd8a..c3fdf9420 100644 --- a/adit/core/models.py +++ b/adit/core/models.py @@ -274,8 +274,9 @@ def post_process(self, suppress_email=False) -> bool: return False if self.status in [DicomJob.Status.CANCELING, DicomJob.Status.CANCELED]: - self.status = DicomJob.Status.CANCELED - self.save() + if self.status == DicomJob.Status.CANCELING: + self.status = DicomJob.Status.CANCELED + self.save() return False # Job is finished and we evaluate its final status