fix(deployment_manager): harden and optimize concurrent GitHub file uploads #68
Copilot wants to merge 2 commits into `optimize-github-deployment-5034492779168511499`.
Conversation
🚅 Deployed to the EventRelay-pr-68 environment in EventRelay
…_github

- Check user_response.status_code before parsing JSON to surface auth errors
- Use asyncio.to_thread for file reads to avoid blocking the event loop
- Re-raise asyncio.CancelledError to allow proper task cancellation
- Add retry/backoff (3 retries, exponential) for 403/429 rate limit responses
- Wrap Retry-After header parsing in try/except for non-integer values
- Replace semaphore+gather with bounded queue-based worker pool (maxsize=20)
- Start workers before filling queue for immediate consumption
- Add try/finally in worker to guarantee queue.task_done() is always called
- Use return_exceptions=True in gather to prevent one error cancelling all workers

Co-authored-by: groupthinking <154503486+groupthinking@users.noreply.github.com>
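The non-blocking read from the second bullet can be sketched as follows. This is an illustrative helper, not the PR's exact code; the name `read_and_encode` is hypothetical, and the base64 step reflects the GitHub contents API's requirement that file content be base64-encoded:

```python
import asyncio
import base64
from pathlib import Path


async def read_and_encode(file_path: Path) -> str:
    # Path.read_bytes blocks on disk I/O, so run it in a worker thread
    # via asyncio.to_thread instead of blocking the event loop.
    raw = await asyncio.to_thread(file_path.read_bytes)
    # The GitHub contents API expects base64-encoded file content.
    return base64.b64encode(raw).decode("ascii")
```

With this in place, many files can be read concurrently without any single large read stalling other coroutines.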
```python
        logger.error(f"Exhausted retries uploading {relative_path} due to rate limiting")
        return None
    except asyncio.CancelledError:
        # Propagate cancellation so calling code can handle shutdown correctly
        raise
```

```python
        await queue.put(item)
    # Sentinel values to signal each worker to stop
    for _ in range(MAX_WORKERS):
        await queue.put(None)
```

```python
response = await client.put(upload_url, headers=headers, json=file_data)
if response.status_code in [200, 201]:
    return str(relative_path)
elif response.status_code in [403, 429]:
```
Pull request overview
This PR hardens and improves the robustness of GitHub repository uploads in DeploymentManager._upload_to_github, primarily addressing async correctness (cancellation + non-blocking I/O) and operational behavior (rate limiting + bounded concurrency).
Changes:
- Add explicit auth status validation before parsing the `/user` response JSON.
- Replace blocking file reads with `asyncio.to_thread(...)`, add rate-limit retry/backoff, and switch to a queue-based worker pool for bounded concurrency.
- Regenerate/update `*.egg-info` packaging metadata (appears incidental to the deployment-manager change).
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| `src/youtube_extension/backend/deployment_manager.py` | Improves GitHub upload resilience (auth check, non-blocking reads, retry/backoff, worker pool). |
| `src/youtube_extension.egg-info/requires.txt` | Updates generated dependency metadata (should not be committed). |
| `src/youtube_extension.egg-info/SOURCES.txt` | Updates generated source manifest (should not be committed). |
| `src/youtube_extension.egg-info/PKG-INFO` | Updates generated long description/metadata (should not be committed; also contains conflicting runtime claims). |
Comments suppressed due to low confidence (1)
src/youtube_extension.egg-info/requires.txt:60

The PR description focuses on hardening `_upload_to_github`, but this PR also includes regenerated `*.egg-info` metadata (dependencies + long description changes). Unless the intent is to change packaging metadata, these files should be removed from the PR to keep the change set scoped and reviewable.
```
google-genai>=1.0.0
google-generativeai>=0.3.0
opentelemetry-distro>=0.40b0
opentelemetry-exporter-otlp>=1.20.0
psutil>=5.9.0
ffmpeg-python>=0.2.0
qrcode[pil]>=7.0
opencv-python>=4.8.0

[deploy]
docker>=6.1.0
kubernetes>=28.1.0
helm>=3.12.0
flyctl>=0.1.0

[dev]
pytest>=7.4.0
pytest-asyncio>=0.21.0
pytest-cov>=4.1.0
pytest-xdist>=3.5.0
pytest-mock>=3.12.0
black>=23.0.0
isort>=5.12.0
flake8>=6.0.0
mypy>=1.7.0
pre-commit>=3.5.0
ruff>=0.1.0
jupyter>=1.0.0
ipykernel>=6.25.0
notebook>=7.0.0

[docs]
mkdocs>=1.5.0
```
```
PyYAML>=6.0.0
requests>=2.31.0
Pillow>=10.0.0
google-genai>=1.0.0
google-generativeai>=0.3.0
opentelemetry-distro>=0.40b0
opentelemetry-exporter-otlp>=1.20.0
```
```
src/integration/supabase_db.py
src/integration/temporal_video_analysis.py
src/integration/vercel_deploy.py
src/integration/youtube_api.py
```
```
@documentation Document this API endpoint
@documentation Create a setup guide for new contributors
- Python >= 3.11
```
```python
# Retry with exponential backoff on 403/429 (rate limit / abuse detection).
# max_retries=3 means 1 initial attempt + 3 retries = 4 total requests.
max_retries = 3
for attempt in range(max_retries + 1):
    response = await client.put(upload_url, headers=headers, json=file_data)
    if response.status_code in [200, 201]:
        return str(relative_path)
    elif response.status_code in [403, 429]:
        if attempt == max_retries:
            break  # exhausted all retries, no point sleeping
        try:
            retry_after = int(response.headers.get("Retry-After", 2 ** (attempt + 1)))
        except (ValueError, TypeError):
            retry_after = 2 ** (attempt + 1)
        logger.warning(
            f"Rate limited uploading {relative_path} "
            f"(attempt {attempt + 1}/{max_retries + 1}), retrying in {retry_after}s"
        )
        await asyncio.sleep(retry_after)
    else:
        logger.warning(
            f"Failed to upload {relative_path}: "
            f"{response.status_code} - {response.text}"
        )
        return None
```
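The `Retry-After` handling in the loop above can be isolated into a small helper for clarity. This is a sketch (the name `parse_retry_after` is hypothetical, not from the PR) that mirrors the same fallback logic for missing or non-integer header values:

```python
def parse_retry_after(headers: dict, attempt: int) -> int:
    """Return Retry-After in seconds, falling back to exponential
    backoff (2, 4, 8, ...) when the header is missing or is not a
    plain integer (e.g. an HTTP-date, which GitHub may also send)."""
    fallback = 2 ** (attempt + 1)
    try:
        return int(headers.get("Retry-After", fallback))
    except (ValueError, TypeError):
        return fallback
```

Example behavior: `parse_retry_after({}, 0)` returns 2, `parse_retry_after({"Retry-After": "30"}, 1)` returns 30, and an HTTP-date value on the third attempt falls back to 8.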
```python
    logger.error(f"Exhausted retries uploading {relative_path} due to rate limiting")
    return None
except asyncio.CancelledError:
    # Propagate cancellation so calling code can handle shutdown correctly
    raise
except Exception as e:
    logger.warning(f"Error uploading {file_path}: {e}")
    return None
```
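The ordering of those handlers matters: `except asyncio.CancelledError: raise` must come before the broad `except Exception`. A minimal sketch (the name `upload_one` is hypothetical, with a sleep standing in for the HTTP call) shows cancellation reaching the caller while ordinary errors are converted to `None`:

```python
import asyncio


async def upload_one(simulate_error: bool = False):
    try:
        if simulate_error:
            raise RuntimeError("boom")
        await asyncio.sleep(3600)  # stand-in for the HTTP request
    except asyncio.CancelledError:
        raise  # propagate so shutdown/cancellation is not swallowed
    except Exception:
        return None  # ordinary failures are logged and skipped


async def demo() -> tuple:
    failed = await upload_one(simulate_error=True)  # ordinary error -> None
    task = asyncio.create_task(upload_one())
    await asyncio.sleep(0)  # let the task reach its await point
    task.cancel()
    try:
        await task
        cancelled = False
    except asyncio.CancelledError:
        cancelled = True  # cancellation propagated as intended
    return failed, cancelled
```

Running `asyncio.run(demo())` yields `(None, True)`: the simulated error is absorbed, the cancellation is not.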
Removed (semaphore + gather over pre-created coroutines):

```python
# Collect tasks
for file_path in project_path_obj.rglob("*"):
    # Skip excluded directories and dotfiles
    if should_skip_path(file_path.relative_to(project_path_obj)):
        continue
    if file_path.is_file() and not file_path.name.startswith('.'):
        relative_path = file_path.relative_to(project_path_obj)
        upload_tasks.append(upload_file(client, file_path, relative_path))

# Run uploads concurrently with a semaphore to avoid overwhelming the GitHub API.
# GitHub's secondary rate limit is not strictly documented for concurrent writes,
# but 10-20 concurrent requests is a safe maximum.
semaphore = asyncio.Semaphore(10)

async def run_with_semaphore(coro):
    async with semaphore:
        return await coro

results = await asyncio.gather(*(run_with_semaphore(task) for task in upload_tasks))
uploaded_files = [res for res in results if res is not None]
```

Added (bounded queue-based worker pool):

```python
# Bounded worker-pool: only MAX_WORKERS coroutines exist at a time, keeping
# memory usage O(workers) rather than O(files) for large repositories.
# maxsize=MAX_WORKERS*2 keeps the queue bounded so producers don't outpace consumers.
MAX_WORKERS = 10
queue: asyncio.Queue = asyncio.Queue(maxsize=MAX_WORKERS * 2)
uploaded_files: list[str] = []

# Collect file list up-front; coroutines are created lazily inside workers
files_to_upload = [
    (file_path, file_path.relative_to(project_path_obj))
    for file_path in project_path_obj.rglob("*")
    if not should_skip_path(file_path.relative_to(project_path_obj))
    and file_path.is_file()
    and not file_path.name.startswith('.')
]
```
```python
file_data = {
    "message": f"Add {relative_path}",
    "content": encoded_content
}

upload_url = f"https://api.github.com/repos/{username}/{repo_name}/contents/{relative_path}"

# Removed: single unretried request
# response = await client.put(upload_url, headers=headers, json=file_data)

# Retry with exponential backoff on 403/429 (rate limit / abuse detection).
# max_retries=3 means 1 initial attempt + 3 retries = 4 total requests.
max_retries = 3
for attempt in range(max_retries + 1):
    response = await client.put(upload_url, headers=headers, json=file_data)
    if response.status_code in [200, 201]:
        return str(relative_path)
    elif response.status_code in [403, 429]:
        if attempt == max_retries:
            break  # exhausted all retries, no point sleeping
        try:
            retry_after = int(response.headers.get("Retry-After", 2 ** (attempt + 1)))
        except (ValueError, TypeError):
            retry_after = 2 ** (attempt + 1)
        logger.warning(
            f"Rate limited uploading {relative_path} "
            f"(attempt {attempt + 1}/{max_retries + 1}), retrying in {retry_after}s"
        )
        await asyncio.sleep(retry_after)
    else:
        logger.warning(
            f"Failed to upload {relative_path}: "
            f"{response.status_code} - {response.text}"
        )
        return None
```
`_upload_to_github` had several correctness and robustness gaps introduced when migrating to `httpx`: silent auth failures, blocking file I/O on the event loop, swallowed `CancelledError`, no rate-limit handling, and unbounded coroutine creation for large repos.

Changes

- Validate `user_response.status_code != 200` before parsing JSON; raise a descriptive exception instead of a `KeyError` on `login`
- `open().read()` → `await asyncio.to_thread(file_path.read_bytes)` to keep the event loop free during disk I/O
- `except asyncio.CancelledError: raise` before the broad `except Exception`, so task cancellation (e.g. server shutdown) is not silently swallowed
- Parse the `Retry-After` header with a safe fallback for non-integer values; no wasted sleep after the final attempt; exhausted retries are logged as errors (not silently `None`)
- Replace `Semaphore(10)` + `gather(all_coroutines)` with a proper producer/consumer pattern:
  - `asyncio.Queue(maxsize=20)` prevents unbounded memory growth for large repos
  - `try/finally` ensures `queue.task_done()` is always called
  - `gather(..., return_exceptions=True)` prevents one worker error from cancelling siblings
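Putting the queue, sentinels, and `return_exceptions=True` together, a generic version of this worker-pool pattern can be sketched as below. This is an illustrative distillation, not the PR's exact code; `run_pool` and `handler` are hypothetical names:

```python
import asyncio


async def run_pool(items, handler, max_workers: int = 10):
    # Bounded queue: producers block once maxsize is reached, so memory
    # stays O(workers) rather than O(items).
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_workers * 2)
    results = []

    async def worker():
        while True:
            item = await queue.get()
            try:
                if item is None:  # sentinel: this worker is done
                    return
                results.append(await handler(item))
            finally:
                # Always mark the item done, even if handler raised
                queue.task_done()

    # Start workers first so they consume while the queue is being filled
    workers = [asyncio.create_task(worker()) for _ in range(max_workers)]
    for item in items:
        await queue.put(item)
    for _ in range(max_workers):  # one sentinel per worker
        await queue.put(None)
    # return_exceptions=True: one failing worker doesn't cancel its siblings
    await asyncio.gather(*workers, return_exceptions=True)
    return results
```

Usage is `results = asyncio.run(run_pool(files, upload_one))`; results arrive in completion order, not submission order, which is fine here because the caller only filters out `None` entries.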