Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 41 additions & 14 deletions src/main/java/org/eclipse/dataplane/Dataplane.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.dataplane.domain.Result;
import org.eclipse.dataplane.domain.controlplane.ControlPlane;
import org.eclipse.dataplane.domain.dataflow.DataFlow;
import org.eclipse.dataplane.domain.dataflow.DataFlowPrepareMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowResponseMessage;
Expand All @@ -26,17 +27,21 @@
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowTerminateMessage;
import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage;
import org.eclipse.dataplane.domain.registration.DataPlaneRegistrationMessage;
import org.eclipse.dataplane.logic.OnCompleted;
import org.eclipse.dataplane.logic.OnPrepare;
import org.eclipse.dataplane.logic.OnStart;
import org.eclipse.dataplane.logic.OnStarted;
import org.eclipse.dataplane.logic.OnSuspend;
import org.eclipse.dataplane.logic.OnTerminate;
import org.eclipse.dataplane.port.DataPlaneRegistrationApiController;
import org.eclipse.dataplane.port.DataPlaneSignalingApiController;
import org.eclipse.dataplane.port.exception.DataFlowNotifyControlPlaneFailed;
import org.eclipse.dataplane.port.exception.DataplaneNotRegistered;
import org.eclipse.dataplane.port.store.ControlPlaneStore;
import org.eclipse.dataplane.port.store.DataFlowStore;
import org.eclipse.dataplane.port.store.InMemoryControlPlaneStore;
import org.eclipse.dataplane.port.store.InMemoryDataFlowStore;

import java.net.URI;
Expand All @@ -53,7 +58,8 @@
public class Dataplane {

private final ObjectMapper objectMapper = new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false);
private final DataFlowStore store = new InMemoryDataFlowStore(objectMapper);
private final DataFlowStore dataFlowStore = new InMemoryDataFlowStore(objectMapper);
private final ControlPlaneStore controlPlaneStore = new InMemoryControlPlaneStore(objectMapper);
private String id;
private String endpoint;
private final Set<String> transferTypes = new HashSet<>();
Expand All @@ -76,16 +82,20 @@ public DataPlaneSignalingApiController controller() {
return new DataPlaneSignalingApiController(this);
}

public DataPlaneRegistrationApiController registrationController() {
return new DataPlaneRegistrationApiController(this);
}

public Result<DataFlow> getById(String dataFlowId) {
return store.findById(dataFlowId);
return dataFlowStore.findById(dataFlowId);
}

public Result<Void> save(DataFlow dataFlow) {
return store.save(dataFlow);
return dataFlowStore.save(dataFlow);
}

public Result<DataFlowStatusResponseMessage> status(String dataFlowId) {
return store.findById(dataFlowId)
return dataFlowStore.findById(dataFlowId)
.map(f -> new DataFlowStatusResponseMessage(f.getId(), f.getState().name()));
}

Expand Down Expand Up @@ -153,24 +163,24 @@ public Result<DataFlowResponseMessage> start(DataFlowStartMessage message) {
}

public Result<Void> suspend(String flowId, DataFlowSuspendMessage message) {
return store.findById(flowId)
return dataFlowStore.findById(flowId)
.map(dataFlow -> {
dataFlow.transitionToSuspended(message.reason());
return dataFlow;
})
.compose(onSuspend::action)
.compose(store::save)
.compose(dataFlowStore::save)
.map(it -> null);
}

public Result<Void> terminate(String dataFlowId, DataFlowTerminateMessage message) {
return store.findById(dataFlowId)
return dataFlowStore.findById(dataFlowId)
.map(dataFlow -> {
dataFlow.transitionToTerminated(message.reason());
return dataFlow;
})
.compose(onTerminate::action)
.compose(store::save)
.compose(dataFlowStore::save)
.map(it -> null);
}

Expand All @@ -180,7 +190,7 @@ public Result<Void> terminate(String dataFlowId, DataFlowTerminateMessage messag
* @param dataFlowId the data flow id.
*/
public Result<Void> notifyPrepared(String dataFlowId, OnPrepare onPrepare) {
return store.findById(dataFlowId)
return dataFlowStore.findById(dataFlowId)
.compose(onPrepare::action)
.compose(dataFlow -> {
dataFlow.transitionToPrepared();
Expand All @@ -197,7 +207,7 @@ public Result<Void> notifyPrepared(String dataFlowId, OnPrepare onPrepare) {
* @param dataFlowId the data flow id.
*/
public Result<Void> notifyStarted(String dataFlowId, OnStart onStart) {
return store.findById(dataFlowId)
return dataFlowStore.findById(dataFlowId)
.compose(onStart::action)
.compose(dataFlow -> {
dataFlow.transitionToStarted();
Expand All @@ -215,7 +225,7 @@ public Result<Void> notifyStarted(String dataFlowId, OnStart onStart) {
* @param dataFlowId id of the data flow
*/
public Result<Void> notifyCompleted(String dataFlowId) {
return store.findById(dataFlowId)
return dataFlowStore.findById(dataFlowId)
.compose(dataFlow -> {
dataFlow.transitionToCompleted();

Expand All @@ -230,7 +240,7 @@ public Result<Void> notifyCompleted(String dataFlowId) {
* @param throwable the error
*/
public Result<Void> notifyErrored(String dataFlowId, Throwable throwable) {
return store.findById(dataFlowId)
return dataFlowStore.findById(dataFlowId)
.compose(dataFlow -> {
dataFlow.transitionToTerminated(throwable.getMessage());

Expand All @@ -239,7 +249,7 @@ public Result<Void> notifyErrored(String dataFlowId, Throwable throwable) {
}

public Result<Void> started(String flowId, DataFlowStartedNotificationMessage startedNotificationMessage) {
return store.findById(flowId)
return dataFlowStore.findById(flowId)
.map(dataFlow -> {
dataFlow.setDataAddress(startedNotificationMessage.dataAddress());
return dataFlow;
Expand All @@ -258,7 +268,7 @@ public Result<Void> started(String flowId, DataFlowStartedNotificationMessage st
* @return result indicating whether data flow was completed successfully
*/
public Result<Void> completed(String flowId) {
return store.findById(flowId).compose(onCompleted::action)
return dataFlowStore.findById(flowId).compose(onCompleted::action)
.compose(dataFlow -> {
dataFlow.transitionToCompleted();
return save(dataFlow);
Expand Down Expand Up @@ -316,6 +326,23 @@ private Result<String> toJson(Object message) {
}
}

public ControlPlaneStore controlPlaneStore() {
return controlPlaneStore;
}

public Result<Void> registerControlPlane(ControlPlaneRegistrationMessage message) {
var controlPlane = ControlPlane.newInstance()
.id(message.controlplaneId())
.endpoint(message.endpoint())
.build();

return controlPlaneStore.save(controlPlane);
}

public Result<Void> deleteControlPlane(String id) {
return controlPlaneStore.delete(id);
}

public static class Builder {

private final Dataplane dataplane = new Dataplane();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2026 Think-it GmbH
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Think-it GmbH - initial API and implementation
*
*/

package org.eclipse.dataplane.domain.controlplane;

import java.util.Objects;

public class ControlPlane {

private String id;
private String endpoint;

public String getId() {
return id;
}

public String getEndpoint() {
return endpoint;
}

public static ControlPlane.Builder newInstance() {
return new ControlPlane.Builder();
}

public static class Builder {
private final ControlPlane controlPlane = new ControlPlane();

private Builder() {

}

public ControlPlane build() {
Objects.requireNonNull(controlPlane.id);

return controlPlane;
}

public Builder id(String id) {
controlPlane.id = id;
return this;
}

public Builder endpoint(String endpoint) {
controlPlane.endpoint = endpoint;
return this;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2026 Think-it GmbH
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Think-it GmbH - initial API and implementation
*
*/

package org.eclipse.dataplane.domain.registration;

public record ControlPlaneRegistrationMessage(
String controlplaneId,
String endpoint
// TODO: authorization
) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (c) 2025 Think-it GmbH
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Think-it GmbH - initial API and implementation
*
*/

package org.eclipse.dataplane.port;

import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.PUT;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.Response;
import org.eclipse.dataplane.Dataplane;
import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage;

import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;

@Path("/v1/controlplanes")
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON)
public class DataPlaneRegistrationApiController {

private final Dataplane dataplane;

public DataPlaneRegistrationApiController(Dataplane dataplane) {
this.dataplane = dataplane;
}

@PUT
@Path("/")
public Response register(ControlPlaneRegistrationMessage message) {
dataplane.registerControlPlane(message).orElseThrow(ExceptionMapper.MAP_TO_WSRS);
return Response.ok().build();
}

@DELETE
@Path("/{id}")
public Response delete(@PathParam("id") String id) {
dataplane.deleteControlPlane(id).orElseThrow(ExceptionMapper.MAP_TO_WSRS);
return Response.noContent().build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@

import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.NotFoundException;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.Response;
import org.eclipse.dataplane.Dataplane;
import org.eclipse.dataplane.domain.dataflow.DataFlow;
Expand All @@ -31,7 +29,6 @@
import org.eclipse.dataplane.domain.dataflow.DataFlowStatusResponseMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowSuspendMessage;
import org.eclipse.dataplane.domain.dataflow.DataFlowTerminateMessage;
import org.eclipse.dataplane.port.exception.DataFlowNotFoundException;

import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;
import static jakarta.ws.rs.core.MediaType.WILDCARD;
Expand All @@ -50,7 +47,7 @@ public DataPlaneSignalingApiController(Dataplane dataplane) {
@POST
@Path("/prepare")
public Response prepare(DataFlowPrepareMessage message) {
var response = dataplane.prepare(message).orElseThrow(this::mapToWsRsException);
var response = dataplane.prepare(message).orElseThrow(ExceptionMapper.MAP_TO_WSRS);
if (response.state().equals(DataFlow.State.PREPARING.name())) {
return Response.accepted(response).build();
}
Expand All @@ -60,7 +57,7 @@ public Response prepare(DataFlowPrepareMessage message) {
@POST
@Path("/start")
public Response start(DataFlowStartMessage message) {
var response = dataplane.start(message).orElseThrow(this::mapToWsRsException);
var response = dataplane.start(message).orElseThrow(ExceptionMapper.MAP_TO_WSRS);
if (response.state().equals(DataFlow.State.STARTING.name())) {
return Response.accepted(response).build();
}
Expand All @@ -70,43 +67,36 @@ public Response start(DataFlowStartMessage message) {
@POST
@Path("/{flowId}/suspend")
public Response suspend(@PathParam("flowId") String flowId, DataFlowSuspendMessage message) {
dataplane.suspend(flowId, message).orElseThrow(this::mapToWsRsException);
dataplane.suspend(flowId, message).orElseThrow(ExceptionMapper.MAP_TO_WSRS);
return Response.ok().build();
}

@POST
@Path("/{flowId}/terminate")
public Response terminate(@PathParam("flowId") String flowId, DataFlowTerminateMessage message) {
dataplane.terminate(flowId, message).orElseThrow(this::mapToWsRsException);
dataplane.terminate(flowId, message).orElseThrow(ExceptionMapper.MAP_TO_WSRS);
return Response.ok().build();
}

@POST
@Path("/{flowId}/started")
public Response started(@PathParam("flowId") String flowId, DataFlowStartedNotificationMessage startedNotificationMessage) {
dataplane.started(flowId, startedNotificationMessage).orElseThrow(this::mapToWsRsException);
dataplane.started(flowId, startedNotificationMessage).orElseThrow(ExceptionMapper.MAP_TO_WSRS);
return Response.ok().build();
}

@POST
@Path("/{flowId}/completed")
@Consumes(WILDCARD)
public Response completed(@PathParam("flowId") String flowId) {
dataplane.completed(flowId).orElseThrow(this::mapToWsRsException);
dataplane.completed(flowId).orElseThrow(ExceptionMapper.MAP_TO_WSRS);
return Response.ok().build();
}

@GET
@Path("/{flowId}/status")
public DataFlowStatusResponseMessage status(@PathParam("flowId") String flowId) {
return dataplane.status(flowId).orElseThrow(this::mapToWsRsException);
}

private WebApplicationException mapToWsRsException(Exception exception) {
if (exception instanceof DataFlowNotFoundException notFound) {
return new NotFoundException(notFound);
}
return new WebApplicationException("unexpected internal server error");
return dataplane.status(flowId).orElseThrow(ExceptionMapper.MAP_TO_WSRS);
}

}
Loading