From c354b96da2e240d5ebc721bb596480b0d763595a Mon Sep 17 00:00:00 2001 From: Ronja Quensel Date: Thu, 26 Mar 2026 15:58:21 +0100 Subject: [PATCH] feat: resume endpoint --- .../java/org/eclipse/dataplane/Dataplane.java | 26 +++++++++++++++++++ .../dataflow/DataFlowResumeMessage.java | 24 +++++++++++++++++ .../org/eclipse/dataplane/logic/OnResume.java | 24 +++++++++++++++++ .../port/DataPlaneSignalingApiController.java | 10 +++++++ .../org/eclipse/dataplane/ControlPlane.java | 5 ++++ .../eclipse/dataplane/DataplaneClient.java | 9 +++++++ .../dataplane/scenario/StreamingPullTest.java | 11 ++++++-- 7 files changed, 107 insertions(+), 2 deletions(-) create mode 100644 src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowResumeMessage.java create mode 100644 src/main/java/org/eclipse/dataplane/logic/OnResume.java diff --git a/src/main/java/org/eclipse/dataplane/Dataplane.java b/src/main/java/org/eclipse/dataplane/Dataplane.java index 549a9d5..7b68030 100644 --- a/src/main/java/org/eclipse/dataplane/Dataplane.java +++ b/src/main/java/org/eclipse/dataplane/Dataplane.java @@ -22,6 +22,7 @@ import org.eclipse.dataplane.domain.controlplane.ControlPlane; import org.eclipse.dataplane.domain.dataflow.DataFlow; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowResumeMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; @@ -33,6 +34,7 @@ import org.eclipse.dataplane.domain.registration.DataPlaneRegistrationMessage; import org.eclipse.dataplane.logic.OnCompleted; import org.eclipse.dataplane.logic.OnPrepare; +import org.eclipse.dataplane.logic.OnResume; import org.eclipse.dataplane.logic.OnStart; import org.eclipse.dataplane.logic.OnStarted; import org.eclipse.dataplane.logic.OnSuspend; @@ -77,6 +79,7 @@ public class Dataplane { private OnStart onStart = dataFlow -> Result.failure(new UnsupportedOperationException("onStart is not implemented")); private OnTerminate onTerminate = dataFlow -> Result.failure(new UnsupportedOperationException("onTerminate is not implemented")); private OnSuspend onSuspend = dataFlow -> Result.failure(new UnsupportedOperationException("onSuspend is not implemented")); + private OnResume onResume = dataFlow -> Result.failure(new UnsupportedOperationException("onResume is not implemented")); private OnStarted onStarted = dataFlow -> Result.failure(new UnsupportedOperationException("onStarted is not implemented")); private OnCompleted onCompleted = dataFlow -> Result.failure(new UnsupportedOperationException("onCompleted is not implemented")); @@ -193,6 +196,24 @@ public Result suspend(String flowId, DataFlowSuspendMessage message) { .map(it -> null); } + public Result resume(String flowId, DataFlowResumeMessage message) { + return dataFlowStore.findById(flowId) + .map(dataFlow -> { + if (message.dataAddress() != null) { + dataFlow.setDataAddress(message.dataAddress()); + } + return dataFlow; + }) + .compose(onResume::action) + .compose(dataFlow -> { + dataFlow.transitionToStarted(); + + var response = new DataFlowStatusMessage(id, flowId, dataFlow.getState().name(), dataFlow.getDataAddress(), null); + + return save(dataFlow).map(it -> response); + }); + } + public Result terminate(String dataFlowId, DataFlowTerminateMessage message) { return dataFlowStore.findById(dataFlowId) .map(dataFlow -> { @@ -443,6 +464,11 @@ public Builder onSuspend(OnSuspend onSuspend) { return this; } + public Builder onResume(OnResume onResume) { + dataplane.onResume = onResume; + return this; + } + public Builder onTerminate(OnTerminate onTerminate) { dataplane.onTerminate = onTerminate; return this; diff --git a/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowResumeMessage.java b/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowResumeMessage.java new file mode 100644 index 0000000..d1120cb --- /dev/null +++ b/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowResumeMessage.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + +package org.eclipse.dataplane.domain.dataflow; + +import org.eclipse.dataplane.domain.DataAddress; + +public record DataFlowResumeMessage( + String messageId, + String processId, + DataAddress dataAddress +) { +} diff --git a/src/main/java/org/eclipse/dataplane/logic/OnResume.java b/src/main/java/org/eclipse/dataplane/logic/OnResume.java new file mode 100644 index 0000000..e38fc53 --- /dev/null +++ b/src/main/java/org/eclipse/dataplane/logic/OnResume.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + +package org.eclipse.dataplane.logic; + +import org.eclipse.dataplane.domain.Result; +import org.eclipse.dataplane.domain.dataflow.DataFlow; + +public interface OnResume { + + Result action(DataFlow dataFlow); + +} diff --git a/src/main/java/org/eclipse/dataplane/port/DataPlaneSignalingApiController.java b/src/main/java/org/eclipse/dataplane/port/DataPlaneSignalingApiController.java index 96218ca..950f497 100644 --- a/src/main/java/org/eclipse/dataplane/port/DataPlaneSignalingApiController.java +++ b/src/main/java/org/eclipse/dataplane/port/DataPlaneSignalingApiController.java @@ -9,6 +9,7 @@ * * Contributors: * Think-it GmbH - initial API and implementation + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - resume endpoint * */ @@ -28,6 +29,7 @@ import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlow; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowResumeMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage; @@ -86,6 +88,14 @@ public Response suspend(@PathParam("flowId") String flowId, DataFlowSuspendMessa return Response.ok().build(); } + @POST + @Path("/{flowId}/resume") + public Response resume(@PathParam("flowId") String flowId, DataFlowResumeMessage message) { + var response = dataplane.resume(flowId, message).orElseThrow(ExceptionMapper.MAP_TO_WSRS); + + return Response.ok(response).build(); + } + @POST @Path("/{flowId}/terminate") public Response terminate(@PathParam("flowId") String flowId, DataFlowTerminateMessage message) { diff --git a/src/test/java/org/eclipse/dataplane/ControlPlane.java b/src/test/java/org/eclipse/dataplane/ControlPlane.java index 46d3bd9..305dbf9 100644 --- a/src/test/java/org/eclipse/dataplane/ControlPlane.java +++ b/src/test/java/org/eclipse/dataplane/ControlPlane.java @@ -24,6 +24,7 @@ import jakarta.ws.rs.core.Context; import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowResumeMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; @@ -83,6 +84,10 @@ public ValidatableResponse providerSuspend(String flowId, DataFlowSuspendMessage return providerClient.suspend(flowId, suspendMessage); } + public ValidatableResponse providerResume(String flowId, DataFlowResumeMessage resumeMessage) { + return providerClient.resume(flowId, resumeMessage); + } + public ValidatableResponse providerStatus(String flowId) { return providerClient.status(flowId); } diff --git a/src/test/java/org/eclipse/dataplane/DataplaneClient.java b/src/test/java/org/eclipse/dataplane/DataplaneClient.java index a0d0109..316afe2 100644 --- a/src/test/java/org/eclipse/dataplane/DataplaneClient.java +++ b/src/test/java/org/eclipse/dataplane/DataplaneClient.java @@ -19,6 +19,7 @@ import io.restassured.specification.RequestSpecification; import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowResumeMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage; @@ -93,6 +94,14 @@ public ValidatableResponse suspend(String flowId, DataFlowSuspendMessage suspend .log().ifValidationFails(); } + public ValidatableResponse resume(String flowId, DataFlowResumeMessage resumeMessage) { + return baseRequest() + .body(resumeMessage) + .post("/v1/dataflows/{id}/resume", flowId) + .then() + .log().ifValidationFails(); + } + private RequestSpecification baseRequest() { var requestSpecification = given() .contentType(ContentType.JSON) diff --git a/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java b/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java index 5fd267f..838b5c1 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java +++ b/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java @@ -9,6 +9,7 @@ * * Contributors: * Think-it GmbH - initial API and implementation + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - resume endpoint * */ @@ -22,6 +23,7 @@ import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlow; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowResumeMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; @@ -133,7 +135,8 @@ void shouldSuspendAndResumeOnProvider() { consumerDataPlane.assertNoMoreDataIsTransferred(); - var resumeResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); + var resumeMessage = resumeMessage(providerProcessId); + var resumeResponse = controlPlane.providerResume(providerProcessId, resumeMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); controlPlane.consumerStarted(consumerProcessId, new DataFlowStartedNotificationMessage(resumeResponse.dataAddress())).statusCode(200); consumerDataPlane.assertDataIsFlowing(); @@ -151,6 +154,10 @@ private DataFlowStartMessage startMessage(String providerProcessId, String trans transferType, null, emptyList(), emptyMap()); } + private DataFlowResumeMessage resumeMessage(String providerProcessId) { + return new DataFlowResumeMessage("theMessageId", providerProcessId, null); + } + private static class ConsumerDataPlane { private final Path storage; @@ -221,6 +228,7 @@ private static class ProviderDataPlane { .registerAuthorization(new TestAuthorization()) .onStart(this::onStart) .onSuspend(this::stopDataFlow) + .onResume(this::onStart) .onTerminate(this::stopDataFlow) .build(); private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(5); @@ -265,6 +273,5 @@ private Result stopDataFlow(DataFlow dataFlow) { return Result.failure(e); } } - } }