diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java index 39742a9d5787..bc7385e6bf6e 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java @@ -153,7 +153,7 @@ public static Stream getCompactionSupervisorTestParams() } @Test - @Timeout(20) + @Timeout(120) public void test_ingest10kRows_ofSelfClusterMetrics_andVerifyValues() { final int maxRowsPerSegment = 1000; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java index 633bd9b70dc9..2e994e32cd6d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java @@ -88,13 +88,16 @@ public SeekableStreamSupervisorIOConfig( this.lagAggregator = lagAggregator; // Could be null this.autoScalerConfig = autoScalerConfig; - this.autoScalerEnabled = autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler(); + boolean isAutoScalerAvailable = autoScalerConfig != null; + this.autoScalerEnabled = isAutoScalerAvailable && autoScalerConfig.getEnableTaskAutoScaler(); if (autoScalerEnabled) { // Priority: taskCountStart > taskCount > taskCountMin this.taskCount = Configs.valueOrDefault( autoScalerConfig.getTaskCountStart(), Configs.valueOrDefault(taskCount, autoScalerConfig.getTaskCountMin()) ); + } else if (isAutoScalerAvailable) { + this.taskCount = taskCount != null ? taskCount : autoScalerConfig.getTaskCountMin(); } else { this.taskCount = taskCount != null ? taskCount : 1; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java index f6527beb8ec4..be697c460777 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java @@ -174,11 +174,26 @@ public int computeTaskCountForScaleAction() lastKnownMetrics = collectMetrics(); final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics); - final int currentTaskCount = supervisor.getIoConfig().getTaskCount(); + int currentTaskCount = supervisor.getIoConfig().getTaskCount(); + + // Take the current task count but clamp it to the configured boundaries if it is outside the boundaries. + // There might be a configuration instance with a handwritten taskCount that is outside the boundaries. + final boolean isTaskCountOutOfBounds = currentTaskCount < config.getTaskCountMin() + || currentTaskCount > config.getTaskCountMax(); + if (isTaskCountOutOfBounds) { + currentTaskCount = Math.min(config.getTaskCountMax(), Math.max(config.getTaskCountMin(), currentTaskCount)); + } // Perform scale-up actions; scale-down actions only if configured. final int taskCount; - if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) { + + // If task count is out of bounds, scale to the configured boundary + // regardless of optimal task count, to get back to a safe state. + if (isScaleActionAllowed() && isTaskCountOutOfBounds) { + taskCount = currentTaskCount; + lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis(); + log.info("Task count for supervisor[%s] was out of bounds [%d,%d], scaling.", supervisorId, config.getTaskCountMin(), config.getTaskCountMax()); + } else if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) { taskCount = optimalTaskCount; lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis(); log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale up).", supervisorId, currentTaskCount, taskCount); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index bf35576fbd85..b96a1de7bd41 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.seekablestream.supervisor.autoscaler; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.collections4.queue.CircularFifoQueue; import org.apache.druid.error.DruidException; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; @@ -212,7 +213,8 @@ private Runnable computeAndCollectLag() * @param lags the lag metrics of Stream (Kafka/Kinesis) * @return Integer, target number of tasksCount. -1 means skip scale action. */ - private int computeDesiredTaskCount(List lags) + @VisibleForTesting + int computeDesiredTaskCount(List lags) { // if the supervisor is not suspended, ensure required tasks are running // if suspended, ensure tasks have been requested to gracefully stop @@ -239,19 +241,30 @@ private int computeDesiredTaskCount(List lags) withinProportion, spec.getId() ); - int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount(); + int currentActiveTaskCount = supervisor.getIoConfig().getTaskCount(); int desiredActiveTaskCount; - int partitionCount = supervisor.getPartitionCount(); + final int partitionCount = supervisor.getPartitionCount(); if (partitionCount <= 0) { log.warn("Partition number for supervisor[%s] <= 0 ? how can it be?", spec.getId()); return -1; } + final int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount); + final int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount); + + // Take the current task count but clamp it to the configured boundaries if it is outside the boundaries. + // There might be a configuration instance with a handwritten taskCount that is outside the boundaries. + // If that is happening, take the bound and return early. + final boolean isTaskCountOutOfBounds = currentActiveTaskCount < actualTaskCountMin + || currentActiveTaskCount > actualTaskCountMax; + if (isTaskCountOutOfBounds) { + currentActiveTaskCount = Math.min(actualTaskCountMax, Math.max(actualTaskCountMin, currentActiveTaskCount)); + return currentActiveTaskCount; + } + if (beyondProportion >= lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) { // Do Scale out - int taskCount = currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep(); - - int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount); + final int taskCount = currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep(); if (currentActiveTaskCount == actualTaskCountMax) { log.debug( "CurrentActiveTaskCount reached task count Max limit, skipping scale out action for supervisor[%s].", @@ -272,8 +285,7 @@ private int computeDesiredTaskCount(List lags) if (withinProportion >= lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) { // Do Scale in - int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep(); - int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount); + final int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep(); if (currentActiveTaskCount == actualTaskCountMin) { log.debug( "CurrentActiveTaskCount reached task count Min limit[%d], skipping scale in action for supervisor[%s].", diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java index f0bdac00a8ce..7c20855b033d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java @@ -434,7 +434,7 @@ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedExc Assert.assertEquals(1, taskCountBeforeScaleOut); Thread.sleep(1000); int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); - Assert.assertEquals(2, taskCountAfterScaleOut); + Assert.assertEquals(3, taskCountAfterScaleOut); Assert.assertTrue( dynamicActionEmitter .getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC) @@ -470,14 +470,7 @@ public void testSeekableStreamSupervisorSpecWithScaleOutAlreadyAtMax() throws In EasyMock.replay(taskMaster); StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter(); - TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10) - { - @Override - public int getActiveTaskGroupsCount() - { - return 2; - } - }; + TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10); LagBasedAutoScaler autoScaler = new LagBasedAutoScaler( supervisor, @@ -488,6 +481,7 @@ public int getActiveTaskGroupsCount() spec, dynamicActionEmitter ); + supervisor.getIoConfig().setTaskCount(2); supervisor.start(); autoScaler.start(); supervisor.runInternal(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java index 7b46c4c65589..39bacebd94d7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java @@ -232,6 +232,60 @@ public void testScaleUpFromMinimumTasks() ); } + @Test + public void testReturnsTaskCountMinWhenConfiguredTaskCountIsBelowMin() + { + CostBasedAutoScalerConfig boundedConfig = CostBasedAutoScalerConfig.builder() + .taskCountMax(100) + .taskCountMin(50) + .enableTaskAutoScaler(true) + .build(); + CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter)); + + final int configuredTaskCount = 1; + final int taskCountMin = 50; + + // Mock computeOptimalTaskCount to return a value different from the boundary, + // so the assertion proves the boundary clamping path was taken. + doReturn(taskCountMin - 1).when(autoScaler).computeOptimalTaskCount(any()); + setupMocksForMetricsCollection(autoScaler, configuredTaskCount, 1000.0, 0.2); + + final int result = autoScaler.computeTaskCountForScaleAction(); + + Assert.assertEquals( + "Should scale to taskCountMin when the configured task count is below the minimum boundary", + taskCountMin, + result + ); + } + + @Test + public void testReturnsTaskCountMaxWhenConfiguredTaskCountIsAboveMax() + { + CostBasedAutoScalerConfig boundedConfig = CostBasedAutoScalerConfig.builder() + .taskCountMax(50) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .build(); + CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter)); + + final int configuredTaskCount = 100; + final int taskCountMax = 50; + + // Mock computeOptimalTaskCount to return a value different from the boundary, + // so the assertion proves the boundary clamping path was taken. + doReturn(taskCountMax + 1).when(autoScaler).computeOptimalTaskCount(any()); + setupMocksForMetricsCollection(autoScaler, configuredTaskCount, 10.0, 0.8); + + final int result = autoScaler.computeTaskCountForScaleAction(); + + Assert.assertEquals( + "Should scale to taskCountMax when the configured task count is above the maximum boundary", + taskCountMax, + result + ); + } + @Test public void testScaleUpToMaximumTasks() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java index 12f52279226e..4b5666a85ee0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java @@ -495,6 +495,7 @@ public void testScalingActionSkippedWhenMovingAverageRateUnavailable() when(supervisor.getIoConfig()).thenReturn(ioConfig); when(ioConfig.getStream()).thenReturn("test-stream"); when(ioConfig.getTaskDuration()).thenReturn(Duration.standardHours(1)); + when(ioConfig.getTaskCount()).thenReturn(1); when(supervisor.computeLagStats()).thenReturn(new LagStats(100, 100, 100)); // No task stats means the moving average rate is unavailable when(supervisor.getStats()).thenReturn(Collections.emptyMap()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java new file mode 100644 index 000000000000..7f5a0f5a9dfe --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream.supervisor.autoscaler; + +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.mockito.Mockito.when; + + +public class LagBasedAutoScalerTest +{ + private static final int PARTITION_COUNT = 100; + + private SupervisorSpec mockSpec; + private SeekableStreamSupervisor mockSupervisor; + private SeekableStreamSupervisorIOConfig mockIoConfig; + private ServiceEmitter mockEmitter; + private LagBasedAutoScalerConfig config; + + @Before + public void setUp() + { + mockSpec = Mockito.mock(SupervisorSpec.class); + mockSupervisor = Mockito.mock(SeekableStreamSupervisor.class); + mockEmitter = Mockito.mock(ServiceEmitter.class); + mockIoConfig = Mockito.mock(SeekableStreamSupervisorIOConfig.class); + + when(mockSpec.getId()).thenReturn("test-supervisor"); + when(mockSpec.getDataSources()).thenReturn(List.of("test-datasource")); + when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig); + when(mockIoConfig.getStream()).thenReturn("test-stream"); + + config = new LagBasedAutoScalerConfig( + 30_000L, // lagCollectionIntervalMillis + 300_000L, // lagCollectionRangeMillis + 300_000L, // scaleActionStartDelayMillis + 60_000L, // scaleActionPeriodMillis + 2_000_000L, + 300_000L, + 0.7, + 0.9, + 100, + null, // taskCountStart + 50, + 1, // scaleInStep + 4, + true, // enableTaskAutoScaler + 6_000_000L, // minTriggerScaleActionFrequencyMillis + null, // lagAggregate + null // stopTaskCountRatio + ); + } + + /** + * Verifies that scale-out uses the configured task count as the baseline. + */ + @Test + public void testScaleOutDoesNotReturnCountBelowTaskCountMin() + { + when(mockIoConfig.getTaskCount()).thenReturn(50); + when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT); + + Assert.assertEquals(54, createAutoScaler().computeDesiredTaskCount(createLagSamples(2_000_001L))); + } + + @Test + public void testReturnsTaskCountMinWhenConfiguredTaskCountIsBelowMin() + { + when(mockIoConfig.getTaskCount()).thenReturn(1); + when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT); + + Assert.assertEquals(50, createAutoScaler().computeDesiredTaskCount(createLagSamples(2_000_001L))); + } + + @Test + public void testReturnsTaskCountMaxWhenConfiguredTaskCountIsAboveMax() + { + when(mockIoConfig.getTaskCount()).thenReturn(101); + when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT); + + Assert.assertEquals(100, createAutoScaler().computeDesiredTaskCount(createLagSamples(299_999L))); + } + + private LagBasedAutoScaler createAutoScaler() + { + return new LagBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter); + } + + private List createLagSamples(long lag) + { + return new ArrayList<>(Collections.nCopies(11, lag)); + } +}