From de244ff99e8fcd8aaa38d634fb93d526481b0b51 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Wed, 18 Mar 2026 17:39:49 -0700 Subject: [PATCH 1/8] initial implementation --- .../Entities/EventFormat/RequestMessage.cs | 5 + .../Entities/OrchestrationEntityContext.cs | 10 + src/DurableTask.Core/History/HistoryEvent.cs | 6 + src/DurableTask.Core/IOrchestrationService.cs | 6 + src/DurableTask.Core/Logging/EventIds.cs | 1 + src/DurableTask.Core/Logging/LogEvents.cs | 49 +++ src/DurableTask.Core/Logging/LogHelper.cs | 44 ++- .../TaskActivityDispatcher.cs | 282 +++++++++++------- src/DurableTask.Core/TaskEntityDispatcher.cs | 151 +++++++++- .../TaskOrchestrationDispatcher.cs | 32 +- 10 files changed, 460 insertions(+), 126 deletions(-) diff --git a/src/DurableTask.Core/Entities/EventFormat/RequestMessage.cs b/src/DurableTask.Core/Entities/EventFormat/RequestMessage.cs index ce355ab12..9c208d7b6 100644 --- a/src/DurableTask.Core/Entities/EventFormat/RequestMessage.cs +++ b/src/DurableTask.Core/Entities/EventFormat/RequestMessage.cs @@ -122,6 +122,11 @@ internal class RequestMessage : EntityMessage /// public string? ClientSpanId { get; set; } + /// + /// The dispatch count of this request message. + /// + public int DispatchCount { get; set; } + /// public override string GetShortDescription() { diff --git a/src/DurableTask.Core/Entities/OrchestrationEntityContext.cs b/src/DurableTask.Core/Entities/OrchestrationEntityContext.cs index 323cba441..fb0c7ee17 100644 --- a/src/DurableTask.Core/Entities/OrchestrationEntityContext.cs +++ b/src/DurableTask.Core/Entities/OrchestrationEntityContext.cs @@ -341,6 +341,16 @@ public void CompleteAcquire(OperationResult result, Guid criticalSectionId) this.lockAcquisitionPending = false; } + /// + /// Called when the entity lock acquisition fails. 
+ /// + public void AbandonAcquire() + { + this.criticalSectionLocks = null; + this.criticalSectionId = null; + this.lockAcquisitionPending = false; + } + internal void AdjustOutgoingMessage(string instanceId, RequestMessage requestMessage, DateTime? cappedTime, out string eventName) { if (cappedTime.HasValue) diff --git a/src/DurableTask.Core/History/HistoryEvent.cs b/src/DurableTask.Core/History/HistoryEvent.cs index 8e6a8f316..a1a1317a8 100644 --- a/src/DurableTask.Core/History/HistoryEvent.cs +++ b/src/DurableTask.Core/History/HistoryEvent.cs @@ -89,6 +89,12 @@ protected HistoryEvent(int eventId) [DataMember] public virtual EventType EventType { get; private set; } + /// + /// Gets or sets the number of times this event has been dispatched. + /// + [DataMember] + public int DispatchCount { get; set; } + /// /// Implementation for . /// diff --git a/src/DurableTask.Core/IOrchestrationService.cs b/src/DurableTask.Core/IOrchestrationService.cs index dc4b61c70..76b6b664a 100644 --- a/src/DurableTask.Core/IOrchestrationService.cs +++ b/src/DurableTask.Core/IOrchestrationService.cs @@ -102,6 +102,12 @@ public interface IOrchestrationService /// BehaviorOnContinueAsNew EventBehaviourForContinueAsNew { get; } + /// + /// Gets the maximum amount of times the same event can be dispatched before it is considered "poisonous" + /// and the corresponding operation is failed. 
+ /// + int MaxDispatchCount { get; } + /// /// Wait for the next orchestration work item and return the orchestration work item /// diff --git a/src/DurableTask.Core/Logging/EventIds.cs b/src/DurableTask.Core/Logging/EventIds.cs index f23459c79..e31fa7067 100644 --- a/src/DurableTask.Core/Logging/EventIds.cs +++ b/src/DurableTask.Core/Logging/EventIds.cs @@ -71,5 +71,6 @@ static class EventIds public const int OrchestrationDebugTrace = 73; public const int OrchestrationCompletedWithWarning = 74; + public const int PoisonMessageDetected = 75; } } diff --git a/src/DurableTask.Core/Logging/LogEvents.cs b/src/DurableTask.Core/Logging/LogEvents.cs index 360528638..7f3611793 100644 --- a/src/DurableTask.Core/Logging/LogEvents.cs +++ b/src/DurableTask.Core/Logging/LogEvents.cs @@ -1945,5 +1945,54 @@ void IEventSourceEvent.WriteEventSource() => Utils.PackageVersion); } + /// + /// Log event representing the discarding of a "poison" message. + /// + internal class PoisonMessageDetected : StructuredLogEvent, IEventSourceEvent + { + public PoisonMessageDetected(OrchestrationInstance orchestrationInstance, string eventType, string taskEventId, string details) + { + this.InstanceId = orchestrationInstance?.InstanceId ?? string.Empty; + this.ExecutionId = orchestrationInstance?.ExecutionId ?? 
string.Empty; + this.EventType = eventType; + this.EventId = taskEventId; + this.Details = details; + } + + [StructuredLogField] + public string InstanceId { get; } + + [StructuredLogField] + public string ExecutionId { get; } + + [StructuredLogField] + public string EventType { get; } + + [StructuredLogField] + public string EventId { get; } + + [StructuredLogField] + public string Details { get; } + + public override EventId EventId => new EventId( + EventIds.PoisonMessageDetected, + nameof(EventIds.PoisonMessageDetected)); + + public override LogLevel Level => LogLevel.Error; + + protected override string CreateLogMessage() => + $"{this.InstanceId}: Poison message detected for {GetEventDescription(this.EventType, this.TaskEventId)}: {this.Details}"; + + void IEventSourceEvent.WriteEventSource() => + StructuredEventSource.Log.DiscardingMessage( + this.InstanceId, + this.ExecutionId, + this.EventType, + this.TaskEventId, + this.Details, + Utils.AppName, + Utils.PackageVersion); + } + } } diff --git a/src/DurableTask.Core/Logging/LogHelper.cs b/src/DurableTask.Core/Logging/LogHelper.cs index 1ae501b79..407a251c4 100644 --- a/src/DurableTask.Core/Logging/LogHelper.cs +++ b/src/DurableTask.Core/Logging/LogHelper.cs @@ -13,13 +13,15 @@ #nullable enable namespace DurableTask.Core.Logging { - using System; - using System.Collections.Generic; - using System.Text; using DurableTask.Core.Command; + using DurableTask.Core.Common; + using DurableTask.Core.Entities.EventFormat; using DurableTask.Core.Entities.OperationFormat; using DurableTask.Core.History; using Microsoft.Extensions.Logging; + using System; + using System.Collections.Generic; + using System.Text; class LogHelper { @@ -745,6 +747,42 @@ internal void RenewActivityMessageFailed(TaskActivityWorkItem workItem, Exceptio this.WriteStructuredLog(new LogEvents.RenewActivityMessageFailed(workItem, exception), exception); } } + + /// + /// Logs that a "poison message" has been detected and is being dropped. 
+ /// + /// The orchestration instance this event was sent to. + /// The "poisoned" event. + /// Extra details related to the processing of this poison message. + internal void PoisonMessageDetected(OrchestrationInstance? orchestrationInstance, HistoryEvent historyEvent, string details) + { + if (this.IsStructuredLoggingEnabled) + { + this.WriteStructuredLog(new LogEvents.PoisonMessageDetected( + orchestrationInstance, + historyEvent.EventType.ToString(), + Utils.GetTaskEventId(historyEvent).ToString(), + details)); + } + } + + /// + /// Logs that a "poison" entity request message has been detected and is being dropped. + /// + /// The orchestration instance this event was sent to. + /// The "poisoned" reuest message. + /// Extra details related to the processing of this poison message. + internal void PoisonMessageDetected(OrchestrationInstance orchestrationInstance, RequestMessage requestMessage, string details) + { + if (this.IsStructuredLoggingEnabled) + { + this.WriteStructuredLog(new LogEvents.PoisonMessageDetected( + orchestrationInstance, + requestMessage.IsLockRequest ? "LockRequest" : "OperationRequest", + requestMessage.Id.ToString(), + details)); + } + } #endregion internal void OrchestrationDebugTrace(string instanceId, string executionId, string details) diff --git a/src/DurableTask.Core/TaskActivityDispatcher.cs b/src/DurableTask.Core/TaskActivityDispatcher.cs index 8f4c24dca..b4ac5d898 100644 --- a/src/DurableTask.Core/TaskActivityDispatcher.cs +++ b/src/DurableTask.Core/TaskActivityDispatcher.cs @@ -37,6 +37,7 @@ public sealed class TaskActivityDispatcher readonly LogHelper logHelper; readonly ErrorPropagationMode errorPropagationMode; readonly IExceptionPropertiesProvider? exceptionPropertiesProvider; + readonly int maxDispatchCount; /// /// Initializes a new instance of the class with an exception properties provider. 
@@ -61,6 +62,7 @@ internal TaskActivityDispatcher( this.logHelper = logHelper; this.errorPropagationMode = errorPropagationMode; this.exceptionPropertiesProvider = exceptionPropertiesProvider; + this.maxDispatchCount = orchestrationService.MaxDispatchCount; this.dispatcher = new WorkItemDispatcher( "TaskActivityDispatcher", @@ -120,6 +122,18 @@ async Task OnProcessWorkItemAsync(TaskActivityWorkItem workItem) this.logHelper.TaskActivityDispatcherError( workItem, $"The activity worker received a message that does not have any OrchestrationInstance information."); + if (taskMessage.Event.DispatchCount > this.maxDispatchCount) + { + this.logHelper.PoisonMessageDetected( + orchestrationInstance, + taskMessage.Event, + $"Activity has received an event with no parent orchestration instance and dispatch count " + + $"{taskMessage.Event.DispatchCount} which exceeds the maximum dispatch count of {this.maxDispatchCount}. " + + $"The event will be discarded."); + // All orchestration services that implement poison message handling must have logic to handle a null response message in this case. + await this.orchestrationService.CompleteTaskActivityWorkItemAsync(workItem, responseMessage: null); + return; + } throw TraceHelper.TraceException( TraceEventType.Error, "TaskActivityDispatcher-MissingOrchestrationInstance", @@ -131,11 +145,23 @@ async Task OnProcessWorkItemAsync(TaskActivityWorkItem workItem) this.logHelper.TaskActivityDispatcherError( workItem, $"The activity worker received an event of type '{taskMessage.Event.EventType}' but only '{EventType.TaskScheduled}' is supported."); + if (taskMessage.Event.DispatchCount > this.maxDispatchCount) + { + this.logHelper.PoisonMessageDetected( + orchestrationInstance, + taskMessage.Event, + $"Activity has received an event of invalid type '{taskMessage.Event.EventType}' but only '{EventType.TaskScheduled}' " + + $"is supported. 
Since the dispatch count of the event {taskMessage.Event.DispatchCount} exceeds the maximum dispatch " + + $"count of {this.maxDispatchCount}, the event will be discarded."); + // All orchestration services that implement poison message handling must have logic to handle a null response message in this case. + await this.orchestrationService.CompleteTaskActivityWorkItemAsync(workItem, responseMessage: null); + return; + } throw TraceHelper.TraceException( TraceEventType.Critical, "TaskActivityDispatcher-UnsupportedEventType", new NotSupportedException("Activity worker does not support event of type: " + - taskMessage.Event.EventType)); + taskMessage.Event.EventType)); } scheduledEvent = (TaskScheduledEvent)taskMessage.Event; @@ -147,132 +173,170 @@ async Task OnProcessWorkItemAsync(TaskActivityWorkItem workItem) { string message = $"The activity worker received a {nameof(EventType.TaskScheduled)} event that does not specify an activity name."; this.logHelper.TaskActivityDispatcherError(workItem, message); - throw TraceHelper.TraceException( - TraceEventType.Error, - "TaskActivityDispatcher-MissingActivityName", - new InvalidOperationException(message)); + if (taskMessage.Event.DispatchCount <= this.maxDispatchCount) + { + throw TraceHelper.TraceException( + TraceEventType.Error, + "TaskActivityDispatcher-MissingActivityName", + new InvalidOperationException(message)); + } } - this.logHelper.TaskActivityStarting(orchestrationInstance, scheduledEvent); - TaskActivity? taskActivity = this.objectManager.GetObject(scheduledEvent.Name, scheduledEvent.Version); - - if (workItem.LockedUntilUtc < DateTime.MaxValue) + HistoryEvent? 
eventToRespond = null; + if (taskMessage.Event.DispatchCount > this.maxDispatchCount) { - // start a task to run RenewUntil - renewTask = Task.Factory.StartNew( - () => this.RenewUntil(workItem, renewCancellationTokenSource.Token), - renewCancellationTokenSource.Token); + string messageSuffix = string.Empty; + if (scheduledEvent.Name == null) + { + messageSuffix = " The event also does not specify an Activity name."; + } + + this.logHelper.PoisonMessageDetected( + orchestrationInstance, + taskMessage.Event, + $"Activity has received an event with dispatch count {taskMessage.Event.DispatchCount} which exceeds the maximum dispatch " + + $"count of {this.maxDispatchCount}.{messageSuffix} The task will be failed."); + string reason = $"Activity {EventType.TaskScheduled} event has dispatch count {taskMessage.Event.DispatchCount} " + + $"which exceeds the maximum dispatch count of {this.maxDispatchCount}.{messageSuffix}"; + + eventToRespond = new TaskFailedEvent( + -1, + scheduledEvent.EventId, + reason: null, + details: null, + new + ( + "PoisonMessage", + reason, + stackTrace: null, + innerFailure: null, + isNonRetriable: true) + ); + traceActivity?.SetStatus(ActivityStatusCode.Error, reason); } + else + { + this.logHelper.TaskActivityStarting(orchestrationInstance, scheduledEvent); + TaskActivity? taskActivity = this.objectManager.GetObject(scheduledEvent.Name!, scheduledEvent.Version); - var dispatchContext = new DispatchMiddlewareContext(); - dispatchContext.SetProperty(taskMessage.OrchestrationInstance); - dispatchContext.SetProperty(taskActivity); - dispatchContext.SetProperty(scheduledEvent); + if (workItem.LockedUntilUtc < DateTime.MaxValue) + { + // start a task to run RenewUntil + renewTask = Task.Factory.StartNew( + () => this.RenewUntil(workItem, renewCancellationTokenSource.Token), + renewCancellationTokenSource.Token); + } - // In transitionary phase (activity queued from old code, accessed in new code) context can be null. 
- if (taskMessage.OrchestrationExecutionContext != null) - { - dispatchContext.SetProperty(taskMessage.OrchestrationExecutionContext); - } + var dispatchContext = new DispatchMiddlewareContext(); + dispatchContext.SetProperty(taskMessage.OrchestrationInstance); + dispatchContext.SetProperty(taskActivity); + dispatchContext.SetProperty(scheduledEvent); - // correlation - CorrelationTraceClient.Propagate(() => - { - workItem.TraceContextBase?.SetActivityToCurrent(); - diagnosticActivity = workItem.TraceContextBase?.CurrentActivity; - }); + // In transitionary phase (activity queued from old code, accessed in new code) context can be null. + if (taskMessage.OrchestrationExecutionContext != null) + { + dispatchContext.SetProperty(taskMessage.OrchestrationExecutionContext); + } - ActivityExecutionResult? result; - try - { - await this.dispatchPipeline.RunAsync(dispatchContext, async _ => + // correlation + CorrelationTraceClient.Propagate(() => { - if (taskActivity == null) - { - // This likely indicates a deployment error of some kind. Because these unhandled exceptions are - // automatically retried, resolving this may require redeploying the app code so that the activity exists again. - // CONSIDER: Should this be changed into a permanent error that fails the orchestration? Perhaps - // the app owner doesn't care to preserve existing instances when doing code deployments? - throw new TypeMissingException($"TaskActivity {scheduledEvent.Name} version {scheduledEvent.Version} was not found"); - } - - var context = new TaskContext( - taskMessage.OrchestrationInstance, - scheduledEvent.Name, - scheduledEvent.Version, - scheduledEvent.EventId); - context.ErrorPropagationMode = this.errorPropagationMode; - context.ExceptionPropertiesProvider = this.exceptionPropertiesProvider; - - HistoryEvent? responseEvent; - - try - { - string? 
output = await taskActivity.RunAsync(context, scheduledEvent.Input); - responseEvent = new TaskCompletedEvent(-1, scheduledEvent.EventId, output); - } - catch (Exception e) when (e is not TaskFailureException && !Utils.IsFatal(e) && !Utils.IsExecutionAborting(e)) - { - // These are unexpected exceptions that occur in the task activity abstraction. Normal exceptions from - // activities are expected to be translated into TaskFailureException and handled outside the middleware - // context (see further below). - TraceHelper.TraceExceptionInstance(TraceEventType.Error, "TaskActivityDispatcher-ProcessException", taskMessage.OrchestrationInstance, e); - string? details = this.IncludeDetails - ? $"Unhandled exception while executing task: {e}" - : null; - responseEvent = new TaskFailedEvent(-1, scheduledEvent.EventId, e.Message, details, new FailureDetails(e)); - - traceActivity?.SetStatus(ActivityStatusCode.Error, e.Message); - - this.logHelper.TaskActivityFailure(orchestrationInstance, scheduledEvent.Name, (TaskFailedEvent)responseEvent, e); - } - - var result = new ActivityExecutionResult { ResponseEvent = responseEvent }; - dispatchContext.SetProperty(result); + workItem.TraceContextBase?.SetActivityToCurrent(); + diagnosticActivity = workItem.TraceContextBase?.CurrentActivity; }); - result = dispatchContext.GetProperty(); - } - catch (TaskFailureException e) - { - // These are normal task activity failures. They can come from Activity implementations or from middleware. - TraceHelper.TraceExceptionInstance(TraceEventType.Error, "TaskActivityDispatcher-ProcessTaskFailure", taskMessage.OrchestrationInstance, e); - string? details = this.IncludeDetails ? e.Details : null; - var failureEvent = new TaskFailedEvent(-1, scheduledEvent.EventId, e.Message, details, e.FailureDetails); + ActivityExecutionResult? 
result; + try + { + await this.dispatchPipeline.RunAsync(dispatchContext, async _ => + { + if (taskActivity == null) + { + // This likely indicates a deployment error of some kind. Because these unhandled exceptions are + // automatically retried, resolving this may require redeploying the app code so that the activity exists again. + // CONSIDER: Should this be changed into a permanent error that fails the orchestration? Perhaps + // the app owner doesn't care to preserve existing instances when doing code deployments? + throw new TypeMissingException($"TaskActivity {scheduledEvent.Name} version {scheduledEvent.Version} was not found"); + } + + var context = new TaskContext( + taskMessage.OrchestrationInstance, + scheduledEvent.Name!, + scheduledEvent.Version, + scheduledEvent.EventId); + context.ErrorPropagationMode = this.errorPropagationMode; + context.ExceptionPropertiesProvider = this.exceptionPropertiesProvider; + + HistoryEvent? responseEvent; + + try + { + string? output = await taskActivity.RunAsync(context, scheduledEvent.Input); + responseEvent = new TaskCompletedEvent(-1, scheduledEvent.EventId, output); + } + catch (Exception e) when (e is not TaskFailureException && !Utils.IsFatal(e) && !Utils.IsExecutionAborting(e)) + { + // These are unexpected exceptions that occur in the task activity abstraction. Normal exceptions from + // activities are expected to be translated into TaskFailureException and handled outside the middleware + // context (see further below). + TraceHelper.TraceExceptionInstance(TraceEventType.Error, "TaskActivityDispatcher-ProcessException", taskMessage.OrchestrationInstance, e); + string? details = this.IncludeDetails + ? 
$"Unhandled exception while executing task: {e}" + : null; + responseEvent = new TaskFailedEvent(-1, scheduledEvent.EventId, e.Message, details, new FailureDetails(e)); + + traceActivity?.SetStatus(ActivityStatusCode.Error, e.Message); + + this.logHelper.TaskActivityFailure(orchestrationInstance, scheduledEvent.Name!, (TaskFailedEvent)responseEvent, e); + } + + var result = new ActivityExecutionResult { ResponseEvent = responseEvent }; + dispatchContext.SetProperty(result); + }); + + result = dispatchContext.GetProperty(); + } + catch (TaskFailureException e) + { + // These are normal task activity failures. They can come from Activity implementations or from middleware. + TraceHelper.TraceExceptionInstance(TraceEventType.Error, "TaskActivityDispatcher-ProcessTaskFailure", taskMessage.OrchestrationInstance, e); + string? details = this.IncludeDetails ? e.Details : null; + var failureEvent = new TaskFailedEvent(-1, scheduledEvent.EventId, e.Message, details, e.FailureDetails); - traceActivity?.SetStatus(ActivityStatusCode.Error, e.Message); + traceActivity?.SetStatus(ActivityStatusCode.Error, e.Message); - this.logHelper.TaskActivityFailure(orchestrationInstance, scheduledEvent.Name, failureEvent, e); - CorrelationTraceClient.Propagate(() => CorrelationTraceClient.TrackException(e)); - result = new ActivityExecutionResult { ResponseEvent = failureEvent }; - } - catch (Exception middlewareException) when (!Utils.IsFatal(middlewareException)) - { - traceActivity?.SetStatus(ActivityStatusCode.Error, middlewareException.Message); + this.logHelper.TaskActivityFailure(orchestrationInstance, scheduledEvent.Name!, failureEvent, e); + CorrelationTraceClient.Propagate(() => CorrelationTraceClient.TrackException(e)); + result = new ActivityExecutionResult { ResponseEvent = failureEvent }; + } + catch (Exception middlewareException) when (!Utils.IsFatal(middlewareException)) + { + traceActivity?.SetStatus(ActivityStatusCode.Error, middlewareException.Message); - // These are 
considered retriable - this.logHelper.TaskActivityDispatcherError(workItem, $"Unhandled exception in activity middleware pipeline: {middlewareException}"); - throw; - } + // These are considered retriable + this.logHelper.TaskActivityDispatcherError(workItem, $"Unhandled exception in activity middleware pipeline: {middlewareException}"); + throw; + } - HistoryEvent? eventToRespond = result?.ResponseEvent; + eventToRespond = result?.ResponseEvent; - if (eventToRespond is TaskCompletedEvent completedEvent) - { - this.logHelper.TaskActivityCompleted(orchestrationInstance, scheduledEvent.Name, completedEvent); - } - else if (eventToRespond is null) - { - // Default response if middleware prevents a response from being generated - eventToRespond = new TaskCompletedEvent(-1, scheduledEvent.EventId, null); - } + if (eventToRespond is TaskCompletedEvent completedEvent) + { + this.logHelper.TaskActivityCompleted(orchestrationInstance, scheduledEvent.Name!, completedEvent); + } + else if (eventToRespond is null) + { + // Default response if middleware prevents a response from being generated + eventToRespond = new TaskCompletedEvent(-1, scheduledEvent.EventId, null); + } - if (traceActivity != null && eventToRespond is TaskCompletedEvent) - { - // Ensure successful executions don't preserve a prior error status from custom instrumentation. - traceActivity.SetStatus(ActivityStatusCode.OK, "Completed"); + if (traceActivity != null && eventToRespond is TaskCompletedEvent) + { + // Ensure successful executions don't preserve a prior error status from custom instrumentation. 
+ traceActivity.SetStatus(ActivityStatusCode.OK, "Completed"); + } } - + var responseTaskMessage = new TaskMessage { Event = eventToRespond, diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index f63048a1a..27d0f7954 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -25,6 +25,7 @@ namespace DurableTask.Core using System; using System.Collections.Generic; using System.Diagnostics; + using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -43,6 +44,7 @@ public class TaskEntityDispatcher readonly ErrorPropagationMode errorPropagationMode; readonly TaskOrchestrationDispatcher.NonBlockingCountdownLock concurrentSessionLock; readonly IExceptionPropertiesProvider exceptionPropertiesProvider; + readonly int maxDispatchCount; /// /// Initializes a new instance of the class with an exception properties provider. @@ -69,7 +71,8 @@ internal TaskEntityDispatcher( this.exceptionPropertiesProvider = exceptionPropertiesProvider; this.entityOrchestrationService = (orchestrationService as IEntityOrchestrationService)!; this.entityBackendProperties = entityOrchestrationService.EntityBackendProperties; - + this.maxDispatchCount = orchestrationService.MaxDispatchCount; + this.dispatcher = new WorkItemDispatcher( "TaskEntityDispatcher", item => item == null ? 
string.Empty : item.InstanceId, @@ -434,16 +437,19 @@ await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync( return schedulerState; } - void ProcessLockRequest(WorkItemEffects effects, SchedulerState schedulerState, RequestMessage request) + void ProcessLockRequest(WorkItemEffects effects, SchedulerState schedulerState, RequestMessage request, FailureDetails failureDetails = null) { - this.logHelper.EntityLockAcquired(effects.InstanceId, request); + if (request.DispatchCount <= this.maxDispatchCount) + { + this.logHelper.EntityLockAcquired(effects.InstanceId, request); - // mark the entity state as locked - schedulerState.LockedBy = request.ParentInstanceId; + // mark the entity state as locked + schedulerState.LockedBy = request.ParentInstanceId; - request.Position++; + request.Position++; + } - if (request.Position < request.LockSet.Length) + if (request.Position < request.LockSet.Length && request.DispatchCount <= this.maxDispatchCount) { // send lock request to next entity in the lock set var target = new OrchestrationInstance() { InstanceId = request.LockSet[request.Position].ToString() }; @@ -453,7 +459,22 @@ void ProcessLockRequest(WorkItemEffects effects, SchedulerState schedulerState, { // send lock acquisition completed response back to originating orchestration instance var target = new OrchestrationInstance() { InstanceId = request.ParentInstanceId, ExecutionId = request.ParentExecutionId }; - this.SendLockResponseMessage(effects, target, request.Id); + + // In the case of a poison message, it will be the locking instance's responsibility to unlock any other entities for whom the + // lock request may have succeeded. + this.SendLockResponseMessage( + effects, + target, + request.Id, + request.DispatchCount > this.maxDispatchCount ? 
+ new FailureDetails( + "PoisonMessage", + $"Entity lock request has dispatch count {request.DispatchCount} " + + $"which exceeds the maximum dispatch count of {this.maxDispatchCount}.", + stackTrace: null, + innerFailure: null, + isNonRetriable: true) + : null); } } @@ -519,8 +540,18 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState sc } catch (Exception exception) { + if (eventRaisedEvent.DispatchCount > this.maxDispatchCount) + { + this.logHelper.PoisonMessageDetected( + runtimeState.OrchestrationInstance, + eventRaisedEvent, + $"Dropping entity message after deserialization error since its dispatch count {eventRaisedEvent.DispatchCount} " + + $"exceeds the maximum dispatch count of {this.maxDispatchCount}. Stopping processing of remaining requests."); + break; + } throw new EntitySchedulerException("Failed to deserialize incoming request message - may be corrupted or wrong version.", exception); } + requestMessage.DispatchCount = eventRaisedEvent.DispatchCount; IEnumerable deliverNow; @@ -584,20 +615,53 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState sc } catch (Exception exception) { + // We don't necessarily want to unlock the entity in this case because this release message may have been issued by an instance that does + // not currently hold the lock. + if (eventRaisedEvent.DispatchCount > this.maxDispatchCount) + { + this.logHelper.PoisonMessageDetected( + runtimeState.OrchestrationInstance, + eventRaisedEvent, + $"Dropping entity lock release message after deserialization error since its dispatch count {eventRaisedEvent.DispatchCount} " + + $"exceeds the maximum dispatch count of {this.maxDispatchCount}. 
Stopping processing of remaining requests."); + break; + } throw new EntitySchedulerException("Failed to deserialize lock release message - may be corrupted or wrong version.", exception); } + // Even if the message has exceeded the dispatch count, we will still honor the request and unlock the entity to avoid leaving it in a bad state. if (schedulerState.LockedBy == message.ParentInstanceId) { this.logHelper.EntityLockReleased(instanceId, message); schedulerState.LockedBy = null; } + + if (eventRaisedEvent.DispatchCount > this.maxDispatchCount) + { + this.logHelper.PoisonMessageDetected( + runtimeState.OrchestrationInstance, + eventRaisedEvent, + $"Entity lock release message from parent instance '{message.ParentInstanceId}' processed but stopping processing " + + $"of remaining messages since the dispatch count for this message {eventRaisedEvent.DispatchCount} " + + $"has exceeded the maximum allowed dispatch count {this.maxDispatchCount}."); + break; + } } else { // this is a continue message. // Resumes processing of previously queued operations, if any. 
schedulerState.Suspended = false; + + if (eventRaisedEvent.DispatchCount > this.maxDispatchCount) + { + this.logHelper.PoisonMessageDetected( + runtimeState.OrchestrationInstance, + eventRaisedEvent, + $"Entity self-continue message processed but stopping processing of remaining messages since the dispatch count " + + $"for this message {eventRaisedEvent.DispatchCount} has exceeded the maximum allowed dispatch count {this.maxDispatchCount}."); + break; + } } break; @@ -626,6 +690,14 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState sc } var request = schedulerState.Dequeue(); + if (request.DispatchCount > this.maxDispatchCount) + { + this.logHelper.PoisonMessageDetected( + runtimeState.OrchestrationInstance, + request, + $"Entity request has dispatch count {request.DispatchCount} which exceeds the maximum dispatch count " + + $"of {this.maxDispatchCount} and will be failed."); + } if (request.IsLockRequest) { @@ -720,7 +792,7 @@ public void ToBeContinued(SchedulerState schedulerState) parentTraceContext); } - // We still want to add the trace activity to the list even if it was not successfully created and is null. This is because otherwise we have no easy way of mapping OperationResults to Activities otherwise if the lists + // We still want to add the trace activity to the list even if it was not successfully created and is null. This is because otherwise we have no easy way of mapping OperationResults to Activities if the lists // do not have the same length in TraceHelper.EndActivitiesForProcessingEntityInvocation. 
We will simply skip ending the Activity if it is null in this method traceActivities.Add(traceActivity); @@ -845,12 +917,13 @@ void SendLockRequestMessage(WorkItemEffects effects, SchedulerState schedulerSta this.ProcessSendEventMessage(effects, target, EntityMessageEventNames.RequestMessageEventName, message); } - void SendLockResponseMessage(WorkItemEffects effects, OrchestrationInstance target, Guid requestId) + void SendLockResponseMessage(WorkItemEffects effects, OrchestrationInstance target, Guid requestId, FailureDetails failureDetails) { var message = new ResponseMessage() { // content is ignored by receiver but helps with tracing - Result = ResponseMessage.LockAcquisitionCompletion, + Result = ResponseMessage.LockAcquisitionCompletion, + FailureDetails = failureDetails, }; this.ProcessSendEventMessage(effects, target, EntityMessageEventNames.ResponseMessageEventName(requestId), message); } @@ -954,13 +1027,30 @@ internal void ProcessSendStartMessage(WorkItemEffects effects, OrchestrationRunt async Task ExecuteViaMiddlewareAsync(Work workToDoNow, OrchestrationInstance instance, string serializedEntityState, bool isExtendedSession, bool includeEntityState) { + var startTime = DateTime.UtcNow; var (operations, traceActivities) = workToDoNow.GetOperationRequestsAndTraceActivities(instance.InstanceId); + + bool poisonMessagesExist = workToDoNow.Operations.Any(op => op.DispatchCount > this.maxDispatchCount); + var operationsToSend = operations; + + if (poisonMessagesExist) + { + operationsToSend = new List(); + for (int i = 0; i < operations.Count; i++) + { + if (workToDoNow.Operations[i].DispatchCount <= this.maxDispatchCount) + { + operationsToSend.Add(operations[i]); + } + } + } + // the request object that will be passed to the worker var request = new EntityBatchRequest() { InstanceId = instance.InstanceId, EntityState = serializedEntityState, - Operations = operations, + Operations = operationsToSend, }; this.logHelper.EntityBatchExecuting(request); @@ 
-998,11 +1088,46 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ => } var result = await taskEntity.ExecuteOperationBatchAsync(request); - + dispatchContext.SetProperty(result); }); var result = dispatchContext.GetProperty(); + + if (poisonMessagesExist) + { + var resultAfterPoisonMessageHandling = new List(operations.Count); + int middlewareResultIndex = 0; + + for (int i = 0; i < operations.Count; i++) + { + if (workToDoNow.Operations[i].DispatchCount <= this.maxDispatchCount) + { + resultAfterPoisonMessageHandling.Add(result.Results[middlewareResultIndex++]); + } + else + { + resultAfterPoisonMessageHandling.Add( + new() + { + FailureDetails = new + ( + "PoisonMessage", + $"Entity operation request has dispatch count {workToDoNow.Operations[i].DispatchCount} " + + $"which exceeds the maximum dispatch count of {this.maxDispatchCount}.", + stackTrace: null, + innerFailure: null, + isNonRetriable: true + ), + StartTimeUtc = startTime, + EndTimeUtc = DateTime.UtcNow + } + ); + } + } + result.Results = resultAfterPoisonMessageHandling; + } + TraceHelper.EndActivitiesForProcessingEntityInvocation(traceActivities, result.Results, result.FailureDetails); this.logHelper.EntityBatchExecuted(request, result); diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index c85536793..871c77473 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -50,6 +50,7 @@ public class TaskOrchestrationDispatcher readonly TaskOrchestrationEntityParameters? entityParameters; readonly VersioningSettings? versioningSettings; readonly IExceptionPropertiesProvider? exceptionPropertiesProvider; + readonly int maxDispatchCount; /// /// Initializes a new instance of the class with an exception properties provider. 
@@ -80,6 +81,7 @@ internal TaskOrchestrationDispatcher( this.entityParameters = TaskOrchestrationEntityParameters.FromEntityBackendProperties(this.entityBackendProperties); this.versioningSettings = versioningSettings; this.exceptionPropertiesProvider = exceptionPropertiesProvider; + this.maxDispatchCount = orchestrationService.MaxDispatchCount; this.dispatcher = new WorkItemDispatcher( "TaskOrchestrationDispatcher", @@ -432,6 +434,34 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work } } + var poisonEvents = runtimeState.NewEvents.Where(evt => evt.DispatchCount > this.maxDispatchCount); + if (poisonEvents.Any()) + { + foreach (var poisonEvent in poisonEvents) + { + this.logHelper.PoisonMessageDetected( + runtimeState.OrchestrationInstance!, + poisonEvent, + $"Orchestration has received an event with dispatch count {poisonEvent.DispatchCount} which exceeds the maximum dispatch" + + $"count of {this.maxDispatchCount} and will be failed."); + } + + var failureAction = new OrchestrationCompleteOrchestratorAction + { + Id = runtimeState.PastEvents.Count, + FailureDetails = new FailureDetails( + "PoisonMessages", + $"Orchestration has received messages of type {string.Join(",", poisonEvents.Select(e => e.EventType))} " + + $"with dispatch counts {string.Join(",", poisonEvents.Select(e => e.DispatchCount))} which exceed the " + + $"maximum dispatch count of {this.maxDispatchCount}.", + stackTrace: null, + innerFailure: null, + isNonRetriable: true), + OrchestrationStatus = OrchestrationStatus.Failed, + }; + decisions = new List { failureAction }; + } + this.logHelper.OrchestrationExecuting(runtimeState.OrchestrationInstance!, runtimeState.Name); TraceHelper.TraceInstance( TraceEventType.Verbose, @@ -440,7 +470,7 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work "Executing user orchestration: {0}", JsonDataConverter.Default.Serialize(runtimeState.GetOrchestrationRuntimeStateDump(), true)); - if (!versioningFailed) + 
if (!versioningFailed && !poisonEvents.Any()) { // In this case we skip the orchestration's execution since all tasks have been completed and it is in a terminal state. // Instead we "rewind" its execution by removing all failed tasks (see ProcessRewindOrchestrationDecision). From 8f6bf9d4400df242552a1ae72f5e6fb0cea55189 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Tue, 24 Mar 2026 13:11:39 -0700 Subject: [PATCH 2/8] fixing the compilation errors --- .../FabricOrchestrationService.cs | 2 ++ .../AzureStorageOrchestrationService.cs | 8 ++++++++ src/DurableTask.Core/Logging/LogEvents.cs | 6 +++--- src/DurableTask.Core/Logging/LogHelper.cs | 4 ++-- src/DurableTask.Emulator/LocalOrchestrationService.cs | 8 ++++++++ .../ServiceBusOrchestrationService.cs | 8 ++++++++ 6 files changed, 31 insertions(+), 5 deletions(-) diff --git a/src/DurableTask.AzureServiceFabric/FabricOrchestrationService.cs b/src/DurableTask.AzureServiceFabric/FabricOrchestrationService.cs index 8f6c46d1c..0ffc83dc3 100644 --- a/src/DurableTask.AzureServiceFabric/FabricOrchestrationService.cs +++ b/src/DurableTask.AzureServiceFabric/FabricOrchestrationService.cs @@ -473,6 +473,8 @@ public Task ReleaseTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem work public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew { get; } = BehaviorOnContinueAsNew.Ignore; + public int MaxDispatchCount => int.MaxValue; + // Note: Do not rely on cancellationToken parameter to this method because the top layer does not yet implement any cancellation. 
public async Task LockNextTaskActivityWorkItem(TimeSpan receiveTimeout, CancellationToken cancellationToken) { diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 9ad814c43..4e25e8596 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -2169,6 +2169,14 @@ public bool UseSeparateQueuesForEntityWorkItems set => this.settings.UseSeparateQueueForEntityWorkItems = value; } + /// + /// The maximum amount of times a message can be dispatched before it is considered "poisoned" and + /// moved to poison storage. + /// Currently this storage provider does not have poison message handling so this value is set to the + /// maximum integer value. + /// + public int MaxDispatchCount => int.MaxValue; + /// /// Disposes of the current object. /// diff --git a/src/DurableTask.Core/Logging/LogEvents.cs b/src/DurableTask.Core/Logging/LogEvents.cs index 7f3611793..2024270d5 100644 --- a/src/DurableTask.Core/Logging/LogEvents.cs +++ b/src/DurableTask.Core/Logging/LogEvents.cs @@ -1950,12 +1950,12 @@ void IEventSourceEvent.WriteEventSource() => /// internal class PoisonMessageDetected : StructuredLogEvent, IEventSourceEvent { - public PoisonMessageDetected(OrchestrationInstance orchestrationInstance, string eventType, string taskEventId, string details) + public PoisonMessageDetected(OrchestrationInstance orchestrationInstance, string eventType, int taskEventId, string details) { this.InstanceId = orchestrationInstance?.InstanceId ?? string.Empty; this.ExecutionId = orchestrationInstance?.ExecutionId ?? 
string.Empty; this.EventType = eventType; - this.EventId = taskEventId; + this.TaskEventId = taskEventId; this.Details = details; } @@ -1969,7 +1969,7 @@ public PoisonMessageDetected(OrchestrationInstance orchestrationInstance, string public string EventType { get; } [StructuredLogField] - public string EventId { get; } + public int TaskEventId { get; } [StructuredLogField] public string Details { get; } diff --git a/src/DurableTask.Core/Logging/LogHelper.cs b/src/DurableTask.Core/Logging/LogHelper.cs index 407a251c4..7950e8c2f 100644 --- a/src/DurableTask.Core/Logging/LogHelper.cs +++ b/src/DurableTask.Core/Logging/LogHelper.cs @@ -761,7 +761,7 @@ internal void PoisonMessageDetected(OrchestrationInstance? orchestrationInstance this.WriteStructuredLog(new LogEvents.PoisonMessageDetected( orchestrationInstance, historyEvent.EventType.ToString(), - Utils.GetTaskEventId(historyEvent).ToString(), + Utils.GetTaskEventId(historyEvent), details)); } } @@ -779,7 +779,7 @@ internal void PoisonMessageDetected(OrchestrationInstance orchestrationInstance, this.WriteStructuredLog(new LogEvents.PoisonMessageDetected( orchestrationInstance, requestMessage.IsLockRequest ? "LockRequest" : "OperationRequest", - requestMessage.Id.ToString(), + taskEventId: -1, details)); } } diff --git a/src/DurableTask.Emulator/LocalOrchestrationService.cs b/src/DurableTask.Emulator/LocalOrchestrationService.cs index 50e43ecd5..e612ea42f 100644 --- a/src/DurableTask.Emulator/LocalOrchestrationService.cs +++ b/src/DurableTask.Emulator/LocalOrchestrationService.cs @@ -686,6 +686,14 @@ void Dispose(bool disposing) /// EntityBackendQueries IEntityOrchestrationService.EntityBackendQueries => null; + /// + /// The maximum amount of times a message can be dispatched before it is considered "poisoned" and + /// moved to poison storage. + /// Currently this storage provider does not have poison message handling so this value is set to the + /// maximum integer value. 
+ /// + public int MaxDispatchCount => int.MaxValue; + /// Task IEntityOrchestrationService.LockNextEntityWorkItemAsync( TimeSpan receiveTimeout, diff --git a/src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs b/src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs index 3d49bb749..ac5671471 100644 --- a/src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs +++ b/src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs @@ -920,6 +920,14 @@ public async Task AbandonTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkIte /// public int MaxConcurrentTaskActivityWorkItems => this.Settings.TaskActivityDispatcherSettings.MaxConcurrentActivities; + /// + /// The maximum amount of times a message can be dispatched before it is considered "poisoned" and + /// moved to poison storage. + /// Currently this storage provider does not have poison message handling so this value is set to the + /// maximum integer value. + /// + public int MaxDispatchCount => int.MaxValue; + /// /// Wait for an lock the next task activity to be processed /// From ea5a36f1e6053eab93ad913fe8da43724a5f34e1 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Tue, 24 Mar 2026 13:16:10 -0700 Subject: [PATCH 3/8] addressing copilot comments --- src/DurableTask.Core/Logging/LogHelper.cs | 2 +- src/DurableTask.Core/TaskEntityDispatcher.cs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/DurableTask.Core/Logging/LogHelper.cs b/src/DurableTask.Core/Logging/LogHelper.cs index 7950e8c2f..d4f729d40 100644 --- a/src/DurableTask.Core/Logging/LogHelper.cs +++ b/src/DurableTask.Core/Logging/LogHelper.cs @@ -770,7 +770,7 @@ internal void PoisonMessageDetected(OrchestrationInstance? orchestrationInstance /// Logs that a "poison" entity request message has been detected and is being dropped. /// /// The orchestration instance this event was sent to. - /// The "poisoned" reuest message. + /// The "poisoned" request message. 
/// Extra details related to the processing of this poison message. internal void PoisonMessageDetected(OrchestrationInstance orchestrationInstance, RequestMessage requestMessage, string details) { diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index 27d0f7954..08b608a15 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -437,7 +437,7 @@ await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync( return schedulerState; } - void ProcessLockRequest(WorkItemEffects effects, SchedulerState schedulerState, RequestMessage request, FailureDetails failureDetails = null) + void ProcessLockRequest(WorkItemEffects effects, SchedulerState schedulerState, RequestMessage request) { if (request.DispatchCount <= this.maxDispatchCount) { @@ -546,7 +546,7 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState sc runtimeState.OrchestrationInstance, eventRaisedEvent, $"Dropping entity message after deserialization error since its dispatch count {eventRaisedEvent.DispatchCount} " + - $"exceeds the maximum dispatch count of {this.maxDispatchCount}. 
Stopping processing of remaining requests."); + $"exceeds the maximum dispatch count of {this.maxDispatchCount}."); break; } throw new EntitySchedulerException("Failed to deserialize incoming request message - may be corrupted or wrong version.", exception); From 4c7665d3cdb0aee7eec4d8c569ab9b1087dfa185 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Tue, 24 Mar 2026 15:18:29 -0700 Subject: [PATCH 4/8] fixing the error in the logger where I was incorrectly calling DiscardingMessage --- src/DurableTask.Core/Logging/LogEvents.cs | 2 +- .../Logging/StructuredEventSource.cs | 25 +++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/src/DurableTask.Core/Logging/LogEvents.cs b/src/DurableTask.Core/Logging/LogEvents.cs index 7997650f6..eaf6bf3a0 100644 --- a/src/DurableTask.Core/Logging/LogEvents.cs +++ b/src/DurableTask.Core/Logging/LogEvents.cs @@ -2015,7 +2015,7 @@ protected override string CreateLogMessage() => $"{this.InstanceId}: Poison message detected for {GetEventDescription(this.EventType, this.TaskEventId)}: {this.Details}"; void IEventSourceEvent.WriteEventSource() => - StructuredEventSource.Log.DiscardingMessage( + StructuredEventSource.Log.PoisonMessageDetected( this.InstanceId, this.ExecutionId, this.EventType, diff --git a/src/DurableTask.Core/Logging/StructuredEventSource.cs b/src/DurableTask.Core/Logging/StructuredEventSource.cs index 129bac158..0e59e7c36 100644 --- a/src/DurableTask.Core/Logging/StructuredEventSource.cs +++ b/src/DurableTask.Core/Logging/StructuredEventSource.cs @@ -656,6 +656,31 @@ internal void DiscardingMessage( } } + [Event(EventIds.PoisonMessageDetected, Level = EventLevel.Error, Version = 1)] + internal void PoisonMessageDetected( + string InstanceId, + string ExecutionId, + string EventType, + int TaskEventId, + string Details, + string AppName, + string ExtensionVersion) + { + if (this.IsEnabled(EventLevel.Error)) + { + // TODO: Use WriteEventCore for better performance + this.WriteEvent( + 
EventIds.PoisonMessageDetected, + InstanceId ?? string.Empty, + ExecutionId ?? string.Empty, + EventType, + TaskEventId, + Details, + AppName, + ExtensionVersion); + } + } + [Event(EventIds.EntityBatchExecuting, Level = EventLevel.Informational, Version = 1)] internal void EntityBatchExecuting( string InstanceId, From 3bd1dc9a452e6869da0e2544d0ab9fe969b24974 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Wed, 25 Mar 2026 12:37:27 -0700 Subject: [PATCH 5/8] moved the max dispatch count from IOrchestrationService to dispatch parameters --- .../FabricOrchestrationService.cs | 2 - .../AzureStorageOrchestrationService.cs | 8 ---- src/DurableTask.Core/IOrchestrationService.cs | 6 --- .../TaskActivityDispatcher.cs | 11 +++-- src/DurableTask.Core/TaskEntityDispatcher.cs | 13 +++--- src/DurableTask.Core/TaskHubWorker.cs | 40 +++++++++++++++++-- .../TaskOrchestrationDispatcher.cs | 9 +++-- .../LocalOrchestrationService.cs | 8 ---- .../ServiceBusOrchestrationService.cs | 8 ---- 9 files changed, 58 insertions(+), 47 deletions(-) diff --git a/src/DurableTask.AzureServiceFabric/FabricOrchestrationService.cs b/src/DurableTask.AzureServiceFabric/FabricOrchestrationService.cs index 0ffc83dc3..8f6c46d1c 100644 --- a/src/DurableTask.AzureServiceFabric/FabricOrchestrationService.cs +++ b/src/DurableTask.AzureServiceFabric/FabricOrchestrationService.cs @@ -473,8 +473,6 @@ public Task ReleaseTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem work public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew { get; } = BehaviorOnContinueAsNew.Ignore; - public int MaxDispatchCount => int.MaxValue; - // Note: Do not rely on cancellationToken parameter to this method because the top layer does not yet implement any cancellation. 
public async Task LockNextTaskActivityWorkItem(TimeSpan receiveTimeout, CancellationToken cancellationToken) { diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 4e25e8596..9ad814c43 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -2169,14 +2169,6 @@ public bool UseSeparateQueuesForEntityWorkItems set => this.settings.UseSeparateQueueForEntityWorkItems = value; } - /// - /// The maximum amount of times a message can be dispatched before it is considered "poisoned" and - /// moved to poison storage. - /// Currently this storage provider does not have poison message handling so this value is set to the - /// maximum integer value. - /// - public int MaxDispatchCount => int.MaxValue; - /// /// Disposes of the current object. /// diff --git a/src/DurableTask.Core/IOrchestrationService.cs b/src/DurableTask.Core/IOrchestrationService.cs index 76b6b664a..dc4b61c70 100644 --- a/src/DurableTask.Core/IOrchestrationService.cs +++ b/src/DurableTask.Core/IOrchestrationService.cs @@ -102,12 +102,6 @@ public interface IOrchestrationService /// BehaviorOnContinueAsNew EventBehaviourForContinueAsNew { get; } - /// - /// Gets the maximum amount of times the same event can be dispatched before it is considered "poisonous" - /// and the corresponding operation is failed. 
- /// - int MaxDispatchCount { get; } - /// /// Wait for the next orchestration work item and return the orchestration work item /// diff --git a/src/DurableTask.Core/TaskActivityDispatcher.cs b/src/DurableTask.Core/TaskActivityDispatcher.cs index b4ac5d898..68a34d4cc 100644 --- a/src/DurableTask.Core/TaskActivityDispatcher.cs +++ b/src/DurableTask.Core/TaskActivityDispatcher.cs @@ -37,7 +37,7 @@ public sealed class TaskActivityDispatcher readonly LogHelper logHelper; readonly ErrorPropagationMode errorPropagationMode; readonly IExceptionPropertiesProvider? exceptionPropertiesProvider; - readonly int maxDispatchCount; + readonly int? maxDispatchCount; /// /// Initializes a new instance of the class with an exception properties provider. @@ -48,13 +48,16 @@ public sealed class TaskActivityDispatcher /// The log helper /// The error propagation mode /// The exception properties provider for extracting custom properties from exceptions + /// The maximum amount of times the same event can be dispatched before it is considered "poisoned" + /// and the corresponding operation is failed. If not set, there is no maximum enforced. internal TaskActivityDispatcher( IOrchestrationService orchestrationService, INameVersionObjectManager objectManager, DispatchMiddlewarePipeline dispatchPipeline, LogHelper logHelper, ErrorPropagationMode errorPropagationMode, - IExceptionPropertiesProvider? exceptionPropertiesProvider) + IExceptionPropertiesProvider? exceptionPropertiesProvider, + int? maxDispatchCount = null) { this.orchestrationService = orchestrationService ?? throw new ArgumentNullException(nameof(orchestrationService)); this.objectManager = objectManager ?? 
throw new ArgumentNullException(nameof(objectManager)); @@ -62,7 +65,7 @@ internal TaskActivityDispatcher( this.logHelper = logHelper; this.errorPropagationMode = errorPropagationMode; this.exceptionPropertiesProvider = exceptionPropertiesProvider; - this.maxDispatchCount = orchestrationService.MaxDispatchCount; + this.maxDispatchCount = maxDispatchCount; this.dispatcher = new WorkItemDispatcher( "TaskActivityDispatcher", @@ -173,7 +176,7 @@ async Task OnProcessWorkItemAsync(TaskActivityWorkItem workItem) { string message = $"The activity worker received a {nameof(EventType.TaskScheduled)} event that does not specify an activity name."; this.logHelper.TaskActivityDispatcherError(workItem, message); - if (taskMessage.Event.DispatchCount <= this.maxDispatchCount) + if (this.maxDispatchCount == null || taskMessage.Event.DispatchCount <= this.maxDispatchCount) { throw TraceHelper.TraceException( TraceEventType.Error, diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index 08b608a15..d944115da 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -44,7 +44,7 @@ public class TaskEntityDispatcher readonly ErrorPropagationMode errorPropagationMode; readonly TaskOrchestrationDispatcher.NonBlockingCountdownLock concurrentSessionLock; readonly IExceptionPropertiesProvider exceptionPropertiesProvider; - readonly int maxDispatchCount; + readonly int? maxDispatchCount; /// /// Initializes a new instance of the class with an exception properties provider. @@ -55,13 +55,16 @@ public class TaskEntityDispatcher /// The log helper /// The error propagation mode /// The exception properties provider for extracting custom properties from exceptions + /// The maximum amount of times the same event can be dispatched before it is considered "poisoned" + /// and the corresponding operation is failed. If not set, there is no maximum enforced. 
internal TaskEntityDispatcher( IOrchestrationService orchestrationService, INameVersionObjectManager entityObjectManager, DispatchMiddlewarePipeline entityDispatchPipeline, LogHelper logHelper, ErrorPropagationMode errorPropagationMode, - IExceptionPropertiesProvider exceptionPropertiesProvider) + IExceptionPropertiesProvider exceptionPropertiesProvider, + int? maxDispatchCount = null) { this.objectManager = entityObjectManager ?? throw new ArgumentNullException(nameof(entityObjectManager)); this.orchestrationService = orchestrationService ?? throw new ArgumentNullException(nameof(orchestrationService)); @@ -71,7 +74,7 @@ internal TaskEntityDispatcher( this.exceptionPropertiesProvider = exceptionPropertiesProvider; this.entityOrchestrationService = (orchestrationService as IEntityOrchestrationService)!; this.entityBackendProperties = entityOrchestrationService.EntityBackendProperties; - this.maxDispatchCount = orchestrationService.MaxDispatchCount; + this.maxDispatchCount = maxDispatchCount; this.dispatcher = new WorkItemDispatcher( "TaskEntityDispatcher", @@ -439,7 +442,7 @@ await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync( void ProcessLockRequest(WorkItemEffects effects, SchedulerState schedulerState, RequestMessage request) { - if (request.DispatchCount <= this.maxDispatchCount) + if (this.maxDispatchCount == null || request.DispatchCount <= this.maxDispatchCount) { this.logHelper.EntityLockAcquired(effects.InstanceId, request); @@ -449,7 +452,7 @@ void ProcessLockRequest(WorkItemEffects effects, SchedulerState schedulerState, request.Position++; } - if (request.Position < request.LockSet.Length && request.DispatchCount <= this.maxDispatchCount) + if (request.Position < request.LockSet.Length && (this.maxDispatchCount == null || request.DispatchCount <= this.maxDispatchCount)) { // send lock request to next entity in the lock set var target = new OrchestrationInstance() { InstanceId = request.LockSet[request.Position].ToString() }; diff 
--git a/src/DurableTask.Core/TaskHubWorker.cs b/src/DurableTask.Core/TaskHubWorker.cs index 65dfbb47e..2aa83e63b 100644 --- a/src/DurableTask.Core/TaskHubWorker.cs +++ b/src/DurableTask.Core/TaskHubWorker.cs @@ -38,6 +38,7 @@ public sealed class TaskHubWorker : IDisposable readonly INameVersionObjectManager orchestrationManager; readonly INameVersionObjectManager entityManager; readonly VersioningSettings versioningSettings; + readonly int? maxDispatchCount; readonly DispatchMiddlewarePipeline orchestrationDispatchPipeline = new DispatchMiddlewarePipeline(); readonly DispatchMiddlewarePipeline entityDispatchPipeline = new DispatchMiddlewarePipeline(); @@ -220,6 +221,36 @@ public TaskHubWorker( this.versioningSettings = versioningSettings; } + /// + /// Create a new TaskHubWorker with given OrchestrationService and name version managers + /// + /// Reference the orchestration service implementation + /// NameVersionObjectManager for Orchestrations + /// NameVersionObjectManager for Activities + /// The NameVersionObjectManager for entities. The version is the entity key. + /// The that define how orchestration versions are handled + /// The maximum amount of times the same event can be dispatched before it is considered "poisoned" + /// and the corresponding operation is failed. + /// The to use for logging + public TaskHubWorker( + IOrchestrationService orchestrationService, + INameVersionObjectManager orchestrationObjectManager, + INameVersionObjectManager activityObjectManager, + INameVersionObjectManager entityObjectManager, + VersioningSettings versioningSettings, + int maxDispatchCount, + ILoggerFactory loggerFactory = null) + { + this.orchestrationManager = orchestrationObjectManager ?? throw new ArgumentException("orchestrationObjectManager"); + this.activityManager = activityObjectManager ?? throw new ArgumentException("activityObjectManager"); + this.entityManager = entityObjectManager ?? 
throw new ArgumentException("entityObjectManager"); + this.orchestrationService = orchestrationService ?? throw new ArgumentException("orchestrationService"); + this.logHelper = new LogHelper(loggerFactory?.CreateLogger("DurableTask.Core")); + this.dispatchEntitiesSeparately = (orchestrationService as IEntityOrchestrationService)?.EntityBackendProperties?.UseSeparateQueueForEntityWorkItems ?? false; + this.versioningSettings = versioningSettings; + this.maxDispatchCount = maxDispatchCount; + } + /// /// Gets the orchestration dispatcher /// @@ -303,14 +334,16 @@ public async Task StartAsync() this.logHelper, this.ErrorPropagationMode, this.versioningSettings, - this.ExceptionPropertiesProvider); + this.ExceptionPropertiesProvider, + this.maxDispatchCount); this.activityDispatcher = new TaskActivityDispatcher( this.orchestrationService, this.activityManager, this.activityDispatchPipeline, this.logHelper, this.ErrorPropagationMode, - this.ExceptionPropertiesProvider); + this.ExceptionPropertiesProvider, + this.maxDispatchCount); if (this.dispatchEntitiesSeparately) { @@ -320,7 +353,8 @@ public async Task StartAsync() this.entityDispatchPipeline, this.logHelper, this.ErrorPropagationMode, - this.ExceptionPropertiesProvider); + this.ExceptionPropertiesProvider, + this.maxDispatchCount); } await this.orchestrationService.StartAsync(); diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index 871c77473..608048b43 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -50,7 +50,7 @@ public class TaskOrchestrationDispatcher readonly TaskOrchestrationEntityParameters? entityParameters; readonly VersioningSettings? versioningSettings; readonly IExceptionPropertiesProvider? exceptionPropertiesProvider; - readonly int maxDispatchCount; + readonly int? 
maxDispatchCount; /// /// Initializes a new instance of the class with an exception properties provider. @@ -62,6 +62,8 @@ public class TaskOrchestrationDispatcher /// The error propagation mode /// The versioning settings /// The exception properties provider for extracting custom properties from exceptions + /// The maximum amount of times the same event can be dispatched before it is considered "poisoned" + /// and the corresponding operation is failed. If not set, there is no maximum enforced. internal TaskOrchestrationDispatcher( IOrchestrationService orchestrationService, INameVersionObjectManager objectManager, @@ -69,7 +71,8 @@ internal TaskOrchestrationDispatcher( LogHelper logHelper, ErrorPropagationMode errorPropagationMode, VersioningSettings versioningSettings, - IExceptionPropertiesProvider? exceptionPropertiesProvider) + IExceptionPropertiesProvider? exceptionPropertiesProvider, + int? maxDispatchCount = null) { this.objectManager = objectManager ?? throw new ArgumentNullException(nameof(objectManager)); this.orchestrationService = orchestrationService ?? 
throw new ArgumentNullException(nameof(orchestrationService)); @@ -81,7 +84,7 @@ internal TaskOrchestrationDispatcher( this.entityParameters = TaskOrchestrationEntityParameters.FromEntityBackendProperties(this.entityBackendProperties); this.versioningSettings = versioningSettings; this.exceptionPropertiesProvider = exceptionPropertiesProvider; - this.maxDispatchCount = orchestrationService.MaxDispatchCount; + this.maxDispatchCount = maxDispatchCount; this.dispatcher = new WorkItemDispatcher( "TaskOrchestrationDispatcher", diff --git a/src/DurableTask.Emulator/LocalOrchestrationService.cs b/src/DurableTask.Emulator/LocalOrchestrationService.cs index e612ea42f..50e43ecd5 100644 --- a/src/DurableTask.Emulator/LocalOrchestrationService.cs +++ b/src/DurableTask.Emulator/LocalOrchestrationService.cs @@ -686,14 +686,6 @@ void Dispose(bool disposing) /// EntityBackendQueries IEntityOrchestrationService.EntityBackendQueries => null; - /// - /// The maximum amount of times a message can be dispatched before it is considered "poisoned" and - /// moved to poison storage. - /// Currently this storage provider does not have poison message handling so this value is set to the - /// maximum integer value. 
- /// - public int MaxDispatchCount => int.MaxValue; - /// Task IEntityOrchestrationService.LockNextEntityWorkItemAsync( TimeSpan receiveTimeout, diff --git a/src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs b/src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs index ac5671471..3d49bb749 100644 --- a/src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs +++ b/src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs @@ -920,14 +920,6 @@ public async Task AbandonTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkIte /// public int MaxConcurrentTaskActivityWorkItems => this.Settings.TaskActivityDispatcherSettings.MaxConcurrentActivities; - /// - /// The maximum amount of times a message can be dispatched before it is considered "poisoned" and - /// moved to poison storage. - /// Currently this storage provider does not have poison message handling so this value is set to the - /// maximum integer value. - /// - public int MaxDispatchCount => int.MaxValue; - /// /// Wait for an lock the next task activity to be processed /// From 2e3c7c22baf0fabbceb7c6026d542be27e7dc6ff Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Wed, 1 Apr 2026 12:03:43 -0700 Subject: [PATCH 6/8] updated the implementations to remove all exception-throwing in the case of poison message handling, except for entity unlock requests --- src/DurableTask.Core/History/HistoryEvent.cs | 8 ++ .../TaskActivityDispatcher.cs | 48 +++++------ src/DurableTask.Core/TaskEntityDispatcher.cs | 86 ++++++++++--------- .../TaskOrchestrationDispatcher.cs | 63 +++++++++++--- 4 files changed, 129 insertions(+), 76 deletions(-) diff --git a/src/DurableTask.Core/History/HistoryEvent.cs b/src/DurableTask.Core/History/HistoryEvent.cs index a1a1317a8..951faffa1 100644 --- a/src/DurableTask.Core/History/HistoryEvent.cs +++ b/src/DurableTask.Core/History/HistoryEvent.cs @@ -95,6 +95,14 @@ protected HistoryEvent(int eventId) [DataMember] public int DispatchCount { get; set; } + /// + /// 
Gets or sets whether or not this event has been marked as "poisoned". + /// This can occur if the event's dispatch count exceeds a certain threshold, + /// or if some other error occurs during dispatch. + /// + [DataMember] + public bool IsPoisoned { get; set; } + /// /// Implementation for . /// diff --git a/src/DurableTask.Core/TaskActivityDispatcher.cs b/src/DurableTask.Core/TaskActivityDispatcher.cs index 68a34d4cc..4e3a27c62 100644 --- a/src/DurableTask.Core/TaskActivityDispatcher.cs +++ b/src/DurableTask.Core/TaskActivityDispatcher.cs @@ -125,14 +125,13 @@ async Task OnProcessWorkItemAsync(TaskActivityWorkItem workItem) this.logHelper.TaskActivityDispatcherError( workItem, $"The activity worker received a message that does not have any OrchestrationInstance information."); - if (taskMessage.Event.DispatchCount > this.maxDispatchCount) + if (this.maxDispatchCount != null) { this.logHelper.PoisonMessageDetected( orchestrationInstance, taskMessage.Event, - $"Activity has received an event with no parent orchestration instance and dispatch count " + - $"{taskMessage.Event.DispatchCount} which exceeds the maximum dispatch count of {this.maxDispatchCount}. " + - $"The event will be discarded."); + $"Activity has received an event with no parent orchestration instance ID."); + taskMessage.Event.IsPoisoned = true; // All orchestration services that implement poison message handling must have logic to handle a null response message in this case. 
await this.orchestrationService.CompleteTaskActivityWorkItemAsync(workItem, responseMessage: null); return; @@ -145,17 +144,16 @@ async Task OnProcessWorkItemAsync(TaskActivityWorkItem workItem) if (taskMessage.Event.EventType != EventType.TaskScheduled) { - this.logHelper.TaskActivityDispatcherError( - workItem, - $"The activity worker received an event of type '{taskMessage.Event.EventType}' but only '{EventType.TaskScheduled}' is supported."); - if (taskMessage.Event.DispatchCount > this.maxDispatchCount) + string message = $"The activity worker received an event of type '{taskMessage.Event.EventType}' but only " + + $"'{EventType.TaskScheduled}' is supported."; + this.logHelper.TaskActivityDispatcherError(workItem, message); + if (this.maxDispatchCount != null) { this.logHelper.PoisonMessageDetected( orchestrationInstance, taskMessage.Event, - $"Activity has received an event of invalid type '{taskMessage.Event.EventType}' but only '{EventType.TaskScheduled}' " + - $"is supported. Since the dispatch count of the event {taskMessage.Event.DispatchCount} exceeds the maximum dispatch " + - $"count of {this.maxDispatchCount}, the event will be discarded."); + message); + taskMessage.Event.IsPoisoned = true; // All orchestration services that implement poison message handling must have logic to handle a null response message in this case. 
await this.orchestrationService.CompleteTaskActivityWorkItemAsync(workItem, responseMessage: null); return; @@ -176,31 +174,33 @@ async Task OnProcessWorkItemAsync(TaskActivityWorkItem workItem) { string message = $"The activity worker received a {nameof(EventType.TaskScheduled)} event that does not specify an activity name."; this.logHelper.TaskActivityDispatcherError(workItem, message); - if (this.maxDispatchCount == null || taskMessage.Event.DispatchCount <= this.maxDispatchCount) + if (this.maxDispatchCount == null) { throw TraceHelper.TraceException( TraceEventType.Error, "TaskActivityDispatcher-MissingActivityName", new InvalidOperationException(message)); } + else + { + scheduledEvent.IsPoisoned = true; + } } HistoryEvent? eventToRespond = null; - if (taskMessage.Event.DispatchCount > this.maxDispatchCount) + if (scheduledEvent.DispatchCount > this.maxDispatchCount || scheduledEvent.IsPoisoned) { - string messageSuffix = string.Empty; - if (scheduledEvent.Name == null) - { - messageSuffix = " The event also does not specify an Activity name."; - } + string message = scheduledEvent.IsPoisoned + ? "Activity worker has recenved an event that does not specify an Activity name" + : $"Activity worker has received an event with dispatch count {taskMessage.Event.DispatchCount} which exceeds " + + $"the maximum dispatch count of {this.maxDispatchCount}"; + + scheduledEvent.IsPoisoned = true; this.logHelper.PoisonMessageDetected( orchestrationInstance, taskMessage.Event, - $"Activity has received an event with dispatch count {taskMessage.Event.DispatchCount} which exceeds the maximum dispatch " + - $"count of {this.maxDispatchCount}.{messageSuffix} The task will be failed."); - string reason = $"Activity {EventType.TaskScheduled} event has dispatch count {taskMessage.Event.DispatchCount} " + - $"which exceeds the maximum dispatch count of {this.maxDispatchCount}.{messageSuffix}"; + $"{message}. 
The task will be failed."); eventToRespond = new TaskFailedEvent( -1, @@ -210,12 +210,12 @@ async Task OnProcessWorkItemAsync(TaskActivityWorkItem workItem) new ( "PoisonMessage", - reason, + message, stackTrace: null, innerFailure: null, isNonRetriable: true) ); - traceActivity?.SetStatus(ActivityStatusCode.Error, reason); + traceActivity?.SetStatus(ActivityStatusCode.Error, message); } else { diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index d944115da..5aa58880a 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -256,24 +256,29 @@ private async Task OnProcessWorkItemAsync(TaskOrchestrationWorkI try { + bool firstExecutionIfExtendedSession = schedulerState == null; + // Assumes that: if the batch contains a new "ExecutionStarted" event, it is the first message in the batch. - if (!TaskOrchestrationDispatcher.ReconcileMessagesWithState(workItem, nameof(TaskEntityDispatcher), this.errorPropagationMode, this.logHelper)) + if (!TaskOrchestrationDispatcher.ReconcileMessagesWithState( + workItem, + nameof(TaskEntityDispatcher), + this.errorPropagationMode, + this.logHelper, + isPoisonMessageHandlingEnabled: this.maxDispatchCount != null, + out string reason) || + // we start with processing all the requests and figuring out which ones to execute now + // results can depend on whether the entity is locked, what the maximum batch size is, + // and whether the messages arrived out of order + !this.DetermineWork(workItem.OrchestrationRuntimeState, + ref schedulerState, + out Work workToDoNow, + out reason)) { // TODO : mark an orchestration as faulted if there is data corruption - this.logHelper.DroppingOrchestrationWorkItem(workItem, "Received work-item for an invalid orchestration"); + this.logHelper.DroppingOrchestrationWorkItem(workItem, reason); } else { - bool firstExecutionIfExtendedSession = schedulerState == null; - - // we start with processing 
all the requests and figuring out which ones to execute now - // results can depend on whether the entity is locked, what the maximum batch size is, - // and whether the messages arrived out of order - - this.DetermineWork(workItem.OrchestrationRuntimeState, - ref schedulerState, - out Work workToDoNow); - if (workToDoNow.OperationCount > 0) { // execute the user-defined operations on this entity, via the middleware @@ -498,7 +503,7 @@ string SerializeSchedulerStateForNextExecution(SchedulerState schedulerState) #region Preprocess to determine work - void DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState schedulerState, out Work batch) + bool DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState schedulerState, out Work batch, out string reason) { string instanceId = runtimeState.OrchestrationInstance.InstanceId; bool deserializeState = schedulerState == null; @@ -524,6 +529,20 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState sc } catch (Exception exception) { + // Poison message handling is enabled + if (this.maxDispatchCount != null) + { + foreach (var historyEvent in runtimeState.Events) + { + historyEvent.IsPoisoned = true; + } + reason = $"Failed to deserialize the entity scheduler state from the {EventType.ExecutionStarted} input."; + this.logHelper.PoisonMessageDetected( + runtimeState.OrchestrationInstance, + e, + $"Dropping entity work item: {reason}"); + return false; + } throw new EntitySchedulerException("Failed to deserialize entity scheduler state - may be corrupted or wrong version.", exception); } } @@ -534,6 +553,8 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState sc if (EntityMessageEventNames.IsRequestMessage(eventRaisedEvent.Name)) { + eventRaisedEvent.IsPoisoned = eventRaisedEvent.DispatchCount > this.maxDispatchCount; + // we are receiving an operation request or a lock request var requestMessage = new RequestMessage(); @@ -543,13 +564,12 
@@ void DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState sc } catch (Exception exception) { - if (eventRaisedEvent.DispatchCount > this.maxDispatchCount) + if (this.maxDispatchCount != null) { this.logHelper.PoisonMessageDetected( runtimeState.OrchestrationInstance, eventRaisedEvent, - $"Dropping entity message after deserialization error since its dispatch count {eventRaisedEvent.DispatchCount} " + - $"exceeds the maximum dispatch count of {this.maxDispatchCount}."); + $"Dropping entity request after deserialization error."); break; } throw new EntitySchedulerException("Failed to deserialize incoming request message - may be corrupted or wrong version.", exception); @@ -618,17 +638,6 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState sc } catch (Exception exception) { - // We don't necessarily want to unlock the entity in this case because this release message may have been issued by an instance that does - // not currently hold the lock. - if (eventRaisedEvent.DispatchCount > this.maxDispatchCount) - { - this.logHelper.PoisonMessageDetected( - runtimeState.OrchestrationInstance, - eventRaisedEvent, - $"Dropping entity lock release message after deserialization error since its dispatch count {eventRaisedEvent.DispatchCount} " + - $"exceeds the maximum dispatch count of {this.maxDispatchCount}. 
Stopping processing of remaining requests."); - break; - } throw new EntitySchedulerException("Failed to deserialize lock release message - may be corrupted or wrong version.", exception); } @@ -638,17 +647,6 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState sc this.logHelper.EntityLockReleased(instanceId, message); schedulerState.LockedBy = null; } - - if (eventRaisedEvent.DispatchCount > this.maxDispatchCount) - { - this.logHelper.PoisonMessageDetected( - runtimeState.OrchestrationInstance, - eventRaisedEvent, - $"Entity lock release message from parent instance '{message.ParentInstanceId}' processed but stopping processing " + - $"of remaining messages since the dispatch count for this message {eventRaisedEvent.DispatchCount} " + - $"has exceeded the maximum allowed dispatch count {this.maxDispatchCount}."); - break; - } } else { @@ -659,10 +657,11 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState sc if (eventRaisedEvent.DispatchCount > this.maxDispatchCount) { this.logHelper.PoisonMessageDetected( - runtimeState.OrchestrationInstance, - eventRaisedEvent, - $"Entity self-continue message processed but stopping processing of remaining messages since the dispatch count " + - $"for this message {eventRaisedEvent.DispatchCount} has exceeded the maximum allowed dispatch count {this.maxDispatchCount}."); + runtimeState.OrchestrationInstance, + eventRaisedEvent, + $"Entity self-continue message processed but will be marked as poisoned since the dispatch count for this message " + + $"{eventRaisedEvent.DispatchCount} has exceeded the maximum allowed dispatch count {this.maxDispatchCount}."); + eventRaisedEvent.IsPoisoned = true; break; } } @@ -713,6 +712,9 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState sc } } } + + reason = null; + return true; } bool EntityIsDeleted(SchedulerState schedulerState) diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs 
b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index 608048b43..2324e7419 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -378,10 +378,16 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work try { // Assumes that: if the batch contains a new "ExecutionStarted" event, it is the first message in the batch. - if (!ReconcileMessagesWithState(workItem, nameof(TaskOrchestrationDispatcher), this.errorPropagationMode, logHelper)) + if (!ReconcileMessagesWithState( + workItem, + nameof(TaskOrchestrationDispatcher), + this.errorPropagationMode, + logHelper, + isPoisonMessageHandlingEnabled: this.maxDispatchCount != null, + out string? reason)) { // TODO : mark an orchestration as faulted if there is data corruption - this.logHelper.DroppingOrchestrationWorkItem(workItem, "Received work-item for an invalid orchestration"); + this.logHelper.DroppingOrchestrationWorkItem(workItem, reason!); TraceHelper.TraceSession( TraceEventType.Error, "TaskOrchestrationDispatcher-DeletedOrchestration", @@ -442,6 +448,7 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work { foreach (var poisonEvent in poisonEvents) { + poisonEvent.IsPoisoned = true; this.logHelper.PoisonMessageDetected( runtimeState.OrchestrationInstance!, poisonEvent, @@ -898,14 +905,38 @@ await this.dispatchPipeline.RunAsync(dispatchContext, _ => /// The name of the dispatcher, used for tracing. /// The error propagation mode. /// The log helper. + /// Whether or not poison message handling is enabled, in which case the messages + /// part of an invalid work item will be marked as "poisoned", and no exceptions will be thrown. + /// In the case that work item should be dropped (this method return false), provides the reason why. /// True if workItem should be processed further. False otherwise. 
- internal static bool ReconcileMessagesWithState(TaskOrchestrationWorkItem workItem, string dispatcher, ErrorPropagationMode errorPropagationMode, LogHelper logHelper) + internal static bool ReconcileMessagesWithState( + TaskOrchestrationWorkItem workItem, + string dispatcher, + ErrorPropagationMode errorPropagationMode, + LogHelper logHelper, + bool isPoisonMessageHandlingEnabled, + out string? reason) { + void MarkAllMessagesPoisoned() + { + foreach (var instanceMessage in workItem.NewMessages) + { + instanceMessage.Event.IsPoisoned = true; + } + } + foreach (TaskMessage message in workItem.NewMessages) { OrchestrationInstance orchestrationInstance = message.OrchestrationInstance; if (string.IsNullOrWhiteSpace(orchestrationInstance?.InstanceId)) { + if (isPoisonMessageHandlingEnabled) + { + MarkAllMessagesPoisoned(); + reason = $"Work item includes a message with no orchestration instance ID with event type {message.Event.EventType}"; + return false; + } + throw TraceHelper.TraceException( TraceEventType.Error, $"{dispatcher}-OrchestrationInstanceMissing", @@ -915,6 +946,15 @@ internal static bool ReconcileMessagesWithState(TaskOrchestrationWorkItem workIt if (!workItem.OrchestrationRuntimeState.IsValid) { // we get here if the orchestration history is somehow corrupted (partially deleted, etc.) + if (isPoisonMessageHandlingEnabled) + { + MarkAllMessagesPoisoned(); + } + string corruptionType = workItem.OrchestrationRuntimeState.Events.Count == 1 ? 
+ $"its history contains exactly one event which is neither an {EventType.ExecutionStarted} or " + + $"{EventType.OrchestratorStarted} but rather has type {workItem.OrchestrationRuntimeState.Events[0].EventType}" : + $"its history contains multiple events but no {EventType.ExecutionStarted} event"; + reason = $"Orchestration runtime state is invalid: {corruptionType}"; return false; } @@ -923,6 +963,11 @@ internal static bool ReconcileMessagesWithState(TaskOrchestrationWorkItem workIt // we get here because of: // i) responses for scheduled tasks after the orchestrations have been completed // ii) responses for explicitly deleted orchestrations + if (isPoisonMessageHandlingEnabled) + { + MarkAllMessagesPoisoned(); + } + reason = $"Orchestration contains no {EventType.ExecutionStarted} event in its history and did not receive one as part of its new messages."; return false; } @@ -930,15 +975,12 @@ internal static bool ReconcileMessagesWithState(TaskOrchestrationWorkItem workIt && workItem.OrchestrationRuntimeState.OrchestrationStatus != OrchestrationStatus.Running && workItem.NewMessages.Count > 1) { - foreach (TaskMessage droppedMessage in workItem.NewMessages) + if (isPoisonMessageHandlingEnabled) { - if (droppedMessage.Event.EventType != EventType.ExecutionRewound) - { - logHelper.DroppingOrchestrationMessage(workItem, droppedMessage, "Multiple messages sent to an instance " + - "that is attempting to rewind from a terminal state. The only message that can be sent in " + - "this case is the rewind request."); - } + MarkAllMessagesPoisoned(); } + reason = "Multiple messages sent to an instance that is attempting to rewind from a terminal state. 
" + + "The only message that can be sent in this case is the rewind request."; return false; } @@ -1030,6 +1072,7 @@ internal static bool ReconcileMessagesWithState(TaskOrchestrationWorkItem workIt } } + reason = null; return true; } From eecc0777b28b801cdaec9ff83ceb6684c19ca6e9 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Wed, 1 Apr 2026 12:49:33 -0700 Subject: [PATCH 7/8] comment updates --- src/DurableTask.Core/TaskActivityDispatcher.cs | 6 ++++-- src/DurableTask.Core/TaskEntityDispatcher.cs | 6 ++++-- src/DurableTask.Core/TaskHubWorker.cs | 6 ++++-- src/DurableTask.Core/TaskOrchestrationDispatcher.cs | 8 +++++--- 4 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/DurableTask.Core/TaskActivityDispatcher.cs b/src/DurableTask.Core/TaskActivityDispatcher.cs index 4e3a27c62..fd6f08d8d 100644 --- a/src/DurableTask.Core/TaskActivityDispatcher.cs +++ b/src/DurableTask.Core/TaskActivityDispatcher.cs @@ -48,8 +48,10 @@ public sealed class TaskActivityDispatcher /// The log helper /// The error propagation mode /// The exception properties provider for extracting custom properties from exceptions - /// The maximum amount of times the same event can be dispatched before it is considered "poisoned" - /// and the corresponding operation is failed. If not set, there is no maximum enforced. + /// + /// The maximum amount of times the same event can be dispatched before it is considered "poisoned" + /// and the corresponding operation is failed. If not set, there is no poison message handling enabled. 
+ /// internal TaskActivityDispatcher( IOrchestrationService orchestrationService, INameVersionObjectManager objectManager, diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index 5aa58880a..43a4a4115 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -55,8 +55,10 @@ public class TaskEntityDispatcher /// The log helper /// The error propagation mode /// The exception properties provider for extracting custom properties from exceptions - /// The maximum amount of times the same event can be dispatched before it is considered "poisoned" - /// and the corresponding operation is failed. If not set, there is no maximum enforced. + /// + /// The maximum amount of times the same event can be dispatched before it is considered "poisoned" + /// and the corresponding operation is failed. If not set, there is no poison message handling enabled. + /// internal TaskEntityDispatcher( IOrchestrationService orchestrationService, INameVersionObjectManager entityObjectManager, diff --git a/src/DurableTask.Core/TaskHubWorker.cs b/src/DurableTask.Core/TaskHubWorker.cs index 2aa83e63b..86d66afaf 100644 --- a/src/DurableTask.Core/TaskHubWorker.cs +++ b/src/DurableTask.Core/TaskHubWorker.cs @@ -229,8 +229,10 @@ public TaskHubWorker( /// NameVersionObjectManager for Activities /// The NameVersionObjectManager for entities. The version is the entity key. /// The that define how orchestration versions are handled - /// The maximum amount of times the same event can be dispatched before it is considered "poisoned" - /// and the corresponding operation is failed. + /// + /// The maximum amount of times the same event can be dispatched before it is considered "poisoned" and the corresponding operation + /// is failed. Providing this value effectively enables poison message handling in the dispatchers. 
+ /// /// The to use for logging public TaskHubWorker( IOrchestrationService orchestrationService, diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index 2324e7419..9c6169d43 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -62,8 +62,10 @@ public class TaskOrchestrationDispatcher /// The error propagation mode /// The versioning settings /// The exception properties provider for extracting custom properties from exceptions - /// The maximum amount of times the same event can be dispatched before it is considered "poisoned" - /// and the corresponding operation is failed. If not set, there is no maximum enforced. + /// + /// The maximum amount of times the same event can be dispatched before it is considered "poisoned" + /// and the corresponding operation is failed. If not set, there is no poison message handling enabled. + /// internal TaskOrchestrationDispatcher( IOrchestrationService orchestrationService, INameVersionObjectManager objectManager, @@ -907,7 +909,7 @@ await this.dispatchPipeline.RunAsync(dispatchContext, _ => /// The log helper. /// Whether or not poison message handling is enabled, in which case the messages /// part of an invalid work item will be marked as "poisoned", and no exceptions will be thrown. - /// In the case that work item should be dropped (this method return false), provides the reason why. + /// In the case that the work item should be dropped (this method returns false), provides the reason why. /// True if workItem should be processed further. False otherwise. 
internal static bool ReconcileMessagesWithState( TaskOrchestrationWorkItem workItem, From f0fc35d11c2a3cef4f9453dbd2cf8cc4db82017e Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Wed, 1 Apr 2026 13:05:35 -0700 Subject: [PATCH 8/8] fixed a typo, added an argument range check for the max dispatch count --- src/DurableTask.Core/TaskActivityDispatcher.cs | 7 ++++++- src/DurableTask.Core/TaskEntityDispatcher.cs | 5 +++++ src/DurableTask.Core/TaskHubWorker.cs | 5 +++++ src/DurableTask.Core/TaskOrchestrationDispatcher.cs | 6 ++++++ 4 files changed, 22 insertions(+), 1 deletion(-) diff --git a/src/DurableTask.Core/TaskActivityDispatcher.cs b/src/DurableTask.Core/TaskActivityDispatcher.cs index fd6f08d8d..1920c7c0d 100644 --- a/src/DurableTask.Core/TaskActivityDispatcher.cs +++ b/src/DurableTask.Core/TaskActivityDispatcher.cs @@ -67,6 +67,11 @@ internal TaskActivityDispatcher( this.logHelper = logHelper; this.errorPropagationMode = errorPropagationMode; this.exceptionPropertiesProvider = exceptionPropertiesProvider; + + if (maxDispatchCount <= 0) + { + throw new ArgumentOutOfRangeException(nameof(maxDispatchCount), "The maximum dispatch count must be greater than 0"); + } this.maxDispatchCount = maxDispatchCount; this.dispatcher = new WorkItemDispatcher( @@ -193,7 +198,7 @@ async Task OnProcessWorkItemAsync(TaskActivityWorkItem workItem) if (scheduledEvent.DispatchCount > this.maxDispatchCount || scheduledEvent.IsPoisoned) { string message = scheduledEvent.IsPoisoned - ? "Activity worker has recenved an event that does not specify an Activity name" + ? 
"Activity worker has received an event that does not specify an Activity name" : $"Activity worker has received an event with dispatch count {taskMessage.Event.DispatchCount} which exceeds " + $"the maximum dispatch count of {this.maxDispatchCount}"; diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index 43a4a4115..235007b23 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -76,6 +76,11 @@ internal TaskEntityDispatcher( this.exceptionPropertiesProvider = exceptionPropertiesProvider; this.entityOrchestrationService = (orchestrationService as IEntityOrchestrationService)!; this.entityBackendProperties = entityOrchestrationService.EntityBackendProperties; + + if (maxDispatchCount <= 0) + { + throw new ArgumentOutOfRangeException(nameof(maxDispatchCount), "The maximum dispatch count must be greater than 0"); + } this.maxDispatchCount = maxDispatchCount; this.dispatcher = new WorkItemDispatcher( diff --git a/src/DurableTask.Core/TaskHubWorker.cs b/src/DurableTask.Core/TaskHubWorker.cs index 86d66afaf..5112efd43 100644 --- a/src/DurableTask.Core/TaskHubWorker.cs +++ b/src/DurableTask.Core/TaskHubWorker.cs @@ -250,6 +250,11 @@ public TaskHubWorker( this.logHelper = new LogHelper(loggerFactory?.CreateLogger("DurableTask.Core")); this.dispatchEntitiesSeparately = (orchestrationService as IEntityOrchestrationService)?.EntityBackendProperties?.UseSeparateQueueForEntityWorkItems ?? 
false; this.versioningSettings = versioningSettings; + + if (maxDispatchCount <= 0) + { + throw new ArgumentOutOfRangeException(nameof(maxDispatchCount), "The maximum dispatch count must be greater than 0"); + } this.maxDispatchCount = maxDispatchCount; } diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index 9c6169d43..a0c4143db 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -86,8 +86,14 @@ internal TaskOrchestrationDispatcher( this.entityParameters = TaskOrchestrationEntityParameters.FromEntityBackendProperties(this.entityBackendProperties); this.versioningSettings = versioningSettings; this.exceptionPropertiesProvider = exceptionPropertiesProvider; + + if (maxDispatchCount <= 0) + { + throw new ArgumentOutOfRangeException(nameof(maxDispatchCount), "The maximum dispatch count must be greater than 0"); + } this.maxDispatchCount = maxDispatchCount; + this.dispatcher = new WorkItemDispatcher( "TaskOrchestrationDispatcher", item => item == null ? string.Empty : item.InstanceId,