From 38f0ac76c15cf4a30d506ff586b6362d91668473 Mon Sep 17 00:00:00 2001 From: Lukasz Antoniak Date: Thu, 2 Apr 2026 06:56:41 +0200 Subject: [PATCH 1/3] CASSANALYTICS-147: BufferingInputStream fails to read last unaligned chunk --- .../utils/streaming/BufferingInputStream.java | 4 +- .../utils/BufferingInputStreamTests.java | 76 ++++++++++++++++++- .../io/util/CdcRandomAccessReaderTest.java | 6 +- 3 files changed, 80 insertions(+), 6 deletions(-) diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/streaming/BufferingInputStream.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/streaming/BufferingInputStream.java index ecfdb82df..6e8eab681 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/streaming/BufferingInputStream.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/streaming/BufferingInputStream.java @@ -199,12 +199,12 @@ private void requestMore() } long chunkSize = rangeStart == 0 ? source.headerChunkSize() : source.chunkBufferSize(); - long rangeEnd = Math.min(source.size(), rangeStart + chunkSize); + long rangeEnd = Math.min(source.size(), rangeStart + chunkSize) - 1; // start and end range are inclusive if (rangeEnd >= rangeStart) { activeRequest = true; source.request(rangeStart, rangeEnd, this); - rangeStart += chunkSize + 1; // Increment range start pointer for next request + rangeStart += chunkSize; // Increment range start pointer for next request } else { diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamTests.java index 1b0aace30..d22251a64 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamTests.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamTests.java @@ -20,7 +20,10 @@ package org.apache.cassandra.spark.utils; import java.io.IOException; +import java.nio.ByteBuffer; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -33,6 +36,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.Uninterruptibles; +import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.mutable.MutableInt; import org.junit.jupiter.api.Test; @@ -289,7 +293,7 @@ public Duration timeout() }; int bytesToRead = chunkSize * numChunks; - long skipAhead = size - bytesToRead + 1; + long skipAhead = size - bytesToRead; try (BufferingInputStream stream = new BufferingInputStream<>(source, STATS.bufferingInputStreamStats())) { // Skip ahead so we only read the final chunks @@ -346,6 +350,76 @@ public Duration timeout() } } + @Test + public void testUnalignedEndReading() throws IOException + { + int dataSize = 2527234; + int chunkSize = 4096; + int remainingReadBytes = 2; + List returnedBuffers = new ArrayList<>(); + CassandraFileSource source = new CassandraFileSource() + { + @Override + public void request(long start, long end, StreamConsumer consumer) + { + byte[] bytes = RandomUtils.randomBytes((int) (end - start + 1)); + StreamBuffer buffer = StreamBuffer.wrap(bytes); + returnedBuffers.add(bytes); + consumer.onRead(buffer); + consumer.onEnd(); + } + + @Override + public SSTable cassandraFile() + { + return null; + } + + @Override + public FileType fileType() + { + return FileType.PARTITIONS_INDEX; + } + + @Override + public long size() + { + return dataSize; + } + + @Override + @Nullable + public Duration timeout() + { + return Duration.ofSeconds(5); + } + + @Override + public long chunkBufferSize() + { + return chunkSize; + } + }; + + try (BufferingInputStream stream1 = new BufferingInputStream<>(source, STATS.bufferingInputStreamStats())) + { + // move left from the file end by (chunkSize + remainingReadBytes) + try (BufferingInputStream stream2 = stream1.reBuffer(dataSize - chunkSize - remainingReadBytes)) + { + ByteBuffer buffer = ByteBuffer.allocate(chunkSize); + int read = stream2.read(buffer); // read last full chunk + assertThat(read).isEqualTo(chunkSize); + assertThat(buffer.array()).isEqualTo(returnedBuffers.get(0)); + + buffer.position(0); + buffer.limit(remainingReadBytes); + read = stream2.read(buffer); // read remaining bytes + assertThat(read).isEqualTo(remainingReadBytes); + assertThat(ArrayUtils.subarray(buffer.array(), 0, remainingReadBytes)).isEqualTo(returnedBuffers.get(1)); + } + } + } + // Utils private static ImmutableList randomBuffers(int count) diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/io/util/CdcRandomAccessReaderTest.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/io/util/CdcRandomAccessReaderTest.java index 28a9f902c..b9c28d9bc 100644 --- a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/io/util/CdcRandomAccessReaderTest.java +++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/io/util/CdcRandomAccessReaderTest.java @@ -92,9 +92,9 @@ public void testCDCRebufferSequentialReading() throws IOException long position = call.start; // Deliver data in chunks until request is fulfilled - while (position < actualEnd) + while (position <= actualEnd) // range boundaries are inclusive { - int chunkSize = (int) Math.min(actualEnd - position, bufferSize); + int chunkSize = (int) Math.min(actualEnd - position + 1, bufferSize); Buffer data = Buffer.buffer(); for (int i = 0; i < chunkSize; i++) { @@ -157,7 +157,7 @@ public void testCDCRebufferBackwardSeek() throws IOException // Backward seek path has a bug: requests buffer.remaining() + 1 bytes // We cap delivery at buffer capacity to work around this and test flip() behavior long actualEnd = Math.min(call.end, testCommitLog.maxOffset()); - long requestedBytes = actualEnd - call.start; + long requestedBytes = actualEnd - call.start + 1; // range boundaries are inclusive long position = call.start; // Cap delivery to buffer capacity to avoid BufferOverflowException from + 1 bug From 68f3b13e0a83239b80af30fc6eb10c3649023bc8 Mon Sep 17 00:00:00 2001 From: Lukasz Antoniak Date: Fri, 3 Apr 2026 09:28:00 +0200 Subject: [PATCH 2/3] Apply review comments --- .../org/apache/cassandra/spark/data/FileSystemSource.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/FileSystemSource.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/FileSystemSource.java index f18a3be3d..5fc736e6e 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/FileSystemSource.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/FileSystemSource.java @@ -91,12 +91,11 @@ public ExecutorService executor() public void request(long start, long end, StreamConsumer consumer) { executor().submit(() -> { - boolean close = length <= end; + boolean close = length <= end + 1; try { - // Start-end range is inclusive but on the final request end == length so we need to exclude - int increment = close ? 0 : 1; - byte[] bytes = new byte[(int) (end - start + increment)]; + // start-end range is inclusive + byte[] bytes = new byte[(int) (end - start + 1)]; if (file.getChannel().read(ByteBuffer.wrap(bytes), start) >= 0) { consumer.onRead(StreamBuffer.wrap(bytes)); From ffa112ca0158d54af55a3e233510d65e99024b3f Mon Sep 17 00:00:00 2001 From: Lukasz Antoniak Date: Fri, 3 Apr 2026 09:29:30 +0200 Subject: [PATCH 3/3] Apply review comments --- .../apache/cassandra/spark/utils/BufferingInputStreamTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamTests.java index d22251a64..7489b26da 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamTests.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamTests.java @@ -408,6 +408,7 @@ public long chunkBufferSize() { ByteBuffer buffer = ByteBuffer.allocate(chunkSize); int read = stream2.read(buffer); // read last full chunk + assertThat(returnedBuffers).hasSize(2); assertThat(read).isEqualTo(chunkSize); assertThat(buffer.array()).isEqualTo(returnedBuffers.get(0));