From 4a8a402c0fc07b4fffe405263fa8387703482d26 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Thu, 26 Feb 2026 17:03:50 +0800 Subject: [PATCH 01/10] RATIS-2403. Support leader batch write to improve linearizable follower read throughput --- .../src/site/markdown/configurations.md | 37 ++++- .../ratis/server/RaftServerConfigKeys.java | 33 ++++- .../ratis/server/impl/LeaderStateImpl.java | 130 ++++++++++++++++-- .../ratis/server/impl/PendingRequests.java | 12 ++ .../apache/ratis/LinearizableReadTests.java | 5 +- ...stLinearizableLeaderLeaseReadWithGrpc.java | 5 +- ...adAppliedIndexLeaderLeaseReadWithGrpc.java | 6 +- ...tLinearizableReadAppliedIndexWithGrpc.java | 6 +- ...leReadRepliedIndexLeaderLeaseWithGrpc.java | 29 ++++ ...tLinearizableReadRepliedIndexWithGrpc.java | 29 ++++ .../grpc/TestLinearizableReadWithGrpc.java | 5 +- 11 files changed, 267 insertions(+), 30 deletions(-) create mode 100644 ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc.java create mode 100644 ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java diff --git a/ratis-docs/src/site/markdown/configurations.md b/ratis-docs/src/site/markdown/configurations.md index 67e988348c..3d59c6b9f4 100644 --- a/ratis-docs/src/site/markdown/configurations.md +++ b/ratis-docs/src/site/markdown/configurations.md @@ -220,11 +220,38 @@ if it fails to receive any RPC responses from this peer within this specified ti ### Read Index - Configurations related to ReadIndex used in linearizable read -| **Property** | `raft.server.read.read-index.applied-index.enabled` | -|:----------------|:----------------------------------------------------------------------| -| **Description** | whether applied index (instead of commit index) is used for ReadIndex | -| **Type** | boolean | -| **Default** | false | +| **Property** | `raft.server.read.read-index.type` | 
+|:----------------|:-----------------------------------------------------------------------------| +| **Description** | type of read index returned | +| **Type** | enum `Read.ReadIndex.Type` [`COMMIT_INDEX`, `APPLIED_INDEX`, `REPLIED_INDEX`] | +| **Default** | `Read.ReadIndex.Type.COMMIT_INDEX` | + +* `Read.ReadIndex.Type.COMMIT_INDEX` - Use leader's CommitIndex (see Raft Paper section 6.4) + * The safest type as it is specified in the Raft dissertation + * This ReadIndex type can be chosen if the baseline performance of linearizable reads from followers already meets expectations. + +* `Read.ReadIndex.Type.APPLIED_INDEX` - Use leader's AppliedIndex + * Allow leader to return AppliedIndex (instead of CommitIndex) as the ReadIndex + * This reduces the time a follower spends applying logs up to the ReadIndex, since AppliedIndex ≤ CommitIndex + * This ReadIndex type can be chosen if the `Read.ReadIndex.Type.COMMIT_INDEX` read latency is too high. + +* `Read.ReadIndex.Type.REPLIED_INDEX` - Use leader's RepliedIndex + * RepliedIndex is defined as the AppliedIndex of the last write request replied by the leader. + * The leader delays replying to write requests and only replies to them at each write batch boundary, configurable by `raft.server.read.read-index.replied-index.batch-interval`. + * This allows the ReadIndex to advance in coarser, less frequent steps, so followers are more likely to have already applied past the ReadIndex when a read arrives. + * This is most effective on read-heavy, follower-read workloads that prioritize overall read throughput without sacrificing consistency. + * There is a trade-off in increased write latency (up to one `raft.server.read.read-index.replied-index.batch-interval`) per write. + * RepliedIndex still guarantees linearizability (no stale read) since by definition each ReadIndex returns the index of the last replied request. 
+ * If `raft.server.read.read-index.replied-index.batch-interval` is set to 0, the behavior is identical to `Read.ReadIndex.Type.APPLIED_INDEX` + +Note that theoretically all the ReadIndex types still guarantee linearizability, +but there are trade-offs (e.g. write and read performance) between different types. + +| **Property** | `raft.server.read.read-index.replied-index.batch-interval` | +|:----------------|:---------------------------------------------------------------------------------------------------------------------------------------------| +| **Description** | if `Read.ReadIndex.Type` is `REPLIED_INDEX`, the interval at which held write replies are flushed to clients and `repliedIndex` is advanced  | +| **Type** | TimeDuration | +| **Default** | 10ms | | **Property** | `raft.server.read.leader.heartbeat-check.enabled` | |:----------------|:--------------------------------------------------| diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index ef16f67f67..2d55594782 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -280,15 +280,34 @@ static void setWriteIndexCacheExpiryTime(RaftProperties properties, TimeDuration interface ReadIndex { String PREFIX = Read.PREFIX + ".read-index"; - String APPLIED_INDEX_ENABLED_KEY = PREFIX + ".applied-index.enabled"; - boolean APPLIED_INDEX_ENABLED_DEFAULT = false; - static boolean appliedIndexEnabled(RaftProperties properties) { - return getBoolean(properties::getBoolean, APPLIED_INDEX_ENABLED_KEY, - APPLIED_INDEX_ENABLED_DEFAULT, getDefaultLog()); + enum Type { + /** ReadIndex returns leader's commitIndex (see Raft Paper section 6.4). */ + COMMIT_INDEX, + + /** ReadIndex returns leader's appliedIndex to reduce the ReadIndex latency. 
*/ + APPLIED_INDEX, + + /** ReadIndex returns leader's repliedIndex, the index of the last replied request. */ + REPLIED_INDEX + } + + String TYPE_KEY = PREFIX + ".type"; + Type TYPE_DEFAULT = Type.COMMIT_INDEX; + static Type type(RaftProperties properties) { + return get(properties::getEnum, TYPE_KEY, TYPE_DEFAULT, getDefaultLog()); + } + static void setType(RaftProperties properties, Type type) { + set(properties::setEnum, TYPE_KEY, type); } - static void setAppliedIndexEnabled(RaftProperties properties, boolean enabled) { - setBoolean(properties::setBoolean, APPLIED_INDEX_ENABLED_KEY, enabled); + String REPLIED_INDEX_BATCH_INTERVAL_KEY = PREFIX + ".replied-index.batch-interval"; + TimeDuration REPLIED_INDEX_BATCH_INTERVAL_DEFAULT = TimeDuration.valueOf(10, TimeUnit.MILLISECONDS); + static TimeDuration repliedIndexBatchInterval(RaftProperties properties) { + return getTimeDuration(properties.getTimeDuration(REPLIED_INDEX_BATCH_INTERVAL_DEFAULT.getUnit()), + REPLIED_INDEX_BATCH_INTERVAL_KEY, REPLIED_INDEX_BATCH_INTERVAL_DEFAULT, getDefaultLog()); + } + static void setRepliedIndexBatchInterval(RaftProperties properties, TimeDuration interval) { + setTimeDuration(properties::setTimeDuration, REPLIED_INDEX_BATCH_INTERVAL_KEY, interval); } } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index ef0bb6b700..36291354d6 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -39,6 +39,7 @@ import org.apache.ratis.protocol.exceptions.ReadIndexException; import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException; import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; import org.apache.ratis.server.impl.ReadIndexHeartbeats.AppendEntriesListener; 
import org.apache.ratis.server.leader.FollowerInfo; import org.apache.ratis.server.leader.LeaderState; @@ -68,6 +69,7 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -80,8 +82,11 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongSupplier; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.function.ToLongFunction; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -224,6 +229,19 @@ CompletableFuture stopAll() { } } + /** A write reply that has been built but not yet sent to the client */ + private static class HeldReply { + private final PendingRequest pending; + private final RaftClientReply reply; + private final long index; + + HeldReply(PendingRequest pending, RaftClientReply reply, long index) { + this.pending = pending; + this.reply = reply; + this.index = index; + } + } + /** For caching {@link FollowerInfo}s. This class is immutable. */ static class CurrentOldFollowerInfos { private final RaftConfigurationImpl conf; @@ -353,10 +371,21 @@ boolean isApplied() { private final PendingStepDown pendingStepDown; private final ReadIndexHeartbeats readIndexHeartbeats; - private final boolean readIndexAppliedIndexEnabled; + private final RaftServerConfigKeys.Read.ReadIndex.Type readIndexType; + private final Supplier<Long> readIndexSupplier; + private final MemoizedSupplier<String> readIndexLogPrefixSupplier; private final boolean leaderHeartbeatCheckEnabled; private final LeaderLease lease; + /** The interval at which held write replies are flushed. 
*/ + private final TimeDuration repliedIndexBatchInterval; + /** The highest log index for which a write reply has been flushed (sent to the client). */ + private final AtomicLong repliedIndex; + /** Buffer holding write replies waiting to be flushed. Guarded by itself. */ + private final AtomicReference<List<HeldReply>> heldReplies; + /** Daemon thread that periodically flushes held replies. */ + private volatile Daemon replyFlusher; + LeaderStateImpl(RaftServerImpl server) { this.name = ServerStringUtils.generateUnifiedName(server.getMemberId(), getClass()); this.server = server; @@ -391,8 +420,33 @@ boolean isApplied() { } else { this.followerMaxGapThreshold = (long) (followerGapRatioMax * maxPendingRequests); } - this.readIndexAppliedIndexEnabled = RaftServerConfigKeys.Read.ReadIndex - .appliedIndexEnabled(properties); + this.readIndexType = RaftServerConfigKeys.Read.ReadIndex.type(properties); + + this.repliedIndexBatchInterval = + RaftServerConfigKeys.Read.ReadIndex.repliedIndexBatchInterval(properties); + this.repliedIndex = new AtomicLong(state.getLastAppliedIndex()); + this.heldReplies = new AtomicReference<>(new LinkedList<>()); + + switch (readIndexType) { + case REPLIED_INDEX: + readIndexSupplier = repliedIndex::get; + readIndexLogPrefixSupplier = MemoizedSupplier.valueOf(() -> "replied"); + final Daemon flusher = Daemon.newBuilder() + .setName(name + "-ReplyFlusher") + .setRunnable(this::runReplyFlusher) + .build(); + this.replyFlusher = flusher; + flusher.start(); + break; + case APPLIED_INDEX: + readIndexSupplier = () -> server.getState().getLastAppliedIndex(); + readIndexLogPrefixSupplier = MemoizedSupplier.valueOf(() -> "applied"); + break; + case COMMIT_INDEX: + default: + readIndexSupplier = () -> server.getRaftLog().getLastCommittedIndex(); + readIndexLogPrefixSupplier = MemoizedSupplier.valueOf(() -> "commit"); + } this.leaderHeartbeatCheckEnabled = RaftServerConfigKeys.Read .leaderHeartbeatCheckEnabled(properties); @@ -1140,14 +1194,13 @@ public boolean 
checkLeadership() { /** * Obtain the current readIndex for read only requests. See Raft paper section 6.4. * 1. Leader makes sure at least one log from current term is committed. - * 2. Leader record last committed index or applied index (depending on configuration) as readIndex. + * 2. Leader record last committed index or applied index or replied index (depending on configuration) as readIndex. * 3. Leader broadcast heartbeats to followers and waits for acknowledgements. * 4. If majority respond success, returns readIndex. * @return current readIndex. */ CompletableFuture getReadIndex(Long readAfterWriteConsistentIndex) { - final long index = readIndexAppliedIndexEnabled ? - server.getState().getLastAppliedIndex() : server.getRaftLog().getLastCommittedIndex(); + final long index = readIndexSupplier.get(); final long readIndex; if (readAfterWriteConsistentIndex != null && readAfterWriteConsistentIndex > index) { readIndex = readAfterWriteConsistentIndex; @@ -1155,7 +1208,7 @@ CompletableFuture getReadIndex(Long readAfterWriteConsistentIndex) { readIndex = index; } LOG.debug("readIndex={} ({}Index={}, readAfterWriteConsistentIndex={})", - readIndex, readIndexAppliedIndexEnabled ? "applied" : "commit", + readIndex, readIndexLogPrefixSupplier.get(), index, readAfterWriteConsistentIndex); // if group contains only one member, fast path @@ -1218,9 +1271,70 @@ private boolean checkLeaderLease() { } void replyPendingRequest(TermIndex termIndex, RaftClientReply reply) { - pendingRequests.replyPendingRequest(termIndex, reply); + if (readIndexType == Type.REPLIED_INDEX) { + // Remove from pending map but hold the reply for batch flushing. + final PendingRequest pending = pendingRequests.removePendingRequest(termIndex); + if (pending != null) { + holdReply(pending, reply, termIndex.getIndex()); + } + } else { + pendingRequests.replyPendingRequest(termIndex, reply); + } + } + + /** Hold a write reply for later batch flushing. 
*/ + private void holdReply(PendingRequest pending, RaftClientReply reply, long index) { + heldReplies.getAndUpdate(prev -> { + prev.add(new HeldReply(pending, reply, index)); + return prev; + }); } + /** Flush all held replies and advance {@link #repliedIndex}. */ + private void flushReplies() { + if (heldReplies.get().isEmpty()) { + return; + } + final List<HeldReply> toFlush = heldReplies.getAndSet(new LinkedList<>()); + + long maxIndex = repliedIndex.get(); + for (HeldReply held : toFlush) { + held.pending.setReply(held.reply); + maxIndex = Math.max(maxIndex, held.index); + } + repliedIndex.set(maxIndex); + LOG.debug("{}: flushed {} replies, repliedIndex={}", name, toFlush.size(), maxIndex); + } + + /** The reply flusher daemon loop. */ + private void runReplyFlusher() { + while (isRunning()) { + try { + Thread.sleep(repliedIndexBatchInterval.toLong(TimeUnit.MILLISECONDS)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + flushReplies(); + } + // Flush remaining on exit. + flushReplies(); + } + + /** Stop the reply flusher daemon. 
*/ + private void stopReplyFlusher() { + final Daemon flusher = this.replyFlusher; + if (flusher != null) { + flusher.interrupt(); + try { + flusher.join(repliedIndexBatchInterval.toLong(TimeUnit.MILLISECONDS) * 2); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + TransactionContext getTransactionContext(TermIndex termIndex) { return pendingRequests.getTransactionContext(termIndex); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java index c6a9dd2794..6eb059a0a9 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java @@ -272,6 +272,18 @@ void replyPendingRequest(TermIndex termIndex, RaftClientReply reply) { } } + /** + * Remove the {@link PendingRequest} for the given {@link TermIndex} without sending a reply. + * @return the removed {@link PendingRequest}, or null if not found. + */ + PendingRequest removePendingRequest(TermIndex termIndex) { + final PendingRequest pending = pendingRequests.remove(termIndex); + if (pending != null) { + Preconditions.assertEquals(termIndex, pending.getTermIndex(), "termIndex"); + } + return pending; + } + /** * The leader state is stopped. Send NotLeaderException to all the pending * requests since they have not got applied to the state machine yet. 
diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java index b15ae3067f..07529d1050 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java @@ -27,6 +27,7 @@ import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; import org.apache.ratis.server.impl.MiniRaftCluster; import org.apache.ratis.util.Slf4jUtils; import org.apache.ratis.util.TimeDuration; @@ -60,7 +61,7 @@ public abstract class LinearizableReadTests public abstract boolean isLeaderLeaseEnabled(); - public abstract boolean readIndexAppliedIndexEnabled(); + public abstract Type readIndexType(); public abstract void assertRaftProperties(RaftProperties properties); @@ -77,7 +78,7 @@ public void setup() { CounterStateMachine.setProperties(p); RaftServerConfigKeys.Read.setOption(p, LINEARIZABLE); RaftServerConfigKeys.Read.setLeaderLeaseEnabled(p, isLeaderLeaseEnabled()); - RaftServerConfigKeys.Read.ReadIndex.setAppliedIndexEnabled(p, readIndexAppliedIndexEnabled()); + RaftServerConfigKeys.Read.ReadIndex.setType(p, readIndexType()); } @Test diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java index d637498d73..f17686e109 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableLeaderLeaseReadWithGrpc.java @@ -20,6 +20,7 @@ import org.apache.ratis.LinearizableReadTests; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.server.RaftServerConfigKeys; +import 
org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; import static org.apache.ratis.ReadOnlyRequestTests.assertOption; import static org.apache.ratis.server.RaftServerConfigKeys.Read.Option.LINEARIZABLE; @@ -35,8 +36,8 @@ public boolean isLeaderLeaseEnabled() { } @Override - public boolean readIndexAppliedIndexEnabled() { - return false; + public Type readIndexType() { + return Type.COMMIT_INDEX; } @Override diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc.java index 9bf3e307be..3705fb3ffc 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc.java @@ -17,11 +17,13 @@ */ package org.apache.ratis.grpc; +import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; + public class TestLinearizableReadAppliedIndexLeaderLeaseReadWithGrpc extends TestLinearizableLeaderLeaseReadWithGrpc { @Override - public boolean readIndexAppliedIndexEnabled() { - return true; + public Type readIndexType() { + return Type.APPLIED_INDEX; } } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java index c019aac166..b119f32a6f 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadAppliedIndexWithGrpc.java @@ -17,11 +17,13 @@ */ package org.apache.ratis.grpc; +import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; + public class TestLinearizableReadAppliedIndexWithGrpc extends TestLinearizableReadWithGrpc { @Override - public boolean 
readIndexAppliedIndexEnabled() { - return true; + public Type readIndexType() { + return Type.APPLIED_INDEX; } } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc.java new file mode 100644 index 0000000000..92b158c56c --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.grpc; + +import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; + +public class TestLinearizableReadRepliedIndexLeaderLeaseWithGrpc + extends TestLinearizableLeaderLeaseReadWithGrpc { + + @Override + public Type readIndexType() { + return Type.REPLIED_INDEX; + } +} diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java new file mode 100644 index 0000000000..f13ce82120 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadRepliedIndexWithGrpc.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.grpc; + +import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; + +public class TestLinearizableReadRepliedIndexWithGrpc + extends TestLinearizableReadWithGrpc { + + @Override + public Type readIndexType() { + return Type.REPLIED_INDEX; + } +} diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java index 3e8860dd19..ce12050b19 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLinearizableReadWithGrpc.java @@ -20,6 +20,7 @@ import org.apache.ratis.LinearizableReadTests; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type; import static org.apache.ratis.ReadOnlyRequestTests.assertOption; import static org.apache.ratis.server.RaftServerConfigKeys.Read.Option.LINEARIZABLE; @@ -35,8 +36,8 @@ public boolean isLeaderLeaseEnabled() { } @Override - public boolean readIndexAppliedIndexEnabled() { - return false; + public Type readIndexType() { + return Type.COMMIT_INDEX; } @Override From 534240e0631906944d7e69c6ccd739aca622f22e Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Thu, 26 Feb 2026 18:31:04 +0800 Subject: [PATCH 02/10] Move start daemon from constructor to start method to prevent race Generated-by: Cursor --- .../org/apache/ratis/server/impl/LeaderStateImpl.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 36291354d6..1b37435437 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -431,12 +431,10 @@ boolean isApplied() { case REPLIED_INDEX: readIndexSupplier = repliedIndex::get; readIndexLogPrefixSupplier = MemoizedSupplier.valueOf(() -> "replied"); - final Daemon flusher = Daemon.newBuilder() + this.replyFlusher = Daemon.newBuilder() .setName(name + "-ReplyFlusher") .setRunnable(this::runReplyFlusher) .build(); - this.replyFlusher = flusher; - flusher.start(); break; case APPLIED_INDEX: readIndexSupplier = () -> server.getState().getLastAppliedIndex(); @@ -473,6 +471,10 @@ void start() { startupLogEntry.get(); processor.start(); senders.forEach(LogAppender::start); + + if (replyFlusher != null) { + replyFlusher.start(); + } } boolean isReady() { From 5898f390f0604cf26798cd15539b66b8b14f9aab Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Fri, 27 Feb 2026 16:10:35 +0800 Subject: [PATCH 03/10] Try to adapt test to the new REPLIED_INDEX guarantee. Generated-by: Cursor --- .../apache/ratis/LinearizableReadTests.java | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java index 07529d1050..63514ea602 100644 --- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java @@ -144,10 +144,12 @@ static void runTestFollowerLinearizableRead(C cluste @Test public void testFollowerLinearizableReadParallel() throws Exception { - runWithNewCluster(LinearizableReadTests::runTestFollowerReadOnlyParallel); + final Type type = readIndexType(); + runWithNewCluster(cluster -> runTestFollowerReadOnlyParallel(type, cluster)); } - static void runTestFollowerReadOnlyParallel(C cluster) throws Exception { + static void runTestFollowerReadOnlyParallel(Type readIndexType, C cluster) + throws Exception { final RaftPeerId 
leaderId = RaftTestUtil.waitForLeader(cluster).getId(); final List followers = cluster.getFollowers(); @@ -170,8 +172,17 @@ static void runTestFollowerReadOnlyParallel(C cluste writeReplies.add(new Reply(count, leaderClient.async().send(WAIT_AND_INCREMENT))); Thread.sleep(100); - assertReplyExact(count, f0Client.io().sendReadOnly(QUERY, f0)); - f1Replies.add(new Reply(count, f1Client.async().sendReadOnly(QUERY, f1))); + if (readIndexType == Type.REPLIED_INDEX) { + // With REPLIED_INDEX the read index only advances after the leader has applied the + // transaction and the reply batch is flushed. WAIT_AND_INCREMENT takes 500 ms in + // the state machine but we only waited 100 ms, so its reply has not been generated + // yet and the follower read may only see the preceding sync INCREMENT (count - 1). + assertReplyAtLeast(count - 1, f0Client.io().sendReadOnly(QUERY, f0)); + f1Replies.add(new Reply(count - 1, f1Client.async().sendReadOnly(QUERY, f1))); + } else { + assertReplyExact(count, f0Client.io().sendReadOnly(QUERY, f0)); + f1Replies.add(new Reply(count, f1Client.async().sendReadOnly(QUERY, f1))); + } } for (int i = 0; i < n; i++) { From 7f58060eae4314ae9a47a4153e803ca7cb83584c Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Fri, 27 Feb 2026 16:49:17 +0800 Subject: [PATCH 04/10] Revert AtomicReference since it's not atomic and use synchronized list swap, add stopReplyFlusher to stop Generated-by: Cursor --- .../ratis/server/impl/LeaderStateImpl.java | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 1b37435437..211c0938fe 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -69,7 +69,6 @@ import java.util.EnumSet; import java.util.HashMap; import 
java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -83,7 +82,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; + import java.util.function.LongSupplier; import java.util.function.Predicate; import java.util.function.Supplier; @@ -381,8 +380,10 @@ boolean isApplied() { private final TimeDuration repliedIndexBatchInterval; /** The highest log index for which a write reply has been flushed (sent to the client). */ private final AtomicLong repliedIndex; - /** Buffer holding write replies waiting to be flushed. Guarded by itself. */ - private final AtomicReference<List<HeldReply>> heldReplies; + /** Guards {@link #heldReplies}. */ + private final Object heldRepliesLock = new Object(); + /** Buffer holding write replies waiting to be flushed. Guarded by {@link #heldRepliesLock}. */ + private List<HeldReply> heldReplies = new ArrayList<>(); /** Daemon thread that periodically flushes held replies. */ private volatile Daemon replyFlusher; @@ -425,7 +426,6 @@ boolean isApplied() { this.repliedIndexBatchInterval = RaftServerConfigKeys.Read.ReadIndex.repliedIndexBatchInterval(properties); this.repliedIndex = new AtomicLong(state.getLastAppliedIndex()); - this.heldReplies = new AtomicReference<>(new LinkedList<>()); switch (readIndexType) { case REPLIED_INDEX: @@ -509,6 +509,7 @@ CompletableFuture stop() { startupLogEntry.get().getAppliedIndexFuture().completeExceptionally( new ReadIndexException("failed to obtain read index since: ", nle)); server.getServerRpc().notifyNotLeader(server.getMemberId().getGroupId()); + stopReplyFlusher(); logAppenderMetrics.unregister(); raftServerMetrics.unregister(); pendingRequests.close(); @@ -1286,18 +1287,21 @@ void replyPendingRequest(TermIndex termIndex, RaftClientReply reply) { /** Hold a write reply for later batch flushing. 
*/ private void holdReply(PendingRequest pending, RaftClientReply reply, long index) { - heldReplies.getAndUpdate(prev -> { - prev.add(new HeldReply(pending, reply, index)); - return prev; - }); + synchronized (heldRepliesLock) { + heldReplies.add(new HeldReply(pending, reply, index)); + } } /** Flush all held replies and advance {@link #repliedIndex}. */ private void flushReplies() { - if (heldReplies.get().isEmpty()) { - return; + final List<HeldReply> toFlush; + synchronized (heldRepliesLock) { + if (heldReplies.isEmpty()) { + return; + } + toFlush = heldReplies; + heldReplies = new ArrayList<>(); } - final List<HeldReply> toFlush = heldReplies.getAndSet(new LinkedList<>()); long maxIndex = repliedIndex.get(); for (HeldReply held : toFlush) { From 36f8903810de0be1d8f2bfeb8c67dbb8219d1c8f Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Sun, 1 Mar 2026 20:31:55 +0800 Subject: [PATCH 05/10] Introduce ReplyFlusher and update log --- .../ratis/server/impl/LeaderStateImpl.java | 110 ++------------ .../ratis/server/impl/ReplyFlusher.java | 143 ++++++++++++++++++ 2 files changed, 154 insertions(+), 99 deletions(-) create mode 100644 ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 211c0938fe..8d3c07de8b 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -81,7 +81,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongSupplier; import java.util.function.Predicate; @@ -228,19 +227,6 @@ CompletableFuture stopAll() { } } - /** A write reply that has been built but not yet sent to the client */ - private static 
class HeldReply {
-    private final PendingRequest pending;
-    private final RaftClientReply reply;
-    private final long index;
-
-    HeldReply(PendingRequest pending, RaftClientReply reply, long index) {
-      this.pending = pending;
-      this.reply = reply;
-      this.index = index;
-    }
-  }
-
   /** For caching {@link FollowerInfo}s. This class is immutable. */
   static class CurrentOldFollowerInfos {
     private final RaftConfigurationImpl conf;
@@ -372,20 +358,10 @@ boolean isApplied() {
   private final ReadIndexHeartbeats readIndexHeartbeats;
   private final RaftServerConfigKeys.Read.ReadIndex.Type readIndexType;
   private final Supplier readIndexSupplier;
-  private final MemoizedSupplier readIndexLogPrefixSupplier;
   private final boolean leaderHeartbeatCheckEnabled;
   private final LeaderLease lease;
 
-  /** The interval at which held write replies are flushed. */
-  private final TimeDuration repliedIndexBatchInterval;
-  /** The highest log index for which a write reply has been flushed (sent to the client). */
-  private final AtomicLong repliedIndex;
-  /** Guards {@link #heldReplies}. */
-  private final Object heldRepliesLock = new Object();
-  /** Buffer holding write replies waiting to be flushed. Guarded by {@link #heldRepliesLock}. */
-  private List<HeldReply> heldReplies = new ArrayList<>();
-  /** Daemon thread that periodically flushes held replies.
*/
-  private volatile Daemon replyFlusher;
+  private ReplyFlusher replyFlusher;
 
   LeaderStateImpl(RaftServerImpl server) {
     this.name = ServerStringUtils.generateUnifiedName(server.getMemberId(), getClass());
@@ -421,29 +397,20 @@ boolean isApplied() {
     } else {
       this.followerMaxGapThreshold = (long) (followerGapRatioMax * maxPendingRequests);
     }
-    this.readIndexType = RaftServerConfigKeys.Read.ReadIndex.type(properties);
-
-    this.repliedIndexBatchInterval =
-        RaftServerConfigKeys.Read.ReadIndex.repliedIndexBatchInterval(properties);
-    this.repliedIndex = new AtomicLong(state.getLastAppliedIndex());
+    this.readIndexType = RaftServerConfigKeys.Read.ReadIndex.type(properties);
 
     switch (readIndexType) {
       case REPLIED_INDEX:
-        readIndexSupplier = repliedIndex::get;
-        readIndexLogPrefixSupplier = MemoizedSupplier.valueOf(() -> "replied");
-        this.replyFlusher = Daemon.newBuilder()
-            .setName(name + "-ReplyFlusher")
-            .setRunnable(this::runReplyFlusher)
-            .build();
+        this.replyFlusher = new ReplyFlusher(name, state.getLastAppliedIndex(),
+            RaftServerConfigKeys.Read.ReadIndex.repliedIndexBatchInterval(properties));
+        readIndexSupplier = replyFlusher::getRepliedIndex;
         break;
       case APPLIED_INDEX:
         readIndexSupplier = () -> server.getState().getLastAppliedIndex();
-        readIndexLogPrefixSupplier = MemoizedSupplier.valueOf(() -> "applied");
         break;
       case COMMIT_INDEX:
       default:
         readIndexSupplier = () -> server.getRaftLog().getLastCommittedIndex();
-        readIndexLogPrefixSupplier = MemoizedSupplier.valueOf(() -> "commit");
     }
     this.leaderHeartbeatCheckEnabled = RaftServerConfigKeys.Read
         .leaderHeartbeatCheckEnabled(properties);
@@ -509,7 +476,9 @@ CompletableFuture stop() {
     startupLogEntry.get().getAppliedIndexFuture().completeExceptionally(
         new ReadIndexException("failed to obtain read index since: ", nle));
     server.getServerRpc().notifyNotLeader(server.getMemberId().getGroupId());
-    stopReplyFlusher();
+    if (replyFlusher != null) {
+      replyFlusher.stop();
+    }
     logAppenderMetrics.unregister();
     raftServerMetrics.unregister();
     pendingRequests.close();
@@ -1210,9 +1179,8 @@ CompletableFuture getReadIndex(Long readAfterWriteConsistentIndex) {
     } else {
       readIndex = index;
     }
-    LOG.debug("readIndex={} ({}Index={}, readAfterWriteConsistentIndex={})",
-        readIndex, readIndexLogPrefixSupplier.get(),
-        index, readAfterWriteConsistentIndex);
+    LOG.debug("readIndex={} ({}={}, readAfterWriteConsistentIndex={})",
+        readIndex, readIndexType, index, readAfterWriteConsistentIndex);
 
     // if group contains only one member, fast path
     if (server.getRaftConf().isSingleton()) {
@@ -1278,69 +1246,13 @@ void replyPendingRequest(TermIndex termIndex, RaftClientReply reply) {
       // Remove from pending map but hold the reply for batch flushing.
       final PendingRequest pending = pendingRequests.removePendingRequest(termIndex);
       if (pending != null) {
-        holdReply(pending, reply, termIndex.getIndex());
+        replyFlusher.hold(pending, reply, termIndex.getIndex());
       }
     } else {
       pendingRequests.replyPendingRequest(termIndex, reply);
     }
   }
 
-  /** Hold a write reply for later batch flushing. */
-  private void holdReply(PendingRequest pending, RaftClientReply reply, long index) {
-    synchronized (heldRepliesLock) {
-      heldReplies.add(new HeldReply(pending, reply, index));
-    }
-  }
-
-  /** Flush all held replies and advance {@link #repliedIndex}. */
-  private void flushReplies() {
-    final List<HeldReply> toFlush;
-    synchronized (heldRepliesLock) {
-      if (heldReplies.isEmpty()) {
-        return;
-      }
-      toFlush = heldReplies;
-      heldReplies = new ArrayList<>();
-    }
-
-    long maxIndex = repliedIndex.get();
-    for (HeldReply held : toFlush) {
-      held.pending.setReply(held.reply);
-      maxIndex = Math.max(maxIndex, held.index);
-    }
-    repliedIndex.set(maxIndex);
-    LOG.debug("{}: flushed {} replies, repliedIndex={}", name, toFlush.size(), maxIndex);
-  }
-
-  /** The reply flusher daemon loop.
*/
-  private void runReplyFlusher() {
-    while (isRunning()) {
-      try {
-        Thread.sleep(repliedIndexBatchInterval.toLong(TimeUnit.MILLISECONDS));
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        break;
-      }
-      flushReplies();
-    }
-    // Flush remaining on exit.
-    flushReplies();
-  }
-
-  /** Stop the reply flusher daemon. */
-  private void stopReplyFlusher() {
-    final Daemon flusher = this.replyFlusher;
-    if (flusher != null) {
-      flusher.interrupt();
-      try {
-        flusher.join(repliedIndexBatchInterval.toLong(TimeUnit.MILLISECONDS) * 2);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-    }
-  }
-
-
   TransactionContext getTransactionContext(TermIndex termIndex) {
     return pendingRequests.getTransactionContext(termIndex);
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java
new file mode 100644
index 0000000000..61a3429e45
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.server.raftlog.RaftLogIndex;
+import org.apache.ratis.util.Daemon;
+import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implements the reply flush logic as part of the leader batch write when RepliedIndex is used.
+ */
+public class ReplyFlusher {
+  static final Logger LOG = LoggerFactory.getLogger(ReplyFlusher.class);
+
+  /** A write reply that has been built but not yet sent to the client */
+  static class HeldReply {
+    private final PendingRequest pending;
+    private final RaftClientReply reply;
+    private final long index;
+
+    HeldReply(PendingRequest pending, RaftClientReply reply, long index) {
+      this.pending = pending;
+      this.reply = reply;
+      this.index = index;
+    }
+
+    long release() {
+      pending.setReply(reply);
+      return index;
+    }
+  }
+
+  static class Replies {
+    private LinkedList<HeldReply> list = new LinkedList<>();
+
+    synchronized void add(PendingRequest pending, RaftClientReply reply, long index) {
+      list.add(new HeldReply(pending, reply, index));
+    }
+
+    synchronized LinkedList<HeldReply> getAndSetNewList() {
+      final LinkedList<HeldReply> old = list;
+      list = new LinkedList<>();
+      return old;
+    }
+  }
+
+  private final String name;
+  private final LifeCycle lifeCycle;
+  private final Daemon daemon;
+  private Replies replies = new Replies();
+  private final RaftLogIndex repliedIndex;
+  /** The interval at which held write replies are flushed.
*/
+  private final TimeDuration batchInterval;
+
+  ReplyFlusher(String name, long repliedIndex, TimeDuration batchInterval) {
+    this.name = name = "-ReplyFlusher";
+    this.lifeCycle = new LifeCycle(this.name);
+    this.daemon = Daemon.newBuilder()
+        .setName(this.name)
+        .setRunnable(this::run)
+        .build();
+    this.repliedIndex = new RaftLogIndex("repliedIndex", repliedIndex);
+    this.batchInterval = batchInterval;
+  }
+
+  long getRepliedIndex() {
+    return repliedIndex.get();
+  }
+
+  /** Hold a write reply for later batch flushing */
+  void hold(PendingRequest pending, RaftClientReply reply, long index) {
+    replies.add(pending, reply, index);
+  }
+
+  void start() {
+    lifeCycle.startAndTransition(daemon::start);
+  }
+
+  /** The reply flusher daemon loop. */
+  private void run() {
+    try {
+      while (lifeCycle.getCurrentState() == LifeCycle.State.RUNNING) {
+        try {
+          Thread.sleep(batchInterval.toLong(TimeUnit.MILLISECONDS));
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return;
+        }
+        flush();
+      }
+    } finally {
+      // Flush remaining on exit
+      flush();
+    }
+  }
+
+  /** Flush all held replies and advance {@link #repliedIndex}. */
+  private void flush() {
+    final LinkedList<HeldReply> toFlush = replies.getAndSetNewList();
+    if (toFlush.isEmpty()) {
+      return;
+    }
+    long maxIndex = toFlush.removeLast().release();
+    for (HeldReply held : toFlush) {
+      maxIndex = Math.max(maxIndex, held.release());
+    }
+    repliedIndex.updateToMax(maxIndex, s ->
+        LOG.debug("{}: flushed {} replies, {}", name, toFlush.size(), s));
+  }
+
+  /** Stop the reply flusher daemon.
*/
+  void stop() {
+    lifeCycle.checkStateAndClose();
+    daemon.interrupt();
+    try {
+      daemon.join(batchInterval.toLong(TimeUnit.MILLISECONDS )* 2);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+}

From bfc51822dc0f2dd4f5b3b51e4d43a58d193a90e7 Mon Sep 17 00:00:00 2001
From: Ivan Andika
Date: Sun, 1 Mar 2026 20:34:41 +0800
Subject: [PATCH 06/10] Remove unnecessary blank line

---
 ratis-docs/src/site/markdown/configurations.md                  | 2 +-
 .../main/java/org/apache/ratis/server/impl/LeaderStateImpl.java | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/ratis-docs/src/site/markdown/configurations.md b/ratis-docs/src/site/markdown/configurations.md
index 3d59c6b9f4..31c66293fd 100644
--- a/ratis-docs/src/site/markdown/configurations.md
+++ b/ratis-docs/src/site/markdown/configurations.md
@@ -241,7 +241,7 @@ if it fails to receive any RPC responses from this peer within this specified ti
   * This allows the ReadIndex to advance in a coarser, less frequent steps, so followers are more likely to have already applied past the ReadIndex when a read arrives.
   * This is most effective on read-heavy, follower-read workloads which prioritizes overall read throughput without consistency sacrifice.
   * There is a trade-off in increased write latency (up to one `raft.server.read.read-index.replied-index.batch-interval`) per write.
-  * RepliedIndex still guarantees linearizability (no stale read) since by definition each ReadIndex returns the index of the last replied requests.
+  * RepliedIndex still guarantees linearizability (no stale read) since by definition each ReadIndex returns the index of the last replied request.
   * If the RepliedIndex is set to 0, the behavior is identical to `Read.ReadIndex.Type.APPLIED_INDEX`
 
 Note that theoretically all the ReadIndex types still guarantee linearizability,
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 8d3c07de8b..6a9d96bbc7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -81,7 +81,6 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import java.util.function.LongSupplier;
 import java.util.function.Predicate;
 import java.util.function.Supplier;

From 589117b65243ed3ae3def4d1ef7916474faff513 Mon Sep 17 00:00:00 2001
From: Ivan Andika
Date: Sun, 1 Mar 2026 20:38:18 +0800
Subject: [PATCH 07/10] Fix findbugs

---
 .../main/java/org/apache/ratis/server/impl/ReplyFlusher.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java
index 61a3429e45..6a6e922f89 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java
@@ -75,7 +75,7 @@ synchronized LinkedList<HeldReply> getAndSetNewList() {
   private final TimeDuration batchInterval;
 
   ReplyFlusher(String name, long repliedIndex, TimeDuration batchInterval) {
-    this.name = name = "-ReplyFlusher";
+    this.name = name + "-ReplyFlusher";
     this.lifeCycle = new LifeCycle(this.name);
     this.daemon = Daemon.newBuilder()
         .setName(this.name)

From f1a2bae32dd2739177078f5ac0c32cebfaba9def Mon Sep 17 00:00:00 2001
From: Ivan Andika
Date: Mon, 2 Mar 2026 12:05:20 +0800
Subject: [PATCH 08/10] Use appliedIndex during flush
 instead

---
 .../ratis/server/impl/LeaderStateImpl.java    |  3 +-
 .../ratis/server/impl/ReplyFlusher.java       | 36 +++++++++----------
 .../apache/ratis/LinearizableReadTests.java   | 19 +++-------
 3 files changed, 23 insertions(+), 35 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 6a9d96bbc7..a3613f0796 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -401,6 +401,7 @@ boolean isApplied() {
     switch (readIndexType) {
       case REPLIED_INDEX:
         this.replyFlusher = new ReplyFlusher(name, state.getLastAppliedIndex(),
+            () -> server.getState().getLastAppliedIndex(),
             RaftServerConfigKeys.Read.ReadIndex.repliedIndexBatchInterval(properties));
         readIndexSupplier = replyFlusher::getRepliedIndex;
         break;
@@ -1245,7 +1246,7 @@ void replyPendingRequest(TermIndex termIndex, RaftClientReply reply) {
       // Remove from pending map but hold the reply for batch flushing.
       final PendingRequest pending = pendingRequests.removePendingRequest(termIndex);
       if (pending != null) {
-        replyFlusher.hold(pending, reply, termIndex.getIndex());
+        replyFlusher.hold(pending, reply);
       }
     } else {
       pendingRequests.replyPendingRequest(termIndex, reply);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java
index 6a6e922f89..b4c37dac9f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java
@@ -27,6 +27,7 @@
 
 import java.util.LinkedList;
 import java.util.concurrent.TimeUnit;
+import java.util.function.LongSupplier;
 
 /**
  * Implements the reply flush logic as part of the leader batch write when RepliedIndex is used.
@@ -34,29 +35,26 @@
 public class ReplyFlusher {
   static final Logger LOG = LoggerFactory.getLogger(ReplyFlusher.class);
 
-  /** A write reply that has been built but not yet sent to the client */
+  /** A write reply that has been built but not yet sent to the client. */
   static class HeldReply {
     private final PendingRequest pending;
     private final RaftClientReply reply;
-    private final long index;
 
-    HeldReply(PendingRequest pending, RaftClientReply reply, long index) {
+    HeldReply(PendingRequest pending, RaftClientReply reply) {
       this.pending = pending;
       this.reply = reply;
-      this.index = index;
     }
 
-    long release() {
+    void release() {
       pending.setReply(reply);
-      return index;
     }
   }
 
   static class Replies {
     private LinkedList<HeldReply> list = new LinkedList<>();
 
-    synchronized void add(PendingRequest pending, RaftClientReply reply, long index) {
-      list.add(new HeldReply(pending, reply, index));
+    synchronized void add(PendingRequest pending, RaftClientReply reply) {
+      list.add(new HeldReply(pending, reply));
     }
 
     synchronized LinkedList<HeldReply> getAndSetNewList() {
@@ -71,10 +69,12 @@ synchronized LinkedList<HeldReply> getAndSetNewList() {
   private final Daemon daemon;
   private Replies replies = new Replies();
   private final RaftLogIndex repliedIndex;
+  /** Supplies the last applied index from the state machine. */
+  private final LongSupplier appliedIndexSupplier;
   /** The interval at which held write replies are flushed.
*/
   private final TimeDuration batchInterval;
 
-  ReplyFlusher(String name, long repliedIndex, TimeDuration batchInterval) {
+  ReplyFlusher(String name, long repliedIndex, LongSupplier appliedIndexSupplier, TimeDuration batchInterval) {
     this.name = name + "-ReplyFlusher";
     this.lifeCycle = new LifeCycle(this.name);
     this.daemon = Daemon.newBuilder()
@@ -82,6 +82,7 @@ synchronized LinkedList<HeldReply> getAndSetNewList() {
         .setRunnable(this::run)
         .build();
     this.repliedIndex = new RaftLogIndex("repliedIndex", repliedIndex);
+    this.appliedIndexSupplier = appliedIndexSupplier;
     this.batchInterval = batchInterval;
   }
 
@@ -89,9 +90,9 @@ long getRepliedIndex() {
     return repliedIndex.get();
   }
 
-  /** Hold a write reply for later batch flushing */
-  void hold(PendingRequest pending, RaftClientReply reply, long index) {
-    replies.add(pending, reply, index);
+  /** Hold a write reply for later batch flushing. */
+  void hold(PendingRequest pending, RaftClientReply reply) {
+    replies.add(pending, reply);
   }
 
   void start() {
@@ -116,17 +117,14 @@ private void run() {
     }
   }
 
-  /** Flush all held replies and advance {@link #repliedIndex}. */
+  /** Flush all held replies and advance {@link #repliedIndex} to the applied index.
*/
   private void flush() {
     final LinkedList<HeldReply> toFlush = replies.getAndSetNewList();
-    if (toFlush.isEmpty()) {
-      return;
-    }
-    long maxIndex = toFlush.removeLast().release();
     for (HeldReply held : toFlush) {
-      maxIndex = Math.max(maxIndex, held.release());
+      held.release();
     }
-    repliedIndex.updateToMax(maxIndex, s ->
+    final long appliedIndex = appliedIndexSupplier.getAsLong();
+    repliedIndex.updateToMax(appliedIndex, s ->
         LOG.debug("{}: flushed {} replies, {}", name, toFlush.size(), s));
   }
 
diff --git a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java
index 63514ea602..07529d1050 100644
--- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java
@@ -144,12 +144,10 @@ static void runTestFollowerLinearizableRead(C cluste
 
   @Test
   public void testFollowerLinearizableReadParallel() throws Exception {
-    final Type type = readIndexType();
-    runWithNewCluster(cluster -> runTestFollowerReadOnlyParallel(type, cluster));
+    runWithNewCluster(LinearizableReadTests::runTestFollowerReadOnlyParallel);
   }
 
-  static void runTestFollowerReadOnlyParallel(Type readIndexType, C cluster)
-      throws Exception {
+  static void runTestFollowerReadOnlyParallel(C cluster) throws Exception {
     final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
 
     final List followers = cluster.getFollowers();
@@ -172,17 +170,8 @@ static void runTestFollowerReadOnlyParallel(Type rea
         writeReplies.add(new Reply(count, leaderClient.async().send(WAIT_AND_INCREMENT)));
         Thread.sleep(100);
 
-        if (readIndexType == Type.REPLIED_INDEX) {
-          // With REPLIED_INDEX the read index only advances after the leader has applied the
-          // transaction and the reply batch is flushed.
WAIT_AND_INCREMENT takes 500 ms in
-          // the state machine but we only waited 100 ms, so its reply has not been generated
-          // yet and the follower read may only see the preceding sync INCREMENT (count - 1).
-          assertReplyAtLeast(count - 1, f0Client.io().sendReadOnly(QUERY, f0));
-          f1Replies.add(new Reply(count - 1, f1Client.async().sendReadOnly(QUERY, f1)));
-        } else {
-          assertReplyExact(count, f0Client.io().sendReadOnly(QUERY, f0));
-          f1Replies.add(new Reply(count, f1Client.async().sendReadOnly(QUERY, f1)));
-        }
+        assertReplyExact(count, f0Client.io().sendReadOnly(QUERY, f0));
+        f1Replies.add(new Reply(count, f1Client.async().sendReadOnly(QUERY, f1)));
       }
 
       for (int i = 0; i < n; i++) {

From e595eab517d7bdb74c50c4d084b2fca89f90ea30 Mon Sep 17 00:00:00 2001
From: Ivan Andika
Date: Mon, 2 Mar 2026 12:56:23 +0800
Subject: [PATCH 09/10] Fix replyFlusher never run issue

---
 .../java/org/apache/ratis/server/impl/ReplyFlusher.java | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java
index b4c37dac9f..c98987db7c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReplyFlusher.java
@@ -96,7 +100,11 @@ void hold(PendingRequest pending, RaftClientReply reply) {
   }
 
   void start() {
-    lifeCycle.startAndTransition(daemon::start);
+    lifeCycle.transition(LifeCycle.State.STARTING);
+    // We need to transition to RUNNING first so that ReplyFlusher#run always
+    // see that the lifecycle state is in RUNNING state.
+    lifeCycle.transition(LifeCycle.State.RUNNING);
+    daemon.start();
   }
 
   /** The reply flusher daemon loop.
*/

From 60380eea78eb3710cf761c7d931a5b37fded4274 Mon Sep 17 00:00:00 2001
From: Ivan Andika
Date: Mon, 2 Mar 2026 12:58:11 +0800
Subject: [PATCH 10/10] Update documentation

---
 ratis-docs/src/site/markdown/configurations.md | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/ratis-docs/src/site/markdown/configurations.md b/ratis-docs/src/site/markdown/configurations.md
index 31c66293fd..f5189ed862 100644
--- a/ratis-docs/src/site/markdown/configurations.md
+++ b/ratis-docs/src/site/markdown/configurations.md
@@ -236,7 +236,7 @@ if it fails to receive any RPC responses from this peer within this specified ti
   * This ReadIndex type can be chosen `Read.ReadIndex.Type.COMMIT_INDEX` read latency is too high.
 
 * `Read.ReadIndex.Type.REPLIED_INDEX` - Use leader's RepliedIndex
-  * RepliedIndex is defined as the AppliedIndex of the last write request replied by the leader.
+  * RepliedIndex is defined as the last AppliedIndex of the leader when returning the last batch.
   * Leader delays replying write requests and only reply them every write batch boundary configurable by `raft.server.read.read-index.replied-index.batch-interval`.
   * This allows the ReadIndex to advance in a coarser, less frequent steps, so followers are more likely to have already applied past the ReadIndex when a read arrives.
   * This is most effective on read-heavy, follower-read workloads which prioritizes overall read throughput without consistency sacrifice.
@@ -247,11 +247,11 @@ if it fails to receive any RPC responses from this peer within this specified ti
 Note that theoretically all the ReadIndex types still guarantee linearizability,
 but there are tradeoffs (e.g. Write and Read performance) between different types.
 
-| **Property** | `raft.server.read.read-index.replied-index.batch-interval` |
-|:----------------|:---------------------------------------------------------------------------------------------------------------------------------------------|
-| **Description** | if `Read.ReadIndex.Type` is `REAPLIED_INDEX`, the interval at which held write replies are flushed to clients and `repliedIndex` is advanced |
-| **Type** | TimeDuration |
-| **Default** | 10ms |
+| **Property** | `raft.server.read.read-index.replied-index.batch-interval` |
+|:----------------|:--------------------------------------------------------------------------------------------------------------------------------------------|
+| **Description** | if `Read.ReadIndex.Type` is `REPLIED_INDEX`, the interval at which held write replies are flushed to clients and `repliedIndex` is advanced |
+| **Type** | TimeDuration |
+| **Default** | 10ms |
 
 | **Property** | `raft.server.read.leader.heartbeat-check.enabled` |
 |:----------------|:--------------------------------------------------|