-
Notifications
You must be signed in to change notification settings - Fork 7
Add staging environment for testing worker scaling on Docker Swarm #315
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
bf3a45c
ea4d781
1a28368
687dc29
4a5a0f1
b5fb532
c747e22
61a1b98
0efaa85
1086dbf
281303d
f4925c2
deccb11
a3c7413
8f5444a
253a1d3
268f078
3c028e8
8662207
31bd1e3
d96a036
ff1d71f
9a86256
3e53c06
ccd8bd9
e9370fd
9e61dd5
05c5a94
d6ac90e
3cdd321
a23f260
1dc05e1
402b904
7362344
fd5c99b
f2ce957
71c0a01
b91af72
70ca1ee
b339598
77cdac8
58c4135
7c2dfcf
2b43ab4
5e63ec3
8d301f1
ac9c2a1
982a1bd
04d7aaa
8f9afd5
d430772
b9920b9
9c8a4b1
934c8c5
9fe4214
d81b4d1
02d59be
04184b9
14d9ac9
d787e72
62c8ae6
474d8d1
578b4ad
f7cab69
f7c97aa
82993fd
16051f1
28d65ab
d9e8a0c
d228522
c7a0c61
832bf6d
dcd9057
6600340
10367c8
c3e516b
67ce954
1609647
1d7a4b0
3866eea
cb38b3c
7a43d1b
c14192c
f01872a
6764c29
d2b81f6
21d8f95
bfd9943
0e4eaff
41bf368
a1c4003
5a100f8
2156528
4a67599
cb5c0b3
21178bd
d20cee2
1a8a8d2
a70ff7f
3452466
ad6818b
4c5e242
d38eebf
6e3a191
409053b
2eb85d5
50adf2c
962904d
9e167e2
c93cf1f
8a6e9d5
1279ecf
a541702
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -76,5 +76,6 @@ | |
| "HTML (EEx)", | ||
| "HTML (Eex)", | ||
| "plist" | ||
| ] | ||
| ], | ||
| "containers.containers.label": "ContainerName" | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| # Generated by Django 5.2.8 on 2026-03-09 12:00 | ||
|
|
||
| from django.db import migrations, models | ||
|
|
||
|
|
||
| class Migration(migrations.Migration): | ||
|
|
||
| dependencies = [ | ||
| ('core', '0015_delete_queuedtask'), | ||
| ] | ||
|
|
||
| operations = [ | ||
| migrations.AddField( | ||
| model_name='dicomserver', | ||
| name='max_search_results', | ||
| field=models.PositiveIntegerField(default=200), | ||
| ), | ||
| ] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| # Generated by Django 6.0.3 on 2026-03-29 13:52 | ||
|
|
||
| import django.core.validators | ||
| from django.db import migrations, models | ||
|
|
||
|
|
||
| class Migration(migrations.Migration): | ||
|
|
||
| dependencies = [ | ||
| ("core", "0016_add_max_search_results_to_dicomserver"), | ||
| ] | ||
|
|
||
| operations = [ | ||
| migrations.AlterField( | ||
| model_name="dicomserver", | ||
| name="max_search_results", | ||
| field=models.PositiveIntegerField( | ||
| default=200, validators=[django.core.validators.MinValueValidator(1)] | ||
| ), | ||
| ), | ||
| ] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -53,32 +53,30 @@ def backup_db(*args, **kwargs): | |
| call_command("dbbackup", "--clean", "-v 2") | ||
|
|
||
|
|
||
| @app.task( | ||
| queue="dicom", | ||
| pass_context=True, | ||
| # TODO: Increase the priority slightly when it will be retried | ||
| # See https://github.com/procrastinate-org/procrastinate/issues/1096 | ||
| # | ||
| # Two-level retry strategy: | ||
| # 1. Network layer (Stamina): Fast retries for transient failures (5-10 attempts) | ||
| # - Applied at DIMSE/DICOMweb connector level | ||
| # - Handles: connection timeouts, HTTP 503, temporary server unavailability | ||
| # 2. Task layer (Procrastinate): Slow retries for complete operation failures | ||
| # - Applied here (max_attempts below) | ||
| # - Only triggers after network-level retries are exhausted | ||
| # - Retries the entire task | ||
| retry=RetryStrategy( | ||
| max_attempts=settings.DICOM_TASK_MAX_ATTEMPTS, | ||
| wait=settings.DICOM_TASK_RETRY_WAIT, | ||
| linear_wait=settings.DICOM_TASK_LINEAR_WAIT, | ||
| retry_exceptions={RetriableDicomError}, | ||
| ), | ||
| DICOM_TASK_RETRY_STRATEGY = RetryStrategy( | ||
| max_attempts=settings.DICOM_TASK_MAX_ATTEMPTS, | ||
| wait=settings.DICOM_TASK_RETRY_WAIT, | ||
| linear_wait=settings.DICOM_TASK_LINEAR_WAIT, | ||
| retry_exceptions={RetriableDicomError}, | ||
| ) | ||
| def process_dicom_task(context: JobContext, model_label: str, task_id: int): | ||
|
|
||
|
|
||
| def _run_dicom_task( | ||
| context: JobContext, | ||
| model_label: str, | ||
| task_id: int, | ||
| *, | ||
| process_timeout: int | None = None, | ||
| ): | ||
| assert context.job | ||
|
|
||
| dicom_task = get_dicom_task(model_label, task_id) | ||
| assert dicom_task.status == DicomTask.Status.PENDING | ||
| # The assertion status == PENDING assumed that tasks always arrive fresh, | ||
| # but in reality a retried task can arrive in a half-finished state. | ||
| # A task may still be IN_PROGRESS if the worker was killed before the | ||
| # finally block could update its status. Accept both PENDING and | ||
| # IN_PROGRESS so the retry can proceed. | ||
| assert dicom_task.status in (DicomTask.Status.PENDING, DicomTask.Status.IN_PROGRESS) | ||
|
|
||
| # When the first DICOM task of a job is processed then the status of the | ||
| # job switches from PENDING to IN_PROGRESS | ||
|
|
@@ -96,7 +94,7 @@ def process_dicom_task(context: JobContext, model_label: str, task_id: int): | |
|
|
||
| logger.info(f"Processing of {dicom_task} started.") | ||
|
|
||
| @concurrent.process(timeout=settings.DICOM_TASK_PROCESS_TIMEOUT, daemon=True) | ||
| @concurrent.process(timeout=process_timeout, daemon=True) | ||
| def _process_dicom_task(model_label: str, task_id: int) -> ProcessingResult: | ||
| dicom_task = get_dicom_task(model_label, task_id) | ||
| processor = get_dicom_processor(dicom_task) | ||
|
|
@@ -121,11 +119,18 @@ def _monitor_task(context: JobContext, future: ProcessFuture) -> None: | |
| dicom_task.log = result["log"] | ||
| ensure_db_connection() | ||
|
|
||
| except futures.CancelledError: | ||
| dicom_task.status = DicomTask.Status.CANCELED | ||
| dicom_task.message = "Task was canceled." | ||
| ensure_db_connection() | ||
|
|
||
|
|
||
| except futures.TimeoutError: | ||
| dicom_task.message = "Task was aborted due to timeout." | ||
| dicom_task.status = DicomTask.Status.FAILURE | ||
| ensure_db_connection() | ||
|
|
||
|
|
||
|
Comment on lines
+122
to
133
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Canceled tasks now bypass the cleanup hook; this is the only terminal failure path missing it. 🛠️ Suggested fix: except futures.CancelledError:
dicom_task.status = DicomTask.Status.CANCELED
dicom_task.message = "Task was canceled."
ensure_db_connection()
+ dicom_task.cleanup_on_failure()🤖 Prompt for AI Agents |
||
| except RetriableDicomError as err: | ||
| logger.exception("Retriable error occurred during %s.", dicom_task) | ||
|
|
||
|
|
@@ -146,6 +151,7 @@ def _monitor_task(context: JobContext, future: ProcessFuture) -> None: | |
| dicom_task.message = str(err) | ||
|
|
||
| ensure_db_connection() | ||
|
|
||
| raise err | ||
|
|
||
| except Exception as err: | ||
|
|
@@ -162,6 +168,7 @@ def _monitor_task(context: JobContext, future: ProcessFuture) -> None: | |
|
|
||
| ensure_db_connection() | ||
|
|
||
|
|
||
| finally: | ||
| dicom_task.end = timezone.now() | ||
| dicom_task.save() | ||
|
|
@@ -176,3 +183,25 @@ def _monitor_task(context: JobContext, future: ProcessFuture) -> None: | |
|
|
||
| # TODO: https://github.com/procrastinate-org/procrastinate/issues/1106 | ||
| db.close_old_connections() | ||
|
|
||
|
|
||
| @app.task( | ||
| queue="dicom", | ||
| pass_context=True, | ||
| # TODO: Increase the priority slightly when it will be retried | ||
| # See https://github.com/procrastinate-org/procrastinate/issues/1096 | ||
| # | ||
| # Two-level retry strategy: | ||
| # 1. Network layer (Stamina): Fast retries for transient failures (5-10 attempts) | ||
| # - Applied at DIMSE/DICOMweb connector level | ||
| # - Handles: connection timeouts, HTTP 503, temporary server unavailability | ||
| # 2. Task layer (Procrastinate): Slow retries for complete operation failures | ||
| # - Applied here (max_attempts below) | ||
| # - Only triggers after network-level retries are exhausted | ||
| # - Retries the entire task | ||
| retry=DICOM_TASK_RETRY_STRATEGY, | ||
| ) | ||
| def process_dicom_task(context: JobContext, model_label: str, task_id: int): | ||
| _run_dicom_task( | ||
| context, model_label, task_id, process_timeout=settings.DICOM_TASK_PROCESS_TIMEOUT | ||
| ) | ||
Uh oh!
There was an error while loading. Please reload this page.