diff --git a/doc/telemetry.md b/doc/telemetry.md index 6f60d15..441ae67 100644 --- a/doc/telemetry.md +++ b/doc/telemetry.md @@ -77,22 +77,56 @@ The metric value `aerius.worker.` for attribute `rabbitmq_queue` sh The TaskManager defines a few custom metrics for OpenTelemetry to capture. These metrics are all defined with the `nl.aerius.TaskManager` instrumentation scope. - -| metric name | type | description | -|-----------------------------------------------------|-----------|----------------------------------------------------------------------| -| `aer.taskmanager.work.load`1 | gauge | Percentage of workers occupied. | -| `aer.taskmanager.worker_size`1 | gauge | The sum of idle workers + occupied workers. | -| `aer.taskmanager.current_worker_size`1 | gauge | The number of workers based on what RabbitMQ reports. | -| `aer.taskmanager.running_worker_size`1 | gauge | The number of workers that are occupied. | -| `aer.taskmanager.running_client_size`2 | gauge | The number of workers that are occupied for a specific client queue. | -| `aer.taskmanager.dispatched`1 | histogram | The number of tasks dispatched. | -| `aer.taskmanager.dispatched.wait`1 | histogram | The average wait time of tasks dispatched. | -| `aer.taskmanager.dispatched.queue`2 | histogram | The number of tasks dispatched per client queue. | -| `aer.taskmanager.dispatched.queue.wait`2 | histogram | The average wait time of tasks dispatched per client queue. | +The metrics about availability of workers use the Open Telemetry standard for reporting such metrics. +Therefore `.limit` gives the total number of workers available. +And `.usage` gives the metrics about how the workers are used. +The usage metrics have an attribute `state` that identifies if the metric is for `used` or `free` amount of workers. +Additional `aer.rabbitmq.worker.usage` records a third state `waiting`. +And `aer.taskmanager.client.queue.usage` has states `used` and `waiting` that provides information on tasks send to the TaskManager. +This shows the number of tasks on the worker queue that are not yet picked up by any worker. + + +| metric name | type | description | +|-------------------------------------------------------|-----------|-------------------------------------------------------------------------| +| `aer.taskmanager.work.load`1 | gauge | Percentage of workers occupied. | +| `aer.taskmanager.worker.limit`1 | gauge | Weighted total number of workers based on tasks send to workers. | +| `aer.taskmanager.worker.usage`2 | gauge | Weighted usage of workers based on tasks send to workers. | +| `aer.taskmanager.workerpool.worker.limit`1 | gauge | TaskManager internal total number of workers. | +| `aer.taskmanager.workerpool.worker.usage`2 | gauge | TaskManager internal usage of workers. | +| `aer.taskmanager.client.queue.usage`2 | gauge | TaskManager internal metrics on client queue usage. | +| `aer.rabbitmq.worker.limit`1 | gauge | Total number of workers available as reported by RabbitMQ | +| `aer.rabbitmq.worker.usage`2 | gauge | Usage o the workers based on the messages on the RabbitMQ worker queue. | +| `aer.taskmanager.dispatched`1 | histogram | The number of tasks dispatched. | +| `aer.taskmanager.dispatched.wait`1 | histogram | The average wait time of tasks dispatched. | +| `aer.taskmanager.dispatched.queue`3 | histogram | The number of tasks dispatched per client queue. | +| `aer.taskmanager.dispatched.queue.wait`3/sup> | histogram | The average wait time of tasks dispatched per client queue. | + +Basically there are 3 metric groups that report similar information. +First the `aer.taskmanager.worker.*` metrics are a weighted value based on when tasks are send to the workers. +These metrics should give the most accurate value about usage of the workers. +Second the `aer.taskmanager.workerpool.worker.*` metrics are the internally used values to base scheduling priorities on. +These metrics should be close to the other metrics, but can be used to detect issues in case the TaskManager doesn't seem to operate as expected. +Third the `aer.rabbitmq.worker.*` metrics are the values as received from the RabbitMQ api. +These metrics could also be obtained by directly reading the RabbitMQ api, but specifically for the usage don't require additional logic to get the usage metrics. +In general these metrics should report the same values, but due to timing (e.g. the moment the measure is taken) there can be differences. + +##### Deprecated metrics + +The following metrics have been replaced by the more standardized naming mentioned above + +| Metric name | type | description | Replaced by | +|-------------------------------------------------------|-----------|----------------------------------------------------------------------|--------------------------------------------| +| `aer.taskmanager.worker_size`1 | gauge | The sum of idle workers + occupied workers. | `aer.taskmanager.workerpool.worker.limit` | +| `aer.taskmanager.current_worker_size`1 | gauge | The number of workers based on what RabbitMQ reports. | `aer.rabbitmq.worker.limit` | +| `aer.taskmanager.running_worker_size`1 | gauge | The number of workers that are occupied. | `aer.taskmanager.workerpool.worker..usage` | +| `aer.taskmanager.running_client_size`3 | gauge | The number of workers that are occupied for a specific client queue. | `aer.taskmanager.client.queue.usage` | + +##### Metric attributes The workers have different attributes to distinguish specific metrics. * 1 have attribute `worker_type`. -* 2 have attribute `worker_type` and `queue_name`. +* 2 have attribute `worker_type` and `state`. `state` can have the value `used`, `free` or `waiting`. +* 3 have attribute `worker_type` and `queue_name`. `worker_type` is the type of worker, e.g. `ops`. `queue_name` is the originating queue the task initially was put on, e.g. `...calculator_ui_small`. @@ -103,6 +137,9 @@ The workers have different attributes to distinguish specific metrics. ### RabbitMQ metrics +Besides the RabbitMQ metrics reported by the TaskManager it is also possible to get the metrics from RabbitMQ directly. +To read those metrics the following metrics are available: + | metric name | type | description | |-----------------------------------------------------|-------|--------------------------------------------------------------------| | `rabbitmq_detailed_queue_messages` | gauge | Total number of messages on the queue, both picked up and waiting. | diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java index 2dd111c..c58db32 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java @@ -69,7 +69,7 @@ public void onWorkerFinished(final String messageId, final Map m } @Override - public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { + public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, int numberOfMessagesInProgress) { if (isItDead(!runningTasks.isEmpty(), numberOfMessages)) { LOG.info("It looks like some tasks are zombies on {} worker queue, so all tasks currently in state running are released.", workerQueueName); listeners.forEach(QueueWatchDogListener::reset); diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/StartupGuard.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/StartupGuard.java index 68c91a4..580eaad 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/StartupGuard.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/StartupGuard.java @@ -22,8 +22,8 @@ /** * Class to be used at startup. The Scheduler should not start before it is known how many messages are still on the queue. - * This to register any work that is still on the queuue and to properly calculate load metrics. - * Because the Taskmanager is not aware of the tasks already on the queue and therefore otherwise these messaages won't be counted in the metrics. + * This to register any work that is still on the queue and to properly calculate load metrics. + * Because the Task Manager is not aware of the tasks already on the queue and therefore otherwise these messages won't be counted in the metrics. * This can result in the metrics being skewed, and thereby negatively reporting load metrics. */ public class StartupGuard implements WorkerSizeObserver { @@ -47,7 +47,7 @@ public void waitForOpen() throws InterruptedException { } @Override - public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { + public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, final int numberOfMessagesInProgress) { synchronized (openSemaphore) { if (!open) { open = true; diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java index 61a226a..551e92b 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java @@ -42,6 +42,10 @@ import nl.aerius.taskmanager.domain.TaskSchedule; import nl.aerius.taskmanager.metrics.OpenTelemetryMetrics; import nl.aerius.taskmanager.metrics.PerformanceMetricsReporter; +import nl.aerius.taskmanager.metrics.RabbitMQUsageMetricsProvider; +import nl.aerius.taskmanager.metrics.TaskManagerMetricsRegister; +import nl.aerius.taskmanager.metrics.TaskManagerUsageMetricsProvider; +import nl.aerius.taskmanager.metrics.TaskManagerUsageMetricsWrapper; import nl.aerius.taskmanager.scheduler.TaskScheduler; import nl.aerius.taskmanager.scheduler.TaskScheduler.TaskSchedulerFactory; @@ -58,6 +62,7 @@ class TaskManager> { private final TaskSchedulerFactory schedulerFactory; private final WorkerSizeProviderProxy workerSizeObserverProxy; private final Map buckets = new HashMap<>(); + private final TaskManagerUsageMetricsWrapper taskManagerMetrics; public TaskManager(final ExecutorService executorService, final ScheduledExecutorService scheduledExecutorService, final AdaptorFactory factory, final TaskSchedulerFactory schedulerFactory, final WorkerSizeProviderProxy workerSizeObserverProxy) { @@ -66,6 +71,7 @@ public TaskManager(final ExecutorService executorService, final ScheduledExecuto this.factory = factory; this.schedulerFactory = schedulerFactory; this.workerSizeObserverProxy = workerSizeObserverProxy; + this.taskManagerMetrics = new TaskManagerUsageMetricsWrapper(OpenTelemetryMetrics.METER); } /** @@ -78,6 +84,7 @@ public void updateTaskScheduler(final TaskSchedule schedule) throws Interrupt // Set up scheduler with worker pool final String workerQueueName = schedule.getWorkerQueueName(); final QueueConfig workerQueueConfig = new QueueConfig(workerQueueName, schedule.isDurable(), schedule.isEagerFetch(), schedule.getQueueType()); + if (!buckets.containsKey(workerQueueName)) { LOG.info("Added scheduler for worker queue {}", workerQueueName); buckets.put(workerQueueName, new TaskScheduleBucket(workerQueueConfig)); @@ -112,9 +119,20 @@ public void removeTaskScheduler(final String workerQueueName) { public void shutdown() { buckets.forEach((k, v) -> v.shutdown()); buckets.clear(); + taskManagerMetrics.close(); + } + + /** + * Returns the {@link TaskScheduleBucket}. Intended to be used in tests. + * + * @param queueName queue to get the bucket for + * @return the bucket for the given queue name + */ + TaskScheduleBucket getTaskScheduleBucket(final String queueName) { + return buckets.get(queueName); } - private class TaskScheduleBucket { + class TaskScheduleBucket { private final TaskDispatcher dispatcher; private final WorkerProducer workerProducer; private final Map taskConsumers = new HashMap<>(); @@ -129,17 +147,23 @@ public TaskScheduleBucket(final QueueConfig queueConfig) throws InterruptedExcep taskScheduler = schedulerFactory.createScheduler(queueConfig); workerProducer = factory.createWorkerProducer(queueConfig); final WorkerPool workerPool = new WorkerPool(workerQueueName, workerProducer, taskScheduler); + final TaskManagerUsageMetricsProvider taskManagerUsageMetrics = new TaskManagerUsageMetricsProvider(workerQueueName); + final TaskManagerMetricsRegister taskManagerMetricsRegister = new TaskManagerMetricsRegister(taskManagerUsageMetrics, startupGuard); final PerformanceMetricsReporter reporter = new PerformanceMetricsReporter(scheduledExecutorService, queueConfig.queueName(), - OpenTelemetryMetrics.METER, startupGuard); + OpenTelemetryMetrics.METER); watchDog.addQueueWatchDogListener(workerPool); watchDog.addQueueWatchDogListener(taskScheduler); watchDog.addQueueWatchDogListener(reporter); + watchDog.addQueueWatchDogListener(taskManagerMetricsRegister); workerProducer.addWorkerProducerHandler(reporter); + workerProducer.addWorkerProducerHandler(taskManagerMetricsRegister); workerProducer.addWorkerProducerHandler(watchDog); - workerSizeObserverProxy.addObserver(workerQueueName, reporter); + final RabbitMQUsageMetricsProvider rabbitMQWorkerSizeObserver = new RabbitMQUsageMetricsProvider(workerQueueName); + workerSizeObserverProxy.addObserver(workerQueueName, rabbitMQWorkerSizeObserver); + workerSizeObserverProxy.addObserver(workerQueueName, taskManagerMetricsRegister); workerSizeObserverProxy.addObserver(workerQueueName, workerPool); workerSizeObserverProxy.addObserver(workerQueueName, watchDog); // startup Guard should be the last observer added as it will unlock the task dispatcher @@ -158,6 +182,9 @@ public TaskScheduleBucket(final QueueConfig queueConfig) throws InterruptedExcep // Wait for worker queue to be empty. startupGuard.waitForOpen(); LOG.info("Starting task scheduler {}: {}", taskScheduler.getClass().getSimpleName(), queueConfig); + taskManagerMetrics.addRabbitMQUsageMetricsProvider(rabbitMQWorkerSizeObserver); + taskManagerMetrics.addWorkerPoolUsageMetricsProvider(workerPool); + taskManagerMetrics.addTaskManagerUsageMetricsProvider(taskManagerUsageMetrics); dispatcher.run(); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); @@ -206,6 +233,7 @@ public void addTaskConsumerIfAbsent(final QueueConfig queueConfig) { }); } + /** * Removes a task consumer with the given queue name. * @@ -220,8 +248,19 @@ private void removeTaskConsumer(final String taskQueueName) { public void shutdown() { dispatcher.shutdown(); workerProducer.shutdown(); + taskManagerMetrics.remove(workerQueueName); WorkerPoolMetrics.removeMetrics(workerQueueName); taskConsumers.forEach((k, v) -> v.shutdown()); } + + /** + * Test method to check if there is a task consumer for the given queue name. + * + * @param queueName name of the queue to check + * @return true if the queue is present + */ + boolean hasTaskConsumer(final String queueName) { + return taskConsumers.containsKey(queueName); + } } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java index 10b0332..90454ed 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java @@ -34,6 +34,7 @@ import nl.aerius.taskmanager.domain.TaskRecord; import nl.aerius.taskmanager.domain.WorkerUpdateHandler; import nl.aerius.taskmanager.exception.NoFreeWorkersException; +import nl.aerius.taskmanager.metrics.UsageMetricsProvider; /** * Class to manage workers. Contains a list of all available workers, which are: free workers, reserved workers and running workers. @@ -41,7 +42,7 @@ *

Reserved workers are workers that are waiting for a task to become available on the queue. *

Running workers are workers for that are busy running the task and are waiting for the task to finish. */ -class WorkerPool implements WorkerSizeObserver, WorkerProducerHandler, WorkerMetrics, QueueWatchDogListener { +class WorkerPool implements WorkerSizeObserver, WorkerProducerHandler, UsageMetricsProvider, QueueWatchDogListener, WorkerMetrics { private static final Logger LOG = LoggerFactory.getLogger(WorkerPool.class); @@ -84,7 +85,13 @@ public void sendTaskToWorker(final Task task) throws IOException { LOG.trace("[{}][taskId:{}] Task sent", workerQueueName, task.getId()); } - public int getWorkerSize() { + @Override + public String getWorkerQueueName() { + return workerQueueName; + } + + @Override + public int getNumberOfWorkers() { synchronized (this) { return freeWorkers.availablePermits() + runningWorkers.size() + initialUnaccountedWorkers; } @@ -96,12 +103,17 @@ public int getReportedWorkerSize() { } @Override - public int getRunningWorkerSize() { + public int getNumberOfUsedWorkers() { synchronized (this) { return runningWorkers.size() + initialUnaccountedWorkers; } } + @Override + public int getNumberOfFreeWorkers() { + return freeWorkers.availablePermits(); + } + @Override public void onWorkerFinished(final String messageId, final Map messageMetaData) { releaseWorker(messageId); @@ -165,6 +177,17 @@ public void reserveWorker() { } } + @Override + public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, final int numberOfMessagesInProgress) { + synchronized (this) { + if (!firstUpdateReceived) { + initialUnaccountedWorkers = numberOfMessages; + firstUpdateReceived = true; + } + updateNumberOfWorkers(numberOfWorkers); + } + } + /** * Sets the number of workers which are actually available. This number should * be determined on the number of workers that are actually in operation. @@ -175,23 +198,11 @@ public void reserveWorker() { * workers matches the actual number. * * @param numberOfWorkers Actual size of number of workers in operation - * @param numberOfMessages Actual total number of messages on the queue */ - @Override - public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { - synchronized (this) { - if (!firstUpdateReceived) { - initialUnaccountedWorkers = numberOfMessages; - firstUpdateReceived = true; - } - updateNumberOfWorkers(numberOfWorkers); - } - } - private void updateNumberOfWorkers(final int numberOfWorkers) { final int previousTotalReportedWorkers = totalReportedWorkers; totalReportedWorkers = numberOfWorkers; - final int deltaWorkers = totalReportedWorkers - getWorkerSize(); + final int deltaWorkers = totalReportedWorkers - getNumberOfWorkers(); if (deltaWorkers > 0) { freeWorkers.release(deltaWorkers); diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPoolMetrics.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPoolMetrics.java index 23f1bf4..f762a0f 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPoolMetrics.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPoolMetrics.java @@ -25,6 +25,7 @@ import io.opentelemetry.api.metrics.ObservableDoubleGauge; import nl.aerius.taskmanager.metrics.OpenTelemetryMetrics; +import nl.aerius.taskmanager.metrics.UsageMetricsProvider; /** * Set up metric collection for this worker pool with the given type name. @@ -35,9 +36,13 @@ public final class WorkerPoolMetrics { private enum WorkerPoolMetricType { // @formatter:off - WORKER_SIZE(WorkerPool::getWorkerSize, "Number of workers based on internal state of taskmanager"), - CURRENT_WORKER_SIZE(WorkerPool::getReportedWorkerSize, "Current number of workers according to taskmanager"), - RUNNING_WORKER_SIZE(WorkerPool::getRunningWorkerSize, "Running (or occupied) number of workers according to taskmanager"); + WORKER_SIZE(UsageMetricsProvider::getNumberOfWorkers, "Number of workers based on internal state of taskmanager"), + @Deprecated + CURRENT_WORKER_SIZE(WorkerPool::getReportedWorkerSize, + "Current number of workers according to taskmanager (deprecated replaced with 'aer.taskmanager.workerppol.worker.usage')"), + @Deprecated + RUNNING_WORKER_SIZE(UsageMetricsProvider::getNumberOfUsedWorkers, + "Used number of workers according to taskmanager (deprecated replaced with 'aer.taskmanager.workerppol.worker.usage')"); // @formatter:on private final Function function; @@ -59,7 +64,6 @@ String getGaugeName() { String getDescription() { return description; } - } private WorkerPoolMetrics() { @@ -68,6 +72,7 @@ private WorkerPoolMetrics() { public static void setupMetrics(final WorkerPool workerPool, final String workerQueueName) { final Attributes attributes = OpenTelemetryMetrics.workerAttributes(workerQueueName); + for (final WorkerPoolMetricType metricType : WorkerPoolMetricType.values()) { REGISTERED_METRICS.put(gaugeIdentifier(workerQueueName, metricType), OpenTelemetryMetrics.METER.gaugeBuilder(metricType.getGaugeName()) diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerProducer.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerProducer.java index 34c67ab..7203053 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerProducer.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerProducer.java @@ -75,11 +75,11 @@ default void onWorkDispatched(final String messageId, final Map /** * Interface to retrieve metrics about the current worker sizes. */ - interface WorkerMetrics { + public interface WorkerMetrics { /** * @return Returns the number of workers currently busy. */ - int getRunningWorkerSize(); + int getNumberOfUsedWorkers(); /** * @return Returns the number total number of workers based on what the queue reports as being active. diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerSizeObserver.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerSizeObserver.java index 7bcb427..54c57c3 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerSizeObserver.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerSizeObserver.java @@ -25,7 +25,8 @@ public interface WorkerSizeObserver { * Gives the number of workers processes connected on the queue. * * @param numberOfWorkers number of number of workers processes - * @param numberOfMessages Actual total number of messages on the queue + * @param numberOfMessages Total number of messages on the queue + * @param numberOfMessagesInProgress Number of messages being processed by the workers */ - void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages); + void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, int numberOfMessagesInProgress); } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java index 2a2a837..3e8cd0b 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java @@ -16,6 +16,8 @@ */ package nl.aerius.taskmanager.metrics; +import java.util.function.ToDoubleBiFunction; + /** * Class to keep track of work load per worker type. * Each time a new task is added (dispatched) or removed (work finished) the number of running workers is counted and calculated how long @@ -32,41 +34,50 @@ class LoadMetric { /** * Total measured time since the last time {@link #process()} was called. */ - private int totalMeasureTime; + private long totalMeasureTime; /** - * Total registered load time. + * Measured free workers as the sum of number of free workers for specific time moments. Sum of (free workers * time frame). + * Dividing this number by the total time of the time frame will give an average number of free workers. */ - private double totalLoad; + private double total; /** * Number of workers running at a time. */ - private int runningWorkers; + private int usedWorkers; /** * Total number of available workers. */ - private int totalWorkers; + private int numberOfWorkers; + private final ToDoubleBiFunction countFunction; + private final ToDoubleBiFunction sumFunction; + + public LoadMetric(final ToDoubleBiFunction countFunction, final ToDoubleBiFunction sumFunction) { + this.countFunction = countFunction; + this.sumFunction = sumFunction; + } /** * Register change in number of running workers. * - * @param deltaActiveWorkers number of jobs on the workers being added or subtracted. - * @param totalWorkers Total number of available workers + * @param deltaUsedWorkers number of jobs on the workers being added or subtracted. + * @param numberOfWorkers Number of available workers */ - public synchronized void register(final int deltaActiveWorkers, final int totalWorkers) { - this.totalWorkers = totalWorkers; + public synchronized void register(final int deltaUsedWorkers, final int numberOfWorkers) { + this.numberOfWorkers = numberOfWorkers; final long newLast = System.currentTimeMillis(); final long delta = newLast - last; - totalLoad += delta * (totalWorkers > 0 ? (runningWorkers / (double) totalWorkers) : 0); + + total += delta * countFunction.applyAsDouble(numberOfWorkers, usedWorkers); totalMeasureTime += delta; last = newLast; - runningWorkers += deltaActiveWorkers; + usedWorkers += deltaUsedWorkers; } /** * Resets the metric state. Sets running workers to 0, and resets the average load time by calling process. */ public synchronized void reset() { - runningWorkers = 0; + usedWorkers = 0; process(); } @@ -77,11 +88,11 @@ public synchronized void reset() { */ public synchronized double process() { // Call register here to set the end time this moment. This will calculate workers running up till now as being active. - register(0, totalWorkers); - final double averageLoad = (totalLoad * 100.0) / totalMeasureTime; + register(0, numberOfWorkers); + final double averageTotal = totalMeasureTime > 0 ? sumFunction.applyAsDouble(total, totalMeasureTime) : 0; totalMeasureTime = 0; - totalLoad = 0; - return averageLoad; + total = 0; + return averageTotal; } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/OpenTelemetryMetrics.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/OpenTelemetryMetrics.java index ef66e57..5f2a38c 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/OpenTelemetryMetrics.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/OpenTelemetryMetrics.java @@ -24,7 +24,7 @@ import io.opentelemetry.api.metrics.Meter; /** - * Class to help with opntelemetry metrics within the taskmanager. + * Class to help with Open Telemetry metrics within the TaskManager. */ public final class OpenTelemetryMetrics { @@ -43,6 +43,13 @@ public static Attributes workerAttributes(final String workerType) { .build(); } + public static Attributes workerAttributes(final String workerType, final String atttributeName, final String attributeValue) { + return Attributes.builder() + .put(WORKER_TYPE_ATTRIBUTE, workerIdentifier(workerType)) + .put(atttributeName, attributeValue) + .build(); + } + public static Attributes queueAttributes(final String workerQueueName, final String queueName) { return Attributes.builder() .put(WORKER_TYPE_ATTRIBUTE, workerIdentifier(workerQueueName)) @@ -50,6 +57,15 @@ public static Attributes queueAttributes(final String workerQueueName, final Str .build(); } + public static Attributes queueAttributes(final String workerQueueName, final String queueName, final String atttributeName, + final String attributeValue) { + return Attributes.builder() + .put(WORKER_TYPE_ATTRIBUTE, workerIdentifier(workerQueueName)) + .put(QUEUE_ATTRIBUTE, queueName) + .put(atttributeName, attributeValue) + .build(); + } + private static String workerIdentifier(final String workerQueueName) { final int workerTypeIndex = workerQueueName.lastIndexOf('.'); diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java index 7b07f78..8d84ce4 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java @@ -31,9 +31,7 @@ import io.opentelemetry.api.metrics.DoubleGauge; import io.opentelemetry.api.metrics.Meter; -import nl.aerius.taskmanager.StartupGuard; import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerProducerHandler; -import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; import nl.aerius.taskmanager.client.TaskMetrics; import nl.aerius.taskmanager.domain.QueueWatchDogListener; import nl.aerius.taskmanager.metrics.DurationMetric.DurationMetricValue; @@ -53,7 +51,7 @@ * * - Average load (in percentage) of all workers (of a certain type) together. */ -public class PerformanceMetricsReporter implements WorkerProducerHandler, QueueWatchDogListener, WorkerSizeObserver { +public class PerformanceMetricsReporter implements WorkerProducerHandler, QueueWatchDogListener { private static final Logger LOG = LoggerFactory.getLogger(PerformanceMetricsReporter.class); @@ -76,26 +74,19 @@ public class PerformanceMetricsReporter implements WorkerProducerHandler, QueueW private final DurationMetric dispatchedWorkerMetrics; private final Map workQueueMetrics = new HashMap<>(); private final DurationMetric workWorkerMetrics; - private final LoadMetric loadMetrics = new LoadMetric(); - - private final StartupGuard startupGuard; private final Meter meter; private final String queueGroupName; - private final DoubleGauge loadGauge; - private final Attributes workerAttributes; + private final Attributes attributesWorker; + // Keep track of dispatched tasks, because when taskmanager restarts it should not register tasks already on the queue // as it doesn't have any metrics on it anymore. private final Set dispatchedTasks = new HashSet<>(); - private int numberOfWorkers; - - public PerformanceMetricsReporter(final ScheduledExecutorService newScheduledThreadPool, final String queueGroupName, final Meter meter, - final StartupGuard startupGuard) { + public PerformanceMetricsReporter(final ScheduledExecutorService newScheduledThreadPool, final String queueGroupName, final Meter meter) { this.queueGroupName = queueGroupName; this.meter = meter; - this.startupGuard = startupGuard; // Gauges for measuring number of tasks, and average duration time it took before a task was send to to the worker. // Measures by worker and per queue to the worker @@ -117,12 +108,9 @@ public PerformanceMetricsReporter(final ScheduledExecutorService newScheduledThr workQueueDurationGauge = createGauge("aer.taskmanager.work.queue.duration", "Average duration time a task from a queue took to process on a worker, including wait time."); - // Average load time (in percentage) of the work load on all workers together. - loadGauge = meter.gaugeBuilder("aer.taskmanager.work.load").setDescription("Percentage of workers used in the timeframe.").build(); - - workerAttributes = OpenTelemetryMetrics.workerAttributes(queueGroupName); - dispatchedWorkerMetrics = new DurationMetric(workerAttributes); - workWorkerMetrics = new DurationMetric(workerAttributes); + attributesWorker = OpenTelemetryMetrics.workerAttributes(queueGroupName); + dispatchedWorkerMetrics = new DurationMetric(attributesWorker); + workWorkerMetrics = new DurationMetric(attributesWorker); newScheduledThreadPool.scheduleWithFixedDelay(this::update, 1, UPDATE_TIME_SECONDS, TimeUnit.SECONDS); } @@ -140,7 +128,6 @@ public void onWorkDispatched(final String messageId, final Map m taskMetrics.determineDuration(); dispatchedQueueMetrics.computeIfAbsent(taskMetrics.queueName(), k -> createQueueDurationMetric(taskMetrics)).register(taskMetrics); dispatchedWorkerMetrics.register(taskMetrics); - loadMetrics.register(1, numberOfWorkers); } @Override @@ -149,16 +136,7 @@ public synchronized void onWorkerFinished(final String messageId, final Map createQueueDurationMetric(taskMetrics)).register(taskMetrics); workWorkerMetrics.register(taskMetrics); - loadMetrics.register(-1, numberOfWorkers); - } - @Override - public synchronized void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { - this.numberOfWorkers = numberOfWorkers; - if (!startupGuard.isOpen() && numberOfMessages > 0) { - LOG.info("Queue {} will be started with {} messages already on the queue.", queueGroupName, numberOfMessages); - loadMetrics.register(numberOfMessages, numberOfWorkers); - } } @Override @@ -166,8 +144,6 @@ public void reset() { dispatchedTasks.clear(); dispatchedQueueMetrics.entrySet().forEach(e -> e.getValue().process()); dispatchedWorkerMetrics.process(); - // work metrics not needed to be reset because they are about work already done. - loadMetrics.reset(); } private DurationMetric createQueueDurationMetric(final TaskMetrics taskMetrics) { @@ -180,7 +156,6 @@ private synchronized void update() { metrics(DISPATCH, dispatchedQueueCountGauge, dispatchedQueueWaitGauge, queueGroupName, dispatchedWorkerMetrics); metrics(WORK, workQueueMetrics, workWorkerCountGauge, workWorkerDurationGauge); metrics(WORK, workQueueCountGauge, workQueueDurationGauge, queueGroupName, workWorkerMetrics); - workLoad(); } catch (final RuntimeException e) { LOG.error("Update metrics failed.", e); } @@ -204,13 +179,4 @@ private static void metrics(final String prefixText, final DoubleGauge gauge, fi LOG.debug("{} for {}: {} ms/task (#tasks: {})", prefixText, name, metric.avgDuration(), count); } } - - private void workLoad() { - if (startupGuard.isOpen()) { - final double load = loadMetrics.process(); - - loadGauge.set(load, workerAttributes); - LOG.debug("Workload for '{}' is: {}%", queueGroupName, Math.round(load)); - } - } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/RabbitMQUsageMetricsProvider.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/RabbitMQUsageMetricsProvider.java new file mode 100644 index 0000000..24654c6 --- /dev/null +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/RabbitMQUsageMetricsProvider.java @@ -0,0 +1,65 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager.metrics; + +import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; + +public class RabbitMQUsageMetricsProvider implements WorkerSizeObserver, UsageMetricsProvider { + + private final String workerQueueName; + + private int numberOfWorkers; + private int numberOfMessages; + private int numberOfMessagesInProgress; + + public RabbitMQUsageMetricsProvider(final String workerQueueName) { + this.workerQueueName = workerQueueName; + } + + @Override + public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, final int numberOfMessagesInProgress) { + this.numberOfWorkers = numberOfWorkers; + this.numberOfMessages = numberOfMessages; + this.numberOfMessagesInProgress = numberOfMessagesInProgress; + } + + @Override + public String getWorkerQueueName() { + return workerQueueName; + } + + @Override + public int getNumberOfWorkers() { + return numberOfWorkers; + } + + @Override + public int getNumberOfUsedWorkers() { + return numberOfMessagesInProgress; + } + + @Override + public int getNumberOfFreeWorkers() { + return Math.max(0, numberOfWorkers - numberOfMessagesInProgress); + } + + @Override + public int getNumberOfWaiting() { + return Math.max(0, numberOfMessages - numberOfMessagesInProgress); + } + +} diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegister.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegister.java new file mode 100644 index 0000000..4f0e6a8 --- /dev/null +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegister.java @@ -0,0 +1,73 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager.metrics; + +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import nl.aerius.taskmanager.StartupGuard; +import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerProducerHandler; +import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; +import nl.aerius.taskmanager.domain.QueueWatchDogListener; + +/** + * This class provides the input for the {@link TaskManagerUsageMetricsProvider}. It will register updates on the amount of worker/workers from + * different sources. It also make sure the update will not be started before startup guard gives a green light. + * Note the startup guard doesn't have to be used in the {@link #onWorkDispatched(String, Map)} and {@link #onWorkerFinished(String, Map)} because + * these methods won't be called before the scheduling is started. + */ +public class TaskManagerMetricsRegister implements WorkerProducerHandler, WorkerSizeObserver, QueueWatchDogListener { + + private static final Logger LOG = LoggerFactory.getLogger(TaskManagerMetricsRegister.class); + + private final TaskManagerUsageMetricsProvider taskManagerUsageMetricsProvider; + private final StartupGuard startupGuard; + + private int numberOfWorkers; + + public TaskManagerMetricsRegister(final TaskManagerUsageMetricsProvider taskManagerUsageMetricsProvider, final StartupGuard startupGuard) { + this.taskManagerUsageMetricsProvider = taskManagerUsageMetricsProvider; + this.startupGuard = startupGuard; + } + + @Override + public void onWorkDispatched(final String messageId, final Map messageMetaData) { + taskManagerUsageMetricsProvider.register(1, numberOfWorkers); + } + + @Override + public void onWorkerFinished(final String messageId, final Map messageMetaData) { + taskManagerUsageMetricsProvider.register(-1, numberOfWorkers); + } + + @Override + public synchronized void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, final int numberOfMessagesInProgress) { + this.numberOfWorkers = numberOfWorkers; + if (!startupGuard.isOpen() && numberOfMessages > 0) { + LOG.info("Queue {} will be started with {} messages already on the queue.", taskManagerUsageMetricsProvider.getWorkerQueueName(), + numberOfMessages); + taskManagerUsageMetricsProvider.register(numberOfMessages, numberOfWorkers); + } + } + + @Override + public void reset() { + taskManagerUsageMetricsProvider.reset(); + } +} diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerUsageMetricsProvider.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerUsageMetricsProvider.java new file mode 100644 index 0000000..1f2cb9e --- /dev/null +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerUsageMetricsProvider.java @@ -0,0 +1,81 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager.metrics; + +import java.util.function.ToDoubleBiFunction; + +/** + * The {@link UsageMetricsProvider} for weighted load. limit and usage metrics for worker queues. + * The values are calculated averages over the time between the last measurement point and the moment the metric value is requested. + */ +public class TaskManagerUsageMetricsProvider implements UsageMetricsProvider { + + private static final ToDoubleBiFunction LOAD_SUM_FUNCTION = (total, totalMeasureTime) -> (total * 100.0) / totalMeasureTime; + private static final ToDoubleBiFunction COUNT_SUM_FUNCTION = (total, totalMeasureTime) -> Math.floor(total / totalMeasureTime); + + private final String workerQueueName; + private final LoadMetric load; + private final LoadMetric limit; + private final LoadMetric used; + private final LoadMetric free; + + public TaskManagerUsageMetricsProvider(final String workerQueueName) { + this.workerQueueName = workerQueueName; + load = new LoadMetric((numberOfWorkers, usedWorkers) -> (numberOfWorkers > 0 ? (usedWorkers / (double) numberOfWorkers) : 0), LOAD_SUM_FUNCTION); + limit = new LoadMetric((numberOfWorkers, usedWorkers) -> numberOfWorkers, COUNT_SUM_FUNCTION); + used = new LoadMetric((numberOfWorkers, usedWorkers) -> usedWorkers, COUNT_SUM_FUNCTION); + free = new LoadMetric((numberOfWorkers, usedWorkers) -> numberOfWorkers - usedWorkers, COUNT_SUM_FUNCTION); + } + + public synchronized void register(final int deltaUsedWorkers, final int numberOfWorkers) { + load.register(deltaUsedWorkers, numberOfWorkers); + limit.register(deltaUsedWorkers, numberOfWorkers); + used.register(deltaUsedWorkers, numberOfWorkers); + free.register(deltaUsedWorkers, numberOfWorkers); + } + + @Override + public String getWorkerQueueName() { + return workerQueueName; + } + + public double getLoad() { + return load.process(); + } + + @Override + public int getNumberOfWorkers() { + return (int) Math.max(0, limit.process()); + } + + @Override + public int getNumberOfUsedWorkers() { + return (int) Math.max(0, used.process()); + } + + @Override + public int getNumberOfFreeWorkers() { + return (int) Math.max(0, free.process()); + } + + public void reset() { + load.reset(); + limit.reset(); + used.reset(); + free.reset(); + } +} diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerUsageMetricsWrapper.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerUsageMetricsWrapper.java new file mode 100644 index 0000000..c864947 --- /dev/null +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/TaskManagerUsageMetricsWrapper.java @@ -0,0 +1,86 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager.metrics; + +import io.opentelemetry.api.metrics.Meter; + +/** + * Class that wraps all Task Manager usage metrics reporters. TaskSchedulerBuckets should add the metric providers to this class. + * Each of those providers added is for a specific worker queue. The {@link UsageMetricsWrapper} will manager metrics per worker queue. + */ +public class TaskManagerUsageMetricsWrapper { + + private final UsageMetricsWrapper rabbitMQUsageMetrics; + private final UsageMetricsWrapper workerPoolUsageMetrics; + private final UsageMetricsWrapper taskManagerUsageMetrics; + private final UsageMetricsReporter loadUsageMetricsReporter; + + public TaskManagerUsageMetricsWrapper(final Meter meter) { + rabbitMQUsageMetrics = new UsageMetricsWrapper(meter, "aer.rabbitmq", true); + workerPoolUsageMetrics = new UsageMetricsWrapper(meter, "aer.taskmanager.workerpool", false); + taskManagerUsageMetrics = new UsageMetricsWrapper(meter, "aer.taskmanager", false); + loadUsageMetricsReporter = new UsageMetricsReporter(meter, "aer.taskmanager.worker.load", ""); + } + + /** + * Add the usage metric provider that registers metrics on the queue state of a worker from the RabbitMQ queue information. + * + * @param provider The provider that registers the RabbitMQ metrics. + */ + public void addRabbitMQUsageMetricsProvider(final UsageMetricsProvider provider) { + rabbitMQUsageMetrics.add(provider); + } + + /** + * Add the usage metric provider that registers metrics on the queue state in the internal worker pool. + * + * @param provider The provider that registers the worker pool metrics. + */ + public void addWorkerPoolUsageMetricsProvider(final UsageMetricsProvider provider) { + workerPoolUsageMetrics.add(provider); + } + + /** + * Add the usage metric provider that registers metrics on the usage state of a worker from the information collected by the Task Manager. + * + * @param provider The {@link TaskManagerUsageMetricsProvider}. + */ + public void addTaskManagerUsageMetricsProvider(final TaskManagerUsageMetricsProvider provider) { + taskManagerUsageMetrics.add(provider); + loadUsageMetricsReporter.addMetrics(provider.getWorkerQueueName(), provider::getLoad, + OpenTelemetryMetrics.workerAttributes(provider.getWorkerQueueName())); + } + + /** + * Remove metrics reporting for the given worker queue. + * + * @param workerQueueName worker queue to remove the metric reporting for + */ + public void remove(final String workerQueueName) { + rabbitMQUsageMetrics.remove(workerQueueName); + workerPoolUsageMetrics.remove(workerQueueName); + taskManagerUsageMetrics.remove(workerQueueName); + loadUsageMetricsReporter.removeMetrics(workerQueueName); + } + + public void close() { + rabbitMQUsageMetrics.close(); + workerPoolUsageMetrics.close(); + taskManagerUsageMetrics.close(); + loadUsageMetricsReporter.close(); + } +} diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/UsageMetricsProvider.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/UsageMetricsProvider.java new file mode 100644 index 0000000..a7e5c4c --- /dev/null +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/UsageMetricsProvider.java @@ -0,0 +1,51 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager.metrics; + +/** + * Interface for providers that collect usage metrics. Each implementation can use a different way or algorithm to calculate the metrics. + * This to provide insights in different areas to be able to identify possible issues with the Task Manager performance. + */ +public interface UsageMetricsProvider { + + /** + * @return The name of the worker queue these metrics are for + */ + String getWorkerQueueName(); + + /** + * @return The total number of workers available + */ + int getNumberOfWorkers(); + + /** + * @return The number of workers that are in use + */ + int getNumberOfUsedWorkers(); + + /** + * @return The number of workers not being used + */ + int getNumberOfFreeWorkers(); + + /** + * @return The number of tasks that are waiting to be processed. The backlog for the worker + */ + default int getNumberOfWaiting() { + return 0; + } +} diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/UsageMetricsReporter.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/UsageMetricsReporter.java new file mode 100644 index 0000000..b6241ee --- /dev/null +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/UsageMetricsReporter.java @@ -0,0 +1,84 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager.metrics; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.DoubleSupplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableDoubleGauge; +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; + +/** + * Class that creates the Telemetry Gauge that uses the callback to record metrics when triggered. + * When triggered it will iterate over the list of added {@link UsageMetric}s. + */ +class UsageMetricsReporter { + + private static final Logger LOG = LoggerFactory.getLogger(UsageMetricsReporter.class); + + private record UsageMetric(DoubleSupplier metricSupplier, Attributes attributes) {} + + private final Map metricsMap = new HashMap<>(); + private final ObservableDoubleGauge gauge; + + public UsageMetricsReporter(final Meter meter, final String metricName, final String description) { + gauge = meter + .gaugeBuilder(metricName) + .setDescription(description) + .buildWithCallback(this::recordMetrics); + } + + private void recordMetrics(final ObservableDoubleMeasurement measurement) { + for (final Map.Entry entry : metricsMap.entrySet()) { + final UsageMetric metric = entry.getValue(); + + measurement.record(metric.metricSupplier().getAsDouble(), metric.attributes()); + } + LOG.debug("Workload for {}", measurement); + } + + /** + * Add a metric supplier for a specific worker queue/attributes. + * + * @param workerQueueName The worker queue this metric supplier is for + * @param metricSupplier Supplies the metric value when called + * @param attributes attributes for the metric + */ + public void addMetrics(final String workerQueueName, final DoubleSupplier metricSupplier, final Attributes attributes) { + metricsMap.put(workerQueueName, new UsageMetric(metricSupplier, attributes)); + } + + /** + * Removes the metric reporter for the given worker queue to not report the metric anymore. + * + * @param workerQueueName Worker queue to remove the metric reporter for + */ + public void removeMetrics(final String workerQueueName) { + metricsMap.remove(workerQueueName); + } + + public void close() { + gauge.close(); + metricsMap.clear(); + } +} diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/UsageMetricsWrapper.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/UsageMetricsWrapper.java new file mode 100644 index 0000000..a3ee3ef --- /dev/null +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/UsageMetricsWrapper.java @@ -0,0 +1,60 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager.metrics; + +import io.opentelemetry.api.metrics.Meter; + +/** + * Wraps several metrics to be reported on. + * The metrics supported are limit (i.e. number of workers available). + * + */ +class UsageMetricsWrapper { + private final boolean hasWaiting; + private final UsageMetricsReporter limitReporter; + private final UsageMetricsReporter usageReporter; + + public UsageMetricsWrapper(final Meter meter, final String metricPrefix, final boolean hasWaiting) { + this.hasWaiting = hasWaiting; + limitReporter = new UsageMetricsReporter(meter, metricPrefix + ".worker.limit", ""); + usageReporter = new UsageMetricsReporter(meter, metricPrefix + ".worker.usage", ""); + } + + public void add(final UsageMetricsProvider provider) { + final String workerQueueName = provider.getWorkerQueueName(); + + limitReporter.addMetrics(workerQueueName, provider::getNumberOfWorkers, OpenTelemetryMetrics.workerAttributes(workerQueueName)); + usageReporter.addMetrics(workerQueueName, provider::getNumberOfUsedWorkers, + OpenTelemetryMetrics.workerAttributes(workerQueueName, "state", "used")); + usageReporter.addMetrics(workerQueueName, provider::getNumberOfFreeWorkers, + OpenTelemetryMetrics.workerAttributes(workerQueueName, "state", "free")); + if (hasWaiting) { + usageReporter.addMetrics(workerQueueName, provider::getNumberOfWaiting, + OpenTelemetryMetrics.workerAttributes(workerQueueName, "state", "waiting")); + } + } + + public void remove(final String workerQueueName) { + limitReporter.removeMetrics(workerQueueName); + usageReporter.removeMetrics(workerQueueName); + } + + public void close() { + limitReporter.close(); + usageReporter.close(); + } +} diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitor.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitor.java index 2611a03..3cf1ed3 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitor.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitor.java @@ -56,6 +56,7 @@ public class RabbitMQQueueMonitor { private static final Logger LOG = LoggerFactory.getLogger(RabbitMQQueueMonitor.class); + private static final int TIMEOUT = (int) TimeUnit.SECONDS.toMillis(3); private final ObjectMapper objectMapper = new ObjectMapper(); @@ -117,8 +118,9 @@ public void updateWorkerQueueState(final String queueName, final WorkerSizeObser } else { final int numberOfWorkers = getJsonIntPrimitive(jsonObject, "consumers"); final int numberOfMessages = getJsonIntPrimitive(jsonObject, "messages"); + final int numberOfMessagesInProgress = getJsonIntPrimitive(jsonObject, "messages_unacknowledged"); - observer.onNumberOfWorkersUpdate(numberOfWorkers, numberOfMessages); + observer.onNumberOfWorkersUpdate(numberOfWorkers, numberOfMessages, numberOfMessagesInProgress); LOG.trace("[{}] active workers:{}", queueName, numberOfWorkers); } } catch (final URISyntaxException | IOException e) { diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerEventProducer.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerEventProducer.java index e177c41..e2e0d8e 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerEventProducer.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerEventProducer.java @@ -111,7 +111,7 @@ private void updateMetrics() { try { metrics.forEach((q, wpm) -> { final int size = wpm.getReportedWorkerSize(); - final int utilisation = wpm.getRunningWorkerSize(); + final int utilisation = wpm.getNumberOfUsedWorkers(); try { publish(q, size, utilisation); diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java index 0aa5a16..8027717 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java @@ -178,10 +178,10 @@ public void add(final WorkerSizeObserver observer) { } @Override - public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { + public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, final int numberOfMessagesInProgress) { for (final WorkerSizeObserver observer : observers) { try { - observer.onNumberOfWorkersUpdate(numberOfWorkers, numberOfMessages); + observer.onNumberOfWorkersUpdate(numberOfWorkers, numberOfMessages, numberOfMessagesInProgress); } catch (final RuntimeException e) { LOG.error("RuntimeException during onNumberOfWorkersUpdate in {}", observer.getClass(), e); } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/GroupedPriorityQueue.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/GroupedPriorityQueue.java index e457801..b586f0d 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/GroupedPriorityQueue.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/GroupedPriorityQueue.java @@ -77,6 +77,10 @@ public synchronized Task poll() { return task; } + public synchronized int getGroupSize() { + return groupedQueue.entrySet().stream().mapToInt(e -> e.getValue().size()).sum(); + } + @Override public Stream stream() { return queue.stream(); diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java index 2ba755a..1611086 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java @@ -187,17 +187,20 @@ public void onWorkerPoolSizeChange(final int numberOfWorkers) { } @Override - public void updateQueue(final PriorityTaskQueue queue) { + public void updateQueue(final PriorityTaskQueue priorityTaskQueue) { lock.lock(); try { - final String queueName = queue.getQueueName(); + final String queueName = priorityTaskQueue.getQueueName(); if (!priorityQueueMap.containsKey(queueName)) { metrics.addMetric(() -> priorityQueueMap.onWorkerByQueue(queueName), workerQueueName, queueName); + if (this.queue instanceof final GroupedPriorityQueue gpq) { + metrics.addMetricWaiting(() -> gpq.getGroupSize(), workerQueueName, queueName); + } } - final PriorityTaskQueue old = priorityQueueMap.put(queueName, queue); + final PriorityTaskQueue old = priorityQueueMap.put(queueName, priorityTaskQueue); - if (old != null && !old.equals(queue)) { - LOG.info("Queue {} was updated with new values: {}", queueName, queue); + if (old != null && !old.equals(priorityTaskQueue)) { + LOG.info("Queue {} was updated with new values: {}", queueName, priorityTaskQueue); } } finally { lock.unlock(); diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerMetrics.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerMetrics.java index 9f3f945..f8d4ab5 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerMetrics.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerMetrics.java @@ -18,6 +18,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.function.IntSupplier; import io.opentelemetry.api.metrics.ObservableDoubleGauge; @@ -29,10 +30,17 @@ */ class PriorityTaskSchedulerMetrics { - private static final String METRIC_PREFIX = "aer.taskmanager.running_client_size"; + /** + * @deprecated replaced by "aer.taskmanager.client.queue" metric. + */ + @Deprecated + private static final String METRIC_PREFIX_LEGACY = "aer.taskmanager.running_client_size"; + private static final String METRIC_PREFIX = "aer.taskmanager.client.queue"; private static final String DESCRIPTION = "Number of tasks running for the client queue "; private final Map metrics = new HashMap<>(); + private final Map usageMetrics = new HashMap<>(); + private final Map waitingMetrics = new HashMap<>(); /** * Adds collecting metrics to count the number of active tasks by client queue. @@ -43,10 +51,32 @@ class PriorityTaskSchedulerMetrics { */ public void addMetric(final IntSupplier countSupplier, final String workerQueueName, final String clientQueueName) { metrics.put(clientQueueName, OpenTelemetryMetrics.METER + .gaugeBuilder(METRIC_PREFIX_LEGACY) + .setDescription(DESCRIPTION + clientQueueName) + .buildWithCallback( + result -> result.record(countSupplier.getAsInt(), + OpenTelemetryMetrics.queueAttributes(workerQueueName, clientQueueName, "state", "used")))); + metrics.put(clientQueueName, OpenTelemetryMetrics.METER + .gaugeBuilder(METRIC_PREFIX) + .setDescription(DESCRIPTION + clientQueueName) + .buildWithCallback( + result -> result.record(countSupplier.getAsInt(), + OpenTelemetryMetrics.queueAttributes(workerQueueName, clientQueueName, "state", "used")))); + } + + /** + * + * @param countSupplier + * @param workerQueueName + * @param clientQueueName + */ + public void addMetricWaiting(final IntSupplier countSupplier, final String workerQueueName, final String clientQueueName) { + waitingMetrics.put(clientQueueName, OpenTelemetryMetrics.METER .gaugeBuilder(METRIC_PREFIX) - .setDescription(DESCRIPTION) + .setDescription(DESCRIPTION + clientQueueName) .buildWithCallback( - result -> result.record(countSupplier.getAsInt(), OpenTelemetryMetrics.queueAttributes(workerQueueName, clientQueueName)))); + result -> result.record(countSupplier.getAsInt(), + OpenTelemetryMetrics.queueAttributes(workerQueueName, clientQueueName, "state", "waiting")))); } /** @@ -55,8 +85,8 @@ public void addMetric(final IntSupplier countSupplier, final String workerQueueN * @param clienQueueName */ public void removeMetric(final String clienQueueName) { - if (metrics.containsKey(clienQueueName)) { - metrics.remove(clienQueueName).close(); - } + Optional.ofNullable(metrics.remove(clienQueueName)).ifPresent(ObservableDoubleGauge::close); + Optional.ofNullable(usageMetrics.remove(clienQueueName)).ifPresent(ObservableDoubleGauge::close); + Optional.ofNullable(waitingMetrics.remove(clienQueueName)).ifPresent(ObservableDoubleGauge::close); } } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java index 1974a67..88bff47 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java @@ -55,13 +55,13 @@ protected LocalDateTime now() { IntStream.range(0, runningWorkers).forEach(i -> qwd.onWorkDispatched(String.valueOf(i), null)); IntStream.range(0, finishedWorkers).forEach(i -> qwd.onWorkerFinished(String.valueOf(i), null)); - qwd.onNumberOfWorkersUpdate(0, numberOfMessages); + qwd.onNumberOfWorkersUpdate(0, numberOfMessages, 0); // reset should never trigger the first time the problem was reported. verify(listener, never()).reset(); // Fast forward 20 minutes to trigger reset if there is a problem. now.set(now.get().plusMinutes(20)); - qwd.onNumberOfWorkersUpdate(0, numberOfMessages); + qwd.onNumberOfWorkersUpdate(0, numberOfMessages, 0); verify(listener, times(expected)).reset(); } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/StartupGuardTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/StartupGuardTest.java index 73397c7..9c42e15 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/StartupGuardTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/StartupGuardTest.java @@ -35,9 +35,9 @@ void testOpen() { final StartupGuard guard = new StartupGuard(); assertFalse(guard.isOpen(), "Guard should not be open."); - guard.onNumberOfWorkersUpdate(0, 1); + guard.onNumberOfWorkersUpdate(0, 1, 0); assertTrue(guard.isOpen(), "Guard should be open when onNumberOfWorkersUpdate is called."); - guard.onNumberOfWorkersUpdate(0, 1); + guard.onNumberOfWorkersUpdate(0, 1, 0); assertTrue(guard.isOpen(), "Guard should still remain open onNumberOfWorkersUpdate has been called."); } @@ -60,7 +60,7 @@ void testWaitForOpen() throws InterruptedException { // First wait for first semaphore to be unlocked. waitForStart.acquire(); assertFalse(guard.isOpen(), "Guard should not be open."); - guard.onNumberOfWorkersUpdate(1, 1); + guard.onNumberOfWorkersUpdate(1, 1, 0); // Wait for semaphore that is called after waitForOpen is unlocked. waitForOpen.acquire(); assertTrue(guard.isOpen(), "Guard should now be open."); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java index 79ea691..7555254 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java @@ -80,17 +80,17 @@ void after() throws InterruptedException { @Timeout(value = 3, unit = TimeUnit.SECONDS) void testNoFreeWorkers() { // Add Worker which will unlock - workerPool.onNumberOfWorkersUpdate(1, 0); + workerPool.onNumberOfWorkersUpdate(1, 0, 0); executor.execute(dispatcher); await().until(() -> dispatcher.getState() == State.WAIT_FOR_TASK); // Remove worker, 1 worker locked but at this point no actual workers available. - workerPool.onNumberOfWorkersUpdate(0, 0); + workerPool.onNumberOfWorkersUpdate(0, 0, 0); // Send task, should get NoFreeWorkersException in dispatcher. forwardTaskAsync(createTask(), null); // Dispatcher should go back to wait for worker to become available. await().until(() -> dispatcher.getState() == State.WAIT_FOR_WORKER); assertEquals(0, workerPool.getReportedWorkerSize(), "WorkerPool should be empty"); - workerPool.onNumberOfWorkersUpdate(1, 0); + workerPool.onNumberOfWorkersUpdate(1, 0, 0); assertEquals(1, workerPool.getReportedWorkerSize(), "WorkerPool should have 1 running"); } @@ -100,7 +100,7 @@ void testForwardTest() { final Task task = createTask(); final Future future = forwardTaskAsync(task, null); executor.execute(dispatcher); - workerPool.onNumberOfWorkersUpdate(1, 0); //add worker which will unlock + workerPool.onNumberOfWorkersUpdate(1, 0, 0); //add worker which will unlock await().until(() -> dispatcher.getState() == State.WAIT_FOR_WORKER); await().until(future::isDone); assertFalse(future.isCancelled(), "Taskconsumer must be unlocked at this point without error"); @@ -114,7 +114,7 @@ void testForwardDuplicateTask() { executor.execute(dispatcher); final Future future = forwardTaskAsync(task, null); await().until(() -> dispatcher.getState() == State.WAIT_FOR_WORKER); - workerPool.onNumberOfWorkersUpdate(2, 0); //add worker which will unlock + workerPool.onNumberOfWorkersUpdate(2, 0, 0); //add worker which will unlock // Now force the issue. assertSame(TaskDispatcher.State.WAIT_FOR_TASK, dispatcher.getState(), "Taskdispatcher must be waiting for task"); // Forwarding same Task object, so same id. @@ -134,19 +134,19 @@ void testExceptionDuringForward() { final Future future = forwardTaskAsync(task, null); await().until(() -> dispatcher.getState() == State.WAIT_FOR_WORKER); // Now open up a worker - workerPool.onNumberOfWorkersUpdate(1, 0); + workerPool.onNumberOfWorkersUpdate(1, 0, 0); // At this point the exception should be thrown. This could be the case when rabbitmq connection is lost for a second. // Wait for it to be unlocked again await().until(() -> dispatcher.getState() == State.WAIT_FOR_TASK); //simulate workerpool being reset - workerPool.onNumberOfWorkersUpdate(0, 0); + workerPool.onNumberOfWorkersUpdate(0, 0, 0); //now stop throwing exception to indicate connection is restored again workerProducer.setShutdownExceptionOnForward(false); //simulate connection being restored by first forwarding task again forwardTaskAsync(task, future); await().until(() -> dispatcher.getState() == State.WAIT_FOR_WORKER); //now simulate the worker being back - workerPool.onNumberOfWorkersUpdate(1, 0); + workerPool.onNumberOfWorkersUpdate(1, 0, 0); //should now be unlocked, but waiting for worker to be done await().until(() -> dispatcher.getState() == State.WAIT_FOR_WORKER); workerPool.onWorkerFinished(task.getId(), Map.of()); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskManagerTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskManagerTest.java index 59bb280..b85eb29 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskManagerTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskManagerTest.java @@ -17,6 +17,8 @@ package nl.aerius.taskmanager; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; @@ -64,7 +66,7 @@ void setUp() throws IOException { doAnswer(a -> { // This will unblock the startup guard - ((WorkerSizeObserver) a.getArgument(1)).onNumberOfWorkersUpdate(0, 0); + ((WorkerSizeObserver) a.getArgument(1)).onNumberOfWorkersUpdate(0, 0, 0); return null; }).when(workerSizeProvider).addObserver(any(), any()); taskManager = new TaskManager<>(executor, scheduledExecutorService, factory, schedulerFactory, workerSizeProvider); @@ -91,8 +93,13 @@ void testAddScheduler() throws InterruptedException { @Timeout(value = 2, unit = TimeUnit.SECONDS) void testModifyQueue() throws InterruptedException { updateTaskScheduler(); - schedule.getQueues().get(0).setPriority(30); + final PriorityTaskQueue queue = schedule.getQueues().get(0); + + assertTrue(taskManager.getTaskScheduleBucket(schedule.getWorkerQueueName()).hasTaskConsumer(queue.getQueueName()), "Queue should be present"); + queue.setPriority(30); updateTaskScheduler(); + assertTrue(taskManager.getTaskScheduleBucket(schedule.getWorkerQueueName()).hasTaskConsumer(queue.getQueueName()), + "Queue should be still present"); taskManager.removeTaskScheduler(schedule.getWorkerQueueName()); } @@ -100,14 +107,18 @@ void testModifyQueue() throws InterruptedException { @Timeout(value = 2, unit = TimeUnit.SECONDS) void testRemoveQueue() throws InterruptedException { updateTaskScheduler(); - schedule.getQueues().remove(0); + assertTrue(taskManager.getTaskScheduleBucket(schedule.getWorkerQueueName()).hasTaskConsumer(schedule.getQueues().get(0).getQueueName()), + "Queue should be present"); + final PriorityTaskQueue queue = schedule.getQueues().remove(0); updateTaskScheduler(); + assertFalse(taskManager.getTaskScheduleBucket(schedule.getWorkerQueueName()).hasTaskConsumer(queue.getQueueName()), + "Queue should have been removed"); taskManager.removeTaskScheduler(schedule.getWorkerQueueName()); } private void updateTaskScheduler() throws InterruptedException { taskManager.updateTaskScheduler(schedule); - while(!taskManager.isRunning(schedule.getWorkerQueueName())) { + while (!taskManager.isRunning(schedule.getWorkerQueueName())) { Thread.sleep(300); } } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java index 0a205fd..30554f2 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java @@ -76,7 +76,7 @@ public void messageDelivered(final Message message) { @Test void testWorkerPoolSizing() throws IOException { assertEquals(0, workerPool.getReportedWorkerSize(), "Check if workerPool size is empty at start"); - workerPool.onNumberOfWorkersUpdate(10, 0); + workerPool.onNumberOfWorkersUpdate(10, 0, 0); assertEquals(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is changed after sizing"); assertEquals(10, numberOfWorkers, "Check if workerPool change handler called."); workerPool.reserveWorker(); @@ -89,15 +89,15 @@ void testWorkerPoolSizing() throws IOException { @Test void testWorkerPoolSizingWithInitialSize() throws IOException { - workerPool.onNumberOfWorkersUpdate(10, 5); - assertEquals(5, workerPool.getRunningWorkerSize(), "Check if workerPool size is 5"); - assertEquals(10, workerPool.getWorkerSize(), "Internal worker size should match reported number of workers"); - workerPool.onNumberOfWorkersUpdate(10, 5); - assertEquals(5, workerPool.getRunningWorkerSize(), "Check if workerPool size is still 5"); - assertEquals(10, workerPool.getWorkerSize(), "Internal worker size should still match reported number of workers"); + workerPool.onNumberOfWorkersUpdate(10, 5, 0); + assertEquals(5, workerPool.getNumberOfUsedWorkers(), "Check if workerPool size is 5"); + assertEquals(10, workerPool.getNumberOfWorkers(), "Internal worker size should match reported number of workers"); + workerPool.onNumberOfWorkersUpdate(10, 5, 0); + assertEquals(5, workerPool.getNumberOfUsedWorkers(), "Check if workerPool size is still 5"); + assertEquals(10, workerPool.getNumberOfWorkers(), "Internal worker size should still match reported number of workers"); IntStream.range(1, 6).forEach(a -> workerPool.onWorkerFinished("", null)); - assertEquals(0, workerPool.getRunningWorkerSize(), "After unknown tasks received running size should be 0"); - assertEquals(10, workerPool.getWorkerSize(), "Internal worker size should still match reported number of workers"); + assertEquals(0, workerPool.getNumberOfUsedWorkers(), "After unknown tasks received running size should be 0"); + assertEquals(10, workerPool.getNumberOfWorkers(), "Internal worker size should still match reported number of workers"); } @Test @@ -108,26 +108,26 @@ void testNoFreeWorkers() { @Test void testWorkerPoolScaleDown() throws IOException { - workerPool.onNumberOfWorkersUpdate(5, 0); + workerPool.onNumberOfWorkersUpdate(5, 0, 0); final Task task1 = createAndSendTaskToWorker(); final Task task2 = createAndSendTaskToWorker(); final Task task3 = createAndSendTaskToWorker(); assertEquals(5, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after 2 workers running"); - workerPool.onNumberOfWorkersUpdate(1, 0); - assertEquals(3, workerPool.getWorkerSize(), + workerPool.onNumberOfWorkersUpdate(1, 0, 0); + assertEquals(3, workerPool.getNumberOfWorkers(), "Workpool size should match number of running tasks, since new total is lower than currently running"); assertEquals(1, workerPool.getReportedWorkerSize(), "Check if current workerPool size is same after decreasing # workers"); workerPool.releaseWorker(task1.getId()); - assertEquals(2, workerPool.getWorkerSize(), "Check if workerPool size is lower, but not yet same as total because still process running"); + assertEquals(2, workerPool.getNumberOfWorkers(), "Check if workerPool size is lower, but not yet same as total because still process running"); workerPool.releaseWorker(task2.getId()); - assertEquals(1, workerPool.getWorkerSize(), "Check if workerPool size is lower"); + assertEquals(1, workerPool.getNumberOfWorkers(), "Check if workerPool size is lower"); workerPool.releaseWorker(task3.getId()); - assertEquals(1, workerPool.getWorkerSize(), "Check if workerPool size should remain the same"); + assertEquals(1, workerPool.getNumberOfWorkers(), "Check if workerPool size should remain the same"); } @Test void testReleaseTaskTwice() throws IOException { - workerPool.onNumberOfWorkersUpdate(2, 0); + workerPool.onNumberOfWorkersUpdate(2, 0, 0); final Task task1 = createAndSendTaskToWorker(); final String id = task1.getId(); workerPool.releaseWorker(id); @@ -140,19 +140,19 @@ void testReleaseTaskTwice() throws IOException { @Test void testMessageDeliverd() throws IOException { - workerPool.onNumberOfWorkersUpdate(1, 0); + workerPool.onNumberOfWorkersUpdate(1, 0, 0); createAndSendTaskToWorker(); assertNotSame(0, message.getDeliveryTag(), "Check if message is delivered"); } @Test void testReset() throws IOException { - workerPool.onNumberOfWorkersUpdate(5, 0); + workerPool.onNumberOfWorkersUpdate(5, 0, 0); createAndSendTaskToWorker(); createAndSendTaskToWorker(); - assertEquals(2, workerPool.getRunningWorkerSize(), "Should report 2 workers running."); + assertEquals(2, workerPool.getNumberOfUsedWorkers(), "Should report 2 workers running."); workerPool.reset(); - assertEquals(0, workerPool.getRunningWorkerSize(), "Should report no workers running after internal state reset."); + assertEquals(0, workerPool.getNumberOfUsedWorkers(), "Should report no workers running after internal state reset."); } private Task createAndSendTaskToWorker() throws IOException { diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java index d30590b..c2e4c52 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java @@ -16,7 +16,6 @@ */ package nl.aerius.taskmanager.metrics; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; @@ -25,7 +24,6 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.util.HashMap; @@ -46,7 +44,6 @@ import io.opentelemetry.api.metrics.DoubleGaugeBuilder; import io.opentelemetry.api.metrics.Meter; -import nl.aerius.taskmanager.StartupGuard; import nl.aerius.taskmanager.client.TaskMetrics; /** @@ -65,9 +62,9 @@ class PerformanceMetricsReporterTest { private @Mock Meter mockedMeter; private @Mock ScheduledExecutorService scheduledExecutorService; private @Captor ArgumentCaptor methodCaptor; - private @Captor ArgumentCaptor durationCaptor; + private @Captor ArgumentCaptor doubleValue1Captor; + private @Captor ArgumentCaptor doubleValue2Captor; - private StartupGuard startupGuard; private PerformanceMetricsReporter reporter; @BeforeEach @@ -82,15 +79,14 @@ void beforeEach() { return mockGaugeBuilder; }).when(mockedMeter).gaugeBuilder(any()); lenient().doReturn(mockGaugeBuilder).when(mockGaugeBuilder).setDescription(any()); - startupGuard = new StartupGuard(); - reporter = new PerformanceMetricsReporter(scheduledExecutorService, QUEUE_GROUP_NAME, mockedMeter, startupGuard); - verify(scheduledExecutorService).scheduleWithFixedDelay(methodCaptor.capture(), anyLong(), anyLong(), any(TimeUnit.class)); + reporter = new PerformanceMetricsReporter(scheduledExecutorService, QUEUE_GROUP_NAME, mockedMeter); + verify(scheduledExecutorService).scheduleWithFixedDelay(methodCaptor.capture(), anyLong(), anyLong(), + any(TimeUnit.class)); } @Test void testOnWorkDispatched() { - startUp(10, 0); reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); reporter.onWorkDispatched("2", createMap(QUEUE_2, 200L)); methodCaptor.getValue().run(); @@ -99,7 +95,6 @@ void testOnWorkDispatched() { @Test void testOnWorkerFinished() { - startUp(2, 0); reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); reporter.onWorkerFinished("1", createMap(QUEUE_1, 100L)); reporter.onWorkerFinished("2", createMap(QUEUE_2, 200L)); @@ -109,28 +104,16 @@ void testOnWorkerFinished() { private void assertGaugeCalls(final String label, final String type, final double expected, final Predicate duration) { verify(mockedGauges.get("aer.taskmanager." + label)).set(eq(expected), any()); - verify(mockedGauges.get("aer.taskmanager.%s.%s".formatted(label, type))).set(durationCaptor.capture(), any()); + verify(mockedGauges.get("aer.taskmanager.%s.%s".formatted(label, type))).set(doubleValue1Captor.capture(), any()); verify(mockedGauges.get("aer.taskmanager.%s.queue".formatted(label))).set(eq(expected), any()); - verify(mockedGauges.get("aer.taskmanager.%s.queue.%s".formatted(label, type))).set(durationCaptor.capture(), any()); - durationCaptor.getAllValues() - .forEach(v -> assertTrue(duration.test(v), "Duration should report at least 100.0 as it is the offset of the start time, but was " + v)); - } - - @Test - void testWorkLoad() throws InterruptedException { - startUp(4, 1); - reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); - reporter.onWorkDispatched("2", createMap(QUEUE_2, 200L)); - methodCaptor.getValue().run(); - Thread.sleep(10); // Add a bit of delay to get some time frame between these 2 run calls. - methodCaptor.getValue().run(); - verify(mockedGauges.get("aer.taskmanager.work.load"), times(2)).set(durationCaptor.capture(), any()); - assertEquals(75.0, durationCaptor.getAllValues().get(1), "Expected workload of 75%"); + verify(mockedGauges.get("aer.taskmanager.%s.queue.%s".formatted(label, type))).set(doubleValue1Captor.capture(), + any()); + doubleValue1Captor.getAllValues() + .forEach(v -> assertTrue(duration.test(v), "Duration should report at least 100.0 as it is the offset of the start time, but was " + v)); } @Test void testReset() throws InterruptedException { - startUp(4, 1); reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); reporter.onWorkDispatched("2", createMap(QUEUE_2, 200L)); Thread.sleep(2); // Add a bit of delay to get some time frame between updates.. @@ -138,15 +121,6 @@ void testReset() throws InterruptedException { methodCaptor.getValue().run(); // Verify dispatched metrics have been reset. assertGaugeCalls("dispatched", "wait", 0.0, v -> v == 0.0); - - // Verify load metric have been reset. - verify(mockedGauges.get("aer.taskmanager.work.load"), times(1)).set(durationCaptor.capture(), any()); - assertEquals(0.0, durationCaptor.getAllValues().get(0), 1E-5, "Expected to have no workload anymore"); - } - - private void startUp(final int numberOfWorkers, final int numberOfMessages) { - reporter.onNumberOfWorkersUpdate(numberOfWorkers, numberOfMessages); - startupGuard.onNumberOfWorkersUpdate(numberOfWorkers, numberOfMessages); } private Map createMap(final String queueName, final long duration) { diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegisterTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegisterTest.java new file mode 100644 index 0000000..4a81d4e --- /dev/null +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/TaskManagerMetricsRegisterTest.java @@ -0,0 +1,103 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager.metrics; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.Map; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import nl.aerius.taskmanager.StartupGuard; +import nl.aerius.taskmanager.client.TaskMetrics; + +/** + * Test class for {@link TaskManagerMetricsRegister}. + */ +@ExtendWith(MockitoExtension.class) +public class TaskManagerMetricsRegisterTest { + + private static final String QUEUE_1 = "queue 1"; + private static final String QUEUE_2 = "queue 2"; + + private @Mock TaskManagerUsageMetricsProvider taskManagerUsageMetricsProvider; + + private @Captor ArgumentCaptor taskManagerUsagerMetricsProviderCaptor; + + private StartupGuard startupGuard; + private TaskManagerMetricsRegister register; + + @BeforeEach + void beforeEach() { + startupGuard = new StartupGuard(); + register = new TaskManagerMetricsRegister(taskManagerUsageMetricsProvider, startupGuard); + } + + @Test + void testOnWorkDispatched() { + startUp(10, 0); + register.onWorkDispatched("1", createMap(QUEUE_1, 100L)); + register.onWorkDispatched("2", createMap(QUEUE_2, 200L)); + verifytaskManagerUsageMetricsProvider(2, 2); + } + + @Test + void testOnWorkerFinished() { + startUp(2, 0); + register.onWorkDispatched("1", createMap(QUEUE_1, 100L)); + register.onWorkerFinished("1", createMap(QUEUE_1, 100L)); + register.onWorkerFinished("2", createMap(QUEUE_2, 200L)); + verifytaskManagerUsageMetricsProvider(3, -1); + } + + @Test + void testReset() throws InterruptedException { + startUp(4, 1); + register.onWorkDispatched("1", createMap(QUEUE_1, 100L)); + register.onWorkDispatched("2", createMap(QUEUE_2, 200L)); + Thread.sleep(2); // Add a bit of delay to get some time frame between updates.. + register.reset(); + + // Verify load metric have been reset. + verify(taskManagerUsageMetricsProvider, times(1)).reset(); + } + + private void verifytaskManagerUsageMetricsProvider(final int times, final int sum) { + verify(taskManagerUsageMetricsProvider, times(times)).register(taskManagerUsagerMetricsProviderCaptor.capture(), + anyInt()); + assertEquals(sum, taskManagerUsagerMetricsProviderCaptor.getAllValues().stream().mapToInt(Integer::intValue).sum(), + "Should have registered a total sum of " + sum); + } + + private void startUp(final int numberOfWorkers, final int numberOfMessages) { + register.onNumberOfWorkersUpdate(numberOfWorkers, numberOfMessages, 0); + startupGuard.onNumberOfWorkersUpdate(numberOfWorkers, numberOfMessages, 0); + } + + private Map createMap(final String queueName, final long duration) { + return new TaskMetrics().duration(duration).queueName(queueName).start(System.currentTimeMillis() - 100).build(); + } +} diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/TaskManagerUsageMetricsProviderTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/TaskManagerUsageMetricsProviderTest.java new file mode 100644 index 0000000..6b033d6 --- /dev/null +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/TaskManagerUsageMetricsProviderTest.java @@ -0,0 +1,75 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager.metrics; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.function.DoubleSupplier; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Test class for {@link TaskManagerUsageMetricsProvider}. + */ +class TaskManagerUsageMetricsProviderTest { + + private TaskManagerUsageMetricsProvider provider; + + @BeforeEach + void beforeEach() { + provider = new TaskManagerUsageMetricsProvider("TEST"); + } + + @Test + void testLoad() throws InterruptedException { + assertMetricAndZero(5, 10, provider::getLoad, 50, "Expected a load of 50%.", 0); + } + + @Test + void testNumberOfWorkers() throws InterruptedException { + assertMetricAndZero(4, 8, provider::getNumberOfWorkers, 8, "Expected 8 workers total.", 8); + } + + @Test + void testNumberOfUsedWorkers() throws InterruptedException { + assertMetricAndZero(4, 10, provider::getNumberOfUsedWorkers, 4, "Expected 4 worker being used.", 0); + } + + @Test + void testNumberOfFreeWorkers() throws InterruptedException { + assertMetricAndZero(3, 10, provider::getNumberOfFreeWorkers, 7, "Expected 7 worker to be free.", 10); + } + + private void assertMetricAndZero(final int numberOfUsed, final int numberOfWorkers, final DoubleSupplier supplier, final int expected, + final String description, final int expectedAfterReset) throws InterruptedException { + assertMetric(numberOfUsed, numberOfWorkers, supplier, expected, description, expectedAfterReset); + assertMetric(0, 0, supplier, 0, "0 workers is not correctly handled.", 0); + } + + private void assertMetric(final int numberOfUsed, final int numberOfWorkers, final DoubleSupplier supplier, final int expected, + final String description, final int expectedAfterReset) throws InterruptedException { + provider.register(numberOfUsed, numberOfWorkers); + supplier.getAsDouble(); + // Add a little delay to initalise the time + Thread.sleep(10); + assertEquals(expected, supplier.getAsDouble(), description); + provider.reset(); + Thread.sleep(10); + assertEquals(expectedAfterReset, supplier.getAsDouble(), "Not the expected number after a reset"); + } +} diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/UsageMetricsReporterTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/UsageMetricsReporterTest.java new file mode 100644 index 0000000..8b222d3 --- /dev/null +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/UsageMetricsReporterTest.java @@ -0,0 +1,64 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager.metrics; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.verify; + +import java.util.function.Consumer; +import java.util.function.DoubleSupplier; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleGaugeBuilder; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; + +/** + * Test class for {@link UsageMetricsReporter}. + */ +@ExtendWith(MockitoExtension.class) +class UsageMetricsReporterTest { + + private @Mock Meter meter; + private @Mock DoubleGaugeBuilder builder; + private @Captor ArgumentCaptor> callbackCaptor; + private @Mock DoubleSupplier supplier; + private @Mock Attributes attributes; + private @Mock ObservableDoubleMeasurement measurement; + + @Test + void testRecord() { + doReturn(builder).when(meter).gaugeBuilder("TEST"); + doReturn(builder).when(builder).setDescription("description"); + doReturn(2.0).when(supplier).getAsDouble(); + + final UsageMetricsReporter reporter = new UsageMetricsReporter(meter, "TEST", "description"); + reporter.addMetrics("TEST", supplier, attributes); + verify(builder).buildWithCallback(callbackCaptor.capture()); + // Call the metric record callback method + callbackCaptor.getValue().accept(measurement); + // Verify the value 2 is recorded as metric with the provided attributes. + verify(measurement).record(2.0, attributes); + } +} diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQChannelQueueEventsWatcherTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQChannelQueueEventsWatcherTest.java index a572901..e0ab641 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQChannelQueueEventsWatcherTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQChannelQueueEventsWatcherTest.java @@ -16,7 +16,6 @@ */ package nl.aerius.taskmanager.mq; -import static nl.aerius.taskmanager.client.mq.RabbitMQWorkerMonitor.HEADER_PARAM_QUEUE; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; @@ -51,6 +50,7 @@ */ class RabbitMQChannelQueueEventsWatcherTest { private static final String TEST_QUEUENAME = "test"; + private static final String HEADER_PARAM_QUEUE = "queue"; private static ExecutorService executor; diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitorTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitorTest.java index e82f254..4796d37 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitorTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitorTest.java @@ -44,7 +44,7 @@ void testGetWorkerQueueState() { final ConnectionConfiguration configuration = ConnectionConfiguration.builder() .brokerHost(DUMMY).brokerPort(0).brokerUsername(DUMMY).brokerPassword(DUMMY).build(); final AtomicInteger workerSize = new AtomicInteger(); - final WorkerSizeObserver mwps = (numberOfWorkers, numberOfMessages) -> workerSize.set(numberOfWorkers); + final WorkerSizeObserver mwps = (numberOfWorkers, numberOfMessages, numberOfMessagesInProgress) -> workerSize.set(numberOfWorkers); final RabbitMQQueueMonitor rpm = new RabbitMQQueueMonitor(configuration) { @Override protected JsonNode getJsonResultFromApi(final String apiPath) throws IOException {