From 563f2f4013c94fb03933a666d38cb5965ec1c5aa Mon Sep 17 00:00:00 2001 From: Hilbrand Bouwkamp Date: Fri, 13 Mar 2026 13:04:31 +0100 Subject: [PATCH 1/3] AER-4181 Added a startup guard to the schedulers This guard will block a scheduler from starting if there are still tasks on the worker queue. This will help when the taskmanager is restart during operation and there were still tasks on the queue. When the taskmanager is restart it looses the information on what tasks are still in progress and therefore operates as if there are no tasks. This will result in adding new tasks on the worker queue even while it should wait till the tasks that are already on the queue. After restart it also skewed the load metrics. Because it would report as no load, while there would be tasks on the queue. This could result in scaling down because it looked like there was nothing to do. With this change the taskmanager will wait before doing anything before the worker queue is actual empty, starting fresh. It also won't report any new load metrics until that moment. This will keep the old load metric last send active, which should result in no action being taken by the scaling mechanism. --- .../nl/aerius/taskmanager/StartupGuard.java | 58 +++++++++++++++++++ .../nl/aerius/taskmanager/TaskDispatcher.java | 2 +- .../nl/aerius/taskmanager/TaskManager.java | 30 +++++++--- .../metrics/PerformanceMetricsReporter.java | 14 +++-- .../aerius/taskmanager/StartupGuardTest.java | 47 +++++++++++++++ .../aerius/taskmanager/TaskManagerTest.java | 37 +++++++++--- .../PerformanceMetricsReporterTest.java | 7 ++- 7 files changed, 173 insertions(+), 22 deletions(-) create mode 100644 source/taskmanager/src/main/java/nl/aerius/taskmanager/StartupGuard.java create mode 100644 source/taskmanager/src/test/java/nl/aerius/taskmanager/StartupGuardTest.java diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/StartupGuard.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/StartupGuard.java new file mode 100644 index 0000000..0d95108 --- /dev/null +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/StartupGuard.java @@ -0,0 +1,58 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager; + +import java.util.concurrent.Semaphore; + +import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; + +/** + * Class to be used at startup. The Scheduler should not start before the worker queue is empty. + * This to let any work that was left over after a restart of the taskmanager to complete before adding new tasks. + * Because the Taskmanager is not aware of the tasks already on the queue and therefore won't be counted in the metrics. + * This can result in the metrics being skewed, and thereby negatively reporting load metrics. + */ +public class StartupGuard implements WorkerSizeObserver { + + private final Semaphore openSemaphore = new Semaphore(0); + + private boolean open; + + /** + * @return Returns true once the number of messages has become zero for the first time. + */ + public boolean isOpen() { + return open; + } + + /** + * Wait for the number of messages on the message queue to become zero. + */ + public void waitForOpen() throws InterruptedException { + openSemaphore.acquire(); + } + + @Override + public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { + synchronized (openSemaphore) { + if (!open && numberOfMessages == 0) { + open = true; + openSemaphore.release(); + } + } + } +} diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskDispatcher.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskDispatcher.java index ce83b3b..f4a6d85 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskDispatcher.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskDispatcher.java @@ -45,10 +45,10 @@ public enum State { private final WorkerPool workerPool; private final TaskScheduler scheduler; + private final String workerQueueName; private boolean running; private State state; - private final String workerQueueName; public TaskDispatcher(final String workerQueueName, final TaskScheduler scheduler, final WorkerPool workerPool) { this.workerQueueName = workerQueueName; diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java index c92b1c7..f06c4e8 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java @@ -23,9 +23,9 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -74,7 +74,7 @@ public TaskManager(final ExecutorService executorService, final ScheduledExecuto * @param schedule scheduler configuration * @throws InterruptedException */ - public boolean updateTaskScheduler(final TaskSchedule schedule) throws InterruptedException { + public void updateTaskScheduler(final TaskSchedule schedule) throws InterruptedException { // Set up scheduler with worker pool final String workerQueueName = schedule.getWorkerQueueName(); final QueueConfig workerQueueConfig = new QueueConfig(workerQueueName, schedule.isDurable(), schedule.isEagerFetch(), schedule.getQueueType()); @@ -85,7 +85,10 @@ public boolean updateTaskScheduler(final TaskSchedule schedule) throws Interr final TaskScheduleBucket taskScheduleBucket = buckets.get(workerQueueName); taskScheduleBucket.updateQueues(schedule.getQueues(), workerQueueConfig); - return taskScheduleBucket.isRunning(); + } + + public boolean isRunning(final String queue) { + return Optional.ofNullable(buckets.get(queue)).map(TaskScheduleBucket::isRunning).orElse(false); } /** @@ -121,11 +124,13 @@ private class TaskScheduleBucket { public TaskScheduleBucket(final QueueConfig queueConfig) throws InterruptedException { this.workerQueueName = queueConfig.queueName(); final QueueWatchDog watchDog = new QueueWatchDog(workerQueueName); + final StartupGuard startupGuard = new StartupGuard(); + taskScheduler = schedulerFactory.createScheduler(queueConfig); workerProducer = factory.createWorkerProducer(queueConfig); final WorkerPool workerPool = new WorkerPool(workerQueueName, workerProducer, taskScheduler); final PerformanceMetricsReporter reporter = new PerformanceMetricsReporter(scheduledExecutorService, queueConfig.queueName(), - OpenTelemetryMetrics.METER, workerPool); + OpenTelemetryMetrics.METER, workerPool, startupGuard); watchDog.addQueueWatchDogListener(workerPool); watchDog.addQueueWatchDogListener(taskScheduler); @@ -134,6 +139,7 @@ public TaskScheduleBucket(final QueueConfig queueConfig) throws InterruptedExcep workerProducer.addWorkerProducerHandler(reporter); workerProducer.addWorkerProducerHandler(watchDog); + workerSizeObserverProxy.addObserver(workerQueueName, startupGuard); workerSizeObserverProxy.addObserver(workerQueueName, workerPool); workerSizeObserverProxy.addObserver(workerQueueName, watchDog); @@ -143,15 +149,21 @@ public TaskScheduleBucket(final QueueConfig queueConfig) throws InterruptedExcep workerProducer.start(); // Set up metrics WorkerPoolMetrics.setupMetrics(workerPool, workerQueueName); - // Set up dispatcher + dispatcher = new TaskDispatcher(workerQueueName, taskScheduler, workerPool); - executorService.execute(dispatcher); - Thread.sleep(TimeUnit.SECONDS.toMillis(1)); // just wait a little second to make sure the process is actually running. - LOG.info("Started taskscheduler {}: {}", taskScheduler.getClass().getSimpleName(), queueConfig); + executorService.execute(() -> { + try { + // Wait for worker queue to be empty. + startupGuard.waitForOpen(); + LOG.info("Starting task scheduler {}: {}", taskScheduler.getClass().getSimpleName(), queueConfig); + dispatcher.run(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + }}); } /** - * @return + * @return true if the dispatcher is running. */ public boolean isRunning() { return dispatcher.isRunning(); diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java index e13338d..5eae040 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java @@ -31,6 +31,7 @@ import io.opentelemetry.api.metrics.DoubleGauge; import io.opentelemetry.api.metrics.Meter; +import nl.aerius.taskmanager.StartupGuard; import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerMetrics; import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerProducerHandler; import nl.aerius.taskmanager.client.TaskMetrics; @@ -77,6 +78,8 @@ public class PerformanceMetricsReporter implements WorkerProducerHandler, QueueW private final DurationMetric workWorkerMetrics; private final LoadMetric loadMetrics = new LoadMetric(); + private final StartupGuard startupGuard; + private final Meter meter; private final String queueGroupName; private final WorkerMetrics workerMetrics; @@ -88,10 +91,11 @@ public class PerformanceMetricsReporter implements WorkerProducerHandler, QueueW private final Set dispatchedTasks = new HashSet<>(); public PerformanceMetricsReporter(final ScheduledExecutorService newScheduledThreadPool, final String queueGroupName, final Meter meter, - final WorkerMetrics workerMetrics) { + final WorkerMetrics workerMetrics, final StartupGuard startupGuard) { this.queueGroupName = queueGroupName; this.meter = meter; this.workerMetrics = workerMetrics; + this.startupGuard = startupGuard; // Gauges for measuring number of tasks, and average duration time it took before a task was send to to the worker. // Measures by worker and per queue to the worker @@ -195,9 +199,11 @@ private static void metrics(final String prefixText, final DoubleGauge gauge, fi } private void workLoad() { - final double load = loadMetrics.process(); + if (startupGuard.isOpen()) { + final double load = loadMetrics.process(); - loadGauge.set(load, workerAttributes); - LOG.debug("Workload for '{}' is: {}%", queueGroupName, Math.round(load)); + loadGauge.set(load, workerAttributes); + LOG.debug("Workload for '{}' is: {}%", queueGroupName, Math.round(load)); + } } } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/StartupGuardTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/StartupGuardTest.java new file mode 100644 index 0000000..9956c1b --- /dev/null +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/StartupGuardTest.java @@ -0,0 +1,47 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.Test; + +/** + * Test class for {@link StartupGuard} + */ +class StartupGuardTest { + + private boolean open; + @Test + void testOpen() { + open = false; + final StartupGuard guard = new StartupGuard(); + new Thread(this::trigger).run(); + assertFalse(guard.isOpen(), "Guard should not be open."); + guard.onNumberOfWorkersUpdate(0, 1); + assertFalse(guard.isOpen(), "Guard should still not be open when number of message > 0."); + guard.onNumberOfWorkersUpdate(0, 0); + assertTrue(guard.isOpen(), "Guard should be open when number of message has become 0."); + guard.onNumberOfWorkersUpdate(0, 1); + assertTrue(guard.isOpen(), "Guard should still be be open once the number of messages has been zero once."); + } + + private void trigger() { + open = true; + } +} diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskManagerTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskManagerTest.java index b9ddfb6..59bb280 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskManagerTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskManagerTest.java @@ -17,7 +17,8 @@ package nl.aerius.taskmanager; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import java.io.File; import java.io.IOException; @@ -29,9 +30,12 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import nl.aerius.taskmanager.MockTaskScheduler.MockSchedulerFactory; import nl.aerius.taskmanager.adaptor.AdaptorFactory; +import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; +import nl.aerius.taskmanager.adaptor.WorkerSizeProviderProxy; import nl.aerius.taskmanager.domain.PriorityTaskQueue; import nl.aerius.taskmanager.domain.PriorityTaskSchedule; import nl.aerius.taskmanager.domain.RabbitMQQueueType; @@ -44,7 +48,9 @@ class TaskManagerTest { private static ExecutorService executor; private static ScheduledExecutorService scheduledExecutorService; + private final PriorityTaskSchedulerFileHandler handler = new PriorityTaskSchedulerFileHandler(); + private PriorityTaskSchedule schedule; private TaskManager taskManager; @@ -54,7 +60,14 @@ void setUp() throws IOException { scheduledExecutorService = Executors.newScheduledThreadPool(1); final AdaptorFactory factory = new MockAdaptorFactory(); final MockSchedulerFactory schedulerFactory = new MockTaskScheduler.MockSchedulerFactory(); - taskManager = new TaskManager<>(executor, scheduledExecutorService, factory, schedulerFactory, factory.createWorkerSizeProvider()); + final WorkerSizeProviderProxy workerSizeProvider = factory.createWorkerSizeProvider(); + + doAnswer(a -> { + // This will unblock the startup guard + ((WorkerSizeObserver) a.getArgument(1)).onNumberOfWorkersUpdate(0, 0); + return null; + }).when(workerSizeProvider).addObserver(any(), any()); + taskManager = new TaskManager<>(executor, scheduledExecutorService, factory, schedulerFactory, workerSizeProvider); schedule = handler.read(new File(getClass().getClassLoader().getResource("queue/priority-task-scheduler.ops.json").getFile())); } @@ -67,25 +80,35 @@ void after() throws InterruptedException { } @Test + @Timeout(value = 2, unit = TimeUnit.SECONDS) void testAddScheduler() throws InterruptedException { - assertTrue(taskManager.updateTaskScheduler(schedule), "TaskScheduler running"); + updateTaskScheduler(); assertEquals(RabbitMQQueueType.STREAM, schedule.getQueueType(), "Should have queueType STREAM"); taskManager.removeTaskScheduler(schedule.getWorkerQueueName()); } @Test + @Timeout(value = 2, unit = TimeUnit.SECONDS) void testModifyQueue() throws InterruptedException { - assertTrue(taskManager.updateTaskScheduler(schedule), "TaskScheduler running"); + updateTaskScheduler(); schedule.getQueues().get(0).setPriority(30); - assertTrue(taskManager.updateTaskScheduler(schedule), "TaskScheduler updated"); + updateTaskScheduler(); taskManager.removeTaskScheduler(schedule.getWorkerQueueName()); } @Test + @Timeout(value = 2, unit = TimeUnit.SECONDS) void testRemoveQueue() throws InterruptedException { - assertTrue(taskManager.updateTaskScheduler(schedule), "TaskScheduler running"); + updateTaskScheduler(); schedule.getQueues().remove(0); - assertTrue(taskManager.updateTaskScheduler(schedule), "TaskScheduler updated"); + updateTaskScheduler(); taskManager.removeTaskScheduler(schedule.getWorkerQueueName()); } + + private void updateTaskScheduler() throws InterruptedException { + taskManager.updateTaskScheduler(schedule); + while(!taskManager.isRunning(schedule.getWorkerQueueName())) { + Thread.sleep(300); + } + } } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java index 16df98e..64286de 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java @@ -46,6 +46,7 @@ import io.opentelemetry.api.metrics.DoubleGaugeBuilder; import io.opentelemetry.api.metrics.Meter; +import nl.aerius.taskmanager.StartupGuard; import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerMetrics; import nl.aerius.taskmanager.client.TaskMetrics; @@ -75,13 +76,17 @@ void beforeEach() { final DoubleGaugeBuilder mockGaugeBuilder = mock(DoubleGaugeBuilder.class); doAnswer(inv -> { final DoubleGauge gauge = mock(DoubleGauge.class); + doReturn(mockGaugeBuilder).when(mockGaugeBuilder).setDescription(any()); doReturn(gauge).when(mockGaugeBuilder).build(); mockedGauges.put(inv.getArgument(0, String.class), gauge); return mockGaugeBuilder; }).when(mockedMeter).gaugeBuilder(any()); lenient().doReturn(mockGaugeBuilder).when(mockGaugeBuilder).setDescription(any()); - reporter = new PerformanceMetricsReporter(scheduledExecutorService, QUEUE_GROUP_NAME, mockedMeter, workMetrics); + final StartupGuard startupGuard = new StartupGuard(); + + startupGuard.onNumberOfWorkersUpdate(0, 0); + reporter = new PerformanceMetricsReporter(scheduledExecutorService, QUEUE_GROUP_NAME, mockedMeter, workMetrics, startupGuard); verify(scheduledExecutorService).scheduleWithFixedDelay(methodCaptor.capture(), anyLong(), anyLong(), any(TimeUnit.class)); } From f22391250457033c6524985ff91a438f489a88f4 Mon Sep 17 00:00:00 2001 From: Hilbrand Bouwkamp Date: Fri, 13 Mar 2026 19:00:56 +0100 Subject: [PATCH 2/3] Changed StartUp guard, and refactored WorkerPool to look for messages that were still on the queue Stopping at startup when there are still messages on the queue can potential block when a very long task is still in progress. Therefore changed implementation to not block on waiting for reaching 0, but do block until the first time an update from RabbitMQ has been retrieved. By waiting for that information the taskmanager can be initialized with the initial number of messages that are still on the queue. The WorkerPool and the Metrics reporter can than initialize itself before actual scheduling is started, and it can than handle the tasks already on the queue when they are finished. --- .../nl/aerius/taskmanager/StartupGuard.java | 8 +-- .../nl/aerius/taskmanager/TaskManager.java | 6 +- .../nl/aerius/taskmanager/WorkerPool.java | 59 ++++++++++++------- .../metrics/PerformanceMetricsReporter.java | 23 +++++--- .../aerius/taskmanager/StartupGuardTest.java | 39 +++++++++--- .../nl/aerius/taskmanager/WorkerPoolTest.java | 27 ++++++--- .../PerformanceMetricsReporterTest.java | 27 +++++---- 7 files changed, 124 insertions(+), 65 deletions(-) diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/StartupGuard.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/StartupGuard.java index 0d95108..68c91a4 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/StartupGuard.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/StartupGuard.java @@ -21,9 +21,9 @@ import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; /** - * Class to be used at startup. The Scheduler should not start before the worker queue is empty. - * This to let any work that was left over after a restart of the taskmanager to complete before adding new tasks. - * Because the Taskmanager is not aware of the tasks already on the queue and therefore won't be counted in the metrics. + * Class to be used at startup. The Scheduler should not start before it is known how many messages are still on the queue. + * This to register any work that is still on the queuue and to properly calculate load metrics. + * Because the Taskmanager is not aware of the tasks already on the queue and therefore otherwise these messaages won't be counted in the metrics. * This can result in the metrics being skewed, and thereby negatively reporting load metrics. */ public class StartupGuard implements WorkerSizeObserver { @@ -49,7 +49,7 @@ public void waitForOpen() throws InterruptedException { @Override public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { synchronized (openSemaphore) { - if (!open && numberOfMessages == 0) { + if (!open) { open = true; openSemaphore.release(); } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java index f06c4e8..61a226a 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java @@ -130,7 +130,7 @@ public TaskScheduleBucket(final QueueConfig queueConfig) throws InterruptedExcep workerProducer = factory.createWorkerProducer(queueConfig); final WorkerPool workerPool = new WorkerPool(workerQueueName, workerProducer, taskScheduler); final PerformanceMetricsReporter reporter = new PerformanceMetricsReporter(scheduledExecutorService, queueConfig.queueName(), - OpenTelemetryMetrics.METER, workerPool, startupGuard); + OpenTelemetryMetrics.METER, startupGuard); watchDog.addQueueWatchDogListener(workerPool); watchDog.addQueueWatchDogListener(taskScheduler); @@ -139,9 +139,11 @@ public TaskScheduleBucket(final QueueConfig queueConfig) throws InterruptedExcep workerProducer.addWorkerProducerHandler(reporter); workerProducer.addWorkerProducerHandler(watchDog); - workerSizeObserverProxy.addObserver(workerQueueName, startupGuard); + workerSizeObserverProxy.addObserver(workerQueueName, reporter); workerSizeObserverProxy.addObserver(workerQueueName, workerPool); workerSizeObserverProxy.addObserver(workerQueueName, watchDog); + // startup Guard should be the last observer added as it will unlock the task dispatcher + workerSizeObserverProxy.addObserver(workerQueueName, startupGuard); if (taskScheduler instanceof final WorkerSizeObserver wzo) { workerSizeObserverProxy.addObserver(workerQueueName, wzo); diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java index d489735..10b0332 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java @@ -47,11 +47,14 @@ class WorkerPool implements WorkerSizeObserver, WorkerProducerHandler, WorkerMet private final Semaphore freeWorkers = new Semaphore(0); private final Map runningWorkers = new ConcurrentHashMap<>(); - private int totalConfiguredWorkers; private final String workerQueueName; private final WorkerProducer wp; private final WorkerUpdateHandler workerUpdateHandler; + private int totalReportedWorkers; + private int initialUnaccountedWorkers; + private boolean firstUpdateReceived; + public WorkerPool(final String workerQueueName, final WorkerProducer wp, final WorkerUpdateHandler workerUpdateHandler) { this.workerQueueName = workerQueueName; this.wp = wp; @@ -83,19 +86,19 @@ public void sendTaskToWorker(final Task task) throws IOException { public int getWorkerSize() { synchronized (this) { - return freeWorkers.availablePermits() + runningWorkers.size(); + return freeWorkers.availablePermits() + runningWorkers.size() + initialUnaccountedWorkers; } } @Override public int getReportedWorkerSize() { - return totalConfiguredWorkers; + return totalReportedWorkers; } @Override public int getRunningWorkerSize() { synchronized (this) { - return runningWorkers.size(); + return runningWorkers.size() + initialUnaccountedWorkers; } } @@ -120,24 +123,32 @@ void releaseWorker(final String taskId) { * @param taskRecord the task is expected to be on. */ void releaseWorker(final String taskId, final TaskRecord taskRecord) { - if (taskRecord != null) { - synchronized (this) { - if (runningWorkers.containsKey(taskId)) { - // if currentSize is smaller than the worker size it means the worker - // must not be re-added as free worker but removed from the pool. - if (totalConfiguredWorkers >= runningWorkers.size()) { - freeWorkers.release(1); - } - runningWorkers.remove(taskId); - } else { - LOG.info("[{}][taskId:{}] Task for queue '{}' not found, maybe it was already released).", workerQueueName, taskId, taskRecord.queueName()); + synchronized (this) { + if (runningWorkers.containsKey(taskId)) { + freeWorker(); + runningWorkers.remove(taskId); + } else { + if (initialUnaccountedWorkers > 0) { + freeWorker(); } + initialUnaccountedWorkers = Math.max(initialUnaccountedWorkers - 1, 0); + LOG.info("[{}][taskId:{}] Unknown task received, possible left over of restart).", workerQueueName, taskId); } + } + if (taskRecord != null) { workerUpdateHandler.onTaskFinished(taskRecord); LOG.debug("[{}][taskId:{}] Task released).", workerQueueName, taskId); } } + private void freeWorker() { + // if currentSize is smaller than the worker size it means the worker + // must not be re-added as free worker but removed from the pool. + if (totalReportedWorkers >= (runningWorkers.size() + initialUnaccountedWorkers)) { + freeWorkers.release(1); + } + } + /** * Takes a worker from the free workers list and add it to the reserved workers list. Blocks until a worker becomes available on the free worker * list. @@ -169,24 +180,28 @@ public void reserveWorker() { @Override public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { synchronized (this) { + if (!firstUpdateReceived) { + initialUnaccountedWorkers = numberOfMessages; + firstUpdateReceived = true; + } updateNumberOfWorkers(numberOfWorkers); } } private void updateNumberOfWorkers(final int numberOfWorkers) { - final int previousTotalConfiguredWorkers = totalConfiguredWorkers; - totalConfiguredWorkers = numberOfWorkers; - final int deltaWorkers = totalConfiguredWorkers - getWorkerSize(); + final int previousTotalReportedWorkers = totalReportedWorkers; + totalReportedWorkers = numberOfWorkers; + final int deltaWorkers = totalReportedWorkers - getWorkerSize(); if (deltaWorkers > 0) { freeWorkers.release(deltaWorkers); - LOG.info("# Workers of {} increased to {}(+{})", workerQueueName, totalConfiguredWorkers, deltaWorkers); + LOG.info("# Workers of {} increased to {}(+{})", workerQueueName, totalReportedWorkers, deltaWorkers); } else if ((deltaWorkers < 0) && (freeWorkers.availablePermits() > 0) && freeWorkers.tryAcquire(Math.min(freeWorkers.availablePermits(), -deltaWorkers))) { - LOG.info("# Workers of {} decreased to {}({})", workerQueueName, totalConfiguredWorkers, deltaWorkers); + LOG.info("# Workers of {} decreased to {}({})", workerQueueName, totalReportedWorkers, deltaWorkers); } - if (previousTotalConfiguredWorkers != totalConfiguredWorkers) { - workerUpdateHandler.onWorkerPoolSizeChange(totalConfiguredWorkers); + if (previousTotalReportedWorkers != totalReportedWorkers) { + workerUpdateHandler.onWorkerPoolSizeChange(totalReportedWorkers); } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java index 5eae040..7b07f78 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java @@ -32,8 +32,8 @@ import io.opentelemetry.api.metrics.Meter; import nl.aerius.taskmanager.StartupGuard; -import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerMetrics; import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerProducerHandler; +import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; import nl.aerius.taskmanager.client.TaskMetrics; import nl.aerius.taskmanager.domain.QueueWatchDogListener; import nl.aerius.taskmanager.metrics.DurationMetric.DurationMetricValue; @@ -53,7 +53,7 @@ * * - Average load (in percentage) of all workers (of a certain type) together. */ -public class PerformanceMetricsReporter implements WorkerProducerHandler, QueueWatchDogListener { +public class PerformanceMetricsReporter implements WorkerProducerHandler, QueueWatchDogListener, WorkerSizeObserver { private static final Logger LOG = LoggerFactory.getLogger(PerformanceMetricsReporter.class); @@ -82,7 +82,6 @@ public class PerformanceMetricsReporter implements WorkerProducerHandler, QueueW private final Meter meter; private final String queueGroupName; - private final WorkerMetrics workerMetrics; private final DoubleGauge loadGauge; private final Attributes workerAttributes; @@ -90,11 +89,12 @@ public class PerformanceMetricsReporter implements WorkerProducerHandler, QueueW // as it doesn't have any metrics on it anymore. private final Set dispatchedTasks = new HashSet<>(); + private int numberOfWorkers; + public PerformanceMetricsReporter(final ScheduledExecutorService newScheduledThreadPool, final String queueGroupName, final Meter meter, - final WorkerMetrics workerMetrics, final StartupGuard startupGuard) { + final StartupGuard startupGuard) { this.queueGroupName = queueGroupName; this.meter = meter; - this.workerMetrics = workerMetrics; this.startupGuard = startupGuard; // Gauges for measuring number of tasks, and average duration time it took before a task was send to to the worker. @@ -140,7 +140,7 @@ public void onWorkDispatched(final String messageId, final Map m taskMetrics.determineDuration(); dispatchedQueueMetrics.computeIfAbsent(taskMetrics.queueName(), k -> createQueueDurationMetric(taskMetrics)).register(taskMetrics); dispatchedWorkerMetrics.register(taskMetrics); - loadMetrics.register(1, workerMetrics.getReportedWorkerSize()); + loadMetrics.register(1, numberOfWorkers); } @Override @@ -149,8 +149,15 @@ public synchronized void onWorkerFinished(final String messageId, final Map createQueueDurationMetric(taskMetrics)).register(taskMetrics); workWorkerMetrics.register(taskMetrics); - if (dispatchedTasks.remove(messageId)) { - loadMetrics.register(-1, workerMetrics.getReportedWorkerSize()); + loadMetrics.register(-1, numberOfWorkers); + } + + @Override + public synchronized void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { + this.numberOfWorkers = numberOfWorkers; + if (!startupGuard.isOpen() && numberOfMessages > 0) { + LOG.info("Queue {} will be started with {} messages already on the queue.", queueGroupName, numberOfMessages); + loadMetrics.register(numberOfMessages, numberOfWorkers); } } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/StartupGuardTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/StartupGuardTest.java index 9956c1b..73397c7 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/StartupGuardTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/StartupGuardTest.java @@ -19,29 +19,50 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; /** * Test class for {@link StartupGuard} */ class StartupGuardTest { - private boolean open; @Test void testOpen() { - open = false; final StartupGuard guard = new StartupGuard(); - new Thread(this::trigger).run(); + assertFalse(guard.isOpen(), "Guard should not be open."); guard.onNumberOfWorkersUpdate(0, 1); - assertFalse(guard.isOpen(), "Guard should still not be open when number of message > 0."); - guard.onNumberOfWorkersUpdate(0, 0); - assertTrue(guard.isOpen(), "Guard should be open when number of message has become 0."); + assertTrue(guard.isOpen(), "Guard should be open when onNumberOfWorkersUpdate is called."); guard.onNumberOfWorkersUpdate(0, 1); - assertTrue(guard.isOpen(), "Guard should still be be open once the number of messages has been zero once."); + assertTrue(guard.isOpen(), "Guard should still remain open onNumberOfWorkersUpdate has been called."); } - private void trigger() { - open = true; + @Test + @Timeout(value = 3, unit = TimeUnit.SECONDS) + void testWaitForOpen() throws InterruptedException { + final StartupGuard guard = new StartupGuard(); + final Semaphore waitForStart = new Semaphore(0); + final Semaphore waitForOpen = new Semaphore(0); + + new Thread(() -> { + try { + waitForStart.release(); + guard.waitForOpen(); + waitForOpen.release(); + } catch (final InterruptedException e) { + // Ignore exception + } + }).start(); + // First wait for first semaphore to be unlocked. + waitForStart.acquire(); + assertFalse(guard.isOpen(), "Guard should not be open."); + guard.onNumberOfWorkersUpdate(1, 1); + // Wait for semaphore that is called after waitForOpen is unlocked. + waitForOpen.acquire(); + assertTrue(guard.isOpen(), "Guard should now be open."); } } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java index 562cdbf..83a3767 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java @@ -18,7 +18,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotSame; -import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.lenient; @@ -26,6 +25,7 @@ import java.io.IOException; import java.util.concurrent.ExecutorService; +import java.util.stream.IntStream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -75,16 +75,30 @@ public void messageDelivered(final Message message) { @Test void testWorkerPoolSizing() throws IOException { - assertSame(0, workerPool.getReportedWorkerSize(), "Check if workerPool size is empty at start"); + assertEquals(0, workerPool.getReportedWorkerSize(), "Check if workerPool size is empty at start"); workerPool.onNumberOfWorkersUpdate(10, 0); - assertSame(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is changed after sizing"); + assertEquals(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is changed after sizing"); assertEquals(10, numberOfWorkers, "Check if workerPool change handler called."); workerPool.reserveWorker(); - assertSame(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after reserving 1 worker"); + assertEquals(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after reserving 1 worker"); final Task task = createAndSendTaskToWorker(); - assertSame(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after reserving 1 worker"); + assertEquals(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after reserving 1 worker"); workerPool.releaseWorker(task.getId()); - assertSame(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after releasing 1 worker"); + assertEquals(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after releasing 1 worker"); + } + + @Test + void testWorkerPoolSizingWithInitialSize() throws IOException { + workerPool.onNumberOfWorkersUpdate(10, 5); + assertEquals(5, workerPool.getRunningWorkerSize(), "Check if workerPool size is 5"); + assertEquals(10, workerPool.getWorkerSize(), "Internal worker size should match reported number of workers"); + workerPool.onNumberOfWorkersUpdate(10, 5); + assertEquals(5, workerPool.getRunningWorkerSize(), "Check if workerPool size is still 5"); + assertEquals(10, workerPool.getWorkerSize(), "Internal worker size should still match reported number of workers"); + IntStream.range(1, 6).forEach(a -> workerPool.onWorkerFinished("", null)); + // workerPool.onNumberOfWorkersUpdate(10, 0); + assertEquals(0, workerPool.getRunningWorkerSize(), "After unknonw tasks received running size should be 0"); + assertEquals(10, workerPool.getWorkerSize(), "Internal worker size should still match reported number of workers"); } @Test @@ -112,7 +126,6 @@ void testWorkerPoolScaleDown() throws IOException { assertEquals(1, workerPool.getWorkerSize(), "Check if workerPool size should remain the same"); } - @Test void testReleaseTaskTwice() throws IOException { workerPool.onNumberOfWorkersUpdate(2, 0); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java index 64286de..2e608a3 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java @@ -47,7 +47,6 @@ import io.opentelemetry.api.metrics.Meter; import nl.aerius.taskmanager.StartupGuard; -import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerMetrics; import nl.aerius.taskmanager.client.TaskMetrics; /** @@ -64,11 +63,11 @@ class PerformanceMetricsReporterTest { private final Map mockedGauges = new HashMap<>(); private @Mock Meter mockedMeter; - private @Mock WorkerMetrics workMetrics; private @Mock ScheduledExecutorService scheduledExecutorService; private @Captor ArgumentCaptor methodCaptor; private @Captor ArgumentCaptor durationCaptor; + private StartupGuard startupGuard; private PerformanceMetricsReporter reporter; @BeforeEach @@ -83,32 +82,29 @@ void beforeEach() { return mockGaugeBuilder; }).when(mockedMeter).gaugeBuilder(any()); lenient().doReturn(mockGaugeBuilder).when(mockGaugeBuilder).setDescription(any()); - final StartupGuard startupGuard = new StartupGuard(); + startupGuard = new StartupGuard(); - startupGuard.onNumberOfWorkersUpdate(0, 0); - reporter = new PerformanceMetricsReporter(scheduledExecutorService, QUEUE_GROUP_NAME, mockedMeter, workMetrics, startupGuard); + reporter = new PerformanceMetricsReporter(scheduledExecutorService, QUEUE_GROUP_NAME, mockedMeter, startupGuard); verify(scheduledExecutorService).scheduleWithFixedDelay(methodCaptor.capture(), anyLong(), anyLong(), any(TimeUnit.class)); } @Test void testOnWorkDispatched() { + startUp(10, 0); reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); reporter.onWorkDispatched("2", createMap(QUEUE_2, 200L)); - lenient().doReturn(10).when(workMetrics).getReportedWorkerSize(); methodCaptor.getValue().run(); assertGaugeCalls("dispatched", "wait", 2.0, v -> v > 99.0); } @Test void testOnWorkerFinished() { + startUp(2, 0); reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); reporter.onWorkerFinished("1", createMap(QUEUE_1, 100L)); reporter.onWorkerFinished("2", createMap(QUEUE_2, 200L)); - lenient().doReturn(10).when(workMetrics).getReportedWorkerSize(); methodCaptor.getValue().run(); assertGaugeCalls("work", "duration", 2.0, v -> v > 99.0); - // getReportedWorkerSize should only be called on tasks also dispatched, so only for task "1" - verify(workMetrics, times(2)).getReportedWorkerSize(); } private void assertGaugeCalls(final String label, final String type, final double expected, final Predicate duration) { @@ -117,24 +113,24 @@ private void assertGaugeCalls(final String label, final String type, final doubl verify(mockedGauges.get("aer.taskmanager.%s.queue".formatted(label))).set(eq(expected), any()); verify(mockedGauges.get("aer.taskmanager.%s.queue.%s".formatted(label, type))).set(durationCaptor.capture(), any()); durationCaptor.getAllValues() - .forEach(v -> assertTrue(duration.test(v), "Duration should report at least 100.0 as it is the offset of the start time, but was " + v)); + .forEach(v -> assertTrue(duration.test(v), "Duration should report at least 100.0 as it is the offset of the start time, but was " + v)); } @Test void testWorkLoad() throws InterruptedException { - doReturn(4).when(workMetrics).getReportedWorkerSize(); + startUp(4, 1); reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); reporter.onWorkDispatched("2", createMap(QUEUE_2, 200L)); methodCaptor.getValue().run(); Thread.sleep(10); // Add a bit of delay to get some time frame between these 2 run calls. methodCaptor.getValue().run(); verify(mockedGauges.get("aer.taskmanager.work.load"), times(2)).set(durationCaptor.capture(), any()); - assertEquals(50.0, durationCaptor.getAllValues().get(1), "Expected workload of 50%"); + assertEquals(75.0, durationCaptor.getAllValues().get(1), "Expected workload of 75%"); } @Test void testReset() throws InterruptedException { - doReturn(4).when(workMetrics).getReportedWorkerSize(); + startUp(4, 1); reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); reporter.onWorkDispatched("2", createMap(QUEUE_2, 200L)); Thread.sleep(2); // Add a bit of delay to get some time frame between updates.. @@ -148,6 +144,11 @@ void testReset() throws InterruptedException { assertEquals(0.0, durationCaptor.getAllValues().get(0), 1E-5, "Expected to have no workload anymore"); } + private void startUp(final int numberOfWorkers, final int numberOfMessages) { + reporter.onNumberOfWorkersUpdate(numberOfWorkers, numberOfMessages); + startupGuard.onNumberOfWorkersUpdate(numberOfWorkers, numberOfMessages); + } + private Map createMap(final String queueName, final long duration) { return new TaskMetrics().duration(duration).queueName(queueName).start(System.currentTimeMillis() - 100).build(); } From 00a803d74ef34f66d3389ee1a520c1170a85b1ed Mon Sep 17 00:00:00 2001 From: Hilbrand Bouwkamp Date: Thu, 19 Mar 2026 14:53:42 +0100 Subject: [PATCH 3/3] Review comments --- .../src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java | 3 +-- .../taskmanager/metrics/PerformanceMetricsReporterTest.java | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java index 83a3767..0a205fd 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java @@ -96,8 +96,7 @@ void testWorkerPoolSizingWithInitialSize() throws IOException { assertEquals(5, workerPool.getRunningWorkerSize(), "Check if workerPool size is still 5"); assertEquals(10, workerPool.getWorkerSize(), "Internal worker size should still match reported number of workers"); IntStream.range(1, 6).forEach(a -> workerPool.onWorkerFinished("", null)); - // workerPool.onNumberOfWorkersUpdate(10, 0); - assertEquals(0, workerPool.getRunningWorkerSize(), "After unknonw tasks received running size should be 0"); + assertEquals(0, workerPool.getRunningWorkerSize(), "After unknown tasks received running size should be 0"); assertEquals(10, workerPool.getWorkerSize(), "Internal worker size should still match reported number of workers"); } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java index 2e608a3..d30590b 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java @@ -113,7 +113,7 @@ private void assertGaugeCalls(final String label, final String type, final doubl verify(mockedGauges.get("aer.taskmanager.%s.queue".formatted(label))).set(eq(expected), any()); verify(mockedGauges.get("aer.taskmanager.%s.queue.%s".formatted(label, type))).set(durationCaptor.capture(), any()); durationCaptor.getAllValues() - .forEach(v -> assertTrue(duration.test(v), "Duration should report at least 100.0 as it is the offset of the start time, but was " + v)); + .forEach(v -> assertTrue(duration.test(v), "Duration should report at least 100.0 as it is the offset of the start time, but was " + v)); } @Test