diff --git a/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs b/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs index 1b2e4a20e..0a5db54b6 100644 --- a/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs +++ b/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs @@ -18,7 +18,6 @@ namespace DurableTask.AzureStorage.Messaging using System.IO; using System.Linq; using System.Threading.Tasks; - using Azure; using DurableTask.Core; using DurableTask.Core.History; using Newtonsoft.Json; @@ -234,5 +233,11 @@ bool IsNonexistantInstance() { return this.RuntimeState.Events.Count == 0 || this.RuntimeState.ExecutionStartedEvent == null; } + + public Task EndSessionAsync() + { + // No-op + return Task.CompletedTask; + } } } diff --git a/src/DurableTask.Core/IOrchestrationSession.cs b/src/DurableTask.Core/IOrchestrationSession.cs index bb518cbb1..cd8e1df56 100644 --- a/src/DurableTask.Core/IOrchestrationSession.cs +++ b/src/DurableTask.Core/IOrchestrationSession.cs @@ -11,6 +11,7 @@ // limitations under the License. // ---------------------------------------------------------------------------------- +#nullable enable namespace DurableTask.Core { using System.Collections.Generic; @@ -30,5 +31,11 @@ public interface IOrchestrationSession /// and the dispatcher will shut down the session. /// Task> FetchNewOrchestrationMessagesAsync(TaskOrchestrationWorkItem workItem); + + /// + /// Ends the session. + /// + /// A task that completes when the session has been ended. + Task EndSessionAsync(); } } diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index f63048a1a..a3d163e28 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -193,6 +193,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) if (concurrencyLockAcquired) { this.concurrentSessionLock.Release(); + await workItem.Session.EndSessionAsync(); } } } diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index dc97e324e..51866630b 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -295,6 +295,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) "OnProcessWorkItemSession-Release", $"Releasing extended session after {processCount} batch(es)."); this.concurrentSessionLock.Release(); + await workItem.Session.EndSessionAsync(); } } } @@ -553,10 +554,6 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work orchestratorMessages.AddRange(subOrchestrationRewindMessages); workItem.OrchestrationRuntimeState = newRuntimeState; runtimeState = newRuntimeState; - // Setting this to true here will end an extended session if it is in progress. - // We don't want to save the state across executions, since we essentially manually modify - // the orchestration history here and so that stored by the extended session is incorrect. - isRewinding = true; break; default: throw TraceHelper.TraceExceptionInstance(