Skip to content

Commit 32efcc8

Browse files
committed
Add ContinueAsNewOptions with NewVersion support for orchestration version migration
1 parent e427bbc commit 32efcc8

File tree

6 files changed

+194
-8
lines changed

6 files changed

+194
-8
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
namespace Microsoft.DurableTask;
5+
6+
/// <summary>
7+
/// Options for <see cref="TaskOrchestrationContext.ContinueAsNew(ContinueAsNewOptions, object?, bool)"/>.
8+
/// </summary>
9+
public class ContinueAsNewOptions
10+
{
11+
/// <summary>
12+
/// Gets or sets the new version for the restarted orchestration instance.
13+
/// </summary>
14+
/// <remarks>
15+
/// When set, the framework uses this version to route the restarted instance to the
16+
/// appropriate orchestrator implementation. This is the safest migration point for
17+
/// eternal orchestrations since the history is fully reset, eliminating any replay
18+
/// conflict risk.
19+
/// </remarks>
20+
public string? NewVersion { get; set; }
21+
}

src/Abstractions/TaskOrchestrationContext.cs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -395,16 +395,16 @@ public virtual Task CallSubOrchestratorAsync(
395395
/// replays when rebuilding state.
396396
/// </para><para>
397397
/// The results of any incomplete tasks will be discarded when an orchestrator calls
398-
/// <see cref="ContinueAsNew"/>. For example, if a timer is scheduled and then <see cref="ContinueAsNew"/>
398+
/// <see cref="ContinueAsNew(object?, bool)"/>. For example, if a timer is scheduled and then <see cref="ContinueAsNew(object?, bool)"/>
399399
/// is called before the timer fires, the timer event will be discarded. The only exception to this
400400
/// is external events. By default, if an external event is received by an orchestration but not yet
401401
/// processed, the event is saved in the orchestration state unit it is received by a call to
402402
/// <see cref="WaitForExternalEvent{T}(string, CancellationToken)"/>. These events will continue to remain in memory
403-
/// even after an orchestrator restarts using <see cref="ContinueAsNew"/>. You can disable this behavior and
403+
/// even after an orchestrator restarts using <see cref="ContinueAsNew(object?, bool)"/>. You can disable this behavior and
404404
/// remove any saved external events by specifying <c>false</c> for the <paramref name="preserveUnprocessedEvents"/>
405405
/// parameter value.
406406
/// </para><para>
407-
/// Orchestrator implementations should complete immediately after calling the <see cref="ContinueAsNew"/> method.
407+
/// Orchestrator implementations should complete immediately after calling the <see cref="ContinueAsNew(object?, bool)"/> method.
408408
/// </para>
409409
/// </remarks>
410410
/// <param name="newInput">The JSON-serializable input data to re-initialize the instance with.</param>
@@ -415,6 +415,31 @@ public virtual Task CallSubOrchestratorAsync(
415415
/// </param>
416416
public abstract void ContinueAsNew(object? newInput = null, bool preserveUnprocessedEvents = true);
417417

418+
/// <summary>
419+
/// Restarts the orchestration with a new version, clearing the history.
420+
/// </summary>
421+
/// <remarks>
422+
/// <para>
423+
/// This overload allows specifying a new version for the restarted orchestration, enabling
424+
/// version-based dispatch. The new version is used by the framework to route the restarted
425+
/// instance to the appropriate orchestrator implementation. This is the safest migration point
426+
/// for eternal orchestrations since the history is fully reset.
427+
/// </para><para>
428+
/// Orchestrator implementations should complete immediately after calling this method.
429+
/// </para>
430+
/// </remarks>
431+
/// <param name="options">Options for the continue-as-new operation, including the new version.</param>
432+
/// <param name="newInput">The JSON-serializable input data to re-initialize the instance with.</param>
433+
/// <param name="preserveUnprocessedEvents">
434+
/// If set to <c>true</c>, re-adds any unprocessed external events into the new execution
435+
/// history when the orchestration instance restarts. If <c>false</c>, any unprocessed
436+
/// external events will be discarded when the orchestration instance restarts.
437+
/// </param>
438+
public virtual void ContinueAsNew(ContinueAsNewOptions options, object? newInput = null, bool preserveUnprocessedEvents = true)
439+
{
440+
this.ContinueAsNew(newInput, preserveUnprocessedEvents);
441+
}
442+
418443
/// <summary>
419444
/// Creates a new GUID that is safe for replay within an orchestration or operation.
420445
/// </summary>

src/Abstractions/TaskOrchestrator.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public interface ITaskOrchestrator
7878
/// </item>
7979
/// <item>
8080
/// Avoid infinite loops as they could cause the application to run out of memory. Instead, ensure that loops are
81-
/// bounded or use <see cref="TaskOrchestrationContext.ContinueAsNew"/> to restart an orchestrator with a new
81+
/// bounded or use <see cref="TaskOrchestrationContext.ContinueAsNew(object?, bool)"/> to restart an orchestrator with a new
8282
/// input.
8383
/// </item>
8484
/// <item>

src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ public override async Task<TResult> CallSubOrchestratorAsync<TResult>(
226226
version,
227227
instanceId,
228228
policy.ToDurableTaskCoreRetryOptions(),
229-
input,
229+
input,
230230
options.Tags);
231231
}
232232
else if (options?.Retry?.Handler is AsyncRetryHandler handler)
@@ -236,7 +236,7 @@ public override async Task<TResult> CallSubOrchestratorAsync<TResult>(
236236
orchestratorName.Name,
237237
version,
238238
instanceId,
239-
input,
239+
input,
240240
options?.Tags),
241241
orchestratorName.Name,
242242
handler,
@@ -248,7 +248,7 @@ public override async Task<TResult> CallSubOrchestratorAsync<TResult>(
248248
orchestratorName.Name,
249249
version,
250250
instanceId,
251-
input,
251+
input,
252252
options?.Tags);
253253
}
254254
}
@@ -337,7 +337,20 @@ public override void SetCustomStatus(object? customStatus)
337337
/// <inheritdoc/>
338338
public override void ContinueAsNew(object? newInput = null, bool preserveUnprocessedEvents = true)
339339
{
340-
this.innerContext.ContinueAsNew(newInput);
340+
this.ContinueAsNew(options: null, newInput, preserveUnprocessedEvents);
341+
}
342+
343+
/// <inheritdoc/>
344+
public override void ContinueAsNew(ContinueAsNewOptions? options, object? newInput = null, bool preserveUnprocessedEvents = true)
345+
{
346+
if (options?.NewVersion is not null)
347+
{
348+
this.innerContext.ContinueAsNew(options.NewVersion, newInput);
349+
}
350+
else
351+
{
352+
this.innerContext.ContinueAsNew(newInput);
353+
}
341354

342355
if (preserveUnprocessedEvents)
343356
{

test/Grpc.IntegrationTests/OrchestrationPatterns.cs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,36 @@ public async Task ContinueAsNew()
580580
Assert.Equal(10, metadata.ReadOutputAs<int>());
581581
}
582582

583+
[Fact]
584+
public async Task ContinueAsNewWithNewVersion()
585+
{
586+
TaskName orchestratorName = nameof(ContinueAsNewWithNewVersion);
587+
588+
await using HostTestLifetime server = await this.StartWorkerAsync(b =>
589+
{
590+
b.AddTasks(tasks => tasks.AddOrchestratorFunc<int, string>(orchestratorName, async (ctx, input) =>
591+
{
592+
if (input == 0)
593+
{
594+
// First generation: migrate to "v2"
595+
await ctx.CreateTimer(TimeSpan.Zero, CancellationToken.None);
596+
ctx.ContinueAsNew(new ContinueAsNewOptions { NewVersion = "v2" }, input + 1);
597+
return string.Empty;
598+
}
599+
600+
// Second generation: return the version to verify it changed
601+
return ctx.Version;
602+
}));
603+
});
604+
605+
string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input: 0);
606+
OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
607+
instanceId, getInputsAndOutputs: true, this.TimeoutToken);
608+
Assert.NotNull(metadata);
609+
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
610+
Assert.Equal("v2", metadata.ReadOutputAs<string>());
611+
}
612+
583613
[Fact]
584614
public async Task SubOrchestration()
585615
{

test/Worker/Core.Tests/Shims/TaskOrchestrationContextWrapperTests.cs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,103 @@ static void VerifyWrapper<T>(
4646
wrapper.GetInput<T>().Should().Be(input);
4747
}
4848

49+
[Fact]
50+
public void ContinueAsNew_WithoutVersion_CallsInnerContextWithoutVersion()
51+
{
52+
// Arrange
53+
TrackingOrchestrationContext innerContext = new();
54+
OrchestrationInvocationContext invocationContext = new("Test", new(), NullLoggerFactory.Instance, null);
55+
TaskOrchestrationContextWrapper wrapper = new(innerContext, invocationContext, "input");
56+
57+
// Act
58+
wrapper.ContinueAsNew("new-input", preserveUnprocessedEvents: false);
59+
60+
// Assert
61+
innerContext.LastContinueAsNewInput.Should().Be("new-input");
62+
innerContext.LastContinueAsNewVersion.Should().BeNull();
63+
}
64+
65+
[Fact]
66+
public void ContinueAsNew_WithVersion_CallsInnerContextWithVersion()
67+
{
68+
// Arrange
69+
TrackingOrchestrationContext innerContext = new();
70+
OrchestrationInvocationContext invocationContext = new("Test", new(), NullLoggerFactory.Instance, null);
71+
TaskOrchestrationContextWrapper wrapper = new(innerContext, invocationContext, "input");
72+
73+
// Act
74+
wrapper.ContinueAsNew(new ContinueAsNewOptions { NewVersion = "v2" }, "new-input", preserveUnprocessedEvents: false);
75+
76+
// Assert
77+
innerContext.LastContinueAsNewInput.Should().Be("new-input");
78+
innerContext.LastContinueAsNewVersion.Should().Be("v2");
79+
}
80+
81+
[Fact]
82+
public void ContinueAsNew_WithNullOptions_CallsInnerContextWithoutVersion()
83+
{
84+
// Arrange
85+
TrackingOrchestrationContext innerContext = new();
86+
OrchestrationInvocationContext invocationContext = new("Test", new(), NullLoggerFactory.Instance, null);
87+
TaskOrchestrationContextWrapper wrapper = new(innerContext, invocationContext, "input");
88+
89+
// Act
90+
wrapper.ContinueAsNew(options: null, newInput: "new-input", preserveUnprocessedEvents: false);
91+
92+
// Assert
93+
innerContext.LastContinueAsNewInput.Should().Be("new-input");
94+
innerContext.LastContinueAsNewVersion.Should().BeNull();
95+
}
96+
97+
class TrackingOrchestrationContext : OrchestrationContext
98+
{
99+
public TrackingOrchestrationContext()
100+
{
101+
this.OrchestrationInstance = new()
102+
{
103+
InstanceId = Guid.NewGuid().ToString(),
104+
ExecutionId = Guid.NewGuid().ToString(),
105+
};
106+
}
107+
108+
public object? LastContinueAsNewInput { get; private set; }
109+
110+
public string? LastContinueAsNewVersion { get; private set; }
111+
112+
public override void ContinueAsNew(object input)
113+
{
114+
this.LastContinueAsNewInput = input;
115+
this.LastContinueAsNewVersion = null;
116+
}
117+
118+
public override void ContinueAsNew(string newVersion, object input)
119+
{
120+
this.LastContinueAsNewInput = input;
121+
this.LastContinueAsNewVersion = newVersion;
122+
}
123+
124+
public override Task<T> CreateSubOrchestrationInstance<T>(string name, string version, object input)
125+
=> throw new NotImplementedException();
126+
127+
public override Task<T> CreateSubOrchestrationInstance<T>(string name, string version, string instanceId, object input)
128+
=> throw new NotImplementedException();
129+
130+
public override Task<T> CreateSubOrchestrationInstance<T>(string name, string version, string instanceId, object input, IDictionary<string, string> tags)
131+
=> throw new NotImplementedException();
132+
133+
public override Task<T> CreateTimer<T>(DateTime fireAt, T state)
134+
=> throw new NotImplementedException();
135+
136+
public override Task<T> CreateTimer<T>(DateTime fireAt, T state, CancellationToken cancelToken)
137+
=> throw new NotImplementedException();
138+
139+
public override Task<TResult> ScheduleTask<TResult>(string name, string version, params object[] parameters)
140+
=> throw new NotImplementedException();
141+
142+
public override void SendEvent(OrchestrationInstance orchestrationInstance, string eventName, object eventData)
143+
=> throw new NotImplementedException();
144+
}
145+
49146
class TestOrchestrationContext : OrchestrationContext
50147
{
51148
public TestOrchestrationContext()

0 commit comments

Comments
 (0)