Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public static Stream<Arguments> getCompactionSupervisorTestParams()
}

@Test
@Timeout(20)
@Timeout(120)
public void test_ingest10kRows_ofSelfClusterMetrics_andVerifyValues()
{
final int maxRowsPerSegment = 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,26 @@ public int computeTaskCountForScaleAction()
lastKnownMetrics = collectMetrics();

final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
int currentTaskCount = supervisor.getIoConfig().getTaskCount();

// Take the current task count but clamp it to the configured boundaries if it is outside the boundaries.
// There might be a configuration instance with a handwritten taskCount that is outside the boundaries.
final boolean isTaskCountOutOfBounds = currentTaskCount < config.getTaskCountMin()
|| currentTaskCount > config.getTaskCountMax();
if (isTaskCountOutOfBounds) {
currentTaskCount = Math.min(config.getTaskCountMax(),
Math.max(config.getTaskCountMin(), supervisor.getIoConfig().getTaskCount()));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use currentTaskCount

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! It is cleaner.

}

// Perform scale-up actions; scale-down actions only if configured.
final int taskCount;
if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) {

// If task count is out of bounds, scale to the configured boundary
// regardless of optimal task count, to get back to a safe state.
if (isScaleActionAllowed() && isTaskCountOutOfBounds) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to respect this isScaleActionAllowed if we're violating min/max task count bounds?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a tricky thing, but my take here is -- eventually we will scale (and by eventually I mean - within minTriggerScaleActionFrequencyMillis ms), and it might be harmful to scale immediately.

I don't have a strong opinion here; I am open to removing isScaleActionAllowed() from the condition.

taskCount = currentTaskCount;
log.info("Task count for supervisor[%s] was out of bounds [%d,%d], scaling.", supervisorId, config.getTaskCountMin(), config.getTaskCountMax());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we want to set: lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure!

} else if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) {
taskCount = optimalTaskCount;
lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale up).", supervisorId, currentTaskCount, taskCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ private Runnable computeAndCollectLag()
* @param lags the lag metrics of Stream (Kafka/Kinesis)
* @return Integer, target number of tasksCount. -1 means skip scale action.
*/
private int computeDesiredTaskCount(List<Long> lags)
int computeDesiredTaskCount(List<Long> lags)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's mark it with the proper @VisibleForTesting annotation.

{
// if the supervisor is not suspended, ensure required tasks are running
// if suspended, ensure tasks have been requested to gracefully stop
Expand All @@ -239,19 +239,30 @@ private int computeDesiredTaskCount(List<Long> lags)
withinProportion, spec.getId()
);

int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
int currentActiveTaskCount = supervisor.getIoConfig().getTaskCount();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not need to change this, no?
We can still reference the activeTaskGroupCount and start clamping things below?

int desiredActiveTaskCount;
int partitionCount = supervisor.getPartitionCount();
final int partitionCount = supervisor.getPartitionCount();
if (partitionCount <= 0) {
log.warn("Partition number for supervisor[%s] <= 0 ? how can it be?", spec.getId());
return -1;
}

final int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
final int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);

// Take the current task count but clamp it to the configured boundaries if it is outside the boundaries.
// There might be a configuration instance with a handwritten taskCount that is outside the boundaries.
// If that is happening, take the bound and return early.
final boolean isTaskCountOutOfBounds = currentActiveTaskCount < actualTaskCountMin
|| currentActiveTaskCount > actualTaskCountMax;
if (isTaskCountOutOfBounds) {
currentActiveTaskCount = Math.min(actualTaskCountMax, Math.max(actualTaskCountMin, currentActiveTaskCount));
return currentActiveTaskCount;
}

if (beyondProportion >= lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) {
// Do Scale out
int taskCount = currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep();

int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
final int taskCount = currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep();
if (currentActiveTaskCount == actualTaskCountMax) {
log.debug(
"CurrentActiveTaskCount reached task count Max limit, skipping scale out action for supervisor[%s].",
Expand All @@ -272,8 +283,7 @@ private int computeDesiredTaskCount(List<Long> lags)

if (withinProportion >= lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) {
// Do Scale in
int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep();
int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
final int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep();
if (currentActiveTaskCount == actualTaskCountMin) {
log.debug(
"CurrentActiveTaskCount reached task count Min limit[%d], skipping scale in action for supervisor[%s].",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedExc
Assert.assertEquals(1, taskCountBeforeScaleOut);
Thread.sleep(1000);
int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(2, taskCountAfterScaleOut);
Assert.assertEquals(3, taskCountAfterScaleOut);
Assert.assertTrue(
dynamicActionEmitter
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
Expand Down Expand Up @@ -470,14 +470,7 @@ public void testSeekableStreamSupervisorSpecWithScaleOutAlreadyAtMax() throws In
EasyMock.replay(taskMaster);

StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter();
TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10)
{
@Override
public int getActiveTaskGroupsCount()
{
return 2;
}
};
TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10);

LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
supervisor,
Expand All @@ -488,6 +481,7 @@ public int getActiveTaskGroupsCount()
spec,
dynamicActionEmitter
);
supervisor.getIoConfig().setTaskCount(2);
supervisor.start();
autoScaler.start();
supervisor.runInternal();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,56 @@ public void testScaleUpFromMinimumTasks()
);
}

@Test
public void testReturnsTaskCountMinWhenConfiguredTaskCountIsBelowMin()
{
CostBasedAutoScalerConfig boundedConfig = CostBasedAutoScalerConfig.builder()
.taskCountMax(100)
.taskCountMin(50)
.enableTaskAutoScaler(true)
.build();
CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));

final int configuredTaskCount = 1;
final int expectedTaskCount = 50;

doReturn(expectedTaskCount).when(autoScaler).computeOptimalTaskCount(any());
setupMocksForMetricsCollection(autoScaler, configuredTaskCount, 1000.0, 0.2);

final int result = autoScaler.computeTaskCountForScaleAction();

Assert.assertEquals(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Either mock computeOptimalTaskCount to return a value different from the clamped value (e.g. mock it to return -1 or taskCountMin - 1) and assert the boundary is returned, or use verify(autoScaler, never()).computeOptimalTaskCount(any()) to confirm the early-return path was taken.

"Should scale to taskCountMin when the configured task count is below the minimum boundary",
expectedTaskCount,
result
);
}

/**
 * Verifies that a configured task count above {@code taskCountMax} is clamped down to the maximum.
 *
 * The stubbed optimal task count is deliberately different from the expected boundary. If it were
 * equal to {@code taskCountMax}, the assertion would also hold when the scaler simply returned the
 * optimal count, and the test would not prove that the out-of-bounds clamp was applied.
 */
@Test
public void testReturnsTaskCountMaxWhenConfiguredTaskCountIsAboveMax()
{
  final int taskCountMin = 1;
  final int taskCountMax = 50;
  CostBasedAutoScalerConfig boundedConfig = CostBasedAutoScalerConfig.builder()
                                                                     .taskCountMax(taskCountMax)
                                                                     .taskCountMin(taskCountMin)
                                                                     .enableTaskAutoScaler(true)
                                                                     .build();
  CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));

  final int configuredTaskCount = 100;
  // Within [taskCountMin, taskCountMax] but not equal to the boundary the test expects.
  final int optimalTaskCount = 25;

  doReturn(optimalTaskCount).when(autoScaler).computeOptimalTaskCount(any());
  setupMocksForMetricsCollection(autoScaler, configuredTaskCount, 10.0, 0.8);

  final int result = autoScaler.computeTaskCountForScaleAction();

  Assert.assertEquals(
      "Should scale to taskCountMax when the configured task count is above the maximum boundary",
      taskCountMax,
      result
  );
}

@Test
public void testScaleUpToMaximumTasks()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ public void testScalingActionSkippedWhenMovingAverageRateUnavailable()
when(supervisor.getIoConfig()).thenReturn(ioConfig);
when(ioConfig.getStream()).thenReturn("test-stream");
when(ioConfig.getTaskDuration()).thenReturn(Duration.standardHours(1));
when(ioConfig.getTaskCount()).thenReturn(1);
when(supervisor.computeLagStats()).thenReturn(new LagStats(100, 100, 100));
// No task stats means the moving average rate is unavailable
when(supervisor.getStats()).thenReturn(Collections.emptyMap());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;

import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.mockito.Mockito.when;


/**
 * Unit tests for how {@link LagBasedAutoScaler#computeDesiredTaskCount(List)} treats the task count
 * reported by the supervisor's IO config relative to the configured min/max task counts.
 *
 * The supervisor, its IO config, the spec and the emitter are Mockito mocks. Each test varies only the
 * task count returned by the IO config and the lag value repeated in the samples.
 */
public class LagBasedAutoScalerTest
{
  private static final int PARTITION_COUNT = 100;

  // Bounds and step handed to the auto-scaler config; the assertions below are expressed in terms of them.
  private static final int TASK_COUNT_MAX = 100;
  private static final int TASK_COUNT_MIN = 50;
  private static final int SCALE_OUT_STEP = 4;

  // Number of identical lag samples passed to computeDesiredTaskCount.
  private static final int LAG_SAMPLE_COUNT = 11;

  private SupervisorSpec mockSpec;
  private SeekableStreamSupervisor mockSupervisor;
  private SeekableStreamSupervisorIOConfig mockIoConfig;
  private ServiceEmitter mockEmitter;
  private LagBasedAutoScalerConfig config;

  /**
   * Creates fresh mocks and a config with task count bounds [50, 100] before every test.
   */
  @Before
  public void setUp()
  {
    mockSpec = Mockito.mock(SupervisorSpec.class);
    when(mockSpec.getId()).thenReturn("test-supervisor");
    when(mockSpec.getDataSources()).thenReturn(List.of("test-datasource"));

    mockIoConfig = Mockito.mock(SeekableStreamSupervisorIOConfig.class);
    when(mockIoConfig.getStream()).thenReturn("test-stream");

    mockSupervisor = Mockito.mock(SeekableStreamSupervisor.class);
    when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig);

    mockEmitter = Mockito.mock(ServiceEmitter.class);

    // NOTE(review): labels marked "presumably" are inferred from the constructor's positional order and
    // from the lag values used by the tests -- confirm against LagBasedAutoScalerConfig.
    config = new LagBasedAutoScalerConfig(
        30_000L,        // lagCollectionIntervalMillis
        300_000L,       // lagCollectionRangeMillis
        300_000L,       // scaleActionStartDelayMillis
        60_000L,        // scaleActionPeriodMillis
        2_000_000L,     // presumably scaleOutThreshold
        300_000L,       // presumably scaleInThreshold
        0.7,            // presumably triggerScaleOutFractionThreshold
        0.9,            // presumably triggerScaleInFractionThreshold
        TASK_COUNT_MAX, // taskCountMax
        null,           // taskCountStart
        TASK_COUNT_MIN, // taskCountMin
        1,              // scaleInStep
        SCALE_OUT_STEP, // scaleOutStep
        true,           // enableTaskAutoScaler
        6_000_000L,     // minTriggerScaleActionFrequencyMillis
        null,           // lagAggregate
        null            // stopTaskCountRatio
    );
  }

  /**
   * With the configured task count sitting exactly on the minimum (50) and high lag samples, the
   * desired count is expected to be the configured count plus one scale-out step (54).
   */
  @Test
  public void testScaleOutDoesNotReturnCountBelowTaskCountMin()
  {
    final int desired = desiredTaskCountFor(TASK_COUNT_MIN, 2_000_001L);

    Assert.assertEquals(TASK_COUNT_MIN + SCALE_OUT_STEP, desired);
  }

  /**
   * A configured task count of 1, below the minimum, is expected to yield the minimum (50).
   */
  @Test
  public void testReturnsTaskCountMinWhenConfiguredTaskCountIsBelowMin()
  {
    final int desired = desiredTaskCountFor(1, 2_000_001L);

    Assert.assertEquals(TASK_COUNT_MIN, desired);
  }

  /**
   * A configured task count of 101, above the maximum, is expected to yield the maximum (100).
   */
  @Test
  public void testReturnsTaskCountMaxWhenConfiguredTaskCountIsAboveMax()
  {
    final int desired = desiredTaskCountFor(101, 299_999L);

    Assert.assertEquals(TASK_COUNT_MAX, desired);
  }

  /**
   * Stubs the IO config task count and the partition count, then asks a new auto-scaler for the
   * desired task count given lag samples that all equal {@code lagPerSample}.
   */
  private int desiredTaskCountFor(int configuredTaskCount, long lagPerSample)
  {
    when(mockIoConfig.getTaskCount()).thenReturn(configuredTaskCount);
    when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT);

    final LagBasedAutoScaler autoScaler = createAutoScaler();
    final List<Long> lagSamples = createLagSamples(lagPerSample);
    return autoScaler.computeDesiredTaskCount(lagSamples);
  }

  /**
   * Builds the auto-scaler under test from the mocks and config prepared in {@link #setUp()}.
   */
  private LagBasedAutoScaler createAutoScaler()
  {
    return new LagBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter);
  }

  /**
   * Returns a mutable list holding the same lag value {@code LAG_SAMPLE_COUNT} times.
   */
  private List<Long> createLagSamples(long lag)
  {
    final List<Long> repeated = Collections.nCopies(LAG_SAMPLE_COUNT, lag);
    return new ArrayList<>(repeated);
  }
}
Loading