diff --git a/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/ConditionalRetry.java b/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/ConditionalRetry.java index f7d6534a5..0d277c364 100644 --- a/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/ConditionalRetry.java +++ b/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/ConditionalRetry.java @@ -23,7 +23,6 @@ import org.slf4j.LoggerFactory; import rx.Observable; import rx.functions.Func1; -import rx.functions.Func2; public class ConditionalRetry { @@ -41,32 +40,18 @@ public ConditionalRetry(Counter counter, String name) { public ConditionalRetry(Counter counter, String name, final int max) { this.counter = counter; this.name = name; - this.retryLogic = - new Func1, Observable>() { - @Override - public Observable call(Observable attempts) { - return attempts - .zipWith(Observable.range(1, max), new Func2() { - @Override - public Integer call(Throwable t1, Integer integer) { - return integer; - } - }) - .flatMap(new Func1>() { - @Override - public Observable call(Integer integer) { - if (errorRef.get() != null) - return Observable.error(errorRef.get()); - if (ConditionalRetry.this.counter != null) - ConditionalRetry.this.counter.increment(); - long delay = 2 * (integer > 10 ? 10 : integer); - logger.info(": retrying " + ConditionalRetry.this.name + - " after sleeping for " + delay + " secs"); - return Observable.timer(delay, TimeUnit.SECONDS); - } - }); - } - }; + this.retryLogic = attempts -> attempts + .zipWith(Observable.range(1, max), (t1, integer) -> integer) + .flatMap(integer -> { + if (errorRef.get() != null) + return Observable.error(errorRef.get()); + if (ConditionalRetry.this.counter != null) + ConditionalRetry.this.counter.increment(); + long delay = 2 * (integer > 10 ? 10 : integer); + logger.info(": retrying " + ConditionalRetry.this.name + + " after sleeping for " + delay + " secs"); + return Observable.timer(delay, TimeUnit.SECONDS); + }); } public void setErrorRef(Throwable error) { diff --git a/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/HttpUtility.java b/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/HttpUtility.java index 641d341f8..bbf8aa873 100644 --- a/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/HttpUtility.java +++ b/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/HttpUtility.java @@ -36,8 +36,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; -import rx.functions.Action1; -import rx.functions.Func1; /* package */ class HttpUtility { @@ -49,74 +47,54 @@ static Observable getGetResponse(String host, int port, String uri) { return new CompositeHttpClientBuilder() .appendPipelineConfigurator( - new PipelineConfigurator, HttpClientRequest>() { - @Override - public void configureNewPipeline(ChannelPipeline pipeline) { - pipeline.addLast("introspecting-handler", new ChannelDuplexHandler() { - private String uri = ""; + (PipelineConfigurator, HttpClientRequest>) pipeline -> { + pipeline.addLast("introspecting-handler", new ChannelDuplexHandler() { + private String uri = ""; - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) - throws Exception { - if (msg instanceof HttpRequest) { - HttpRequest request = (HttpRequest) msg; - uri = request.getUri(); - logger.info("Sending request on channel id: " + ctx.channel().toString() + - ", request URI: " + uri); - } - super.write(ctx, msg, promise); + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + if (msg instanceof HttpRequest) { + HttpRequest request = (HttpRequest) msg; + uri = request.uri(); + logger.info("Sending request on channel id: " + ctx.channel().toString() + + ", request URI: " + uri); } + super.write(ctx, msg, promise); + } - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if (msg instanceof HttpResponse) { - logger.info("Received response on channel id: " + ctx.channel().toString() + - ", request URI: " + uri); - } - super.channelRead(ctx, msg); + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof HttpResponse) { + logger.info("Received response on channel id: " + ctx.channel().toString() + + ", request URI: " + uri); } - }); - - try { - int maxContentLength = 10 * 1024 * 1024; // Ten megabytes - pipeline.replace(HttpObjectAggregator.class, "http-object-aggregator", - new HttpObjectAggregator(maxContentLength)); - } catch (NoSuchElementException ex) { - logger.error("HttpObjectAggregator did not exist in this pipeline. Error: {}", - ex.getMessage(), ex); - } catch (IllegalArgumentException ex) { - logger.error("ChannelHandler named http-object-aggregator already existed in this" + - " pipeline. Error: {}", ex.getMessage(), ex); - } - catch (Throwable t) { - logger.error("Unknown error adding HttpObjectAggregator to Master Client " + - "Pipeline. Error: {}", t.getMessage(), t); + super.channelRead(ctx, msg); } - } + }); + try { + int maxContentLength = 10 * 1024 * 1024; // Ten megabytes + pipeline.replace(HttpObjectAggregator.class, "http-object-aggregator", + new HttpObjectAggregator(maxContentLength)); + } catch (NoSuchElementException ex) { + logger.error("HttpObjectAggregator did not exist in this pipeline. Error: {}", + ex.getMessage(), ex); + } catch (IllegalArgumentException ex) { + logger.error("ChannelHandler named http-object-aggregator already existed in this" + + " pipeline. Error: {}", ex.getMessage(), ex); + } catch (Throwable t) { + logger.error("Unknown error adding HttpObjectAggregator to Master Client " + + "Pipeline. Error: {}", t.getMessage(), t); + } }) .build() .submit(new RxClient.ServerInfo(host, port), HttpClientRequest.createGet(uri), new HttpClient.HttpClientConfig.Builder().setFollowRedirect(true).followRedirect(MAX_REDIRECTS).build()) - .flatMap(new Func1, Observable>() { - @Override - public Observable call(HttpClientResponse response) { - return response.getContent(); - } - }) - .map(new Func1() { - @Override - public String call(ByteBuf o) { - return o.toString(Charset.defaultCharset()); - } - }) - .doOnError(new Action1() { - @Override - public void call(Throwable throwable) { - logger.warn("Error: " + throwable.getMessage(), throwable); - } - }) + .flatMap(response -> response.getContent()) + .map(o -> o.toString(Charset.defaultCharset())) + .doOnError(throwable -> logger.warn("Error: " + throwable.getMessage(), throwable)) .timeout(GET_TIMEOUT_SECS, TimeUnit.SECONDS); } } diff --git a/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/MantisMasterClientApi.java b/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/MantisMasterClientApi.java index 2d4283c3d..88c5d42cf 100644 --- a/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/MantisMasterClientApi.java +++ b/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/MantisMasterClientApi.java @@ -27,11 +27,16 @@ import io.mantisrx.runtime.MantisJobDefinition; import io.mantisrx.runtime.MantisJobState; import io.mantisrx.runtime.WorkerMigrationConfig; -import io.mantisrx.runtime.codec.JsonCodec; +import io.mantisrx.runtime.codec.JacksonCodecs; import io.mantisrx.runtime.descriptor.DeploymentStrategy; import io.mantisrx.runtime.descriptor.SchedulingInfo; import io.mantisrx.runtime.parameter.Parameter; -import io.mantisrx.server.core.*; +import io.mantisrx.server.core.JobAssignmentResult; +import io.mantisrx.server.core.JobScalerRuleInfo; +import io.mantisrx.server.core.JobSchedulingInfo; +import io.mantisrx.server.core.NamedJobInfo; +import io.mantisrx.server.core.PostJobStatusRequest; +import io.mantisrx.server.core.Status; import io.mantisrx.server.core.master.MasterDescription; import io.mantisrx.server.core.master.MasterMonitor; import io.mantisrx.shaded.com.fasterxml.jackson.core.JsonProcessingException; @@ -78,9 +83,6 @@ import rx.functions.Func2; -/** - * - */ public class MantisMasterClientApi implements MantisMasterGateway { static final String ConnectTimeoutSecsPropertyName = "MantisClientConnectTimeoutSecs"; @@ -136,10 +138,6 @@ public class MantisMasterClientApi implements MantisMasterGateway { }); private MasterMonitor masterMonitor; - /** - * - * @param masterMonitor - */ public MantisMasterClientApi(MasterMonitor masterMonitor) { this.masterMonitor = masterMonitor; masterEndpoint = masterMonitor.getMasterObservable() @@ -170,15 +168,6 @@ private String toUri(MasterDescription md, String path) { } - /** - * - * @param name - * @param version - * @param parameters - * @param jobSla - * @param schedulingInfo - * @return - */ public Observable submitJob(final String name, final String version, final List parameters, final JobSla jobSla, @@ -187,17 +176,6 @@ public Observable submitJob(final String name, final String v WorkerMigrationConfig.DEFAULT); } - /** - * - * @param name - * @param version - * @param parameters - * @param jobSla - * @param subscriptionTimeoutSecs - * @param schedulingInfo - * @param migrationConfig - * @return - */ public Observable submitJob(final String name, final String version, final List parameters, final JobSla jobSla, @@ -208,17 +186,6 @@ public Observable submitJob(final String name, final String v false, migrationConfig); } - /** - * - * @param name - * @param version - * @param parameters - * @param jobSla - * @param subscriptionTimeoutSecs - * @param schedulingInfo - * @return - */ - public Observable submitJob(final String name, final String version, final List parameters, final JobSla jobSla, @@ -228,17 +195,6 @@ public Observable submitJob(final String name, final String v false, WorkerMigrationConfig.DEFAULT); } - /** - * - * @param name - * @param version - * @param parameters - * @param jobSla - * @param subscriptionTimeoutSecs - * @param schedulingInfo - * @param readyForJobMaster - * @return - */ public Observable submitJob(final String name, final String version, final List parameters, final JobSla jobSla, @@ -249,18 +205,6 @@ public Observable submitJob(final String name, final String v readyForJobMaster, WorkerMigrationConfig.DEFAULT); } - /** - * - * @param name - * @param version - * @param parameters - * @param jobSla - * @param subscriptionTimeoutSecs - * @param schedulingInfo - * @param readyForJobMaster - * @param migrationConfig - * @return - */ public Observable submitJob(final String name, final String version, final List parameters, final JobSla jobSla, @@ -272,19 +216,6 @@ public Observable submitJob(final String name, final String v readyForJobMaster, migrationConfig, new LinkedList<>()); } - /** - * - * @param name - * @param version - * @param parameters - * @param jobSla - * @param subscriptionTimeoutSecs - * @param schedulingInfo - * @param readyForJobMaster - * @param migrationConfig - * @param labels - * @return - */ public Observable submitJob(final String name, final String version, final List parameters, final JobSla jobSla, @@ -320,12 +251,6 @@ public Observable submitJob(final String name, final String v } } - /** - * - * @param submitJobRequestJson - * @return - */ - public Observable submitJob(final String submitJobRequestJson) { return masterMonitor.getMasterObservable() .filter(masterDescription -> masterDescription != null) @@ -363,13 +288,6 @@ public Observable killJob(final String jobId) { "User requested"); } - /** - * - * @param jobId - * @param user - * @param reason - * @return - */ public Observable killJob(final String jobId, final String user, final String reason) { return masterMonitor.getMasterObservable() .filter(md -> md != null) @@ -394,14 +312,6 @@ public Observable killJob(final String jobId, final String user, final Str }); } - /** - * - * @param jobId - * @param stageNum - * @param numWorkers - * @param reason - * @return - */ public Observable scaleJobStage(final String jobId, final int stageNum, final int numWorkers, final String reason) { return masterMonitor @@ -423,14 +333,6 @@ public Observable scaleJobStage(final String jobId, final int stageNum, }); } - /** - * - * @param jobId - * @param user - * @param workerNum - * @param reason - * @return - */ public Observable resubmitJobWorker(final String jobId, final String user, final int workerNum, final String reason) { return masterMonitor.getMasterObservable() @@ -462,7 +364,7 @@ private Observable submitPostRequest(String uri, String post .withContent(postContent), new HttpClient.HttpClientConfig.Builder() .build()) - .map(b -> b.getStatus()); + .map(HttpClientResponse::getStatus); } private Observable getPostResponse(String uri, String postContent) { @@ -473,16 +375,10 @@ private Observable getPostResponse(String uri, String postContent) { .withContent(postContent), new HttpClient.HttpClientConfig.Builder() .build()) - .flatMap((Func1, Observable>) b -> b.getContent()) + .flatMap(HttpClientResponse::getContent) .map(o -> o.toString(Charset.defaultCharset())); } - /** - * - * @param jobName - * @return - */ - public Observable namedJobExists(final String jobName) { return masterMonitor.getMasterObservable() .filter(md -> md != null) @@ -507,11 +403,6 @@ public Observable namedJobExists(final String jobName) { ; } - /** - * - * @param jobId - * @return - */ public Observable getSinkStageNum(final String jobId) { return masterMonitor.getMasterObservable() .filter(masterDescription -> masterDescription != null) @@ -543,12 +434,6 @@ public Observable getSinkStageNum(final String jobId) { }); } - /** - * - * @param jobName - * @param state - * @return - */ // returns json array of job metadata public Observable getJobsOfNamedJob(final String jobName, final MantisJobState.MetaState state) { return masterMonitor.getMasterObservable() @@ -672,11 +557,6 @@ private WebSocketClient getRxnettyWebSoc .build(); } - /** - * - * @param jobId - * @return - */ public Observable getJobStatusObservable(final String jobId) { return masterMonitor.getMasterObservable() .filter((md) -> md != null) @@ -689,11 +569,6 @@ public Observable getJobStatusObservable(final String jobId) { .onErrorResumeNext(Observable.empty()); } - /** - * - * @param jobId - * @return - */ public Observable schedulingChanges(final String jobId) { final ConditionalRetry retryObject = new ConditionalRetry(null, "assignmentresults_" + jobId); return masterMonitor.getMasterObservable() @@ -792,11 +667,6 @@ public Observable jobScalerRulesStream(final String jobId) { ; } - /** - * - * @param jobName - * @return - */ public Observable namedJobInfo(final String jobName) { return masterMonitor.getMasterObservable() .filter(masterDescription -> masterDescription != null) @@ -827,17 +697,12 @@ public Observable namedJobInfo(final String jobName) { } - /** - * - * @param jobId - * @return - */ public Observable assignmentResults(String jobId) { ConnectToObservable.Builder connectionBuilder = new ConnectToObservable.Builder() .subscribeAttempts(subscribeAttemptsToMaster) .name("/v1/api/master/assignmentresults") - .decoder(new JsonCodec(JobAssignmentResult.class)); + .decoder(JacksonCodecs.pojo(JobAssignmentResult.class)); if (jobId != null && !jobId.isEmpty()) { Map subscriptionParams = new HashMap<>(); subscriptionParams.put("jobId", jobId); diff --git a/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/MantisMasterGateway.java b/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/MantisMasterGateway.java index f5d341c17..03a892ee6 100644 --- a/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/MantisMasterGateway.java +++ b/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/MantisMasterGateway.java @@ -22,7 +22,11 @@ import io.mantisrx.runtime.WorkerMigrationConfig; import io.mantisrx.runtime.descriptor.SchedulingInfo; import io.mantisrx.runtime.parameter.Parameter; -import io.mantisrx.server.core.*; +import io.mantisrx.server.core.JobAssignmentResult; +import io.mantisrx.server.core.JobScalerRuleInfo; +import io.mantisrx.server.core.JobSchedulingInfo; +import io.mantisrx.server.core.NamedJobInfo; +import io.mantisrx.server.core.Status; import java.util.List; import java.util.concurrent.CompletableFuture; diff --git a/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/MasterClientWrapper.java b/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/MasterClientWrapper.java index 1c941e52c..4a14462b2 100644 --- a/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/MasterClientWrapper.java +++ b/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/MasterClientWrapper.java @@ -39,9 +39,6 @@ import org.slf4j.LoggerFactory; import rx.Observable; import rx.Observer; -import rx.functions.Action0; -import rx.functions.Action1; -import rx.functions.Func1; import rx.subjects.PublishSubject; public class MasterClientWrapper { @@ -86,21 +83,14 @@ public static void main(String[] args) throws InterruptedException { Services.startAndWait(haServices); MasterClientWrapper clientWrapper = new MasterClientWrapper(haServices.getMasterClientApi()); clientWrapper.getMasterClientApi() - .flatMap(new Func1>() { - @Override - public Observable call(MantisMasterGateway mantisMasterClientApi) { - Integer sinkStage = null; - return mantisMasterClientApi.getSinkStageNum(jobId) + .flatMap(mantisMasterClientApi -> + mantisMasterClientApi.getSinkStageNum(jobId) .take(1) // only need to figure out sink stage number once - .flatMap(new Func1>() { - @Override - public Observable call(Integer integer) { - logger.info("Getting sink locations for " + jobId); - return clientWrapper.getSinkLocations(jobId, integer, 0, 0); - } - }); - } - }).toBlocking().subscribe((ep) -> { + .flatMap(integer -> { + logger.info("Getting sink locations for " + jobId); + return clientWrapper.getSinkLocations(jobId, integer, 0, 0); + }) + ).toBlocking().subscribe((ep) -> { System.out.println("Endpoint Change -> " + ep); }); Thread.sleep(50000); @@ -149,19 +139,9 @@ private List getAllNonJobMasterEndpoints(final String jobId, final Map Endpoint ep = new WorkerEndpoint(getWrappedHost(host.getHost(), host.getWorkerNumber()), host.getPort().get(0), stageNum, host.getMetricsPort(), host.getWorkerIndex(), host.getWorkerNumber(), // completed callback - new Action0() { - @Override - public void call() { - logger.info("job " + jobId + " WorkerIndex " + workerIndex + " completed"); - } - }, + () -> logger.info("job " + jobId + " WorkerIndex " + workerIndex + " completed"), // error callback - new Action1() { - @Override - public void call(Throwable t1) { - logger.info("job " + jobId + " WorkerIndex " + workerIndex + " failed"); - } - } + t1 -> logger.info("job " + jobId + " WorkerIndex " + workerIndex + " failed") ); endpoints.add(ep); } @@ -176,38 +156,15 @@ public Observable getAllWorkerMetricLocations(final String jobId Observable> schedulingUpdates = masterClientApi .schedulingChanges(jobId) - .doOnError(new Action1() { - @Override - public void call(Throwable throwable) { - logger.warn("Error on scheduling changes observable: " + throwable); - } - }) + .doOnError(throwable -> logger.warn("Error on scheduling changes observable: " + throwable)) .retryWhen(schedInfoRetry.getRetryLogic()) - .map(new Func1>() { - @Override - public Map call(JobSchedulingInfo jobSchedulingInfo) { - logger.info("Got scheduling info for " + jobId); - return jobSchedulingInfo.getWorkerAssignments(); - } + .map(jobSchedulingInfo -> { + logger.info("Got scheduling info for " + jobId); + return jobSchedulingInfo.getWorkerAssignments(); }) - .filter(new Func1, Boolean>() { - @Override - public Boolean call(Map workerAssignments) { - return workerAssignments != null; - } - }) - .map(new Func1, List>() { - @Override - public List call(Map workerAssignments) { - return getAllNonJobMasterEndpoints(jobId, workerAssignments); - } - }) - .doOnError(new Action1() { - @Override - public void call(Throwable throwable) { - logger.error(throwable.getMessage(), throwable); - } - }); + .filter(workerAssignments -> workerAssignments != null) + .map(workerAssignments -> getAllNonJobMasterEndpoints(jobId, workerAssignments)) + .doOnError(throwable -> logger.error(throwable.getMessage(), throwable)); return (new ToDeltaEndpointInjector(schedulingUpdates)).deltas(); }