diff --git a/src/main/java/org/eclipse/dataplane/Dataplane.java b/src/main/java/org/eclipse/dataplane/Dataplane.java index 9aab334..f1c8ee9 100644 --- a/src/main/java/org/eclipse/dataplane/Dataplane.java +++ b/src/main/java/org/eclipse/dataplane/Dataplane.java @@ -27,6 +27,8 @@ import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowTerminateMessage; +import org.eclipse.dataplane.domain.registration.Authorization; +import org.eclipse.dataplane.domain.registration.AuthorizationType; import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage; import org.eclipse.dataplane.domain.registration.DataPlaneRegistrationMessage; import org.eclipse.dataplane.logic.OnCompleted; @@ -37,6 +39,7 @@ import org.eclipse.dataplane.logic.OnTerminate; import org.eclipse.dataplane.port.DataPlaneRegistrationApiController; import org.eclipse.dataplane.port.DataPlaneSignalingApiController; +import org.eclipse.dataplane.port.exception.AuthorizationNotSupported; import org.eclipse.dataplane.port.exception.DataFlowNotifyControlPlaneFailed; import org.eclipse.dataplane.port.exception.DataplaneNotRegistered; import org.eclipse.dataplane.port.store.ControlPlaneStore; @@ -48,9 +51,12 @@ import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.function.BiConsumer; import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; import static java.util.Collections.emptyMap; @@ -73,6 +79,7 @@ public class Dataplane { private OnCompleted onCompleted = dataFlow -> Result.failure(new UnsupportedOperationException("onCompleted is not implemented")); private final HttpClient httpClient = HttpClient.newHttpClient(); + private final Map authorizationTypes = new HashMap<>(); public static Builder newInstance() { return new Builder(); @@ -229,7 +236,7 @@ public Result notifyCompleted(String dataFlowId) { .compose(dataFlow -> { dataFlow.transitionToCompleted(); - return notifyControlPlane("completed", dataFlow, emptyMap()); // TODO DataFlowCompletedMessage not defined + return notifyControlPlane("completed", dataFlow, emptyMap()); }); } @@ -300,13 +307,22 @@ private Result notifyControlPlane(String action, DataFlow dataFlow, Object return toJson(message) .map(body -> { var endpoint = dataFlow.callbackEndpointFor(action); - var request = HttpRequest.newBuilder() + var requestBuilder = HttpRequest.newBuilder() .uri(URI.create(endpoint)) .header("content-type", "application/json") - .POST(HttpRequest.BodyPublishers.ofString(body)) - .build(); + .POST(HttpRequest.BodyPublishers.ofString(body)); + + var controlPlane = controlPlaneStore.findByEndpoint(dataFlow.getCallbackAddress()); + if (controlPlane.succeeded()) { + var authorization = controlPlane.getContent().authorization(); + if (authorization != null) { + var authorizationType = authorizationTypes.get(authorization.getType()); + var castAuthorization = objectMapper.convertValue(authorization, authorizationType.authorizationClass()); + authorizationType.authorizationFunction().accept(requestBuilder, castAuthorization); + } + } - return httpClient.send(request, HttpResponse.BodyHandlers.discarding()); + return httpClient.send(requestBuilder.build(), HttpResponse.BodyHandlers.discarding()); }) .compose(response -> { var successful = response.statusCode() >= 200 && response.statusCode() < 300; @@ -331,9 +347,16 @@ public ControlPlaneStore controlPlaneStore() { } public Result registerControlPlane(ControlPlaneRegistrationMessage message) { + for (var auth : message.authorization()) { + if (!authorizationTypes.containsKey(auth.getType())) { + return Result.failure(new AuthorizationNotSupported(auth)); + } + } + var controlPlane = ControlPlane.newInstance() .id(message.controlplaneId()) .endpoint(message.endpoint()) + .authorization(message.authorization()) .build(); return controlPlaneStore.save(controlPlane); @@ -406,5 +429,10 @@ public Builder onTerminate(OnTerminate onTerminate) { dataplane.onTerminate = onTerminate; return this; } + + public Builder registerAuthorization(String type, Class authorizationClass, BiConsumer authorizationFunction) { + dataplane.authorizationTypes.put(type, new AuthorizationType<>(type, authorizationClass, authorizationFunction)); + return this; + } } } diff --git a/src/main/java/org/eclipse/dataplane/domain/controlplane/ControlPlane.java b/src/main/java/org/eclipse/dataplane/domain/controlplane/ControlPlane.java index 123d0ef..1509f35 100644 --- a/src/main/java/org/eclipse/dataplane/domain/controlplane/ControlPlane.java +++ b/src/main/java/org/eclipse/dataplane/domain/controlplane/ControlPlane.java @@ -14,12 +14,19 @@ package org.eclipse.dataplane.domain.controlplane; +import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; +import org.eclipse.dataplane.domain.registration.Authorization; +import org.eclipse.dataplane.domain.registration.RawAuthorization; + +import java.util.ArrayList; +import java.util.List; import java.util.Objects; public class ControlPlane { private String id; private String endpoint; + private final List authorizations = new ArrayList<>(); public String getId() { return id; @@ -33,6 +40,15 @@ public static ControlPlane.Builder newInstance() { return new ControlPlane.Builder(); } + public List getAuthorizations() { + return authorizations; + } + + public Authorization authorization() { + return getAuthorizations().stream().findAny().orElse(null); + } + + @JsonPOJOBuilder public static class Builder { private final ControlPlane controlPlane = new ControlPlane(); @@ -55,5 +71,10 @@ public Builder endpoint(String endpoint) { controlPlane.endpoint = endpoint; return this; } + + public Builder authorization(List authorizations) { + controlPlane.authorizations.addAll(authorizations); + return this; + } } } diff --git a/src/main/java/org/eclipse/dataplane/domain/registration/Authorization.java b/src/main/java/org/eclipse/dataplane/domain/registration/Authorization.java new file mode 100644 index 0000000..6606f97 --- /dev/null +++ b/src/main/java/org/eclipse/dataplane/domain/registration/Authorization.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2026 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.dataplane.domain.registration; + +public interface Authorization { + + String getType(); +} diff --git a/src/main/java/org/eclipse/dataplane/domain/registration/AuthorizationType.java b/src/main/java/org/eclipse/dataplane/domain/registration/AuthorizationType.java new file mode 100644 index 0000000..09338ea --- /dev/null +++ b/src/main/java/org/eclipse/dataplane/domain/registration/AuthorizationType.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2026 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.dataplane.domain.registration; + +import java.net.http.HttpRequest; +import java.util.function.BiConsumer; + +public record AuthorizationType( + String type, + Class authorizationClass, + BiConsumer authorizationFunction // TODO: dedicated interface +) { + +} diff --git a/src/main/java/org/eclipse/dataplane/domain/registration/ControlPlaneRegistrationMessage.java b/src/main/java/org/eclipse/dataplane/domain/registration/ControlPlaneRegistrationMessage.java index b4cdc13..cd096f3 100644 --- a/src/main/java/org/eclipse/dataplane/domain/registration/ControlPlaneRegistrationMessage.java +++ b/src/main/java/org/eclipse/dataplane/domain/registration/ControlPlaneRegistrationMessage.java @@ -14,9 +14,16 @@ package org.eclipse.dataplane.domain.registration; +import java.util.List; + +import static java.util.Collections.emptyList; + public record ControlPlaneRegistrationMessage( String controlplaneId, - String endpoint -// TODO: authorization + String endpoint, + List authorization ) { + public ControlPlaneRegistrationMessage(String controlplaneId, String endpoint) { + this(controlplaneId, endpoint, emptyList()); + } } diff --git a/src/main/java/org/eclipse/dataplane/domain/registration/RawAuthorization.java b/src/main/java/org/eclipse/dataplane/domain/registration/RawAuthorization.java new file mode 100644 index 0000000..e069d69 --- /dev/null +++ b/src/main/java/org/eclipse/dataplane/domain/registration/RawAuthorization.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2026 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.dataplane.domain.registration; + +import com.fasterxml.jackson.annotation.JsonAnyGetter; +import com.fasterxml.jackson.annotation.JsonAnySetter; + +import java.util.HashMap; +import java.util.Map; + +public class RawAuthorization implements Authorization { + + private final Map attributes = new HashMap<>(); + + @Override + public String getType() { + return attributes.get("type").toString(); + } + + @JsonAnyGetter + public Map getAttributes() { + return attributes; + } + + @JsonAnySetter + public void setAttribute(String key, Object value) { + attributes.put(key, value); + } +} diff --git a/src/main/java/org/eclipse/dataplane/port/ExceptionMapper.java b/src/main/java/org/eclipse/dataplane/port/ExceptionMapper.java index e349083..54e8592 100644 --- a/src/main/java/org/eclipse/dataplane/port/ExceptionMapper.java +++ b/src/main/java/org/eclipse/dataplane/port/ExceptionMapper.java @@ -14,8 +14,10 @@ package org.eclipse.dataplane.port; +import jakarta.ws.rs.BadRequestException; import jakarta.ws.rs.NotFoundException; import jakarta.ws.rs.WebApplicationException; +import org.eclipse.dataplane.port.exception.AuthorizationNotSupported; import org.eclipse.dataplane.port.exception.ResourceNotFoundException; import java.util.function.Function; @@ -26,6 +28,11 @@ public interface ExceptionMapper { if (exception instanceof ResourceNotFoundException notFound) { return new NotFoundException(notFound); } + + if (exception instanceof AuthorizationNotSupported authorizationNotSupported) { + return new BadRequestException(authorizationNotSupported); + } + return new WebApplicationException("unexpected internal server error"); }; diff --git a/src/main/java/org/eclipse/dataplane/port/exception/AuthorizationNotSupported.java b/src/main/java/org/eclipse/dataplane/port/exception/AuthorizationNotSupported.java new file mode 100644 index 0000000..4f91d9e --- /dev/null +++ b/src/main/java/org/eclipse/dataplane/port/exception/AuthorizationNotSupported.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2026 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.dataplane.port.exception; + +import org.eclipse.dataplane.domain.registration.Authorization; + +public class AuthorizationNotSupported extends Exception { + + public AuthorizationNotSupported(Authorization authorization) { + super("Authorization type " + authorization.getType() + " not supported"); + } + +} diff --git a/src/main/java/org/eclipse/dataplane/port/store/ControlPlaneStore.java b/src/main/java/org/eclipse/dataplane/port/store/ControlPlaneStore.java index c127690..5201fc4 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/ControlPlaneStore.java +++ b/src/main/java/org/eclipse/dataplane/port/store/ControlPlaneStore.java @@ -23,4 +23,6 @@ public interface ControlPlaneStore { Result findById(String controlplaneId); Result delete(String id); + + Result findByEndpoint(String endpoint); } diff --git a/src/main/java/org/eclipse/dataplane/port/store/InMemoryControlPlaneStore.java b/src/main/java/org/eclipse/dataplane/port/store/InMemoryControlPlaneStore.java index 3133f78..8c9427a 100644 --- a/src/main/java/org/eclipse/dataplane/port/store/InMemoryControlPlaneStore.java +++ b/src/main/java/org/eclipse/dataplane/port/store/InMemoryControlPlaneStore.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.Objects; public class InMemoryControlPlaneStore implements ControlPlaneStore { @@ -44,17 +45,12 @@ public Result save(ControlPlane controlPlane) { @Override public Result findById(String controlplaneId) { - var dataFlow = store.get(controlplaneId); - if (dataFlow == null) { + var json = store.get(controlplaneId); + if (json == null) { return Result.failure(new ResourceNotFoundException("ControlPlane %s not found".formatted(controlplaneId))); } - try { - var deserialized = objectMapper.readValue(dataFlow, ControlPlane.class); - return Result.success(deserialized); - } catch (JsonProcessingException e) { - return Result.failure(e); - } + return deserialize(json); } @Override @@ -65,4 +61,21 @@ public Result delete(String id) { } return Result.success(); } + + @Override + public Result findByEndpoint(String endpoint) { + return store.values().stream().map(this::deserialize).filter(Result::succeeded) + .map(Result::getContent).filter(it -> Objects.equals(endpoint, it.getEndpoint())) + .findAny().map(Result::success) + .orElseGet(() -> Result.failure(new ResourceNotFoundException("ControlPlane with endpoint %s not found".formatted(endpoint)))); + } + + private Result deserialize(String json) { + try { + var deserialized = objectMapper.readValue(json, ControlPlane.class); + return Result.success(deserialized); + } catch (JsonProcessingException e) { + return Result.failure(e); + } + } } diff --git a/src/test/java/org/eclipse/dataplane/ControlPlane.java b/src/test/java/org/eclipse/dataplane/ControlPlane.java index 0e9bd72..5d00fb2 100644 --- a/src/test/java/org/eclipse/dataplane/ControlPlane.java +++ b/src/test/java/org/eclipse/dataplane/ControlPlane.java @@ -16,6 +16,8 @@ import io.restassured.response.ValidatableResponse; import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.HeaderParam; +import jakarta.ws.rs.NotAuthorizedException; import jakarta.ws.rs.POST; import jakarta.ws.rs.Path; import jakarta.ws.rs.PathParam; @@ -26,8 +28,10 @@ import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage; import org.eclipse.dataplane.domain.dataflow.DataFlowTerminateMessage; +import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.Predicate; import static jakarta.ws.rs.core.MediaType.WILDCARD; import static org.assertj.core.api.Assertions.assertThat; @@ -42,14 +46,22 @@ public class ControlPlane { private final DataplaneClient consumerClient; private final DataplaneClient providerClient; private final HttpServer httpServer; + private String authorizationToken; public ControlPlane(HttpServer httpServer, String consumerDataPlanePath, String providerDataPlanePath) { this.httpServer = httpServer; consumerClient = new DataplaneClient("http://localhost:%d%s".formatted(httpServer.port(), consumerDataPlanePath)); providerClient = new DataplaneClient("http://localhost:%d%s".formatted(httpServer.port(), providerDataPlanePath)); - httpServer.deploy("/consumer/control-plane", new ControlPlaneController(providerClient)); - httpServer.deploy("/provider/control-plane", new ControlPlaneController(consumerClient)); + Predicate authorizationProvider = authorization -> { + if (authorizationToken != null) { + return Objects.equals(authorization, authorizationToken); + } + return true; + }; + + httpServer.deploy("/consumer/control-plane", new ControlPlaneController(providerClient, authorizationProvider)); + httpServer.deploy("/provider/control-plane", new ControlPlaneController(consumerClient, authorizationProvider)); } @@ -89,34 +101,49 @@ public String consumerCallbackAddress() { return "http://localhost:%d/consumer/control-plane".formatted(httpServer.port()); } + public void setAuthorizationToken(String token) { + authorizationToken = token; + } + @Path("/transfers") public static class ControlPlaneController { private final ExecutorService executor = Executors.newCachedThreadPool(); private final DataplaneClient counterPart; + private final Predicate authorizationProvider; - public ControlPlaneController(DataplaneClient counterPart) { + public ControlPlaneController(DataplaneClient counterPart, Predicate authorizationProvider) { this.counterPart = counterPart; + this.authorizationProvider = authorizationProvider; } @POST @Path("/{transferId}/dataflow/prepared") @Consumes(WILDCARD) - public void prepared(@PathParam("transferId") String transferId, DataFlowResponseMessage message) { + public void prepared(@PathParam("transferId") String transferId, @HeaderParam("Authorization") String authorization, DataFlowResponseMessage message) { + if (!authorizationProvider.test(authorization)) { + throw new NotAuthorizedException("Not authorized"); + } assertThat(message.state()).isEqualTo("PREPARED"); } @POST @Path("/{transferId}/dataflow/started") @Consumes(WILDCARD) - public void started(@PathParam("transferId") String transferId, DataFlowResponseMessage message) { + public void started(@PathParam("transferId") String transferId, @HeaderParam("Authorization") String authorization, DataFlowResponseMessage message) { + if (!authorizationProvider.test(authorization)) { + throw new NotAuthorizedException("Not authorized"); + } assertThat(message.state()).isEqualTo("STARTED"); } @POST @Path("/{transferId}/dataflow/completed") @Consumes(WILDCARD) - public void completed(@PathParam("transferId") String transferId) { + public void completed(@PathParam("transferId") String transferId, @HeaderParam("Authorization") String authorization) { + if (!authorizationProvider.test(authorization)) { + throw new NotAuthorizedException("Not authorized"); + } executor.submit(() -> { var idPart = transferId.split("_")[1]; counterPart.completed("consumer_" + idPart).statusCode(200); @@ -126,7 +153,10 @@ public void completed(@PathParam("transferId") String transferId) { @POST @Path("/{transferId}/dataflow/errored") @Consumes(WILDCARD) - public void errored(@PathParam("transferId") String transferId) { + public void errored(@PathParam("transferId") String transferId, @HeaderParam("Authorization") String authorization) { + if (!authorizationProvider.test(authorization)) { + throw new NotAuthorizedException("Not authorized"); + } executor.submit(() -> { var idPart = transferId.split("_")[1]; counterPart.terminate("consumer_" + idPart, new DataFlowTerminateMessage("terminated by provider")).statusCode(200); @@ -134,4 +164,5 @@ public void errored(@PathParam("transferId") String transferId) { } } + } diff --git a/src/test/java/org/eclipse/dataplane/scenario/ControlPlaneRegistrationTest.java b/src/test/java/org/eclipse/dataplane/api/ControlPlaneRegistrationApiTest.java similarity index 68% rename from src/test/java/org/eclipse/dataplane/scenario/ControlPlaneRegistrationTest.java rename to src/test/java/org/eclipse/dataplane/api/ControlPlaneRegistrationApiTest.java index 91fcf5d..7294392 100644 --- a/src/test/java/org/eclipse/dataplane/scenario/ControlPlaneRegistrationTest.java +++ b/src/test/java/org/eclipse/dataplane/api/ControlPlaneRegistrationApiTest.java @@ -12,28 +12,32 @@ * */ -package org.eclipse.dataplane.scenario; +package org.eclipse.dataplane.api; +import com.fasterxml.jackson.annotation.JsonProperty; import io.restassured.http.ContentType; import org.eclipse.dataplane.Dataplane; import org.eclipse.dataplane.HttpServer; import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage; +import org.eclipse.dataplane.domain.registration.RawAuthorization; import org.eclipse.dataplane.port.exception.ResourceNotFoundException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import java.util.List; import java.util.UUID; import static io.restassured.RestAssured.given; import static org.assertj.core.api.Assertions.assertThat; -class ControlPlaneRegistrationTest { +class ControlPlaneRegistrationApiTest { private final HttpServer httpServer = new HttpServer(21361); private final Dataplane sdk = Dataplane.newInstance() .id("consumer") + .registerAuthorization("test-authorization", TestAuthorization.class, (requestBuilder, authorization) -> {}) .build(); @BeforeEach @@ -103,6 +107,47 @@ void shouldReplaceControlPlane_whenSecondCall() { assertThat(result.succeeded()); assertThat(result.getContent().getEndpoint()).isEqualTo("http://new-endpoint"); } + + @Test + void shouldReturnBadRequest_whenRequestedAuthMethodNotSupported() { + var controlPlaneId = UUID.randomUUID().toString(); + var authorization = new RawAuthorization() { + + @Override + public String getType() { + return "unsupported"; + } + }; + var controlPlaneRegistrationMessage = new ControlPlaneRegistrationMessage(controlPlaneId, "http://something", List.of(authorization)); + + given() + .contentType(ContentType.JSON) + .basePath("/runtime/data-plane") + .port(httpServer.port()) + .body(controlPlaneRegistrationMessage) + .put("/v1/controlplanes") + .then() + .log().ifValidationFails() + .statusCode(400); + } + + + @Test + void shouldRegisterAuthorizationType() { + var controlPlaneId = UUID.randomUUID().toString(); + var authorization = new TestAuthorization("token"); + var controlPlaneRegistrationMessage = new ControlPlaneRegistrationMessage(controlPlaneId, "http://something", List.of(authorization)); + + given() + .contentType(ContentType.JSON) + .basePath("/runtime/data-plane") + .port(httpServer.port()) + .body(controlPlaneRegistrationMessage) + .put("/v1/controlplanes") + .then() + .log().ifValidationFails() + .statusCode(200); + } } @Nested @@ -152,4 +197,22 @@ void shouldReturn404_whenControlPlaneDoesNotExist() { } } + private static class TestAuthorization extends RawAuthorization { + + @JsonProperty("type") private final String type = "test-authorization"; + @JsonProperty("token") private String token; + + TestAuthorization(String token) { + this.token = token; + } + + @Override + public String getType() { + return type; + } + + public String getToken() { + return token; + } + } } diff --git a/src/test/java/org/eclipse/dataplane/scenario/AuthorizationTest.java b/src/test/java/org/eclipse/dataplane/scenario/AuthorizationTest.java new file mode 100644 index 0000000..51794d5 --- /dev/null +++ b/src/test/java/org/eclipse/dataplane/scenario/AuthorizationTest.java @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2026 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.dataplane.scenario; + +import org.eclipse.dataplane.ControlPlane; +import org.eclipse.dataplane.Dataplane; +import org.eclipse.dataplane.HttpServer; +import org.eclipse.dataplane.domain.Result; +import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage; +import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage; +import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage; +import org.eclipse.dataplane.domain.registration.RawAuthorization; +import org.jspecify.annotations.NonNull; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.UUID; + +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; +import static org.assertj.core.api.Assertions.assertThat; + +public class AuthorizationTest { + + private final HttpServer httpServer = new HttpServer(21961); + private final ControlPlane controlPlane = new ControlPlane(httpServer, "/data-plane", "/data-plane"); + private final Dataplane dataPlane = Dataplane.newInstance() + .id("data-plane") + .registerAuthorization("test-authorization", TestAuthorization.class, + (builder, authorization) -> builder.header("Authorization", authorization.getToken())) + .onPrepare(dataFlow -> { + dataFlow.transitionToPreparing(); + return Result.success(dataFlow); + }) + .build(); + + @BeforeEach + void setUp() { + httpServer.start(); + + httpServer.deploy("/data-plane", dataPlane.controller()); + } + + @Test + void shouldCommunicateWithControlPlaneUsingRegisteredAuthorization() { + var authorizationToken = UUID.randomUUID().toString(); + controlPlane.setAuthorizationToken(authorizationToken); + var controlPlaneRegistrationMessage = new ControlPlaneRegistrationMessage( + UUID.randomUUID().toString(), + controlPlane.consumerCallbackAddress(), + List.of(new TestAuthorization(authorizationToken)) + ); + dataPlane.registerControlPlane(controlPlaneRegistrationMessage).orElseThrow(RuntimeException::new); + + var transferType = "FileSystemAsync-PUSH"; + var processId = UUID.randomUUID().toString(); + var consumerProcessId = "consumer_" + processId; + var prepareMessage = createPrepareMessage(consumerProcessId, controlPlane.consumerCallbackAddress(), transferType); + + controlPlane.consumerPrepare(prepareMessage).statusCode(202).extract().as(DataFlowResponseMessage.class); + + var notifyPreparedResult = dataPlane.getById(consumerProcessId) + .compose(dataFlow -> dataPlane.notifyPrepared(consumerProcessId, Result::success)); + + assertThat(notifyPreparedResult.succeeded()).isTrue(); + } + + private @NonNull DataFlowPrepareMessage createPrepareMessage(String consumerProcessId, String callbackAddress, String transferType) { + return new DataFlowPrepareMessage("theMessageId", "theParticipantId", "theCounterPartyId", + "theDataspaceContext", consumerProcessId, "theAgreementId", "theDatasetId", callbackAddress, + transferType, emptyList(), emptyMap()); + } + + private static class TestAuthorization extends RawAuthorization { + + private final String type; + private String token; + + TestAuthorization() { + type = "test-authorization"; + } + + TestAuthorization(String token) { + this(); + this.token = token; + } + + @Override + public String getType() { + return type; + } + + public String getToken() { + return token; + } + + } +}