From 287829370fea5c48ecc68f72f3c43ccf34b93169 Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Fri, 6 Mar 2026 14:01:14 -0300 Subject: [PATCH] Introduce input, output methods to FuncDSL Signed-off-by: Matheus Cruz --- .../fluent/func/dsl/FuncDSL.java | 105 ++++++++++++ ...cDSLDataFlowTransformationHelpersTest.java | 161 ++++++++++++++++++ 2 files changed, 266 insertions(+) create mode 100644 experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/FuncDSLDataFlowTransformationHelpersTest.java diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java index ff2db37e0..8b2f9aa09 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java @@ -1322,4 +1322,109 @@ public static FuncCallHttpStep post( return http(name).POST().endpoint(endpoint, auth).body(body); } + + /** + * Extracts and deserializes the workflow input data into the specified type from a workflow + * context. + * + *

This utility method provides type-safe access to the workflow's initial input. + * + *

Use this method when you have access to the {@link WorkflowContextData} and need to retrieve + * the original input that was provided when the workflow instance was started. + * + *

Usage Example: + * + *

{@code
+   * inputFrom((object, WorkflowContextData workflowContext) -> {
+   *   OrderRequest order = input(workflowContext, OrderRequest.class);
+   *   return new Input(order);
+   * });
+   * }
+ * + * @param the type to deserialize the input into + * @param context the workflow context containing instance data and input + * @param inputClass the class object representing the target type for deserialization + * @return the deserialized workflow input object of type T + */ + public static T input(WorkflowContextData context, Class inputClass) { + return context + .instanceData() + .input() + .as(inputClass) + .orElseThrow( + () -> + new IllegalStateException( + "Workflow input is missing or cannot be deserialized into type " + + inputClass.getName() + + " when calling FuncDSL.input(WorkflowContextData, Class).")); + } + + /** + * Extracts and deserializes the task input data into the specified type from a task context. + * + *

This utility method provides type-safe access to a task's input. + * + *

Use this method when you have access to the {@link TaskContextData} and need to retrieve the + * input provided to that task. + * + *

Usage Example: + * + *

{@code
+   * inputFrom((Object obj, TaskContextData taskContextData) -> {
+   *   OrderRequest order = input(taskContextData, OrderRequest.class);
+   *   return order;
+   * });
+   * }
+ * + * @param the type to deserialize the input into + * @param taskContextData the task context from which to retrieve the task input + * @param inputClass the class object representing the target type for deserialization + * @return the deserialized task input object of type T + */ + public static T input(TaskContextData taskContextData, Class inputClass) { + return taskContextData + .input() + .as(inputClass) + .orElseThrow( + () -> + new IllegalStateException( + "Workflow input is missing or cannot be deserialized into type " + + inputClass.getName() + + " when calling FuncDSL.input(TaskContextData, Class).")); + } + + /** + * Extracts and deserializes the output data from a task into the specified type. + * + *

This utility method provides type-safe access to a task's output. + * + *

Use this method when you need to access the result/output produced by a task execution. This + * is particularly useful in subsequent tasks that need to process or transform the output of a + * previous task in the workflow. + * + *

Usage Example: + * + *

{@code
+   * .exportAs((object, workflowContext, taskContextData) -> {
+   *     Long output = output(taskContextData, Long.class);
+   *     return output * 2;
+   *  })
+   * }
+ * + * @param the type to deserialize the task output into + * @param taskContextData the task context containing the output data + * @param outputClass the class object representing the target type for deserialization + * @return the deserialized task output object of type T + */ + public static T output(TaskContextData taskContextData, Class outputClass) { + return taskContextData + .output() + .as(outputClass) + .orElseThrow( + () -> + new IllegalStateException( + "Task output is missing or cannot be deserialized into type " + + outputClass.getName() + + " when calling FuncDSL.output(TaskContextData, Class).")); + } } diff --git a/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/FuncDSLDataFlowTransformationHelpersTest.java b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/FuncDSLDataFlowTransformationHelpersTest.java new file mode 100644 index 000000000..b1a8236bc --- /dev/null +++ b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/FuncDSLDataFlowTransformationHelpersTest.java @@ -0,0 +1,161 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverless.workflow.impl.executors.func; + +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.input; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.output; + +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import org.assertj.core.api.SoftAssertions; +import org.junit.jupiter.api.Test; + +public class FuncDSLDataFlowTransformationHelpersTest { + + @Test + void test_input_with_inputFrom() { + + SoftAssertions softly = new SoftAssertions(); + + Workflow workflow = + FuncWorkflowBuilder.workflow("reviewSubmissionWithModel") + .tasks( + function( + "add5", + (Long input) -> { + softly.assertThat(input).isEqualTo(10L); + return input + 5; + }, + Long.class), + function("returnEnriched", (Long enrichedValue) -> enrichedValue, Long.class) + .inputFrom( + (object, workflowContext) -> { + softly.assertThat(object).isEqualTo(15L); + Long input = input(workflowContext, Long.class); + softly.assertThat(input).isEqualTo(10L); + return object + input; + }, + Long.class)) + .build(); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + WorkflowDefinition def = app.workflowDefinition(workflow); + WorkflowModel model = def.instance(10L).start().join(); + Number number = model.asNumber().orElseThrow(); + softly.assertThat(number.longValue()).isEqualTo(25L); + } + + softly.assertAll(); + } + + @Test + void test_input_with_outputAs() { + + SoftAssertions softly = new SoftAssertions(); + + Workflow workflow = + FuncWorkflowBuilder.workflow("enrichOutputWithModelTest") + .tasks( + function( + "add5", + (Long input) -> { + softly.assertThat(input).isEqualTo(10L); + return input + 5; + }, + Long.class) + .outputAs( + (object, workflowContext, taskContextData) -> { + softly.assertThat(object).isEqualTo(15L); + Long input = input(workflowContext, Long.class); + softly.assertThat(input).isEqualTo(10L); + return input + object; + }, + Long.class)) + .build(); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + WorkflowDefinition def = app.workflowDefinition(workflow); + + WorkflowModel model = def.instance(10L).start().join(); + Number number = model.asNumber().orElseThrow(); + + softly.assertThat(number.longValue()).isEqualTo(25L); + } + + softly.assertAll(); + } + + @Test + void test_output_with_exportAs() { + + SoftAssertions softly = new SoftAssertions(); + + Workflow workflow = + FuncWorkflowBuilder.workflow("enrichOutputWithInputTest") + .tasks( + function( + "add5", + (Long input) -> { + softly.assertThat(input).isEqualTo(10L); + return input + 5; + }, + Long.class) + .exportAs( + (object, workflowContext, taskContextData) -> { + Long taskOutput = output(taskContextData, Long.class); + softly.assertThat(taskOutput).isEqualTo(15L); + return taskOutput * 2; + })) + .build(); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + WorkflowDefinition def = app.workflowDefinition(workflow); + WorkflowModel model = def.instance(10L).start().join(); + Number number = model.asNumber().orElseThrow(); + softly.assertThat(number.longValue()).isEqualTo(15L); + } + + softly.assertAll(); + } + + @Test + void test_input_with_inputFrom_fluent_way() { + SoftAssertions softly = new SoftAssertions(); + + Workflow workflow = + FuncWorkflowBuilder.workflow("enrichOutputWithInputTest") + .tasks( + function("sumFive", (Long input) -> input + 5, Long.class) + .inputFrom( + (object, workflowContext, taskContextData) -> + input(taskContextData, Long.class) * 2)) + .build(); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + WorkflowDefinition def = app.workflowDefinition(workflow); + WorkflowModel model = def.instance(10L).start().join(); + Number number = model.asNumber().orElseThrow(); + + softly.assertThat(number.longValue()).isEqualTo(25L); + } + + softly.assertAll(); + } +}