Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 33 additions & 5 deletions src/main/java/org/eclipse/dataplane/Dataplane.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowTerminateMessage;
import org.eclipse.dataplane.domain.registration.Authorization;
import org.eclipse.dataplane.domain.registration.AuthorizationType;
import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage;
import org.eclipse.dataplane.domain.registration.DataPlaneRegistrationMessage;
import org.eclipse.dataplane.logic.OnCompleted;
Expand All @@ -37,6 +39,7 @@
import org.eclipse.dataplane.logic.OnTerminate;
import org.eclipse.dataplane.port.DataPlaneRegistrationApiController;
import org.eclipse.dataplane.port.DataPlaneSignalingApiController;
import org.eclipse.dataplane.port.exception.AuthorizationNotSupported;
import org.eclipse.dataplane.port.exception.DataFlowNotifyControlPlaneFailed;
import org.eclipse.dataplane.port.exception.DataplaneNotRegistered;
import org.eclipse.dataplane.port.store.ControlPlaneStore;
Expand All @@ -48,9 +51,12 @@
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiConsumer;

import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
import static java.util.Collections.emptyMap;
Expand All @@ -73,6 +79,7 @@ public class Dataplane {
private OnCompleted onCompleted = dataFlow -> Result.failure(new UnsupportedOperationException("onCompleted is not implemented"));

private final HttpClient httpClient = HttpClient.newHttpClient();
private final Map<String, AuthorizationType> authorizationTypes = new HashMap<>();

public static Builder newInstance() {
return new Builder();
Expand Down Expand Up @@ -229,7 +236,7 @@ public Result<Void> notifyCompleted(String dataFlowId) {
.compose(dataFlow -> {
dataFlow.transitionToCompleted();

return notifyControlPlane("completed", dataFlow, emptyMap()); // TODO DataFlowCompletedMessage not defined
return notifyControlPlane("completed", dataFlow, emptyMap());
});
}

Expand Down Expand Up @@ -300,13 +307,22 @@ private Result<Void> notifyControlPlane(String action, DataFlow dataFlow, Object
return toJson(message)
.map(body -> {
var endpoint = dataFlow.callbackEndpointFor(action);
var request = HttpRequest.newBuilder()
var requestBuilder = HttpRequest.newBuilder()
.uri(URI.create(endpoint))
.header("content-type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(body))
.build();
.POST(HttpRequest.BodyPublishers.ofString(body));

var controlPlane = controlPlaneStore.findByEndpoint(dataFlow.getCallbackAddress());
if (controlPlane.succeeded()) {
var authorization = controlPlane.getContent().authorization();
if (authorization != null) {
var authorizationType = authorizationTypes.get(authorization.getType());
var castAuthorization = objectMapper.convertValue(authorization, authorizationType.authorizationClass());
authorizationType.authorizationFunction().accept(requestBuilder, castAuthorization);
}
}

return httpClient.send(request, HttpResponse.BodyHandlers.discarding());
return httpClient.send(requestBuilder.build(), HttpResponse.BodyHandlers.discarding());
})
.compose(response -> {
var successful = response.statusCode() >= 200 && response.statusCode() < 300;
Expand All @@ -331,9 +347,16 @@ public ControlPlaneStore controlPlaneStore() {
}

public Result<Void> registerControlPlane(ControlPlaneRegistrationMessage message) {
for (var auth : message.authorization()) {
if (!authorizationTypes.containsKey(auth.getType())) {
return Result.failure(new AuthorizationNotSupported(auth));
}
}

var controlPlane = ControlPlane.newInstance()
.id(message.controlplaneId())
.endpoint(message.endpoint())
.authorization(message.authorization())
.build();

return controlPlaneStore.save(controlPlane);
Expand Down Expand Up @@ -406,5 +429,10 @@ public Builder onTerminate(OnTerminate onTerminate) {
dataplane.onTerminate = onTerminate;
return this;
}

public <T extends Authorization> Builder registerAuthorization(String type, Class<T> authorizationClass, BiConsumer<HttpRequest.Builder, T> authorizationFunction) {
dataplane.authorizationTypes.put(type, new AuthorizationType<>(type, authorizationClass, authorizationFunction));
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,19 @@

package org.eclipse.dataplane.domain.controlplane;

import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import org.eclipse.dataplane.domain.registration.Authorization;
import org.eclipse.dataplane.domain.registration.RawAuthorization;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class ControlPlane {

private String id;
private String endpoint;
private final List<RawAuthorization> authorizations = new ArrayList<>();

public String getId() {
return id;
Expand All @@ -33,6 +40,15 @@ public static ControlPlane.Builder newInstance() {
return new ControlPlane.Builder();
}

public List<RawAuthorization> getAuthorizations() {
return authorizations;
}

public Authorization authorization() {
return getAuthorizations().stream().findAny().orElse(null);
}

@JsonPOJOBuilder
public static class Builder {
private final ControlPlane controlPlane = new ControlPlane();

Expand All @@ -55,5 +71,10 @@ public Builder endpoint(String endpoint) {
controlPlane.endpoint = endpoint;
return this;
}

public Builder authorization(List<RawAuthorization> authorizations) {
controlPlane.authorizations.addAll(authorizations);
return this;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2026 Think-it GmbH
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Think-it GmbH - initial API and implementation
*
*/

package org.eclipse.dataplane.domain.registration;

public interface Authorization {

String getType();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2026 Think-it GmbH
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Think-it GmbH - initial API and implementation
*
*/

package org.eclipse.dataplane.domain.registration;

import java.net.http.HttpRequest;
import java.util.function.BiConsumer;

public record AuthorizationType<T extends Authorization>(
String type,
Class<T> authorizationClass,
BiConsumer<HttpRequest.Builder, T> authorizationFunction // TODO: dedicated interface
) {

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,16 @@

package org.eclipse.dataplane.domain.registration;

import java.util.List;

import static java.util.Collections.emptyList;

public record ControlPlaneRegistrationMessage(
String controlplaneId,
String endpoint
// TODO: authorization
String endpoint,
List<RawAuthorization> authorization
) {
public ControlPlaneRegistrationMessage(String controlplaneId, String endpoint) {
this(controlplaneId, endpoint, emptyList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2026 Think-it GmbH
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Think-it GmbH - initial API and implementation
*
*/

package org.eclipse.dataplane.domain.registration;

import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;

import java.util.HashMap;
import java.util.Map;

public class RawAuthorization implements Authorization {

private final Map<String, Object> attributes = new HashMap<>();

@Override
public String getType() {
return attributes.get("type").toString();
}

@JsonAnyGetter
public Map<String, Object> getAttributes() {
return attributes;
}

@JsonAnySetter
public void setAttribute(String key, Object value) {
attributes.put(key, value);
}
}
7 changes: 7 additions & 0 deletions src/main/java/org/eclipse/dataplane/port/ExceptionMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

package org.eclipse.dataplane.port;

import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.NotFoundException;
import jakarta.ws.rs.WebApplicationException;
import org.eclipse.dataplane.port.exception.AuthorizationNotSupported;
import org.eclipse.dataplane.port.exception.ResourceNotFoundException;

import java.util.function.Function;
Expand All @@ -26,6 +28,11 @@ public interface ExceptionMapper {
if (exception instanceof ResourceNotFoundException notFound) {
return new NotFoundException(notFound);
}

if (exception instanceof AuthorizationNotSupported authorizationNotSupported) {
return new BadRequestException(authorizationNotSupported);
}

return new WebApplicationException("unexpected internal server error");
};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2026 Think-it GmbH
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Think-it GmbH - initial API and implementation
*
*/

package org.eclipse.dataplane.port.exception;

import org.eclipse.dataplane.domain.registration.Authorization;

public class AuthorizationNotSupported extends Exception {

public AuthorizationNotSupported(Authorization authorization) {
super("Authorization type " + authorization.getType() + " not supported");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ public interface ControlPlaneStore {
Result<ControlPlane> findById(String controlplaneId);

Result<Void> delete(String id);

Result<ControlPlane> findByEndpoint(String endpoint);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class InMemoryControlPlaneStore implements ControlPlaneStore {

Expand All @@ -44,17 +45,12 @@ public Result<Void> save(ControlPlane controlPlane) {

@Override
public Result<ControlPlane> findById(String controlplaneId) {
var dataFlow = store.get(controlplaneId);
if (dataFlow == null) {
var json = store.get(controlplaneId);
if (json == null) {
return Result.failure(new ResourceNotFoundException("ControlPlane %s not found".formatted(controlplaneId)));
}

try {
var deserialized = objectMapper.readValue(dataFlow, ControlPlane.class);
return Result.success(deserialized);
} catch (JsonProcessingException e) {
return Result.failure(e);
}
return deserialize(json);
}

@Override
Expand All @@ -65,4 +61,21 @@ public Result<Void> delete(String id) {
}
return Result.success();
}

@Override
public Result<ControlPlane> findByEndpoint(String endpoint) {
return store.values().stream().map(this::deserialize).filter(Result::succeeded)
.map(Result::getContent).filter(it -> Objects.equals(endpoint, it.getEndpoint()))
.findAny().map(Result::success)
.orElseGet(() -> Result.failure(new ResourceNotFoundException("ControlPlane with endpoint %s not found".formatted(endpoint))));
}

private Result<ControlPlane> deserialize(String json) {
try {
var deserialized = objectMapper.readValue(json, ControlPlane.class);
return Result.success(deserialized);
} catch (JsonProcessingException e) {
return Result.failure(e);
}
}
}
Loading