Enhance purge with parallel batch deletes and partial purge timeout#1321
Enhance purge with parallel batch deletes and partial purge timeout#1321YunchuWang wants to merge 28 commits intomainfrom
Conversation
- Add TimeSpan? Timeout to PurgeInstanceFilter for partial purge support - Add bool? IsComplete to PurgeHistoryResult to indicate completion status - Add new PurgeInstanceHistoryAsync overload with TimeSpan timeout parameter - Use CancellationToken-based timeout (linked CTS) in DeleteHistoryAsync - Already-dispatched deletions complete before returning partial results - Backward compatible: no timeout = original behavior (IsComplete = null) - Forward IsComplete through ToCorePurgeHistoryResult to PurgeResult - Add scenario tests for partial purge timeout, generous timeout, and compat
- Always cap timeout to 30s max, even if not specified or exceeds 30s - Pass effectiveToken into DeleteAllDataForOrchestrationInstance so in-flight deletes are also cancelled on timeout - Catch OperationCanceledException from Task.WhenAll for timed-out in-flight deletes - External cancellationToken cancellation still propagates normally
There was a problem hiding this comment.
Pull request overview
Improves purge scalability and robustness for DurableTask’s Azure Storage backend by adding parallelized table batch deletes, optional timeout-based partial purging, better 404/idempotency handling, and expanded test coverage.
Changes:
- Add optional purge timeout (
PurgeInstanceFilter.Timeout) and propagate completion status viaIsCompleteinto corePurgeResult. - Implement parallel table batch deletion with 404 fallback to per-entity deletes.
- Add scenario + unit tests for partial purge behavior, blob cleanup, and parallel batch delete behavior.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| test/DurableTask.AzureStorage.Tests/TestOrchestrationClient.cs | Adds helper API to invoke the new timed purge overload in tests. |
| test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs | Adds new purge/partial-purge scenario tests and validation for large-message blob cleanup. |
| src/DurableTask.Core/PurgeInstanceFilter.cs | Introduces optional Timeout for partial purge. |
| src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs | Extends purge-by-time signature to include an optional timeout. |
| src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs | Extends tracking store purge API contract to include optional timeout. |
| src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs | Implements timeout-aware, parallel purge-by-time behavior and uses parallel batch delete. |
| src/DurableTask.AzureStorage/Storage/Table.cs | Adds DeleteBatchParallelAsync with transactional chunking and 404 fallback. |
| src/DurableTask.AzureStorage/PurgeHistoryResult.cs | Adds IsComplete and forwards it to core PurgeResult. |
| src/DurableTask.AzureStorage/MessageManager.cs | Improves 404 handling for large-message blob deletion by relying on list/delete with exception handling. |
| src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs | Adds timed purge overload and wires PurgeInstanceFilter.Timeout into the call path. |
| Test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs | Adds unit tests validating parallel batch delete chunking, fallback, and cancellation behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
Outdated
Show resolved
Hide resolved
src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
Outdated
Show resolved
Hide resolved
test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs
Outdated
Show resolved
Hide resolved
src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
Outdated
Show resolved
Hide resolved
src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
Outdated
Show resolved
Hide resolved
- Hard-code 30s CancellationToken-based timeout in DeleteHistoryAsync - Remove configurable Timeout from PurgeInstanceFilter (not needed) - Remove timeout overload from AzureStorageOrchestrationService - IsComplete = true when all purged within 30s, false when timed out - Callers loop until IsComplete = true for large-scale purge
- Add TimeSpan? Timeout property to PurgeInstanceFilter (opt-in, default null) - When null: unbounded purge, IsComplete=null (backward compat, no behavior change) - When set: CancellationToken-based timeout, IsComplete=true/false - Thread Timeout through IOrchestrationServicePurgeClient path - Zero breaking changes: existing callers unaffected
There was a problem hiding this comment.
Pull request overview
This PR enhances the Azure Storage purge pipeline to improve throughput and reliability for large purges by introducing parallelized batch deletes, a timeout-driven partial purge mechanism, and forwarding completion status back to the core purge result shape.
Changes:
- Added
PurgeInstanceFilter.Timeoutand plumbed timeout support into Azure Storage tracking-store purging. - Implemented
Table.DeleteBatchParallelAsyncwith 404/idempotency fallback and updated purge to use it. - Added/updated purge-related tests and extended purge result types to carry
IsComplete.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs | Adds new purge scenario tests for scalability/idempotency/large-blob cleanup and a test intended to validate completion semantics. |
| src/DurableTask.Core/PurgeInstanceFilter.cs | Adds Timeout option to the core purge filter contract. |
| src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs | Extends time-range purge signature to accept optional timeout. |
| src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs | Extends tracking store purge API with an optional timeout parameter. |
| src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs | Implements timeout-aware, parallel instance purging and returns IsComplete based on timeout. |
| src/DurableTask.AzureStorage/Storage/Table.cs | Adds DeleteBatchParallelAsync with parallel transactions and 404 fallback to individual deletes. |
| src/DurableTask.AzureStorage/PurgeHistoryResult.cs | Adds IsComplete to AzureStorage purge result and forwards it to DurableTask.Core.PurgeResult. |
| src/DurableTask.AzureStorage/MessageManager.cs | Improves 404 handling for large message blob cleanup by relying on try/catch rather than container existence checks. |
| src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs | Wires PurgeInstanceFilter.Timeout into the tracking-store purge path used by IOrchestrationServicePurgeClient. |
| Test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs | Adds unit tests for DeleteBatchParallelAsync (but currently placed outside the referenced test project directory). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
Outdated
Show resolved
Hide resolved
src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
Outdated
Show resolved
Hide resolved
src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
Outdated
Show resolved
Hide resolved
test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs
Show resolved
Hide resolved
- Update PurgeInstanceFilter.Timeout docs: in-flight deletions are cancelled (intentional) - Add using var for SemaphoreSlim disposal - Fix DateTime.Now/UtcNow mixing in purge tests (use UtcNow consistently) - Rename PurgeReturnsIsComplete test to match actual assertions - Move TableDeleteBatchParallelTests.cs from Test/ to test/ (correct project path) - Fix typos: grater->greater, status->statuses in XML docs - Use LINQ Select for foreach loop per code quality suggestion
There was a problem hiding this comment.
Pull request overview
This PR improves the Azure Storage purge pipeline to better handle large-scale instance purges by adding parallelized table batch deletes, introducing an optional timeout for partial purges, and improving idempotency around already-deleted storage artifacts. It also expands scenario/unit test coverage to validate the new purge behaviors and scalability characteristics.
Changes:
- Add
PurgeInstanceFilter.Timeoutand propagateIsCompleteviaPurgeHistoryResult→PurgeResult. - Implement parallel table batch deletion with 404 fallback to per-entity deletes.
- Update purge and blob cleanup implementations for better cancellation/timeout behavior and add comprehensive tests.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs | Adds unit tests validating new parallel batch delete behavior (including 404 fallback and cancellation). |
| test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs | Adds end-to-end purge scenario tests and uses UTC timestamps for purge windows. |
| src/DurableTask.Core/PurgeInstanceFilter.cs | Introduces optional Timeout for partial purge semantics. |
| src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs | Extends tracking store purge API shape to accept optional timeout. |
| src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs | Updates tracking store interface to include optional timeout parameter. |
| src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs | Implements timeout-linked cancellation + throttled parallel instance purges and uses parallel history row deletes. |
| src/DurableTask.AzureStorage/Storage/Table.cs | Adds DeleteBatchParallelAsync with concurrent chunk submission and 404 fallback behavior. |
| src/DurableTask.AzureStorage/PurgeHistoryResult.cs | Adds IsComplete and forwards completion to core PurgeResult. |
| src/DurableTask.AzureStorage/MessageManager.cs | Improves large-message blob deletion to handle missing containers via exception-based 404 handling. |
| src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs | Threads the new timeout value through purge calls and fixes doc typos. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs
Show resolved
Hide resolved
src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
Outdated
Show resolved
Hide resolved
- PurgeHistoryResultTests: constructor IsComplete (true/false/null), ToCorePurgeHistoryResult propagation, backward compat - PurgeInstanceFilterTests: Timeout default null, set/reset, PurgeResult IsComplete tri-state, old constructor compat
|
Regarding the pendingTasks memory concern: With the new opt-in timeout feature (default 30s when used), the maximum number of pending tasks is naturally bounded by how many instances can be dispatched within the timeout window (~100 concurrent 30s a few thousand tasks at most). For the no-timeout path (backward compat), the existing behavior is preserved. The SemaphoreSlim(100) already limits actual concurrency. Switching to Parallel.ForEachAsync would be a larger refactor that changes the async enumeration pattern better suited for a follow-up. |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
Outdated
Show resolved
Hide resolved
- Revert to effectiveToken so in-flight deletes are cancelled on timeout - Update PurgeInstanceFilter.Timeout XML doc to match behavior - Docs and comments now consistently say in-flight deletes are cancelled
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 13 out of 13 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs
Show resolved
Hide resolved
|
Hi @cgillum Could you please review this PR? |
|
|
||
| /// <inheritdoc /> | ||
| public virtual Task<PurgeHistoryResult> PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable<OrchestrationStatus> runtimeStatus, CancellationToken cancellationToken = default) | ||
| public virtual Task<PurgeHistoryResult> PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable<OrchestrationStatus> runtimeStatus, TimeSpan? timeout = null, CancellationToken cancellationToken = default) |
There was a problem hiding this comment.
Why do we need a TimeSpan timeout parameter when we already have CancellationToken? We should be using the existing CancellationToken for timeout handling.
There was a problem hiding this comment.
Good question — I traced the full call chain and there are two reasons we need the explicit imeout parameter rather than relying solely on CancellationToken:
1. CancellationToken cannot flow through the current call chain
There are 3 breaks where CancellationToken is dropped:
| Layer | Method | CancellationToken? |
|---|---|---|
SDK (GrpcDurableTaskClient) |
PurgeInstancesCoreAsync |
✅ Passes to gRPC stub |
Extension (LocalGrpcListener.PurgeInstances) |
context.CancellationToken available |
❌ Not passed (compare: adjacent QueryInstances does pass it) |
Core (IOrchestrationServicePurgeClient) |
PurgeInstanceStateAsync(PurgeInstanceFilter) |
❌ Interface has no CancellationToken parameter |
Azure Storage (AzureStorageOrchestrationService) |
PurgeInstanceHistoryAsync |
❌ Calls tracking store with default |
Tracking Store (ITrackingStore) |
PurgeInstanceHistoryAsync |
✅ Has CancellationToken = default but never receives a real one |
The root cause is IOrchestrationServicePurgeClient — it's a public interface with no CancellationToken. Fixing that is a breaking change across all backend implementations.
2. Even if CancellationToken flowed through, gRPC deadline kills the channel before we can return partial results
The isolated worker SDK calls purge over gRPC with a ~30s deadline. When the deadline fires:
context.CancellationTokenis cancelled- The gRPC channel is closed immediately
- The server cannot send back a response — the client gets
RpcException(DeadlineExceeded) - The client receives zero information about how many instances were deleted
The timeout parameter acts as a soft timeout (e.g. 25s) that fires before the gRPC deadline (30s), giving the server a 5-second window to:
- Stop accepting new instance deletions
- Wait for in-flight deletes to finish
- Build and return
PurgeResult { DeletedInstanceCount = 17402, IsComplete = false } - The client loops and calls again
This is the same pattern DTS uses internally (PurgeTimeout = 25s capped to avoid grain call timeouts).
Summary
CancellationToken = hard cancel (can't return partial result after gRPC deadline)
Timeout = soft timeout (returns partial result before deadline hits)
They solve different problems and aren't mutually exclusive. Happy to also fix the CancellationToken gaps (LocalGrpcListener + interface) as a separate follow-up if you'd prefer.
There was a problem hiding this comment.
Looks like Copilot posted duplicate responses here. Also, I think it's over analyzing my question. Decoupling the cancellation token from the gRPC request cancellation makes sense, but that's not what I'm asking. You can still use the existing cancellation token parameter to implement a purge timeout that's separate from the request timeout.
There was a problem hiding this comment.
Makes sense — addressed. Removed the TimeSpan? timeout parameter from all internal method signatures (ITrackingStore, TrackingStoreBase, AzureTableTrackingStore). The Timeout-to-CancellationToken conversion now happens once at the AzureStorageOrchestrationService layer in PurgeInstanceStateAsync(PurgeInstanceFilter), and the tracking store just observes the CancellationToken. PurgeInstanceFilter.Timeout property is kept as the API surface for callers (additive, not breaking). Also cleaned up the duplicate responses — sorry about that. @cgillum
Address PR review: replace reliance on global ThrottlingHttpPipelinePolicy with a local SemaphoreSlim(10) in DeleteBatchParallelAsync to prevent purge operations from starving other storage operations.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 13 out of 13 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs
Outdated
Show resolved
Hide resolved
…rallelTests.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 13 out of 13 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
src/DurableTask.AzureStorage/MessageManager.cs:1
- The implementation only counts the list operation when no blobs are found. When blobs exist,
storageOperationCountcurrently counts deletes only, despite the comment indicating the list request should be counted as well. Consider counting the list operation unconditionally (e.g., start at 1, or add 1 after the listing completes) so metrics remain consistent with the comment and prior behavior.
// ----------------------------------------------------------------------------------
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
Outdated
Show resolved
Hide resolved
test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs
Outdated
Show resolved
Hide resolved
…omments - Replace hardcoded MaxPurgeInstanceConcurrency=100 with MaxStorageOperationConcurrency/3, ensuring purge never takes more than 1/3 of the global HTTP budget and auto-scales with deployment size. - Fix stale test comment that referenced 'no internal semaphore' after SemaphoreSlim(10) was added to DeleteBatchParallelAsync. - Change flaky 1ms timeout in scenario test to 100ms to avoid OS timer resolution issues (~15ms on Windows).
|
@cgillum Hi Chris, could you kindly take a look at this PR when you get a chance? Would really appreciate your review. Thanks! |
Address cgillum's review: remove TimeSpan? timeout from internal method signatures (ITrackingStore, TrackingStoreBase, AzureTableTrackingStore). Convert PurgeInstanceFilter.Timeout to CancellationToken at the Service layer in PurgeInstanceStateAsync, then pass it through the existing CancellationToken parameter. The tracking store only observes a single cancellation mechanism. PurgeInstanceFilter.Timeout property is preserved (additive, not breaking).
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
Outdated
Show resolved
Hide resolved
src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
Outdated
Show resolved
Hide resolved
…icePurgeClient Don't expose a public method just for testing. The test helper now calls IOrchestrationServicePurgeClient.PurgeInstanceStateAsync(PurgeInstanceFilter) which is the real production call path and handles Timeout->CT conversion internally.
… expression' Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
- Fix P0: When no timeout is specified, route through the original code path (no CancellationToken) so IsComplete remains null for backward compatibility. Only create CancellationTokenSource(timeout) when PurgeInstanceFilter.Timeout is set. - Simplify 'timedOut ? false : true' to '!timedOut'. - Fix null warning: use null-forgiving operator for test loop variable. - Count failed batch attempt in RequestCount (+1) for accurate metrics. - Increase timeout test instance count from 10 to 50 for reliability.
Summary
Enhance the Azure Storage purge implementation with parallel batch deletes, CancellationToken-based partial purge timeout, improved error handling, and comprehensive tests.
Motivation
Purging large numbers of orchestration instances (100K+) with the current implementation causes:
DeleteBatchAsyncfails with 404 when entities are already deleted (race condition)Changes
Core (DurableTask.Core)
PurgeInstanceFilter.Timeout(TimeSpan?): Optional timeout for partial purgePurgeResult.IsComplete(bool?): Already existed, now properly populatedAzure Storage (DurableTask.AzureStorage)
PurgeHistoryResult.IsComplete: New property + constructor overload, forwarded viaToCorePurgeHistoryResult()AzureStorageOrchestrationService.PurgeInstanceHistoryAsync(..., TimeSpan timeout): New overloadAzureTableTrackingStore.DeleteHistoryAsync: CancellationToken-based timeout using linkedCancellationTokenSourceTable.DeleteBatchParallelAsync: New parallel batch delete with concurrent transactions and 404 fallbackMessageManager.DeleteLargeMessageBlobs: Fixed 404 handling with try/catch instead ofExistsAsync+ deleteSemaphoreSlim(100)for instance-level parallelismBehavior
When
Timeoutis set:CancellationTokenSource(timeout)linked with the caller'sCancellationTokenThrowIfCancellationRequestedOperationCanceledException, waits for in-flight deletions, returnsIsComplete = falseWhen
Timeoutis not set:IsComplete = nullfor backward compatibility)Benchmark Results
100K Instances (EP1, separate ASPs/storage)
500K Instances (EP1, isolated worker SDK path with 25s timeout)
Breaking Changes
None. All changes are additive:
Timeoutproperty onPurgeInstanceFilterPurgeHistoryResultPurgeInstanceHistoryAsyncoverload (original method unchanged)Tests Added
PartialPurge_TimesOutThenCompletesOnRetryPartialPurge_GenerousTimeout_CompletesAllPartialPurge_WithoutTimeout_ReturnsNullIsCompletePurgeMultipleInstancesHistoryByTimePeriod_ScalabilityValidationPurgeSingleInstanceWithIdempotencyPurgeSingleInstance_WithLargeBlobs_CleansUpBlobsPurgeInstance_WithManyHistoryRows_DeletesAllDeleteBatchParallelAsyncRelated PRs