Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 30 additions & 5 deletions dandi/files/zarr.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
from __future__ import annotations

from base64 import b64encode
from collections import Counter
from collections.abc import Generator, Iterator
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from contextlib import closing
from dataclasses import dataclass, field, replace
from datetime import datetime
from enum import Enum
import json
import math
import os
import os.path
import urllib.parse
from pathlib import Path
import random
from time import sleep
from typing import Any, Optional
import urllib.parse

from dandischema.models import BareAsset, DigestType
from pydantic import BaseModel, ConfigDict, ValidationError
Expand Down Expand Up @@ -745,6 +748,7 @@ def mkzarr() -> str:
items_to_upload = list(items)
max_retries = 5
retry_count = 0
current_jobs = jobs or 5
# Add all items to checksum tree (only done once)
for it in items_to_upload:
zcc.add_leaf(Path(it.entry_path), it.size, it.digest)
Expand Down Expand Up @@ -774,7 +778,7 @@ def mkzarr() -> str:
r = client.post(f"/zarr/{zarr_id}/files/", json=uploading)

# Upload files in parallel
with ThreadPoolExecutor(max_workers=jobs or 5) as executor:
with ThreadPoolExecutor(max_workers=current_jobs) as executor:
futures = [
executor.submit(
_upload_zarr_file,
Expand Down Expand Up @@ -817,14 +821,22 @@ def mkzarr() -> str:
# Prepare for next iteration with retry items
if items_to_upload := retry_items:
retry_count += 1
current_jobs = max(1, math.ceil(current_jobs / 2))
if retry_count <= max_retries:
lgr.info(
"%s: %s got 403 errors, requesting new URLs",
"%s: %s got 403 errors, requesting new URLs"
" (attempt %d/%d, workers: %d)",
asset_path,
pluralize(len(items_to_upload), "file"),
retry_count,
max_retries,
current_jobs,
)
# Exponential backoff with jitter before retry
sleep(
min(2**retry_count * 5, 120)
+ random.uniform(0, 5)
)
# Small delay before retry
sleep(1 * retry_count)

# Check if we exhausted retries
if items_to_upload:
Expand Down Expand Up @@ -899,6 +911,18 @@ def _handle_failed_items_and_raise(
# Log all failures
for item, error in failed_items:
lgr.error("Failed to upload %s: %s", item.filepath, error)

# Summary diagnostics
exc_counts = Counter(type(error).__name__ for _, error in failed_items)
exc_summary = ", ".join(f"{k}: {v}" for k, v in exc_counts.most_common())
lgr.error(
"Upload failure summary: %d/%d files failed; exception types: {%s}%s",
len(failed_items),
len(futures),
exc_summary,
" (systematic — all same exception type)" if len(exc_counts) == 1 else "",
)

# Raise the first error
raise failed_items[0][1]

Expand Down Expand Up @@ -945,6 +969,7 @@ def _upload_zarr_file(
json_resp=False,
retry_if=_retry_zarr_file,
headers=headers,
timeout=(60, 7200),
)
except requests.HTTPError as e:
post_upload_size_check(item.filepath, item.size, True)
Expand Down
Loading