From 3ed98d16c19da1f19f0e51de2654dd2a2330dbef Mon Sep 17 00:00:00 2001 From: ndr_brt Date: Wed, 25 Mar 2026 15:48:27 +0100 Subject: [PATCH] feat: provide full mutual authentication --- build.gradle.kts | 2 +- .../java/org/eclipse/dataplane/Dataplane.java | 42 ++++++--- .../org/eclipse/dataplane/domain/Result.java | 28 ++++++ .../dataplane/domain/dataflow/DataFlow.java | 10 +++ .../domain/registration/Authorization.java | 5 +- .../Oauth2ClientCredentialsAuthorization.java | 24 +++++- .../port/DataPlaneSignalingApiController.java | 36 ++++++-- .../dataplane/port/ExceptionMapper.java | 14 ++- .../exception/ControlPlaneNotRegistered.java | 23 +++++ .../port/store/ControlPlaneStore.java | 4 +- .../port/store/InMemoryControlPlaneStore.java | 9 +- .../org/eclipse/dataplane/ControlPlane.java | 34 ++++++-- .../eclipse/dataplane/DataplaneClient.java | 44 ++++++---- .../org/eclipse/dataplane/DataplaneTest.java | 10 ++- .../api/ControlPlaneRegistrationApiTest.java | 16 +--- .../authorization/TestAuthorization.java | 49 +++++++++++ .../scenario/AuthorizationOauth2Test.java | 45 +++++----- .../dataplane/scenario/AuthorizationTest.java | 86 ++++++++++++------- .../dataplane/scenario/ConsumerPullTest.java | 27 ++++-- .../dataplane/scenario/ProviderPushTest.java | 22 ++++- .../dataplane/scenario/StreamingPullTest.java | 26 ++++-- .../dataplane/scenario/StreamingPushTest.java | 19 +++- 22 files changed, 424 insertions(+), 151 deletions(-) create mode 100644 src/main/java/org/eclipse/dataplane/port/exception/ControlPlaneNotRegistered.java create mode 100644 src/test/java/org/eclipse/dataplane/authorization/TestAuthorization.java diff --git a/build.gradle.kts b/build.gradle.kts index 0e14ff6..6c5e227 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -8,7 +8,7 @@ plugins { } group = "org.eclipse.dataplane-core" -version = "0.0.7-SNAPSHOT" +version = "0.0.8-SNAPSHOT" repositories { mavenCentral() diff --git a/src/main/java/org/eclipse/dataplane/Dataplane.java b/src/main/java/org/eclipse/dataplane/Dataplane.java index c8959cc..959bfdc 100644 --- a/src/main/java/org/eclipse/dataplane/Dataplane.java +++ b/src/main/java/org/eclipse/dataplane/Dataplane.java @@ -39,8 +39,10 @@ import org.eclipse.dataplane.port.DataPlaneRegistrationApiController; import org.eclipse.dataplane.port.DataPlaneSignalingApiController; import org.eclipse.dataplane.port.exception.AuthorizationNotSupported; +import org.eclipse.dataplane.port.exception.ControlPlaneNotRegistered; import org.eclipse.dataplane.port.exception.DataFlowNotifyControlPlaneFailed; import org.eclipse.dataplane.port.exception.DataplaneNotRegistered; +import org.eclipse.dataplane.port.exception.ResourceNotFoundException; import org.eclipse.dataplane.port.store.ControlPlaneStore; import org.eclipse.dataplane.port.store.DataFlowStore; import org.eclipse.dataplane.port.store.InMemoryControlPlaneStore; @@ -57,6 +59,7 @@ import java.util.UUID; import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; +import static jakarta.ws.rs.core.HttpHeaders.AUTHORIZATION; import static java.util.Collections.emptyMap; public class Dataplane { @@ -84,7 +87,7 @@ public static Builder newInstance() { } public DataPlaneSignalingApiController controller() { - return new DataPlaneSignalingApiController(this); + return new DataPlaneSignalingApiController(this, authorizations); } public DataPlaneRegistrationApiController registrationController() { @@ -104,7 +107,14 @@ public Result status(String dataFlowId) { .map(f -> new DataFlowStatusResponseMessage(f.getId(), f.getState().name())); } - public Result prepare(DataFlowPrepareMessage message) { + private Result checkControlPlane(String controlplaneId) { + if (controlPlaneStore.exists(controlplaneId)) { + return Result.success(); + } + return Result.failure(new ControlPlaneNotRegistered(controlplaneId)); + } + + public Result prepare(String controlplaneId, DataFlowPrepareMessage message) { var initialDataFlow = DataFlow.newInstance() .id(message.processId()) .state(DataFlow.State.INITIATING) @@ -117,9 +127,11 @@ public Result prepare(DataFlowPrepareMessage message) { .participantId(message.participantId()) .counterPartyId(message.counterPartyId()) .dataspaceContext(message.dataspaceContext()) + .controlplaneId(controlplaneId) .build(); - return onPrepare.action(initialDataFlow) + return checkControlPlane(controlplaneId) + .compose(v -> onPrepare.action(initialDataFlow)) .compose(dataFlow -> { if (dataFlow.isInitiating()) { dataFlow.transitionToPrepared(); @@ -137,7 +149,7 @@ public Result prepare(DataFlowPrepareMessage message) { } - public Result start(DataFlowStartMessage message) { + public Result start(String controlplaneId, DataFlowStartMessage message) { var initialDataFlow = DataFlow.newInstance() .id(message.processId()) .state(DataFlow.State.INITIATING) @@ -149,9 +161,11 @@ public Result start(DataFlowStartMessage message) { .participantId(message.participantId()) .counterPartyId(message.counterPartyId()) .dataspaceContext(message.dataspaceContext()) + .controlplaneId(controlplaneId) .build(); - return onStart.action(initialDataFlow) + return checkControlPlane(controlplaneId) + .compose(v -> onStart.action(initialDataFlow)) .compose(dataFlow -> { if (dataFlow.isInitiating()) { dataFlow.transitionToStarted(); @@ -310,14 +324,16 @@ private Result notifyControlPlane(String action, DataFlow dataFlow, Object .header("content-type", "application/json") .POST(HttpRequest.BodyPublishers.ofString(body)); - var controlPlane = controlPlaneStore.findByEndpoint(dataFlow.getCallbackAddress()); - if (controlPlane.succeeded()) { - var authorizationProfile = controlPlane.getContent().authorization(); - if (authorizationProfile != null) { - var authorization = authorizations.get(authorizationProfile.getType()); - authorization.apply(requestBuilder, authorizationProfile); - } - } + controlPlaneStore.findById(dataFlow.getControlplaneId()) + .compose(controlPlane -> { + var authorizationProfile = controlPlane.authorization(); + if (authorizationProfile != null) { + var authorization = authorizations.get(authorizationProfile.getType()); + return authorization.authorizationHeader(authorizationProfile); + } + return Result.failure(new ResourceNotFoundException("ControlPlane has no authorization")); + }) + .onSuccess(authorizationHeader -> requestBuilder.header(AUTHORIZATION, authorizationHeader)); return httpClient.send(requestBuilder.build(), HttpResponse.BodyHandlers.discarding()); }) diff --git a/src/main/java/org/eclipse/dataplane/domain/Result.java b/src/main/java/org/eclipse/dataplane/domain/Result.java index 5b218ef..62b2979 100644 --- a/src/main/java/org/eclipse/dataplane/domain/Result.java +++ b/src/main/java/org/eclipse/dataplane/domain/Result.java @@ -15,6 +15,7 @@ package org.eclipse.dataplane.domain; import java.util.NoSuchElementException; +import java.util.function.Consumer; import java.util.function.Function; public abstract class Result { @@ -52,6 +53,10 @@ public static Result attempt(ExceptionThrowingSupplier resultSupplier) public abstract Result compose(ExceptionThrowingFunction> transformValue); + public abstract Result onSuccess(Consumer onSuccessDo); + + public abstract Result onFailure(Consumer onFailureDo); + public boolean succeeded() { return this instanceof Result.Success; } @@ -101,6 +106,17 @@ public Result compose(ExceptionThrowingFunction> transformVa return Result.failure(e); } } + + @Override + public Result onSuccess(Consumer onSuccessDo) { + onSuccessDo.accept(content); + return this; + } + + @Override + public Result onFailure(Consumer onFailureDo) { + return this; + } } private static class Failure extends Result { @@ -140,6 +156,17 @@ public Result map(ExceptionThrowingFunction transformValue) { public Result compose(ExceptionThrowingFunction> transformValue) { return Result.failure(this.exception); } + + @Override + public Result onSuccess(Consumer onSuccessDo) { + return this; + } + + @Override + public Result onFailure(Consumer onFailureDo) { + onFailureDo.accept(exception); + return this; + } } @FunctionalInterface @@ -151,4 +178,5 @@ public interface ExceptionThrowingFunction { public interface ExceptionThrowingSupplier { T get() throws Exception; } + } diff --git a/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java b/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java index 9ed3a2f..15059fc 100644 --- a/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java +++ b/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java @@ -39,6 +39,7 @@ public class DataFlow { private List labels; private Map metadata; private DataAddress dataAddress; + private String controlplaneId; public static DataFlow.Builder newInstance() { return new Builder(); @@ -158,6 +159,10 @@ public URI callbackEndpointFor(String action) { return URI.create(getCallbackAddress() + "/transfers/" + getId() + "/dataflow/" + action); } + public String getControlplaneId() { + return controlplaneId; + } + public static class Builder { private final DataFlow dataFlow = new DataFlow(); @@ -234,6 +239,11 @@ public Builder metadata(Map metadata) { dataFlow.metadata = metadata; return this; } + + public Builder controlplaneId(String controlplaneId) { + dataFlow.controlplaneId = controlplaneId; + return this; + } } public enum State { diff --git a/src/main/java/org/eclipse/dataplane/domain/registration/Authorization.java b/src/main/java/org/eclipse/dataplane/domain/registration/Authorization.java index 34f9cba..6ec1ed8 100644 --- a/src/main/java/org/eclipse/dataplane/domain/registration/Authorization.java +++ b/src/main/java/org/eclipse/dataplane/domain/registration/Authorization.java @@ -14,7 +14,7 @@ package org.eclipse.dataplane.domain.registration; -import java.net.http.HttpRequest; +import org.eclipse.dataplane.domain.Result; /** * Defines structure for an authorization profile. @@ -30,6 +30,7 @@ public interface Authorization { * Function that applies the authorization profile to the request builder. * e.g. the Authorization header could be added with proper content. */ - HttpRequest.Builder apply(HttpRequest.Builder requestBuilder, AuthorizationProfile profile); + Result authorizationHeader(AuthorizationProfile profile); + Result extractCallerId(String authorizationHeader); } diff --git a/src/main/java/org/eclipse/dataplane/domain/registration/Oauth2ClientCredentialsAuthorization.java b/src/main/java/org/eclipse/dataplane/domain/registration/Oauth2ClientCredentialsAuthorization.java index d5250d7..cade64b 100644 --- a/src/main/java/org/eclipse/dataplane/domain/registration/Oauth2ClientCredentialsAuthorization.java +++ b/src/main/java/org/eclipse/dataplane/domain/registration/Oauth2ClientCredentialsAuthorization.java @@ -15,6 +15,8 @@ package org.eclipse.dataplane.domain.registration; import com.fasterxml.jackson.databind.ObjectMapper; +import com.nimbusds.jwt.SignedJWT; +import org.eclipse.dataplane.domain.Result; import java.net.URI; import java.net.URLEncoder; @@ -25,7 +27,6 @@ import java.util.Map; import java.util.stream.Collectors; -import static jakarta.ws.rs.core.HttpHeaders.AUTHORIZATION; import static jakarta.ws.rs.core.MediaType.APPLICATION_FORM_URLENCODED; public class Oauth2ClientCredentialsAuthorization implements Authorization { @@ -39,7 +40,7 @@ public String type() { } @Override - public HttpRequest.Builder apply(HttpRequest.Builder requestBuilder, AuthorizationProfile profile) { + public Result authorizationHeader(AuthorizationProfile profile) { var tokenEndpoint = profile.stringAttribute("tokenEndpoint"); var parameters = Map.of( @@ -63,10 +64,25 @@ public HttpRequest.Builder apply(HttpRequest.Builder requestBuilder, Authorizati var response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); var body = response.body(); var accessToken = objectMapper.readValue(body, Map.class).get("access_token").toString(); - return requestBuilder.header(AUTHORIZATION, "Bearer " + accessToken); + return Result.success("Bearer " + accessToken); } catch (Exception e) { - throw new RuntimeException(e); + return Result.failure(e); } } + + @Override + public Result extractCallerId(String authorizationHeader) { + try { + var token = authorizationHeader.substring("Bearer ".length()); + var jwt = SignedJWT.parse(token); + var sub = jwt.getJWTClaimsSet().getClaims().get("sub"); + if (sub instanceof String callerId) { + return Result.success(callerId); + } + return Result.failure(new RuntimeException("JWT sub claim %s is not a string".formatted(sub))); + } catch (Exception e) { + return Result.failure(e); + } + } } diff --git a/src/main/java/org/eclipse/dataplane/port/DataPlaneSignalingApiController.java b/src/main/java/org/eclipse/dataplane/port/DataPlaneSignalingApiController.java index b30d98d..96218ca 100644 --- a/src/main/java/org/eclipse/dataplane/port/DataPlaneSignalingApiController.java +++ b/src/main/java/org/eclipse/dataplane/port/DataPlaneSignalingApiController.java @@ -16,12 +16,16 @@ import jakarta.ws.rs.Consumes; import jakarta.ws.rs.GET; +import jakarta.ws.rs.NotAuthorizedException; import jakarta.ws.rs.POST; import jakarta.ws.rs.Path; import jakarta.ws.rs.PathParam; import jakarta.ws.rs.Produces; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.Context; import jakarta.ws.rs.core.Response; import org.eclipse.dataplane.Dataplane; +import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlow; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; @@ -29,6 +33,9 @@ import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowTerminateMessage; +import org.eclipse.dataplane.domain.registration.Authorization; + +import java.util.Map; import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON; import static jakarta.ws.rs.core.MediaType.WILDCARD; @@ -39,15 +46,20 @@ public class DataPlaneSignalingApiController { private final Dataplane dataplane; + private final Map authorizations; - public DataPlaneSignalingApiController(Dataplane dataplane) { + public DataPlaneSignalingApiController(Dataplane dataplane, Map authorizations) { this.dataplane = dataplane; + this.authorizations = authorizations; } @POST @Path("/prepare") - public Response prepare(DataFlowPrepareMessage message) { - var response = dataplane.prepare(message).orElseThrow(ExceptionMapper.MAP_TO_WSRS); + public Response prepare(DataFlowPrepareMessage message, @Context ContainerRequestContext requestContext) { + var response = extractControlplaneId(requestContext) + .compose(controlplaneId -> dataplane.prepare(controlplaneId, message)) + .orElseThrow(ExceptionMapper.MAP_TO_WSRS); + if (response.state().equals(DataFlow.State.PREPARING.name())) { return Response.accepted(response).build(); } @@ -56,8 +68,11 @@ public Response prepare(DataFlowPrepareMessage message) { @POST @Path("/start") - public Response start(DataFlowStartMessage message) { - var response = dataplane.start(message).orElseThrow(ExceptionMapper.MAP_TO_WSRS); + public Response start(DataFlowStartMessage message, @Context ContainerRequestContext requestContext) { + var response = extractControlplaneId(requestContext) + .compose(controlplaneId -> dataplane.start(controlplaneId, message)) + .orElseThrow(ExceptionMapper.MAP_TO_WSRS); + if (response.state().equals(DataFlow.State.STARTING.name())) { return Response.accepted(response).build(); } @@ -99,4 +114,15 @@ public DataFlowStatusResponseMessage status(@PathParam("flowId") String flowId) return dataplane.status(flowId).orElseThrow(ExceptionMapper.MAP_TO_WSRS); } + private Result extractControlplaneId(ContainerRequestContext requestContext) { + var authorizationHeader = requestContext.getHeaderString("Authorization"); + if (authorizationHeader == null) { + return Result.failure(new NotAuthorizedException("Authorization header missing")); + } + return authorizations.values().stream() + .map(authorization -> authorization.extractCallerId(authorizationHeader)) + .filter(Result::succeeded).findFirst() + .orElseGet(() -> Result.failure(new NotAuthorizedException("Authorization method not recognized"))); + } + } diff --git a/src/main/java/org/eclipse/dataplane/port/ExceptionMapper.java b/src/main/java/org/eclipse/dataplane/port/ExceptionMapper.java index 54e8592..0e73427 100644 --- a/src/main/java/org/eclipse/dataplane/port/ExceptionMapper.java +++ b/src/main/java/org/eclipse/dataplane/port/ExceptionMapper.java @@ -15,9 +15,11 @@ package org.eclipse.dataplane.port; import jakarta.ws.rs.BadRequestException; +import jakarta.ws.rs.NotAuthorizedException; import jakarta.ws.rs.NotFoundException; import jakarta.ws.rs.WebApplicationException; import org.eclipse.dataplane.port.exception.AuthorizationNotSupported; +import org.eclipse.dataplane.port.exception.ControlPlaneNotRegistered; import org.eclipse.dataplane.port.exception.ResourceNotFoundException; import java.util.function.Function; @@ -25,12 +27,20 @@ public interface ExceptionMapper { Function MAP_TO_WSRS = exception -> { + if (exception instanceof WebApplicationException webApplicationException) { + return webApplicationException; + } + if (exception instanceof ResourceNotFoundException notFound) { return new NotFoundException(notFound); } - if (exception instanceof AuthorizationNotSupported authorizationNotSupported) { - return new BadRequestException(authorizationNotSupported); + if (exception instanceof ControlPlaneNotRegistered controlPlaneNotRegistered) { + return new NotAuthorizedException(controlPlaneNotRegistered); + } + + if (exception instanceof AuthorizationNotSupported) { + return new BadRequestException(exception); } return new WebApplicationException("unexpected internal server error"); diff --git a/src/main/java/org/eclipse/dataplane/port/exception/ControlPlaneNotRegistered.java b/src/main/java/org/eclipse/dataplane/port/exception/ControlPlaneNotRegistered.java new file mode 100644 index 0000000..116a0b2 --- /dev/null +++ b/src/main/java/org/eclipse/dataplane/port/exception/ControlPlaneNotRegistered.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2026 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.dataplane.port.exception; + +public class ControlPlaneNotRegistered extends Exception { + + public ControlPlaneNotRegistered(String controlplaneId) { + super("Controlplane " + controlplaneId + " not registered"); + } + +} diff --git a/src/main/java/org/eclipse/dataplane/port/store/ControlPlaneStore.java b/src/main/java/org/eclipse/dataplane/port/store/ControlPlaneStore.java index f67dec6..747e672 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/ControlPlaneStore.java +++ b/src/main/java/org/eclipse/dataplane/port/store/ControlPlaneStore.java @@ -17,8 +17,6 @@ import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.controlplane.ControlPlane; -import java.net.URI; - public interface ControlPlaneStore { Result save(ControlPlane controlPlane); @@ -26,5 +24,5 @@ public interface ControlPlaneStore { Result delete(String id); - Result findByEndpoint(URI endpoint); + boolean exists(String controlplaneId); } diff --git a/src/main/java/org/eclipse/dataplane/port/store/InMemoryControlPlaneStore.java b/src/main/java/org/eclipse/dataplane/port/store/InMemoryControlPlaneStore.java index d3b8186..7e66b49 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/InMemoryControlPlaneStore.java +++ b/src/main/java/org/eclipse/dataplane/port/store/InMemoryControlPlaneStore.java @@ -20,10 +20,8 @@ import org.eclipse.dataplane.domain.controlplane.ControlPlane; import org.eclipse.dataplane.port.exception.ResourceNotFoundException; -import java.net.URI; import java.util.HashMap; import java.util.Map; -import java.util.Objects; public class InMemoryControlPlaneStore implements ControlPlaneStore { @@ -64,11 +62,8 @@ public Result delete(String id) { } @Override - public Result findByEndpoint(URI endpoint) { - return store.values().stream().map(this::deserialize).filter(Result::succeeded) - .map(Result::getContent).filter(it -> Objects.equals(endpoint, it.getEndpoint())) - .findAny().map(Result::success) - .orElseGet(() -> Result.failure(new ResourceNotFoundException("ControlPlane with endpoint %s not found".formatted(endpoint)))); + public boolean exists(String controlplaneId) { + return store.containsKey(controlplaneId); } private Result deserialize(String json) { diff --git a/src/test/java/org/eclipse/dataplane/ControlPlane.java b/src/test/java/org/eclipse/dataplane/ControlPlane.java index 039ac20..90d24c4 100644 --- a/src/test/java/org/eclipse/dataplane/ControlPlane.java +++ b/src/test/java/org/eclipse/dataplane/ControlPlane.java @@ -22,6 +22,7 @@ import jakarta.ws.rs.PathParam; import jakarta.ws.rs.container.ContainerRequestContext; import jakarta.ws.rs.core.Context; +import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; @@ -33,6 +34,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Predicate; +import java.util.function.Supplier; import static jakarta.ws.rs.core.MediaType.WILDCARD; import static org.assertj.core.api.Assertions.assertThat; @@ -48,11 +50,16 @@ public class ControlPlane { private DataplaneClient providerClient; private HttpServer httpServer; private Predicate authorizationValidation = c -> true; + private Supplier> authorizationTokenGenerator; + + public static Builder newInstance() { + return new Builder(); + } public void initialize(HttpServer httpServer, String consumerDataPlanePath, String providerDataPlanePath) { this.httpServer = httpServer; - consumerClient = new DataplaneClient("http://localhost:%d%s".formatted(httpServer.port(), consumerDataPlanePath)); - providerClient = new DataplaneClient("http://localhost:%d%s".formatted(httpServer.port(), providerDataPlanePath)); + consumerClient = new DataplaneClient("http://localhost:%d%s".formatted(httpServer.port(), consumerDataPlanePath), authorizationTokenGenerator); + providerClient = new DataplaneClient("http://localhost:%d%s".formatted(httpServer.port(), providerDataPlanePath), authorizationTokenGenerator); Predicate authorizationProvider = context -> authorizationValidation.test(context); @@ -96,11 +103,6 @@ public URI consumerCallbackAddress() { return URI.create("http://localhost:%d/consumer/control-plane".formatted(httpServer.port())); } - @Deprecated(forRemoval = true) - public void setAuthorizationToken(String token) { - - } - public void setAuthorizationValidation(Predicate authorizationValidation) { this.authorizationValidation = authorizationValidation; } @@ -165,4 +167,22 @@ public void errored(@PathParam("transferId") String transferId, @Context Contain } + public static class Builder { + + private final ControlPlane instance = new ControlPlane(); + + private Builder() { + + } + + public ControlPlane build() { + return instance; + } + + public Builder authorizationTokenGenerator(Supplier> authorizationTokenGenerator) { + instance.authorizationTokenGenerator = authorizationTokenGenerator; + return this; + } + } + } diff --git a/src/test/java/org/eclipse/dataplane/DataplaneClient.java b/src/test/java/org/eclipse/dataplane/DataplaneClient.java index 07a9600..a0d0109 100644 --- a/src/test/java/org/eclipse/dataplane/DataplaneClient.java +++ b/src/test/java/org/eclipse/dataplane/DataplaneClient.java @@ -16,26 +16,30 @@ import io.restassured.http.ContentType; import io.restassured.response.ValidatableResponse; +import io.restassured.specification.RequestSpecification; +import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowTerminateMessage; +import java.util.function.Supplier; + import static io.restassured.RestAssured.given; public class DataplaneClient { private final String baseUri; + private final Supplier> authorizationTokenGenerator; - public DataplaneClient(String baseUri) { + public DataplaneClient(String baseUri, Supplier> authorizationTokenGenerator) { this.baseUri = baseUri; + this.authorizationTokenGenerator = authorizationTokenGenerator; } public ValidatableResponse prepare(DataFlowPrepareMessage prepareMessage) { - return given() - .contentType(ContentType.JSON) - .baseUri(baseUri) + return baseRequest() .body(prepareMessage) .post("/v1/dataflows/prepare") .then() @@ -43,9 +47,7 @@ public ValidatableResponse prepare(DataFlowPrepareMessage prepareMessage) { } public ValidatableResponse start(DataFlowStartMessage startMessage) { - return given() - .contentType(ContentType.JSON) - .baseUri(baseUri) + return baseRequest() .body(startMessage) .post("/v1/dataflows/start") .then() @@ -53,9 +55,7 @@ public ValidatableResponse start(DataFlowStartMessage startMessage) { } public ValidatableResponse terminate(String dataFlowId, DataFlowTerminateMessage terminateMessage) { - return given() - .contentType(ContentType.JSON) - .baseUri(baseUri) + return baseRequest() .body(terminateMessage) .post("/v1/dataflows/{id}/terminate", dataFlowId) .then() @@ -71,9 +71,7 @@ public ValidatableResponse status(String dataFlowId) { } public ValidatableResponse started(String dataFlowId, DataFlowStartedNotificationMessage startedNotificationMessage) { - return given() - .contentType(ContentType.JSON) - .baseUri(baseUri) + return baseRequest() .body(startedNotificationMessage) .post("/v1/dataflows/{id}/started", dataFlowId) .then() @@ -81,21 +79,31 @@ public ValidatableResponse started(String dataFlowId, DataFlowStartedNotificatio } public ValidatableResponse completed(String dataFlowId) { - return given() - .baseUri(baseUri) + return baseRequest() .post("/v1/dataflows/{id}/completed", dataFlowId) .then() .log().ifValidationFails(); } public ValidatableResponse suspend(String flowId, DataFlowSuspendMessage suspendMessage) { - return given() - .contentType(ContentType.JSON) - .baseUri(baseUri) + return baseRequest() .body(suspendMessage) .post("/v1/dataflows/{id}/suspend", flowId) .then() .log().ifValidationFails(); } + private RequestSpecification baseRequest() { + var requestSpecification = given() + .contentType(ContentType.JSON) + .baseUri(baseUri); + + if (authorizationTokenGenerator != null) { + authorizationTokenGenerator.get() + .onSuccess(authorizationHeader -> requestSpecification.header("Authorization", authorizationHeader)); + } + + return requestSpecification; + } + } diff --git a/src/test/java/org/eclipse/dataplane/DataplaneTest.java b/src/test/java/org/eclipse/dataplane/DataplaneTest.java index a630f95..fa2714b 100644 --- a/src/test/java/org/eclipse/dataplane/DataplaneTest.java +++ b/src/test/java/org/eclipse/dataplane/DataplaneTest.java @@ -17,6 +17,7 @@ import com.github.tomakehurst.wiremock.WireMockServer; import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; +import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage; import org.eclipse.dataplane.port.exception.DataFlowNotifyControlPlaneFailed; import org.eclipse.dataplane.port.exception.DataplaneNotRegistered; import org.eclipse.dataplane.port.exception.ResourceNotFoundException; @@ -73,7 +74,8 @@ void shouldFail_whenDataFlowDoesNotExist() { @Test void shouldReturnFailedFuture_whenControlPlaneIsNotAvailable() { var dataplane = Dataplane.newInstance().onPrepare(Result::success).build(); - dataplane.prepare(createPrepareMessage()); + dataplane.registerControlPlane(new ControlPlaneRegistrationMessage("controlplaneId", URI.create("http://localhost/any"))); + dataplane.prepare("controlplaneId", createPrepareMessage()); controlPlane.stop(); var result = dataplane.notifyCompleted("dataFlowId"); @@ -87,7 +89,8 @@ void shouldReturnFailedFuture_whenControlPlaneRespondWithError() { controlPlane.stubFor(post(anyUrl()).willReturn(aResponse().withStatus(500))); var dataplane = Dataplane.newInstance().onPrepare(Result::success).build(); - dataplane.prepare(createPrepareMessage()); + dataplane.registerControlPlane(new ControlPlaneRegistrationMessage("controlplaneId", URI.create("http://localhost/any"))); + dataplane.prepare("controlplaneId", createPrepareMessage()); var result = dataplane.notifyCompleted("dataFlowId"); @@ -100,7 +103,8 @@ void shouldReturnFailedFuture_whenControlPlaneRespondWithError() { void shouldTransitionToCompleted_whenControlPlaneRespondCorrectly() { controlPlane.stubFor(post(anyUrl()).willReturn(aResponse().withStatus(200))); var dataplane = Dataplane.newInstance().onPrepare(Result::success).build(); - dataplane.prepare(createPrepareMessage()); + dataplane.registerControlPlane(new ControlPlaneRegistrationMessage("controlplaneId", URI.create("http://localhost/any"))); + dataplane.prepare("controlplaneId", createPrepareMessage()); var result = dataplane.notifyCompleted("dataFlowId"); diff --git a/src/test/java/org/eclipse/dataplane/api/ControlPlaneRegistrationApiTest.java b/src/test/java/org/eclipse/dataplane/api/ControlPlaneRegistrationApiTest.java index facc2c4..ecddc4d 100644 --- a/src/test/java/org/eclipse/dataplane/api/ControlPlaneRegistrationApiTest.java +++ b/src/test/java/org/eclipse/dataplane/api/ControlPlaneRegistrationApiTest.java @@ -17,7 +17,7 @@ import io.restassured.http.ContentType; import org.eclipse.dataplane.Dataplane; import org.eclipse.dataplane.HttpServer; -import org.eclipse.dataplane.domain.registration.Authorization; +import org.eclipse.dataplane.authorization.TestAuthorization; import org.eclipse.dataplane.domain.registration.AuthorizationProfile; import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage; import org.eclipse.dataplane.port.exception.ResourceNotFoundException; @@ -27,7 +27,6 @@ import org.junit.jupiter.api.Test; import java.net.URI; -import java.net.http.HttpRequest; import java.util.List; import java.util.UUID; @@ -195,17 +194,4 @@ void shouldReturn404_whenControlPlaneDoesNotExist() { } } - private static class TestAuthorization implements Authorization { - - @Override - public String type() { - return "token"; - } - - @Override - public HttpRequest.Builder apply(HttpRequest.Builder requestBuilder, AuthorizationProfile profile) { - return requestBuilder.header("Authorization", profile.stringAttribute("token")); - } - } - } diff --git a/src/test/java/org/eclipse/dataplane/authorization/TestAuthorization.java b/src/test/java/org/eclipse/dataplane/authorization/TestAuthorization.java new file mode 100644 index 0000000..d910e8e --- /dev/null +++ b/src/test/java/org/eclipse/dataplane/authorization/TestAuthorization.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2026 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.dataplane.authorization; + +import org.eclipse.dataplane.domain.Result; +import org.eclipse.dataplane.domain.registration.Authorization; +import org.eclipse.dataplane.domain.registration.AuthorizationProfile; +import org.jspecify.annotations.NonNull; + +import java.util.UUID; +import java.util.function.Function; + +public class TestAuthorization implements Authorization { + + public static final Function> TOKEN_GENERATOR = id -> Result.success(id + "::" + UUID.randomUUID()); + + public static @NonNull AuthorizationProfile createAuthorizationProfile(String callerId) { + var authorizationProfile = new AuthorizationProfile("token"); + authorizationProfile.setAttribute("token", callerId + "::" + UUID.randomUUID()); + return authorizationProfile; + } + + @Override + public String type() { + return "token"; + } + + @Override + public Result authorizationHeader(AuthorizationProfile profile) { + return Result.success(profile.stringAttribute("token")); + } + + @Override + public Result extractCallerId(String authorizationHeader) { + return Result.success(authorizationHeader.split("::")[0]); + } +} diff --git a/src/test/java/org/eclipse/dataplane/scenario/AuthorizationOauth2Test.java b/src/test/java/org/eclipse/dataplane/scenario/AuthorizationOauth2Test.java index c60c6ed..deff35c 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/AuthorizationOauth2Test.java +++ b/src/test/java/org/eclipse/dataplane/scenario/AuthorizationOauth2Test.java @@ -42,7 +42,6 @@ import org.junit.jupiter.api.Test; import java.net.URI; -import java.text.ParseException; import java.util.Date; import java.util.List; import java.util.Map; @@ -58,10 +57,11 @@ public class AuthorizationOauth2Test { private final HttpServer httpServer = new HttpServer(); - private final ControlPlane controlPlane = new ControlPlane(); + private final Oauth2ClientCredentialsAuthorization oauth2ClientCredentialsAuthorization = new Oauth2ClientCredentialsAuthorization(); + private ControlPlane controlPlane; private final Dataplane dataPlane = Dataplane.newInstance() .id("data-plane") - .registerAuthorization(new Oauth2ClientCredentialsAuthorization()) + .registerAuthorization(oauth2ClientCredentialsAuthorization) .onPrepare(dataFlow -> { dataFlow.transitionToPreparing(); return Result.success(dataFlow); @@ -74,6 +74,11 @@ public class AuthorizationOauth2Test { @BeforeEach void setUp() { httpServer.start(); + + controlPlane = ControlPlane.newInstance() + .authorizationTokenGenerator(() -> oauth2ClientCredentialsAuthorization.authorizationHeader(oauth2AuthorizationProfile())) + .build(); + controlPlane.initialize(httpServer, "/data-plane", "/data-plane"); httpServer.deploy("/data-plane", dataPlane.controller()); @@ -90,15 +95,15 @@ void shouldCommunicateWithControlPlaneUsingOauth2Authorization() { var controlplaneId = clientId; controlPlane.setAuthorizationValidation(requestContext -> requestContext - .containsHeaderString("Authorization", authorization -> isValidBearerTokenWithSub(authorization, controlplaneId))); + .containsHeaderString("Authorization", authorization -> { + var callerIdExtraction = oauth2ClientCredentialsAuthorization.extractCallerId(authorization); + return callerIdExtraction.succeeded() && Objects.equals(callerIdExtraction.getContent(), controlplaneId); + })); var controlPlaneRegistrationMessage = new ControlPlaneRegistrationMessage( controlplaneId, controlPlane.consumerCallbackAddress(), - List.of(new AuthorizationProfile("oauth2_client_credentials") - .withAttribute("tokenEndpoint", "http://localhost:" + httpServer.port() + "/oauth2/token") - .withAttribute("clientId", clientId) - .withAttribute("clientSecret", clientSecret)) + List.of(oauth2AuthorizationProfile()) ); dataPlane.registerControlPlane(controlPlaneRegistrationMessage).orElseThrow(RuntimeException::new); @@ -115,31 +120,23 @@ void shouldCommunicateWithControlPlaneUsingOauth2Authorization() { assertThat(notifyPreparedResult.succeeded()).isTrue(); } - private boolean isValidBearerTokenWithSub(String authorization, String controlplaneId) { - var bearer = "Bearer "; - var isBearer = authorization.startsWith(bearer); - if (!isBearer) { - return false; - } - - try { - var jwt = SignedJWT.parse(authorization.substring(bearer.length())); - var sub = jwt.getJWTClaimsSet().getClaims().get("sub"); - return sub.equals(controlplaneId); - } catch (ParseException e) { - return false; - } - } - private @NonNull DataFlowPrepareMessage createPrepareMessage(String consumerProcessId, URI callbackAddress, String transferType) { return new DataFlowPrepareMessage("theMessageId", "theParticipantId", "theCounterPartyId", "theDataspaceContext", consumerProcessId, "theAgreementId", "theDatasetId", callbackAddress, transferType, emptyList(), emptyMap()); } + private AuthorizationProfile oauth2AuthorizationProfile() { + return new AuthorizationProfile("oauth2_client_credentials") + .withAttribute("tokenEndpoint", "http://localhost:" + httpServer.port() + "/oauth2/token") + .withAttribute("clientId", clientId) + .withAttribute("clientSecret", clientSecret); + } + @Path("/") public static class Oauth2TokenController { + private final String clientId; private final String clientSecret; diff --git a/src/test/java/org/eclipse/dataplane/scenario/AuthorizationTest.java b/src/test/java/org/eclipse/dataplane/scenario/AuthorizationTest.java index 5fe9906..42d0388 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/AuthorizationTest.java +++ b/src/test/java/org/eclipse/dataplane/scenario/AuthorizationTest.java @@ -17,33 +17,36 @@ import org.eclipse.dataplane.ControlPlane; import org.eclipse.dataplane.Dataplane; import org.eclipse.dataplane.HttpServer; +import org.eclipse.dataplane.authorization.TestAuthorization; import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage; -import org.eclipse.dataplane.domain.registration.Authorization; -import org.eclipse.dataplane.domain.registration.AuthorizationProfile; import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage; +import org.eclipse.dataplane.port.exception.DataFlowNotifyControlPlaneFailed; import org.jspecify.annotations.NonNull; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.net.URI; -import java.net.http.HttpRequest; import java.util.List; import java.util.UUID; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.dataplane.authorization.TestAuthorization.TOKEN_GENERATOR; public class AuthorizationTest { private final HttpServer httpServer = new HttpServer(); - private final ControlPlane controlPlane = new ControlPlane(); + private final TestAuthorization authorization = new TestAuthorization(); + private final ControlPlane controlPlane = ControlPlane.newInstance() + .authorizationTokenGenerator(() -> TOKEN_GENERATOR.apply("control-plane-id")) + .build(); private final Dataplane dataPlane = Dataplane.newInstance() - .id("data-plane") - .registerAuthorization(new TestAuthorization()) + .id("data-plane-id") + .registerAuthorization(authorization) .onPrepare(dataFlow -> { dataFlow.transitionToPreparing(); return Result.success(dataFlow); @@ -53,6 +56,7 @@ public class AuthorizationTest { @BeforeEach void setUp() { httpServer.start(); + controlPlane.initialize(httpServer, "/data-plane", "/data-plane"); httpServer.deploy("/data-plane", dataPlane.controller()); @@ -65,22 +69,17 @@ void tearDown() { @Test void shouldCommunicateWithControlPlaneUsingRegisteredAuthorization() { - var authorizationToken = UUID.randomUUID().toString(); controlPlane.setAuthorizationValidation(requestContext -> - requestContext.containsHeaderString("Authorization", a -> a.equals(authorizationToken))); - var authorizationProfile = new AuthorizationProfile("test-authorization"); - authorizationProfile.setAttribute("token", authorizationToken); + requestContext.containsHeaderString("Authorization", a -> a.startsWith("data-plane-id"))); var controlPlaneRegistrationMessage = new ControlPlaneRegistrationMessage( - UUID.randomUUID().toString(), + "control-plane-id", controlPlane.consumerCallbackAddress(), - List.of(authorizationProfile) + List.of(TestAuthorization.createAuthorizationProfile("data-plane-id")) ); dataPlane.registerControlPlane(controlPlaneRegistrationMessage).orElseThrow(RuntimeException::new); - var transferType = "FileSystemAsync-PUSH"; - var processId = UUID.randomUUID().toString(); - var consumerProcessId = "consumer_" + processId; - var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType); + var consumerProcessId = "consumer_" + UUID.randomUUID(); + var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), "FileSystemAsync-PUSH"); controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowResponseMessage.class); @@ -90,23 +89,52 @@ void shouldCommunicateWithControlPlaneUsingRegisteredAuthorization() { assertThat(notifyPreparedResult.succeeded()).isTrue(); } - private @NonNull DataFlowPrepareMessage createPrepareMessage(String consumerProcessId, URI callbackAddress, String transferType) { - return new DataFlowPrepareMessage("theMessageId", "theParticipantId", "theCounterPartyId", - "theDataspaceContext", consumerProcessId, "theAgreementId", "theDatasetId", callbackAddress, - transferType, emptyList(), emptyMap()); + @Test + void shouldGetUnauthorized_whenControlPlaneIsNotAuthenticated() { + controlPlane.setAuthorizationValidation(requestContext -> + requestContext.containsHeaderString("Authorization", a -> a.startsWith("data-plane-id"))); + var controlPlaneRegistrationMessage = new ControlPlaneRegistrationMessage( + "unmatching-control-plane-id", + controlPlane.consumerCallbackAddress(), + List.of(TestAuthorization.createAuthorizationProfile("data-plane-id")) + ); + dataPlane.registerControlPlane(controlPlaneRegistrationMessage).orElseThrow(RuntimeException::new); + + var consumerProcessId = "consumer_" + UUID.randomUUID(); + var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), "FileSystemAsync-PUSH"); + + controlPlane.consumerPrepare(prepareMessage).statusCode(401); } - private static class TestAuthorization implements Authorization { + @Test + void shouldGetUnauthorized_withDataPlaneIsNotAuthenticated() { + controlPlane.setAuthorizationValidation(requestContext -> + requestContext.containsHeaderString("Authorization", a -> a.startsWith("unmatching-data-plane-id"))); + var controlPlaneRegistrationMessage = new ControlPlaneRegistrationMessage( + "control-plane-id", + controlPlane.consumerCallbackAddress(), + List.of(TestAuthorization.createAuthorizationProfile("data-plane-id")) + ); + dataPlane.registerControlPlane(controlPlaneRegistrationMessage).orElseThrow(RuntimeException::new); - @Override - public String type() { - return "test-authorization"; - } + var consumerProcessId = "consumer_" + UUID.randomUUID(); + var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), "FileSystemAsync-PUSH"); - @Override - public HttpRequest.Builder apply(HttpRequest.Builder requestBuilder, AuthorizationProfile profile) { - return requestBuilder.header("Authorization", profile.stringAttribute("token")); - } + controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowResponseMessage.class); + + var notifyPreparedResult = dataPlane.getById(consumerProcessId) + .compose(dataFlow -> dataPlane.notifyPrepared(consumerProcessId, Result::success)); + + assertThat(notifyPreparedResult.failed()).isTrue(); + assertThat(notifyPreparedResult.getException()).isInstanceOfSatisfying(DataFlowNotifyControlPlaneFailed.class, e -> { + assertThat(e.getResponse().statusCode()).isEqualTo(401); + }); + } + + private @NonNull DataFlowPrepareMessage createPrepareMessage(String consumerProcessId, URI callbackAddress, String transferType) { + return new DataFlowPrepareMessage("theMessageId", "theParticipantId", "theCounterPartyId", + "theDataspaceContext", consumerProcessId, "theAgreementId", "theDatasetId", callbackAddress, + transferType, emptyList(), emptyMap()); } } diff --git a/src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java b/src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java index 0559b94..5855d0b 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java +++ b/src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java @@ -17,6 +17,7 @@ import org.eclipse.dataplane.ControlPlane; import org.eclipse.dataplane.Dataplane; import org.eclipse.dataplane.HttpServer; +import org.eclipse.dataplane.authorization.TestAuthorization; import org.eclipse.dataplane.domain.DataAddress; import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlow; @@ -25,6 +26,7 @@ import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage; +import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage; import org.jspecify.annotations.NonNull; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -32,8 +34,10 @@ import java.io.File; import java.io.IOException; +import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; +import java.util.List; import java.util.Objects; import java.util.UUID; @@ -41,6 +45,8 @@ import static java.util.Collections.emptyMap; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import static org.eclipse.dataplane.authorization.TestAuthorization.TOKEN_GENERATOR; +import static org.eclipse.dataplane.authorization.TestAuthorization.createAuthorizationProfile; import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.PREPARED; import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.STARTED; import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.STARTING; @@ -50,7 +56,9 @@ class ConsumerPullTest { private final HttpServer httpServer = new HttpServer(); private final int filesAvailableOnProvider = 13; - private final ControlPlane controlPlane = new ControlPlane(); + private final ControlPlane controlPlane = ControlPlane.newInstance() + .authorizationTokenGenerator(() -> TOKEN_GENERATOR.apply("control-plane-id")) + .build(); private final ConsumerDataPlane consumerDataPlane = new ConsumerDataPlane(); private final ProviderDataPlane providerDataPlane = new ProviderDataPlane(filesAvailableOnProvider); @@ -124,12 +132,19 @@ void shouldPermitAsyncStartup() { transferType, emptyList(), emptyMap()); } - private class ConsumerDataPlane { private final Path storage; + private final Dataplane sdk = Dataplane.newInstance() + .id("consumer") + .registerAuthorization(new TestAuthorization()) + .onPrepare(Result::success) + .onStarted(this::onStarted) + .build(); + ConsumerDataPlane() { + sdk.registerControlPlane(new ControlPlaneRegistrationMessage("control-plane-id", URI.create("http://localhost:any"), List.of(createAuthorizationProfile("consumer")))); try { storage = Files.createTempDirectory("consumer-storage"); } catch (IOException e) { @@ -137,12 +152,6 @@ private class ConsumerDataPlane { } } - private final Dataplane sdk = Dataplane.newInstance() - .id("consumer") - .onPrepare(Result::success) - .onStarted(this::onStarted) - .build(); - public Object controller() { return sdk.controller(); } @@ -165,11 +174,13 @@ private class ProviderDataPlane { private final Dataplane sdk = Dataplane.newInstance() .id("provider") + .registerAuthorization(new TestAuthorization()) .onStart(this::onStart) .build(); private final int filesToBeCreated; ProviderDataPlane(int fileToBeCreated) { + sdk.registerControlPlane(new ControlPlaneRegistrationMessage("control-plane-id", URI.create("http://localhost:any"), List.of(createAuthorizationProfile("provider")))); this.filesToBeCreated = fileToBeCreated; } diff --git a/src/test/java/org/eclipse/dataplane/scenario/ProviderPushTest.java b/src/test/java/org/eclipse/dataplane/scenario/ProviderPushTest.java index d8a5a45..825f2df 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/ProviderPushTest.java +++ b/src/test/java/org/eclipse/dataplane/scenario/ProviderPushTest.java @@ -17,6 +17,7 @@ import org.eclipse.dataplane.ControlPlane; import org.eclipse.dataplane.Dataplane; import org.eclipse.dataplane.HttpServer; +import org.eclipse.dataplane.authorization.TestAuthorization; import org.eclipse.dataplane.domain.DataAddress; import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlow; @@ -24,6 +25,7 @@ import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage; +import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage; import org.jspecify.annotations.NonNull; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -33,6 +35,7 @@ import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; +import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -42,6 +45,8 @@ import static java.util.Collections.emptyMap; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import static org.eclipse.dataplane.authorization.TestAuthorization.TOKEN_GENERATOR; +import static org.eclipse.dataplane.authorization.TestAuthorization.createAuthorizationProfile; import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.COMPLETED; import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.PREPARED; import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.PREPARING; @@ -52,7 +57,10 @@ public class ProviderPushTest { private final HttpServer httpServer = new HttpServer(); - private final ControlPlane controlPlane = new ControlPlane(); + private final ControlPlane controlPlane = ControlPlane.newInstance() + .authorizationTokenGenerator(() -> TOKEN_GENERATOR.apply("control-plane-id")) + .build(); + private final ConsumerDataPlane consumerDataPlane = new ConsumerDataPlane(); private final ProviderDataPlane providerDataPlane = new ProviderDataPlane(); @@ -157,10 +165,15 @@ private static class ProviderDataPlane { private final ExecutorService executor = Executors.newCachedThreadPool(); private final Dataplane sdk = Dataplane.newInstance() + .registerAuthorization(new TestAuthorization()) .id("provider") .onStart(this::onStart) .build(); + ProviderDataPlane() { + sdk.registerControlPlane(new ControlPlaneRegistrationMessage("control-plane-id", URI.create("http://localhost:any"), List.of(createAuthorizationProfile("provider")))); + } + private Result onStart(DataFlow dataFlow) { var dataAddress = dataFlow.getDataAddress(); var future = CompletableFuture.runAsync(() -> { @@ -192,12 +205,17 @@ public Object controller() { private static class ConsumerDataPlane { private final Dataplane sdk = Dataplane.newInstance() - .id("thisDataplaneId") + .id("consumer") + .registerAuthorization(new TestAuthorization()) .onPrepare(this::onPrepare) .onCompleted(this::onCompleted) .onTerminate(Result::success) .build(); + ConsumerDataPlane() { + sdk.registerControlPlane(new ControlPlaneRegistrationMessage("control-plane-id", URI.create("http://localhost:any"), List.of(createAuthorizationProfile("consumer")))); + } + public void completePreparation(String dataFlowId) { sdk.getById(dataFlowId) .compose(dataFlow -> sdk.notifyPrepared(dataFlowId, this::prepareDestinationDataAddress)) diff --git a/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java b/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java index ee210a3..4333ead 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java +++ b/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java @@ -17,6 +17,7 @@ import org.eclipse.dataplane.ControlPlane; import org.eclipse.dataplane.Dataplane; import org.eclipse.dataplane.HttpServer; +import org.eclipse.dataplane.authorization.TestAuthorization; import org.eclipse.dataplane.domain.DataAddress; import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlow; @@ -26,15 +27,18 @@ import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowTerminateMessage; +import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.io.File; import java.io.IOException; +import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.UUID; @@ -47,6 +51,8 @@ import static java.util.Collections.emptyMap; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import static org.eclipse.dataplane.authorization.TestAuthorization.TOKEN_GENERATOR; +import static org.eclipse.dataplane.authorization.TestAuthorization.createAuthorizationProfile; import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.PREPARED; import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.STARTED; @@ -54,7 +60,9 @@ class StreamingPullTest { private final HttpServer httpServer = new HttpServer(); - private final ControlPlane controlPlane = new ControlPlane(); + private final ControlPlane controlPlane = ControlPlane.newInstance() + .authorizationTokenGenerator(() -> TOKEN_GENERATOR.apply("control-plane-id")) + .build(); private final ConsumerDataPlane consumerDataPlane = new ConsumerDataPlane(); private final ProviderDataPlane providerDataPlane = new ProviderDataPlane(); @@ -147,8 +155,16 @@ private static class ConsumerDataPlane { private final Path storage; private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private final Dataplane sdk = Dataplane.newInstance() + .id("consumer") + .registerAuthorization(new TestAuthorization()) + .onPrepare(Result::success) + .onStarted(this::onStarted) + .build(); + ConsumerDataPlane() { + sdk.registerControlPlane(new ControlPlaneRegistrationMessage("control-plane-id", URI.create("http://localhost:any"), List.of(createAuthorizationProfile("consumer")))); try { storage = Files.createTempDirectory("consumer-storage"); } catch (IOException e) { @@ -156,12 +172,6 @@ private static class ConsumerDataPlane { } } - private final Dataplane sdk = Dataplane.newInstance() - .id("consumer") - .onPrepare(Result::success) - .onStarted(this::onStarted) - .build(); - public Object controller() { return sdk.controller(); } @@ -208,6 +218,7 @@ private static class ProviderDataPlane { private final Dataplane sdk = Dataplane.newInstance() .id("provider") + .registerAuthorization(new TestAuthorization()) .onStart(this::onStart) .onSuspend(this::stopDataFlow) .onTerminate(this::stopDataFlow) @@ -216,6 +227,7 @@ private static class ProviderDataPlane { private final Map> flows = new HashMap<>(); ProviderDataPlane() { + sdk.registerControlPlane(new ControlPlaneRegistrationMessage("control-plane-id", URI.create("http://localhost:any"), List.of(createAuthorizationProfile("provider")))); } public Object controller() { diff --git a/src/test/java/org/eclipse/dataplane/scenario/StreamingPushTest.java b/src/test/java/org/eclipse/dataplane/scenario/StreamingPushTest.java index 9ad1f72..7b463dd 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/StreamingPushTest.java +++ b/src/test/java/org/eclipse/dataplane/scenario/StreamingPushTest.java @@ -17,12 +17,14 @@ import org.eclipse.dataplane.ControlPlane; import org.eclipse.dataplane.Dataplane; import org.eclipse.dataplane.HttpServer; +import org.eclipse.dataplane.authorization.TestAuthorization; import org.eclipse.dataplane.domain.DataAddress; import org.eclipse.dataplane.domain.Result; import org.eclipse.dataplane.domain.dataflow.DataFlow; import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage; +import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -32,6 +34,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.UUID; @@ -44,6 +47,8 @@ import static java.util.Collections.emptyMap; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import static org.eclipse.dataplane.authorization.TestAuthorization.TOKEN_GENERATOR; +import static org.eclipse.dataplane.authorization.TestAuthorization.createAuthorizationProfile; import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.PREPARED; import static org.eclipse.dataplane.domain.dataflow.DataFlow.State.STARTED; @@ -51,7 +56,9 @@ public class StreamingPushTest { private final HttpServer httpServer = new HttpServer(); - private final ControlPlane controlPlane = new ControlPlane(); + private final ControlPlane controlPlane = ControlPlane.newInstance() + .authorizationTokenGenerator(() -> TOKEN_GENERATOR.apply("control-plane-id")) + .build(); private final ConsumerDataPlane consumerDataPlane = new ConsumerDataPlane(); private final ProviderDataPlane providerDataPlane = new ProviderDataPlane(); @@ -101,10 +108,15 @@ private static class ProviderDataPlane { private final Map> flows = new HashMap<>(); private final Dataplane sdk = Dataplane.newInstance() .id("provider") + .registerAuthorization(new TestAuthorization()) .onStart(this::onStart) .onSuspend(this::stopFlow) .build(); + ProviderDataPlane() { + sdk.registerControlPlane(new ControlPlaneRegistrationMessage("control-plane-id", URI.create("http://localhost:any"), List.of(createAuthorizationProfile("provider")))); + } + private Result onStart(DataFlow dataFlow) { var dataAddress = dataFlow.getDataAddress(); var future = executor.scheduleAtFixedRate(() -> { @@ -139,6 +151,7 @@ private static class ConsumerDataPlane { private final Dataplane sdk = Dataplane.newInstance() .id("consumer") + .registerAuthorization(new TestAuthorization()) .onPrepare(this::onPrepare) .onSuspend(Result::success) .onCompleted(this::onCompleted) @@ -146,6 +159,10 @@ private static class ConsumerDataPlane { private final Map destinations = new HashMap<>(); + ConsumerDataPlane() { + sdk.registerControlPlane(new ControlPlaneRegistrationMessage("control-plane-id", URI.create("http://localhost:any"), List.of(createAuthorizationProfile("consumer")))); + } + private Result onPrepare(DataFlow dataFlow) { try { var destinationFolder = Files.createTempDirectory("consumer-dest");