doc/telemetry.md: 63 changes (50 additions, 13 deletions)
@@ -77,22 +77,56 @@ The metric value `aerius.worker.<worker type>` for attribute `rabbitmq_queue` sh

The TaskManager defines a few custom metrics for OpenTelemetry to capture.
These metrics are all defined with the `nl.aerius.TaskManager` instrumentation scope.

| metric name | type | description |
|-----------------------------------------------------|-----------|----------------------------------------------------------------------|
| `aer.taskmanager.work.load`<sup>1</sup> | gauge | Percentage of workers occupied. |
| `aer.taskmanager.worker_size`<sup>1</sup> | gauge | The sum of idle workers + occupied workers. |
| `aer.taskmanager.current_worker_size`<sup>1</sup> | gauge | The number of workers based on what RabbitMQ reports. |
| `aer.taskmanager.running_worker_size`<sup>1</sup> | gauge | The number of workers that are occupied. |
| `aer.taskmanager.running_client_size`<sup>2</sup> | gauge | The number of workers that are occupied for a specific client queue. |
| `aer.taskmanager.dispatched`<sup>1</sup> | histogram | The number of tasks dispatched. |
| `aer.taskmanager.dispatched.wait`<sup>1</sup> | histogram | The average wait time of tasks dispatched. |
| `aer.taskmanager.dispatched.queue`<sup>2</sup> | histogram | The number of tasks dispatched per client queue. |
| `aer.taskmanager.dispatched.queue.wait`<sup>2</sup> | histogram | The average wait time of tasks dispatched per client queue. |
The metrics about the availability of workers use the OpenTelemetry standard for reporting such metrics:
`.limit` gives the total number of workers available, and `.usage` gives metrics about how the workers are used.
The usage metrics have an attribute `state` that identifies whether the metric covers the `used` or the `free` workers.
Additionally, `aer.rabbitmq.worker.usage` records a third state, `waiting`, and `aer.taskmanager.client.queue.usage` has the states `used` and `waiting`, which provide information on tasks sent to the TaskManager.
The `waiting` state shows the number of tasks on the worker queue that are not yet picked up by any worker.
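
For reference, this is roughly how such a `.limit`/`.usage` gauge pair with a `state` attribute can be registered with the OpenTelemetry Java API. This is a minimal sketch with illustrative names and values, not the actual TaskManager code:

```java
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;

public final class WorkerGaugesSketch {
  private static final AttributeKey<String> STATE = AttributeKey.stringKey("state");
  private static final AttributeKey<String> WORKER_TYPE = AttributeKey.stringKey("worker_type");

  public static void main(final String[] args) {
    // Meter with the same instrumentation scope the TaskManager uses.
    final Meter meter = GlobalOpenTelemetry.getMeter("nl.aerius.TaskManager");
    final Attributes base = Attributes.of(WORKER_TYPE, "ops");

    // `.limit`: total number of workers available (fixed value here for illustration).
    meter.gaugeBuilder("aer.taskmanager.worker.limit").ofLongs()
        .buildWithCallback(m -> m.record(8, base));

    // `.usage`: one measurement per `state` (`used` and `free`), each with its own attributes.
    meter.gaugeBuilder("aer.taskmanager.worker.usage").ofLongs()
        .buildWithCallback(m -> {
          m.record(5, base.toBuilder().put(STATE, "used").build());
          m.record(3, base.toBuilder().put(STATE, "free").build());
        });
  }
}
```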


| metric name                                            | type      | description                                                                |
|--------------------------------------------------------|-----------|----------------------------------------------------------------------------|
| `aer.taskmanager.work.load`<sup>1</sup>                | gauge     | Percentage of workers occupied.                                            |
| `aer.taskmanager.worker.limit`<sup>1</sup>             | gauge     | Weighted total number of workers, based on tasks sent to workers.          |
| `aer.taskmanager.worker.usage`<sup>2</sup>             | gauge     | Weighted usage of workers, based on tasks sent to workers.                 |
| `aer.taskmanager.workerpool.worker.limit`<sup>1</sup>  | gauge     | TaskManager internal total number of workers.                              |
| `aer.taskmanager.workerpool.worker.usage`<sup>2</sup>  | gauge     | TaskManager internal usage of workers.                                     |
| `aer.taskmanager.client.queue.usage`<sup>2</sup>       | gauge     | TaskManager internal metrics on client queue usage.                        |
| `aer.rabbitmq.worker.limit`<sup>1</sup>                | gauge     | Total number of workers available as reported by RabbitMQ.                 |
| `aer.rabbitmq.worker.usage`<sup>2</sup>                | gauge     | Usage of the workers, based on the messages on the RabbitMQ worker queue.  |
| `aer.taskmanager.dispatched`<sup>1</sup>               | histogram | The number of tasks dispatched.                                            |
| `aer.taskmanager.dispatched.wait`<sup>1</sup>          | histogram | The average wait time of tasks dispatched.                                 |
| `aer.taskmanager.dispatched.queue`<sup>3</sup>         | histogram | The number of tasks dispatched per client queue.                           |
| `aer.taskmanager.dispatched.queue.wait`<sup>3</sup>    | histogram | The average wait time of tasks dispatched per client queue.                |

There are three metric groups that report similar information.
First, the `aer.taskmanager.worker.*` metrics are weighted values based on when tasks are sent to the workers.
These metrics should give the most accurate picture of worker usage.
Second, the `aer.taskmanager.workerpool.worker.*` metrics are the values used internally to base scheduling priorities on.
These metrics should be close to the other metrics, and can be used to detect issues when the TaskManager doesn't seem to operate as expected.
Third, the `aer.rabbitmq.worker.*` metrics are the values as received from the RabbitMQ API.
These metrics could also be obtained by reading the RabbitMQ API directly, but reporting them here avoids the additional logic otherwise needed to derive the usage metrics.
In general these metrics should report the same values, but due to timing (e.g. the moment the measurement is taken) there can be differences.
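
To make the relation between the groups concrete, below is a sketch of how such provider groups could be wired to gauges. The `UsageMetricsProvider` method names match the overrides visible in this PR's `WorkerPool`; the wiring itself is an assumption for illustration, not the actual `TaskManagerUsageMetricsWrapper` implementation:

```java
import java.util.List;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;

// Interface shape inferred from the WorkerPool overrides in this PR; the real
// nl.aerius.taskmanager.metrics.UsageMetricsProvider may define more members.
interface UsageMetricsProvider {
  String getWorkerQueueName();
  int getNumberOfWorkers();
  int getNumberOfUsedWorkers();
  int getNumberOfFreeWorkers();
}

final class UsageGaugeWiringSketch {
  private static final AttributeKey<String> STATE = AttributeKey.stringKey("state");
  private static final AttributeKey<String> WORKER_TYPE = AttributeKey.stringKey("worker_type");

  // Registers a `.limit`/`.usage` gauge pair for one metric group, e.g.
  // `aer.taskmanager.workerpool.worker`. The `worker_type` attribute is derived
  // from the worker queue name here purely for illustration.
  static void register(final Meter meter, final String prefix, final List<UsageMetricsProvider> providers) {
    meter.gaugeBuilder(prefix + ".limit").ofLongs().buildWithCallback(m ->
        providers.forEach(p -> m.record(p.getNumberOfWorkers(),
            Attributes.of(WORKER_TYPE, p.getWorkerQueueName()))));
    meter.gaugeBuilder(prefix + ".usage").ofLongs().buildWithCallback(m ->
        providers.forEach(p -> {
          final Attributes base = Attributes.of(WORKER_TYPE, p.getWorkerQueueName());
          m.record(p.getNumberOfUsedWorkers(), base.toBuilder().put(STATE, "used").build());
          m.record(p.getNumberOfFreeWorkers(), base.toBuilder().put(STATE, "free").build());
        }));
  }
}
```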

##### Deprecated metrics

The following metrics have been replaced by the more standardized naming mentioned above:

| metric name                                            | type      | description                                                            | replaced by                                 |
|--------------------------------------------------------|-----------|------------------------------------------------------------------------|---------------------------------------------|
| `aer.taskmanager.worker_size`<sup>1</sup>              | gauge     | The sum of idle workers + occupied workers.                            | `aer.taskmanager.workerpool.worker.limit`   |
| `aer.taskmanager.current_worker_size`<sup>1</sup>      | gauge     | The number of workers based on what RabbitMQ reports.                  | `aer.rabbitmq.worker.limit`                 |
| `aer.taskmanager.running_worker_size`<sup>1</sup>      | gauge     | The number of workers that are occupied.                               | `aer.taskmanager.workerpool.worker.usage`   |
| `aer.taskmanager.running_client_size`<sup>3</sup>      | gauge     | The number of workers that are occupied for a specific client queue.   | `aer.taskmanager.client.queue.usage`        |

##### Metric attributes

The workers have different attributes to distinguish specific metrics.
* <sup>1</sup> have the attribute `worker_type`.
* <sup>2</sup> have the attributes `worker_type` and `queue_name`.
* <sup>2</sup> have the attributes `worker_type` and `state`. `state` can have the value `used`, `free`, or `waiting`.
* <sup>3</sup> have the attributes `worker_type` and `queue_name`.

`worker_type` is the type of worker, e.g. `ops`.
`queue_name` is the originating queue the task was initially put on, e.g. `...calculator_ui_small`.
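
For the per-queue metrics (footnote 3), both attributes are attached to each measurement. A minimal sketch, with a hypothetical queue name and wait value:

```java
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.Meter;

public final class DispatchedQueueWaitSketch {
  public static void main(final String[] args) {
    final Meter meter = GlobalOpenTelemetry.getMeter("nl.aerius.TaskManager");
    final LongHistogram wait = meter.histogramBuilder("aer.taskmanager.dispatched.queue.wait")
        .ofLongs()
        .build();
    // Hypothetical values: the queue name and wait time are illustrative only.
    wait.record(125L, Attributes.of(
        AttributeKey.stringKey("worker_type"), "ops",
        AttributeKey.stringKey("queue_name"), "calculator_ui_small"));
  }
}
```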
@@ -103,6 +137,9 @@ The workers have different attributes to distinguish specific metrics.

### RabbitMQ metrics

Besides the RabbitMQ metrics reported by the TaskManager, it is also possible to obtain metrics from RabbitMQ directly. The following metrics are available:

| metric name | type | description |
|-----------------------------------------------------|-------|--------------------------------------------------------------------|
| `rabbitmq_detailed_queue_messages` | gauge | Total number of messages on the queue, both picked up and waiting. |
@@ -69,7 +69,7 @@ public void onWorkerFinished(final String messageId, final Map<String, Object> m
}

@Override
public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) {
public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, final int numberOfMessagesInProgress) {
if (isItDead(!runningTasks.isEmpty(), numberOfMessages)) {
LOG.info("It looks like some tasks are zombies on {} worker queue, so all tasks currently in state running are released.", workerQueueName);
listeners.forEach(QueueWatchDogListener::reset);
@@ -22,8 +22,8 @@

/**
* Class to be used at startup. The Scheduler should not start before it is known how many messages are still on the queue.
* This to register any work that is still on the queuue and to properly calculate load metrics.
* Because the Taskmanager is not aware of the tasks already on the queue and therefore otherwise these messaages won't be counted in the metrics.
* This is to register any work that is still on the queue and to properly calculate load metrics.
* The TaskManager is not aware of the tasks already on the queue, so otherwise these messages won't be counted in the metrics.
* This can result in the metrics being skewed, and thereby negatively reporting load metrics.
*/
public class StartupGuard implements WorkerSizeObserver {
@@ -47,7 +47,7 @@ public void waitForOpen() throws InterruptedException {
}

@Override
public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) {
public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, final int numberOfMessagesInProgress) {
synchronized (openSemaphore) {
if (!open) {
open = true;
@@ -42,6 +42,10 @@
import nl.aerius.taskmanager.domain.TaskSchedule;
import nl.aerius.taskmanager.metrics.OpenTelemetryMetrics;
import nl.aerius.taskmanager.metrics.PerformanceMetricsReporter;
import nl.aerius.taskmanager.metrics.RabbitMQUsageMetricsProvider;
import nl.aerius.taskmanager.metrics.TaskManagerMetricsRegister;
import nl.aerius.taskmanager.metrics.TaskManagerUsageMetricsProvider;
import nl.aerius.taskmanager.metrics.TaskManagerUsageMetricsWrapper;
import nl.aerius.taskmanager.scheduler.TaskScheduler;
import nl.aerius.taskmanager.scheduler.TaskScheduler.TaskSchedulerFactory;

@@ -58,6 +62,7 @@ class TaskManager<T extends TaskQueue, S extends TaskSchedule<T>> {
private final TaskSchedulerFactory<T, S> schedulerFactory;
private final WorkerSizeProviderProxy workerSizeObserverProxy;
private final Map<String, TaskScheduleBucket> buckets = new HashMap<>();
private final TaskManagerUsageMetricsWrapper taskManagerMetrics;

public TaskManager(final ExecutorService executorService, final ScheduledExecutorService scheduledExecutorService, final AdaptorFactory factory,
final TaskSchedulerFactory<T, S> schedulerFactory, final WorkerSizeProviderProxy workerSizeObserverProxy) {
@@ -66,6 +71,7 @@ public TaskManager(final ExecutorService executorService, final ScheduledExecuto
this.factory = factory;
this.schedulerFactory = schedulerFactory;
this.workerSizeObserverProxy = workerSizeObserverProxy;
this.taskManagerMetrics = new TaskManagerUsageMetricsWrapper(OpenTelemetryMetrics.METER);
}

/**
@@ -78,6 +84,7 @@ public void updateTaskScheduler(final TaskSchedule<T> schedule) throws Interrupt
// Set up scheduler with worker pool
final String workerQueueName = schedule.getWorkerQueueName();
final QueueConfig workerQueueConfig = new QueueConfig(workerQueueName, schedule.isDurable(), schedule.isEagerFetch(), schedule.getQueueType());

if (!buckets.containsKey(workerQueueName)) {
LOG.info("Added scheduler for worker queue {}", workerQueueName);
buckets.put(workerQueueName, new TaskScheduleBucket(workerQueueConfig));
@@ -112,9 +119,20 @@ public void removeTaskScheduler(final String workerQueueName) {
public void shutdown() {
buckets.forEach((k, v) -> v.shutdown());
buckets.clear();
taskManagerMetrics.close();
}

/**
* Returns the {@link TaskScheduleBucket}. Intended to be used in tests.
*
* @param queueName queue to get the bucket for
* @return the bucket for the given queue name
*/
TaskScheduleBucket getTaskScheduleBucket(final String queueName) {
return buckets.get(queueName);
}

private class TaskScheduleBucket {
class TaskScheduleBucket {
private final TaskDispatcher dispatcher;
private final WorkerProducer workerProducer;
private final Map<String, TaskConsumer> taskConsumers = new HashMap<>();
@@ -129,17 +147,23 @@ public TaskScheduleBucket(final QueueConfig queueConfig) throws InterruptedExcep
taskScheduler = schedulerFactory.createScheduler(queueConfig);
workerProducer = factory.createWorkerProducer(queueConfig);
final WorkerPool workerPool = new WorkerPool(workerQueueName, workerProducer, taskScheduler);
final TaskManagerUsageMetricsProvider taskManagerUsageMetrics = new TaskManagerUsageMetricsProvider(workerQueueName);
final TaskManagerMetricsRegister taskManagerMetricsRegister = new TaskManagerMetricsRegister(taskManagerUsageMetrics, startupGuard);
final PerformanceMetricsReporter reporter = new PerformanceMetricsReporter(scheduledExecutorService, queueConfig.queueName(),
OpenTelemetryMetrics.METER, startupGuard);
OpenTelemetryMetrics.METER);

watchDog.addQueueWatchDogListener(workerPool);
watchDog.addQueueWatchDogListener(taskScheduler);
watchDog.addQueueWatchDogListener(reporter);
watchDog.addQueueWatchDogListener(taskManagerMetricsRegister);

workerProducer.addWorkerProducerHandler(reporter);
workerProducer.addWorkerProducerHandler(taskManagerMetricsRegister);
workerProducer.addWorkerProducerHandler(watchDog);

workerSizeObserverProxy.addObserver(workerQueueName, reporter);
final RabbitMQUsageMetricsProvider rabbitMQWorkerSizeObserver = new RabbitMQUsageMetricsProvider(workerQueueName);
workerSizeObserverProxy.addObserver(workerQueueName, rabbitMQWorkerSizeObserver);
workerSizeObserverProxy.addObserver(workerQueueName, taskManagerMetricsRegister);
workerSizeObserverProxy.addObserver(workerQueueName, workerPool);
workerSizeObserverProxy.addObserver(workerQueueName, watchDog);
// startup Guard should be the last observer added as it will unlock the task dispatcher
@@ -158,6 +182,9 @@ public TaskScheduleBucket(final QueueConfig queueConfig) throws InterruptedExcep
// Wait for worker queue to be empty.
startupGuard.waitForOpen();
LOG.info("Starting task scheduler {}: {}", taskScheduler.getClass().getSimpleName(), queueConfig);
taskManagerMetrics.addRabbitMQUsageMetricsProvider(rabbitMQWorkerSizeObserver);
taskManagerMetrics.addWorkerPoolUsageMetricsProvider(workerPool);
taskManagerMetrics.addTaskManagerUsageMetricsProvider(taskManagerUsageMetrics);
dispatcher.run();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
@@ -206,6 +233,7 @@ public void addTaskConsumerIfAbsent(final QueueConfig queueConfig) {
});
}


/**
* Removes a task consumer with the given queue name.
*
@@ -220,8 +248,19 @@ private void removeTaskConsumer(final String taskQueueName) {
public void shutdown() {
dispatcher.shutdown();
workerProducer.shutdown();
taskManagerMetrics.remove(workerQueueName);
WorkerPoolMetrics.removeMetrics(workerQueueName);
taskConsumers.forEach((k, v) -> v.shutdown());
}

/**
* Test method to check if there is a task consumer for the given queue name.
*
* @param queueName name of the queue to check
* @return true if the queue is present
*/
boolean hasTaskConsumer(final String queueName) {
return taskConsumers.containsKey(queueName);
}
}
}
@@ -34,14 +34,15 @@
import nl.aerius.taskmanager.domain.TaskRecord;
import nl.aerius.taskmanager.domain.WorkerUpdateHandler;
import nl.aerius.taskmanager.exception.NoFreeWorkersException;
import nl.aerius.taskmanager.metrics.UsageMetricsProvider;

/**
* Class to manage workers. Contains a list of all available workers, which are: free workers, reserved workers and running workers.
* <p>Free workers are workers available for processing tasks.
* <p>Reserved workers are workers that are waiting for a task to become available on the queue.
* <p>Running workers are workers that are busy running the task and are waiting for the task to finish.
*/
class WorkerPool implements WorkerSizeObserver, WorkerProducerHandler, WorkerMetrics, QueueWatchDogListener {
class WorkerPool implements WorkerSizeObserver, WorkerProducerHandler, UsageMetricsProvider, QueueWatchDogListener, WorkerMetrics {

private static final Logger LOG = LoggerFactory.getLogger(WorkerPool.class);

@@ -84,7 +85,13 @@ public void sendTaskToWorker(final Task task) throws IOException {
LOG.trace("[{}][taskId:{}] Task sent", workerQueueName, task.getId());
}

public int getWorkerSize() {
@Override
public String getWorkerQueueName() {
return workerQueueName;
}

@Override
public int getNumberOfWorkers() {
synchronized (this) {
return freeWorkers.availablePermits() + runningWorkers.size() + initialUnaccountedWorkers;
}
@@ -96,12 +103,17 @@ public int getReportedWorkerSize() {
}

@Override
public int getRunningWorkerSize() {
public int getNumberOfUsedWorkers() {
synchronized (this) {
return runningWorkers.size() + initialUnaccountedWorkers;
}
}

@Override
public int getNumberOfFreeWorkers() {
return freeWorkers.availablePermits();
}

@Override
public void onWorkerFinished(final String messageId, final Map<String, Object> messageMetaData) {
releaseWorker(messageId);
@@ -165,6 +177,17 @@ public void reserveWorker() {
}
}

@Override
public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages, final int numberOfMessagesInProgress) {
synchronized (this) {
if (!firstUpdateReceived) {
initialUnaccountedWorkers = numberOfMessages;
firstUpdateReceived = true;
}
updateNumberOfWorkers(numberOfWorkers);
}
}

/**
* Sets the number of workers which are actually available. This number should
* be determined on the number of workers that are actually in operation.
@@ -175,23 +198,11 @@ public void reserveWorker() {
* workers matches the actual number.
*
* @param numberOfWorkers Actual size of number of workers in operation
* @param numberOfMessages Actual total number of messages on the queue
*/
@Override
public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) {
synchronized (this) {
if (!firstUpdateReceived) {
initialUnaccountedWorkers = numberOfMessages;
firstUpdateReceived = true;
}
updateNumberOfWorkers(numberOfWorkers);
}
}

private void updateNumberOfWorkers(final int numberOfWorkers) {
final int previousTotalReportedWorkers = totalReportedWorkers;
totalReportedWorkers = numberOfWorkers;
final int deltaWorkers = totalReportedWorkers - getWorkerSize();
final int deltaWorkers = totalReportedWorkers - getNumberOfWorkers();

if (deltaWorkers > 0) {
freeWorkers.release(deltaWorkers);