diff --git a/src/main/java/org/eclipse/dataplane/Dataplane.java b/src/main/java/org/eclipse/dataplane/Dataplane.java index 959bfdc..549a9d5 100644 --- a/src/main/java/org/eclipse/dataplane/Dataplane.java +++ b/src/main/java/org/eclipse/dataplane/Dataplane.java @@ -10,6 +10,7 @@ * Contributors: * Think-it GmbH - initial API and implementation * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - data flow properties + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - introduce DataFlowStatusMessage * */ @@ -21,9 +22,9 @@ import org.eclipse.dataplane.domain.controlplane.ControlPlane; import org.eclipse.dataplane.domain.dataflow.DataFlow; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; -import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowTerminateMessage; @@ -114,7 +115,7 @@ private Result checkControlPlane(String controlplaneId) { return Result.failure(new ControlPlaneNotRegistered(controlplaneId)); } - public Result prepare(String controlplaneId, DataFlowPrepareMessage message) { + public Result prepare(String controlplaneId, DataFlowPrepareMessage message) { var initialDataFlow = DataFlow.newInstance() .id(message.processId()) .state(DataFlow.State.INITIATING) @@ -137,11 +138,11 @@ public Result prepare(String controlplaneId, DataFlowPr dataFlow.transitionToPrepared(); } - DataFlowResponseMessage response; + DataFlowStatusMessage response; if (dataFlow.isPrepared() && dataFlow.isPush()) { - response = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), initialDataFlow.getState().name(), null); + response = new DataFlowStatusMessage(id, dataFlow.getId(), initialDataFlow.getState().name(), dataFlow.getDataAddress(), null); } else { - response = new DataFlowResponseMessage(id, null, initialDataFlow.getState().name(), null); + response = new DataFlowStatusMessage(id, dataFlow.getId(), initialDataFlow.getState().name(), null, null); } return save(dataFlow).map(it -> response); @@ -149,7 +150,7 @@ public Result prepare(String controlplaneId, DataFlowPr } - public Result start(String controlplaneId, DataFlowStartMessage message) { + public Result start(String controlplaneId, DataFlowStartMessage message) { var initialDataFlow = DataFlow.newInstance() .id(message.processId()) .state(DataFlow.State.INITIATING) @@ -171,11 +172,11 @@ public Result start(String controlplaneId, DataFlowStar dataFlow.transitionToStarted(); } - DataFlowResponseMessage response; + DataFlowStatusMessage response; if (dataFlow.isStarted() && dataFlow.isPull()) { - response = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null); + response = new DataFlowStatusMessage(id, dataFlow.getId(), dataFlow.getState().name(), dataFlow.getDataAddress(), null); } else { - response = new DataFlowResponseMessage(id, null, dataFlow.getState().name(), null); + response = new DataFlowStatusMessage(id, dataFlow.getId(), dataFlow.getState().name(), null, null); } return save(dataFlow).map(it -> response); }); @@ -213,7 +214,7 @@ public Result notifyPrepared(String dataFlowId, OnPrepare onPrepare) { .compose(onPrepare::action) .compose(dataFlow -> { dataFlow.transitionToPrepared(); - var message = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null); + var message = new DataFlowStatusMessage(id, dataFlowId, dataFlow.getState().name(), dataFlow.getDataAddress(), null); return notifyControlPlane("prepared", dataFlow, message); @@ -231,7 +232,7 @@ public Result notifyStarted(String dataFlowId, OnStart onStart) { .compose(dataFlow -> { dataFlow.transitionToStarted(); - var message = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null); + var message = new DataFlowStatusMessage(id, dataFlowId, dataFlow.getState().name(), dataFlow.getDataAddress(), null); return notifyControlPlane("started", dataFlow, message); @@ -263,7 +264,9 @@ public Result notifyErrored(String dataFlowId, Throwable throwable) { .compose(dataFlow -> { dataFlow.transitionToTerminated(throwable.getMessage()); - return notifyControlPlane("errored", dataFlow, emptyMap()); // TODO DataFlowErroredMessage not defined + var message = new DataFlowStatusMessage(id, dataFlowId, dataFlow.getState().name(), null, throwable.getMessage()); + + return notifyControlPlane("errored", dataFlow, message); }); } diff --git a/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java b/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java index 15059fc..efec334 100644 --- a/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java +++ b/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java @@ -22,7 +22,6 @@ import java.util.Map; import java.util.Objects; -// TODO: could it store the messages? public class DataFlow { private String id; diff --git a/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowResponseMessage.java b/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowStatusMessage.java similarity index 76% rename from src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowResponseMessage.java rename to src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowStatusMessage.java index f9ced83..e9cc66c 100644 --- a/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowResponseMessage.java +++ b/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowStatusMessage.java @@ -9,6 +9,7 @@ * * Contributors: * Think-it GmbH - initial API and implementation + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - introduce DataFlowStatusMessage * */ @@ -16,10 +17,11 @@ import org.eclipse.dataplane.domain.DataAddress; -public record DataFlowResponseMessage( +public record DataFlowStatusMessage( String dataplaneId, - DataAddress dataAddress, + String dataFlowId, String state, + DataAddress dataAddress, String error ) { } diff --git a/src/test/java/org/eclipse/dataplane/ControlPlane.java b/src/test/java/org/eclipse/dataplane/ControlPlane.java index 90d24c4..46d3bd9 100644 --- a/src/test/java/org/eclipse/dataplane/ControlPlane.java +++ b/src/test/java/org/eclipse/dataplane/ControlPlane.java @@ -24,9 +24,9 @@ import jakarta.ws.rs.core.Context; import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; -import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowTerminateMessage; @@ -122,7 +122,7 @@ public ControlPlaneController(DataplaneClient counterPart, Predicate dataPlane.notifyPrepared(consumerProcessId, Result::success)); diff --git a/src/test/java/org/eclipse/dataplane/scenario/AuthorizationTest.java b/src/test/java/org/eclipse/dataplane/scenario/AuthorizationTest.java index 42d0388..470962f 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/AuthorizationTest.java +++ b/src/test/java/org/eclipse/dataplane/scenario/AuthorizationTest.java @@ -20,7 +20,7 @@ import org.eclipse.dataplane.authorization.TestAuthorization; import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; -import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage; import org.eclipse.dataplane.port.exception.DataFlowNotifyControlPlaneFailed; import org.jspecify.annotations.NonNull; @@ -81,7 +81,7 @@ void shouldCommunicateWithControlPlaneUsingRegisteredAuthorization() { var consumerProcessId = "consumer_" + UUID.randomUUID(); var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), "FileSystemAsync-PUSH"); - controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowResponseMessage.class); + controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowStatusMessage.class); var notifyPreparedResult = dataPlane.getById(consumerProcessId) .compose(dataFlow -> dataPlane.notifyPrepared(consumerProcessId, Result::success)); @@ -120,7 +120,7 @@ void shouldGetUnauthorized_withDataPlaneIsNotAuthenticated() { var consumerProcessId = "consumer_" + UUID.randomUUID(); var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), "FileSystemAsync-PUSH"); - controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowResponseMessage.class); + controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowStatusMessage.class); var notifyPreparedResult = dataPlane.getById(consumerProcessId) .compose(dataFlow -> dataPlane.notifyPrepared(consumerProcessId, Result::success)); diff --git a/src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java b/src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java index 5855d0b..87902fc 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java +++ b/src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java @@ -22,9 +22,9 @@ import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlow; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; -import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage; import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage; import org.jspecify.annotations.NonNull; @@ -83,13 +83,13 @@ void shouldPullDataFromProvider() { var consumerProcessId = "consumer_" + processId; var prepareMessage = createPrepareMessage(consumerProcessId, transferType); - var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); assertThat(prepareResponse.state()).isEqualTo(PREPARED.name()); assertThat(prepareResponse.dataAddress()).isNull(); var providerProcessId = "provider_" + processId; var startMessage = createStartMessage(providerProcessId, transferType); - var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); assertThat(startResponse.state()).isEqualTo(STARTED.name()); assertThat(startResponse.dataAddress()).isNotNull(); @@ -106,11 +106,11 @@ void shouldPermitAsyncStartup() { var processId = UUID.randomUUID().toString(); var consumerProcessId = "consumer_" + processId; var prepareMessage = createPrepareMessage(consumerProcessId, transferType); - controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); var providerProcessId = "provider_" + processId; var startMessage = createStartMessage(providerProcessId, transferType); - var startResponse = controlPlane.providerStart(startMessage).statusCode(202).extract().as(DataFlowResponseMessage.class); + var startResponse = controlPlane.providerStart(startMessage).statusCode(202).extract().as(DataFlowStatusMessage.class); assertThat(startResponse.state()).isEqualTo(STARTING.name()); assertThat(startResponse.dataAddress()).isNull(); diff --git a/src/test/java/org/eclipse/dataplane/scenario/ProviderPushTest.java b/src/test/java/org/eclipse/dataplane/scenario/ProviderPushTest.java index 825f2df..c1bc46d 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/ProviderPushTest.java +++ b/src/test/java/org/eclipse/dataplane/scenario/ProviderPushTest.java @@ -22,8 +22,8 @@ import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlow; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; -import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage; import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage; import org.jspecify.annotations.NonNull; @@ -85,14 +85,14 @@ void shouldPushDataToEndpointPreparedByConsumer() { var consumerProcessId = "consumer_" + processId; var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType); - var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); assertThat(prepareResponse.state()).isEqualTo(PREPARED.name()); assertThat(prepareResponse.dataAddress()).isNotNull(); var destinationDataAddress = prepareResponse.dataAddress(); var providerProcessId = "provider_" + processId; var startMessage = createStartMessage(providerProcessId, controlPlane.providerCallbackAddress(), transferType, destinationDataAddress); - var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); assertThat(startResponse.state()).isEqualTo(STARTED.name()); assertThat(startResponse.dataAddress()).isNull(); @@ -116,12 +116,12 @@ void shouldSendError_whenFlowFails() { var consumerProcessId = "consumer_" + processId; var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType); - controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); var invalidDataAddress = new DataAddress("FileSystem", "", emptyList()); var providerProcessId = "provider_" + processId; var startMessage = createStartMessage(providerProcessId, controlPlane.providerCallbackAddress(), transferType, invalidDataAddress); - controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); await().untilAsserted(() -> { var providerStatus = controlPlane.providerStatus(providerProcessId).statusCode(200).extract().as(DataFlowStatusResponseMessage.class); @@ -139,7 +139,7 @@ void shouldPermitAsyncPreparation() { var consumerProcessId = "consumer_" + processId; var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType); - var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowResponseMessage.class); + var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowStatusMessage.class); assertThat(prepareResponse.state()).isEqualTo(PREPARING.name()); assertThat(prepareResponse.dataAddress()).isNull(); diff --git a/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java b/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java index 4333ead..5fd267f 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java +++ b/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java @@ -22,9 +22,9 @@ import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlow; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; -import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowTerminateMessage; import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage; @@ -87,13 +87,13 @@ void shouldPullDataFromProvider_thenProviderTerminatesIt() { var consumerProcessId = "consumer_" + processId; var prepareMessage = prepareMessage(consumerProcessId, transferType); - var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); assertThat(prepareResponse.state()).isEqualTo(PREPARED.name()); assertThat(prepareResponse.dataAddress()).isNull(); var providerProcessId = "provider_" + processId; var startMessage = startMessage(providerProcessId, transferType); - var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); assertThat(startResponse.state()).isEqualTo(STARTED.name()); assertThat(startResponse.dataAddress()).isNotNull(); @@ -115,13 +115,13 @@ void shouldSuspendAndResumeOnProvider() { var consumerProcessId = "consumer_" + processId; var prepareMessage = prepareMessage(consumerProcessId, transferType); - var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); assertThat(prepareResponse.state()).isEqualTo(PREPARED.name()); assertThat(prepareResponse.dataAddress()).isNull(); var providerProcessId = "provider_" + processId; var startMessage = startMessage(providerProcessId, transferType); - var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); assertThat(startResponse.state()).isEqualTo(STARTED.name()); assertThat(startResponse.dataAddress()).isNotNull(); @@ -133,7 +133,7 @@ void shouldSuspendAndResumeOnProvider() { consumerDataPlane.assertNoMoreDataIsTransferred(); - var resumeResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + var resumeResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); controlPlane.consumerStarted(consumerProcessId, new DataFlowStartedNotificationMessage(resumeResponse.dataAddress())).statusCode(200); consumerDataPlane.assertDataIsFlowing(); diff --git a/src/test/java/org/eclipse/dataplane/scenario/StreamingPushTest.java b/src/test/java/org/eclipse/dataplane/scenario/StreamingPushTest.java index 7b463dd..30f6181 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/StreamingPushTest.java +++ b/src/test/java/org/eclipse/dataplane/scenario/StreamingPushTest.java @@ -22,8 +22,8 @@ import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlow; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; -import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -85,7 +85,7 @@ void shouldPushDataToEndpointPreparedByConsumer() { "theDataspaceContext", consumerProcessId, "theAgreementId", "theDatasetId", URI.create("http://callback"), transferType, emptyList(), emptyMap()); - var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); assertThat(prepareResponse.state()).isEqualTo(PREPARED.name()); assertThat(prepareResponse.dataAddress()).isNotNull(); var destinationDataAddress = prepareResponse.dataAddress(); @@ -94,7 +94,7 @@ void shouldPushDataToEndpointPreparedByConsumer() { var startMessage = new DataFlowStartMessage("theMessageId", "theParticipantId", "theCounterPartyId", "theDataspaceContext", providerProcessId, "theAgreementId", "theDatasetId", controlPlane.providerCallbackAddress(), transferType, destinationDataAddress, emptyList(), emptyMap()); - var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); assertThat(startResponse.state()).isEqualTo(STARTED.name()); assertThat(startResponse.dataAddress()).isNull();