Add parallel multi-tenant operations for faster bulk processing #154
Conversation
Orca Security Scan Summary

| Check | Status |
|---|---|
| Infrastructure as Code | View in Orca |
| SAST | View in Orca |
| Secrets | View in Orca |
| Vulnerabilities | View in Orca |
Pull request overview
This PR adds parallel multi-tenant operations to significantly improve performance for bulk operations on collections with many tenants. The changes convert sequential tenant processing loops into parallel execution using ThreadPoolExecutor, and replace N individual tenant API calls with single batch operations.
Changes:
- Converted `delete_tenants` and `update_tenants` to use batch API calls (a single call with a list of tenants) instead of N individual calls
- Added parallel tenant processing for `create_data`, `update_data`, and `delete_data` using `ThreadPoolExecutor`
- Added a `--parallel_workers` CLI option (default: `min(32, cpu_count + 4)`) to control parallelism, with automatic per-tenant `concurrent_requests` scaling to prevent cluster overload
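The first bullet's payoff can be sketched in isolation. This is an illustrative model, not the actual weaviate-cli or Weaviate client code: `FakeTenantAPI` is a hypothetical stand-in that just counts round trips, showing why one batch call with a list beats N per-tenant calls.

```python
from typing import List


class FakeTenantAPI:
    """Hypothetical stand-in for a tenant endpoint; counts API round trips."""

    def __init__(self) -> None:
        self.calls = 0
        self.removed: List[str] = []

    def remove(self, tenants: List[str]) -> None:
        self.calls += 1  # one network round trip per call, regardless of list size
        self.removed.extend(tenants)


def delete_tenants_sequential(api: FakeTenantAPI, tenants: List[str]) -> None:
    # Old behavior: N calls, one per tenant.
    for t in tenants:
        api.remove([t])


def delete_tenants_batched(api: FakeTenantAPI, tenants: List[str]) -> None:
    # New behavior: a single call carrying the full list.
    api.remove(tenants)


names = [f"tenant-{i}" for i in range(100)]
old, new = FakeTenantAPI(), FakeTenantAPI()
delete_tenants_sequential(old, names)
delete_tenants_batched(new, names)
print(old.calls, new.calls)  # 100 round trips vs 1
```

Both paths remove the same tenants; only the number of round trips changes, which is where the sequential bottleneck came from.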
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| weaviate_cli/managers/tenant_manager.py | Converted delete_tenants and update_tenants to use batch API calls with lists instead of loops |
| weaviate_cli/managers/data_manager.py | Added parallel tenant processing with ThreadPoolExecutor for create_data, update_data, and delete_data; includes concurrent_requests scaling and error aggregation |
| weaviate_cli/defaults.py | Added parallel_workers default (MAX_WORKERS) to CreateDataDefaults, UpdateDataDefaults, and DeleteDataDefaults |
| weaviate_cli/commands/create.py | Added --parallel_workers CLI option for create data command |
| weaviate_cli/commands/update.py | Added --parallel_workers CLI option for update data command |
| weaviate_cli/commands/delete.py | Added --parallel_workers CLI option for delete data command |
| test/unittests/test_managers/test_tenant_manager.py | Updated test assertions to expect batch API calls (lists) instead of individual calls |
| test/unittests/test_managers/test_data_manager.py | Added 11 new tests covering parallel processing, error collection, concurrent_requests scaling, and edge cases |
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.
Multi-tenant operations were processed sequentially, causing major bottlenecks for collections with hundreds or thousands of tenants.

Changes:
- tenant_manager: replace per-tenant remove/update loops with single batch API calls (1 call instead of N)
- data_manager: use ThreadPoolExecutor to process multiple tenants concurrently for create/update/delete data operations
- data_manager: when parallel_workers > 1, concurrent_requests per tenant is scaled down to keep total connections bounded
- add --parallel_workers CLI option to create/update/delete data commands (default: MAX_WORKERS = min(32, cpu_count + 4))
- update defaults.py with parallel_workers field for data operations

Test changes:
- Updated test_tenant_manager.py assertions to expect batch API calls (list argument) instead of per-tenant calls; this reflects the new batch behavior, which is the explicit goal of this feature
- Added 11 new tests in test_data_manager.py covering parallel tenant processing, sequential fallback, error collection, and concurrent request scaling

Closes #153

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ror handling
- Suppress per-tenant progress messages in parallel mode (update/delete) to avoid interleaved output; messages only print when parallel_workers <= 1
- Make sequential error handling consistent with parallel: collect all tenant errors instead of failing fast on the first one, then raise a combined exception (matches parallel path behaviour)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Adds four parallel-processing tests for create_data that mirror the TestUpdateDataParallel and TestDeleteDataParallel classes, covering: all-tenants-in-parallel, sequential fallback, error collection, and non-MT collection handling.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
__delete_data always returns a non-negative count or raises; it never returns -1. Drop the dead sentinel branches from both the parallel and sequential paths in delete_data and rely on exceptions for error propagation instead, simplifying the control flow.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Introduce actual_workers = min(parallel_workers, len(tenants), concurrent_requests) and use it as both the ThreadPoolExecutor max_workers and the divisor for per-tenant concurrent_requests:
- Over-throttle fix: 2 tenants + 32 workers previously divided by 32, leaving each thread with 1 request instead of half the budget. Now divides by min(32, 2, …) = 2, fully utilising the budget.
- Over-budget fix: 32 workers + 4 concurrent_requests previously floored to 1/thread × 32 threads = 32 total. Capping actual_workers at concurrent_requests keeps total in-flight ≤ concurrent_requests.

Update the concurrent_requests scaling test to reflect the corrected expected value (the test was asserting the old over-throttled behaviour).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
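The arithmetic in this commit message can be checked with a small sketch. The function below is hypothetical (the real code in `data_manager.py` is inlined, and whether it uses exactly this floor division is an assumption), but it reproduces both scenarios the commit describes:

```python
def plan_parallelism(parallel_workers: int, num_tenants: int,
                     concurrent_requests: int) -> tuple[int, int]:
    """Illustrative sketch of the scheme described in the commit message.

    Returns (actual_workers, per_tenant_requests): the thread count is capped
    by the tenant count AND the request budget, and the budget is then split
    evenly across the threads that will actually run.
    """
    actual_workers = min(parallel_workers, num_tenants, concurrent_requests)
    per_tenant = max(1, concurrent_requests // actual_workers)
    return actual_workers, per_tenant


# Over-throttle case: 2 tenants, 32 workers, budget of 8 concurrent requests.
# Dividing by 32 gave each tenant 1 request; dividing by min(32, 2, 8) = 2
# gives each tenant half the budget.
print(plan_parallelism(32, 2, 8))   # (2, 4)

# Over-budget case: 32 workers but a budget of only 4 concurrent requests.
# Capping workers at the budget keeps total in-flight at 4 instead of 32.
print(plan_parallelism(32, 40, 4))  # (4, 1)
```

Total in-flight requests is `actual_workers * per_tenant`, which the cap keeps at or below `concurrent_requests` in both cases.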
Force-pushed from f6f1775 to 7ae65a1
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 6 comments.
- Validate --parallel_workers with click.IntRange(min=1) in create, delete, update commands
- Fix nondeterministic collection return in parallel create_data mode
- Guard empty tenants_to_update before calling SDK update
- Fix test assertion to match list-based remove() call

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
e2e test run with parallel multi-tenant ingestion -> https://github.com/weaviate/weaviate-e2e-tests/actions/runs/23441661618
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.
- update_data and delete_data now use min(parallel_workers, len(tenants)) to avoid over-allocating threads, matching create_data's existing pattern

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (1)
weaviate_cli/managers/data_manager.py:986
`create_data` returns `collection`, but in sequential mode this becomes the last tenant-scoped collection (`col.with_tenant(...)`) while in parallel mode it stays as the base `col`. This makes the return value dependent on execution mode and tenant ordering. Consider returning the base collection consistently (e.g., always `col`) or changing the method to return `None` (since call sites don't appear to use the return value) to avoid an accidental API contract change for downstream callers.
```python
collection = col
if _parallel_mode:
    _lock = threading.Lock()
    _errors: List[str] = []
    with ThreadPoolExecutor(max_workers=actual_workers) as executor:
        future_to_tenant = {
            executor.submit(_ingest_one_tenant, t): t for t in tenants
        }
        for future in as_completed(future_to_tenant):
            t = future_to_tenant[future]
            try:
                inserted, _coll = future.result()
                with _lock:
                    total_inserted += inserted
            except Exception as exc:
                _errors.append(f"Tenant '{t}': {exc}")
    if _errors:
        raise Exception(
            "Errors during parallel data ingestion:\n" + "\n".join(_errors)
        )
else:
    for tenant in tenants:
        inserted, collection = _ingest_one_tenant(tenant)
        total_inserted += inserted
if json_output:
    click.echo(
        json.dumps(
            {
                "status": "success",
                "collection": col.name,
                "objects_inserted": total_inserted,
            },
            indent=2,
        )
    )
return collection
```
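The mode-dependent return value the reviewer flags can be reproduced with a toy model. The `Coll` class and `ingest` function below are hypothetical stand-ins, not the weaviate-cli code; they only show that the sequential loop rebinds the returned variable to the last tenant's scoped collection while the parallel path leaves it as the base:

```python
class Coll:
    """Toy stand-in for a collection object with tenant scoping."""

    def __init__(self, name: str) -> None:
        self.name = name

    def with_tenant(self, tenant: str) -> "Coll":
        return Coll(f"{self.name}/{tenant}")


def ingest(parallel: bool, col: Coll, tenants: list) -> Coll:
    collection = col
    if parallel:
        pass  # parallel path never rebinds `collection`
    else:
        for t in tenants:
            collection = col.with_tenant(t)  # last tenant wins
    return collection


col = Coll("Movies")
print(ingest(True, col, ["a", "b"]).name)   # Movies
print(ingest(False, col, ["a", "b"]).name)  # Movies/b
```

Always returning `col` (or `None`) removes the dependence on execution mode and on which tenant happened to be processed last.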
```python
if len(tenants) > 1 and parallel_workers > 1:
    actual_workers = min(parallel_workers, len(tenants))
    _lock = threading.Lock()
    _errors: List[str] = []
    with ThreadPoolExecutor(max_workers=actual_workers) as executor:
        future_to_tenant = {
            executor.submit(_update_one_tenant, t): t for t in tenants
        }
        for future in as_completed(future_to_tenant):
            t = future_to_tenant[future]
            try:
                ret = future.result()
                if ret == -1:
                    _errors.append(
                        f"Failed to update objects in class '{col.name}' for tenant '{t}'"
                    )
                else:
                    with _lock:
                        total_updated += ret
            except Exception as exc:
                _errors.append(f"Tenant '{t}': {exc}")
    if _errors:
        raise Exception(
            "Errors during parallel data update:\n" + "\n".join(_errors)
        )
```
update_data can execute __update_data concurrently across tenants. When verbose=True, __update_data emits multiple print(...) progress lines; in parallel mode these will interleave across threads and can make the CLI output confusing. Consider either disabling verbose progress output when parallel_workers > 1, or adding a shared output lock / per-tenant buffered logging so each tenant’s progress is readable.
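The shared-output-lock option the reviewer mentions can be sketched in a few lines. This is an illustrative pattern with hypothetical names (`echo_safe`, `update_one_tenant`), not the weaviate-cli implementation; every worker routes its progress lines through one lock so whole lines never interleave:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_output_lock = threading.Lock()


def echo_safe(line: str) -> None:
    # Serialize writes so each line is emitted atomically across threads.
    with _output_lock:
        print(line, flush=True)


def update_one_tenant(tenant: str) -> int:
    echo_safe(f"Updating objects for tenant '{tenant}'")
    return 10  # pretend 10 objects were updated


with ThreadPoolExecutor(max_workers=4) as pool:
    totals = list(pool.map(update_one_tenant, ["a", "b", "c", "d"]))
print(sum(totals))  # 40
```

The order of per-tenant lines still depends on scheduling; the lock only guarantees that each individual line is printed whole, which is usually enough for readable CLI progress output.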
```python
if len(tenants) > 1 and parallel_workers > 1:
    actual_workers = min(parallel_workers, len(tenants))
    _lock = threading.Lock()
    _errors: List[str] = []
    with ThreadPoolExecutor(max_workers=actual_workers) as executor:
        future_to_tenant = {
            executor.submit(_delete_one_tenant, t): t for t in tenants
        }
        for future in as_completed(future_to_tenant):
            t = future_to_tenant[future]
            try:
                with _lock:
                    total_deleted += future.result()
            except Exception as exc:
                _errors.append(f"Tenant '{t}': {exc}")
    if _errors:
        raise Exception(
            "Errors during parallel data deletion:\n" + "\n".join(_errors)
        )
```
delete_data can execute __delete_data concurrently across tenants. When verbose=True, __delete_data prints batch/progress lines; in parallel mode these will interleave across threads and can become unreadable. Consider suppressing verbose per-tenant progress in parallel mode or adding an output lock / buffering strategy so output remains coherent.
```diff
@@ -875,10 +894,10 @@ def create_data(
                 multi_vector=multi_vector,
                 dynamic_batch=dynamic_batch,
                 batch_size=batch_size,
-                concurrent_requests=concurrent_requests,
+                concurrent_requests=effective_concurrent,
                 json_output=json_output,
             )
-            after_length = len(col)
+            _after = len(col)
         else:
             if not auto_tenant_creation_enabled and not col.tenants.exists(tenant):
                 raise Exception(
@@ -894,12 +913,12 @@ def create_data(
                     f"Tenant '{tenant}' is not active. Please activate it using <update tenants> command"
                 )
             if auto_tenant_creation_enabled and not col.tenants.exists(tenant):
-                initial_length = 0
+                _initial = 0
             else:
-                initial_length = len(col.with_tenant(tenant))
-            if not json_output:
+                _initial = len(col.with_tenant(tenant))
+            if not json_output and not _parallel_mode:
                 click.echo(f"Processing objects for tenant '{tenant}'")
-            collection = self.__ingest_data(
+            _coll = self.__ingest_data(
                 collection=col.with_tenant(tenant),
                 num_objects=limit,
                 cl=cl_map[consistency_level],
@@ -911,18 +930,48 @@ def create_data(
                 multi_vector=multi_vector,
                 dynamic_batch=dynamic_batch,
                 batch_size=batch_size,
-                concurrent_requests=concurrent_requests,
+                concurrent_requests=effective_concurrent,
                 json_output=json_output,
             )
-            after_length = len(col.with_tenant(tenant))
+            _after = len(col.with_tenant(tenant))
             if wait_for_indexing:
-                collection.batch.wait_for_vector_indexing()
-            inserted = after_length - initial_length
-            total_inserted += inserted
-            if inserted != limit:
-                click.echo(
-                    f"Error occurred while ingesting data for tenant '{tenant}'. Expected number of objects inserted: {limit}. Actual number of objects inserted: {inserted}. Double check with weaviate-cli get collection"
+                _coll.batch.wait_for_vector_indexing()
+            _inserted = _after - _initial
+            if _inserted != limit:
+                with _output_lock:
+                    click.echo(
+                        f"Error occurred while ingesting data for tenant '{tenant}'. "
+                        f"Expected number of objects inserted: {limit}. "
+                        f"Actual number of objects inserted: {_inserted}. "
+                        f"Double check with weaviate-cli get collection"
                     )
+            return _inserted, _coll
```
When _parallel_mode is enabled, _ingest_one_tenant runs __ingest_data, which prints progress/summary output (click.echo/print) per tenant. With multiple threads, this output can interleave and become hard to read (even when verbose=False). Consider serializing stdout writes with a lock, buffering per-tenant output and printing it after the futures complete, or suppressing per-tenant text output in parallel mode and printing a single aggregated summary.
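The buffering alternative the comment suggests can also be sketched briefly. This is an illustrative pattern with hypothetical names, not the actual code: each worker writes its progress into a per-tenant `StringIO` buffer and returns it, and only the main thread prints, after each future completes, so every tenant's output block stays contiguous:

```python
import io
from concurrent.futures import ThreadPoolExecutor, as_completed


def ingest_one_tenant(tenant: str) -> tuple:
    # Collect this tenant's output in a private buffer instead of printing.
    buf = io.StringIO()
    buf.write(f"Processing objects for tenant '{tenant}'\n")
    inserted = 5  # pretend 5 objects were inserted
    buf.write(f"Inserted {inserted} objects for tenant '{tenant}'\n")
    return inserted, buf.getvalue()


total = 0
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = {pool.submit(ingest_one_tenant, t): t for t in ["a", "b", "c"]}
    for fut in as_completed(futures):
        inserted, output = fut.result()
        total += inserted
        print(output, end="")  # only the main thread prints, blocks never interleave
print(total)  # 15
```

The trade-off versus a shared print lock is latency: buffered output appears only when a tenant finishes, rather than streaming live, but it reads as one coherent block per tenant.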
Summary
- `delete tenants` and `update tenants` now issue a single batch API call instead of N individual calls, eliminating the main bottleneck for collections with many tenants
- `create data`, `update data`, and `delete data` use a `ThreadPoolExecutor` to process multiple tenants concurrently, controlled by a new `--parallel_workers` option (default: `min(32, cpu_count + 4)`)
- When `parallel_workers > 1`, per-tenant `concurrent_requests` is scaled down so total in-flight connections stay bounded, preventing cluster overload

Test plan
- Unit tests pass (`make test`)
- Linting passes (`make lint`)
- Updated `test_tenant_manager.py` assertions to expect batch API calls (list args) instead of N individual calls; this reflects the explicit behavior change
- Added new tests in `test_data_manager.py` covering:
  - sequential fallback with `parallel_workers=1`
  - `concurrent_requests` reduction for parallel tenants (prevents overload)
  - `concurrent_requests` unchanged for a single tenant

Closes #153
🤖 Generated with Claude Code