18 changes: 14 additions & 4 deletions README.md
@@ -91,9 +91,10 @@ The JSON format of the configuration files is as follows:
```
{
"workerQueueName": "<type of the queue>",
"durable" : <true|false>
"eagerFetch" : <true|false>
"queueType": <classic|quorum|stream>
"durable" : <true|false>,
"eagerFetch" : <true|false>,
"queueType": <classic|quorum|stream>,
"maxWorkersAvailable": <number>,
"queues": [
{
"queueName": "<client queue name>",
@@ -115,7 +116,7 @@ Some queues can have derived messages on the queue, that are recreated by the pa
For these queues it would make sense to not make them durable.
RabbitMQ will require less storage space/IOPS and be faster as it won't need to depend on disk I/O for these queues.

The parameter `eagerFetch` indicates that all tasks on the queue will be prefetched.
The parameter `eagerFetch` (optional) indicates that all tasks on the queue will be prefetched.
If not set, or set to false, only a single task is fetched from the queue at a time.
By fetching all tasks from the queue as they arrive, the scheduler can give different priorities to tasks put on the same queue.
Effectively, tasks from queues with fewer tasks currently running on a worker are given a higher priority than tasks from queues with more tasks running on a worker.
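As an illustration, the prioritization described above can be sketched as follows. This is a simplified, hypothetical sketch; names such as `EagerFetchSketch`, `nextTask`, and `runningPerQueue` are not part of the taskmanager code:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Simplified sketch of eager-fetch prioritization: among the prefetched tasks,
// prefer the task whose queue currently has the fewest tasks running on a worker.
public class EagerFetchSketch {
  record Task(String queue) {
  }

  static Task nextTask(final List<Task> prefetched, final Map<String, Integer> runningPerQueue) {
    // Order prefetched tasks by how many tasks of the same queue are already
    // running on a worker; fewer running tasks means a higher priority.
    return prefetched.stream()
        .min(Comparator.comparingInt(t -> runningPerQueue.getOrDefault(t.queue(), 0)))
        .orElse(null);
  }

  public static void main(final String[] args) {
    final List<Task> prefetched = List.of(new Task("busyQueue"), new Task("quietQueue"));
    final Map<String, Integer> running = Map.of("busyQueue", 5, "quietQueue", 1);
    // quietQueue has fewer tasks on workers, so its task is scheduled first.
    System.out.println(nextTask(prefetched, running).queue());
  }
}
```

With a single-task fetch this comparison is impossible, because only one task is visible to the scheduler at a time.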
@@ -128,6 +129,15 @@ If `durable` is true queues will be set to `classic` (or default if not set) bec
Changing the `queueType` configuration parameter if the queues are already created won't work.
If `queueType` needs to be changed, the queues need to be deleted first.

The parameter `maxWorkersAvailable` (optional) can be set to the maximum number of workers the system can scale to when automatic scaling of workers is used.
This is useful because automatically scaled worker resources can cause issues in combination with a low `maxCapacityUse` parameter.
For example, if worker scaling is based on the percentage of workers being used, the system may never scale up because the `maxCapacityUse` for certain input queues never lets usage exceed the scaling threshold percentage.
Especially when the system runs with a low number of workers, the `maxCapacityUse` is easily reached before the scaling threshold is reached.
By setting `maxWorkersAvailable`, the scheduler determines whether tasks have reached the `maxCapacityUse` value based on the `maxWorkersAvailable` value instead of the actual number of workers running.
It will then schedule tasks beyond the `maxCapacityUse`, assuming this will trigger the system to scale up as needed,
and only for a short amount of time claim more resources for a specific queue than would be allowed by the `maxCapacityUse` percentage.
Note that this only holds for a short time if `maxWorkersAvailable` matches the actual maximum the system can scale up to; if it is not configured correctly, more resources than allowed may be claimed for longer periods.

When more workers are available than configured as `maxWorkersAvailable`, the scheduler will assign workers proportionally to the `maxCapacityUse` based on the actual number of workers available.
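The interplay of `maxWorkersAvailable` and `maxCapacityUse` can be sketched as follows. This is a simplified illustration of the scheduler's capacity test under assumed names (`CapacitySketch`, the standalone helper methods), not the actual implementation:

```java
// Sketch of the capacity check when maxWorkersAvailable is configured.
public class CapacitySketch {
  static int potentialNumberOfWorkers(final int actualWorkers, final int maxWorkersAvailable) {
    // Use the configured maximum when it exceeds the number of workers currently
    // running, so scheduling is not capped by a temporarily scaled-down pool.
    return Math.max(actualWorkers, maxWorkersAvailable);
  }

  static boolean hasCapacityRemaining(final int tasksOnWorkers, final int actualWorkers,
      final int maxWorkersAvailable, final double maxCapacityUse) {
    final int potential = potentialNumberOfWorkers(actualWorkers, maxWorkersAvailable);
    // A task fits while the fraction of (potential) workers used by its queue
    // stays below the configured maxCapacityUse.
    return potential > 0 && (((double) tasksOnWorkers) / potential) < maxCapacityUse;
  }

  public static void main(final String[] args) {
    // 10 actual workers, maxWorkersAvailable 20, maxCapacityUse 0.7:
    // with 13 tasks on workers (13/20 = 0.65) another task still fits,
    // with 14 tasks on workers (14/20 = 0.70) the limit is reached.
    System.out.println(hasCapacityRemaining(13, 10, 20, 0.7)); // true
    System.out.println(hasCapacityRemaining(14, 10, 20, 0.7)); // false
  }
}
```

Without `maxWorkersAvailable` (value 0), the same check would cap the queue at 7 of the 10 actual workers, matching the behaviour the test changes below exercise.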

In `queues`, one or more queue configurations can be specified.
Each queue configuration consists of 3 parameters:

@@ -83,10 +83,11 @@ public TaskManager(final ExecutorService executorService, final ScheduledExecuto
public void updateTaskScheduler(final TaskSchedule<T> schedule) throws InterruptedException {
// Set up scheduler with worker pool
final String workerQueueName = schedule.getWorkerQueueName();
final QueueConfig workerQueueConfig = new QueueConfig(workerQueueName, schedule.isDurable(), schedule.isEagerFetch(), schedule.getQueueType());
final QueueConfig workerQueueConfig = new QueueConfig(workerQueueName, schedule.isDurable(), schedule.isEagerFetch(),
schedule.getMaxWorkersAvailable(), schedule.getQueueType());

if (!buckets.containsKey(workerQueueName)) {
LOG.info("Added scheduler for worker queue {}", workerQueueName);
LOG.info("Added scheduler for worker queue {}", schedule);
buckets.put(workerQueueName, new TaskScheduleBucket(workerQueueConfig));
}
final TaskScheduleBucket taskScheduleBucket = buckets.get(workerQueueName);
@@ -211,7 +212,7 @@ private void updateQueues(final List<T> newTaskQueues, final QueueConfig workerQ

private void addOrUpdateTaskQueue(final T taskQueueConfiguration, final QueueConfig workerQueueConfig) {
addTaskConsumerIfAbsent(new QueueConfig(taskQueueConfiguration.getQueueName(), workerQueueConfig.durable(), workerQueueConfig.eagerFetch(),
workerQueueConfig.queueType()));
workerQueueConfig.maxWorkersAvailable(), workerQueueConfig.queueType()));
taskScheduler.updateQueue(taskQueueConfiguration);
}

@@ -16,10 +16,10 @@
*/
package nl.aerius.taskmanager.domain;

public record QueueConfig(String queueName, boolean durable, boolean eagerFetch, RabbitMQQueueType queueType) {
public record QueueConfig(String queueName, boolean durable, boolean eagerFetch, int maxWorkersAvailable, RabbitMQQueueType queueType) {
@Override
public final String toString() {
return String.format("Queue name:%s, durable:%b, eagerFetch:%b, queueType:%s", queueName, durable, eagerFetch,
queueType == null ? "default" : queueType.type());
return String.format("Queue name:%s, durable:%b, eagerFetch:%b, maxWorkersAvailable:%d, queueType:%s", queueName, durable, eagerFetch,
maxWorkersAvailable, queueType == null ? "default" : queueType.type());
}
}
@@ -32,6 +32,11 @@ public class TaskSchedule<T extends TaskQueue> {

private Boolean eagerFetch;

/**
* Maximum number of workers available when the system is fully scaled.
*/
private int maxWorkersAvailable;

private RabbitMQQueueType queueType;

private List<T> queues = new ArrayList<>();
@@ -75,4 +80,19 @@ public RabbitMQQueueType getQueueType() {
public void setQueueType(final String queueType) {
this.queueType = RabbitMQQueueType.valueOf(queueType.toUpperCase(Locale.ROOT));
}

public int getMaxWorkersAvailable() {
return maxWorkersAvailable;
}

public void setMaxWorkersAvailable(final int maxWorkersAvailable) {
this.maxWorkersAvailable = maxWorkersAvailable;
}

@Override
public String toString() {
return "workerQueueName=" + workerQueueName + ", durable=" + durable + ", eagerFetch=" + eagerFetch + ", maxWorkersAvailable="
+ maxWorkersAvailable + ", queueType=" + queueType;
}

}
@@ -70,6 +70,11 @@ public void incrementOnWorker(final TaskRecord taskRecord) {
tasksOnWorkersPerQueue.computeIfAbsent(key(taskRecord), k -> new AtomicInteger()).incrementAndGet();
}

/**
* Returns the number of tasks on the worker as known by the taskmanager.
*
* @return total number of tasks on the worker
*/
public int onWorkerTotal() {
return tasksOnWorkersPerQueue.entrySet().stream().mapToInt(e -> e.getValue().get()).sum();
}
@@ -50,15 +50,18 @@ class PriorityTaskScheduler implements TaskScheduler<PriorityTaskQueue>, Compara
private final Lock lock = new ReentrantLock();
private final Condition nextTaskCondition = lock.newCondition();
private final String workerQueueName;
private final int maxWorkersAvailable;

private int numberOfWorkers;

/**
* Constructs scheduler for given configuration.
*/
PriorityTaskScheduler(final PriorityQueueMap<?> priorityQueueKeyMap, final Function<Comparator<Task>, Queue<Task>> queueCreator,
final String workerQueueName) {
final String workerQueueName, final int maxWorkersAvailable) {
this.priorityQueueMap = priorityQueueKeyMap;
this.workerQueueName = workerQueueName;
this.maxWorkersAvailable = maxWorkersAvailable;
queue = queueCreator.apply(this);
}

@@ -136,7 +139,7 @@ private void obtainTask() {
* @return true if this task is next in line
*/
private boolean isTaskNext(final TaskRecord taskRecord) {
final boolean taskNext = (numberOfWorkers == 1) || ((getFreeWorkers() > 1) && hasCapacityRemaining(taskRecord))
final boolean taskNext = (numberOfWorkers == 1) || ((getPotentialFreeWorkers() > 1) && hasCapacityRemaining(taskRecord))
|| (priorityQueueMap.onWorker(taskRecord) == 0);

if (!taskNext) {
@@ -147,13 +150,22 @@ private boolean isTaskNext(final TaskRecord taskRecord) {
return taskNext;
}

private int getFreeWorkers() {
return numberOfWorkers - priorityQueueMap.onWorkerTotal();
private int getPotentialFreeWorkers() {
return potentialNumberOfWorkers() - priorityQueueMap.onWorkerTotal();
}

private boolean hasCapacityRemaining(final TaskRecord taskRecord) {
return (numberOfWorkers > 0)
&& ((((double) priorityQueueMap.onWorker(taskRecord)) / numberOfWorkers) < priorityQueueMap.get(taskRecord).getMaxCapacityUse());
&& ((((double) priorityQueueMap.onWorker(taskRecord)) / potentialNumberOfWorkers()) < priorityQueueMap.get(taskRecord).getMaxCapacityUse());
}

/**
* Returns the actual number of workers available or the configured maximum number of workers, whichever is higher. This number can be
* used to check if tasks can still be scheduled. If the actual number of workers is lower than the maximum number of workers the system
* can scale up to, the configured maximum should be used. Otherwise the actual number of workers should be used.
*/
private int potentialNumberOfWorkers() {
return Math.max(numberOfWorkers, maxWorkersAvailable);
}

@Override
@@ -242,7 +254,7 @@ public final int compare(final Task task1, final Task task2) {

private int compareWith1Worker(final TaskRecord taskRecord1, final TaskRecord taskRecord2) {
int cmp = 0;
if ((numberOfWorkers == 1) || (getFreeWorkers() == 1)) {
if ((numberOfWorkers == 1) || (getPotentialFreeWorkers() == 1)) {
cmp = compareTaskOnQueue(taskRecord1, taskRecord2);
if (cmp == 0) {
cmp = comparePriority(taskRecord1, taskRecord2);
@@ -40,7 +40,7 @@ public TaskScheduler<PriorityTaskQueue> createScheduler(final QueueConfig queueC
final Function<Comparator<Task>, Queue<Task>> queueCreator = c -> createQueue(queueConfig.eagerFetch(), c);
final PriorityQueueMap<?> priorityQueueMap = createPriorityQueueMap(queueConfig.eagerFetch());

return new PriorityTaskScheduler(priorityQueueMap, queueCreator, queueConfig.queueName());
return new PriorityTaskScheduler(priorityQueueMap, queueCreator, queueConfig.queueName(), queueConfig.maxWorkersAvailable());
}

private static Queue<Task> createQueue(final boolean eagerFetch, final Comparator<Task> c) {
@@ -55,6 +55,7 @@ private PriorityTaskSchedule readFromFile(final File file) throws IOException {
private PriorityTaskSchedule readFromEnvironment(final String workerQueueName) throws JsonProcessingException {
final String environmentKey = ENV_PREFIX + workerQueueName.toUpperCase(Locale.ROOT);
final String environmentValue = System.getenv(environmentKey);

if (environmentValue != null) {
LOG.info("Using configuration for worker queue {} from environment", workerQueueName);
return objectMapper.readValue(environmentValue, PriorityTaskSchedule.class);
@@ -66,7 +66,7 @@ void setUp() throws IOException {
workerPool = new WorkerPool(WORKER_QUEUE_NAME_TEST, workerProducer, scheduler);
dispatcher = new TaskDispatcher(WORKER_QUEUE_NAME_TEST, scheduler, workerPool);
factory = new MockAdaptorFactory();
taskConsumer = new TaskConsumerImpl(executor, new QueueConfig("testqueue", false, false, null), dispatcher, factory);
taskConsumer = new TaskConsumerImpl(executor, new QueueConfig("testqueue", false, false, -1, null), dispatcher, factory);
}

@AfterEach
@@ -64,7 +64,8 @@ void setUp() throws IOException {
return null;
}).when(workerUpdateHandler).onWorkerPoolSizeChange(anyInt());
workerPool = new WorkerPool(WORKER_QUEUE_NAME_TEST, new MockWorkerProducer(), workerUpdateHandler);
taskConsumer = new TaskConsumerImpl(mock(ExecutorService.class), new QueueConfig("testqueue", false, false, null), mock(ForwardTaskHandler.class),
taskConsumer = new TaskConsumerImpl(mock(ExecutorService.class), new QueueConfig("testqueue", false, false, -1, null),
mock(ForwardTaskHandler.class),
new MockAdaptorFactory()) {
@Override
public void messageDelivered(final Message message) {
@@ -62,7 +62,7 @@ class RabbitMQMessageHandlerTest extends AbstractRabbitMQTest {
@Timeout(value = 10, unit = TimeUnit.SECONDS)
void testMessageReceivedHandler() throws IOException, InterruptedException {
final byte[] receivedBody = "4321".getBytes();
final TaskMessageHandler<?> tmh = adapterFactory.createTaskMessageHandler(new QueueConfig(taskQueueName, false, false, null));
final TaskMessageHandler<?> tmh = adapterFactory.createTaskMessageHandler(new QueueConfig(taskQueueName, false, false, -1, null));
final Semaphore lock = new Semaphore(0);
final DataDock data = new DataDock();
tmh.start();
@@ -108,7 +108,7 @@ void testReStart() throws IOException, InterruptedException {
final AtomicInteger shutdownCallsCounter = new AtomicInteger();

final MessageReceivedHandler mockMessageReceivedHandler = mock(MessageReceivedHandler.class);
final TaskMessageHandler<?> tmh = adapterFactory.createTaskMessageHandler(new QueueConfig(taskQueueName, false, false, null));
final TaskMessageHandler<?> tmh = adapterFactory.createTaskMessageHandler(new QueueConfig(taskQueueName, false, false, -1, null));

((RabbitMQMessageHandler) tmh).setRetryTimeMilliseconds(1L);
doAnswer(invoke -> null).when(mockChannel).addShutdownListener(shutdownListenerCaptor.capture());
@@ -48,7 +48,7 @@ class RabbitMQWorkerProducerTest extends AbstractRabbitMQTest {
void testForwardMessage() throws IOException, InterruptedException {
final byte[] sendBody = "4321".getBytes();

final WorkerProducer wp = adapterFactory.createWorkerProducer(new QueueConfig(WORKER_QUEUE_NAME, false, false, null));
final WorkerProducer wp = adapterFactory.createWorkerProducer(new QueueConfig(WORKER_QUEUE_NAME, false, false, -1, null));
wp.start();
final BasicProperties bp = new BasicProperties();
wp.dispatchMessage(new RabbitMQMessage(WORKER_QUEUE_NAME, null, 4321, bp, sendBody) {
@@ -38,6 +38,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -71,23 +73,23 @@ class PriorityTaskSchedulerTest {
private Task task2a;
private Task task2b;
private Task task3;
private PriorityTaskSchedule configuration;
private PriorityTaskScheduler scheduler;

@BeforeEach
void setUp() throws IOException {
taskConsumer1 = createMockTaskConsumer(QUEUE1);
taskConsumer2 = createMockTaskConsumer(QUEUE2);
final TaskConsumer taskConsumer3 = createMockTaskConsumer(QUEUE3);
final PriorityTaskSchedule configuration = new PriorityTaskSchedule();
configuration = new PriorityTaskSchedule();
configuration.setWorkerQueueName("TEST");
final PriorityTaskQueue tc1 = new PriorityTaskQueue(QUEUE1, 0, TEST_CAPACITY);
final PriorityTaskQueue tc2 = new PriorityTaskQueue(QUEUE2, 1, TEST_CAPACITY);
final PriorityTaskQueue tc3 = new PriorityTaskQueue(QUEUE3, 1, TEST_CAPACITY);
configuration.getQueues().add(tc1);
configuration.getQueues().add(tc2);
configuration.getQueues().add(tc3);
scheduler = (PriorityTaskScheduler) FACTORY.createScheduler(new QueueConfig(QUEUE1, false, true, null));
configuration.getQueues().forEach(scheduler::updateQueue);
scheduler = createScheduler(0);
task1 = createTask(taskConsumer1, "1");
task2a = createTask(taskConsumer2, "2a");
task2b = createTask(taskConsumer2, "2b");
@@ -202,24 +204,36 @@ void testGetTask() throws InterruptedException, ExecutionException {
* Test if getNextTask correctly gets a new task in case of a big worker pool.
* If capacity is reached for a task, it should not run unless a task of the same queue is returned as finished.
* In the meanwhile, other tasks can start/finish (as long as there is a capacity for those tasks).
*
* This test also tests the maxWorkersAvailable setting.
*
* @param maxWorkersAvailable the configured maximum number of workers available, 0 means not set
* @param nrOfTasksToTake number of tasks to initially take to reach 70% of the total number of available workers
*/
@Test
@ParameterizedTest
@CsvSource({
// No max worker available set. With 10 actual workers available it should take 70% of that, which is 7
"0,7",
// Max workers available is 20. With 10 actual workers it should look at the potential 20 workers and take 70% of that, which is 14.
"20,14"})
@Timeout(value = 7, unit = TimeUnit.SECONDS)
void testGetTaskBigPool() throws InterruptedException, ExecutionException {
void testGetTaskBigPool(final int maxWorkersAvailable, final int nrOfTasksToTake) throws InterruptedException, ExecutionException {
scheduler = createScheduler(maxWorkersAvailable);
scheduler.onWorkerPoolSizeChange(10);
final List<Task> tasks = new ArrayList<>();
final List<Task> sendTasks = new ArrayList<>();
for (int i = 0; i < 10; i++) {
// Put 20 tasks in the queue to be picked up.
for (int i = 0; i < 20; i++) {
final Task task = createTask(taskConsumer2, "1");
scheduler.addTask(task);
tasks.add(task);
}
for (int i = 0; i < 7; i++) {
final Task sendTask = scheduler.getNextTask();
sendTasks.add(sendTask);
// It should at least be able to take the given nrOfTasksToTake tasks
for (int i = 0; i < nrOfTasksToTake; i++) {
sendTasks.add(scheduler.getNextTask());
}
scheduler.addTask(task1);
assertSame(task1, scheduler.getNextTask(), "Should still get task 1");
assertSame(task1, scheduler.getNextTask(), "Should still get task 1 because other tasks have reached the limit");
final Task task1b = createTask(taskConsumer1, "1b");
scheduler.addTask(task1b);
assertSame(task1b, scheduler.getNextTask(), "Should still get task 1b");
@@ -294,7 +308,7 @@ public Task call() throws Exception {
}

private TaskConsumer createMockTaskConsumer(final String taskQueueName) throws IOException {
return new TaskConsumerImpl(mock(ExecutorService.class), new QueueConfig(taskQueueName, false, false, null), mock(ForwardTaskHandler.class),
return new TaskConsumerImpl(mock(ExecutorService.class), new QueueConfig(taskQueueName, false, false, -1, null), mock(ForwardTaskHandler.class),
new MockAdaptorFactory()) {
@Override
public void messageDelivered(final Message message) {
@@ -303,6 +317,14 @@ public void messageDelivered(final Message message) {
};
}

private PriorityTaskScheduler createScheduler(final int maxWorkerAvailable) {
final PriorityTaskScheduler scheduler = (PriorityTaskScheduler) FACTORY
.createScheduler(new QueueConfig(QUEUE1, false, true, maxWorkerAvailable, null));

configuration.getQueues().forEach(scheduler::updateQueue);
return scheduler;
}

private static Task createTask(final TaskConsumer tc, final String messageId) {
return new MockTask(tc, messageId);
}