Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ plugins {
}

group = "org.eclipse.dataplane-core"
version = "0.0.7-SNAPSHOT"
version = "0.0.8-SNAPSHOT"

repositories {
mavenCentral()
Expand Down
42 changes: 29 additions & 13 deletions src/main/java/org/eclipse/dataplane/Dataplane.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
import org.eclipse.dataplane.port.DataPlaneRegistrationApiController;
import org.eclipse.dataplane.port.DataPlaneSignalingApiController;
import org.eclipse.dataplane.port.exception.AuthorizationNotSupported;
import org.eclipse.dataplane.port.exception.ControlPlaneNotRegistered;
import org.eclipse.dataplane.port.exception.DataFlowNotifyControlPlaneFailed;
import org.eclipse.dataplane.port.exception.DataplaneNotRegistered;
import org.eclipse.dataplane.port.exception.ResourceNotFoundException;
import org.eclipse.dataplane.port.store.ControlPlaneStore;
import org.eclipse.dataplane.port.store.DataFlowStore;
import org.eclipse.dataplane.port.store.InMemoryControlPlaneStore;
Expand All @@ -57,6 +59,7 @@
import java.util.UUID;

import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
import static jakarta.ws.rs.core.HttpHeaders.AUTHORIZATION;
import static java.util.Collections.emptyMap;

public class Dataplane {
Expand Down Expand Up @@ -84,7 +87,7 @@ public static Builder newInstance() {
}

public DataPlaneSignalingApiController controller() {
return new DataPlaneSignalingApiController(this);
return new DataPlaneSignalingApiController(this, authorizations);
}

public DataPlaneRegistrationApiController registrationController() {
Expand All @@ -104,7 +107,14 @@ public Result<DataFlowStatusResponseMessage> status(String dataFlowId) {
.map(f -> new DataFlowStatusResponseMessage(f.getId(), f.getState().name()));
}

public Result<DataFlowResponseMessage> prepare(DataFlowPrepareMessage message) {
private Result<Void> checkControlPlane(String controlplaneId) {
if (controlPlaneStore.exists(controlplaneId)) {
return Result.success();
}
return Result.failure(new ControlPlaneNotRegistered(controlplaneId));
}

public Result<DataFlowResponseMessage> prepare(String controlplaneId, DataFlowPrepareMessage message) {
var initialDataFlow = DataFlow.newInstance()
.id(message.processId())
.state(DataFlow.State.INITIATING)
Expand All @@ -117,9 +127,11 @@ public Result<DataFlowResponseMessage> prepare(DataFlowPrepareMessage message) {
.participantId(message.participantId())
.counterPartyId(message.counterPartyId())
.dataspaceContext(message.dataspaceContext())
.controlplaneId(controlplaneId)
.build();

return onPrepare.action(initialDataFlow)
return checkControlPlane(controlplaneId)
.compose(v -> onPrepare.action(initialDataFlow))
.compose(dataFlow -> {
if (dataFlow.isInitiating()) {
dataFlow.transitionToPrepared();
Expand All @@ -137,7 +149,7 @@ public Result<DataFlowResponseMessage> prepare(DataFlowPrepareMessage message) {
}


public Result<DataFlowResponseMessage> start(DataFlowStartMessage message) {
public Result<DataFlowResponseMessage> start(String controlplaneId, DataFlowStartMessage message) {
var initialDataFlow = DataFlow.newInstance()
.id(message.processId())
.state(DataFlow.State.INITIATING)
Expand All @@ -149,9 +161,11 @@ public Result<DataFlowResponseMessage> start(DataFlowStartMessage message) {
.participantId(message.participantId())
.counterPartyId(message.counterPartyId())
.dataspaceContext(message.dataspaceContext())
.controlplaneId(controlplaneId)
.build();

return onStart.action(initialDataFlow)
return checkControlPlane(controlplaneId)
.compose(v -> onStart.action(initialDataFlow))
.compose(dataFlow -> {
if (dataFlow.isInitiating()) {
dataFlow.transitionToStarted();
Expand Down Expand Up @@ -310,14 +324,16 @@ private Result<Void> notifyControlPlane(String action, DataFlow dataFlow, Object
.header("content-type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(body));

var controlPlane = controlPlaneStore.findByEndpoint(dataFlow.getCallbackAddress());
if (controlPlane.succeeded()) {
var authorizationProfile = controlPlane.getContent().authorization();
if (authorizationProfile != null) {
var authorization = authorizations.get(authorizationProfile.getType());
authorization.apply(requestBuilder, authorizationProfile);
}
}
controlPlaneStore.findById(dataFlow.getControlplaneId())
.compose(controlPlane -> {
var authorizationProfile = controlPlane.authorization();
if (authorizationProfile != null) {
var authorization = authorizations.get(authorizationProfile.getType());
return authorization.authorizationHeader(authorizationProfile);
}
return Result.failure(new ResourceNotFoundException("ControlPlane has no authorization"));
})
.onSuccess(authorizationHeader -> requestBuilder.header(AUTHORIZATION, authorizationHeader));

return httpClient.send(requestBuilder.build(), HttpResponse.BodyHandlers.discarding());
})
Expand Down
28 changes: 28 additions & 0 deletions src/main/java/org/eclipse/dataplane/domain/Result.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.eclipse.dataplane.domain;

import java.util.NoSuchElementException;
import java.util.function.Consumer;
import java.util.function.Function;

public abstract class Result<C> {
Expand Down Expand Up @@ -52,6 +53,10 @@ public static <R> Result<R> attempt(ExceptionThrowingSupplier<R> resultSupplier)

public abstract <T> Result<T> compose(ExceptionThrowingFunction<C, Result<T>> transformValue);

public abstract Result<C> onSuccess(Consumer<C> onSuccessDo);

public abstract Result<C> onFailure(Consumer<Exception> onFailureDo);

public boolean succeeded() {
return this instanceof Result.Success<C>;
}
Expand Down Expand Up @@ -101,6 +106,17 @@ public <T> Result<T> compose(ExceptionThrowingFunction<C, Result<T>> transformVa
return Result.failure(e);
}
}

@Override
public Result<C> onSuccess(Consumer<C> onSuccessDo) {
onSuccessDo.accept(content);
return this;
}

@Override
public Result<C> onFailure(Consumer<Exception> onFailureDo) {
return this;
}
}

private static class Failure<C> extends Result<C> {
Expand Down Expand Up @@ -140,6 +156,17 @@ public <T> Result<T> map(ExceptionThrowingFunction<C, T> transformValue) {
public <T> Result<T> compose(ExceptionThrowingFunction<C, Result<T>> transformValue) {
return Result.failure(this.exception);
}

@Override
public Result<C> onSuccess(Consumer<C> onSuccessDo) {
return this;
}

@Override
public Result<C> onFailure(Consumer<Exception> onFailureDo) {
onFailureDo.accept(exception);
return this;
}
}

@FunctionalInterface
Expand All @@ -151,4 +178,5 @@ public interface ExceptionThrowingFunction<T, R> {
public interface ExceptionThrowingSupplier<T> {
T get() throws Exception;
}

}
10 changes: 10 additions & 0 deletions src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class DataFlow {
private List<String> labels;
private Map<String, Object> metadata;
private DataAddress dataAddress;
private String controlplaneId;

public static DataFlow.Builder newInstance() {
return new Builder();
Expand Down Expand Up @@ -158,6 +159,10 @@ public URI callbackEndpointFor(String action) {
return URI.create(getCallbackAddress() + "/transfers/" + getId() + "/dataflow/" + action);
}

public String getControlplaneId() {
return controlplaneId;
}

public static class Builder {
private final DataFlow dataFlow = new DataFlow();

Expand Down Expand Up @@ -234,6 +239,11 @@ public Builder metadata(Map<String, Object> metadata) {
dataFlow.metadata = metadata;
return this;
}

public Builder controlplaneId(String controlplaneId) {
dataFlow.controlplaneId = controlplaneId;
return this;
}
}

public enum State {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

package org.eclipse.dataplane.domain.registration;

import java.net.http.HttpRequest;
import org.eclipse.dataplane.domain.Result;

/**
* Defines structure for an authorization profile.
Expand All @@ -30,6 +30,7 @@ public interface Authorization {
* Function that applies the authorization profile to the request builder.
* e.g. the Authorization header could be added with proper content.
*/
HttpRequest.Builder apply(HttpRequest.Builder requestBuilder, AuthorizationProfile profile);
Result<String> authorizationHeader(AuthorizationProfile profile);

Result<String> extractCallerId(String authorizationHeader);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package org.eclipse.dataplane.domain.registration;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.nimbusds.jwt.SignedJWT;
import org.eclipse.dataplane.domain.Result;

import java.net.URI;
import java.net.URLEncoder;
Expand All @@ -25,7 +27,6 @@
import java.util.Map;
import java.util.stream.Collectors;

import static jakarta.ws.rs.core.HttpHeaders.AUTHORIZATION;
import static jakarta.ws.rs.core.MediaType.APPLICATION_FORM_URLENCODED;

public class Oauth2ClientCredentialsAuthorization implements Authorization {
Expand All @@ -39,7 +40,7 @@ public String type() {
}

@Override
public HttpRequest.Builder apply(HttpRequest.Builder requestBuilder, AuthorizationProfile profile) {
public Result<String> authorizationHeader(AuthorizationProfile profile) {
var tokenEndpoint = profile.stringAttribute("tokenEndpoint");

var parameters = Map.of(
Expand All @@ -63,10 +64,25 @@ public HttpRequest.Builder apply(HttpRequest.Builder requestBuilder, Authorizati
var response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
var body = response.body();
var accessToken = objectMapper.readValue(body, Map.class).get("access_token").toString();
return requestBuilder.header(AUTHORIZATION, "Bearer " + accessToken);
return Result.success("Bearer " + accessToken);
} catch (Exception e) {
throw new RuntimeException(e);
return Result.failure(e);
}

}

@Override
public Result<String> extractCallerId(String authorizationHeader) {
try {
var token = authorizationHeader.substring("Bearer ".length());
var jwt = SignedJWT.parse(token);
var sub = jwt.getJWTClaimsSet().getClaims().get("sub");
if (sub instanceof String callerId) {
return Result.success(callerId);
}
return Result.failure(new RuntimeException("JWT sub claim %s is not a string".formatted(sub)));
} catch (Exception e) {
return Result.failure(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,26 @@

import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.NotAuthorizedException;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.container.ContainerRequestContext;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.Response;
import org.eclipse.dataplane.Dataplane;
import org.eclipse.dataplane.domain.Result;
import org.eclipse.dataplane.domain.dataflow.DataFlow;
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStartMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStartedNotificationMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowTerminateMessage;
import org.eclipse.dataplane.domain.registration.Authorization;

import java.util.Map;

import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;
import static jakarta.ws.rs.core.MediaType.WILDCARD;
Expand All @@ -39,15 +46,20 @@
public class DataPlaneSignalingApiController {

private final Dataplane dataplane;
private final Map<String, Authorization> authorizations;

public DataPlaneSignalingApiController(Dataplane dataplane) {
public DataPlaneSignalingApiController(Dataplane dataplane, Map<String, Authorization> authorizations) {
this.dataplane = dataplane;
this.authorizations = authorizations;
}

@POST
@Path("/prepare")
public Response prepare(DataFlowPrepareMessage message) {
var response = dataplane.prepare(message).orElseThrow(ExceptionMapper.MAP_TO_WSRS);
public Response prepare(DataFlowPrepareMessage message, @Context ContainerRequestContext requestContext) {
var response = extractControlplaneId(requestContext)
.compose(controlplaneId -> dataplane.prepare(controlplaneId, message))
.orElseThrow(ExceptionMapper.MAP_TO_WSRS);

if (response.state().equals(DataFlow.State.PREPARING.name())) {
return Response.accepted(response).build();
}
Expand All @@ -56,8 +68,11 @@ public Response prepare(DataFlowPrepareMessage message) {

@POST
@Path("/start")
public Response start(DataFlowStartMessage message) {
var response = dataplane.start(message).orElseThrow(ExceptionMapper.MAP_TO_WSRS);
public Response start(DataFlowStartMessage message, @Context ContainerRequestContext requestContext) {
var response = extractControlplaneId(requestContext)
.compose(controlplaneId -> dataplane.start(controlplaneId, message))
.orElseThrow(ExceptionMapper.MAP_TO_WSRS);

if (response.state().equals(DataFlow.State.STARTING.name())) {
return Response.accepted(response).build();
}
Expand Down Expand Up @@ -99,4 +114,15 @@ public DataFlowStatusResponseMessage status(@PathParam("flowId") String flowId)
return dataplane.status(flowId).orElseThrow(ExceptionMapper.MAP_TO_WSRS);
}

private Result<String> extractControlplaneId(ContainerRequestContext requestContext) {
var authorizationHeader = requestContext.getHeaderString("Authorization");
if (authorizationHeader == null) {
return Result.failure(new NotAuthorizedException("Authorization header missing"));
}
return authorizations.values().stream()
.map(authorization -> authorization.extractCallerId(authorizationHeader))
.filter(Result::succeeded).findFirst()
.orElseGet(() -> Result.failure(new NotAuthorizedException("Authorization method not recognized")));
}

}
14 changes: 12 additions & 2 deletions src/main/java/org/eclipse/dataplane/port/ExceptionMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,32 @@
package org.eclipse.dataplane.port;

import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.NotAuthorizedException;
import jakarta.ws.rs.NotFoundException;
import jakarta.ws.rs.WebApplicationException;
import org.eclipse.dataplane.port.exception.AuthorizationNotSupported;
import org.eclipse.dataplane.port.exception.ControlPlaneNotRegistered;
import org.eclipse.dataplane.port.exception.ResourceNotFoundException;

import java.util.function.Function;

public interface ExceptionMapper {

Function<Exception, WebApplicationException> MAP_TO_WSRS = exception -> {
if (exception instanceof WebApplicationException webApplicationException) {
return webApplicationException;
}

if (exception instanceof ResourceNotFoundException notFound) {
return new NotFoundException(notFound);
}

if (exception instanceof AuthorizationNotSupported authorizationNotSupported) {
return new BadRequestException(authorizationNotSupported);
if (exception instanceof ControlPlaneNotRegistered controlPlaneNotRegistered) {
return new NotAuthorizedException(controlPlaneNotRegistered);
}

if (exception instanceof AuthorizationNotSupported) {
return new BadRequestException(exception);
}

return new WebApplicationException("unexpected internal server error");
Expand Down
Loading