Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/main/java/org/eclipse/dataplane/Dataplane.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.eclipse.dataplane.domain.controlplane.ControlPlane;
import org.eclipse.dataplane.domain.dataflow.DataFlow;
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowResumeMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
Expand All @@ -33,6 +34,7 @@
import org.eclipse.dataplane.domain.registration.DataPlaneRegistrationMessage;
import org.eclipse.dataplane.logic.OnCompleted;
import org.eclipse.dataplane.logic.OnPrepare;
import org.eclipse.dataplane.logic.OnResume;
import org.eclipse.dataplane.logic.OnStart;
import org.eclipse.dataplane.logic.OnStarted;
import org.eclipse.dataplane.logic.OnSuspend;
Expand Down Expand Up @@ -77,6 +79,7 @@ public class Dataplane {
private OnStart onStart = dataFlow -> Result.failure(new UnsupportedOperationException("onStart is not implemented"));
private OnTerminate onTerminate = dataFlow -> Result.failure(new UnsupportedOperationException("onTerminate is not implemented"));
private OnSuspend onSuspend = dataFlow -> Result.failure(new UnsupportedOperationException("onSuspend is not implemented"));
private OnResume onResume = dataFlow -> Result.failure(new UnsupportedOperationException("onResume is not implemented"));
private OnStarted onStarted = dataFlow -> Result.failure(new UnsupportedOperationException("onStarted is not implemented"));
private OnCompleted onCompleted = dataFlow -> Result.failure(new UnsupportedOperationException("onCompleted is not implemented"));

Expand Down Expand Up @@ -193,6 +196,24 @@ public Result<Void> suspend(String flowId, DataFlowSuspendMessage message) {
.map(it -> null);
}

public Result<DataFlowStatusMessage> resume(String flowId, DataFlowResumeMessage message) {
return dataFlowStore.findById(flowId)
.map(dataFlow -> {
if (message.dataAddress() != null) {
dataFlow.setDataAddress(message.dataAddress());
}
return dataFlow;
})
.compose(onResume::action)
.compose(dataFlow -> {
dataFlow.transitionToStarted();

var response = new DataFlowStatusMessage(id, flowId, dataFlow.getState().name(), dataFlow.getDataAddress(), null);

return save(dataFlow).map(it -> response);
});
}

public Result<Void> terminate(String dataFlowId, DataFlowTerminateMessage message) {
return dataFlowStore.findById(dataFlowId)
.map(dataFlow -> {
Expand Down Expand Up @@ -443,6 +464,11 @@ public Builder onSuspend(OnSuspend onSuspend) {
return this;
}

public Builder onResume(OnResume onResume) {
dataplane.onResume = onResume;
return this;
}

public Builder onTerminate(OnTerminate onTerminate) {
dataplane.onTerminate = onTerminate;
return this;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation
*
*/

package org.eclipse.dataplane.domain.dataflow;

import org.eclipse.dataplane.domain.DataAddress;

public record DataFlowResumeMessage(
String messageId,
String processId,
DataAddress dataAddress
) {
}
24 changes: 24 additions & 0 deletions src/main/java/org/eclipse/dataplane/logic/OnResume.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation
*
*/

package org.eclipse.dataplane.logic;

import org.eclipse.dataplane.domain.Result;
import org.eclipse.dataplane.domain.dataflow.DataFlow;

public interface OnResume {

Result<DataFlow> action(DataFlow dataFlow);

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*
* Contributors:
* Think-it GmbH - initial API and implementation
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - resume endpoint
*
*/

Expand All @@ -28,6 +29,7 @@
import org.eclipse.dataplane.domain.Result;
import org.eclipse.dataplane.domain.dataflow.DataFlow;
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowResumeMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage;
Expand Down Expand Up @@ -86,6 +88,14 @@ public Response suspend(@PathParam("flowId") String flowId, DataFlowSuspendMessa
return Response.ok().build();
}

@POST
@Path("/{flowId}/resume")
public Response resume(@PathParam("flowId") String flowId, DataFlowResumeMessage message) {
var response = dataplane.resume(flowId, message).orElseThrow(ExceptionMapper.MAP_TO_WSRS);

return Response.ok(response).build();
}

@POST
@Path("/{flowId}/terminate")
public Response terminate(@PathParam("flowId") String flowId, DataFlowTerminateMessage message) {
Expand Down
5 changes: 5 additions & 0 deletions src/test/java/org/eclipse/dataplane/ControlPlane.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import jakarta.ws.rs.core.Context;
import org.eclipse.dataplane.domain.Result;
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowResumeMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
Expand Down Expand Up @@ -83,6 +84,10 @@ public ValidatableResponse providerSuspend(String flowId, DataFlowSuspendMessage
return providerClient.suspend(flowId, suspendMessage);
}

public ValidatableResponse providerResume(String flowId, DataFlowResumeMessage resumeMessage) {
return providerClient.resume(flowId, resumeMessage);
}

public ValidatableResponse providerStatus(String flowId) {
return providerClient.status(flowId);
}
Expand Down
9 changes: 9 additions & 0 deletions src/test/java/org/eclipse/dataplane/DataplaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.restassured.specification.RequestSpecification;
import org.eclipse.dataplane.domain.Result;
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowResumeMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage;
Expand Down Expand Up @@ -93,6 +94,14 @@ public ValidatableResponse suspend(String flowId, DataFlowSuspendMessage suspend
.log().ifValidationFails();
}

public ValidatableResponse resume(String flowId, DataFlowResumeMessage resumeMessage) {
return baseRequest()
.body(resumeMessage)
.post("/v1/dataflows/{id}/resume", flowId)
.then()
.log().ifValidationFails();
}

private RequestSpecification baseRequest() {
var requestSpecification = given()
.contentType(ContentType.JSON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*
* Contributors:
* Think-it GmbH - initial API and implementation
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - resume endpoint
*
*/

Expand All @@ -22,6 +23,7 @@
import org.eclipse.dataplane.domain.Result;
import org.eclipse.dataplane.domain.dataflow.DataFlow;
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowResumeMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
Expand Down Expand Up @@ -133,7 +135,8 @@ void shouldSuspendAndResumeOnProvider() {

consumerDataPlane.assertNoMoreDataIsTransferred();

var resumeResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
var resumeMessage = resumeMessage(providerProcessId);
var resumeResponse = controlPlane.providerResume(providerProcessId, resumeMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
controlPlane.consumerStarted(consumerProcessId, new DataFlowStartedNotificationMessage(resumeResponse.dataAddress())).statusCode(200);

consumerDataPlane.assertDataIsFlowing();
Expand All @@ -151,6 +154,10 @@ private DataFlowStartMessage startMessage(String providerProcessId, String trans
transferType, null, emptyList(), emptyMap());
}

private DataFlowResumeMessage resumeMessage(String providerProcessId) {
return new DataFlowResumeMessage("theMessageId", providerProcessId, null);
}

private static class ConsumerDataPlane {

private final Path storage;
Expand Down Expand Up @@ -221,6 +228,7 @@ private static class ProviderDataPlane {
.registerAuthorization(new TestAuthorization())
.onStart(this::onStart)
.onSuspend(this::stopDataFlow)
.onResume(this::onStart)
.onTerminate(this::stopDataFlow)
.build();
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
Expand Down Expand Up @@ -265,6 +273,5 @@ private Result<DataFlow> stopDataFlow(DataFlow dataFlow) {
return Result.failure(e);
}
}

}
}