From af029e777de85a9ac19c8505fa24471a4b1efbfa Mon Sep 17 00:00:00 2001 From: Ronja Quensel Date: Tue, 24 Mar 2026 16:48:49 +0100 Subject: [PATCH 1/5] chore: remove outdated TODO --- .../java/org/eclipse/dataplane/domain/dataflow/DataFlow.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java b/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java index 15059fc..efec334 100644 --- a/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java +++ b/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java @@ -22,7 +22,6 @@ import java.util.Map; import java.util.Objects; -// TODO: could it store the messages? public class DataFlow { private String id; From c351fda4f2251d1f867c59569f088b6d15dc0214 Mon Sep 17 00:00:00 2001 From: Ronja Quensel Date: Wed, 25 Mar 2026 16:21:59 +0100 Subject: [PATCH 2/5] feat: introduce DataFlowStatusMessage --- .../java/org/eclipse/dataplane/Dataplane.java | 27 ++++++++++--------- ...essage.java => DataFlowStatusMessage.java} | 6 +++-- .../org/eclipse/dataplane/ControlPlane.java | 6 ++--- .../scenario/AuthorizationOauth2Test.java | 4 +-- .../dataplane/scenario/AuthorizationTest.java | 4 +-- .../dataplane/scenario/ConsumerPullTest.java | 10 +++---- .../dataplane/scenario/ProviderPushTest.java | 12 ++++----- .../dataplane/scenario/StreamingPullTest.java | 12 ++++----- .../dataplane/scenario/StreamingPushTest.java | 6 ++--- 9 files changed, 46 insertions(+), 41 deletions(-) rename src/main/java/org/eclipse/dataplane/domain/dataflow/{DataFlowResponseMessage.java => DataFlowStatusMessage.java} (76%) diff --git a/src/main/java/org/eclipse/dataplane/Dataplane.java b/src/main/java/org/eclipse/dataplane/Dataplane.java index 959bfdc..2b3068c 100644 --- a/src/main/java/org/eclipse/dataplane/Dataplane.java +++ b/src/main/java/org/eclipse/dataplane/Dataplane.java @@ -10,6 +10,7 @@ * Contributors: * Think-it GmbH - initial API and implementation * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - data flow properties + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - introduce DataFlowStatusMessage * */ @@ -21,7 +22,7 @@ import org.eclipse.dataplane.domain.controlplane.ControlPlane; import org.eclipse.dataplane.domain.dataflow.DataFlow; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; -import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage; @@ -114,7 +115,7 @@ private Result checkControlPlane(String controlplaneId) { return Result.failure(new ControlPlaneNotRegistered(controlplaneId)); } - public Result prepare(String controlplaneId, DataFlowPrepareMessage message) { + public Result prepare(String controlplaneId, DataFlowPrepareMessage message) { var initialDataFlow = DataFlow.newInstance() .id(message.processId()) .state(DataFlow.State.INITIATING) @@ -137,11 +138,11 @@ public Result prepare(String controlplaneId, DataFlowPr dataFlow.transitionToPrepared(); } - DataFlowResponseMessage response; + DataFlowStatusMessage response; if (dataFlow.isPrepared() && dataFlow.isPush()) { - response = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), initialDataFlow.getState().name(), null); + response = new DataFlowStatusMessage(id, dataFlow.getId(), initialDataFlow.getState().name(), dataFlow.getDataAddress(), null); } else { - response = new DataFlowResponseMessage(id, null, initialDataFlow.getState().name(), null); + response = new DataFlowStatusMessage(id, dataFlow.getId(), initialDataFlow.getState().name(), null, null); } return save(dataFlow).map(it -> response); @@ -149,7 +150,7 @@ public Result prepare(String controlplaneId, DataFlowPr } - public Result start(String controlplaneId, DataFlowStartMessage message) { + public Result start(String controlplaneId, DataFlowStartMessage message) { var initialDataFlow = DataFlow.newInstance() .id(message.processId()) .state(DataFlow.State.INITIATING) @@ -171,11 +172,11 @@ public Result start(String controlplaneId, DataFlowStar dataFlow.transitionToStarted(); } - DataFlowResponseMessage response; + DataFlowStatusMessage response; if (dataFlow.isStarted() && dataFlow.isPull()) { - response = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null); + response = new DataFlowStatusMessage(id, dataFlow.getId(), dataFlow.getState().name(), dataFlow.getDataAddress(), null); } else { - response = new DataFlowResponseMessage(id, null, dataFlow.getState().name(), null); + response = new DataFlowStatusMessage(id, dataFlow.getId(), dataFlow.getState().name(), null, null); } return save(dataFlow).map(it -> response); }); @@ -213,7 +214,7 @@ public Result notifyPrepared(String dataFlowId, OnPrepare onPrepare) { .compose(onPrepare::action) .compose(dataFlow -> { dataFlow.transitionToPrepared(); - var message = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null); + var message = new DataFlowStatusMessage(id, dataFlowId, dataFlow.getState().name(), dataFlow.getDataAddress(),null); return notifyControlPlane("prepared", dataFlow, message); @@ -231,7 +232,7 @@ public Result notifyStarted(String dataFlowId, OnStart onStart) { .compose(dataFlow -> { dataFlow.transitionToStarted(); - var message = new DataFlowResponseMessage(id, dataFlow.getDataAddress(), dataFlow.getState().name(), null); + var message = new DataFlowStatusMessage(id, dataFlowId, dataFlow.getState().name(), dataFlow.getDataAddress(), null); return notifyControlPlane("started", dataFlow, message); @@ -263,7 +264,9 @@ public Result notifyErrored(String dataFlowId, Throwable throwable) { .compose(dataFlow -> { dataFlow.transitionToTerminated(throwable.getMessage()); - return notifyControlPlane("errored", dataFlow, emptyMap()); // TODO DataFlowErroredMessage not defined + var message = new DataFlowStatusMessage(id, dataFlowId, dataFlow.getState().name(), null, throwable.getMessage()); + + return notifyControlPlane("errored", dataFlow, message); }); } diff --git a/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowResponseMessage.java b/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowStatusMessage.java similarity index 76% rename from src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowResponseMessage.java rename to src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowStatusMessage.java index f9ced83..e9cc66c 100644 --- a/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowResponseMessage.java +++ b/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowStatusMessage.java @@ -9,6 +9,7 @@ * * Contributors: * Think-it GmbH - initial API and implementation + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - introduce DataFlowStatusMessage * */ @@ -16,10 +17,11 @@ import org.eclipse.dataplane.domain.DataAddress; -public record DataFlowResponseMessage( +public record DataFlowStatusMessage( String dataplaneId, - DataAddress dataAddress, + String dataFlowId, String state, + DataAddress dataAddress, String error ) { } diff --git a/src/test/java/org/eclipse/dataplane/ControlPlane.java b/src/test/java/org/eclipse/dataplane/ControlPlane.java index 90d24c4..aa3a7cd 100644 --- a/src/test/java/org/eclipse/dataplane/ControlPlane.java +++ b/src/test/java/org/eclipse/dataplane/ControlPlane.java @@ -24,7 +24,7 @@ import jakarta.ws.rs.core.Context; import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; -import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage; @@ -122,7 +122,7 @@ public ControlPlaneController(DataplaneClient counterPart, Predicate dataPlane.notifyPrepared(consumerProcessId, Result::success)); diff --git a/src/test/java/org/eclipse/dataplane/scenario/AuthorizationTest.java b/src/test/java/org/eclipse/dataplane/scenario/AuthorizationTest.java index 42d0388..086dcbb 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/AuthorizationTest.java +++ b/src/test/java/org/eclipse/dataplane/scenario/AuthorizationTest.java @@ -20,7 +20,7 @@ import org.eclipse.dataplane.authorization.TestAuthorization; import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; -import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage; import org.eclipse.dataplane.port.exception.DataFlowNotifyControlPlaneFailed; import org.jspecify.annotations.NonNull; @@ -81,7 +81,7 @@ void shouldCommunicateWithControlPlaneUsingRegisteredAuthorization() { var consumerProcessId = "consumer_" + UUID.randomUUID(); var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), "FileSystemAsync-PUSH"); - controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowResponseMessage.class); + controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowStatusMessage.class); var notifyPreparedResult = dataPlane.getById(consumerProcessId) .compose(dataFlow -> dataPlane.notifyPrepared(consumerProcessId, Result::success)); diff --git a/src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java b/src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java index 5855d0b..24d6989 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java +++ b/src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java @@ -22,7 +22,7 @@ import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlow; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; -import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage; @@ -83,13 +83,13 @@ void shouldPullDataFromProvider() { var consumerProcessId = "consumer_" + processId; var prepareMessage = createPrepareMessage(consumerProcessId, transferType); - var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); assertThat(prepareResponse.state()).isEqualTo(PREPARED.name()); assertThat(prepareResponse.dataAddress()).isNull(); var providerProcessId = "provider_" + processId; var startMessage = createStartMessage(providerProcessId, transferType); - var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); assertThat(startResponse.state()).isEqualTo(STARTED.name()); assertThat(startResponse.dataAddress()).isNotNull(); @@ -106,11 +106,11 @@ void shouldPermitAsyncStartup() { var processId = UUID.randomUUID().toString(); var consumerProcessId = "consumer_" + processId; var prepareMessage = createPrepareMessage(consumerProcessId, transferType); - controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); var providerProcessId = "provider_" + processId; var startMessage = createStartMessage(providerProcessId, transferType); - var startResponse = controlPlane.providerStart(startMessage).statusCode(202).extract().as(DataFlowResponseMessage.class); + var startResponse = controlPlane.providerStart(startMessage).statusCode(202).extract().as(DataFlowStatusMessage.class); assertThat(startResponse.state()).isEqualTo(STARTING.name()); assertThat(startResponse.dataAddress()).isNull(); diff --git a/src/test/java/org/eclipse/dataplane/scenario/ProviderPushTest.java b/src/test/java/org/eclipse/dataplane/scenario/ProviderPushTest.java index 825f2df..354f825 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/ProviderPushTest.java +++ b/src/test/java/org/eclipse/dataplane/scenario/ProviderPushTest.java @@ -22,7 +22,7 @@ import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlow; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; -import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage; import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage; @@ -85,14 +85,14 @@ void shouldPushDataToEndpointPreparedByConsumer() { var consumerProcessId = "consumer_" + processId; var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType); - var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); assertThat(prepareResponse.state()).isEqualTo(PREPARED.name()); assertThat(prepareResponse.dataAddress()).isNotNull(); var destinationDataAddress = prepareResponse.dataAddress(); var providerProcessId = "provider_" + processId; var startMessage = createStartMessage(providerProcessId, controlPlane.providerCallbackAddress(), transferType, destinationDataAddress); - var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); assertThat(startResponse.state()).isEqualTo(STARTED.name()); assertThat(startResponse.dataAddress()).isNull(); @@ -116,12 +116,12 @@ void shouldSendError_whenFlowFails() { var consumerProcessId = "consumer_" + processId; var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType); - controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); var invalidDataAddress = new DataAddress("FileSystem", "", emptyList()); var providerProcessId = "provider_" + processId; var startMessage = createStartMessage(providerProcessId, controlPlane.providerCallbackAddress(), transferType, invalidDataAddress); - controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); await().untilAsserted(() -> { var providerStatus = controlPlane.providerStatus(providerProcessId).statusCode(200).extract().as(DataFlowStatusResponseMessage.class); @@ -139,7 +139,7 @@ void shouldPermitAsyncPreparation() { var consumerProcessId = "consumer_" + processId; var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType); - var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowResponseMessage.class); + var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowStatusMessage.class); assertThat(prepareResponse.state()).isEqualTo(PREPARING.name()); assertThat(prepareResponse.dataAddress()).isNull(); diff --git a/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java b/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java index 4333ead..81dc356 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java +++ b/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java @@ -22,7 +22,7 @@ import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlow; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; -import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage; @@ -87,13 +87,13 @@ void shouldPullDataFromProvider_thenProviderTerminatesIt() { var consumerProcessId = "consumer_" + processId; var prepareMessage = prepareMessage(consumerProcessId, transferType); - var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); assertThat(prepareResponse.state()).isEqualTo(PREPARED.name()); assertThat(prepareResponse.dataAddress()).isNull(); var providerProcessId = "provider_" + processId; var startMessage = startMessage(providerProcessId, transferType); - var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); assertThat(startResponse.state()).isEqualTo(STARTED.name()); assertThat(startResponse.dataAddress()).isNotNull(); @@ -115,13 +115,13 @@ void shouldSuspendAndResumeOnProvider() { var consumerProcessId = "consumer_" + processId; var prepareMessage = prepareMessage(consumerProcessId, transferType); - var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); assertThat(prepareResponse.state()).isEqualTo(PREPARED.name()); assertThat(prepareResponse.dataAddress()).isNull(); var providerProcessId = "provider_" + processId; var startMessage = startMessage(providerProcessId, transferType); - var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); assertThat(startResponse.state()).isEqualTo(STARTED.name()); assertThat(startResponse.dataAddress()).isNotNull(); @@ -133,7 +133,7 @@ void shouldSuspendAndResumeOnProvider() { consumerDataPlane.assertNoMoreDataIsTransferred(); - var resumeResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + var resumeResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); controlPlane.consumerStarted(consumerProcessId, new DataFlowStartedNotificationMessage(resumeResponse.dataAddress())).statusCode(200); consumerDataPlane.assertDataIsFlowing(); diff --git a/src/test/java/org/eclipse/dataplane/scenario/StreamingPushTest.java b/src/test/java/org/eclipse/dataplane/scenario/StreamingPushTest.java index 7b463dd..7f8a8f3 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/StreamingPushTest.java +++ b/src/test/java/org/eclipse/dataplane/scenario/StreamingPushTest.java @@ -22,7 +22,7 @@ import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlow; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; -import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage; import org.junit.jupiter.api.AfterEach; @@ -85,7 +85,7 @@ void shouldPushDataToEndpointPreparedByConsumer() { "theDataspaceContext", consumerProcessId, "theAgreementId", "theDatasetId", URI.create("http://callback"), transferType, emptyList(), emptyMap()); - var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + var prepareResponse = controlPlane.consumerPrepare(prepareMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); assertThat(prepareResponse.state()).isEqualTo(PREPARED.name()); assertThat(prepareResponse.dataAddress()).isNotNull(); var destinationDataAddress = prepareResponse.dataAddress(); @@ -94,7 +94,7 @@ void shouldPushDataToEndpointPreparedByConsumer() { var startMessage = new DataFlowStartMessage("theMessageId", "theParticipantId", "theCounterPartyId", "theDataspaceContext", providerProcessId, "theAgreementId", "theDatasetId", controlPlane.providerCallbackAddress(), transferType, destinationDataAddress, emptyList(), emptyMap()); - var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowResponseMessage.class); + var startResponse = controlPlane.providerStart(startMessage).statusCode(200).extract().as(DataFlowStatusMessage.class); assertThat(startResponse.state()).isEqualTo(STARTED.name()); assertThat(startResponse.dataAddress()).isNull(); From 09a2e13ae2262f3c5eefb4fd82715178f8ed7343 Mon Sep 17 00:00:00 2001 From: Ronja Quensel Date: Wed, 25 Mar 2026 16:38:42 +0100 Subject: [PATCH 3/5] test: notifyErrored --- .../org/eclipse/dataplane/DataplaneTest.java | 42 +++++++++++++++++-- 1 file changed, 39 insertions(+), 3 deletions(-) diff --git a/src/test/java/org/eclipse/dataplane/DataplaneTest.java b/src/test/java/org/eclipse/dataplane/DataplaneTest.java index fa2714b..a1e6f50 100644 --- a/src/test/java/org/eclipse/dataplane/DataplaneTest.java +++ b/src/test/java/org/eclipse/dataplane/DataplaneTest.java @@ -9,6 +9,7 @@ * * Contributors: * Think-it GmbH - initial API and implementation + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - introduce DataFlowStatusMessage * */ @@ -30,6 +31,7 @@ import java.net.URI; import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.absent; import static com.github.tomakehurst.wiremock.client.WireMock.and; import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl; import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; @@ -43,6 +45,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.COMPLETED; +import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.TERMINATED; class DataplaneTest { @@ -111,12 +114,41 @@ void shouldTransitionToCompleted_whenControlPlaneRespondCorrectly() { assertThat(result.succeeded()).isTrue(); assertThat(dataplane.status("dataFlowId").getContent().state()).isEqualTo(COMPLETED.name()); } + } + + @Nested + class NotifyErrored { + @Test + void shouldFail_whenDataFlowDoesNotExist() { + var dataplane = Dataplane.newInstance().build(); - private DataFlowPrepareMessage createPrepareMessage() { - return new DataFlowPrepareMessage("any", "any", "any", "any", "dataFlowId", "any", "any", - URI.create(controlPlane.baseUrl()), "Something-PUSH", emptyList(), emptyMap()); + var result = dataplane.notifyErrored("dataFlowId", new RuntimeException("some-error")); + + assertThat(result.failed()).isTrue(); + assertThatThrownBy(result::orElseThrow).isExactlyInstanceOf(ResourceNotFoundException.class); } + @Test + void shouldSendDataFlowStatusMessage_whenDataFlowIsErrored() { + controlPlane.stubFor(post(anyUrl()).willReturn(aResponse().withStatus(200))); + var dataplane = Dataplane.newInstance().id("dataplane-id").onPrepare(Result::success).build(); + dataplane.prepare(createPrepareMessage()); + + var result = dataplane.notifyErrored("dataFlowId", new RuntimeException("some-error")); + + assertThat(result.succeeded()).isTrue(); + assertThat(dataplane.status("dataFlowId").getContent().state()).isEqualTo(TERMINATED.name()); + + controlPlane.verify(postRequestedFor(urlPathEqualTo("/transfers/dataFlowId/dataflow/errored")) + .withRequestBody(and( + matchingJsonPath("dataplaneId", equalTo("dataplane-id")), + matchingJsonPath("dataFlowId", equalTo("dataFlowId")), + matchingJsonPath("state", equalTo("TERMINATED")), + matchingJsonPath("dataAddress", absent()), + matchingJsonPath("error", equalTo("some-error")) + )) + ); + } } @Nested @@ -164,4 +196,8 @@ void shouldFail_whenStatusIsNot200() { } } + private DataFlowPrepareMessage createPrepareMessage() { + return new DataFlowPrepareMessage("any", "any", "any", "any", "dataFlowId", "any", "any", + URI.create(controlPlane.baseUrl()), "Something-PUSH", emptyList(), emptyMap()); + } } From cc49cb37c8a7baf552a6d11db2af6a8cf5c9871b Mon Sep 17 00:00:00 2001 From: Ronja Quensel Date: Wed, 25 Mar 2026 16:45:19 +0100 Subject: [PATCH 4/5] chore: checkstyle --- src/main/java/org/eclipse/dataplane/Dataplane.java | 4 ++-- src/test/java/org/eclipse/dataplane/ControlPlane.java | 2 +- .../java/org/eclipse/dataplane/scenario/ConsumerPullTest.java | 2 +- .../java/org/eclipse/dataplane/scenario/ProviderPushTest.java | 2 +- .../org/eclipse/dataplane/scenario/StreamingPullTest.java | 2 +- .../org/eclipse/dataplane/scenario/StreamingPushTest.java | 2 +- 6 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/eclipse/dataplane/Dataplane.java b/src/main/java/org/eclipse/dataplane/Dataplane.java index 2b3068c..549a9d5 100644 --- a/src/main/java/org/eclipse/dataplane/Dataplane.java +++ b/src/main/java/org/eclipse/dataplane/Dataplane.java @@ -22,9 +22,9 @@ import org.eclipse.dataplane.domain.controlplane.ControlPlane; import org.eclipse.dataplane.domain.dataflow.DataFlow; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; -import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowTerminateMessage; @@ -214,7 +214,7 @@ public Result notifyPrepared(String dataFlowId, OnPrepare onPrepare) { .compose(onPrepare::action) .compose(dataFlow -> { dataFlow.transitionToPrepared(); - var message = new DataFlowStatusMessage(id, dataFlowId, dataFlow.getState().name(), dataFlow.getDataAddress(),null); + var message = new DataFlowStatusMessage(id, dataFlowId, dataFlow.getState().name(), dataFlow.getDataAddress(), null); return notifyControlPlane("prepared", dataFlow, message); diff --git a/src/test/java/org/eclipse/dataplane/ControlPlane.java b/src/test/java/org/eclipse/dataplane/ControlPlane.java index aa3a7cd..46d3bd9 100644 --- a/src/test/java/org/eclipse/dataplane/ControlPlane.java +++ b/src/test/java/org/eclipse/dataplane/ControlPlane.java @@ -24,9 +24,9 @@ import jakarta.ws.rs.core.Context; import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; -import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowTerminateMessage; diff --git a/src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java b/src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java index 24d6989..87902fc 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java +++ b/src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java @@ -22,9 +22,9 @@ import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlow; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; -import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage; import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage; import org.jspecify.annotations.NonNull; diff --git a/src/test/java/org/eclipse/dataplane/scenario/ProviderPushTest.java b/src/test/java/org/eclipse/dataplane/scenario/ProviderPushTest.java index 354f825..c1bc46d 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/ProviderPushTest.java +++ b/src/test/java/org/eclipse/dataplane/scenario/ProviderPushTest.java @@ -22,8 +22,8 @@ import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlow; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; -import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage; import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage; import org.jspecify.annotations.NonNull; diff --git a/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java b/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java index 81dc356..5fd267f 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java +++ b/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java @@ -22,9 +22,9 @@ import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlow; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; -import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowTerminateMessage; import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage; diff --git a/src/test/java/org/eclipse/dataplane/scenario/StreamingPushTest.java b/src/test/java/org/eclipse/dataplane/scenario/StreamingPushTest.java index 7f8a8f3..30f6181 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/StreamingPushTest.java +++ b/src/test/java/org/eclipse/dataplane/scenario/StreamingPushTest.java @@ -22,8 +22,8 @@ import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlow; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; -import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowStatusMessage; import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; From 347040e2cd45d37eadb38233a669a331f1c7afb2 Mon Sep 17 00:00:00 2001 From: Ronja Quensel Date: Wed, 25 Mar 2026 17:17:15 +0100 Subject: [PATCH 5/5] fix: compile after rebase --- src/test/java/org/eclipse/dataplane/DataplaneTest.java | 3 ++- .../java/org/eclipse/dataplane/scenario/AuthorizationTest.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/test/java/org/eclipse/dataplane/DataplaneTest.java b/src/test/java/org/eclipse/dataplane/DataplaneTest.java index a1e6f50..cd72ca1 100644 --- a/src/test/java/org/eclipse/dataplane/DataplaneTest.java +++ b/src/test/java/org/eclipse/dataplane/DataplaneTest.java @@ -132,7 +132,8 @@ void shouldFail_whenDataFlowDoesNotExist() { void shouldSendDataFlowStatusMessage_whenDataFlowIsErrored() { controlPlane.stubFor(post(anyUrl()).willReturn(aResponse().withStatus(200))); var dataplane = Dataplane.newInstance().id("dataplane-id").onPrepare(Result::success).build(); - dataplane.prepare(createPrepareMessage()); + dataplane.registerControlPlane(new ControlPlaneRegistrationMessage("controlplaneId", URI.create("http://localhost/any"))); + dataplane.prepare("controlplaneId", createPrepareMessage()); var result = dataplane.notifyErrored("dataFlowId", new RuntimeException("some-error")); diff --git a/src/test/java/org/eclipse/dataplane/scenario/AuthorizationTest.java b/src/test/java/org/eclipse/dataplane/scenario/AuthorizationTest.java index 086dcbb..470962f 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/AuthorizationTest.java +++ b/src/test/java/org/eclipse/dataplane/scenario/AuthorizationTest.java @@ -120,7 +120,7 @@ void shouldGetUnauthorized_withDataPlaneIsNotAuthenticated() { var consumerProcessId = "consumer_" + UUID.randomUUID(); var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), "FileSystemAsync-PUSH"); - controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowResponseMessage.class); + controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowStatusMessage.class); var notifyPreparedResult = dataPlane.getById(consumerProcessId) .compose(dataFlow -> dataPlane.notifyPrepared(consumerProcessId, Result::success));