Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -236,4 +236,22 @@ public TaskLockType taskLockType()
{
throw DruidException.defensive("TaskLockType is not used with class[%s]", getClass().getName());
}

/**
 * Maximum number of workers for non-leaf (internal) stages. Read from the query context
 * key {@link DartControllerContext#CTX_MAX_NON_LEAF_WORKER_COUNT}, falling back to
 * {@link DartControllerContext#DEFAULT_MAX_NON_LEAF_WORKER_COUNT} when the key is absent.
 */
@Override
public int maxNonLeafWorkerCount()
{
return context.getInt(
DartControllerContext.CTX_MAX_NON_LEAF_WORKER_COUNT,
DartControllerContext.DEFAULT_MAX_NON_LEAF_WORKER_COUNT
);
}

/**
 * Target number of partitions per worker for shuffle stages. Delegates to
 * {@link MultiStageQueryContext#getTargetPartitionsPerWorkerWithDefault}, which reads a
 * context override if present; otherwise uses DEFAULT_TARGET_PARTITIONS_PER_WORKER
 * (presumably a constant declared elsewhere in this class — TODO confirm).
 */
@Override
public int targetPartitionsPerWorker()
{
return MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault(
context,
DEFAULT_TARGET_PARTITIONS_PER_WORKER
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ public boolean isWorkerActive(String workerId)
return workerIdToNumber.containsKey(workerId);
}

/**
 * Maximum number of workers usable by this manager: the number of known worker IDs.
 * NOTE(review): this implies the worker set is fixed up front rather than grown on
 * demand — confirm against how {@code workerIds} is populated.
 */
@Override
public int getMaxWorkerCount()
{
return workerIds.size();
}

@Override
public Map<Integer, List<WorkerStats>> getWorkerStats()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
*/
class PrePlannedDartQueryMaker implements QueryMaker, QueryMaker.FromDruidLogical
{
private PlannerContext plannerContext;
private DartQueryMaker dartQueryMaker;
private final PlannerContext plannerContext;
private final DartQueryMaker dartQueryMaker;

public PrePlannedDartQueryMaker(PlannerContext plannerContext, DartQueryMaker queryMaker)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.druid.msq.input.InputSpecSlicer;
import org.apache.druid.msq.input.table.SegmentsInputSlice;
import org.apache.druid.msq.input.table.TableInputSpec;
import org.apache.druid.msq.kernel.ShuffleSpec;
import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
import org.apache.druid.server.DruidNode;

Expand Down Expand Up @@ -123,4 +124,15 @@ default File taskTempDir()
* Client for communicating with workers.
*/
WorkerClient newWorkerClient();

/**
* Maximum number of workers for non-leaf stages.
*/
int maxNonLeafWorkerCount();

/**
* Target number of partitions per worker for shuffle stages. Used at runtime to adjust
* shuffle specs that have {@link ShuffleSpec#isAdjustable()} set to true.
*/
int targetPartitionsPerWorker();
}
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ private QueryDefinition initializeQueryDefAndState()

final QueryContext queryContext = querySpec.getContext();

final QueryDefinition queryDef;
QueryDefinition queryDef;
if (legacyQuery != null) {
QueryKitBasedMSQPlanner qkPlanner = new QueryKitBasedMSQPlanner(
querySpec,
Expand Down Expand Up @@ -762,16 +762,22 @@ private QueryDefinition initializeQueryDefAndState()
}
}

QueryValidator.validateQueryDef(queryDef);
queryDefRef.set(queryDef);

workerManager = context.newWorkerManager(
context.queryId(),
querySpec,
queryKernelConfig,
getWorkerFailureListener()
);

queryDef = queryDef.withRuntimeBounds(
workerManager.getMaxWorkerCount(),
context.maxNonLeafWorkerCount(),
context.targetPartitionsPerWorker()
);

QueryValidator.validateQueryDef(queryDef);
queryDefRef.set(queryDef);

if (queryKernelConfig.isFaultTolerant() && !(workerManager instanceof RetryCapableWorkerManager)) {
// Not expected to happen, since all WorkerManager impls are currently retry-capable. Defensive check
// for future-proofing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ public interface WorkerManager
*/
Map<Integer, List<WorkerStats>> getWorkerStats();

/**
* Maximum number of workers that can be used by this manager. Used at runtime to cap
* {@link org.apache.druid.msq.kernel.StageDefinition#getMaxWorkerCount()}.
*/
int getMaxWorkerCount();

/**
* Stop all workers.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ public WorkerManager newWorkerManager(
makeTaskContext(querySpec, queryKernelConfig, taskContext),
// 10 minutes +- 2 minutes jitter
TimeUnit.SECONDS.toMillis(600 + ThreadLocalRandom.current().nextInt(-4, 5) * 30L),
task.getQuerySpec().getTuningConfig().getMaxNumWorkers(),
new MSQWorkerTaskLauncherConfig()
);
}
Expand All @@ -241,6 +242,23 @@ public File taskTempDir()
return toolbox.getIndexingTmpDir();
}

/**
 * Maximum number of workers for non-leaf stages. For task-based execution this is the
 * {@code maxNumWorkers} from the query spec's tuning config.
 */
@Override
public int maxNonLeafWorkerCount()
{
return task.getQuerySpec().getTuningConfig().getMaxNumWorkers();
}

/**
 * Target number of partitions per worker for shuffle stages: a query-context override if
 * present, otherwise one partition per processing thread on this controller.
 */
@Override
public int targetPartitionsPerWorker()
{
// Assume tasks are symmetric: workers have the same number of processors available as a controller.
// Create one partition per processor per worker, for maximum parallelism.
return MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault(
taskQuerySpecContext,
memoryIntrospector.numProcessingThreads()
);
}

/**
* Helper method for {@link #queryKernelConfig(MSQSpec)}. Also used in tests.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ private enum State
private final String controllerTaskId;
private final String dataSource;
private final OverlordClient overlordClient;
private final int maxWorkerCount;
private final ExecutorService exec;
private final long maxTaskStartDelayMillis;
private final MSQWorkerTaskLauncherConfig config;
Expand Down Expand Up @@ -165,6 +166,7 @@ public MSQWorkerTaskLauncher(
final WorkerFailureListener workerFailureListener,
final Map<String, Object> taskContextOverrides,
final long maxTaskStartDelayMillis,
final int maxWorkerCount,
final MSQWorkerTaskLauncherConfig config
)
{
Expand All @@ -175,6 +177,7 @@ public MSQWorkerTaskLauncher(
workerFailureListener,
taskContextOverrides,
maxTaskStartDelayMillis,
maxWorkerCount,
config,
TimeUnit.SECONDS.toMillis(60)
);
Expand All @@ -188,13 +191,15 @@ protected MSQWorkerTaskLauncher(
final WorkerFailureListener workerFailureListener,
final Map<String, Object> taskContextOverrides,
final long maxTaskStartDelayMillis,
final int maxWorkerCount,
final MSQWorkerTaskLauncherConfig config,
final long taskIdsLockTimeout
)
{
this.controllerTaskId = controllerTaskId;
this.dataSource = dataSource;
this.overlordClient = overlordClient;
this.maxWorkerCount = maxWorkerCount;
this.workerFailureListener = workerFailureListener;
this.taskContextOverrides = taskContextOverrides;
this.exec = Execs.singleThreaded(
Expand Down Expand Up @@ -553,6 +558,12 @@ private void runNewTasks()
}
}

/**
 * Maximum number of worker tasks this launcher may run, as supplied to the constructor.
 */
@Override
public int getMaxWorkerCount()
{
return maxWorkerCount;
}

/**
* Returns a pair which contains the number of currently running worker tasks and the number of worker tasks that are
* not yet fully started as left and right respectively.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public String toString()
@Override
public ShuffleSpecFactory getShuffleSpecFactory(int targetSize)
{
return ShuffleSpecFactories.getGlobalSortWithTargetSize(targetSize);
return ShuffleSpecFactories.globalSortWithTargetSize(targetSize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public String toString()
@Override
public ShuffleSpecFactory getShuffleSpecFactory(int targetSize)
{
return ShuffleSpecFactories.getGlobalSortWithTargetSize(targetSize);
return ShuffleSpecFactories.globalSortWithTargetSize(targetSize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public String toString()
@Override
public ShuffleSpecFactory getShuffleSpecFactory(int targetSize)
{
return ShuffleSpecFactories.getGlobalSortWithTargetSize(targetSize);
return ShuffleSpecFactories.globalSortWithTargetSize(targetSize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,31 @@ public class GlobalSortMaxCountShuffleSpec implements GlobalSortShuffleSpec
private final int maxPartitions;
private final boolean aggregate;
private final long limitHint;
private final boolean adjustable;

@JsonCreator
public GlobalSortMaxCountShuffleSpec(
@JsonProperty("clusterBy") final ClusterBy clusterBy,
@JsonProperty("partitions") final int maxPartitions,
@JsonProperty("aggregate") final boolean aggregate,
@JsonProperty("limitHint") final Long limitHint
@JsonProperty("limitHint") final Long limitHint,
@JsonProperty("adjustable") final boolean adjustable
)
{
this.clusterBy = Preconditions.checkNotNull(clusterBy, "clusterBy");
this.maxPartitions = maxPartitions;
this.aggregate = aggregate;
this.limitHint = limitHint == null ? UNLIMITED : limitHint;
this.adjustable = adjustable;

if (maxPartitions < 1) {
throw new IAE("Partition count must be at least 1");
}

if (adjustable && maxPartitions != 1) {
throw new IAE("Partition count must be 1 when adjustable is true, but was [%d]", maxPartitions);
}

if (!clusterBy.sortable()) {
throw new IAE("ClusterBy key must be sortable");
}
Expand All @@ -72,6 +79,16 @@ public GlobalSortMaxCountShuffleSpec(
}
}

/**
 * Convenience constructor for the common, non-adjustable case. Equivalent to the
 * {@code JsonCreator} constructor with {@code adjustable = false}.
 *
 * @param clusterBy     cluster-by key; must be sortable (validated by the delegate)
 * @param maxPartitions maximum partition count; must be at least 1
 * @param aggregate     aggregation flag passed through unchanged — see class docs for semantics
 * @param limitHint     optional limit hint; null means unlimited
 */
public GlobalSortMaxCountShuffleSpec(
final ClusterBy clusterBy,
final int maxPartitions,
final boolean aggregate,
final Long limitHint
)
{
this(clusterBy, maxPartitions, aggregate, limitHint, false);
}

@Override
public ShuffleKind kind()
{
Expand Down Expand Up @@ -144,6 +161,26 @@ public long limitHint()
return limitHint;
}

/**
 * Whether the partition count of this spec may be adjusted at runtime
 * (see {@code ShuffleSpec#isAdjustable}). Serialized as "adjustable"; omitted from JSON
 * when false, via {@code JsonInclude.Include.NON_DEFAULT}, to keep wire compatibility
 * with older specs that lack the field.
 */
@Override
@JsonProperty("adjustable")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public boolean isAdjustable()
{
return adjustable;
}

/**
 * Returns a copy of this spec with the partition count fixed to {@code partitionCount}.
 * The copy is always non-adjustable: once a concrete count has been chosen, there is
 * nothing left to adjust.
 */
@Override
public ShuffleSpec withPartitionCount(final int partitionCount)
{
final Long limitHintOrNull;
if (limitHint == UNLIMITED) {
limitHintOrNull = null;
} else {
limitHintOrNull = limitHint;
}
return new GlobalSortMaxCountShuffleSpec(clusterBy, partitionCount, aggregate, limitHintOrNull, false);
}

@Override
public boolean equals(Object o)
{
Expand All @@ -156,14 +193,15 @@ public boolean equals(Object o)
GlobalSortMaxCountShuffleSpec that = (GlobalSortMaxCountShuffleSpec) o;
return maxPartitions == that.maxPartitions
&& aggregate == that.aggregate
&& adjustable == that.adjustable
&& Objects.equals(clusterBy, that.clusterBy)
&& Objects.equals(limitHint, that.limitHint);
}

@Override
public int hashCode()
{
return Objects.hash(clusterBy, maxPartitions, aggregate, limitHint);
return Objects.hash(clusterBy, maxPartitions, aggregate, limitHint, adjustable);
}

@Override
Expand All @@ -174,6 +212,7 @@ public String toString()
", maxPartitions=" + maxPartitions +
", aggregate=" + aggregate +
", limitHint=" + limitHint +
", adjustable=" + adjustable +
'}';
}
}
Loading
Loading