Skip to content

Commit e427bbc

Browse files
authored
Fix concurrent timer race condition in InMemoryOrchestrationService (#678)
* initial commit * add test * address copilot comment * address copilot comment
1 parent 1b31c9a commit e427bbc

File tree

3 files changed

+108
-7
lines changed

3 files changed

+108
-7
lines changed

src/InProcessTestHost/InProcessTestHost.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<RootNamespace>Microsoft.DurableTask.Testing</RootNamespace>
66
<AssemblyName>Microsoft.DurableTask.InProcessTestHost</AssemblyName>
77
<PackageId>Microsoft.DurableTask.InProcessTestHost</PackageId>
8-
<Version>0.2.0-preview.1</Version>
8+
<Version>0.2.1-preview.1</Version>
99

1010
<!-- Suppress CA1848: Use LoggerMessage delegates for high-performance logging scenarios -->
1111
<NoWarn>$(NoWarn);CA1848</NoWarn>

src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -879,7 +879,7 @@ public PurgeResult PurgeInstanceState(PurgeInstanceFilter purgeInstanceFilter)
879879
class ReadyToRunQueue
880880
{
881881
readonly Channel<SerializedInstanceState> readyToRunQueue = Channel.CreateUnbounded<SerializedInstanceState>();
882-
readonly Dictionary<string, object> readyInstances = new(StringComparer.OrdinalIgnoreCase);
882+
readonly ConcurrentDictionary<string, SerializedInstanceState> readyInstances = new(StringComparer.OrdinalIgnoreCase);
883883

884884
public void Reset()
885885
{
@@ -893,7 +893,7 @@ public async ValueTask<SerializedInstanceState> TakeNextAsync(CancellationToken
893893
SerializedInstanceState state = await this.readyToRunQueue.Reader.ReadAsync(ct);
894894
lock (state)
895895
{
896-
if (this.readyInstances.Remove(state.InstanceId))
896+
if (this.readyInstances.TryRemove(state.InstanceId, out _))
897897
{
898898
if (state.IsLoaded)
899899
{
@@ -909,12 +909,9 @@ public async ValueTask<SerializedInstanceState> TakeNextAsync(CancellationToken
909909

910910
public void Schedule(SerializedInstanceState state)
911911
{
912-
// TODO: There is a race condition here. If another thread is calling TakeNextAsync
913-
// and removed the queue item before updating the dictionary, then we'll fail
914-
// to update the readyToRunQueue and the orchestration will get stuck.
915912
if (this.readyInstances.TryAdd(state.InstanceId, state))
916913
{
917-
if (!this.readyToRunQueue.Writer.TryWrite(state))
914+
if (!this.readyToRunQueue.Writer.TryWrite(state))
918915
{
919916
throw new InvalidOperationException($"unable to write to queue for {state.InstanceId}");
920917
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using Microsoft.DurableTask;
5+
using Microsoft.DurableTask.Client;
6+
using Microsoft.DurableTask.Testing;
7+
using Microsoft.DurableTask.Worker;
8+
using Xunit;
9+
10+
namespace InProcessTestHost.Tests;
11+
12+
/// <summary>
13+
/// Tests to verify that multiple orchestrations with identical timer FireAt timestamps
14+
/// all complete correctly without any being dropped.
15+
/// </summary>
16+
public class ConcurrentTimerTests
17+
{
18+
[Fact]
19+
// Test that multiple orchestrations with the same timer that fire at the same time
20+
// can all complete correctly.
21+
public async Task MultipleOrchestrations_WithSameTimerFireAt_AllComplete()
22+
{
23+
const int orchestrationCount = 10;
24+
const string orchestratorName = "TimerOrchestrator";
25+
26+
await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(tasks =>
27+
{
28+
tasks.AddOrchestratorFunc<DateTime, string>(orchestratorName, async (ctx, fireAt) =>
29+
{
30+
await ctx.CreateTimer(fireAt, CancellationToken.None);
31+
return $"done:{ctx.InstanceId}";
32+
});
33+
});
34+
35+
DateTime sharedFireAt = DateTime.UtcNow.AddSeconds(5);
36+
37+
string[] instanceIds = new string[orchestrationCount];
38+
for (int i = 0; i < orchestrationCount; i++)
39+
{
40+
instanceIds[i] = await host.Client.ScheduleNewOrchestrationInstanceAsync(
41+
orchestratorName, sharedFireAt);
42+
}
43+
44+
using CancellationTokenSource cts = new(TimeSpan.FromSeconds(30));
45+
46+
Task<OrchestrationMetadata>[] waitTasks = instanceIds
47+
.Select(id => host.Client.WaitForInstanceCompletionAsync(
48+
id, getInputsAndOutputs: true, cts.Token))
49+
.ToArray();
50+
51+
OrchestrationMetadata[] results = await Task.WhenAll(waitTasks);
52+
53+
for (int i = 0; i < orchestrationCount; i++)
54+
{
55+
Assert.NotNull(results[i]);
56+
Assert.Equal(OrchestrationRuntimeStatus.Completed, results[i].RuntimeStatus);
57+
string output = results[i].ReadOutputAs<string>()!;
58+
Assert.Equal($"done:{instanceIds[i]}", output);
59+
}
60+
}
61+
62+
[Fact]
63+
// Test that fan-out sub-orchestrations with timers that all fire at the same time
64+
// can all complete correctly.
65+
public async Task SubOrchestrations_WithIdenticalTimers_AllComplete()
66+
{
67+
const int subOrchestrationCount = 10;
68+
const string parentName = "ParentOrchestrator";
69+
const string childName = "ChildTimerOrchestrator";
70+
71+
await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(tasks =>
72+
{
73+
tasks.AddOrchestratorFunc<int>(parentName, async ctx =>
74+
{
75+
DateTime sharedFireAt = ctx.CurrentUtcDateTime.AddSeconds(2);
76+
77+
// A parent orchestration will schedule 10 sub-orchestrations which has a timer
78+
// fires at the same time.
79+
Task<string>[] childTasks = Enumerable.Range(0, subOrchestrationCount)
80+
.Select(i => ctx.CallSubOrchestratorAsync<string>(childName, sharedFireAt))
81+
.ToArray();
82+
83+
string[] results = await Task.WhenAll(childTasks);
84+
return results.Length;
85+
});
86+
87+
tasks.AddOrchestratorFunc<DateTime, string>(childName, async (ctx, fireAt) =>
88+
{
89+
await ctx.CreateTimer(fireAt, CancellationToken.None);
90+
return $"child-done:{ctx.InstanceId}";
91+
});
92+
});
93+
94+
string instanceId = await host.Client.ScheduleNewOrchestrationInstanceAsync(parentName);
95+
96+
using CancellationTokenSource cts = new(TimeSpan.FromSeconds(60));
97+
OrchestrationMetadata metadata = await host.Client.WaitForInstanceCompletionAsync(
98+
instanceId, getInputsAndOutputs: true, cts.Token);
99+
100+
Assert.NotNull(metadata);
101+
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
102+
Assert.Equal(subOrchestrationCount, metadata.ReadOutputAs<int>());
103+
}
104+
}

0 commit comments

Comments
 (0)