Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,11 @@ public ExecutorService executor()
public void request(long start, long end, StreamConsumer consumer)
{
executor().submit(() -> {
boolean close = length <= end;
boolean close = length <= end + 1;
try
{
// Start-end range is inclusive but on the final request end == length so we need to exclude
int increment = close ? 0 : 1;
byte[] bytes = new byte[(int) (end - start + increment)];
// start-end range is inclusive
byte[] bytes = new byte[(int) (end - start + 1)];
if (file.getChannel().read(ByteBuffer.wrap(bytes), start) >= 0)
{
consumer.onRead(StreamBuffer.wrap(bytes));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,12 @@ private void requestMore()
}

long chunkSize = rangeStart == 0 ? source.headerChunkSize() : source.chunkBufferSize();
long rangeEnd = Math.min(source.size(), rangeStart + chunkSize);
long rangeEnd = Math.min(source.size(), rangeStart + chunkSize) - 1; // start and end range are inclusive
if (rangeEnd >= rangeStart)
{
activeRequest = true;
source.request(rangeStart, rangeEnd, this);
rangeStart += chunkSize + 1; // Increment range start pointer for next request
rangeStart += chunkSize; // Increment range start pointer for next request
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
package org.apache.cassandra.spark.utils;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -33,6 +36,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.mutable.MutableInt;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -289,7 +293,7 @@ public Duration timeout()
};

int bytesToRead = chunkSize * numChunks;
long skipAhead = size - bytesToRead + 1;
long skipAhead = size - bytesToRead;
Copy link
Copy Markdown
Member Author

@lukasz-antoniak lukasz-antoniak Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure how the change in BufferingInputStream affects skip() as used during BIG index reading. All integration tests pass, though, and I believe this unit test is only a simulation of that path.

try (BufferingInputStream<SSTable> stream = new BufferingInputStream<>(source, STATS.bufferingInputStreamStats()))
{
// Skip ahead so we only read the final chunks
Expand Down Expand Up @@ -346,6 +350,77 @@ public Duration timeout()
}
}

@Test
public void testUnalignedEndReading() throws IOException
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: We might want to assert on returnedBuffers.size() == 2 to catch regressions where extra or missing requests are issued

{
int dataSize = 2527234;
int chunkSize = 4096;
int remainingReadBytes = 2;
List<byte[]> returnedBuffers = new ArrayList<>();
CassandraFileSource<SSTable> source = new CassandraFileSource<SSTable>()
{
@Override
public void request(long start, long end, StreamConsumer consumer)
{
byte[] bytes = RandomUtils.randomBytes((int) (end - start + 1));
StreamBuffer buffer = StreamBuffer.wrap(bytes);
returnedBuffers.add(bytes);
consumer.onRead(buffer);
consumer.onEnd();
}

@Override
public SSTable cassandraFile()
{
return null;
}

@Override
public FileType fileType()
{
return FileType.PARTITIONS_INDEX;
}

@Override
public long size()
{
return dataSize;
}

@Override
@Nullable
public Duration timeout()
{
return Duration.ofSeconds(5);
}

@Override
public long chunkBufferSize()
{
return chunkSize;
}
};

try (BufferingInputStream<SSTable> stream1 = new BufferingInputStream<>(source, STATS.bufferingInputStreamStats()))
{
// move left from the file end by (chunkSize + remainingReadBytes)
try (BufferingInputStream<SSTable> stream2 = stream1.reBuffer(dataSize - chunkSize - remainingReadBytes))
{
ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
int read = stream2.read(buffer); // read last full chunk
assertThat(returnedBuffers).hasSize(2);
assertThat(read).isEqualTo(chunkSize);
assertThat(buffer.array()).isEqualTo(returnedBuffers.get(0));

buffer.position(0);
buffer.limit(remainingReadBytes);
read = stream2.read(buffer); // read remaining bytes
assertThat(read).isEqualTo(remainingReadBytes);
assertThat(ArrayUtils.subarray(buffer.array(), 0, remainingReadBytes)).isEqualTo(returnedBuffers.get(1));
}
}
}

// Utils

private static ImmutableList<StreamBuffer> randomBuffers(int count)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ public void testCDCRebufferSequentialReading() throws IOException
long position = call.start;

// Deliver data in chunks until request is fulfilled
while (position < actualEnd)
while (position <= actualEnd) // range boundaries are inclusive
Copy link
Copy Markdown
Member Author

@lukasz-antoniak lukasz-antoniak Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to below JavaDoc, ranges should be considered inclusive.

/**
 * Asynchronously request bytes for the SSTable file component in the range start-end, and pass on to the StreamConsumer when available.
 * The start-end range is inclusive.
 *
 * @param start    the start of the bytes range
 * @param end      the end of the bytes range
 * @param consumer the StreamConsumer to return the bytes to when the request is complete
 */
void request(long start, long end, StreamConsumer consumer);

{
int chunkSize = (int) Math.min(actualEnd - position, bufferSize);
int chunkSize = (int) Math.min(actualEnd - position + 1, bufferSize);
Buffer data = Buffer.buffer();
for (int i = 0; i < chunkSize; i++)
{
Expand Down Expand Up @@ -157,7 +157,7 @@ public void testCDCRebufferBackwardSeek() throws IOException
// Backward seek path has a bug: requests buffer.remaining() + 1 bytes
// We cap delivery at buffer capacity to work around this and test flip() behavior
long actualEnd = Math.min(call.end, testCommitLog.maxOffset());
long requestedBytes = actualEnd - call.start;
long requestedBytes = actualEnd - call.start + 1; // range boundaries are inclusive
long position = call.start;

// Cap delivery to buffer capacity to avoid BufferOverflowException from + 1 bug
Expand Down
Loading