diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index f34c3b8ae39e..525412c4b0ec 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -75,6 +75,7 @@ public class CoordinatorDynamicConfig private final Set turboLoadingNodes; private final Map cloneServers; + private final Map tierServerFillThreshold; /** * Stale pending segments belonging to the data sources in this list are not killed by {@code @@ -126,7 +127,8 @@ public CoordinatorDynamicConfig( @JsonProperty("smartSegmentLoading") @Nullable Boolean smartSegmentLoading, @JsonProperty("debugDimensions") @Nullable Map debugDimensions, @JsonProperty("turboLoadingNodes") @Nullable Set turboLoadingNodes, - @JsonProperty("cloneServers") @Nullable Map cloneServers + @JsonProperty("cloneServers") @Nullable Map cloneServers, + @JsonProperty("tierServerFillThreshold") @Nullable Map tierServerFillThreshold ) { this.markSegmentAsUnusedDelayMillis = @@ -172,6 +174,7 @@ public CoordinatorDynamicConfig( this.validDebugDimensions = validateDebugDimensions(debugDimensions); this.turboLoadingNodes = Configs.valueOrDefault(turboLoadingNodes, Set.of()); this.cloneServers = Configs.valueOrDefault(cloneServers, Map.of()); + this.tierServerFillThreshold = Configs.valueOrDefault(tierServerFillThreshold, Map.of()); } private Map validateDebugDimensions(Map debugDimensions) @@ -338,6 +341,12 @@ public Map getCloneServers() return cloneServers; } + @JsonProperty + public Map getTierServerFillThreshold() + { + return tierServerFillThreshold; + } + /** * List of servers to put in turbo-loading mode. These servers will use a larger thread pool to load * segments. This causes decreases the average time taken to load segments. However, this also means less resources @@ -371,6 +380,7 @@ public String toString() ", replicateAfterLoadTimeout=" + replicateAfterLoadTimeout + ", turboLoadingNodes=" + turboLoadingNodes + ", cloneServers=" + cloneServers + + ", tierServerFillThreshold=" + tierServerFillThreshold + '}'; } @@ -407,7 +417,8 @@ public boolean equals(Object o) && Objects.equals(decommissioningNodes, that.decommissioningNodes) && Objects.equals(turboLoadingNodes, that.turboLoadingNodes) && Objects.equals(debugDimensions, that.debugDimensions) - && Objects.equals(cloneServers, that.cloneServers); + && Objects.equals(cloneServers, that.cloneServers) + && Objects.equals(tierServerFillThreshold, that.tierServerFillThreshold); } @Override @@ -431,7 +442,8 @@ public int hashCode() pauseCoordination, debugDimensions, turboLoadingNodes, - cloneServers + cloneServers, + tierServerFillThreshold ); } @@ -488,6 +500,7 @@ public static class Builder private Boolean smartSegmentLoading; private Set turboLoadingNodes; private Map cloneServers; + private Map tierServerFillThreshold; public Builder() { @@ -512,7 +525,8 @@ public Builder( @JsonProperty("smartSegmentLoading") @Nullable Boolean smartSegmentLoading, @JsonProperty("debugDimensions") @Nullable Map debugDimensions, @JsonProperty("turboLoadingNodes") @Nullable Set turboLoadingNodes, - @JsonProperty("cloneServers") @Nullable Map cloneServers + @JsonProperty("cloneServers") @Nullable Map cloneServers, + @JsonProperty("tierServerFillThreshold") @Nullable Map tierServerFillThreshold ) { this.markSegmentAsUnusedDelayMillis = markSegmentAsUnusedDelayMillis; @@ -533,6 +547,7 @@ public Builder( this.debugDimensions = debugDimensions; this.turboLoadingNodes = turboLoadingNodes; this.cloneServers = cloneServers; + this.tierServerFillThreshold = tierServerFillThreshold; } public Builder withMarkSegmentAsUnusedDelayMillis(long leadingTimeMillis) @@ -631,6 +646,12 @@ public Builder withCloneServers(Map cloneServers) return this; } + public Builder withTierServerFillThreshold(Map tierServerFillThreshold) + { + this.tierServerFillThreshold = tierServerFillThreshold; + return this; + } + /** * Builds a CoordinatoryDynamicConfig using either the configured values, or * the default value if not configured. @@ -658,7 +679,8 @@ public CoordinatorDynamicConfig build() valueOrDefault(smartSegmentLoading, Defaults.SMART_SEGMENT_LOADING), debugDimensions, turboLoadingNodes, - cloneServers + cloneServers, + tierServerFillThreshold ); } @@ -690,7 +712,8 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults) valueOrDefault(smartSegmentLoading, defaults.isSmartSegmentLoading()), valueOrDefault(debugDimensions, defaults.getDebugDimensions()), valueOrDefault(turboLoadingNodes, defaults.getTurboLoadingNodes()), - valueOrDefault(cloneServers, defaults.getCloneServers()) + valueOrDefault(cloneServers, defaults.getCloneServers()), + valueOrDefault(tierServerFillThreshold, defaults.getTierServerFillThreshold()) ); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 72f50a87a570..d9382d77e97d 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -750,6 +750,7 @@ public void run() .withDataSourcesSnapshot(dataSourcesSnapshot) .withDynamicConfigs(metadataManager.configs().getCurrentDynamicConfig()) .withCompactionConfig(metadataManager.configs().getCurrentCompactionConfig()) + .withDefaultServerFillThreshold(config.getDefaultServerFillThreshold()) .build(); dutyGroup.run(params); } else { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java index 576b2155ac7e..e70442edb173 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java @@ -203,6 +203,7 @@ public static class Builder private CoordinatorRunStats stats; private BalancerStrategy balancerStrategy; private Set broadcastDatasources; + private double defaultServerFillThreshold = 1.0; private Builder() { @@ -288,7 +289,9 @@ private void initSegmentAssignerIfRequired() druidCluster, balancerStrategy, segmentLoadingConfig, - stats + stats, + coordinatorDynamicConfig, + defaultServerFillThreshold ); } @@ -340,6 +343,12 @@ public Builder withDynamicConfigs(CoordinatorDynamicConfig configs) return this; } + public Builder withDefaultServerFillThreshold(double defaultServerFillThreshold) + { + this.defaultServerFillThreshold = defaultServerFillThreshold; + return this; + } + public Builder withSegmentLoadingConfig(SegmentLoadingConfig config) { this.segmentLoadingConfig = config; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java index de33e6c8a639..da762a61200e 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java @@ -254,6 +254,12 @@ public long getAvailableSize() return getMaxSize() - getSizeUsed(); } + public double getFillFraction() + { + final long maxSize = getMaxSize(); + return maxSize > 0 ? (double) getSizeUsed() / maxSize : 0.0; + } + /** * Checks if the server can load the given segment. *

diff --git a/server/src/main/java/org/apache/druid/server/coordinator/config/CoordinatorSegmentLoadConfigs.java b/server/src/main/java/org/apache/druid/server/coordinator/config/CoordinatorSegmentLoadConfigs.java new file mode 100644 index 000000000000..99e03455f096 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/config/CoordinatorSegmentLoadConfigs.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.config; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.common.config.Configs; + +public class CoordinatorSegmentLoadConfigs +{ + /** + * Default fill-fraction threshold applied to all tiers with no per-tier override in + * {@link org.apache.druid.server.coordinator.CoordinatorDynamicConfig#getTierServerFillThreshold()}. + * Servers whose fill fraction ({@code sizeUsed / maxSize}) exceeds this value are deprioritized + * for segment assignment. {@code 1.0} means no threshold (current behavior). + */ + @JsonProperty + private final double defaultServerFillThreshold; + + @JsonCreator + public CoordinatorSegmentLoadConfigs( + @JsonProperty("defaultServerFillThreshold") Double defaultServerFillThreshold + ) + { + this.defaultServerFillThreshold = Configs.valueOrDefault(defaultServerFillThreshold, 1.0); + } + + public double getDefaultServerFillThreshold() + { + return defaultServerFillThreshold; + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/config/DruidCoordinatorConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/config/DruidCoordinatorConfig.java index 6004c5b1ba47..6a99443c61d9 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/config/DruidCoordinatorConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/config/DruidCoordinatorConfig.java @@ -34,6 +34,7 @@ public class DruidCoordinatorConfig private final CoordinatorKillConfigs killConfigs; private final BalancerStrategyFactory balancerStrategyFactory; private final HttpLoadQueuePeonConfig httpLoadQueuePeonConfig; + private final CoordinatorSegmentLoadConfigs coordinatorSegmentLoadConfigs; @Inject public DruidCoordinatorConfig( @@ -41,7 +42,8 @@ public DruidCoordinatorConfig( CoordinatorPeriodConfig periodConfig, CoordinatorKillConfigs killConfigs, BalancerStrategyFactory balancerStrategyFactory, - HttpLoadQueuePeonConfig httpLoadQueuePeonConfig + HttpLoadQueuePeonConfig httpLoadQueuePeonConfig, + CoordinatorSegmentLoadConfigs coordinatorSegmentLoadConfigs ) { this.killConfigs = killConfigs; @@ -49,6 +51,7 @@ public DruidCoordinatorConfig( this.periodConfig = periodConfig; this.balancerStrategyFactory = balancerStrategyFactory; this.httpLoadQueuePeonConfig = httpLoadQueuePeonConfig; + this.coordinatorSegmentLoadConfigs = coordinatorSegmentLoadConfigs; validateKillConfigs(); } @@ -88,6 +91,11 @@ public HttpLoadQueuePeonConfig getHttpLoadQueuePeonConfig() return httpLoadQueuePeonConfig; } + public double getDefaultServerFillThreshold() + { + return coordinatorSegmentLoadConfigs.getDefaultServerFillThreshold(); + } + private void validateKillConfigs() { validateKillConfig(killConfigs.auditLogs(), "audit"); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelector.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelector.java index cc007449b578..0a6a65602304 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelector.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelector.java @@ -19,6 +19,7 @@ package org.apache.druid.server.coordinator.loading; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.server.coordinator.DruidCluster; import org.apache.druid.server.coordinator.ServerHolder; import org.apache.druid.timeline.DataSegment; @@ -48,9 +49,13 @@ public class RoundRobinServerSelector { private final Map tierToServers = new HashMap<>(); + private final CoordinatorDynamicConfig dynamicConfig; + private final double defaultServerFillThreshold; - public RoundRobinServerSelector(DruidCluster cluster) + public RoundRobinServerSelector(DruidCluster cluster, CoordinatorDynamicConfig dynamicConfig, double defaultServerFillThreshold) { + this.dynamicConfig = dynamicConfig; + this.defaultServerFillThreshold = defaultServerFillThreshold; cluster.getManagedHistoricals().forEach( (tier, servers) -> tierToServers.put(tier, new CircularServerList(servers)) ); @@ -67,25 +72,46 @@ public Iterator getServersInTierToLoadSegment(String tier, DataSeg return Collections.emptyIterator(); } - return new EligibleServerIterator(segment, iterator); + return new EligibleServerIterator( + segment, + iterator, + dynamicConfig.getTierServerFillThreshold().getOrDefault(tier, defaultServerFillThreshold) + ); } /** * Iterator over servers in a tier that are eligible to load a given segment. + *

+ * Applies a fill-threshold preference: at construction time, scans all servers + * to determine if any eligible server is below the threshold. If so, only + * below-threshold servers are returned. If none qualify, the threshold is + * relaxed so all eligible servers are returned (fallback to original behavior). + *

+ * The cursor advances through the circular list across calls so that + * subsequent invocations pick up where the last iterator left off. */ private static class EligibleServerIterator implements Iterator { final CircularServerList delegate; final DataSegment segment; + final double effectiveFillThreshold; ServerHolder nextEligible; int remainingIterations; - EligibleServerIterator(DataSegment segment, CircularServerList delegate) + EligibleServerIterator(DataSegment segment, CircularServerList delegate, double fillThreshold) { this.delegate = delegate; this.segment = segment; this.remainingIterations = delegate.servers.size(); + + // Apply the threshold only if at least one eligible server is below it. + // Otherwise fall back: allow any eligible server (preference-with-fallback). + final boolean anyBelowThreshold = delegate.servers.stream() + .anyMatch(s -> s.canLoadSegment(segment) + && s.getFillFraction() <= fillThreshold); + this.effectiveFillThreshold = anyBelowThreshold ? fillThreshold : Double.POSITIVE_INFINITY; + nextEligible = search(); } @@ -112,7 +138,7 @@ ServerHolder search() { while (remainingIterations-- > 0) { ServerHolder nextServer = delegate.peekNext(); - if (nextServer.canLoadSegment(segment)) { + if (nextServer.canLoadSegment(segment) && nextServer.getFillFraction() <= effectiveFillThreshold) { return nextServer; } else { delegate.advanceCursor(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentStatusInTier.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentStatusInTier.java index e9977829f5d1..d582d37bf922 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentStatusInTier.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentStatusInTier.java @@ -35,21 +35,28 @@ public class SegmentStatusInTier { private final DataSegment segment; + private final double fillThreshold; private final List eligibleLoadServers = new ArrayList<>(); + private final List aboveThresholdLoadServers = new ArrayList<>(); private final List eligibleDropServers = new ArrayList<>(); private final Map> serversWithQueuedActions = new HashMap<>(); - public SegmentStatusInTier(DataSegment segment, NavigableSet historicals) + public SegmentStatusInTier(DataSegment segment, NavigableSet historicals, double fillThreshold) { this.segment = segment; + this.fillThreshold = fillThreshold; historicals.forEach(this::handleServer); } + /** + * Returns servers eligible to load the segment, preferring those below the + * fill threshold. Falls back to above-threshold servers if none qualify. + */ public List getServersEligibleToLoad() { - return eligibleLoadServers; + return eligibleLoadServers.isEmpty() ? aboveThresholdLoadServers : eligibleLoadServers; } public List getServersEligibleToDrop() @@ -68,7 +75,11 @@ private void handleServer(ServerHolder server) if (server.isServingSegment(segment)) { eligibleDropServers.add(server); } else if (server.canLoadSegment(segment)) { - eligibleLoadServers.add(server); + if (server.getFillFraction() <= fillThreshold) { + eligibleLoadServers.add(server); + } else { + aboveThresholdLoadServers.add(server); + } } else if (action != null) { serversWithQueuedActions.computeIfAbsent(action, a -> new ArrayList<>()) .add(server); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java index 2b7bbdb49e7f..144aa4d872b1 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java @@ -22,6 +22,7 @@ import com.google.common.collect.Sets; import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import org.apache.druid.client.DruidServer; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.server.coordinator.DruidCluster; import org.apache.druid.server.coordinator.ServerHolder; import org.apache.druid.server.coordinator.balancer.BalancerStrategy; @@ -58,6 +59,8 @@ public class StrategicSegmentAssigner implements SegmentActionHandler private final SegmentLoadQueueManager loadQueueManager; private final DruidCluster cluster; private final CoordinatorRunStats stats; + private final CoordinatorDynamicConfig dynamicConfig; + private final double defaultServerFillThreshold; private final SegmentReplicaCountMap replicaCountMap; private final ReplicationThrottler replicationThrottler; private final RoundRobinServerSelector serverSelector; @@ -76,17 +79,23 @@ public StrategicSegmentAssigner( DruidCluster cluster, BalancerStrategy strategy, SegmentLoadingConfig loadingConfig, - CoordinatorRunStats stats + CoordinatorRunStats stats, + CoordinatorDynamicConfig dynamicConfig, + double defaultServerFillThreshold ) { this.stats = stats; this.cluster = cluster; this.strategy = strategy; this.loadQueueManager = loadQueueManager; + this.dynamicConfig = dynamicConfig; + this.defaultServerFillThreshold = defaultServerFillThreshold; this.replicaCountMap = SegmentReplicaCountMap.create(cluster); this.replicationThrottler = createReplicationThrottler(cluster, loadingConfig); this.useRoundRobinAssignment = loadingConfig.isUseRoundRobinSegmentAssignment(); - this.serverSelector = useRoundRobinAssignment ? new RoundRobinServerSelector(cluster) : null; + this.serverSelector = useRoundRobinAssignment + ? new RoundRobinServerSelector(cluster, dynamicConfig, defaultServerFillThreshold) + : null; cluster.getManagedHistoricals().forEach( (tier, historicals) -> tierToHistoricalCount.put(tier, historicals.size()) @@ -150,14 +159,24 @@ public boolean moveSegment( return false; } - // If the source server is not decommissioning, move can be skipped if the - // segment is already optimally placed - if (!sourceServer.isDecommissioning()) { - eligibleDestinationServers.add(sourceServer); + // Prefer servers below the fill threshold. Fall back to all eligible servers if none qualify. + final double fillThreshold = dynamicConfig.getTierServerFillThreshold() + .getOrDefault(tier, defaultServerFillThreshold); + List candidates = eligibleDestinationServers.stream() + .filter(s -> s.getFillFraction() <= fillThreshold) + .collect(Collectors.toList()); + if (candidates.isEmpty()) { + candidates = eligibleDestinationServers; + } + + // Allow "already optimally placed" only if the source is not above the fill threshold. + // If the source is over-threshold, force the move to drain it — do not add it to candidates. + if (!sourceServer.isDecommissioning() && sourceServer.getFillFraction() <= fillThreshold) { + candidates.add(sourceServer); } final ServerHolder destination = - strategy.findDestinationServerToMoveSegment(segment, sourceServer, eligibleDestinationServers); + strategy.findDestinationServerToMoveSegment(segment, sourceServer, candidates); if (destination == null || destination.getServer().equals(sourceServer.getServer())) { incrementSkipStat(Stats.Segments.MOVE_SKIPPED, "Optimally placed", segment, sourceServer); @@ -276,7 +295,11 @@ private int updateReplicasInTier( } final SegmentStatusInTier segmentStatus = - new SegmentStatusInTier(segment, cluster.getManagedHistoricalsByTier(tier)); + new SegmentStatusInTier( + segment, + cluster.getManagedHistoricalsByTier(tier), + dynamicConfig.getTierServerFillThreshold().getOrDefault(tier, defaultServerFillThreshold) + ); // Cancel all moves in this tier if it does not need to have replicas if (shouldCancelMoves) { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java index d91cb62050a1..e57e3ae67350 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java @@ -30,6 +30,7 @@ import org.apache.druid.server.coordinator.config.CoordinatorKillConfigs; import org.apache.druid.server.coordinator.config.CoordinatorPeriodConfig; import org.apache.druid.server.coordinator.config.CoordinatorRunConfig; +import org.apache.druid.server.coordinator.config.CoordinatorSegmentLoadConfigs; import org.apache.druid.server.coordinator.config.DruidCoordinatorConfig; import org.apache.druid.server.coordinator.config.HttpLoadQueuePeonConfig; import org.apache.druid.server.coordinator.config.KillUnusedSegmentsConfig; @@ -443,7 +444,8 @@ private void verifyCoordinatorConfigFailsWith( periodConfig, killConfig, null, - null + null, + new CoordinatorSegmentLoadConfigs(null) ) ); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index d41e91e4474f..377f272fd739 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -54,6 +54,7 @@ import org.apache.druid.server.coordinator.config.CoordinatorKillConfigs; import org.apache.druid.server.coordinator.config.CoordinatorPeriodConfig; import org.apache.druid.server.coordinator.config.CoordinatorRunConfig; +import org.apache.druid.server.coordinator.config.CoordinatorSegmentLoadConfigs; import org.apache.druid.server.coordinator.config.DruidCoordinatorConfig; import org.apache.druid.server.coordinator.duty.CompactSegments; import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty; @@ -146,7 +147,8 @@ public void setUp() throws Exception new CoordinatorPeriodConfig(null, null), CoordinatorKillConfigs.DEFAULT, new CostBalancerStrategyFactory(), - null + null, + new CoordinatorSegmentLoadConfigs(null) ); druidNode = new DruidNode("hey", "what", false, 1234, null, true, false); scheduledExecutorFactory = ScheduledExecutors::fixed; diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelectorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelectorTest.java index cbec62ee0e21..fb0ba6256789 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelectorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/RoundRobinServerSelectorTest.java @@ -24,6 +24,7 @@ import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.TestDataSource; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.server.coordinator.DruidCluster; import org.apache.druid.server.coordinator.ServerHolder; import org.apache.druid.timeline.DataSegment; @@ -64,7 +65,7 @@ public void testSingleIterator() .builder() .addTier(TIER, serverXL, serverM, serverXS, serverL) .build(); - final RoundRobinServerSelector selector = new RoundRobinServerSelector(cluster); + final RoundRobinServerSelector selector = new RoundRobinServerSelector(cluster, CoordinatorDynamicConfig.builder().build(), 1.0); // Verify that only eligible servers are returned in order of available size Iterator pickedServers = selector.getServersInTierToLoadSegment(TIER, segment); @@ -91,7 +92,7 @@ public void testNextIteratorContinuesFromSamePosition() .builder() .addTier(TIER, serverXL, serverM, serverXS, serverL) .build(); - final RoundRobinServerSelector selector = new RoundRobinServerSelector(cluster); + final RoundRobinServerSelector selector = new RoundRobinServerSelector(cluster, CoordinatorDynamicConfig.builder().build(), 1.0); // Verify that only eligible servers are returned in order of available size Iterator pickedServers = selector.getServersInTierToLoadSegment(TIER, segment); @@ -113,7 +114,7 @@ public void testNextIteratorContinuesFromSamePosition() public void testNoServersInTier() { DruidCluster cluster = DruidCluster.builder().addTier(TIER).build(); - final RoundRobinServerSelector selector = new RoundRobinServerSelector(cluster); + final RoundRobinServerSelector selector = new RoundRobinServerSelector(cluster, CoordinatorDynamicConfig.builder().build(), 1.0); Iterator eligibleServers = selector.getServersInTierToLoadSegment(TIER, segment); Assert.assertFalse(eligibleServers.hasNext()); @@ -129,7 +130,7 @@ public void testNoEligibleServerInTier() createHistorical("server3", 10), createHistorical("server4", 20) ).build(); - final RoundRobinServerSelector selector = new RoundRobinServerSelector(cluster); + final RoundRobinServerSelector selector = new RoundRobinServerSelector(cluster, CoordinatorDynamicConfig.builder().build(), 1.0); // Verify that only eligible servers are returned in order of available size Iterator eligibleServers = selector.getServersInTierToLoadSegment(TIER, segment); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java index 166728d6d7e3..295e357ed24e 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java @@ -58,6 +58,7 @@ import org.apache.druid.server.coordinator.config.CoordinatorKillConfigs; import org.apache.druid.server.coordinator.config.CoordinatorPeriodConfig; import org.apache.druid.server.coordinator.config.CoordinatorRunConfig; +import org.apache.druid.server.coordinator.config.CoordinatorSegmentLoadConfigs; import org.apache.druid.server.coordinator.config.DruidCoordinatorConfig; import org.apache.druid.server.coordinator.config.HttpLoadQueuePeonConfig; import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups; @@ -487,7 +488,8 @@ private Environment( new CoordinatorPeriodConfig(null, null), CoordinatorKillConfigs.DEFAULT, createBalancerStrategy(balancerStrategy), - new HttpLoadQueuePeonConfig(null, null, null) + new HttpLoadQueuePeonConfig(null, null, null), + new CoordinatorSegmentLoadConfigs(null) ); JacksonConfigManager jacksonConfigManager = mockConfigManager(); diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index 22f3ccbdd4db..cfc011254a73 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -279,6 +279,7 @@ public void testConstructorWithNullsShouldKillUnusedSegmentsInAllDataSources() false, null, ImmutableSet.of("host1"), + null, null ); Assert.assertTrue(config.getSpecificDataSourcesToKillUnusedSegmentsIn().isEmpty()); @@ -305,6 +306,7 @@ public void testConstructorWithSpecificDataSourcesToKillShouldNotKillUnusedSegme false, null, ImmutableSet.of("host1"), + null, null ); Assert.assertEquals(ImmutableSet.of("test1"), config.getSpecificDataSourcesToKillUnusedSegmentsIn()); diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 848dbd5da303..4f30ab1cab58 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -79,6 +79,7 @@ import org.apache.druid.server.coordinator.config.CoordinatorKillConfigs; import org.apache.druid.server.coordinator.config.CoordinatorPeriodConfig; import org.apache.druid.server.coordinator.config.CoordinatorRunConfig; +import org.apache.druid.server.coordinator.config.CoordinatorSegmentLoadConfigs; import org.apache.druid.server.coordinator.config.DruidCoordinatorConfig; import org.apache.druid.server.coordinator.config.HttpLoadQueuePeonConfig; import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty; @@ -196,6 +197,7 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.coordinator.kill", CoordinatorKillConfigs.class); JsonConfigProvider.bind(binder, "druid.coordinator.period", CoordinatorPeriodConfig.class); JsonConfigProvider.bind(binder, "druid.coordinator.loadqueuepeon.http", HttpLoadQueuePeonConfig.class); + JsonConfigProvider.bind(binder, "druid.coordinator.segmentLoading", CoordinatorSegmentLoadConfigs.class); JsonConfigProvider.bind(binder, "druid.coordinator.balancer", BalancerStrategyFactory.class); JsonConfigProvider.bind(binder, "druid.coordinator.segment", CoordinatorSegmentWatcherConfig.class); JsonConfigProvider.bind(binder, "druid.coordinator.segmentMetadataCache", SegmentMetadataCacheConfig.class);