diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/StartupGuard.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/StartupGuard.java new file mode 100644 index 0000000..68c91a4 --- /dev/null +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/StartupGuard.java @@ -0,0 +1,58 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager; + +import java.util.concurrent.Semaphore; + +import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; + +/** + * Class to be used at startup. The Scheduler should not start before it is known how many messages are still on the queue. + * This is needed to register any work that is still on the queue and to properly calculate load metrics. + * The Taskmanager is not aware of the tasks already on the queue, so without this guard these messages would not be counted in the metrics. + * This can result in the metrics being skewed, and thereby negatively reporting load metrics. + */ +public class StartupGuard implements WorkerSizeObserver { + + private final Semaphore openSemaphore = new Semaphore(0); + + private volatile boolean open; + + /** + * @return Returns true once the first worker size update has been received; the flag is volatile so other threads see the change.
+ */ + public boolean isOpen() { + return open; + } + + /** + * Blocks until the first worker size update has been received. Only one call is released, as a single permit is handed out when the guard opens. + */ + public void waitForOpen() throws InterruptedException { + openSemaphore.acquire(); + } + + @Override + public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { + synchronized (openSemaphore) { + if (!open) { + open = true; + openSemaphore.release(); + } + } + } +} diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskDispatcher.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskDispatcher.java index ce83b3b..f4a6d85 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskDispatcher.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskDispatcher.java @@ -45,10 +45,10 @@ public enum State { private final WorkerPool workerPool; private final TaskScheduler scheduler; + private final String workerQueueName; private boolean running; private State state; - private final String workerQueueName; public TaskDispatcher(final String workerQueueName, final TaskScheduler scheduler, final WorkerPool workerPool) { this.workerQueueName = workerQueueName; diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java index c92b1c7..61a226a 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java @@ -23,9 +23,9 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -74,7 +74,7 @@ public TaskManager(final ExecutorService executorService, final ScheduledExecuto * @param schedule scheduler
configuration * @throws InterruptedException */ - public boolean updateTaskScheduler(final TaskSchedule schedule) throws InterruptedException { + public void updateTaskScheduler(final TaskSchedule schedule) throws InterruptedException { // Set up scheduler with worker pool final String workerQueueName = schedule.getWorkerQueueName(); final QueueConfig workerQueueConfig = new QueueConfig(workerQueueName, schedule.isDurable(), schedule.isEagerFetch(), schedule.getQueueType()); @@ -85,7 +85,10 @@ public boolean updateTaskScheduler(final TaskSchedule schedule) throws Interr final TaskScheduleBucket taskScheduleBucket = buckets.get(workerQueueName); taskScheduleBucket.updateQueues(schedule.getQueues(), workerQueueConfig); - return taskScheduleBucket.isRunning(); + } + + public boolean isRunning(final String queue) { + return Optional.ofNullable(buckets.get(queue)).map(TaskScheduleBucket::isRunning).orElse(false); } /** @@ -121,11 +124,13 @@ private class TaskScheduleBucket { public TaskScheduleBucket(final QueueConfig queueConfig) throws InterruptedException { this.workerQueueName = queueConfig.queueName(); final QueueWatchDog watchDog = new QueueWatchDog(workerQueueName); + final StartupGuard startupGuard = new StartupGuard(); + taskScheduler = schedulerFactory.createScheduler(queueConfig); workerProducer = factory.createWorkerProducer(queueConfig); final WorkerPool workerPool = new WorkerPool(workerQueueName, workerProducer, taskScheduler); final PerformanceMetricsReporter reporter = new PerformanceMetricsReporter(scheduledExecutorService, queueConfig.queueName(), - OpenTelemetryMetrics.METER, workerPool); + OpenTelemetryMetrics.METER, startupGuard); watchDog.addQueueWatchDogListener(workerPool); watchDog.addQueueWatchDogListener(taskScheduler); @@ -134,8 +139,11 @@ public TaskScheduleBucket(final QueueConfig queueConfig) throws InterruptedExcep workerProducer.addWorkerProducerHandler(reporter); workerProducer.addWorkerProducerHandler(watchDog); + 
workerSizeObserverProxy.addObserver(workerQueueName, reporter); workerSizeObserverProxy.addObserver(workerQueueName, workerPool); workerSizeObserverProxy.addObserver(workerQueueName, watchDog); + // The startup guard unlocks the task dispatcher, so it is added after the reporter, worker pool and watchdog (the scheduler observer below is still added after it). + workerSizeObserverProxy.addObserver(workerQueueName, startupGuard); if (taskScheduler instanceof final WorkerSizeObserver wzo) { workerSizeObserverProxy.addObserver(workerQueueName, wzo); @@ -143,15 +151,21 @@ public TaskScheduleBucket(final QueueConfig queueConfig) throws InterruptedExcep workerProducer.start(); // Set up metrics WorkerPoolMetrics.setupMetrics(workerPool, workerQueueName); - // Set up dispatcher + dispatcher = new TaskDispatcher(workerQueueName, taskScheduler, workerPool); - executorService.execute(dispatcher); - Thread.sleep(TimeUnit.SECONDS.toMillis(1)); // just wait a little second to make sure the process is actually running. - LOG.info("Started taskscheduler {}: {}", taskScheduler.getClass().getSimpleName(), queueConfig); + executorService.execute(() -> { + try { + // Wait until the first worker size update has been received. + startupGuard.waitForOpen(); + LOG.info("Starting task scheduler {}: {}", taskScheduler.getClass().getSimpleName(), queueConfig); + dispatcher.run(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + }}); } /** - * @return + * @return true if the dispatcher is running.
*/ public boolean isRunning() { return dispatcher.isRunning(); diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java index d489735..10b0332 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java @@ -47,11 +47,14 @@ class WorkerPool implements WorkerSizeObserver, WorkerProducerHandler, WorkerMet private final Semaphore freeWorkers = new Semaphore(0); private final Map runningWorkers = new ConcurrentHashMap<>(); - private int totalConfiguredWorkers; private final String workerQueueName; private final WorkerProducer wp; private final WorkerUpdateHandler workerUpdateHandler; + private int totalReportedWorkers; + private int initialUnaccountedWorkers; + private boolean firstUpdateReceived; + public WorkerPool(final String workerQueueName, final WorkerProducer wp, final WorkerUpdateHandler workerUpdateHandler) { this.workerQueueName = workerQueueName; this.wp = wp; @@ -83,19 +86,19 @@ public void sendTaskToWorker(final Task task) throws IOException { public int getWorkerSize() { synchronized (this) { - return freeWorkers.availablePermits() + runningWorkers.size(); + return freeWorkers.availablePermits() + runningWorkers.size() + initialUnaccountedWorkers; } } @Override public int getReportedWorkerSize() { - return totalConfiguredWorkers; + return totalReportedWorkers; } @Override public int getRunningWorkerSize() { synchronized (this) { - return runningWorkers.size(); + return runningWorkers.size() + initialUnaccountedWorkers; } } @@ -120,24 +123,32 @@ void releaseWorker(final String taskId) { * @param taskRecord the task is expected to be on. 
*/ void releaseWorker(final String taskId, final TaskRecord taskRecord) { - if (taskRecord != null) { - synchronized (this) { - if (runningWorkers.containsKey(taskId)) { - // if currentSize is smaller than the worker size it means the worker - // must not be re-added as free worker but removed from the pool. - if (totalConfiguredWorkers >= runningWorkers.size()) { - freeWorkers.release(1); - } - runningWorkers.remove(taskId); - } else { - LOG.info("[{}][taskId:{}] Task for queue '{}' not found, maybe it was already released).", workerQueueName, taskId, taskRecord.queueName()); + synchronized (this) { + if (runningWorkers.containsKey(taskId)) { + freeWorker(); + runningWorkers.remove(taskId); + } else { + if (initialUnaccountedWorkers > 0) { + freeWorker(); } + initialUnaccountedWorkers = Math.max(initialUnaccountedWorkers - 1, 0); + LOG.info("[{}][taskId:{}] Unknown task received, possible left over of restart).", workerQueueName, taskId); } + } + if (taskRecord != null) { workerUpdateHandler.onTaskFinished(taskRecord); LOG.debug("[{}][taskId:{}] Task released).", workerQueueName, taskId); } } + private void freeWorker() { + // if currentSize is smaller than the worker size it means the worker + // must not be re-added as free worker but removed from the pool. + if (totalReportedWorkers >= (runningWorkers.size() + initialUnaccountedWorkers)) { + freeWorkers.release(1); + } + } + /** * Takes a worker from the free workers list and add it to the reserved workers list. Blocks until a worker becomes available on the free worker * list. 
@@ -169,24 +180,28 @@ public void reserveWorker() { @Override public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { synchronized (this) { + if (!firstUpdateReceived) { + initialUnaccountedWorkers = numberOfMessages; + firstUpdateReceived = true; + } updateNumberOfWorkers(numberOfWorkers); } } private void updateNumberOfWorkers(final int numberOfWorkers) { - final int previousTotalConfiguredWorkers = totalConfiguredWorkers; - totalConfiguredWorkers = numberOfWorkers; - final int deltaWorkers = totalConfiguredWorkers - getWorkerSize(); + final int previousTotalReportedWorkers = totalReportedWorkers; + totalReportedWorkers = numberOfWorkers; + final int deltaWorkers = totalReportedWorkers - getWorkerSize(); if (deltaWorkers > 0) { freeWorkers.release(deltaWorkers); - LOG.info("# Workers of {} increased to {}(+{})", workerQueueName, totalConfiguredWorkers, deltaWorkers); + LOG.info("# Workers of {} increased to {}(+{})", workerQueueName, totalReportedWorkers, deltaWorkers); } else if ((deltaWorkers < 0) && (freeWorkers.availablePermits() > 0) && freeWorkers.tryAcquire(Math.min(freeWorkers.availablePermits(), -deltaWorkers))) { - LOG.info("# Workers of {} decreased to {}({})", workerQueueName, totalConfiguredWorkers, deltaWorkers); + LOG.info("# Workers of {} decreased to {}({})", workerQueueName, totalReportedWorkers, deltaWorkers); } - if (previousTotalConfiguredWorkers != totalConfiguredWorkers) { - workerUpdateHandler.onWorkerPoolSizeChange(totalConfiguredWorkers); + if (previousTotalReportedWorkers != totalReportedWorkers) { + workerUpdateHandler.onWorkerPoolSizeChange(totalReportedWorkers); } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java index e13338d..7b07f78 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java 
+++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java @@ -31,8 +31,9 @@ import io.opentelemetry.api.metrics.DoubleGauge; import io.opentelemetry.api.metrics.Meter; -import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerMetrics; +import nl.aerius.taskmanager.StartupGuard; import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerProducerHandler; +import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; import nl.aerius.taskmanager.client.TaskMetrics; import nl.aerius.taskmanager.domain.QueueWatchDogListener; import nl.aerius.taskmanager.metrics.DurationMetric.DurationMetricValue; @@ -52,7 +53,7 @@ * * - Average load (in percentage) of all workers (of a certain type) together. */ -public class PerformanceMetricsReporter implements WorkerProducerHandler, QueueWatchDogListener { +public class PerformanceMetricsReporter implements WorkerProducerHandler, QueueWatchDogListener, WorkerSizeObserver { private static final Logger LOG = LoggerFactory.getLogger(PerformanceMetricsReporter.class); @@ -77,9 +78,10 @@ public class PerformanceMetricsReporter implements WorkerProducerHandler, QueueW private final DurationMetric workWorkerMetrics; private final LoadMetric loadMetrics = new LoadMetric(); + private final StartupGuard startupGuard; + private final Meter meter; private final String queueGroupName; - private final WorkerMetrics workerMetrics; private final DoubleGauge loadGauge; private final Attributes workerAttributes; @@ -87,11 +89,13 @@ public class PerformanceMetricsReporter implements WorkerProducerHandler, QueueW // as it doesn't have any metrics on it anymore. 
private final Set dispatchedTasks = new HashSet<>(); + private int numberOfWorkers; + public PerformanceMetricsReporter(final ScheduledExecutorService newScheduledThreadPool, final String queueGroupName, final Meter meter, - final WorkerMetrics workerMetrics) { + final StartupGuard startupGuard) { this.queueGroupName = queueGroupName; this.meter = meter; - this.workerMetrics = workerMetrics; + this.startupGuard = startupGuard; // Gauges for measuring number of tasks, and average duration time it took before a task was send to to the worker. // Measures by worker and per queue to the worker @@ -136,7 +140,7 @@ public void onWorkDispatched(final String messageId, final Map m taskMetrics.determineDuration(); dispatchedQueueMetrics.computeIfAbsent(taskMetrics.queueName(), k -> createQueueDurationMetric(taskMetrics)).register(taskMetrics); dispatchedWorkerMetrics.register(taskMetrics); - loadMetrics.register(1, workerMetrics.getReportedWorkerSize()); + loadMetrics.register(1, numberOfWorkers); } @Override @@ -145,8 +149,15 @@ public synchronized void onWorkerFinished(final String messageId, final Map createQueueDurationMetric(taskMetrics)).register(taskMetrics); workWorkerMetrics.register(taskMetrics); - if (dispatchedTasks.remove(messageId)) { - loadMetrics.register(-1, workerMetrics.getReportedWorkerSize()); + loadMetrics.register(-1, numberOfWorkers); + } + + @Override + public synchronized void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { + this.numberOfWorkers = numberOfWorkers; + if (!startupGuard.isOpen() && numberOfMessages > 0) { + LOG.info("Queue {} will be started with {} messages already on the queue.", queueGroupName, numberOfMessages); + loadMetrics.register(numberOfMessages, numberOfWorkers); } } @@ -195,9 +206,11 @@ private static void metrics(final String prefixText, final DoubleGauge gauge, fi } private void workLoad() { - final double load = loadMetrics.process(); + if (startupGuard.isOpen()) { + final double load = 
loadMetrics.process(); - loadGauge.set(load, workerAttributes); - LOG.debug("Workload for '{}' is: {}%", queueGroupName, Math.round(load)); + loadGauge.set(load, workerAttributes); + LOG.debug("Workload for '{}' is: {}%", queueGroupName, Math.round(load)); + } } } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/StartupGuardTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/StartupGuardTest.java new file mode 100644 index 0000000..73397c7 --- /dev/null +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/StartupGuardTest.java @@ -0,0 +1,68 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. 
+ */ +package nl.aerius.taskmanager; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +/** + * Test class for {@link StartupGuard}. + */ +class StartupGuardTest { + + @Test + void testOpen() { + final StartupGuard guard = new StartupGuard(); + + assertFalse(guard.isOpen(), "Guard should not be open."); + guard.onNumberOfWorkersUpdate(0, 1); + assertTrue(guard.isOpen(), "Guard should be open when onNumberOfWorkersUpdate is called."); + guard.onNumberOfWorkersUpdate(0, 1); + assertTrue(guard.isOpen(), "Guard should still remain open onNumberOfWorkersUpdate has been called."); + } + + @Test + @Timeout(value = 3, unit = TimeUnit.SECONDS) + void testWaitForOpen() throws InterruptedException { + final StartupGuard guard = new StartupGuard(); + final Semaphore waitForStart = new Semaphore(0); + final Semaphore waitForOpen = new Semaphore(0); + + new Thread(() -> { + try { + waitForStart.release(); + guard.waitForOpen(); + waitForOpen.release(); + } catch (final InterruptedException e) { + // Interrupted while waiting; nothing is released, so the test's timeout will report the failure. + } + }).start(); + // Wait until the helper thread has started. + waitForStart.acquire(); + assertFalse(guard.isOpen(), "Guard should not be open."); + guard.onNumberOfWorkersUpdate(1, 1); + // Wait for the semaphore that is released once waitForOpen has returned.
+ waitForOpen.acquire(); + assertTrue(guard.isOpen(), "Guard should now be open."); + } +} diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskManagerTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskManagerTest.java index b9ddfb6..59bb280 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskManagerTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskManagerTest.java @@ -17,7 +17,8 @@ package nl.aerius.taskmanager; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import java.io.File; import java.io.IOException; @@ -29,9 +30,12 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import nl.aerius.taskmanager.MockTaskScheduler.MockSchedulerFactory; import nl.aerius.taskmanager.adaptor.AdaptorFactory; +import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; +import nl.aerius.taskmanager.adaptor.WorkerSizeProviderProxy; import nl.aerius.taskmanager.domain.PriorityTaskQueue; import nl.aerius.taskmanager.domain.PriorityTaskSchedule; import nl.aerius.taskmanager.domain.RabbitMQQueueType; @@ -44,7 +48,9 @@ class TaskManagerTest { private static ExecutorService executor; private static ScheduledExecutorService scheduledExecutorService; + private final PriorityTaskSchedulerFileHandler handler = new PriorityTaskSchedulerFileHandler(); + private PriorityTaskSchedule schedule; private TaskManager taskManager; @@ -54,7 +60,14 @@ void setUp() throws IOException { scheduledExecutorService = Executors.newScheduledThreadPool(1); final AdaptorFactory factory = new MockAdaptorFactory(); final MockSchedulerFactory schedulerFactory = new MockTaskScheduler.MockSchedulerFactory(); - taskManager = new TaskManager<>(executor, 
scheduledExecutorService, factory, schedulerFactory, factory.createWorkerSizeProvider()); + final WorkerSizeProviderProxy workerSizeProvider = factory.createWorkerSizeProvider(); + + doAnswer(a -> { + // This will unblock the startup guard + ((WorkerSizeObserver) a.getArgument(1)).onNumberOfWorkersUpdate(0, 0); + return null; + }).when(workerSizeProvider).addObserver(any(), any()); + taskManager = new TaskManager<>(executor, scheduledExecutorService, factory, schedulerFactory, workerSizeProvider); schedule = handler.read(new File(getClass().getClassLoader().getResource("queue/priority-task-scheduler.ops.json").getFile())); } @@ -67,25 +80,35 @@ void after() throws InterruptedException { } @Test + @Timeout(value = 2, unit = TimeUnit.SECONDS) void testAddScheduler() throws InterruptedException { - assertTrue(taskManager.updateTaskScheduler(schedule), "TaskScheduler running"); + updateTaskScheduler(); assertEquals(RabbitMQQueueType.STREAM, schedule.getQueueType(), "Should have queueType STREAM"); taskManager.removeTaskScheduler(schedule.getWorkerQueueName()); } @Test + @Timeout(value = 2, unit = TimeUnit.SECONDS) void testModifyQueue() throws InterruptedException { - assertTrue(taskManager.updateTaskScheduler(schedule), "TaskScheduler running"); + updateTaskScheduler(); schedule.getQueues().get(0).setPriority(30); - assertTrue(taskManager.updateTaskScheduler(schedule), "TaskScheduler updated"); + updateTaskScheduler(); taskManager.removeTaskScheduler(schedule.getWorkerQueueName()); } @Test + @Timeout(value = 2, unit = TimeUnit.SECONDS) void testRemoveQueue() throws InterruptedException { - assertTrue(taskManager.updateTaskScheduler(schedule), "TaskScheduler running"); + updateTaskScheduler(); schedule.getQueues().remove(0); - assertTrue(taskManager.updateTaskScheduler(schedule), "TaskScheduler updated"); + updateTaskScheduler(); taskManager.removeTaskScheduler(schedule.getWorkerQueueName()); } + + private void updateTaskScheduler() throws InterruptedException { + 
taskManager.updateTaskScheduler(schedule); + while(!taskManager.isRunning(schedule.getWorkerQueueName())) { + Thread.sleep(300); + } + } } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java index 562cdbf..0a205fd 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java @@ -18,7 +18,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotSame; -import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.lenient; @@ -26,6 +25,7 @@ import java.io.IOException; import java.util.concurrent.ExecutorService; +import java.util.stream.IntStream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -75,16 +75,29 @@ public void messageDelivered(final Message message) { @Test void testWorkerPoolSizing() throws IOException { - assertSame(0, workerPool.getReportedWorkerSize(), "Check if workerPool size is empty at start"); + assertEquals(0, workerPool.getReportedWorkerSize(), "Check if workerPool size is empty at start"); workerPool.onNumberOfWorkersUpdate(10, 0); - assertSame(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is changed after sizing"); + assertEquals(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is changed after sizing"); assertEquals(10, numberOfWorkers, "Check if workerPool change handler called."); workerPool.reserveWorker(); - assertSame(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after reserving 1 worker"); + assertEquals(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after reserving 1 worker"); final Task task = 
createAndSendTaskToWorker(); - assertSame(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after reserving 1 worker"); + assertEquals(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after reserving 1 worker"); workerPool.releaseWorker(task.getId()); - assertSame(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after releasing 1 worker"); + assertEquals(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after releasing 1 worker"); + } + + @Test + void testWorkerPoolSizingWithInitialSize() throws IOException { + workerPool.onNumberOfWorkersUpdate(10, 5); + assertEquals(5, workerPool.getRunningWorkerSize(), "Check if workerPool size is 5"); + assertEquals(10, workerPool.getWorkerSize(), "Internal worker size should match reported number of workers"); + workerPool.onNumberOfWorkersUpdate(10, 5); + assertEquals(5, workerPool.getRunningWorkerSize(), "Check if workerPool size is still 5"); + assertEquals(10, workerPool.getWorkerSize(), "Internal worker size should still match reported number of workers"); + IntStream.range(1, 6).forEach(a -> workerPool.onWorkerFinished("", null)); + assertEquals(0, workerPool.getRunningWorkerSize(), "After unknown tasks received running size should be 0"); + assertEquals(10, workerPool.getWorkerSize(), "Internal worker size should still match reported number of workers"); } @Test @@ -112,7 +125,6 @@ void testWorkerPoolScaleDown() throws IOException { assertEquals(1, workerPool.getWorkerSize(), "Check if workerPool size should remain the same"); } - @Test void testReleaseTaskTwice() throws IOException { workerPool.onNumberOfWorkersUpdate(2, 0); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java index 16df98e..d30590b 100644 --- 
a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java @@ -46,7 +46,7 @@ import io.opentelemetry.api.metrics.DoubleGaugeBuilder; import io.opentelemetry.api.metrics.Meter; -import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerMetrics; +import nl.aerius.taskmanager.StartupGuard; import nl.aerius.taskmanager.client.TaskMetrics; /** @@ -63,11 +63,11 @@ class PerformanceMetricsReporterTest { private final Map mockedGauges = new HashMap<>(); private @Mock Meter mockedMeter; - private @Mock WorkerMetrics workMetrics; private @Mock ScheduledExecutorService scheduledExecutorService; private @Captor ArgumentCaptor methodCaptor; private @Captor ArgumentCaptor durationCaptor; + private StartupGuard startupGuard; private PerformanceMetricsReporter reporter; @BeforeEach @@ -75,35 +75,36 @@ void beforeEach() { final DoubleGaugeBuilder mockGaugeBuilder = mock(DoubleGaugeBuilder.class); doAnswer(inv -> { final DoubleGauge gauge = mock(DoubleGauge.class); + doReturn(mockGaugeBuilder).when(mockGaugeBuilder).setDescription(any()); doReturn(gauge).when(mockGaugeBuilder).build(); mockedGauges.put(inv.getArgument(0, String.class), gauge); return mockGaugeBuilder; }).when(mockedMeter).gaugeBuilder(any()); lenient().doReturn(mockGaugeBuilder).when(mockGaugeBuilder).setDescription(any()); - reporter = new PerformanceMetricsReporter(scheduledExecutorService, QUEUE_GROUP_NAME, mockedMeter, workMetrics); + startupGuard = new StartupGuard(); + + reporter = new PerformanceMetricsReporter(scheduledExecutorService, QUEUE_GROUP_NAME, mockedMeter, startupGuard); verify(scheduledExecutorService).scheduleWithFixedDelay(methodCaptor.capture(), anyLong(), anyLong(), any(TimeUnit.class)); } @Test void testOnWorkDispatched() { + startUp(10, 0); reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); reporter.onWorkDispatched("2", createMap(QUEUE_2, 
200L)); - lenient().doReturn(10).when(workMetrics).getReportedWorkerSize(); methodCaptor.getValue().run(); assertGaugeCalls("dispatched", "wait", 2.0, v -> v > 99.0); } @Test void testOnWorkerFinished() { + startUp(2, 0); reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); reporter.onWorkerFinished("1", createMap(QUEUE_1, 100L)); reporter.onWorkerFinished("2", createMap(QUEUE_2, 200L)); - lenient().doReturn(10).when(workMetrics).getReportedWorkerSize(); methodCaptor.getValue().run(); assertGaugeCalls("work", "duration", 2.0, v -> v > 99.0); - // getReportedWorkerSize should only be called on tasks also dispatched, so only for task "1" - verify(workMetrics, times(2)).getReportedWorkerSize(); } private void assertGaugeCalls(final String label, final String type, final double expected, final Predicate duration) { @@ -117,19 +118,19 @@ private void assertGaugeCalls(final String label, final String type, final doubl @Test void testWorkLoad() throws InterruptedException { - doReturn(4).when(workMetrics).getReportedWorkerSize(); + startUp(4, 1); reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); reporter.onWorkDispatched("2", createMap(QUEUE_2, 200L)); methodCaptor.getValue().run(); Thread.sleep(10); // Add a bit of delay to get some time frame between these 2 run calls. methodCaptor.getValue().run(); verify(mockedGauges.get("aer.taskmanager.work.load"), times(2)).set(durationCaptor.capture(), any()); - assertEquals(50.0, durationCaptor.getAllValues().get(1), "Expected workload of 50%"); + assertEquals(75.0, durationCaptor.getAllValues().get(1), "Expected workload of 75%"); } @Test void testReset() throws InterruptedException { - doReturn(4).when(workMetrics).getReportedWorkerSize(); + startUp(4, 1); reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); reporter.onWorkDispatched("2", createMap(QUEUE_2, 200L)); Thread.sleep(2); // Add a bit of delay to get some time frame between updates.. 
@@ -143,6 +144,11 @@ void testReset() throws InterruptedException { assertEquals(0.0, durationCaptor.getAllValues().get(0), 1E-5, "Expected to have no workload anymore"); } + private void startUp(final int numberOfWorkers, final int numberOfMessages) { + reporter.onNumberOfWorkersUpdate(numberOfWorkers, numberOfMessages); + startupGuard.onNumberOfWorkersUpdate(numberOfWorkers, numberOfMessages); + } + private Map createMap(final String queueName, final long duration) { return new TaskMetrics().duration(duration).queueName(queueName).start(System.currentTimeMillis() - 100).build(); }