Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class CoordinatorDynamicConfig

private final Set<String> turboLoadingNodes;
private final Map<String, String> cloneServers;
private final Map<String, Double> tierServerFillThreshold;

/**
* Stale pending segments belonging to the data sources in this list are not killed by {@code
Expand Down Expand Up @@ -126,7 +127,8 @@ public CoordinatorDynamicConfig(
@JsonProperty("smartSegmentLoading") @Nullable Boolean smartSegmentLoading,
@JsonProperty("debugDimensions") @Nullable Map<String, String> debugDimensions,
@JsonProperty("turboLoadingNodes") @Nullable Set<String> turboLoadingNodes,
@JsonProperty("cloneServers") @Nullable Map<String, String> cloneServers
@JsonProperty("cloneServers") @Nullable Map<String, String> cloneServers,
@JsonProperty("tierServerFillThreshold") @Nullable Map<String, Double> tierServerFillThreshold
)
{
this.markSegmentAsUnusedDelayMillis =
Expand Down Expand Up @@ -172,6 +174,7 @@ public CoordinatorDynamicConfig(
this.validDebugDimensions = validateDebugDimensions(debugDimensions);
this.turboLoadingNodes = Configs.valueOrDefault(turboLoadingNodes, Set.of());
this.cloneServers = Configs.valueOrDefault(cloneServers, Map.of());
this.tierServerFillThreshold = Configs.valueOrDefault(tierServerFillThreshold, Map.of());
}

private Map<Dimension, String> validateDebugDimensions(Map<String, String> debugDimensions)
Expand Down Expand Up @@ -338,6 +341,12 @@ public Map<String, String> getCloneServers()
return cloneServers;
}

@JsonProperty
public Map<String, Double> getTierServerFillThreshold()
{
return tierServerFillThreshold;
}

/**
* List of servers to put in turbo-loading mode. These servers will use a larger thread pool to load
* segments. This causes decreases the average time taken to load segments. However, this also means less resources
Expand Down Expand Up @@ -371,6 +380,7 @@ public String toString()
", replicateAfterLoadTimeout=" + replicateAfterLoadTimeout +
", turboLoadingNodes=" + turboLoadingNodes +
", cloneServers=" + cloneServers +
", tierServerFillThreshold=" + tierServerFillThreshold +
'}';
}

Expand Down Expand Up @@ -407,7 +417,8 @@ public boolean equals(Object o)
&& Objects.equals(decommissioningNodes, that.decommissioningNodes)
&& Objects.equals(turboLoadingNodes, that.turboLoadingNodes)
&& Objects.equals(debugDimensions, that.debugDimensions)
&& Objects.equals(cloneServers, that.cloneServers);
&& Objects.equals(cloneServers, that.cloneServers)
&& Objects.equals(tierServerFillThreshold, that.tierServerFillThreshold);
}

@Override
Expand All @@ -431,7 +442,8 @@ public int hashCode()
pauseCoordination,
debugDimensions,
turboLoadingNodes,
cloneServers
cloneServers,
tierServerFillThreshold
);
}

Expand Down Expand Up @@ -488,6 +500,7 @@ public static class Builder
private Boolean smartSegmentLoading;
private Set<String> turboLoadingNodes;
private Map<String, String> cloneServers;
private Map<String, Double> tierServerFillThreshold;

public Builder()
{
Expand All @@ -512,7 +525,8 @@ public Builder(
@JsonProperty("smartSegmentLoading") @Nullable Boolean smartSegmentLoading,
@JsonProperty("debugDimensions") @Nullable Map<String, String> debugDimensions,
@JsonProperty("turboLoadingNodes") @Nullable Set<String> turboLoadingNodes,
@JsonProperty("cloneServers") @Nullable Map<String, String> cloneServers
@JsonProperty("cloneServers") @Nullable Map<String, String> cloneServers,
@JsonProperty("tierServerFillThreshold") @Nullable Map<String, Double> tierServerFillThreshold
)
{
this.markSegmentAsUnusedDelayMillis = markSegmentAsUnusedDelayMillis;
Expand All @@ -533,6 +547,7 @@ public Builder(
this.debugDimensions = debugDimensions;
this.turboLoadingNodes = turboLoadingNodes;
this.cloneServers = cloneServers;
this.tierServerFillThreshold = tierServerFillThreshold;
}

public Builder withMarkSegmentAsUnusedDelayMillis(long leadingTimeMillis)
Expand Down Expand Up @@ -631,6 +646,12 @@ public Builder withCloneServers(Map<String, String> cloneServers)
return this;
}

public Builder withTierServerFillThreshold(Map<String, Double> tierServerFillThreshold)
{
this.tierServerFillThreshold = tierServerFillThreshold;
return this;
}

/**
* Builds a CoordinatoryDynamicConfig using either the configured values, or
* the default value if not configured.
Expand Down Expand Up @@ -658,7 +679,8 @@ public CoordinatorDynamicConfig build()
valueOrDefault(smartSegmentLoading, Defaults.SMART_SEGMENT_LOADING),
debugDimensions,
turboLoadingNodes,
cloneServers
cloneServers,
tierServerFillThreshold
);
}

Expand Down Expand Up @@ -690,7 +712,8 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
valueOrDefault(smartSegmentLoading, defaults.isSmartSegmentLoading()),
valueOrDefault(debugDimensions, defaults.getDebugDimensions()),
valueOrDefault(turboLoadingNodes, defaults.getTurboLoadingNodes()),
valueOrDefault(cloneServers, defaults.getCloneServers())
valueOrDefault(cloneServers, defaults.getCloneServers()),
valueOrDefault(tierServerFillThreshold, defaults.getTierServerFillThreshold())
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,7 @@ public void run()
.withDataSourcesSnapshot(dataSourcesSnapshot)
.withDynamicConfigs(metadataManager.configs().getCurrentDynamicConfig())
.withCompactionConfig(metadataManager.configs().getCurrentCompactionConfig())
.withDefaultServerFillThreshold(config.getDefaultServerFillThreshold())
.build();
dutyGroup.run(params);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ public static class Builder
private CoordinatorRunStats stats;
private BalancerStrategy balancerStrategy;
private Set<String> broadcastDatasources;
private double defaultServerFillThreshold = 1.0;

private Builder()
{
Expand Down Expand Up @@ -288,7 +289,9 @@ private void initSegmentAssignerIfRequired()
druidCluster,
balancerStrategy,
segmentLoadingConfig,
stats
stats,
coordinatorDynamicConfig,
defaultServerFillThreshold
);
}

Expand Down Expand Up @@ -340,6 +343,12 @@ public Builder withDynamicConfigs(CoordinatorDynamicConfig configs)
return this;
}

public Builder withDefaultServerFillThreshold(double defaultServerFillThreshold)
{
this.defaultServerFillThreshold = defaultServerFillThreshold;
return this;
}

public Builder withSegmentLoadingConfig(SegmentLoadingConfig config)
{
this.segmentLoadingConfig = config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@ public long getAvailableSize()
return getMaxSize() - getSizeUsed();
}

public double getFillFraction()
{
final long maxSize = getMaxSize();
return maxSize > 0 ? (double) getSizeUsed() / maxSize : 0.0;
}

/**
* Checks if the server can load the given segment.
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.coordinator.config;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.common.config.Configs;

public class CoordinatorSegmentLoadConfigs
{
/**
* Default fill-fraction threshold applied to all tiers with no per-tier override in
* {@link org.apache.druid.server.coordinator.CoordinatorDynamicConfig#getTierServerFillThreshold()}.
* Servers whose fill fraction ({@code sizeUsed / maxSize}) exceeds this value are deprioritized
* for segment assignment. {@code 1.0} means no threshold (current behavior).
*/
@JsonProperty
private final double defaultServerFillThreshold;

@JsonCreator
public CoordinatorSegmentLoadConfigs(
@JsonProperty("defaultServerFillThreshold") Double defaultServerFillThreshold
)
{
this.defaultServerFillThreshold = Configs.valueOrDefault(defaultServerFillThreshold, 1.0);
}

public double getDefaultServerFillThreshold()
{
return defaultServerFillThreshold;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,24 @@ public class DruidCoordinatorConfig
private final CoordinatorKillConfigs killConfigs;
private final BalancerStrategyFactory balancerStrategyFactory;
private final HttpLoadQueuePeonConfig httpLoadQueuePeonConfig;
private final CoordinatorSegmentLoadConfigs coordinatorSegmentLoadConfigs;

@Inject
public DruidCoordinatorConfig(
CoordinatorRunConfig runConfig,
CoordinatorPeriodConfig periodConfig,
CoordinatorKillConfigs killConfigs,
BalancerStrategyFactory balancerStrategyFactory,
HttpLoadQueuePeonConfig httpLoadQueuePeonConfig
HttpLoadQueuePeonConfig httpLoadQueuePeonConfig,
CoordinatorSegmentLoadConfigs coordinatorSegmentLoadConfigs
)
{
this.killConfigs = killConfigs;
this.runConfig = runConfig;
this.periodConfig = periodConfig;
this.balancerStrategyFactory = balancerStrategyFactory;
this.httpLoadQueuePeonConfig = httpLoadQueuePeonConfig;
this.coordinatorSegmentLoadConfigs = coordinatorSegmentLoadConfigs;

validateKillConfigs();
}
Expand Down Expand Up @@ -88,6 +91,11 @@ public HttpLoadQueuePeonConfig getHttpLoadQueuePeonConfig()
return httpLoadQueuePeonConfig;
}

public double getDefaultServerFillThreshold()
{
return coordinatorSegmentLoadConfigs.getDefaultServerFillThreshold();
}

private void validateKillConfigs()
{
validateKillConfig(killConfigs.auditLogs(), "audit");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.server.coordinator.loading;

import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.timeline.DataSegment;
Expand Down Expand Up @@ -48,9 +49,13 @@
public class RoundRobinServerSelector
{
private final Map<String, CircularServerList> tierToServers = new HashMap<>();
private final CoordinatorDynamicConfig dynamicConfig;
private final double defaultServerFillThreshold;

public RoundRobinServerSelector(DruidCluster cluster)
public RoundRobinServerSelector(DruidCluster cluster, CoordinatorDynamicConfig dynamicConfig, double defaultServerFillThreshold)
{
this.dynamicConfig = dynamicConfig;
this.defaultServerFillThreshold = defaultServerFillThreshold;
cluster.getManagedHistoricals().forEach(
(tier, servers) -> tierToServers.put(tier, new CircularServerList(servers))
);
Expand All @@ -67,25 +72,46 @@ public Iterator<ServerHolder> getServersInTierToLoadSegment(String tier, DataSeg
return Collections.emptyIterator();
}

return new EligibleServerIterator(segment, iterator);
return new EligibleServerIterator(
segment,
iterator,
dynamicConfig.getTierServerFillThreshold().getOrDefault(tier, defaultServerFillThreshold)
);
}

/**
* Iterator over servers in a tier that are eligible to load a given segment.
* <p>
* Applies a fill-threshold preference: at construction time, scans all servers
* to determine if any eligible server is below the threshold. If so, only
* below-threshold servers are returned. If none qualify, the threshold is
* relaxed so all eligible servers are returned (fallback to original behavior).
* <p>
* The cursor advances through the circular list across calls so that
* subsequent invocations pick up where the last iterator left off.
*/
private static class EligibleServerIterator implements Iterator<ServerHolder>
{
final CircularServerList delegate;
final DataSegment segment;
final double effectiveFillThreshold;

ServerHolder nextEligible;
int remainingIterations;

EligibleServerIterator(DataSegment segment, CircularServerList delegate)
EligibleServerIterator(DataSegment segment, CircularServerList delegate, double fillThreshold)
{
this.delegate = delegate;
this.segment = segment;
this.remainingIterations = delegate.servers.size();

// Apply the threshold only if at least one eligible server is below it.
// Otherwise fall back: allow any eligible server (preference-with-fallback).
final boolean anyBelowThreshold = delegate.servers.stream()
.anyMatch(s -> s.canLoadSegment(segment)
&& s.getFillFraction() <= fillThreshold);
this.effectiveFillThreshold = anyBelowThreshold ? fillThreshold : Double.POSITIVE_INFINITY;

nextEligible = search();
}

Expand All @@ -112,7 +138,7 @@ ServerHolder search()
{
while (remainingIterations-- > 0) {
ServerHolder nextServer = delegate.peekNext();
if (nextServer.canLoadSegment(segment)) {
if (nextServer.canLoadSegment(segment) && nextServer.getFillFraction() <= effectiveFillThreshold) {
return nextServer;
} else {
delegate.advanceCursor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,28 @@
public class SegmentStatusInTier
{
private final DataSegment segment;
private final double fillThreshold;

private final List<ServerHolder> eligibleLoadServers = new ArrayList<>();
private final List<ServerHolder> aboveThresholdLoadServers = new ArrayList<>();
private final List<ServerHolder> eligibleDropServers = new ArrayList<>();

private final Map<SegmentAction, List<ServerHolder>> serversWithQueuedActions = new HashMap<>();

public SegmentStatusInTier(DataSegment segment, NavigableSet<ServerHolder> historicals)
public SegmentStatusInTier(DataSegment segment, NavigableSet<ServerHolder> historicals, double fillThreshold)
{
this.segment = segment;
this.fillThreshold = fillThreshold;
historicals.forEach(this::handleServer);
}

/**
* Returns servers eligible to load the segment, preferring those below the
* fill threshold. Falls back to above-threshold servers if none qualify.
*/
public List<ServerHolder> getServersEligibleToLoad()
{
return eligibleLoadServers;
return eligibleLoadServers.isEmpty() ? aboveThresholdLoadServers : eligibleLoadServers;
}

public List<ServerHolder> getServersEligibleToDrop()
Expand All @@ -68,7 +75,11 @@ private void handleServer(ServerHolder server)
if (server.isServingSegment(segment)) {
eligibleDropServers.add(server);
} else if (server.canLoadSegment(segment)) {
eligibleLoadServers.add(server);
if (server.getFillFraction() <= fillThreshold) {
eligibleLoadServers.add(server);
} else {
aboveThresholdLoadServers.add(server);
}
} else if (action != null) {
serversWithQueuedActions.computeIfAbsent(action, a -> new ArrayList<>())
.add(server);
Expand Down
Loading
Loading