From 746c7608009254ead4f756617834a5fc438f0dac Mon Sep 17 00:00:00 2001 From: Sasha Syrotenko Date: Tue, 7 Apr 2026 15:27:09 +0300 Subject: [PATCH 1/4] bug: use taskCount from ioConfig for scale action instead of activeTaskGroups --- .../autoscaler/LagBasedAutoScaler.java | 4 +- .../SeekableStreamSupervisorSpecTest.java | 12 +- .../autoscaler/LagBasedAutoScalerTest.java | 110 ++++++++++++++++++ 3 files changed, 115 insertions(+), 11 deletions(-) create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index bf35576fbd85..61eb055e0d54 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -212,7 +212,7 @@ private Runnable computeAndCollectLag() * @param lags the lag metrics of Stream (Kafka/Kinesis) * @return Integer, target number of tasksCount. -1 means skip scale action. */ - private int computeDesiredTaskCount(List lags) + int computeDesiredTaskCount(List lags) { // if the supervisor is not suspended, ensure required tasks are running // if suspended, ensure tasks have been requested to gracefully stop @@ -239,7 +239,7 @@ private int computeDesiredTaskCount(List lags) withinProportion, spec.getId() ); - int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount(); + int currentActiveTaskCount = supervisor.getIoConfig().getTaskCount(); int desiredActiveTaskCount; int partitionCount = supervisor.getPartitionCount(); if (partitionCount <= 0) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java index f0bdac00a8ce..7c20855b033d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java @@ -434,7 +434,7 @@ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedExc Assert.assertEquals(1, taskCountBeforeScaleOut); Thread.sleep(1000); int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); - Assert.assertEquals(2, taskCountAfterScaleOut); + Assert.assertEquals(3, taskCountAfterScaleOut); Assert.assertTrue( dynamicActionEmitter .getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC) @@ -470,14 +470,7 @@ public void testSeekableStreamSupervisorSpecWithScaleOutAlreadyAtMax() throws In EasyMock.replay(taskMaster); StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter(); - TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10) - { - @Override - public int getActiveTaskGroupsCount() - { - return 2; - } - }; + TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10); LagBasedAutoScaler autoScaler = new LagBasedAutoScaler( supervisor, @@ -488,6 +481,7 @@ public int getActiveTaskGroupsCount() spec, dynamicActionEmitter ); + supervisor.getIoConfig().setTaskCount(2); supervisor.start(); autoScaler.start(); supervisor.runInternal(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java new file mode 100644 index 000000000000..1352843943ca --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream.supervisor.autoscaler; + +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.mockito.Mockito.when; + + +public class LagBasedAutoScalerTest +{ + private static final int ACTIVE_TASK_GROUP_AMOUNT = 25; + private static final int PARTITION_COUNT = 100; + private static final int TASK_COUNT_MIN = 50; + private static final int TASK_COUNT_MAX = 100; + private static final int SCALE_OUT_STEP = 4; + private static final long SCALE_OUT_THRESHOLD = 2_000_000L; + private static final long SCALE_IN_THRESHOLD = 300_000L; + private static final double TRIGGER_SCALE_OUT_FRACTION = 0.7; + private static final double TRIGGER_SCALE_IN_FRACTION = 0.9; + + private SupervisorSpec mockSpec; + private SeekableStreamSupervisor mockSupervisor; + private SeekableStreamSupervisorIOConfig mockIoConfig; + private ServiceEmitter mockEmitter; + private LagBasedAutoScalerConfig config; + + @Before + public void setUp() + { + mockSpec = Mockito.mock(SupervisorSpec.class); + mockSupervisor = Mockito.mock(SeekableStreamSupervisor.class); + mockEmitter = Mockito.mock(ServiceEmitter.class); + mockIoConfig = Mockito.mock(SeekableStreamSupervisorIOConfig.class); + + when(mockSpec.getId()).thenReturn("test-supervisor"); + when(mockSpec.getDataSources()).thenReturn(List.of("test-datasource")); + when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig); + when(mockIoConfig.getStream()).thenReturn("test-stream"); + + config = new LagBasedAutoScalerConfig( + 30_000L, // lagCollectionIntervalMillis + 300_000L, // lagCollectionRangeMillis + 300_000L, // scaleActionStartDelayMillis + 60_000L, // scaleActionPeriodMillis + SCALE_OUT_THRESHOLD, + SCALE_IN_THRESHOLD, + TRIGGER_SCALE_OUT_FRACTION, + TRIGGER_SCALE_IN_FRACTION, + TASK_COUNT_MAX, + null, // taskCountStart + TASK_COUNT_MIN, + 1, // scaleInStep + SCALE_OUT_STEP, + true, // enableTaskAutoScaler + 6_000_000L, // minTriggerScaleActionFrequencyMillis + null, // lagAggregate + null // stopTaskCountRatio + ); + } + + /** + * Reproduces the bug where scale-out from a low activelyReadingTaskGroups count + * yields a desiredTaskCount below taskCountMin. + */ + @Test + public void testScaleOutDoesNotReturnCountBelowTaskCountMin() + { + when(mockIoConfig.getTaskCount()).thenReturn(TASK_COUNT_MIN); + when(mockSupervisor.getActiveTaskGroupsCount()).thenReturn(ACTIVE_TASK_GROUP_AMOUNT); + when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT); + + LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter); + List highLagSamples = Collections.nCopies(11, SCALE_OUT_THRESHOLD + 1); + + int result = autoScaler.computeDesiredTaskCount(new ArrayList<>(highLagSamples)); + + // Bug: old code used getActiveTaskGroupsCount()=25 as baseline → 25+4=29 < taskCountMin(50) + // Fix: uses ioConfig.getTaskCount()=50 as baseline → 50+4=54 >= taskCountMin(50) + Assert.assertEquals(TASK_COUNT_MIN + SCALE_OUT_STEP, result); + } +} From b385bf7b5f9c67b0bbe83733a59dcb77bbadd53d Mon Sep 17 00:00:00 2001 From: Sasha Syrotenko Date: Tue, 7 Apr 2026 17:39:19 +0300 Subject: [PATCH 2/4] Increase timeout for the test --- .../testing/embedded/indexing/KafkaClusterMetricsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java index 39742a9d5787..bc7385e6bf6e 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java @@ -153,7 +153,7 @@ public static Stream getCompactionSupervisorTestParams() } @Test - @Timeout(20) + @Timeout(120) public void test_ingest10kRows_ofSelfClusterMetrics_andVerifyValues() { final int maxRowsPerSegment = 1000; From b88fe7b83f91c25185ea76d2bdf07a7ca99dfbd2 Mon Sep 17 00:00:00 2001 From: Sasha Syrotenko Date: Wed, 8 Apr 2026 11:05:28 +0300 Subject: [PATCH 3/4] In case of boundaries breach, scale to bound limit --- .../autoscaler/CostBasedAutoScaler.java | 19 ++++- .../autoscaler/LagBasedAutoScaler.java | 22 ++++-- .../CostBasedAutoScalerMockTest.java | 50 ++++++++++++ .../autoscaler/CostBasedAutoScalerTest.java | 1 + .../autoscaler/LagBasedAutoScalerTest.java | 79 +++++++++++-------- 5 files changed, 129 insertions(+), 42 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java index f6527beb8ec4..6fbff6bba4f4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java @@ -174,11 +174,26 @@ public int computeTaskCountForScaleAction() lastKnownMetrics = collectMetrics(); final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics); - final int currentTaskCount = supervisor.getIoConfig().getTaskCount(); + int currentTaskCount = supervisor.getIoConfig().getTaskCount(); + + // Take the current task count but clamp it to the configured boundaries if it is outside the boundaries. + // There might be a configuration instance with a handwritten taskCount that is outside the boundaries. + final boolean isTaskCountOutOfBounds = currentTaskCount < config.getTaskCountMin() + || currentTaskCount > config.getTaskCountMax(); + if (isTaskCountOutOfBounds) { + currentTaskCount = Math.min(config.getTaskCountMax(), + Math.max(config.getTaskCountMin(), supervisor.getIoConfig().getTaskCount())); + } // Perform scale-up actions; scale-down actions only if configured. final int taskCount; - if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) { + + // If task count is out of bounds, scale to the configured boundary + // regardless of optimal task count, to get back to a safe state. + if (isScaleActionAllowed() && isTaskCountOutOfBounds) { + taskCount = currentTaskCount; + log.info("Task count for supervisor[%s] was out of bounds [%d,%d], scaling.", supervisorId, config.getTaskCountMin(), config.getTaskCountMax()); + } else if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) { taskCount = optimalTaskCount; lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis(); log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale up).", supervisorId, currentTaskCount, taskCount); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index 61eb055e0d54..88767c32d163 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -241,17 +241,28 @@ int computeDesiredTaskCount(List lags) int currentActiveTaskCount = supervisor.getIoConfig().getTaskCount(); int desiredActiveTaskCount; - int partitionCount = supervisor.getPartitionCount(); + final int partitionCount = supervisor.getPartitionCount(); if (partitionCount <= 0) { log.warn("Partition number for supervisor[%s] <= 0 ? how can it be?", spec.getId()); return -1; } + final int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount); + final int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount); + + // Take the current task count but clamp it to the configured boundaries if it is outside the boundaries. + // There might be a configuration instance with a handwritten taskCount that is outside the boundaries. + // If that is happening, take the bound and return early. + final boolean isTaskCountOutOfBounds = currentActiveTaskCount < actualTaskCountMin + || currentActiveTaskCount > actualTaskCountMax; + if (isTaskCountOutOfBounds) { + currentActiveTaskCount = Math.min(actualTaskCountMax, Math.max(actualTaskCountMin, currentActiveTaskCount)); + return currentActiveTaskCount; + } + if (beyondProportion >= lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) { // Do Scale out - int taskCount = currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep(); - - int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount); + final int taskCount = currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep(); if (currentActiveTaskCount == actualTaskCountMax) { log.debug( "CurrentActiveTaskCount reached task count Max limit, skipping scale out action for supervisor[%s].", @@ -272,8 +283,7 @@ int computeDesiredTaskCount(List lags) if (withinProportion >= lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) { // Do Scale in - int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep(); - int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount); + final int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep(); if (currentActiveTaskCount == actualTaskCountMin) { log.debug( "CurrentActiveTaskCount reached task count Min limit[%d], skipping scale in action for supervisor[%s].", diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java index 7b46c4c65589..2a7384a4f392 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java @@ -232,6 +232,56 @@ public void testScaleUpFromMinimumTasks() ); } + @Test + public void testReturnsTaskCountMinWhenConfiguredTaskCountIsBelowMin() + { + CostBasedAutoScalerConfig boundedConfig = CostBasedAutoScalerConfig.builder() + .taskCountMax(100) + .taskCountMin(50) + .enableTaskAutoScaler(true) + .build(); + CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter)); + + final int configuredTaskCount = 1; + final int expectedTaskCount = 50; + + doReturn(expectedTaskCount).when(autoScaler).computeOptimalTaskCount(any()); + setupMocksForMetricsCollection(autoScaler, configuredTaskCount, 1000.0, 0.2); + + final int result = autoScaler.computeTaskCountForScaleAction(); + + Assert.assertEquals( + "Should scale to taskCountMin when the configured task count is below the minimum boundary", + expectedTaskCount, + result + ); + } + + @Test + public void testReturnsTaskCountMaxWhenConfiguredTaskCountIsAboveMax() + { + CostBasedAutoScalerConfig boundedConfig = CostBasedAutoScalerConfig.builder() + .taskCountMax(50) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .build(); + CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter)); + + final int configuredTaskCount = 100; + final int expectedTaskCount = 50; + + doReturn(expectedTaskCount).when(autoScaler).computeOptimalTaskCount(any()); + setupMocksForMetricsCollection(autoScaler, configuredTaskCount, 10.0, 0.8); + + final int result = autoScaler.computeTaskCountForScaleAction(); + + Assert.assertEquals( + "Should scale to taskCountMax when the configured task count is above the maximum boundary", + expectedTaskCount, + result + ); + } + @Test public void testScaleUpToMaximumTasks() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java index 12f52279226e..4b5666a85ee0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java @@ -495,6 +495,7 @@ public void testScalingActionSkippedWhenMovingAverageRateUnavailable() when(supervisor.getIoConfig()).thenReturn(ioConfig); when(ioConfig.getStream()).thenReturn("test-stream"); when(ioConfig.getTaskDuration()).thenReturn(Duration.standardHours(1)); + when(ioConfig.getTaskCount()).thenReturn(1); when(supervisor.computeLagStats()).thenReturn(new LagStats(100, 100, 100)); // No task stats means the moving average rate is unavailable when(supervisor.getStats()).thenReturn(Collections.emptyMap()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java index 1352843943ca..7f5a0f5a9dfe 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java @@ -37,15 +37,7 @@ public class LagBasedAutoScalerTest { - private static final int ACTIVE_TASK_GROUP_AMOUNT = 25; private static final int PARTITION_COUNT = 100; - private static final int TASK_COUNT_MIN = 50; - private static final int TASK_COUNT_MAX = 100; - private static final int SCALE_OUT_STEP = 4; - private static final long SCALE_OUT_THRESHOLD = 2_000_000L; - private static final long SCALE_IN_THRESHOLD = 300_000L; - private static final double TRIGGER_SCALE_OUT_FRACTION = 0.7; - private static final double TRIGGER_SCALE_IN_FRACTION = 0.9; private SupervisorSpec mockSpec; private SeekableStreamSupervisor mockSupervisor; @@ -67,44 +59,63 @@ public void setUp() when(mockIoConfig.getStream()).thenReturn("test-stream"); config = new LagBasedAutoScalerConfig( - 30_000L, // lagCollectionIntervalMillis - 300_000L, // lagCollectionRangeMillis - 300_000L, // scaleActionStartDelayMillis - 60_000L, // scaleActionPeriodMillis - SCALE_OUT_THRESHOLD, - SCALE_IN_THRESHOLD, - TRIGGER_SCALE_OUT_FRACTION, - TRIGGER_SCALE_IN_FRACTION, - TASK_COUNT_MAX, - null, // taskCountStart - TASK_COUNT_MIN, - 1, // scaleInStep - SCALE_OUT_STEP, - true, // enableTaskAutoScaler + 30_000L, // lagCollectionIntervalMillis + 300_000L, // lagCollectionRangeMillis + 300_000L, // scaleActionStartDelayMillis + 60_000L, // scaleActionPeriodMillis + 2_000_000L, + 300_000L, + 0.7, + 0.9, + 100, + null, // taskCountStart + 50, + 1, // scaleInStep + 4, + true, // enableTaskAutoScaler 6_000_000L, // minTriggerScaleActionFrequencyMillis - null, // lagAggregate - null // stopTaskCountRatio + null, // lagAggregate + null // stopTaskCountRatio ); } /** - * Reproduces the bug where scale-out from a low activelyReadingTaskGroups count - * yields a desiredTaskCount below taskCountMin. + * Verifies that scale-out uses the configured task count as the baseline. */ @Test public void testScaleOutDoesNotReturnCountBelowTaskCountMin() { - when(mockIoConfig.getTaskCount()).thenReturn(TASK_COUNT_MIN); - when(mockSupervisor.getActiveTaskGroupsCount()).thenReturn(ACTIVE_TASK_GROUP_AMOUNT); + when(mockIoConfig.getTaskCount()).thenReturn(50); when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT); - LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter); - List highLagSamples = Collections.nCopies(11, SCALE_OUT_THRESHOLD + 1); + Assert.assertEquals(54, createAutoScaler().computeDesiredTaskCount(createLagSamples(2_000_001L))); + } + + @Test + public void testReturnsTaskCountMinWhenConfiguredTaskCountIsBelowMin() + { + when(mockIoConfig.getTaskCount()).thenReturn(1); + when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT); + + Assert.assertEquals(50, createAutoScaler().computeDesiredTaskCount(createLagSamples(2_000_001L))); + } + + @Test + public void testReturnsTaskCountMaxWhenConfiguredTaskCountIsAboveMax() + { + when(mockIoConfig.getTaskCount()).thenReturn(101); + when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT); + + Assert.assertEquals(100, createAutoScaler().computeDesiredTaskCount(createLagSamples(299_999L))); + } - int result = autoScaler.computeDesiredTaskCount(new ArrayList<>(highLagSamples)); + private LagBasedAutoScaler createAutoScaler() + { + return new LagBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter); + } - // Bug: old code used getActiveTaskGroupsCount()=25 as baseline → 25+4=29 < taskCountMin(50) - // Fix: uses ioConfig.getTaskCount()=50 as baseline → 50+4=54 >= taskCountMin(50) - Assert.assertEquals(TASK_COUNT_MIN + SCALE_OUT_STEP, result); + private List createLagSamples(long lag) + { + return new ArrayList<>(Collections.nCopies(11, lag)); } } From b65ca376672327423487f298844d8684db758d05 Mon Sep 17 00:00:00 2001 From: Sasha Syrotenko Date: Fri, 10 Apr 2026 08:42:10 +0300 Subject: [PATCH 4/4] Address review comments --- .../SeekableStreamSupervisorIOConfig.java | 5 ++++- .../autoscaler/CostBasedAutoScaler.java | 4 ++-- .../autoscaler/LagBasedAutoScaler.java | 2 ++ .../autoscaler/CostBasedAutoScalerMockTest.java | 16 ++++++++++------ 4 files changed, 18 insertions(+), 9 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java index 633bd9b70dc9..2e994e32cd6d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java @@ -88,13 +88,16 @@ public SeekableStreamSupervisorIOConfig( this.lagAggregator = lagAggregator; // Could be null this.autoScalerConfig = autoScalerConfig; - this.autoScalerEnabled = autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler(); + boolean isAutoScalerAvailable = autoScalerConfig != null; + this.autoScalerEnabled = isAutoScalerAvailable && autoScalerConfig.getEnableTaskAutoScaler(); if (autoScalerEnabled) { // Priority: taskCountStart > taskCount > taskCountMin this.taskCount = Configs.valueOrDefault( autoScalerConfig.getTaskCountStart(), Configs.valueOrDefault(taskCount, autoScalerConfig.getTaskCountMin()) ); + } else if (isAutoScalerAvailable) { + this.taskCount = taskCount != null ? taskCount : autoScalerConfig.getTaskCountMin(); } else { this.taskCount = taskCount != null ? taskCount : 1; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java index 6fbff6bba4f4..be697c460777 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java @@ -181,8 +181,7 @@ public int computeTaskCountForScaleAction() final boolean isTaskCountOutOfBounds = currentTaskCount < config.getTaskCountMin() || currentTaskCount > config.getTaskCountMax(); if (isTaskCountOutOfBounds) { - currentTaskCount = Math.min(config.getTaskCountMax(), - Math.max(config.getTaskCountMin(), supervisor.getIoConfig().getTaskCount())); + currentTaskCount = Math.min(config.getTaskCountMax(), Math.max(config.getTaskCountMin(), currentTaskCount)); } // Perform scale-up actions; scale-down actions only if configured. @@ -192,6 +191,7 @@ public int computeTaskCountForScaleAction() // regardless of optimal task count, to get back to a safe state. if (isScaleActionAllowed() && isTaskCountOutOfBounds) { taskCount = currentTaskCount; + lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis(); log.info("Task count for supervisor[%s] was out of bounds [%d,%d], scaling.", supervisorId, config.getTaskCountMin(), config.getTaskCountMax()); } else if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) { taskCount = optimalTaskCount; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index 88767c32d163..b96a1de7bd41 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.seekablestream.supervisor.autoscaler; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.collections4.queue.CircularFifoQueue; import org.apache.druid.error.DruidException; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; @@ -212,6 +213,7 @@ private Runnable computeAndCollectLag() * @param lags the lag metrics of Stream (Kafka/Kinesis) * @return Integer, target number of tasksCount. -1 means skip scale action. */ + @VisibleForTesting int computeDesiredTaskCount(List lags) { // if the supervisor is not suspended, ensure required tasks are running diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java index 2a7384a4f392..39bacebd94d7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java @@ -243,16 +243,18 @@ public void testReturnsTaskCountMinWhenConfiguredTaskCountIsBelowMin() CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter)); final int configuredTaskCount = 1; - final int expectedTaskCount = 50; + final int taskCountMin = 50; - doReturn(expectedTaskCount).when(autoScaler).computeOptimalTaskCount(any()); + // Mock computeOptimalTaskCount to return a value different from the boundary, + // so the assertion proves the boundary clamping path was taken. + doReturn(taskCountMin - 1).when(autoScaler).computeOptimalTaskCount(any()); setupMocksForMetricsCollection(autoScaler, configuredTaskCount, 1000.0, 0.2); final int result = autoScaler.computeTaskCountForScaleAction(); Assert.assertEquals( "Should scale to taskCountMin when the configured task count is below the minimum boundary", - expectedTaskCount, + taskCountMin, result ); } @@ -268,16 +270,18 @@ public void testReturnsTaskCountMaxWhenConfiguredTaskCountIsAboveMax() CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter)); final int configuredTaskCount = 100; - final int expectedTaskCount = 50; + final int taskCountMax = 50; - doReturn(expectedTaskCount).when(autoScaler).computeOptimalTaskCount(any()); + // Mock computeOptimalTaskCount to return a value different from the boundary, + // so the assertion proves the boundary clamping path was taken. + doReturn(taskCountMax + 1).when(autoScaler).computeOptimalTaskCount(any()); setupMocksForMetricsCollection(autoScaler, configuredTaskCount, 10.0, 0.8); final int result = autoScaler.computeTaskCountForScaleAction(); Assert.assertEquals( "Should scale to taskCountMax when the configured task count is above the maximum boundary", - expectedTaskCount, + taskCountMax, result ); }