diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java index b7e1b3c94bc0..a4a4ff8945a4 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java @@ -236,4 +236,22 @@ public TaskLockType taskLockType() { throw DruidException.defensive("TaskLockType is not used with class[%s]", getClass().getName()); } + + @Override + public int maxNonLeafWorkerCount() + { + return context.getInt( + DartControllerContext.CTX_MAX_NON_LEAF_WORKER_COUNT, + DartControllerContext.DEFAULT_MAX_NON_LEAF_WORKER_COUNT + ); + } + + @Override + public int targetPartitionsPerWorker() + { + return MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault( + context, + DEFAULT_TARGET_PARTITIONS_PER_WORKER + ); + } } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java index 05a26c393441..f6d68d57f424 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java @@ -155,6 +155,12 @@ public boolean isWorkerActive(String workerId) return workerIdToNumber.containsKey(workerId); } + @Override + public int getMaxWorkerCount() + { + return workerIds.size(); + } + @Override public Map> getWorkerStats() { diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java index 5740142dd635..7ec616d212ae 100644 --- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java @@ -51,8 +51,8 @@ */ class PrePlannedDartQueryMaker implements QueryMaker, QueryMaker.FromDruidLogical { - private PlannerContext plannerContext; - private DartQueryMaker dartQueryMaker; + private final PlannerContext plannerContext; + private final DartQueryMaker dartQueryMaker; public PrePlannedDartQueryMaker(PlannerContext plannerContext, DartQueryMaker queryMaker) { diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java index 008aae558180..2d14c10788b2 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java @@ -29,6 +29,7 @@ import org.apache.druid.msq.input.InputSpecSlicer; import org.apache.druid.msq.input.table.SegmentsInputSlice; import org.apache.druid.msq.input.table.TableInputSpec; +import org.apache.druid.msq.kernel.ShuffleSpec; import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig; import org.apache.druid.server.DruidNode; @@ -123,4 +124,15 @@ default File taskTempDir() * Client for communicating with workers. */ WorkerClient newWorkerClient(); + + /** + * Maximum number of workers for non-leaf stages. + */ + int maxNonLeafWorkerCount(); + + /** + * Target number of partitions per worker for shuffle stages. Used at runtime to adjust + * shuffle specs that have {@link ShuffleSpec#isAdjustable()} set to true. 
+ */ + int targetPartitionsPerWorker(); } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index e02d29095476..f616db7a032a 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -728,7 +728,7 @@ private QueryDefinition initializeQueryDefAndState() final QueryContext queryContext = querySpec.getContext(); - final QueryDefinition queryDef; + QueryDefinition queryDef; if (legacyQuery != null) { QueryKitBasedMSQPlanner qkPlanner = new QueryKitBasedMSQPlanner( querySpec, @@ -762,9 +762,6 @@ private QueryDefinition initializeQueryDefAndState() } } - QueryValidator.validateQueryDef(queryDef); - queryDefRef.set(queryDef); - workerManager = context.newWorkerManager( context.queryId(), querySpec, @@ -772,6 +769,15 @@ private QueryDefinition initializeQueryDefAndState() getWorkerFailureListener() ); + queryDef = queryDef.withRuntimeBounds( + workerManager.getMaxWorkerCount(), + context.maxNonLeafWorkerCount(), + context.targetPartitionsPerWorker() + ); + + QueryValidator.validateQueryDef(queryDef); + queryDefRef.set(queryDef); + if (queryKernelConfig.isFaultTolerant() && !(workerManager instanceof RetryCapableWorkerManager)) { // Not expected to happen, since all WorkerManager impls are currently retry-capable. Defensive check // for future-proofing. 
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManager.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManager.java index ff76867dd302..01bcb81a756b 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManager.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManager.java @@ -89,6 +89,12 @@ public interface WorkerManager */ Map> getWorkerStats(); + /** + * Maximum number of workers that can be used by this manager. Used at runtime to cap + * {@link org.apache.druid.msq.kernel.StageDefinition#getMaxWorkerCount()}. + */ + int getMaxWorkerCount(); + /** * Stop all workers. * diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index 3edd414b5155..cd91376e5c9f 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -231,6 +231,7 @@ public WorkerManager newWorkerManager( makeTaskContext(querySpec, queryKernelConfig, taskContext), // 10 minutes +- 2 minutes jitter TimeUnit.SECONDS.toMillis(600 + ThreadLocalRandom.current().nextInt(-4, 5) * 30L), + task.getQuerySpec().getTuningConfig().getMaxNumWorkers(), new MSQWorkerTaskLauncherConfig() ); } @@ -241,6 +242,23 @@ public File taskTempDir() return toolbox.getIndexingTmpDir(); } + @Override + public int maxNonLeafWorkerCount() + { + return task.getQuerySpec().getTuningConfig().getMaxNumWorkers(); + } + + @Override + public int targetPartitionsPerWorker() + { + // Assume tasks are symmetric: workers have the same number of processors available as a controller. + // Create one partition per processor per worker, for maximum parallelism. 
+ return MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault( + taskQuerySpecContext, + memoryIntrospector.numProcessingThreads() + ); + } + /** * Helper method for {@link #queryKernelConfig(MSQSpec)}. Also used in tests. */ diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java index d773f225fac2..7ef30346d7ab 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java @@ -100,6 +100,7 @@ private enum State private final String controllerTaskId; private final String dataSource; private final OverlordClient overlordClient; + private final int maxWorkerCount; private final ExecutorService exec; private final long maxTaskStartDelayMillis; private final MSQWorkerTaskLauncherConfig config; @@ -165,6 +166,7 @@ public MSQWorkerTaskLauncher( final WorkerFailureListener workerFailureListener, final Map taskContextOverrides, final long maxTaskStartDelayMillis, + final int maxWorkerCount, final MSQWorkerTaskLauncherConfig config ) { @@ -175,6 +177,7 @@ public MSQWorkerTaskLauncher( workerFailureListener, taskContextOverrides, maxTaskStartDelayMillis, + maxWorkerCount, config, TimeUnit.SECONDS.toMillis(60) ); @@ -188,6 +191,7 @@ protected MSQWorkerTaskLauncher( final WorkerFailureListener workerFailureListener, final Map taskContextOverrides, final long maxTaskStartDelayMillis, + final int maxWorkerCount, final MSQWorkerTaskLauncherConfig config, final long taskIdsLockTimeout ) @@ -195,6 +199,7 @@ protected MSQWorkerTaskLauncher( this.controllerTaskId = controllerTaskId; this.dataSource = dataSource; this.overlordClient = overlordClient; + this.maxWorkerCount = maxWorkerCount; this.workerFailureListener = workerFailureListener; this.taskContextOverrides = taskContextOverrides; this.exec = 
Execs.singleThreaded( @@ -553,6 +558,12 @@ private void runNewTasks() } } + @Override + public int getMaxWorkerCount() + { + return maxWorkerCount; + } + /** * Returns a pair which contains the number of currently running worker tasks and the number of worker tasks that are * not yet fully started as left and right respectively. diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java index 02132ac2312d..63f163f82ca0 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java @@ -209,7 +209,7 @@ public String toString() @Override public ShuffleSpecFactory getShuffleSpecFactory(int targetSize) { - return ShuffleSpecFactories.getGlobalSortWithTargetSize(targetSize); + return ShuffleSpecFactories.globalSortWithTargetSize(targetSize); } @Override diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DurableStorageMSQDestination.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DurableStorageMSQDestination.java index 88fe5f58e5a1..eb124eda213d 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DurableStorageMSQDestination.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DurableStorageMSQDestination.java @@ -56,7 +56,7 @@ public String toString() @Override public ShuffleSpecFactory getShuffleSpecFactory(int targetSize) { - return ShuffleSpecFactories.getGlobalSortWithTargetSize(targetSize); + return ShuffleSpecFactories.globalSortWithTargetSize(targetSize); } @Override diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/ExportMSQDestination.java 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/ExportMSQDestination.java index d6a78def63a8..d03c2238fdb1 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/ExportMSQDestination.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/ExportMSQDestination.java @@ -108,7 +108,7 @@ public String toString() @Override public ShuffleSpecFactory getShuffleSpecFactory(int targetSize) { - return ShuffleSpecFactories.getGlobalSortWithTargetSize(targetSize); + return ShuffleSpecFactories.globalSortWithTargetSize(targetSize); } @Override diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java index 35576474ff9d..bcb2718c4022 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/GlobalSortMaxCountShuffleSpec.java @@ -44,24 +44,31 @@ public class GlobalSortMaxCountShuffleSpec implements GlobalSortShuffleSpec private final int maxPartitions; private final boolean aggregate; private final long limitHint; + private final boolean adjustable; @JsonCreator public GlobalSortMaxCountShuffleSpec( @JsonProperty("clusterBy") final ClusterBy clusterBy, @JsonProperty("partitions") final int maxPartitions, @JsonProperty("aggregate") final boolean aggregate, - @JsonProperty("limitHint") final Long limitHint + @JsonProperty("limitHint") final Long limitHint, + @JsonProperty("adjustable") final boolean adjustable ) { this.clusterBy = Preconditions.checkNotNull(clusterBy, "clusterBy"); this.maxPartitions = maxPartitions; this.aggregate = aggregate; this.limitHint = limitHint == null ? 
UNLIMITED : limitHint; + this.adjustable = adjustable; if (maxPartitions < 1) { throw new IAE("Partition count must be at least 1"); } + if (adjustable && maxPartitions != 1) { + throw new IAE("Partition count must be 1 when adjustable is true, but was [%d]", maxPartitions); + } + if (!clusterBy.sortable()) { throw new IAE("ClusterBy key must be sortable"); } @@ -72,6 +79,16 @@ public GlobalSortMaxCountShuffleSpec( } } + public GlobalSortMaxCountShuffleSpec( + final ClusterBy clusterBy, + final int maxPartitions, + final boolean aggregate, + final Long limitHint + ) + { + this(clusterBy, maxPartitions, aggregate, limitHint, false); + } + @Override public ShuffleKind kind() { @@ -144,6 +161,26 @@ public long limitHint() return limitHint; } + @Override + @JsonProperty("adjustable") + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + public boolean isAdjustable() + { + return adjustable; + } + + @Override + public ShuffleSpec withPartitionCount(final int partitionCount) + { + return new GlobalSortMaxCountShuffleSpec( + clusterBy, + partitionCount, + aggregate, + limitHint == UNLIMITED ? 
null : limitHint, + false + ); + } + @Override public boolean equals(Object o) { @@ -156,6 +193,7 @@ public boolean equals(Object o) GlobalSortMaxCountShuffleSpec that = (GlobalSortMaxCountShuffleSpec) o; return maxPartitions == that.maxPartitions && aggregate == that.aggregate + && adjustable == that.adjustable && Objects.equals(clusterBy, that.clusterBy) && Objects.equals(limitHint, that.limitHint); } @@ -163,7 +201,7 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash(clusterBy, maxPartitions, aggregate, limitHint); + return Objects.hash(clusterBy, maxPartitions, aggregate, limitHint, adjustable); } @Override @@ -174,6 +212,7 @@ public String toString() ", maxPartitions=" + maxPartitions + ", aggregate=" + aggregate + ", limitHint=" + limitHint + + ", adjustable=" + adjustable + '}'; } } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/HashShuffleSpec.java b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/HashShuffleSpec.java index 69e66fffe263..23e9e98a217e 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/HashShuffleSpec.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/HashShuffleSpec.java @@ -20,25 +20,35 @@ package org.apache.druid.msq.kernel; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.frame.key.ClusterBy; import org.apache.druid.java.util.common.IAE; +import java.util.Objects; + public class HashShuffleSpec implements ShuffleSpec { public static final String TYPE = "hash"; private final ClusterBy clusterBy; private final int numPartitions; + private final boolean adjustable; @JsonCreator public HashShuffleSpec( @JsonProperty("clusterBy") final ClusterBy clusterBy, - @JsonProperty("partitions") final int numPartitions + @JsonProperty("partitions") final int numPartitions, + @JsonProperty("adjustable") 
final boolean adjustable ) { this.clusterBy = clusterBy; this.numPartitions = numPartitions; + this.adjustable = adjustable; + + if (adjustable && numPartitions != 1) { + throw new IAE("Partition count must be 1 when adjustable is true, but was [%d]", numPartitions); + } if (clusterBy.getBucketByCount() > 0) { // Only GlobalSortTargetSizeShuffleSpec supports bucket-by. @@ -65,4 +75,50 @@ public int partitionCount() { return numPartitions; } + + @Override + @JsonProperty("adjustable") + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + public boolean isAdjustable() + { + return adjustable; + } + + @Override + public ShuffleSpec withPartitionCount(final int partitionCount) + { + return new HashShuffleSpec( + clusterBy, + partitionCount, + false + ); + } + + @Override + public boolean equals(Object o) + { + if (o == null || getClass() != o.getClass()) { + return false; + } + HashShuffleSpec that = (HashShuffleSpec) o; + return numPartitions == that.numPartitions + && adjustable == that.adjustable + && Objects.equals(clusterBy, that.clusterBy); + } + + @Override + public int hashCode() + { + return Objects.hash(clusterBy, numPartitions, adjustable); + } + + @Override + public String toString() + { + return "HashShuffleSpec{" + + "clusterBy=" + clusterBy + + ", numPartitions=" + numPartitions + + ", adjustable=" + adjustable + + '}'; + } } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/QueryDefinition.java b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/QueryDefinition.java index e171118be2c5..f315d029f7d4 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/QueryDefinition.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/QueryDefinition.java @@ -38,6 +38,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -207,4 +208,37 @@ public QueryDefinition 
withOverriddenContext(Map contextOverride { return new QueryDefinition(stageDefinitions, finalStage, context.override(contextOverride)); } + + /** + * Returns a new {@link QueryDefinition} with runtime bounds applied: + *
+ * <ul>
+ * <li>All stages {@link StageDefinition#getMaxWorkerCount()} are capped to {@code maxWorkerCount}.</li>
+ * <li>All nonleaf stages {@link StageDefinition#getMaxWorkerCount()} are further capped to
+ * {@code maxNonLeafWorkerCount}.</li>
+ * <li>All stage shuffle specs, if {@link ShuffleSpec#isAdjustable()}, have their partition count set to
+ * the capped max worker count times {@code targetPartitionsPerWorker}.</li>
+ * </ul>
+ */ + public QueryDefinition withRuntimeBounds( + final int maxWorkerCount, + final int maxNonLeafWorkerCount, + final int targetPartitionsPerWorker + ) + { + boolean anyChanged = false; + final Map newStageDefinitions = new LinkedHashMap<>(); + for (Map.Entry entry : stageDefinitions.entrySet()) { + final StageDefinition stageDef = entry.getValue(); + final StageDefinition adjustedStageDef = + stageDef.withRuntimeBounds(maxWorkerCount, maxNonLeafWorkerCount, targetPartitionsPerWorker); + newStageDefinitions.put(entry.getKey(), adjustedStageDef); + if (!Objects.equals(adjustedStageDef, stageDef)) { + anyChanged = true; + } + } + if (!anyChanged) { + return this; + } + return new QueryDefinition(newStageDefinitions, finalStage, context); + } } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/ShuffleSpec.java b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/ShuffleSpec.java index 97f3e6db5473..2d02acb5ecc4 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/ShuffleSpec.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/ShuffleSpec.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.error.NotYetImplemented; import org.apache.druid.frame.key.ClusterBy; /** @@ -83,4 +84,22 @@ default long limitHint() { return UNLIMITED; } + + /** + * Whether this shuffle spec's partition count should be adjusted at runtime by multiplying by + * {@code targetPartitionsPerWorker} from the controller context. + */ + default boolean isAdjustable() + { + return false; + } + + /** + * Returns a copy of this shuffle spec with the partition count set to {@code partitionCount} + * and {@code adjustable} set to false. Only meaningful when {@link #isAdjustable()} is true. 
+ */ + default ShuffleSpec withPartitionCount(int partitionCount) + { + throw NotYetImplemented.ex(null, "withPartitionCount not implemented for [%s]", getClass().getSimpleName()); + } } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java index f836a357d3cb..7fc3db19f1ab 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java @@ -175,6 +175,49 @@ public static StageDefinitionBuilder builder(final StageDefinition stageDef) .shuffleCheckHasMultipleValues(stageDef.getShuffleCheckHasMultipleValues()); } + /** + * Returns a new {@link StageDefinition} with runtime bounds applied. See {@link QueryDefinition#withRuntimeBounds} + * for details on the logic. + */ + public StageDefinition withRuntimeBounds( + final int maxWorkerCount, + final int maxNonLeafWorkerCount, + final int targetPartitionsPerWorker + ) + { + final int adjustedMaxWorkerCount; + final ShuffleSpec adjustedShuffleSpec; + + if (InputSpecs.hasLeafInputs(inputSpecs, getBroadcastInputNumbers())) { + // Leaf stage. + adjustedMaxWorkerCount = Math.min(this.maxWorkerCount, maxWorkerCount); + } else { + // Nonleaf stage. 
+ adjustedMaxWorkerCount = Math.min(this.maxWorkerCount, Math.min(maxWorkerCount, maxNonLeafWorkerCount)); + } + + if (shuffleSpec != null && shuffleSpec.isAdjustable()) { + adjustedShuffleSpec = shuffleSpec.withPartitionCount(adjustedMaxWorkerCount * targetPartitionsPerWorker); + } else { + adjustedShuffleSpec = shuffleSpec; + } + + if (adjustedMaxWorkerCount == this.maxWorkerCount && Objects.equals(adjustedShuffleSpec, shuffleSpec)) { + return this; + } + + return new StageDefinition( + id, + inputSpecs, + broadcastInputNumbers, + processor, + signature, + adjustedShuffleSpec, + adjustedMaxWorkerCount, + shuffleCheckHasMultipleValues + ); + } + /** * Returns a unique stage identifier. */ diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java index ef825ed69d3f..92531eb1c002 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java @@ -77,48 +77,60 @@ public static WorkerInputs create( return new WorkerInputs(assignmentsMap); } - // Assign input slices to workers. + // Assign non-broadcast input slices to workers first, so we know the actual worker count. + // Then assign broadcast inputs to all assigned workers. for (int inputNumber = 0; inputNumber < numInputs; inputNumber++) { + if (stageDef.getBroadcastInputNumbers().contains(inputNumber)) { + continue; + } + final InputSpec inputSpec = stageDef.getInputSpecs().get(inputNumber); final SegmentPruner pruner = stageDef.getSegmentPruner(inputNumber); - if (stageDef.getBroadcastInputNumbers().contains(inputNumber)) { - // Broadcast case: send everything everywhere. - final List broadcastSlices = slicer.sliceStatic(inputSpec, pruner, 1); - final InputSlice broadcastSlice = broadcastSlices.isEmpty() - ? 
NilInputSlice.INSTANCE - : Iterables.getOnlyElement(broadcastSlices); - - for (int workerNumber = 0; workerNumber < stageDef.getMaxWorkerCount(); workerNumber++) { - assignmentsMap.computeIfAbsent( - workerNumber, - ignored -> Arrays.asList(new InputSlice[numInputs]) - ).set(inputNumber, broadcastSlice); - } - } else { - // Non-broadcast case: split slices across workers. - List slices = assignmentStrategy.assign( - stageDef, - inputSpec, - stageWorkerCountMap, - slicer, - pruner, - maxInputFilesPerWorker, - maxInputBytesPerWorker - ); - - if (slices.isEmpty()) { - // Need at least one slice, so we can have at least one worker. It's OK if it has nothing to read. - slices = Collections.singletonList(NilInputSlice.INSTANCE); - } + List slices = assignmentStrategy.assign( + stageDef, + inputSpec, + stageWorkerCountMap, + slicer, + pruner, + maxInputFilesPerWorker, + maxInputBytesPerWorker + ); + + if (slices.isEmpty()) { + // Need at least one slice, so we can have at least one worker. It's OK if it has nothing to read. + slices = Collections.singletonList(NilInputSlice.INSTANCE); + } - // Flip the slices, so it's worker number -> slices for that worker. - for (int workerNumber = 0; workerNumber < slices.size(); workerNumber++) { - assignmentsMap.computeIfAbsent( - workerNumber, - ignored -> Arrays.asList(new InputSlice[numInputs]) - ).set(inputNumber, slices.get(workerNumber)); - } + // Flip the slices, so it's worker number -> slices for that worker. + for (int workerNumber = 0; workerNumber < slices.size(); workerNumber++) { + assignmentsMap.computeIfAbsent( + workerNumber, + ignored -> Arrays.asList(new InputSlice[numInputs]) + ).set(inputNumber, slices.get(workerNumber)); + } + } + + if (assignmentsMap.isEmpty()) { + // All inputs are broadcast. Use a single worker. + assignmentsMap.put(0, Arrays.asList(new InputSlice[numInputs])); + } + + // Assign broadcast inputs to all workers determined by non-broadcast assignment above. 
+ for (int inputNumber = 0; inputNumber < numInputs; inputNumber++) { + if (!stageDef.getBroadcastInputNumbers().contains(inputNumber)) { + continue; + } + + final InputSpec inputSpec = stageDef.getInputSpecs().get(inputNumber); + final SegmentPruner pruner = stageDef.getSegmentPruner(inputNumber); + final List broadcastSlices = slicer.sliceStatic(inputSpec, pruner, 1); + final InputSlice broadcastSlice = broadcastSlices.isEmpty() + ? NilInputSlice.INSTANCE + : Iterables.getOnlyElement(broadcastSlices); + + for (int workerNumber : assignmentsMap.keySet()) { + assignmentsMap.get(workerNumber).set(inputNumber, broadcastSlice); } } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java index 138bc1f29950..c46fa410cf6d 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java @@ -122,12 +122,14 @@ private LogicalStage makeSequenceStage(LogicalStage inputStage, DruidNodeStack s DruidSort sort = (DruidSort) stack.getNode(); List orderBySpecs = DruidQuery.buildOrderByColumnSpecs(inputStage.getLogicalRowSignature(), sort); List keyColumns = Lists.transform(orderBySpecs, KeyColumn::fromOrderByColumnSpec); - SortStage sortStage = new SortStage(inputStage, keyColumns); if (sort.hasLimitOrOffset()) { - return new OffsetLimitStage(sortStage, sort.getOffsetLimit()); + return new OffsetLimitStage( + new SortStage(inputStage, keyColumns, sort.getOffsetLimit()), + sort.getOffsetLimit() + ); } else { - return sortStage; + return new SortStage(inputStage, keyColumns, null); } } if (stack.getNode() instanceof DruidUnnest) { diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java index 4885a7d3e7de..3b3222f88909 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java @@ -20,6 +20,7 @@ package org.apache.druid.msq.logical; import org.apache.druid.error.DruidException; +import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.exec.StageProcessor; import org.apache.druid.msq.input.InputSpec; import org.apache.druid.msq.kernel.MixShuffleSpec; @@ -100,6 +101,7 @@ private StageDefinitionBuilder buildFrameProcessorStage(AbstractFrameProcessorSt sdb.signature(frameProcessorStage.getLogicalRowSignature()); sdb.processor(stageProcessor); sdb.shuffleSpec(MixShuffleSpec.instance()); + sdb.maxWorkerCount(maxWorkerCountFor(frameProcessorStage)); return sdb; } @@ -110,6 +112,7 @@ private StageDefinitionBuilder buildShuffleStage(AbstractShuffleStage stage) sdb.signature(stage.getRowSignature()); sdb.processor(makeScanStageProcessor(VirtualColumns.EMPTY, stage.getRowSignature(), null)); sdb.shuffleSpec(stage.buildShuffleSpec()); + sdb.maxWorkerCount(maxWorkerCountFor(stage)); return sdb; } @@ -163,4 +166,12 @@ private String getIdForBuilder() { return ScanQueryStageProcessor.makeSegmentMapFnProcessor(signature, dataSource); } + + /** + * Returns the {@code maxWorkerCount} to use for a given {@link LogicalStage}. + */ + private static int maxWorkerCountFor(LogicalStage stage) + { + return stage.isSingleWorker() ? 
1 : Limits.MAX_WORKERS; + } } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/GroupByStages.java b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/GroupByStages.java index e955c6a4052e..809a33cfc2b6 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/GroupByStages.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/GroupByStages.java @@ -91,7 +91,7 @@ public static LogicalStage buildStages(ProjectStage projectStage, Grouping group { GroupByQuery gby = makeGbyQuery(projectStage, grouping); PreShuffleStage aggStage = new PreShuffleStage(projectStage, gby.withPostAggregatorSpecs(Collections.emptyList())); - SortStage sortStage = new SortStage(aggStage, getKeyColumns(grouping.getDimensions())); + SortStage sortStage = new SortStage(aggStage, getKeyColumns(grouping.getDimensions()), null); PostShuffleStage finalAggStage = new PostShuffleStage(sortStage, gby, grouping.getOutputRowSignature()); return finalAggStage; } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/JoinStage.java b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/JoinStage.java index 96cf3d3510b6..77cf2f2f740c 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/JoinStage.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/JoinStage.java @@ -76,7 +76,7 @@ public RowSignature getLogicalRowSignature() public ShuffleSpec buildShuffleSpec() { final ClusterBy clusterBy = new ClusterBy(keyColumns, 0); - return new HashShuffleSpec(clusterBy, 1); + return new HashShuffleSpec(clusterBy, 1, true); } @Override diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/LogicalStage.java b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/LogicalStage.java index 59f88264dbd3..992a3f439b7f 100644 --- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/LogicalStage.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/LogicalStage.java @@ -59,4 +59,12 @@ public interface LogicalStage * Returns the inputs of this stage. */ List getInputSpecs(); + + /** + * Whether this stage must run on a single worker. + */ + default boolean isSingleWorker() + { + return false; + } } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/OffsetLimitStage.java b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/OffsetLimitStage.java index 88496394c659..1950faa5511f 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/OffsetLimitStage.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/OffsetLimitStage.java @@ -53,6 +53,12 @@ public LogicalStage extendWith(DruidNodeStack stack) ); } + @Override + public boolean isSingleWorker() + { + return true; + } + @Override public RowSignature getLogicalRowSignature() { diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/SortStage.java b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/SortStage.java index 834ffdcf6f35..d97f7397deed 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/SortStage.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/SortStage.java @@ -26,21 +26,31 @@ import org.apache.druid.msq.querykit.QueryKitUtils; import org.apache.druid.msq.querykit.ShuffleSpecFactories; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.planner.OffsetLimit; import org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack; +import javax.annotation.Nullable; import java.util.List; public class SortStage extends AbstractShuffleStage { protected final List keyColumns; - public SortStage(LogicalStage inputStage, List keyColumns) + /** + * 
Non-null when this sort feeds a downstream {@link OffsetLimitStage}, in which case the shuffle output + * is funneled into a single sorted partition with a {@code limitHint} derived from the offset and limit. + */ + @Nullable + protected final OffsetLimit offsetLimit; + + public SortStage(LogicalStage inputStage, List keyColumns, @Nullable OffsetLimit offsetLimit) { super( QueryKitUtils.sortableSignature(inputStage.getLogicalRowSignature(), keyColumns), LogicalInputSpec.of(inputStage) ); this.keyColumns = keyColumns; + this.offsetLimit = offsetLimit; } @Override @@ -53,7 +63,26 @@ public RowSignature getLogicalRowSignature() public ShuffleSpec buildShuffleSpec() { final ClusterBy clusterBy = new ClusterBy(keyColumns, 0); - return ShuffleSpecFactories.globalSortWithMaxPartitionCount(1).build(clusterBy, false); + if (offsetLimit != null) { + // Funnel everything through a single sorted partition so the downstream OffsetLimitStage can apply the + // offset and limit. + return ShuffleSpecFactories.singlePartitionWithLimit(computeLimitHint(offsetLimit)).build(clusterBy, false); + } else { + return ShuffleSpecFactories.globalSortWithTargetPartitions().build(clusterBy, false); + } + } + + /** + * Computes the {@link ShuffleSpec#limitHint()} that the upstream sort can use to short-circuit work when the + * downstream stage applies an offset and limit. 
+ */ + private static long computeLimitHint(OffsetLimit offsetLimit) + { + if (offsetLimit.hasLimit() && offsetLimit.getOffset() + offsetLimit.getLimit() > 0 /* overflow check */) { + return offsetLimit.getOffset() + offsetLimit.getLimit(); + } else { + return ShuffleSpec.UNLIMITED; + } } @Override diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index bdc4370f9e7f..014cf1956bbe 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -31,8 +31,8 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.input.InputSpec; -import org.apache.druid.msq.input.InputSpecs; import org.apache.druid.msq.input.external.ExternalInputSpec; import org.apache.druid.msq.input.inline.InlineInputSpec; import org.apache.druid.msq.input.lookup.LookupInputSpec; @@ -267,14 +267,13 @@ public IntSet getBroadcastInputs() /** * Figure for {@link StageDefinition#getMaxWorkerCount()} that should be used when processing. */ - public int getMaxWorkerCount(final QueryKitSpec queryKitSpec) + public int getMaxWorkerCount() { if (isSingleWorker()) { return 1; - } else if (InputSpecs.hasLeafInputs(inputSpecs, broadcastInputs)) { - return queryKitSpec.getMaxLeafWorkerCount(); } else { - return queryKitSpec.getMaxNonLeafWorkerCount(); + // Use MAX_WORKERS as a high upper bound; capped at runtime by QueryDefinition.withRuntimeBounds. + return Limits.MAX_WORKERS; } } @@ -431,7 +430,7 @@ private static DataSourcePlan forQuery( // Subqueries ignore SQL_INSERT_SEGMENT_GRANULARITY, even if set in the context. 
It's only used for the // outermost query, and setting it for the subquery makes us erroneously add bucketing where it doesn't belong. dataSource.getQuery().withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY), - ShuffleSpecFactories.globalSortWithMaxPartitionCount(queryKitSpec.getNumPartitionsForShuffle()), + ShuffleSpecFactories.globalSortWithTargetPartitions(), minStageNumber ); @@ -669,9 +668,8 @@ private static DataSourcePlan forSortMergeJoin( ((StageInputSpec) Iterables.getOnlyElement(leftPlan.getInputSpecs())).getStageNumber() ); - final int hashPartitionCount = queryKitSpec.getNumPartitionsForShuffle(); final List leftPartitionKey = partitionKeys.get(0); - leftBuilder.shuffleSpec(new HashShuffleSpec(new ClusterBy(leftPartitionKey, 0), hashPartitionCount)); + leftBuilder.shuffleSpec(new HashShuffleSpec(new ClusterBy(leftPartitionKey, 0), 1, true)); leftBuilder.signature(QueryKitUtils.sortableSignature(leftBuilder.getSignature(), leftPartitionKey)); // Build up the right stage. @@ -680,7 +678,7 @@ private static DataSourcePlan forSortMergeJoin( ); final List rightPartitionKey = partitionKeys.get(1); - rightBuilder.shuffleSpec(new HashShuffleSpec(new ClusterBy(rightPartitionKey, 0), hashPartitionCount)); + rightBuilder.shuffleSpec(new HashShuffleSpec(new ClusterBy(rightPartitionKey, 0), 1, true)); rightBuilder.signature(QueryKitUtils.sortableSignature(rightBuilder.getSignature(), rightPartitionKey)); // Compute join signature. 
@@ -708,7 +706,7 @@ private static DataSourcePlan forSortMergeJoin( Iterables.getOnlyElement(rightPlan.getInputSpecs()) ) ) - .maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount()) + .maxWorkerCount(Limits.MAX_WORKERS) .signature(joinSignatureBuilder.build()) .processor( new SortMergeJoinStageProcessor( diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java index 2d0361a037f8..4454026fca9a 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitSpec.java @@ -19,45 +19,29 @@ package org.apache.druid.msq.querykit; -import org.apache.druid.msq.input.InputSpecs; import org.apache.druid.msq.kernel.QueryDefinition; -import org.apache.druid.msq.kernel.StageDefinition; import org.apache.druid.query.Query; /** - * Collection of parameters for {@link QueryKit#makeQueryDefinition}. + * Container for {@link QueryKit} plus the queryId that we want to build. */ public class QueryKitSpec { private final QueryKit> queryKit; private final String queryId; - private final int maxLeafWorkerCount; - private final int maxNonLeafWorkerCount; - private final int targetPartitionsPerWorker; /** - * @param queryKit kit that is used to translate native subqueries; i.e., - * {@link org.apache.druid.query.QueryDataSource}. Typically a {@link MultiQueryKit}. 
- * @param queryId queryId of the resulting {@link QueryDefinition} - * @param maxLeafWorkerCount maximum number of workers for leaf stages: becomes - * {@link StageDefinition#getMaxWorkerCount()} - * @param maxNonLeafWorkerCount maximum number of workers for non-leaf stages: becomes - * {@link StageDefinition#getMaxWorkerCount()} - * @param targetPartitionsPerWorker preferred number of partitions per worker for subqueries + * @param queryKit kit that is used to translate native subqueries; i.e., + * {@link org.apache.druid.query.QueryDataSource}. Typically a {@link MultiQueryKit}. + * @param queryId queryId of the resulting {@link QueryDefinition} */ public QueryKitSpec( QueryKit> queryKit, - String queryId, - int maxLeafWorkerCount, - int maxNonLeafWorkerCount, - int targetPartitionsPerWorker + String queryId ) { this.queryId = queryId; this.queryKit = queryKit; - this.maxLeafWorkerCount = maxLeafWorkerCount; - this.maxNonLeafWorkerCount = maxNonLeafWorkerCount; - this.targetPartitionsPerWorker = targetPartitionsPerWorker; } /** @@ -75,28 +59,4 @@ public String getQueryId() { return queryId; } - - /** - * Maximum number of workers for leaf stages. See {@link InputSpecs#hasLeafInputs}. - */ - public int getMaxLeafWorkerCount() - { - return maxLeafWorkerCount; - } - - /** - * Maximum number of workers for non-leaf stages. See {@link InputSpecs#hasLeafInputs}. - */ - public int getMaxNonLeafWorkerCount() - { - return maxNonLeafWorkerCount; - } - - /** - * Number of partitions to generate during a shuffle. 
- */ - public int getNumPartitionsForShuffle() - { - return maxNonLeafWorkerCount * targetPartitionsPerWorker; - } } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ShuffleSpecFactories.java b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ShuffleSpecFactories.java index 24f6dfffe27a..11bfaad390a8 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ShuffleSpecFactories.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ShuffleSpecFactories.java @@ -24,6 +24,7 @@ import org.apache.druid.msq.kernel.GlobalSortTargetSizeShuffleSpec; import org.apache.druid.msq.kernel.MixShuffleSpec; import org.apache.druid.msq.kernel.ShuffleSpec; +import org.apache.druid.msq.kernel.StageDefinition; /** * Static factory methods for common implementations of {@link ShuffleSpecFactory}. @@ -61,12 +62,13 @@ public static ShuffleSpecFactory singlePartitionWithLimit(final long limitHint) } /** - * Factory that produces a particular number of output partitions. + * Factory that produces an adjustable globally-sorted shuffle spec. The partition count is adjusted at + * runtime by {@link StageDefinition#withRuntimeBounds(int, int, int)}. */ - public static ShuffleSpecFactory globalSortWithMaxPartitionCount(final int partitions) + public static ShuffleSpecFactory globalSortWithTargetPartitions() { return (clusterBy, aggregate) -> - new GlobalSortMaxCountShuffleSpec(clusterBy, partitions, aggregate, ShuffleSpec.UNLIMITED); + new GlobalSortMaxCountShuffleSpec(clusterBy, 1, aggregate, ShuffleSpec.UNLIMITED, true); } /** @@ -75,7 +77,7 @@ public static ShuffleSpecFactory globalSortWithMaxPartitionCount(final int parti * * Produces {@link MixShuffleSpec}, ignoring the target size, if the provided {@link ClusterBy} is empty. 
*/ - public static ShuffleSpecFactory getGlobalSortWithTargetSize(int targetSize) + public static ShuffleSpecFactory globalSortWithTargetSize(int targetSize) { return (clusterBy, aggregate) -> { if (clusterBy.isEmpty()) { diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java index 7a2ec6f239a1..eebd97b607bc 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.input.stage.StageInputSpec; import org.apache.druid.msq.kernel.HashShuffleSpec; import org.apache.druid.msq.kernel.MixShuffleSpec; @@ -93,14 +94,13 @@ public QueryDefinition makeQueryDefinition( final WindowStages windowStages = new WindowStages( originalQuery, jsonMapper, - queryKitSpec.getNumPartitionsForShuffle(), - queryKitSpec.getMaxNonLeafWorkerCount(), + Limits.MAX_WORKERS, resultShuffleSpecFactory, signatureFromInput, isOperatorTransformationEnabled ); - final ShuffleSpec nextShuffleSpec = windowStages.getStages().get(0).findShuffleSpec(queryKitSpec.getNumPartitionsForShuffle()); + final ShuffleSpec nextShuffleSpec = windowStages.getStages().get(0).findShuffleSpec(); final QueryDefinitionBuilder queryDefBuilder = makeQueryDefinitionBuilder(queryKitSpec.getQueryId(), dataSourcePlan, nextShuffleSpec); final int firstWindowStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber()); @@ -121,7 +121,6 @@ private static class WindowStages { private final List stages; private final WindowOperatorQuery query; - private final int numPartitionsForShuffle; private 
final int maxNonLeafWorkerCount; private final ShuffleSpec finalWindowStageShuffleSpec; private final RowSignature finalWindowStageRowSignature; @@ -131,7 +130,6 @@ private static class WindowStages private WindowStages( WindowOperatorQuery query, ObjectMapper jsonMapper, - int numPartitionsForShuffle, int maxNonLeafWorkerCount, ShuffleSpecFactory resultShuffleSpecFactory, RowSignature signatureFromInput, @@ -140,7 +138,6 @@ private WindowStages( { this.stages = new ArrayList<>(); this.query = query; - this.numPartitionsForShuffle = numPartitionsForShuffle; this.maxNonLeafWorkerCount = maxNonLeafWorkerCount; this.isOperatorTransformationEnabled = isOperatorTransformationEnabled; @@ -223,7 +220,7 @@ private StageDefinitionBuilder getStageDefinitionBuilder(int stageNumber, int wi final WindowStage stage = stages.get(windowStageIndex); final ShuffleSpec shuffleSpec = (windowStageIndex == stages.size() - 1) ? finalWindowStageShuffleSpec : - stages.get(windowStageIndex + 1).findShuffleSpec(numPartitionsForShuffle); + stages.get(windowStageIndex + 1).findShuffleSpec(); final RowSignature stageRowSignature = getRowSignatureForStage(windowStageIndex, shuffleSpec); final List operatorFactories = isOperatorTransformationEnabled @@ -362,7 +359,7 @@ private List getWindowOperatorFactories() return windowOperatorFactories; } - private ShuffleSpec findShuffleSpec(int partitionCount) + private ShuffleSpec findShuffleSpec() { Map sortColumnsMap = new HashMap<>(); if (sortOperatorFactory != null) { @@ -392,7 +389,7 @@ private ShuffleSpec findShuffleSpec(int partitionCount) keyColsOfWindow.add(kc); } - return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), partitionCount); + return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), 1, true); } private boolean canAccept(OperatorFactory operatorFactory) diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java index 8ace680938c1..32dd480473e0 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java @@ -29,6 +29,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.input.stage.StageInputSpec; import org.apache.druid.msq.kernel.QueryDefinition; import org.apache.druid.msq.kernel.QueryDefinitionBuilder; @@ -139,7 +140,7 @@ public QueryDefinition makeQueryDefinition( shuffleSpecFactoryPreAggregation = intermediateClusterBy.isEmpty() ? ShuffleSpecFactories.singlePartition() - : ShuffleSpecFactories.globalSortWithMaxPartitionCount(queryKitSpec.getNumPartitionsForShuffle()); + : ShuffleSpecFactories.globalSortWithTargetPartitions(); if (doLimitOrOffset) { shuffleSpecFactoryPostAggregation = ShuffleSpecFactories.singlePartitionWithLimit(postAggregationLimitHint); @@ -164,7 +165,7 @@ public QueryDefinition makeQueryDefinition( .broadcastInputs(dataSourcePlan.getBroadcastInputs()) .signature(intermediateSignature) .shuffleSpec(shuffleSpecFactoryPreAggregation.build(intermediateClusterBy, true)) - .maxWorkerCount(dataSourcePlan.getMaxWorkerCount(queryKitSpec)) + .maxWorkerCount(dataSourcePlan.getMaxWorkerCount()) .processor(new GroupByPreShuffleStageProcessor(queryToRun)) ); @@ -184,7 +185,7 @@ public QueryDefinition makeQueryDefinition( StageDefinition.builder(firstStageNumber + 1) .inputs(new StageInputSpec(firstStageNumber)) .signature(resultSignature) - .maxWorkerCount(queryKitSpec.getMaxNonLeafWorkerCount()) + .maxWorkerCount(Limits.MAX_WORKERS) .shuffleSpec( shuffleSpecFactoryPostAggregation != null ? 
shuffleSpecFactoryPostAggregation.build(resultClusterBy, false) diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java index f6cc9222df99..6f82fd087724 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java @@ -161,7 +161,7 @@ public QueryDefinition makeQueryDefinition( .broadcastInputs(dataSourcePlan.getBroadcastInputs()) .shuffleSpec(scanShuffleSpec) .signature(signatureToUse) - .maxWorkerCount(dataSourcePlan.getMaxWorkerCount(queryKitSpec)) + .maxWorkerCount(dataSourcePlan.getMaxWorkerCount()) .processor(new ScanQueryStageProcessor(queryToRun)) ); diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/DartQueryKitSpecFactory.java b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/DartQueryKitSpecFactory.java index d76264621d51..44ade1f09e0d 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/DartQueryKitSpecFactory.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/DartQueryKitSpecFactory.java @@ -19,61 +19,23 @@ package org.apache.druid.msq.sql; -import com.google.inject.Inject; -import org.apache.druid.client.TimelineServerView; -import org.apache.druid.msq.dart.controller.DartControllerContext; import org.apache.druid.msq.exec.QueryKitSpecFactory; import org.apache.druid.msq.indexing.MSQTuningConfig; import org.apache.druid.msq.querykit.QueryKit; import org.apache.druid.msq.querykit.QueryKitSpec; -import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContext; -import org.apache.druid.server.coordination.DruidServerMetadata; -import org.apache.druid.server.coordination.ServerType; public class DartQueryKitSpecFactory implements QueryKitSpecFactory { - private final 
TimelineServerView serverView; - - @Inject - public DartQueryKitSpecFactory(TimelineServerView serverView) - { - this.serverView = serverView; - } - @Override public QueryKitSpec makeQueryKitSpec( final QueryKit> queryKit, final String queryId, final MSQTuningConfig tuningConfig, - final QueryContext queryContext) + final QueryContext queryContext + ) { - return new QueryKitSpec( - queryKit, - queryId, - getNumWorkers(), - queryContext.getInt( - DartControllerContext.CTX_MAX_NON_LEAF_WORKER_COUNT, - DartControllerContext.DEFAULT_MAX_NON_LEAF_WORKER_COUNT - ), - MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault( - queryContext, - DartControllerContext.DEFAULT_TARGET_PARTITIONS_PER_WORKER - ) - ); - } - - private int getNumWorkers() - { - int cnt = 0; - for (DruidServerMetadata s : serverView.getDruidServerMetadatas()) { - if (s.getType() == ServerType.HISTORICAL) { - cnt++; - } - } - - // Even if all segments are realtime, launch at least one worker. - return Math.max(1, cnt); + return new QueryKitSpec(queryKit, queryId); } } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryKitSpecFactory.java b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryKitSpecFactory.java index 80af571b22ff..b1fe9c031571 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryKitSpecFactory.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryKitSpecFactory.java @@ -19,46 +19,23 @@ package org.apache.druid.msq.sql; -import com.google.inject.Inject; import org.apache.druid.msq.exec.QueryKitSpecFactory; import org.apache.druid.msq.indexing.MSQTuningConfig; import org.apache.druid.msq.querykit.QueryKit; import org.apache.druid.msq.querykit.QueryKitSpec; -import org.apache.druid.msq.util.MultiStageQueryContext; -import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContext; public class 
MSQTaskQueryKitSpecFactory implements QueryKitSpecFactory { - private DruidProcessingConfig processingConfig; - - @Inject - public MSQTaskQueryKitSpecFactory(DruidProcessingConfig processingConfig) - { - this.processingConfig = processingConfig; - } - @Override public QueryKitSpec makeQueryKitSpec( QueryKit> queryKit, String queryId, MSQTuningConfig tuningConfig, - QueryContext queryContext) + QueryContext queryContext + ) { - return new QueryKitSpec( - queryKit, - queryId, - tuningConfig.getMaxNumWorkers(), - tuningConfig.getMaxNumWorkers(), - - // Assume tasks are symmetric: workers have the same number of processors available as a controller. - // Create one partition per processor per task, for maximum parallelism. - MultiStageQueryContext.getTargetPartitionsPerWorkerWithDefault( - queryContext, - processingConfig.getNumThreads() - ) - ); + return new QueryKitSpec(queryKit, queryId); } - } diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java index d4116fed8635..999de322393c 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java @@ -83,7 +83,6 @@ import org.apache.druid.sql.calcite.schema.NoopDruidSchemaManager; import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.calcite.util.QueryFrameworkUtils; -import org.apache.druid.sql.calcite.util.TestTimelineServerView; import org.apache.druid.sql.calcite.view.NoopViewManager; import org.apache.druid.sql.hook.DruidHookDispatcher; import org.apache.druid.sql.http.EngineInfo; @@ -271,7 +270,7 @@ public void register(ControllerHolder holder) ) ) ), - new DartQueryKitSpecFactory(new TestTimelineServerView(Collections.emptyList())), + new DartQueryKitSpecFactory(), 
injector.getInstance(MultiQueryKit.class), new ServerConfig(), new DefaultQueryConfig(ImmutableMap.of("foo", "bar")), diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java index 6291a9d7f351..28d40e087668 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java @@ -233,6 +233,7 @@ public void test_queryWithoutEnoughSlots_shouldThrowException() null, // WorkerFailureListener ImmutableMap.of(), TimeUnit.SECONDS.toMillis(5), + Limits.MAX_WORKERS, new MSQWorkerTaskLauncherConfig() ); diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTest.java index 5223dae98344..0ca643f109f7 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTest.java @@ -39,6 +39,7 @@ import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.metadata.LockFilterPolicy; +import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.exec.MSQTasks; import org.apache.druid.msq.exec.WorkerFailureListener; import org.apache.druid.msq.indexing.error.MSQFault; @@ -90,6 +91,7 @@ public void testLaunchWorkersIfNeeded_returnsFailedWorkers() throws InterruptedE workerFailureListener, ImmutableMap.of(), TimeUnit.SECONDS.toMillis(5), + Limits.MAX_WORKERS, new MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig(), 2 ); @@ -134,6 +136,7 @@ public void testWaitForWorkers_returnsFailedWorkers() throws InterruptedExceptio workerFailureListener, ImmutableMap.of(), TimeUnit.SECONDS.toMillis(5), + Limits.MAX_WORKERS, 
new MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig(), 2 ); @@ -180,6 +183,7 @@ public void testLaunchWorkersIfNeeded_returnsEmptySet_whenNoFailures() throws In workerFailureListener, ImmutableMap.of(), TimeUnit.SECONDS.toMillis(5), + Limits.MAX_WORKERS, new MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig(), 2 ); @@ -210,6 +214,7 @@ public void testWaitForWorkers_returnsEmptySet_whenNoFailures() throws Interrupt workerFailureListener, ImmutableMap.of(), TimeUnit.SECONDS.toMillis(5), + Limits.MAX_WORKERS, new MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig(), 2 ); diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java index 2e5c4859904f..7badce6168bf 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.exec.WorkerFailureListener; import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig; import org.apache.druid.rpc.indexing.OverlordClient; @@ -46,6 +47,7 @@ public void setUp() getWorkerFailureListener(), ImmutableMap.of(), TimeUnit.SECONDS.toMillis(5), + Limits.MAX_WORKERS, new MSQWorkerTaskLauncherConfig() ); } diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/MockQueryDefinitionBuilder.java b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/MockQueryDefinitionBuilder.java index bd1be760d27a..fcef8d627a16 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/MockQueryDefinitionBuilder.java +++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/MockQueryDefinitionBuilder.java @@ -183,7 +183,8 @@ public MockQueryDefinitionBuilder defineStage( ), 0 ), - MAX_NUM_PARTITIONS + MAX_NUM_PARTITIONS, + false ); break; diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java index e7dda29cc55d..47f46e85fe6b 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java @@ -445,6 +445,7 @@ public WorkerManager newWorkerManager( workerFailureListener, IndexerControllerContext.makeTaskContext(querySpec, queryKernelConfig, ImmutableMap.of()), 0, + NUM_WORKERS, taskLauncherConfig ); } @@ -472,6 +473,18 @@ public WorkerClient newWorkerClient() return new MSQTestWorkerClient(inMemoryWorkers, mapper); } + @Override + public int maxNonLeafWorkerCount() + { + return NUM_WORKERS; + } + + @Override + public int targetPartitionsPerWorker() + { + return 1; + } + @Override public ControllerContext newContext(QueryContext context) {