diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerActor.java index 0eb47d4f7..1b4ea2754 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerActor.java @@ -1,9 +1,12 @@ package io.mantisrx.master.resourcecluster; +import static akka.pattern.Patterns.pipe; + import akka.actor.AbstractActorWithTimers; import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.Status; +import akka.dispatch.MessageDispatcher; import com.netflix.spectator.api.Tag; import com.netflix.spectator.api.TagList; import io.mantisrx.common.Ack; @@ -109,6 +112,27 @@ @Slf4j public class ExecutorStateManagerActor extends AbstractActorWithTimers { + @Value + static class LoadDisabledTaskExecutorsResponse { + List requests; + int attempt; + } + + @Value + static class LoadDisabledTaskExecutorsFailure { + int attempt; + Throwable cause; + } + + @Value + static class RetryLoadDisabledTaskExecutors { + int attempt; + } + + static final int MAX_INIT_LOAD_RETRIES = 5; + static final Duration INIT_LOAD_RETRY_DELAY = Duration.ofSeconds(5); + static final String BLOCKING_IO_DISPATCHER = "akka.actor.default-blocking-io-dispatcher"; + @Value static class ExpireDisableTaskExecutorsRequest { DisableTaskExecutorsRequest request; @@ -337,11 +361,11 @@ public static Props props( @Override public void preStart() throws Exception { super.preStart(); - List activeRequests = - mantisJobStore.loadAllDisableTaskExecutorsRequests(clusterID); - for (DisableTaskExecutorsRequest request : activeRequests) { - onNewDisableTaskExecutorsRequest(request); - } + // Load disabled task executors asynchronously on the blocking-io-dispatcher + // to avoid blocking both preStart() and the actor thread. + // A blocking call in preStart() throws ActorInitializationException on timeout, + // causing permanent actor death with no restart. + startAsyncLoadDisabledTaskExecutors(0); timers().startTimerWithFixedDelay( String.format("periodic-disabled-task-executors-test-for-%s", clusterID.getResourceID()), @@ -349,6 +373,23 @@ public void preStart() throws Exception { disabledTaskExecutorsCheckInterval); } + private void startAsyncLoadDisabledTaskExecutors(int attempt) { + MessageDispatcher ioDispatcher = getContext().getSystem().dispatchers().lookup(BLOCKING_IO_DISPATCHER); + CompletableFuture future = CompletableFuture.supplyAsync( + () -> { + try { + List requests = + mantisJobStore.loadAllDisableTaskExecutorsRequests(clusterID); + return (Object) new LoadDisabledTaskExecutorsResponse(requests, attempt); + } catch (IOException e) { + throw new java.util.concurrent.CompletionException(e); + } + }, + ioDispatcher) + .exceptionally(t -> new LoadDisabledTaskExecutorsFailure(attempt, t)); + pipe(future, getContext().getDispatcher()).to(self()); + } + @Override public Receive createReceive() { return receiveBuilder() @@ -387,6 +428,9 @@ public Receive createReceive() { .match(ExpireDisableTaskExecutorsRequest.class, this::onDisableTaskExecutorsRequestExpiry) .match(UpdateJobArtifactsToCache.class, this::onUpdateJobArtifactsToCache) .match(DisableTaskExecutorsRequest.class, this::onNewDisableTaskExecutorsRequest) + .match(LoadDisabledTaskExecutorsResponse.class, this::onLoadDisabledTaskExecutorsSuccess) + .match(LoadDisabledTaskExecutorsFailure.class, this::onLoadDisabledTaskExecutorsFailure) + .match(RetryLoadDisabledTaskExecutors.class, req -> startAsyncLoadDisabledTaskExecutors(req.getAttempt())) .match(Ack.class, ack -> log.debug("Received ack from {}", sender())) .build(); } @@ -1003,6 +1047,34 @@ private void onUpdateJobArtifactsToCache(UpdateJobArtifactsToCache update) { this.jobArtifactsToCache.addAll(update.getArtifacts()); } + private void onLoadDisabledTaskExecutorsSuccess(LoadDisabledTaskExecutorsResponse response) { + log.info("Loaded {} disabled task executor requests for cluster {} (attempt {})", + response.getRequests().size(), clusterID, response.getAttempt() + 1); + for (DisableTaskExecutorsRequest disableRequest : response.getRequests()) { + restoreDisableTaskExecutorsRequest(disableRequest); + } + } + + private void onLoadDisabledTaskExecutorsFailure(LoadDisabledTaskExecutorsFailure failure) { + metrics.incrementCounter( + ResourceClusterActorMetrics.EXECUTOR_STATE_MANAGER_INIT_FAILURE, + TagList.create(ImmutableMap.of("resourceCluster", clusterID.getResourceID()))); + int nextAttempt = failure.getAttempt() + 1; + if (nextAttempt <= MAX_INIT_LOAD_RETRIES) { + log.error("Failed to load disabled task executors for cluster {} (attempt {}/{}), scheduling retry", + clusterID, failure.getAttempt() + 1, MAX_INIT_LOAD_RETRIES, failure.getCause()); + // Schedule a timer that re-triggers the async load on the io-dispatcher. + timers().startSingleTimer( + "load-disabled-te-retry-" + clusterID.getResourceID(), + new RetryLoadDisabledTaskExecutors(nextAttempt), + INIT_LOAD_RETRY_DELAY); + } else { + log.error("Exhausted retries loading disabled task executors for cluster {} after {} attempts. " + + "Relying on periodic CheckDisabledTaskExecutors for eventual consistency.", + clusterID, MAX_INIT_LOAD_RETRIES, failure.getCause()); + } + } + // custom equals function to check if the existing set already has the request under consideration. private boolean addNewDisableTaskExecutorsRequest(DisableTaskExecutorsRequest newRequest) { if (newRequest.isRequestByAttributes()) { @@ -1016,19 +1088,34 @@ private boolean addNewDisableTaskExecutorsRequest(DisableTaskExecutorsRequest ne Preconditions.checkState(activeDisableTaskExecutorsByAttributesRequests.add(newRequest), "activeDisableTaskExecutorRequests cannot contain %s", newRequest); return true; } else if (newRequest.getTaskExecutorID().isPresent() && !disabledTaskExecutors.contains(newRequest.getTaskExecutorID().get())) { - log.info("Req with id {}", newRequest); + log.debug("DisableTaskExecutorsRequest with id {}", newRequest); disabledTaskExecutors.add(newRequest.getTaskExecutorID().get()); return true; } - log.info("No Req {}", newRequest); + log.debug("skip DisableTaskExecutorsRequest Req {}", newRequest); return false; } + /** + * Restore a disable request loaded from the store into in-memory state. + * Skips the store write since the request already exists in persistence. + */ + private void restoreDisableTaskExecutorsRequest(DisableTaskExecutorsRequest request) { + if (addNewDisableTaskExecutorsRequest(request)) { + Duration toExpiry = Comparators.max(Duration.between(clock.instant(), request.getExpiry()), Duration.ZERO); + getTimers().startSingleTimer( + getExpiryKeyFor(request), + new ExpireDisableTaskExecutorsRequest(request), + toExpiry); + findAndMarkDisabledTaskExecutorsFor(request); + } + } + private void onNewDisableTaskExecutorsRequest(DisableTaskExecutorsRequest request) { ActorRef sender = sender(); if (addNewDisableTaskExecutorsRequest(request)) { try { - log.info("New req to add {}", request); + log.info("New DisableTaskExecutorsRequest to add {}", request); // store the request in a persistent store in order to retrieve it if the node goes down mantisJobStore.storeNewDisabledTaskExecutorsRequest(request); // figure out the time to expire the current request diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java index edb4e67cd..862691b7d 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java @@ -23,6 +23,7 @@ import akka.actor.Props; import akka.actor.Status; import akka.actor.SupervisorStrategy; +import akka.actor.Terminated; import akka.japi.pf.ReceiveBuilder; import com.netflix.spectator.api.TagList; import io.mantisrx.common.Ack; @@ -99,6 +100,13 @@ public SupervisorStrategy supervisorStrategy() { return MantisActorSupervisorStrategy.getInstance().create(); } + private static final Duration CHILD_RECREATE_DELAY = Duration.ofSeconds(10); + + @Value + static class RecreateChildActor { + String actorName; + } + private final Duration heartbeatTimeout; private final Duration assignmentTimeout; private final Duration disabledTaskExecutorsCheckInterval; @@ -268,6 +276,7 @@ public void preStart() throws Exception { Props registryProps = ReservationRegistryActor.props(this.clusterID, clock, null, null, null, metrics); reservationRegistryActor = getContext().actorOf(registryProps, reservationRegistryActorName); } + getContext().watch(reservationRegistryActor); Option existingExecutorStateManager = getContext().child(executorStateManagerActorName); if (existingExecutorStateManager.isDefined()) { @@ -293,6 +302,7 @@ public void preStart() throws Exception { reservationSchedulingEnabled); executorStateManagerActor = getContext().actorOf(esmProps, executorStateManagerActorName); } + getContext().watch(executorStateManagerActor); syncExecutorJobArtifactsCache(); @@ -370,6 +380,8 @@ public Receive createReceive() { .match(AddNewJobArtifactsToCacheRequest.class, this::onAddNewJobArtifactsToCacheRequest) .match(RemoveJobArtifactsToCacheRequest.class, this::onRemoveJobArtifactsToCacheRequest) .match(GetJobArtifactsToCacheRequest.class, req -> sender().tell(new ArtifactList(new ArrayList<>(jobArtifactsToCache)), self())) + .match(Terminated.class, this::onChildTerminated) + .match(RecreateChildActor.class, this::onRecreateChildActor) .build(); } @@ -451,6 +463,103 @@ private void onGetReservationAwareClusterUsage(GetReservationAwareClusterUsageRe }); } + private void onChildTerminated(Terminated terminated) { + ActorRef deadActor = terminated.getActor(); + String actorType; + if (deadActor.equals(executorStateManagerActor)) { + actorType = "executorStateManager"; + log.error("{} terminated for cluster {}. Nulling ref and scheduling re-creation.", + actorType, clusterID); + executorStateManagerActor = null; + timers().startSingleTimer( + "recreate-esm-" + clusterID.getResourceID(), + new RecreateChildActor(executorStateManagerActorName), + CHILD_RECREATE_DELAY); + } else if (deadActor.equals(reservationRegistryActor)) { + actorType = "reservationRegistry"; + log.error("{} terminated for cluster {}. Nulling ref and scheduling re-creation.", + actorType, clusterID); + reservationRegistryActor = null; + timers().startSingleTimer( + "recreate-rr-" + clusterID.getResourceID(), + new RecreateChildActor(reservationRegistryActorName), + CHILD_RECREATE_DELAY); + } else { + actorType = "unknown"; + log.error("Received Terminated for unknown actor {} in cluster {}", deadActor, clusterID); + } + metrics.incrementCounter( + ResourceClusterActorMetrics.CHILD_ACTOR_TERMINATED, + TagList.create(ImmutableMap.of( + "resourceCluster", clusterID.getResourceID(), + "actorType", actorType))); + } + + private void onRecreateChildActor(RecreateChildActor request) { + String actorName = request.getActorName(); + // Check if actor already exists (e.g. from a race with restart) + Option existing = getContext().child(actorName); + if (existing.isDefined()) { + log.info("Child actor {} already exists for cluster {}, skipping re-creation", actorName, clusterID); + if (actorName.equals(executorStateManagerActorName)) { + executorStateManagerActor = existing.get(); + getContext().watch(executorStateManagerActor); + } else if (actorName.equals(reservationRegistryActorName)) { + reservationRegistryActor = existing.get(); + getContext().watch(reservationRegistryActor); + } + return; + } + + try { + if (actorName.equals(executorStateManagerActorName)) { + if (!(executorStateManager instanceof ExecutorStateManagerImpl)) { + throw new IllegalStateException("ExecutorStateManager is not an instance of ExecutorStateManagerImpl"); + } + Props esmProps = ExecutorStateManagerActor.props( + (ExecutorStateManagerImpl) executorStateManager, + clock, + rpcService, + jobMessageRouter, + mantisJobStore, + heartbeatTimeout, + assignmentTimeout, + disabledTaskExecutorsCheckInterval, + clusterID, + isJobArtifactCachingEnabled, + jobClustersWithArtifactCachingEnabled, + metrics, + executeStageRequestFactory, + reservationSchedulingEnabled); + executorStateManagerActor = getContext().actorOf(esmProps, executorStateManagerActorName); + getContext().watch(executorStateManagerActor); + syncExecutorJobArtifactsCache(); + log.info("Successfully recreated ExecutorStateManagerActor for cluster {}", clusterID); + metrics.incrementCounter( + ResourceClusterActorMetrics.CHILD_ACTOR_RECREATED, + TagList.create(ImmutableMap.of( + "resourceCluster", clusterID.getResourceID(), + "actorType", "executorStateManager"))); + } else if (actorName.equals(reservationRegistryActorName)) { + Props registryProps = ReservationRegistryActor.props(this.clusterID, clock, null, null, null, metrics); + reservationRegistryActor = getContext().actorOf(registryProps, reservationRegistryActorName); + getContext().watch(reservationRegistryActor); + log.info("Successfully recreated ReservationRegistryActor for cluster {}", clusterID); + metrics.incrementCounter( + ResourceClusterActorMetrics.CHILD_ACTOR_RECREATED, + TagList.create(ImmutableMap.of( + "resourceCluster", clusterID.getResourceID(), + "actorType", "reservationRegistry"))); + } + } catch (Exception e) { + log.error("Failed to recreate child actor {} for cluster {}, scheduling retry", actorName, clusterID, e); + timers().startSingleTimer( + "recreate-retry-" + actorName, + new RecreateChildActor(actorName), + CHILD_RECREATE_DELAY); + } + } + private void syncExecutorJobArtifactsCache() { if (executorStateManagerActor == null) { return; diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActorMetrics.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActorMetrics.java index c9c95ab51..30ca60b49 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActorMetrics.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActorMetrics.java @@ -55,6 +55,10 @@ class ResourceClusterActorMetrics { public static final String NUM_PENDING_RESERVATIONS = "numPendingReservations"; public static final String RESERVATION_FULFILLMENT_LATENCY = "reservationFulfillmentLatency"; + public static final String EXECUTOR_STATE_MANAGER_INIT_FAILURE = "executorStateManagerInitFailure"; + public static final String CHILD_ACTOR_TERMINATED = "childActorTerminated"; + public static final String CHILD_ACTOR_RECREATED = "childActorRecreated"; + private final Registry registry; private final ConcurrentHashMap> messageMetrics; private final Tuple2 unknownMessageMetrics; diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java index 6bbebef74..c109faf93 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClustersManagerActor.java @@ -20,6 +20,7 @@ import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.SupervisorStrategy; +import akka.actor.Terminated; import akka.japi.pf.ReceiveBuilder; import io.mantisrx.common.akka.MantisActorSupervisorStrategy; import io.mantisrx.master.resourcecluster.ResourceClusterActor.AddNewJobArtifactsToCacheRequest; @@ -60,6 +61,7 @@ import java.time.Clock; import java.time.Duration; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; import lombok.Builder; @@ -81,6 +83,10 @@ class ResourceClustersManagerActor extends AbstractActor { // Cluster Id to map. private final Map resourceClusterActorMap; + // Clusters where one child died and the sibling is being stopped. + // Entry is removed only after both actors are confirmed terminated. + private final Set drainingClusters; + private final ActorRef resourceClusterHostActor; private final IMantisPersistenceProvider mantisPersistenceProvider; private final JobMessageRouter jobMessageRouter; @@ -122,6 +128,7 @@ public ResourceClustersManagerActor( this.executeStageRequestFactory = new ExecuteStageRequestFactory(masterConfiguration); this.resourceClusterActorMap = new HashMap<>(); + this.drainingClusters = new HashSet<>(); } @Override @@ -186,9 +193,45 @@ public Receive createReceive() { holder.getResourceClusterActor().forward(markReady, context())); sender().tell(io.mantisrx.common.Ack.getInstance(), self()); }) + .match(Terminated.class, this::onChildTerminated) .build(); } + private void onChildTerminated(Terminated terminated) { + ActorRef deadActor = terminated.getActor(); + + for (Map.Entry entry : resourceClusterActorMap.entrySet()) { + ActorHolder holder = entry.getValue(); + if (!deadActor.equals(holder.getResourceClusterActor()) && + !deadActor.equals(holder.getResourceClusterScalerActor())) { + continue; + } + + ClusterID clusterID = entry.getKey(); + if (drainingClusters.contains(clusterID)) { + // Second Terminated: the survivor we stopped has now fully terminated. + // Safe to remove — both actor names are freed. + drainingClusters.remove(clusterID); + resourceClusterActorMap.remove(clusterID); + log.info("Both actors terminated for cluster {}. Entry removed; " + + "will be recreated on next request.", clusterID); + } else { + // First Terminated: stop the sibling but keep the entry until both are dead. + drainingClusters.add(clusterID); + ActorRef survivor = deadActor.equals(holder.getResourceClusterActor()) + ? holder.getResourceClusterScalerActor() + : holder.getResourceClusterActor(); + getContext().stop(survivor); + log.error("Actor {} terminated for cluster {}. Stopping sibling {}; " + + "entry will be removed once both are terminated.", + deadActor, clusterID, survivor); + } + return; + } + + log.warn("Received Terminated for unknown actor {}", deadActor); + } + private ActorRef createResourceClusterActorFor(ClusterID clusterID) { log.info("Creating resource cluster actor for {}", clusterID); ActorRef clusterActor = diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/worker/JobWorkerTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/worker/JobWorkerTest.java new file mode 100644 index 000000000..0b4aae5f3 --- /dev/null +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/jobcluster/job/worker/JobWorkerTest.java @@ -0,0 +1,83 @@ +/* + * Copyright 2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mantisrx.master.jobcluster.job.worker; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.netflix.spectator.api.DefaultRegistry; +import com.netflix.spectator.api.Registry; +import io.mantisrx.common.metrics.spectator.SpectatorRegistryFactory; +import io.mantisrx.master.events.LifecycleEventPublisher; +import io.mantisrx.runtime.MantisJobState; +import io.mantisrx.server.core.Status; +import java.time.Instant; +import java.util.concurrent.TimeUnit; +import org.junit.BeforeClass; +import org.junit.Test; + +public class JobWorkerTest { + + private static Registry registry; + + @BeforeClass + public static void setup() { + registry = new DefaultRegistry(); + SpectatorRegistryFactory.setRegistry(registry); + } + + @Test + public void testWorkerAcceptedToStartedMsRecordedOnWorkerStatus() throws Exception { + final long acceptedAt = 1000L; + final long startedAt = 6000L; + final long expectedLatencyNanos = TimeUnit.MILLISECONDS.toNanos(startedAt - acceptedAt); + + // Use a unique job ID (format: clusterName-jobNum) to avoid registry collisions + final String jobId = "test-startedMs-" + System.nanoTime() + "-1"; + + // Build worker in Launched state so we can transition to Started via WorkerStatus + JobWorker worker = new JobWorker.Builder() + .withJobId(jobId) + .withWorkerIndex(0) + .withWorkerNumber(1) + .withStageNum(1) + .withNumberOfPorts(1) + .withAcceptedAt(acceptedAt) + .withState(WorkerState.Launched) + .withLifecycleEventsPublisher(LifecycleEventPublisher.noop()) + .build(); + + // Send WorkerStatus with Started state — this is the path we're testing + Status startedStatus = new Status( + jobId, 1, 0, 1, + Status.TYPE.INFO, "started", MantisJobState.Started, startedAt); + WorkerStatus startedEvent = new WorkerStatus(startedStatus, Instant.ofEpochMilli(startedAt)); + boolean persistRequired = worker.processEvent(startedEvent); + + assertTrue("processEvent should return true for Started status", persistRequired); + + // Query the Spectator registry directly for the timer + // The MetricId formats it as "metricGroup_metricName" + com.netflix.spectator.api.Timer spectatorTimer = registry.timer( + registry.createId("JobWorker_workerAcceptedToStartedMs").withTag("jobId", jobId)); + + assertEquals("workerAcceptedToStartedMs timer should have been recorded once", + 1, spectatorTimer.count()); + assertEquals("workerAcceptedToStartedMs should record the correct latency", + expectedLatencyNanos, spectatorTimer.totalTime()); + } +} diff --git a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java index a64abf5bc..a73e67c8d 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java +++ b/mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorTest.java @@ -724,6 +724,10 @@ public void testIfDisableTaskExecutorRequestsAreExpiredCorrectly() throws Except @Test public void testIfDisabledTaskExecutorRequestsAreInitializedCorrectlyWhenTheControlPlaneStarts() throws Exception { + // Stop the @Before actor and wait for its async IO to complete before overriding mock + actorSystem.stop(resourceClusterActor); + Thread.sleep(200); + when(mantisJobStore.loadAllDisableTaskExecutorsRequests(ArgumentMatchers.eq(CLUSTER_ID))) .thenReturn(ImmutableList.of( new DisableTaskExecutorsRequest( @@ -732,9 +736,12 @@ public void testIfDisabledTaskExecutorRequestsAreInitializedCorrectlyWhenTheCont Instant.now().plus(Duration.ofDays(1)), Optional.empty()))); - actorSystem.stop(resourceClusterActor); setupActor(); + // Disabled TEs are now loaded asynchronously via pipe on the io-dispatcher, + // so allow time for the future to complete and the result to be delivered. + Thread.sleep(500); + assertEquals(Ack.getInstance(), resourceCluster.registerTaskExecutor(TASK_EXECUTOR_REGISTRATION).get()); assertEquals( new ResourceOverview(1, 0, 0, 0, 1), @@ -818,6 +825,62 @@ public void testGetAssignedTaskExecutorAfterTaskCompletes() throws Throwable { } } + @Test + public void testPreStartResilientToJobStoreFailure() throws Exception { + // Stop the @Before actor and wait for its async IO to complete before overriding mock + actorSystem.stop(resourceClusterActor); + Thread.sleep(200); + + // Now safe to override the mock — no in-flight futures from the old actor + when(mantisJobStore.loadAllDisableTaskExecutorsRequests(ArgumentMatchers.eq(CLUSTER_ID))) + .thenThrow(new RuntimeException("DEADLINE_EXCEEDED: DGW timeout")); + doReturn(ImmutableList.of()) + .when(mantisJobStore) + .getJobArtifactsToCache(CLUSTER_ID); + + setupActor(); + + // Wait for the async load retry to be attempted + Thread.sleep(500); + + // The actor should still be alive and functional despite the load failure + assertEquals(Ack.getInstance(), resourceCluster.registerTaskExecutor(TASK_EXECUTOR_REGISTRATION).get()); + assertEquals(ImmutableList.of(TASK_EXECUTOR_ID), resourceCluster.getRegisteredTaskExecutors().get()); + } + + @Test + public void testPreStartRetriesAndSucceeds() throws Exception { + // Stop the @Before actor and wait for its async IO to complete before overriding mock + actorSystem.stop(resourceClusterActor); + Thread.sleep(200); + + // First call fails, second call succeeds + when(mantisJobStore.loadAllDisableTaskExecutorsRequests(ArgumentMatchers.eq(CLUSTER_ID))) + .thenThrow(new RuntimeException("DEADLINE_EXCEEDED: DGW timeout")) + .thenReturn(ImmutableList.of( + new DisableTaskExecutorsRequest( + ATTRIBUTES, + CLUSTER_ID, + Instant.now().plus(Duration.ofDays(1)), + Optional.empty()))); + doReturn(ImmutableList.of()) + .when(mantisJobStore) + .getJobArtifactsToCache(CLUSTER_ID); + + setupActor(); + + // Wait for retry (retry delay is 5s in production, but the actor processes the initial attempt immediately) + // The first attempt fails immediately via self-tell, then a timer-based retry is scheduled. + // In test, we wait long enough for the retry timer to fire. + Thread.sleep(6000); + + // After successful retry, disabled TEs should be loaded + assertEquals(Ack.getInstance(), resourceCluster.registerTaskExecutor(TASK_EXECUTOR_REGISTRATION).get()); + assertEquals( + new ResourceOverview(1, 0, 0, 0, 1), + resourceCluster.resourceOverview().get()); + } + @Test public void testTaskExecutorIsDisabledEvenAfterRestart() throws Exception { when(mantisJobStore.getTaskExecutor(TASK_EXECUTOR_ID)).thenReturn(TASK_EXECUTOR_REGISTRATION); diff --git a/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/rules/ScalerControllerActorTest.java b/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/rules/ScalerControllerActorTest.java index 1564d4250..3ac68f3aa 100644 --- a/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/rules/ScalerControllerActorTest.java +++ b/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/rules/ScalerControllerActorTest.java @@ -20,8 +20,6 @@ import org.mockito.stubbing.Answer; import rx.Observable; -import java.time.Duration; -import java.time.temporal.ChronoUnit; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -34,12 +32,8 @@ public class ScalerControllerActorTest { private static final String JOB_ID = "test-job-id"; private static final String RULE_ID_1 = "1"; private static final String RULE_ID_2 = "2"; - private static final Duration Max_Duration = Duration.of(5000, ChronoUnit.MILLIS); - private static final Duration Interval_Duration = Duration.of(500, ChronoUnit.MILLIS); private ActorSystem system; - private TestKit testKit; - private JobScalerContext jobScalerContext; @Mock @@ -52,7 +46,6 @@ public class ScalerControllerActorTest { public void setUp() { MockitoAnnotations.initMocks(this); system = ActorSystem.create(); - testKit = new TestKit(system); jobScalerContext = JobScalerContext.builder() .jobId(JOB_ID) .masterClientApi(masterClientApi) @@ -81,11 +74,23 @@ public void setUp() { public void tearDown() { TestKit.shutdownActorSystem(system); system = null; - testKit = null; } @Test - public void testOnRuleRefreshWithPerpetualRuleWithDefault() { + public void testOnRuleRefreshWithPerpetualRuleWithDefault() throws Exception { + CountDownLatch startLatch = new CountDownLatch(2); + CountDownLatch shutdownLatch = new CountDownLatch(2); + doAnswer((Answer) invocation -> { + log.info("Test: start job auto scaler service"); + startLatch.countDown(); + return null; + }).when(jobAutoScalerService).start(); + doAnswer((Answer) invocation -> { + log.info("Test: shutdown job auto scaler service"); + shutdownLatch.countDown(); + return null; + }).when(jobAutoScalerService).shutdown(); + JobScalingRule perpetualRule = TestRuleUtils.createPerpetualRule(RULE_ID_1, JOB_ID); JobScalingRule perpetualRule2 = TestRuleUtils.createPerpetualRule(RULE_ID_2, JOB_ID); @@ -123,16 +128,21 @@ public void testOnRuleRefreshWithPerpetualRuleWithDefault() { response = probe.expectMsgClass(ScalerControllerActor.GetActiveRuleResponse.class); assertNull(response.getRule()); - testKit.awaitAssert(Max_Duration, Interval_Duration, - () -> { - verify(jobAutoScalerService, times(2)).start(); - verify(jobAutoScalerService, times(2)).shutdown(); - return null; - }); + assertTrue("Expected 2 start() calls within timeout", + startLatch.await(10, TimeUnit.SECONDS)); + assertTrue("Expected 2 shutdown() calls within timeout", + shutdownLatch.await(10, TimeUnit.SECONDS)); } @Test - public void testOnRuleRefreshWithDesireSize() { + public void testOnRuleRefreshWithDesireSize() throws Exception { + CountDownLatch startLatch = new CountDownLatch(1); + doAnswer((Answer) invocation -> { + log.info("Test: start job auto scaler service"); + startLatch.countDown(); + return null; + }).when(jobAutoScalerService).start(); + JobScalingRule perpetualRule = TestRuleUtils.createPerpetualRuleWithDesireSize(RULE_ID_1, JOB_ID); ActorRef controllerActor = system.actorOf(ScalerControllerActor.Props(jobScalerContext), "controllerActor"); @@ -144,15 +154,25 @@ public void testOnRuleRefreshWithDesireSize() { probe.expectMsgClass(ScalerControllerActor.GetActiveRuleResponse.class); assertEquals(perpetualRule, response.getRule()); - testKit.awaitAssert(Max_Duration, Interval_Duration, - () -> { - verify(jobAutoScalerService, times(1)).start(); - return null; - }); + assertTrue("Expected 1 start() call within timeout", + startLatch.await(10, TimeUnit.SECONDS)); } @Test - public void testOnRuleRefreshWithDesireSizeOnly() { + public void testOnRuleRefreshWithDesireSizeOnly() throws Exception { + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch shutdownLatch = new CountDownLatch(1); + doAnswer((Answer) invocation -> { + log.info("Test: start job auto scaler service"); + startLatch.countDown(); + return null; + }).when(jobAutoScalerService).start(); + doAnswer((Answer) invocation -> { + log.info("Test: shutdown job auto scaler service"); + shutdownLatch.countDown(); + return null; + }).when(jobAutoScalerService).shutdown(); + JobScalingRule perpetualRule1 = TestRuleUtils.createPerpetualRuleWithDesireSize(RULE_ID_1, JOB_ID); JobScalingRule perpetualRule2 = TestRuleUtils.createPerpetualRuleWithDesireSizeOnly(RULE_ID_2, JOB_ID); @@ -171,17 +191,15 @@ public void testOnRuleRefreshWithDesireSizeOnly() { probe.expectMsgClass(ScalerControllerActor.GetActiveRuleResponse.class); assertEquals(perpetualRule2, response.getRule()); - testKit.awaitAssert(Max_Duration, Interval_Duration, - () -> { - // no service should be stared since no scaling policy is defined - verify(jobAutoScalerService, times(1)).start(); - verify(jobAutoScalerService, times(1)).shutdown(); - return null; - }); + // no service should be started since no scaling policy is defined for rule 2 + assertTrue("Expected 1 start() call within timeout", + startLatch.await(10, TimeUnit.SECONDS)); + assertTrue("Expected 1 shutdown() call within timeout", + shutdownLatch.await(10, TimeUnit.SECONDS)); } @Test - public void testOnRuleRefreshFailedStart() { + public void testOnRuleRefreshFailedStart() throws Exception { JobScalingRule perpetualRule = TestRuleUtils.createPerpetualRule(RULE_ID_1, JOB_ID); JobScalingRule perpetualRule2 = TestRuleUtils.createPerpetualRule(RULE_ID_2, JOB_ID); @@ -189,15 +207,18 @@ public void testOnRuleRefreshFailedStart() { final TestKit probe = new TestKit(system); CountDownLatch latch1 = new CountDownLatch(1); + CountDownLatch startLatch = new CountDownLatch(2); AtomicInteger serviceNum = new AtomicInteger(); doAnswer((Answer) invocation -> { if (serviceNum.get() == 0) { log.info("Test Block: job auto scaler service"); serviceNum.getAndIncrement(); + startLatch.countDown(); assertTrue(latch1.await(10, TimeUnit.SECONDS)); throw new RuntimeException("Mock start service failure"); } else { log.info("Test: start job auto scaler service"); + startLatch.countDown(); } return null; }).when(jobAutoScalerService).start(); @@ -225,10 +246,7 @@ public void testOnRuleRefreshFailedStart() { probe.expectMsgClass(ScalerControllerActor.GetActiveRuleResponse.class); assertEquals(perpetualRule2, response.getRule()); - testKit.awaitAssert(Max_Duration, Interval_Duration, - () -> { - verify(jobAutoScalerService, times(2)).start(); - return null; - }); + assertTrue("Expected 2 start() calls within timeout", + startLatch.await(10, TimeUnit.SECONDS)); } }