diff --git a/dandi/files/zarr.py b/dandi/files/zarr.py
index efb1efd2d..943e36288 100644
--- a/dandi/files/zarr.py
+++ b/dandi/files/zarr.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 from base64 import b64encode
+from collections import Counter
 from collections.abc import Generator, Iterator
 from concurrent.futures import Future, ThreadPoolExecutor, as_completed
 from contextlib import closing
@@ -8,12 +9,14 @@
 from datetime import datetime
 from enum import Enum
 import json
+import math
 import os
 import os.path
-import urllib.parse
 from pathlib import Path
+import random
 from time import sleep
 from typing import Any, Optional
+import urllib.parse
 
 from dandischema.models import BareAsset, DigestType
 from pydantic import BaseModel, ConfigDict, ValidationError
@@ -745,6 +748,7 @@ def mkzarr() -> str:
         items_to_upload = list(items)
         max_retries = 5
         retry_count = 0
+        current_jobs = jobs or 5
         # Add all items to checksum tree (only done once)
         for it in items_to_upload:
             zcc.add_leaf(Path(it.entry_path), it.size, it.digest)
@@ -774,7 +778,7 @@ def mkzarr() -> str:
             r = client.post(f"/zarr/{zarr_id}/files/", json=uploading)
 
             # Upload files in parallel
-            with ThreadPoolExecutor(max_workers=jobs or 5) as executor:
+            with ThreadPoolExecutor(max_workers=current_jobs) as executor:
                 futures = [
                     executor.submit(
                         _upload_zarr_file,
@@ -817,14 +821,22 @@ def mkzarr() -> str:
             # Prepare for next iteration with retry items
             if items_to_upload := retry_items:
                 retry_count += 1
+                current_jobs = max(1, math.ceil(current_jobs / 2))
                 if retry_count <= max_retries:
                     lgr.info(
-                        "%s: %s got 403 errors, requesting new URLs",
+                        "%s: %s got 403 errors, requesting new URLs"
+                        " (attempt %d/%d, workers: %d)",
                         asset_path,
                         pluralize(len(items_to_upload), "file"),
+                        retry_count,
+                        max_retries,
+                        current_jobs,
+                    )
+                    # Exponential backoff with jitter before retry
+                    sleep(
+                        min(2**retry_count * 5, 120)
+                        + random.uniform(0, 5)
                     )
-                    # Small delay before retry
-                    sleep(1 * retry_count)
 
         # Check if we exhausted retries
         if items_to_upload:
@@ -899,6 +911,18 @@ def _handle_failed_items_and_raise(
 
     # Log all failures
     for item, error in failed_items:
        lgr.error("Failed to upload %s: %s", item.filepath, error)
+
+    # Summary diagnostics
+    exc_counts = Counter(type(error).__name__ for _, error in failed_items)
+    exc_summary = ", ".join(f"{k}: {v}" for k, v in exc_counts.most_common())
+    lgr.error(
+        "Upload failure summary: %d/%d files failed; exception types: {%s}%s",
+        len(failed_items),
+        len(futures),
+        exc_summary,
+        " (systematic — all same exception type)" if len(exc_counts) == 1 else "",
+    )
+
     # Raise the first error
     raise failed_items[0][1]
@@ -945,6 +969,7 @@ def _upload_zarr_file(
             json_resp=False,
             retry_if=_retry_zarr_file,
             headers=headers,
+            timeout=(60, 7200),
         )
     except requests.HTTPError as e:
         post_upload_size_check(item.filepath, item.size, True)