Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions src/main/java/org/eclipse/dataplane/Dataplane.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* Contributors:
* Think-it GmbH - initial API and implementation
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - data flow properties
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - introduce DataFlowStatusMessage
*
*/

Expand All @@ -21,9 +22,9 @@
import org.eclipse.dataplane.domain.controlplane.ControlPlane;
import org.eclipse.dataplane.domain.dataflow.DataFlow;
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowTerminateMessage;
Expand Down Expand Up @@ -114,7 +115,7 @@ private Result<Void> checkControlPlane(String controlplaneId) {
return Result.failure(new ControlPlaneNotRegistered(controlplaneId));
}

public Result<DataFlowResponseMessage> prepare(String controlplaneId, DataFlowPrepareMessage message) {
public Result<DataFlowStatusMessage> prepare(String controlplaneId, DataFlowPrepareMessage message) {
var initialDataFlow = DataFlow.newInstance()
.id(message.processId())
.state(DataFlow.State.INITIATING)
Expand All @@ -137,19 +138,19 @@ public Result<DataFlowResponseMessage> prepare(String controlplaneId, DataFlowPr
dataFlow.transitionToPrepared();
}

DataFlowResponseMessage response;
DataFlowStatusMessage response;
if (dataFlow.isPrepared() && dataFlow.isPush()) {
response = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), initialDataFlow.getState().name(), null);
response = new DataFlowStatusMessage(id, dataFlow.getId(), initialDataFlow.getState().name(), dataFlow.getDataAddress(), null);
} else {
response = new DataFlowResponseMessage(id, null, initialDataFlow.getState().name(), null);
response = new DataFlowStatusMessage(id, dataFlow.getId(), initialDataFlow.getState().name(), null, null);
}

return save(dataFlow).map(it -> response);
});
}


public Result<DataFlowResponseMessage> start(String controlplaneId, DataFlowStartMessage message) {
public Result<DataFlowStatusMessage> start(String controlplaneId, DataFlowStartMessage message) {
var initialDataFlow = DataFlow.newInstance()
.id(message.processId())
.state(DataFlow.State.INITIATING)
Expand All @@ -171,11 +172,11 @@ public Result<DataFlowResponseMessage> start(String controlplaneId, DataFlowStar
dataFlow.transitionToStarted();
}

DataFlowResponseMessage response;
DataFlowStatusMessage response;
if (dataFlow.isStarted() && dataFlow.isPull()) {
response = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null);
response = new DataFlowStatusMessage(id, dataFlow.getId(), dataFlow.getState().name(), dataFlow.getDataAddress(), null);
} else {
response = new DataFlowResponseMessage(id, null, dataFlow.getState().name(), null);
response = new DataFlowStatusMessage(id, dataFlow.getId(), dataFlow.getState().name(), null, null);
}
return save(dataFlow).map(it -> response);
});
Expand Down Expand Up @@ -213,7 +214,7 @@ public Result<Void> notifyPrepared(String dataFlowId, OnPrepare onPrepare) {
.compose(onPrepare::action)
.compose(dataFlow -> {
dataFlow.transitionToPrepared();
var message = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null);
var message = new DataFlowStatusMessage(id, dataFlowId, dataFlow.getState().name(), dataFlow.getDataAddress(), null);

return notifyControlPlane("prepared", dataFlow, message);

Expand All @@ -231,7 +232,7 @@ public Result<Void> notifyStarted(String dataFlowId, OnStart onStart) {
.compose(dataFlow -> {
dataFlow.transitionToStarted();

var message = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null);
var message = new DataFlowStatusMessage(id, dataFlowId, dataFlow.getState().name(), dataFlow.getDataAddress(), null);

return notifyControlPlane("started", dataFlow, message);

Expand Down Expand Up @@ -263,7 +264,9 @@ public Result<Void> notifyErrored(String dataFlowId, Throwable throwable) {
.compose(dataFlow -> {
dataFlow.transitionToTerminated(throwable.getMessage());

return notifyControlPlane("errored", dataFlow, emptyMap()); // TODO DataFlowErroredMessage not defined
var message = new DataFlowStatusMessage(id, dataFlowId, dataFlow.getState().name(), null, throwable.getMessage());

return notifyControlPlane("errored", dataFlow, message);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Map;
import java.util.Objects;

// TODO: could it store the messages?
public class DataFlow {

private String id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,19 @@
*
* Contributors:
* Think-it GmbH - initial API and implementation
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - introduce DataFlowStatusMessage
*
*/

package org.eclipse.dataplane.domain.dataflow;

import org.eclipse.dataplane.domain.DataAddress;

public record DataFlowResponseMessage(
public record DataFlowStatusMessage(
String dataplaneId,
DataAddress dataAddress,
String dataFlowId,
String state,
DataAddress dataAddress,
String error
) {
}
6 changes: 3 additions & 3 deletions src/test/java/org/eclipse/dataplane/ControlPlane.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
import jakarta.ws.rs.core.Context;
import org.eclipse.dataplane.domain.Result;
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowTerminateMessage;

Expand Down Expand Up @@ -122,7 +122,7 @@ public ControlPlaneController(DataplaneClient counterPart, Predicate<ContainerRe
@POST
@Path("/{transferId}/dataflow/prepared")
@Consumes(WILDCARD)
public void prepared(@PathParam("transferId") String transferId, @Context ContainerRequestContext context, DataFlowResponseMessage message) {
public void prepared(@PathParam("transferId") String transferId, @Context ContainerRequestContext context, DataFlowStatusMessage message) {
if (!authorizationValidation.test(context)) {
throw new NotAuthorizedException("Not authorized");
}
Expand All @@ -132,7 +132,7 @@ public void prepared(@PathParam("transferId") String transferId, @Context Contai
@POST
@Path("/{transferId}/dataflow/started")
@Consumes(WILDCARD)
public void started(@PathParam("transferId") String transferId, @Context ContainerRequestContext context, DataFlowResponseMessage message) {
public void started(@PathParam("transferId") String transferId, @Context ContainerRequestContext context, DataFlowStatusMessage message) {
if (!authorizationValidation.test(context)) {
throw new NotAuthorizedException("Not authorized");
}
Expand Down
43 changes: 40 additions & 3 deletions src/test/java/org/eclipse/dataplane/DataplaneTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*
* Contributors:
* Think-it GmbH - initial API and implementation
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - introduce DataFlowStatusMessage
*
*/

Expand All @@ -30,6 +31,7 @@
import java.net.URI;

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.absent;
import static com.github.tomakehurst.wiremock.client.WireMock.and;
import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl;
import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
Expand All @@ -43,6 +45,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.COMPLETED;
import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.TERMINATED;

class DataplaneTest {

Expand Down Expand Up @@ -111,12 +114,42 @@ void shouldTransitionToCompleted_whenControlPlaneRespondCorrectly() {
assertThat(result.succeeded()).isTrue();
assertThat(dataplane.status("dataFlowId").getContent().state()).isEqualTo(COMPLETED.name());
}
}

@Nested
class NotifyErrored {
@Test
void shouldFail_whenDataFlowDoesNotExist() {
var dataplane = Dataplane.newInstance().build();

private DataFlowPrepareMessage createPrepareMessage() {
return new DataFlowPrepareMessage("any", "any", "any", "any", "dataFlowId", "any", "any",
URI.create(controlPlane.baseUrl()), "Something-PUSH", emptyList(), emptyMap());
var result = dataplane.notifyErrored("dataFlowId", new RuntimeException("some-error"));

assertThat(result.failed()).isTrue();
assertThatThrownBy(result::orElseThrow).isExactlyInstanceOf(ResourceNotFoundException.class);
}

@Test
void shouldSendDataFlowStatusMessage_whenDataFlowIsErrored() {
controlPlane.stubFor(post(anyUrl()).willReturn(aResponse().withStatus(200)));
var dataplane = Dataplane.newInstance().id("dataplane-id").onPrepare(Result::success).build();
dataplane.registerControlPlane(new ControlPlaneRegistrationMessage("controlplaneId", URI.create("http://localhost/any")));
dataplane.prepare("controlplaneId", createPrepareMessage());

var result = dataplane.notifyErrored("dataFlowId", new RuntimeException("some-error"));

assertThat(result.succeeded()).isTrue();
assertThat(dataplane.status("dataFlowId").getContent().state()).isEqualTo(TERMINATED.name());

controlPlane.verify(postRequestedFor(urlPathEqualTo("/transfers/dataFlowId/dataflow/errored"))
.withRequestBody(and(
matchingJsonPath("dataplaneId", equalTo("dataplane-id")),
matchingJsonPath("dataFlowId", equalTo("dataFlowId")),
matchingJsonPath("state", equalTo("TERMINATED")),
matchingJsonPath("dataAddress", absent()),
matchingJsonPath("error", equalTo("some-error"))
))
);
}
}

@Nested
Expand Down Expand Up @@ -164,4 +197,8 @@ void shouldFail_whenStatusIsNot200() {
}
}

private DataFlowPrepareMessage createPrepareMessage() {
return new DataFlowPrepareMessage("any", "any", "any", "any", "dataFlowId", "any", "any",
URI.create(controlPlane.baseUrl()), "Something-PUSH", emptyList(), emptyMap());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.eclipse.dataplane.HttpServer;
import org.eclipse.dataplane.domain.Result;
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
import org.eclipse.dataplane.domain.registration.AuthorizationProfile;
import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage;
import org.eclipse.dataplane.domain.registration.Oauth2ClientCredentialsAuthorization;
Expand Down Expand Up @@ -112,7 +112,7 @@ void shouldCommunicateWithControlPlaneUsingOauth2Authorization() {
var consumerProcessId = "consumer_" + processId;
var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType);

controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowResponseMessage.class);
controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowStatusMessage.class);

var notifyPreparedResult = dataPlane.getById(consumerProcessId)
.compose(dataFlow -> dataPlane.notifyPrepared(consumerProcessId, Result::success));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.eclipse.dataplane.authorization.TestAuthorization;
import org.eclipse.dataplane.domain.Result;
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage;
import org.eclipse.dataplane.port.exception.DataFlowNotifyControlPlaneFailed;
import org.jspecify.annotations.NonNull;
Expand Down Expand Up @@ -81,7 +81,7 @@ void shouldCommunicateWithControlPlaneUsingRegisteredAuthorization() {
var consumerProcessId = "consumer_" + UUID.randomUUID();
var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), "FileSystemAsync-PUSH");

controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowResponseMessage.class);
controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowStatusMessage.class);

var notifyPreparedResult = dataPlane.getById(consumerProcessId)
.compose(dataFlow -> dataPlane.notifyPrepared(consumerProcessId, Result::success));
Expand Down Expand Up @@ -120,7 +120,7 @@ void shouldGetUnauthorized_withDataPlaneIsNotAuthenticated() {
var consumerProcessId = "consumer_" + UUID.randomUUID();
var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), "FileSystemAsync-PUSH");

controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowResponseMessage.class);
controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowStatusMessage.class);

var notifyPreparedResult = dataPlane.getById(consumerProcessId)
.compose(dataFlow -> dataPlane.notifyPrepared(consumerProcessId, Result::success));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import org.eclipse.dataplane.domain.Result;
import org.eclipse.dataplane.domain.dataflow.DataFlow;
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage;
import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage;
import org.jspecify.annotations.NonNull;
Expand Down Expand Up @@ -83,13 +83,13 @@ void shouldPullDataFromProvider() {
var consumerProcessId = "consumer_" + processId;
var prepareMessage = createPrepareMessage(consumerProcessId, transferType);

var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
assertThat(prepareResponse.state()).isEqualTo(PREPARED.name());
assertThat(prepareResponse.dataAddress()).isNull();

var providerProcessId = "provider_" + processId;
var startMessage = createStartMessage(providerProcessId, transferType);
var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);
assertThat(startResponse.state()).isEqualTo(STARTED.name());
assertThat(startResponse.dataAddress()).isNotNull();

Expand All @@ -106,11 +106,11 @@ void shouldPermitAsyncStartup() {
var processId = UUID.randomUUID().toString();
var consumerProcessId = "consumer_" + processId;
var prepareMessage = createPrepareMessage(consumerProcessId, transferType);
controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class);
controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class);

var providerProcessId = "provider_" + processId;
var startMessage = createStartMessage(providerProcessId, transferType);
var startResponse = controlPlane.providerStart(startMessage).statusCode(202).extract().as(DataFlowResponseMessage.class);
var startResponse = controlPlane.providerStart(startMessage).statusCode(202).extract().as(DataFlowStatusMessage.class);
assertThat(startResponse.state()).isEqualTo(STARTING.name());
assertThat(startResponse.dataAddress()).isNull();

Expand Down
Loading