diff --git a/README.md b/README.md index a920e87..a7f9f4c 100644 --- a/README.md +++ b/README.md @@ -91,9 +91,10 @@ The json format of the configuration files is as follows: ``` { "workerQueueName": "", - "durable" : - "eagerFetch" : - "queueType": + "durable" : , + "eagerFetch" : , + "queueType": , + "maxWorkersAvailable": , "queues": [ { "queueName": "", @@ -115,7 +116,7 @@ Some queues can have derived messages on the queue, that are recreated by the pa For these queues it would make sense to not make them durable. RabbitMQ will require less storage space/IOPS and be faster as it won't need to depend on disk I/O for these queues. -The parameter `eagerFetch` indicates that all tasks on the queue will be prefetched. +The parameter `eagerFetch` (optional) indicates that all tasks on the queue will be prefetched. If not set or set to false it will only fetch a single task from the queue. By fetching all tasks from the queue as they arrive it can give different priorities to all tasks put on the same queue. Effectively it will give tasks on the same queue that have fewer tasks running on a worker a higher priority over tasks with more tasks running on a worker. @@ -128,6 +129,15 @@ If `durable` is true queues will be set to `classic` (or default if not set) bec Changing the `queueType` configuration parameter if the queues are already created won't work. If `queueType` needs to be changed, the queues need to be deleted first. +The parameter `maxWorkersAvailable` (optional) can be set to the maximum number of workers the system can scale to when using automatically scaling of workers. +This is useful because when worker resources are automatically scaled it can cause issues in combination with a low `maxCapacityUse` parameter. +For example if worker scaling is based on the percentage workers being used it can mean the system will never scale up because the `maxCapacityUse` for certain input queues will never exceed the scaling threshold percentage. +Especially when the system runs with a low number of workers the `maxCapacityUse` is easily reached before the scaling threshold is reached. +By setting `maxWorkersAvailable` the scheduler will determine if tasks already have reached the `maxCapacityUse` value based on the `maxWorkersAvailable` value and not the actual number of workers running. +It will than schedule tasks beyond the `maxCapacityUse` assuming this would trigger the system to scale up if needed, +and only for a short amount of time claim more resources for a specific queue than would be allowed by the `maxCapacityUse` percentage. +When more workers would be available than was configured as `maxWorkersAvailable` the scheduler will assign workers proportionally to the `maxCapacityUse` based on the actual number of workers available. + In `queues` there can be 1 or more queue configurations. Each queue configuration consists of 3 parameters: diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java index 551e92b..8638dc9 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java @@ -83,10 +83,11 @@ public TaskManager(final ExecutorService executorService, final ScheduledExecuto public void updateTaskScheduler(final TaskSchedule schedule) throws InterruptedException { // Set up scheduler with worker pool final String workerQueueName = schedule.getWorkerQueueName(); - final QueueConfig workerQueueConfig = new QueueConfig(workerQueueName, schedule.isDurable(), schedule.isEagerFetch(), schedule.getQueueType()); + final QueueConfig workerQueueConfig = new QueueConfig(workerQueueName, schedule.isDurable(), schedule.isEagerFetch(), + schedule.getMaxWorkersAvailable(), schedule.getQueueType()); if (!buckets.containsKey(workerQueueName)) { - LOG.info("Added scheduler for worker queue {}", workerQueueName); + LOG.info("Added scheduler for worker queue {}", schedule); buckets.put(workerQueueName, new TaskScheduleBucket(workerQueueConfig)); } final TaskScheduleBucket taskScheduleBucket = buckets.get(workerQueueName); @@ -211,7 +212,7 @@ private void updateQueues(final List newTaskQueues, final QueueConfig workerQ private void addOrUpdateTaskQueue(final T taskQueueConfiguration, final QueueConfig workerQueueConfig) { addTaskConsumerIfAbsent(new QueueConfig(taskQueueConfiguration.getQueueName(), workerQueueConfig.durable(), workerQueueConfig.eagerFetch(), - workerQueueConfig.queueType())); + workerQueueConfig.maxWorkersAvailable(), workerQueueConfig.queueType())); taskScheduler.updateQueue(taskQueueConfiguration); } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/QueueConfig.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/QueueConfig.java index 55b2ac5..ebe8623 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/QueueConfig.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/QueueConfig.java @@ -16,10 +16,10 @@ */ package nl.aerius.taskmanager.domain; -public record QueueConfig(String queueName, boolean durable, boolean eagerFetch, RabbitMQQueueType queueType) { +public record QueueConfig(String queueName, boolean durable, boolean eagerFetch, int maxWorkersAvailable, RabbitMQQueueType queueType) { @Override public final String toString() { - return String.format("Queue name:%s, durable:%b, eagerFetch:%b, queueType:%s", queueName, durable, eagerFetch, - queueType == null ? "default" : queueType.type()); + return String.format("Queue name:%s, durable:%b, eagerFetch:%b, maxWorkersAvailable:%d, queueType:%s", queueName, durable, eagerFetch, + maxWorkersAvailable, queueType == null ? "default" : queueType.type()); } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/TaskSchedule.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/TaskSchedule.java index 6ea4e78..198d379 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/TaskSchedule.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/TaskSchedule.java @@ -32,6 +32,11 @@ public class TaskSchedule { private Boolean eagerFetch; + /* + * Maximum number of workers available when the system is fully scaled. + */ + private int maxWorkersAvailable; + private RabbitMQQueueType queueType; private List queues = new ArrayList<>(); @@ -75,4 +80,19 @@ public RabbitMQQueueType getQueueType() { public void setQueueType(final String queueType) { this.queueType = RabbitMQQueueType.valueOf(queueType.toUpperCase(Locale.ROOT)); } + + public int getMaxWorkersAvailable() { + return maxWorkersAvailable; + } + + public void setMaxWorkersAvailable(final int maxWorkersAvailable) { + this.maxWorkersAvailable = maxWorkersAvailable; + } + + @Override + public String toString() { + return "workerQueueName=" + workerQueueName + ", durable=" + durable + ", eagerFetch=" + eagerFetch + ", maxWorkersAvailable=" + + maxWorkersAvailable + ", queueType=" + queueType; + } + } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java index 435cb7b..b655942 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java @@ -70,6 +70,11 @@ public void incrementOnWorker(final TaskRecord taskRecord) { tasksOnWorkersPerQueue.computeIfAbsent(key(taskRecord), k -> new AtomicInteger()).incrementAndGet(); } + /** + * Returns the number of tasks on the worker as known by the taskmanager. + * + * @return total number of tasks on the worker + */ public int onWorkerTotal() { return tasksOnWorkersPerQueue.entrySet().stream().mapToInt(e -> e.getValue().get()).sum(); } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java index 1611086..0991116 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java @@ -50,15 +50,18 @@ class PriorityTaskScheduler implements TaskScheduler, Compara private final Lock lock = new ReentrantLock(); private final Condition nextTaskCondition = lock.newCondition(); private final String workerQueueName; + private final int maxWorkersAvailable; + private int numberOfWorkers; /** * Constructs scheduler for given configuration. */ PriorityTaskScheduler(final PriorityQueueMap priorityQueueKeyMap, final Function, Queue> queueCreator, - final String workerQueueName) { + final String workerQueueName, final int maxWorkersAvailable) { this.priorityQueueMap = priorityQueueKeyMap; this.workerQueueName = workerQueueName; + this.maxWorkersAvailable = maxWorkersAvailable; queue = queueCreator.apply(this); } @@ -136,7 +139,7 @@ private void obtainTask() { * @return true if this task is next in line */ private boolean isTaskNext(final TaskRecord taskRecord) { - final boolean taskNext = (numberOfWorkers == 1) || ((getFreeWorkers() > 1) && hasCapacityRemaining(taskRecord)) + final boolean taskNext = (numberOfWorkers == 1) || ((getPotentialFreeWorkers() > 1) && hasCapacityRemaining(taskRecord)) || (priorityQueueMap.onWorker(taskRecord) == 0); if (!taskNext) { @@ -147,13 +150,22 @@ private boolean isTaskNext(final TaskRecord taskRecord) { return taskNext; } - private int getFreeWorkers() { - return numberOfWorkers - priorityQueueMap.onWorkerTotal(); + private int getPotentialFreeWorkers() { + return potentialNumberOfWorkers() - priorityQueueMap.onWorkerTotal(); } private boolean hasCapacityRemaining(final TaskRecord taskRecord) { return (numberOfWorkers > 0) - && ((((double) priorityQueueMap.onWorker(taskRecord)) / numberOfWorkers) < priorityQueueMap.get(taskRecord).getMaxCapacityUse()); + && ((((double) priorityQueueMap.onWorker(taskRecord)) / potentialNumberOfWorkers()) < priorityQueueMap.get(taskRecord).getMaxCapacityUse()); + } + + /** + * Returns the number of workers that are available or the number of configured maximum numbers that are potential available. Which ever number + * is the highest. This number can be used to check if tasks can still be scheduled. If the actual number of workers is lower than the + * max workers that can be scaled up, the maximum to be scaled up should be used. Otherwise the real number of workers should be used. + */ + private int potentialNumberOfWorkers() { + return Math.max(numberOfWorkers, maxWorkersAvailable); } @Override @@ -242,7 +254,7 @@ public final int compare(final Task task1, final Task task2) { private int compareWith1Worker(final TaskRecord taskRecord1, final TaskRecord taskRecord2) { int cmp = 0; - if ((numberOfWorkers == 1) || (getFreeWorkers() == 1)) { + if ((numberOfWorkers == 1) || (getPotentialFreeWorkers() == 1)) { cmp = compareTaskOnQueue(taskRecord1, taskRecord2); if (cmp == 0) { cmp = comparePriority(taskRecord1, taskRecord2); diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerFactory.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerFactory.java index 22b8b33..875c402 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerFactory.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerFactory.java @@ -40,7 +40,7 @@ public TaskScheduler createScheduler(final QueueConfig queueC final Function, Queue> queueCreator = c -> createQueue(queueConfig.eagerFetch(), c); final PriorityQueueMap priorityQueueMap = createPriorityQueueMap(queueConfig.eagerFetch()); - return new PriorityTaskScheduler(priorityQueueMap, queueCreator, queueConfig.queueName()); + return new PriorityTaskScheduler(priorityQueueMap, queueCreator, queueConfig.queueName(), queueConfig.maxWorkersAvailable()); } private static Queue createQueue(final boolean eagerFetch, final Comparator c) { diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerFileHandler.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerFileHandler.java index 878881b..8471b6c 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerFileHandler.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerFileHandler.java @@ -55,6 +55,7 @@ private PriorityTaskSchedule readFromFile(final File file) throws IOException { private PriorityTaskSchedule readFromEnvironment(final String workerQueueName) throws JsonProcessingException { final String environmentKey = ENV_PREFIX + workerQueueName.toUpperCase(Locale.ROOT); final String environmentValue = System.getenv(environmentKey); + if (environmentValue != null) { LOG.info("Using configuration for worker queue {} from environment", workerQueueName); return objectMapper.readValue(environmentValue, PriorityTaskSchedule.class); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java index 7555254..ae341ef 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java @@ -66,7 +66,7 @@ void setUp() throws IOException { workerPool = new WorkerPool(WORKER_QUEUE_NAME_TEST, workerProducer, scheduler); dispatcher = new TaskDispatcher(WORKER_QUEUE_NAME_TEST, scheduler, workerPool); factory = new MockAdaptorFactory(); - taskConsumer = new TaskConsumerImpl(executor, new QueueConfig("testqueue", false, false, null), dispatcher, factory); + taskConsumer = new TaskConsumerImpl(executor, new QueueConfig("testqueue", false, false, -1, null), dispatcher, factory); } @AfterEach diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java index 30554f2..523e5a0 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java @@ -64,7 +64,8 @@ void setUp() throws IOException { return null; }).when(workerUpdateHandler).onWorkerPoolSizeChange(anyInt()); workerPool = new WorkerPool(WORKER_QUEUE_NAME_TEST, new MockWorkerProducer(), workerUpdateHandler); - taskConsumer = new TaskConsumerImpl(mock(ExecutorService.class), new QueueConfig("testqueue", false, false, null), mock(ForwardTaskHandler.class), + taskConsumer = new TaskConsumerImpl(mock(ExecutorService.class), new QueueConfig("testqueue", false, false, -1, null), + mock(ForwardTaskHandler.class), new MockAdaptorFactory()) { @Override public void messageDelivered(final Message message) { diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandlerTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandlerTest.java index 0ba27a1..3662f53 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandlerTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandlerTest.java @@ -62,7 +62,7 @@ class RabbitMQMessageHandlerTest extends AbstractRabbitMQTest { @Timeout(value = 10, unit = TimeUnit.SECONDS) void testMessageReceivedHandler() throws IOException, InterruptedException { final byte[] receivedBody = "4321".getBytes(); - final TaskMessageHandler tmh = adapterFactory.createTaskMessageHandler(new QueueConfig(taskQueueName, false, false, null)); + final TaskMessageHandler tmh = adapterFactory.createTaskMessageHandler(new QueueConfig(taskQueueName, false, false, -1, null)); final Semaphore lock = new Semaphore(0); final DataDock data = new DataDock(); tmh.start(); @@ -108,7 +108,7 @@ void testReStart() throws IOException, InterruptedException { final AtomicInteger shutdownCallsCounter = new AtomicInteger(); final MessageReceivedHandler mockMessageReceivedHandler = mock(MessageReceivedHandler.class); - final TaskMessageHandler tmh = adapterFactory.createTaskMessageHandler(new QueueConfig(taskQueueName, false, false, null)); + final TaskMessageHandler tmh = adapterFactory.createTaskMessageHandler(new QueueConfig(taskQueueName, false, false, -1, null)); ((RabbitMQMessageHandler) tmh).setRetryTimeMilliseconds(1L); doAnswer(invoke -> null).when(mockChannel).addShutdownListener(shutdownListenerCaptor.capture()); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducerTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducerTest.java index 5a6f2e3..de12a4b 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducerTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducerTest.java @@ -48,7 +48,7 @@ class RabbitMQWorkerProducerTest extends AbstractRabbitMQTest { void testForwardMessage() throws IOException, InterruptedException { final byte[] sendBody = "4321".getBytes(); - final WorkerProducer wp = adapterFactory.createWorkerProducer(new QueueConfig(WORKER_QUEUE_NAME, false, false, null)); + final WorkerProducer wp = adapterFactory.createWorkerProducer(new QueueConfig(WORKER_QUEUE_NAME, false, false, -1, null)); wp.start(); final BasicProperties bp = new BasicProperties(); wp.dispatchMessage(new RabbitMQMessage(WORKER_QUEUE_NAME, null, 4321, bp, sendBody) { diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerTest.java index 227f6d0..5788270 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerTest.java @@ -38,6 +38,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,6 +73,7 @@ class PriorityTaskSchedulerTest { private Task task2a; private Task task2b; private Task task3; + private PriorityTaskSchedule configuration; private PriorityTaskScheduler scheduler; @BeforeEach @@ -78,7 +81,7 @@ void setUp() throws IOException { taskConsumer1 = createMockTaskConsumer(QUEUE1); taskConsumer2 = createMockTaskConsumer(QUEUE2); final TaskConsumer taskConsumer3 = createMockTaskConsumer(QUEUE3); - final PriorityTaskSchedule configuration = new PriorityTaskSchedule(); + configuration = new PriorityTaskSchedule(); configuration.setWorkerQueueName("TEST"); final PriorityTaskQueue tc1 = new PriorityTaskQueue(QUEUE1, 0, TEST_CAPACITY); final PriorityTaskQueue tc2 = new PriorityTaskQueue(QUEUE2, 1, TEST_CAPACITY); @@ -86,8 +89,7 @@ void setUp() throws IOException { configuration.getQueues().add(tc1); configuration.getQueues().add(tc2); configuration.getQueues().add(tc3); - scheduler = (PriorityTaskScheduler) FACTORY.createScheduler(new QueueConfig(QUEUE1, false, true, null)); - configuration.getQueues().forEach(scheduler::updateQueue); + scheduler = createScheduler(0); task1 = createTask(taskConsumer1, "1"); task2a = createTask(taskConsumer2, "2a"); task2b = createTask(taskConsumer2, "2b"); @@ -202,24 +204,36 @@ void testGetTask() throws InterruptedException, ExecutionException { * Test if getNextTask correctly gets new task in case of a big worker pool. * If capacity is reached for a task, it should not run unless a task of the same queue is returned as finished. * In the meanwhile, other tasks can start/finish (as long as there is a capacity for those tasks). + * + * This test also tests the maxWorkersAvailable setting. + * + * @param maxWorkersAvailable the configured maximum number of workers available, 0 means not set + * @param nrOfTasksToTake number of tasks to initial take to reach 70% of total number of available workers */ - @Test + @ParameterizedTest + @CsvSource({ + // No max worker available set. With 10 actual workers available it should take 70% of that, which is 7 + "0,7", + // Max workers available is 20. With 10 actual workers it should look at the potential 20 numbers, and take 70% of that, which is 14. + "20,14"}) @Timeout(value = 7, unit = TimeUnit.SECONDS) - void testGetTaskBigPool() throws InterruptedException, ExecutionException { + void testGetTaskBigPool(final int maxWorkersAvailable, final int nrOfTasksToTake) throws InterruptedException, ExecutionException { + scheduler = createScheduler(maxWorkersAvailable); scheduler.onWorkerPoolSizeChange(10); final List tasks = new ArrayList<>(); final List sendTasks = new ArrayList<>(); - for (int i = 0; i < 10; i++) { + // Put 20 tasks in the queue to be picked up. + for (int i = 0; i < 20; i++) { final Task task = createTask(taskConsumer2, "1"); scheduler.addTask(task); tasks.add(task); } - for (int i = 0; i < 7; i++) { - final Task sendTask = scheduler.getNextTask(); - sendTasks.add(sendTask); + // It should at least be able to get the given nr of nrOfTasksToTake + for (int i = 0; i < nrOfTasksToTake; i++) { + sendTasks.add(scheduler.getNextTask()); } scheduler.addTask(task1); - assertSame(task1, scheduler.getNextTask(), "Should still get task 1"); + assertSame(task1, scheduler.getNextTask(), "Should still get task 1 because other tasks have reach the limit"); final Task task1b = createTask(taskConsumer1, "1b"); scheduler.addTask(task1b); assertSame(task1b, scheduler.getNextTask(), "Should still get task 1b"); @@ -294,7 +308,7 @@ public Task call() throws Exception { } private TaskConsumer createMockTaskConsumer(final String taskQueueName) throws IOException { - return new TaskConsumerImpl(mock(ExecutorService.class), new QueueConfig(taskQueueName, false, false, null), mock(ForwardTaskHandler.class), + return new TaskConsumerImpl(mock(ExecutorService.class), new QueueConfig(taskQueueName, false, false, -1, null), mock(ForwardTaskHandler.class), new MockAdaptorFactory()) { @Override public void messageDelivered(final Message message) { @@ -303,6 +317,14 @@ public void messageDelivered(final Message message) { }; } + private PriorityTaskScheduler createScheduler(final int maxWorkerAvailable) { + final PriorityTaskScheduler scheduler = (PriorityTaskScheduler) FACTORY + .createScheduler(new QueueConfig(QUEUE1, false, true, maxWorkerAvailable, null)); + + configuration.getQueues().forEach(scheduler::updateQueue); + return scheduler; + } + private static Task createTask(final TaskConsumer tc, final String messageId) { return new MockTask(tc, messageId); }