Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright the State of the Netherlands
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package nl.aerius.taskmanager;

import java.util.concurrent.Semaphore;

import nl.aerius.taskmanager.adaptor.WorkerSizeObserver;

/**
 * Class to be used at startup. The Scheduler should not start before it is known how many messages are still on the queue.
 * This is needed to register any work that is still on the queue and to properly calculate load metrics:
 * the Taskmanager is not aware of the tasks already on the queue, so without this guard these messages would not be counted in the metrics.
 * That would skew the metrics and thereby result in incorrectly reported load.
 *
 * The guard starts closed and opens, once and permanently, when the first worker size update is received.
 */
public class StartupGuard implements WorkerSizeObserver {

  // Starts without permits; a single permit is made available when the guard opens.
  private final Semaphore openSemaphore = new Semaphore(0);

  // volatile because the flag is written while holding the lock in onNumberOfWorkersUpdate,
  // but read without any lock in isOpen(), possibly from a different thread.
  private volatile boolean open;

  /**
   * @return Returns true once the first worker size update has been received, false before that.
   */
  public boolean isOpen() {
    return open;
  }

  /**
   * Blocks the calling thread until the first worker size update has been received.
   * Returns immediately if the guard is already open.
   *
   * @throws InterruptedException if the thread is interrupted while waiting
   */
  public void waitForOpen() throws InterruptedException {
    openSemaphore.acquire();
    // Hand the permit straight back: the guard stays open, so other waiters and later calls must not block forever.
    openSemaphore.release();
  }

  /**
   * Opens the guard on the first update. The passed values are not inspected; any later update is ignored.
   *
   * @param numberOfWorkers number of workers reported (unused)
   * @param numberOfMessages number of messages reported (unused)
   */
  @Override
  public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) {
    synchronized (openSemaphore) {
      // Release exactly once, no matter how many updates arrive.
      if (!open) {
        open = true;
        openSemaphore.release();
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ public enum State {

private final WorkerPool workerPool;
private final TaskScheduler<?> scheduler;
private final String workerQueueName;

private boolean running;
private State state;
private final String workerQueueName;

public TaskDispatcher(final String workerQueueName, final TaskScheduler<?> scheduler, final WorkerPool workerPool) {
this.workerQueueName = workerQueueName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -74,7 +74,7 @@ public TaskManager(final ExecutorService executorService, final ScheduledExecuto
* @param schedule scheduler configuration
* @throws InterruptedException
*/
public boolean updateTaskScheduler(final TaskSchedule<T> schedule) throws InterruptedException {
public void updateTaskScheduler(final TaskSchedule<T> schedule) throws InterruptedException {
// Set up scheduler with worker pool
final String workerQueueName = schedule.getWorkerQueueName();
final QueueConfig workerQueueConfig = new QueueConfig(workerQueueName, schedule.isDurable(), schedule.isEagerFetch(), schedule.getQueueType());
Expand All @@ -85,7 +85,10 @@ public boolean updateTaskScheduler(final TaskSchedule<T> schedule) throws Interr
final TaskScheduleBucket taskScheduleBucket = buckets.get(workerQueueName);

taskScheduleBucket.updateQueues(schedule.getQueues(), workerQueueConfig);
return taskScheduleBucket.isRunning();
}

/**
 * Checks if the dispatcher of the given worker queue is running.
 *
 * @param queue name of the worker queue
 * @return true if a bucket is registered for the queue and that bucket reports it is running, false otherwise
 */
public boolean isRunning(final String queue) {
  final TaskScheduleBucket bucket = buckets.get(queue);

  // An unknown queue has no bucket and is reported as not running.
  return bucket != null && bucket.isRunning();
}

/**
Expand Down Expand Up @@ -121,11 +124,13 @@ private class TaskScheduleBucket {
public TaskScheduleBucket(final QueueConfig queueConfig) throws InterruptedException {
this.workerQueueName = queueConfig.queueName();
final QueueWatchDog watchDog = new QueueWatchDog(workerQueueName);
final StartupGuard startupGuard = new StartupGuard();

taskScheduler = schedulerFactory.createScheduler(queueConfig);
workerProducer = factory.createWorkerProducer(queueConfig);
final WorkerPool workerPool = new WorkerPool(workerQueueName, workerProducer, taskScheduler);
final PerformanceMetricsReporter reporter = new PerformanceMetricsReporter(scheduledExecutorService, queueConfig.queueName(),
OpenTelemetryMetrics.METER, workerPool);
OpenTelemetryMetrics.METER, startupGuard);

watchDog.addQueueWatchDogListener(workerPool);
watchDog.addQueueWatchDogListener(taskScheduler);
Expand All @@ -134,24 +139,33 @@ public TaskScheduleBucket(final QueueConfig queueConfig) throws InterruptedExcep
workerProducer.addWorkerProducerHandler(reporter);
workerProducer.addWorkerProducerHandler(watchDog);

workerSizeObserverProxy.addObserver(workerQueueName, reporter);
workerSizeObserverProxy.addObserver(workerQueueName, workerPool);
workerSizeObserverProxy.addObserver(workerQueueName, watchDog);
// startup Guard should be the last observer added as it will unlock the task dispatcher
workerSizeObserverProxy.addObserver(workerQueueName, startupGuard);

if (taskScheduler instanceof final WorkerSizeObserver wzo) {
workerSizeObserverProxy.addObserver(workerQueueName, wzo);
}
workerProducer.start();
// Set up metrics
WorkerPoolMetrics.setupMetrics(workerPool, workerQueueName);
// Set up dispatcher

dispatcher = new TaskDispatcher(workerQueueName, taskScheduler, workerPool);
executorService.execute(dispatcher);
Thread.sleep(TimeUnit.SECONDS.toMillis(1)); // just wait a little second to make sure the process is actually running.
LOG.info("Started taskscheduler {}: {}", taskScheduler.getClass().getSimpleName(), queueConfig);
executorService.execute(() -> {
try {
// Wait for worker queue to be empty.
startupGuard.waitForOpen();
LOG.info("Starting task scheduler {}: {}", taskScheduler.getClass().getSimpleName(), queueConfig);
dispatcher.run();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}});
}

/**
* @return
* @return true if the dispatcher is running.
*/
public boolean isRunning() {
return dispatcher.isRunning();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,14 @@ class WorkerPool implements WorkerSizeObserver, WorkerProducerHandler, WorkerMet

private final Semaphore freeWorkers = new Semaphore(0);
private final Map<String, TaskRecord> runningWorkers = new ConcurrentHashMap<>();
private int totalConfiguredWorkers;
private final String workerQueueName;
private final WorkerProducer wp;
private final WorkerUpdateHandler workerUpdateHandler;

private int totalReportedWorkers;
private int initialUnaccountedWorkers;
private boolean firstUpdateReceived;

public WorkerPool(final String workerQueueName, final WorkerProducer wp, final WorkerUpdateHandler workerUpdateHandler) {
this.workerQueueName = workerQueueName;
this.wp = wp;
Expand Down Expand Up @@ -83,19 +86,19 @@ public void sendTaskToWorker(final Task task) throws IOException {

public int getWorkerSize() {
synchronized (this) {
return freeWorkers.availablePermits() + runningWorkers.size();
return freeWorkers.availablePermits() + runningWorkers.size() + initialUnaccountedWorkers;
}
}

@Override
public int getReportedWorkerSize() {
return totalConfiguredWorkers;
return totalReportedWorkers;
}

@Override
public int getRunningWorkerSize() {
synchronized (this) {
return runningWorkers.size();
return runningWorkers.size() + initialUnaccountedWorkers;
}
}

Expand All @@ -120,24 +123,32 @@ void releaseWorker(final String taskId) {
* @param taskRecord the task is expected to be on.
*/
void releaseWorker(final String taskId, final TaskRecord taskRecord) {
if (taskRecord != null) {
synchronized (this) {
if (runningWorkers.containsKey(taskId)) {
// if currentSize is smaller than the worker size it means the worker
// must not be re-added as free worker but removed from the pool.
if (totalConfiguredWorkers >= runningWorkers.size()) {
freeWorkers.release(1);
}
runningWorkers.remove(taskId);
} else {
LOG.info("[{}][taskId:{}] Task for queue '{}' not found, maybe it was already released).", workerQueueName, taskId, taskRecord.queueName());
synchronized (this) {
if (runningWorkers.containsKey(taskId)) {
freeWorker();
runningWorkers.remove(taskId);
} else {
if (initialUnaccountedWorkers > 0) {
freeWorker();
}
initialUnaccountedWorkers = Math.max(initialUnaccountedWorkers - 1, 0);
LOG.info("[{}][taskId:{}] Unknown task received, possible left over of restart).", workerQueueName, taskId);
}
}
if (taskRecord != null) {
workerUpdateHandler.onTaskFinished(taskRecord);
LOG.debug("[{}][taskId:{}] Task released).", workerQueueName, taskId);
}
}

/**
 * Hands one worker back to the free workers, unless the pool has shrunk in the meantime.
 */
private void freeWorker() {
  // Workers currently considered in use: the running tasks plus the work that was not yet accounted for at startup.
  final int occupiedWorkers = runningWorkers.size() + initialUnaccountedWorkers;

  // When more workers are in use than are reported, the worker must be dropped from the pool
  // instead of being re-added as a free worker.
  if (occupiedWorkers <= totalReportedWorkers) {
    freeWorkers.release();
  }
}

/**
* Takes a worker from the free workers list and add it to the reserved workers list. Blocks until a worker becomes available on the free worker
* list.
Expand Down Expand Up @@ -169,24 +180,28 @@ public void reserveWorker() {
/**
 * Processes a worker size update. On the very first update the number of messages is stored as the
 * number of initially unaccounted workers; on every update the number of workers is applied to the pool.
 *
 * @param numberOfWorkers number of workers reported
 * @param numberOfMessages number of messages reported
 */
@Override
public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) {
  synchronized (this) {
    final boolean isFirstUpdate = !firstUpdateReceived;

    if (isFirstUpdate) {
      // Only the first reported message count is remembered; later updates leave it untouched.
      firstUpdateReceived = true;
      initialUnaccountedWorkers = numberOfMessages;
    }
    updateNumberOfWorkers(numberOfWorkers);
  }
}

private void updateNumberOfWorkers(final int numberOfWorkers) {
final int previousTotalConfiguredWorkers = totalConfiguredWorkers;
totalConfiguredWorkers = numberOfWorkers;
final int deltaWorkers = totalConfiguredWorkers - getWorkerSize();
final int previousTotalReportedWorkers = totalReportedWorkers;
totalReportedWorkers = numberOfWorkers;
final int deltaWorkers = totalReportedWorkers - getWorkerSize();

if (deltaWorkers > 0) {
freeWorkers.release(deltaWorkers);
LOG.info("# Workers of {} increased to {}(+{})", workerQueueName, totalConfiguredWorkers, deltaWorkers);
LOG.info("# Workers of {} increased to {}(+{})", workerQueueName, totalReportedWorkers, deltaWorkers);
} else if ((deltaWorkers < 0) && (freeWorkers.availablePermits() > 0)
&& freeWorkers.tryAcquire(Math.min(freeWorkers.availablePermits(), -deltaWorkers))) {
LOG.info("# Workers of {} decreased to {}({})", workerQueueName, totalConfiguredWorkers, deltaWorkers);
LOG.info("# Workers of {} decreased to {}({})", workerQueueName, totalReportedWorkers, deltaWorkers);
}
if (previousTotalConfiguredWorkers != totalConfiguredWorkers) {
workerUpdateHandler.onWorkerPoolSizeChange(totalConfiguredWorkers);
if (previousTotalReportedWorkers != totalReportedWorkers) {
workerUpdateHandler.onWorkerPoolSizeChange(totalReportedWorkers);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@
import io.opentelemetry.api.metrics.DoubleGauge;
import io.opentelemetry.api.metrics.Meter;

import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerMetrics;
import nl.aerius.taskmanager.StartupGuard;
import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerProducerHandler;
import nl.aerius.taskmanager.adaptor.WorkerSizeObserver;
import nl.aerius.taskmanager.client.TaskMetrics;
import nl.aerius.taskmanager.domain.QueueWatchDogListener;
import nl.aerius.taskmanager.metrics.DurationMetric.DurationMetricValue;
Expand All @@ -52,7 +53,7 @@
*
* - Average load (in percentage) of all workers (of a certain type) together.
*/
public class PerformanceMetricsReporter implements WorkerProducerHandler, QueueWatchDogListener {
public class PerformanceMetricsReporter implements WorkerProducerHandler, QueueWatchDogListener, WorkerSizeObserver {

private static final Logger LOG = LoggerFactory.getLogger(PerformanceMetricsReporter.class);

Expand All @@ -77,21 +78,24 @@ public class PerformanceMetricsReporter implements WorkerProducerHandler, QueueW
private final DurationMetric workWorkerMetrics;
private final LoadMetric loadMetrics = new LoadMetric();

private final StartupGuard startupGuard;

private final Meter meter;
private final String queueGroupName;
private final WorkerMetrics workerMetrics;
private final DoubleGauge loadGauge;

private final Attributes workerAttributes;
// Keep track of dispatched tasks, because when taskmanager restarts it should not register tasks already on the queue
// as it doesn't have any metrics on it anymore.
private final Set<String> dispatchedTasks = new HashSet<>();

private int numberOfWorkers;

public PerformanceMetricsReporter(final ScheduledExecutorService newScheduledThreadPool, final String queueGroupName, final Meter meter,
final WorkerMetrics workerMetrics) {
final StartupGuard startupGuard) {
this.queueGroupName = queueGroupName;
this.meter = meter;
this.workerMetrics = workerMetrics;
this.startupGuard = startupGuard;

// Gauges for measuring number of tasks, and average duration time it took before a task was sent to the worker.
// Measures by worker and per queue to the worker
Expand Down Expand Up @@ -136,7 +140,7 @@ public void onWorkDispatched(final String messageId, final Map<String, Object> m
taskMetrics.determineDuration();
dispatchedQueueMetrics.computeIfAbsent(taskMetrics.queueName(), k -> createQueueDurationMetric(taskMetrics)).register(taskMetrics);
dispatchedWorkerMetrics.register(taskMetrics);
loadMetrics.register(1, workerMetrics.getReportedWorkerSize());
loadMetrics.register(1, numberOfWorkers);
}

@Override
Expand All @@ -145,8 +149,15 @@ public synchronized void onWorkerFinished(final String messageId, final Map<Stri
taskMetrics.determineDuration();
workQueueMetrics.computeIfAbsent(taskMetrics.queueName(), k -> createQueueDurationMetric(taskMetrics)).register(taskMetrics);
workWorkerMetrics.register(taskMetrics);
if (dispatchedTasks.remove(messageId)) {
loadMetrics.register(-1, workerMetrics.getReportedWorkerSize());
loadMetrics.register(-1, numberOfWorkers);
}

/**
 * Stores the reported number of workers. While the startup guard is not open yet, a positive number of
 * messages is also registered as load, so work already on the queue is included in the load metrics.
 *
 * @param numberOfWorkers number of workers reported
 * @param numberOfMessages number of messages reported
 */
@Override
public synchronized void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) {
  this.numberOfWorkers = numberOfWorkers;

  // Nothing to register once the guard is open, or when no messages are reported.
  if (startupGuard.isOpen() || numberOfMessages <= 0) {
    return;
  }
  LOG.info("Queue {} will be started with {} messages already on the queue.", queueGroupName, numberOfMessages);
  loadMetrics.register(numberOfMessages, numberOfWorkers);
}

Expand Down Expand Up @@ -195,9 +206,11 @@ private static void metrics(final String prefixText, final DoubleGauge gauge, fi
}

private void workLoad() {
final double load = loadMetrics.process();
if (startupGuard.isOpen()) {
final double load = loadMetrics.process();

loadGauge.set(load, workerAttributes);
LOG.debug("Workload for '{}' is: {}%", queueGroupName, Math.round(load));
loadGauge.set(load, workerAttributes);
LOG.debug("Workload for '{}' is: {}%", queueGroupName, Math.round(load));
}
}
}
Loading
Loading