From cdfccd115fd2e58aec3ab24c48ca94b029eba0c1 Mon Sep 17 00:00:00 2001 From: Shailaja Koppu Date: Mon, 16 Mar 2026 13:22:14 +0000 Subject: [PATCH 1/9] absolute times need to be long --- .../cassandra/cdc/avro/CdcEventUtils.java | 4 +- .../src/main/resources/cdc_bytes.avsc | 2 +- .../main/resources/cdc_generic_record.avsc | 2 +- .../cdc/json/JsonSerializerTests.java | 27 ++- .../apache/cassandra/cdc/Y2038TimeTests.java | 207 ++++++++++++++++++ .../apache/cassandra/cdc/msg/CdcEvent.java | 4 +- .../cassandra/cdc/msg/CdcEventBuilder.java | 2 +- .../spark/utils/ReaderTimeProvider.java | 6 +- .../cassandra/spark/utils/TimeProvider.java | 10 +- .../spark/data/CassandraDataLayer.java | 8 +- .../cassandra/spark/SSTableReaderTests.java | 6 +- .../cdc/msg/FourZeroCdcEventBuilder.java | 6 +- .../java/org/apache/cassandra/db/DbUtils.java | 2 +- .../io/sstable/SSTableTombstoneWriter.java | 2 +- .../spark/reader/CompactionStreamScanner.java | 2 +- .../apache/cassandra/spark/data/CqlType.java | 2 +- .../java/org/apache/cassandra/db/DbUtils.java | 8 +- .../spark/reader/AbstractStreamScanner.java | 4 +- .../spark/reader/CompactionStreamScanner.java | 6 +- .../cassandra/spark/data/AbstractCqlType.java | 5 +- .../apache/cassandra/spark/data/CqlType.java | 2 + .../spark/data/complex/AbstractCqlList.java | 7 +- .../cassandra/spark/data/complex/CqlMap.java | 7 +- .../cassandra/spark/data/complex/CqlSet.java | 7 +- 24 files changed, 294 insertions(+), 44 deletions(-) create mode 100644 cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/Y2038TimeTests.java diff --git a/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/avro/CdcEventUtils.java b/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/avro/CdcEventUtils.java index ddb1b48a8..fef91ab6b 100644 --- a/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/avro/CdcEventUtils.java +++ 
b/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/avro/CdcEventUtils.java @@ -281,14 +281,14 @@ public static GenericData.Record getTTLAvro(CdcEvent event, Schema ttlSchema) return ttlRecord; } - public static Map getTTL(CdcEvent event) + public static Map getTTL(CdcEvent event) { CdcEvent.TimeToLive ttl = event.getTtl(); if (ttl == null) { return null; } - return mapOf(AvroConstants.TTL_KEY, ttl.ttlInSec, AvroConstants.DELETED_AT_KEY, ttl.expirationTimeInSec); + return mapOf(AvroConstants.TTL_KEY, (long) ttl.ttlInSec, AvroConstants.DELETED_AT_KEY, ttl.expirationTimeInSec); } public static UpdatedEvent getUpdatedEvent(CdcEvent event, diff --git a/cassandra-analytics-cdc-codec/src/main/resources/cdc_bytes.avsc b/cassandra-analytics-cdc-codec/src/main/resources/cdc_bytes.avsc index 8609b7bf3..551042987 100644 --- a/cassandra-analytics-cdc-codec/src/main/resources/cdc_bytes.avsc +++ b/cassandra-analytics-cdc-codec/src/main/resources/cdc_bytes.avsc @@ -145,7 +145,7 @@ }, { "name": "deletedAt", - "type": "int", + "type": "long", "doc": "Future timestamp in seconds" } ] diff --git a/cassandra-analytics-cdc-codec/src/main/resources/cdc_generic_record.avsc b/cassandra-analytics-cdc-codec/src/main/resources/cdc_generic_record.avsc index 9f4a3a47e..4535fe1dd 100644 --- a/cassandra-analytics-cdc-codec/src/main/resources/cdc_generic_record.avsc +++ b/cassandra-analytics-cdc-codec/src/main/resources/cdc_generic_record.avsc @@ -145,7 +145,7 @@ }, { "name": "deletedAt", - "type": "int", + "type": "long", "doc": "Future timestamp in seconds" } ] diff --git a/cassandra-analytics-cdc-codec/src/test/java/org/apache/cassandra/cdc/json/JsonSerializerTests.java b/cassandra-analytics-cdc-codec/src/test/java/org/apache/cassandra/cdc/json/JsonSerializerTests.java index 8a582f293..ee2aca021 100644 --- a/cassandra-analytics-cdc-codec/src/test/java/org/apache/cassandra/cdc/json/JsonSerializerTests.java +++ 
b/cassandra-analytics-cdc-codec/src/test/java/org/apache/cassandra/cdc/json/JsonSerializerTests.java @@ -101,7 +101,7 @@ public void testJsonSerializer() throws IOException assertThat(root.has(AvroConstants.TTL_KEY)).isTrue(); JsonNode ttl = root.get(AvroConstants.TTL_KEY); assertThat(ttl.get(AvroConstants.TTL_KEY).asInt()).isEqualTo(10); - assertThat(ttl.get(AvroConstants.DELETED_AT_KEY).asInt()).isEqualTo(1658269); + assertThat(ttl.get(AvroConstants.DELETED_AT_KEY).asLong()).isEqualTo(1658269); } @Test @@ -138,4 +138,29 @@ public void testJsonBinary() throws IOException InetAddress address = InetAddress.getByAddress(Base64.getDecoder().decode(base64Str)); assertThat(address).isEqualTo(InetAddress.getByName("127.0.0.1")); } + + @Test + public void testJsonSerializerWithLongExpirationTime() throws IOException + { + long expirationTimePastIntMax = 2_147_483_648L; + CdcEventBuilder eventBuilder = CdcEventBuilder.of(CdcEvent.Kind.INSERT, TEST_KS, TEST_TBL_BASIC); + eventBuilder.setPartitionKeys(listOf(Value.of(TEST_KS, "a", "int", TYPES.aInt().serialize(1)))); + eventBuilder.setValueColumns(listOf( + Value.of(TEST_KS, "b", "int", TYPES.aInt().serialize(2)) + )); + eventBuilder.setMaxTimestampMicros(TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis())); + eventBuilder.setTimeToLive(new CdcEvent.TimeToLive(10, expirationTimePastIntMax)); + + byte[] ar; + try (JsonSerializer serializer = new JsonSerializer(TYPE_LOOKUP)) + { + ar = serializer.serialize("topic", eventBuilder.build()); + } + assertThat(ar).isNotNull(); + + JsonNode root = MAPPER.readTree(ar); + JsonNode ttl = root.get(AvroConstants.TTL_KEY); + assertThat(ttl.get(AvroConstants.TTL_KEY).asInt()).isEqualTo(10); + assertThat(ttl.get(AvroConstants.DELETED_AT_KEY).asLong()).isEqualTo(expirationTimePastIntMax); + } } diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/Y2038TimeTests.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/Y2038TimeTests.java new file mode 
100644 index 000000000..636e3f5b9 --- /dev/null +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/Y2038TimeTests.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.cdc; + +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import org.apache.cassandra.bridge.CassandraVersion; +import org.apache.cassandra.cdc.msg.CdcEvent; +import org.apache.cassandra.cdc.test.CdcTestBase; +import org.apache.cassandra.cdc.test.CdcTester; +import org.apache.cassandra.spark.utils.test.TestSchema; + +import static org.apache.cassandra.cdc.test.CdcTester.newUniqueRow; +import static org.apache.cassandra.cdc.test.CdcTester.testWith; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests verifying Y2038 time handling in the CDC pipeline. + * Validates that TTL, expiration time, and deletion time values flow correctly + * through the commit log writing and reading pipeline for both Cassandra 4.0 and 5.0. 
+ */ +public class Y2038TimeTests extends CdcTestBase +{ + private static final int TTL_SECONDS = 3600; // 1 hour + + /** + * Verifies that expirationTimeInSec is a valid long value and approximately equals nowInSeconds + ttl. + * This ensures the int-to-long widening in the CDC pipeline preserves time values correctly. + */ + @ParameterizedTest + @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") + public void testTtlExpirationTimeIsLong(CassandraVersion version) + { + long beforeTestEpochSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); + testWith(bridge, cdcBridge, commitLogDir, TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("c1", bridge.aInt())) + .clearWriters() + .withNumRows(5) + .withWriter((tester, rows, writer) -> { + for (int i = 0; i < tester.numRows; i++) + { + TestSchema.TestRow testRow = newUniqueRow(tester.schema, rows); + testRow.setTTL(TTL_SECONDS); + writer.accept(testRow, TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis())); + } + }) + .withCdcEventChecker((testRows, events) -> { + long afterTestEpochSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); + for (CdcEvent event : events) + { + assertThat(event.getTtl()).isNotNull(); + assertThat(event.getTtl().ttlInSec).isEqualTo(TTL_SECONDS); + // expirationTimeInSec should be approximately nowInSeconds + TTL + long expirationTime = event.getTtl().expirationTimeInSec; + assertThat(expirationTime) + .as("expirationTimeInSec should be between beforeTest+TTL and afterTest+TTL") + .isBetween(beforeTestEpochSec + TTL_SECONDS, afterTestEpochSec + TTL_SECONDS); + } + }) + .run(); + } + + /** + * Tests with maximum supported TTL value (max int seconds ~ 68 years). + * In Cassandra 5.0, the expiration time (nowInSeconds + ttl) can exceed Integer.MAX_VALUE. + * In Cassandra 4.0, the expiration time is limited to int range. 
+ */ + @ParameterizedTest + @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") + public void testMaxSupportedTtl(CassandraVersion version) + { + // Use a large TTL value. The max TTL Cassandra supports is MAX_DELETION_TIME - nowInSeconds. + // We use 20 years which is safe for both 4.0 and 5.0. + int largeTtl = 20 * 365 * 24 * 3600; // ~20 years in seconds + + testWith(bridge, cdcBridge, commitLogDir, TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("c1", bridge.aInt())) + .clearWriters() + .withNumRows(3) + .withWriter((tester, rows, writer) -> { + for (int i = 0; i < tester.numRows; i++) + { + TestSchema.TestRow testRow = newUniqueRow(tester.schema, rows); + testRow.setTTL(largeTtl); + writer.accept(testRow, TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis())); + } + }) + .withCdcEventChecker((testRows, events) -> { + long nowEpochSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); + for (CdcEvent event : events) + { + assertThat(event.getTtl()).isNotNull(); + assertThat(event.getTtl().ttlInSec).isEqualTo(largeTtl); + long expirationTime = event.getTtl().expirationTimeInSec; + // The expiration time should be positive and in the future + assertThat(expirationTime) + .as("expirationTimeInSec should be positive and in the future") + .isGreaterThan(nowEpochSec); + } + }) + .run(); + } + + /** + * Tests that deletion time from partition deletions is handled correctly as a long value. 
+ */ + @ParameterizedTest + @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") + public void testDeletionTimeIsLong(CassandraVersion version) + { + long beforeTestMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()); + testWith(bridge, cdcBridge, commitLogDir, TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("c1", bridge.aInt())) + .clearWriters() + .withNumRows(5) + .withWriter((tester, rows, writer) -> { + for (int i = 0; i < tester.numRows; i++) + { + TestSchema.TestRow testRow = CdcTester.newUniquePartitionDeletion(tester.schema, rows); + writer.accept(testRow, TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis())); + } + }) + .withCdcEventChecker((testRows, events) -> { + long afterTestMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()); + for (CdcEvent event : events) + { + assertThat(event.getKind()).isEqualTo(CdcEvent.Kind.PARTITION_DELETE); + // Deletion timestamp should be within the test window + long eventTimestampMicros = event.getTimestamp(TimeUnit.MICROSECONDS); + assertThat(eventTimestampMicros) + .as("deletion timestamp should be within the test time window") + .isBetween(beforeTestMicros, afterTestMicros); + } + }) + .run(); + } + + /** + * Tests that TTL and non-TTL rows can coexist correctly, + * verifying time handling does not corrupt non-TTL data. 
+ */ + @ParameterizedTest + @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") + public void testMixedTtlAndNonTtlRows(CassandraVersion version) + { + testWith(bridge, cdcBridge, commitLogDir, TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("c1", bridge.aInt())) + .clearWriters() + .withNumRows(10) + .withWriter((tester, rows, writer) -> { + for (int i = 0; i < tester.numRows; i++) + { + TestSchema.TestRow testRow = newUniqueRow(tester.schema, rows); + // Set TTL on even rows only + if (i % 2 == 0) + { + testRow.setTTL(TTL_SECONDS); + } + writer.accept(testRow, TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis())); + } + }) + .withCdcEventChecker((testRows, events) -> { + int withTtl = 0; + int withoutTtl = 0; + for (CdcEvent event : events) + { + if (event.getTtl() != null) + { + withTtl++; + assertThat(event.getTtl().ttlInSec).isEqualTo(TTL_SECONDS); + assertThat(event.getTtl().expirationTimeInSec).isGreaterThan(0L); + } + else + { + withoutTtl++; + } + } + assertThat(withTtl).isEqualTo(5); + assertThat(withoutTtl).isEqualTo(5); + }) + .run(); + } +} diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEvent.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEvent.java index d3b5647c4..ae3687be4 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEvent.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEvent.java @@ -50,9 +50,9 @@ public enum Kind public static class TimeToLive { public final int ttlInSec; - public final int expirationTimeInSec; + public final long expirationTimeInSec; - public TimeToLive(int ttlInSec, int expirationTimeInSec) + public TimeToLive(int ttlInSec, long expirationTimeInSec) { this.ttlInSec = ttlInSec; this.expirationTimeInSec = expirationTimeInSec; diff --git 
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEventBuilder.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEventBuilder.java index abf661510..60c760b02 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEventBuilder.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEventBuilder.java @@ -107,7 +107,7 @@ public void setMaxTimestampMicros(long maxTimestampMicros) this.maxTimestampMicros = maxTimestampMicros; } - public void setTTL(int ttlInSec, int expirationTimeInSec) + public void setTTL(int ttlInSec, long expirationTimeInSec) { // Skip updating TTL if it already has been set. // For the same row, the upsert query can only set one TTL value. diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ReaderTimeProvider.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ReaderTimeProvider.java index cbc175179..c6189bbb7 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ReaderTimeProvider.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ReaderTimeProvider.java @@ -24,7 +24,7 @@ */ public class ReaderTimeProvider implements TimeProvider { - private final int referenceEpochInSeconds; + private final long referenceEpochInSeconds; public ReaderTimeProvider() { @@ -35,13 +35,13 @@ public ReaderTimeProvider() * Constructor used for deserialization * @param referenceEpochInSeconds reference epoch to set */ - public ReaderTimeProvider(int referenceEpochInSeconds) + public ReaderTimeProvider(long referenceEpochInSeconds) { this.referenceEpochInSeconds = referenceEpochInSeconds; } @Override - public int referenceEpochInSeconds() + public long referenceEpochInSeconds() { return referenceEpochInSeconds; } diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java index 40a251254..1ea9db287 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java @@ -38,10 +38,10 @@ public interface TimeProvider @VisibleForTesting TimeProvider DEFAULT = new TimeProvider() { - private final int referenceEpochInSeconds = nowInSeconds(); + private final long referenceEpochInSeconds = nowInSeconds(); @Override - public int referenceEpochInSeconds() + public long referenceEpochInSeconds() { return referenceEpochInSeconds; } @@ -50,9 +50,9 @@ public int referenceEpochInSeconds() /** * @return current time in seconds */ - default int nowInSeconds() + default long nowInSeconds() { - return (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); + return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); } /** @@ -62,5 +62,5 @@ default int nowInSeconds() *

* Note that the actual constant value returned is implementation dependent */ - int referenceEpochInSeconds(); + long referenceEpochInSeconds(); } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java index b6e84c6f5..99cbc6827 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java @@ -771,7 +771,7 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE aliasLastModifiedTimestamp(this.requestedFeatures, this.lastModifiedTimestampField); } this.rfMap = (Map) in.readObject(); - this.timeProvider = new ReaderTimeProvider(in.readInt()); + this.timeProvider = new ReaderTimeProvider(in.readLong()); this.sstableTimeRangeFilter = (SSTableTimeRangeFilter) in.readObject(); this.maybeQuoteKeyspaceAndTable(); this.initSidecarClient(); @@ -817,7 +817,7 @@ private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFou out.writeUTF(feature.optionName()); } out.writeObject(this.rfMap); - out.writeInt(timeProvider.referenceEpochInSeconds()); + out.writeLong(timeProvider.referenceEpochInSeconds()); out.writeObject(this.sstableTimeRangeFilter); } @@ -893,7 +893,7 @@ public void write(Kryo kryo, Output out, CassandraDataLayer dataLayer) .collect(Collectors.toList()); kryo.writeObject(out, listWrapper); kryo.writeObject(out, dataLayer.rfMap); - out.writeInt(dataLayer.timeProvider.referenceEpochInSeconds()); + out.writeLong(dataLayer.timeProvider.referenceEpochInSeconds()); kryo.writeObject(out, dataLayer.sstableTimeRangeFilter); } @@ -936,7 +936,7 @@ public CassandraDataLayer read(Kryo kryo, Input in, Class ty in.readString(), kryo.readObject(in, SchemaFeaturesListWrapper.class).toList(), kryo.readObject(in, HashMap.class), - new 
ReaderTimeProvider(in.readInt()), + new ReaderTimeProvider(in.readLong()), kryo.readObject(in, SSTableTimeRangeFilter.class)); } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/SSTableReaderTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/SSTableReaderTests.java index 1bec25eaf..53072e172 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/SSTableReaderTests.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/SSTableReaderTests.java @@ -25,7 +25,7 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import com.google.common.util.concurrent.Uninterruptibles; @@ -73,7 +73,7 @@ private void testTtlUsingConstantReferenceTimeHelper(CassandraBridge bridgeForTe int rows, int expectedValues) { - AtomicInteger referenceEpoch = new AtomicInteger(0); + AtomicLong referenceEpoch = new AtomicLong(0); TimeProvider navigatableTimeProvider = referenceEpoch::get; Set expectedColValue = new HashSet<>(Arrays.asList(1, 2, 3)); @@ -90,7 +90,7 @@ private void testTtlUsingConstantReferenceTimeHelper(CassandraBridge bridgeForTe } Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); }); - int t1 = navigatableTimeProvider.nowInSeconds(); + long t1 = navigatableTimeProvider.nowInSeconds(); assertThat(countSSTables(dir)).isEqualTo(1); // open CompactionStreamScanner over SSTables diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/FourZeroCdcEventBuilder.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/FourZeroCdcEventBuilder.java index b4e7d8b1c..62c9d6854 100644 --- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/FourZeroCdcEventBuilder.java +++ 
b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/FourZeroCdcEventBuilder.java @@ -242,9 +242,7 @@ else if (complex.cellsCount() > 0) holder.add(makeValue(cell.buffer(), cell.column())); if (cell.isExpiring()) { - // TODO: CASSANDRA-14227 Support unit interpretation, - // so that TTL does not overflow and become negative. - setTTL(cell.ttl(), Cell.deletionTimeLongToUnsignedInteger(cell.localDeletionTime())); + setTTL(cell.ttl(), cell.localDeletionTime()); } } } @@ -274,7 +272,7 @@ private void processComplexData(List holder, ComplexColumnData complex) buffer.addCell(cell); if (cell.isExpiring()) { - setTTL(cell.ttl(), Cell.deletionTimeLongToUnsignedInteger(cell.localDeletionTime())); + setTTL(cell.ttl(), cell.localDeletionTime()); } } } diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java index 963f3454b..f67446d97 100644 --- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java @@ -31,7 +31,7 @@ private DbUtils() throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated"); } - public static DeletionTime deletionTime(long markedForDeleteAt, int localDeletionTime) + public static DeletionTime deletionTime(long markedForDeleteAt, long localDeletionTime) { return DeletionTime.build(markedForDeleteAt, localDeletionTime); } diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java index 63ab2fdbc..dfd18bb3e 100644 --- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java @@ 
-187,7 +187,7 @@ private void rawAddRow(List values) throws InvalidRequestException, ClientState.forInternalCalls(), options, delete.getTimestamp(TimeUnit.MILLISECONDS.toMicros(now), options), - (int) TimeUnit.MILLISECONDS.toSeconds(now), + TimeUnit.MILLISECONDS.toSeconds(now), delete.getTimeToLive(options), Collections.emptyMap()); diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java index df6af1380..34426e632 100644 --- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java @@ -117,7 +117,7 @@ protected void handleCellTombstoneInComplex(BigInteger token, Cell cell) @Override UnfilteredPartitionIterator initializePartitions() { - int nowInSec = timeProvider.referenceEpochInSeconds(); + long nowInSec = timeProvider.referenceEpochInSeconds(); Keyspace keyspace = Keyspace.openWithoutSSTables(metadata.keyspace); ColumnFamilyStore cfStore = keyspace.getColumnFamilyStore(metadata.name); controller = new PurgingCompactionController(cfStore, CompactionParams.TombstoneOption.NONE); diff --git a/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java index 8dffb2960..72a656a39 100644 --- a/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java +++ b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java @@ -48,7 +48,7 @@ public void addComplexTombstone(org.apache.cassandra.db.rows.Row.Builder rowBuil long deletionTime) { Preconditions.checkArgument(cd.isComplex(), "The method only works with complex columns"); - rowBuilder.addComplexDeletion(cd, 
DeletionTime.build(deletionTime, (int) TimeUnit.MICROSECONDS.toSeconds(deletionTime))); + rowBuilder.addComplexDeletion(cd, DeletionTime.build(deletionTime, TimeUnit.MICROSECONDS.toSeconds(deletionTime))); } public static BufferCell tombstone(ColumnMetadata column, long timestamp, long nowInSec, CellPath path) diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java index 7965272e3..283186f9c 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java @@ -33,23 +33,27 @@ private DbUtils() throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated"); } - public static DeletionTime deletionTime(long markedForDeleteAt, int localDeletionTime) + // C* 4.0 DeletionTime constructor requires int for localDeletionTime; checked cast will throw after Y2038 + public static DeletionTime deletionTime(long markedForDeleteAt, long localDeletionTime) { - return new DeletionTime(markedForDeleteAt, localDeletionTime); + return new DeletionTime(markedForDeleteAt, Ints.checkedCast(localDeletionTime)); } public static LivenessInfo livenessInfo(long timestamp, long nowInSeconds) { + // C* 4.0 LivenessInfo.create requires int for nowInSeconds; checked cast will throw after Y2038 return LivenessInfo.create(timestamp, Ints.checkedCast(nowInSeconds)); } public static PartitionUpdate fullPartitionDeletion(TableMetadata metadata, ByteBuffer key, long timestamp, long nowInSec) { + // C* 4.0 fullPartitionDelete requires int for nowInSec; checked cast will throw after Y2038 return PartitionUpdate.fullPartitionDelete(metadata, key, timestamp, Ints.checkedCast(nowInSec)); } public static PartitionUpdate.SimpleBuilder partitionUpdateBuilderWithNow(TableMetadata metadata, DecoratedKey key, long nowInSec) { + // C* 4.0 
simpleBuilder.nowInSec requires int; checked cast will throw after Y2038 return PartitionUpdate.simpleBuilder(metadata, key).nowInSec(Ints.checkedCast(nowInSec)); } } diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java index 05d9d45ec..25cda1867 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java @@ -25,6 +25,7 @@ import java.util.Iterator; import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; import org.apache.cassandra.db.Clustering; import org.apache.cassandra.db.ClusteringPrefix; @@ -443,7 +444,8 @@ public void consume() Cell cell = cells.next(); // Re: isLive vs. isTombstone - isLive considers TTL so that if a cell is expiring soon, // it is handled as tombstone - if (cell.isLive(timeProvider.referenceEpochInSeconds())) + // C* 4.0 Cell.isLive requires int for nowInSec; checked cast will throw after Y2038 + if (cell.isLive(Ints.checkedCast(timeProvider.referenceEpochInSeconds()))) { buffer.addCell(cell); } diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java index 89a2df3f2..2925eb590 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java @@ -28,6 +28,7 @@ import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; +import com.google.common.primitives.Ints; import org.apache.cassandra.db.AbstractCompactionController; import 
org.apache.cassandra.db.ColumnFamilyStore; @@ -117,7 +118,7 @@ protected void handleCellTombstoneInComplex(BigInteger token, Cell cell) @Override UnfilteredPartitionIterator initializePartitions() { - int nowInSec = timeProvider.referenceEpochInSeconds(); + long nowInSec = timeProvider.referenceEpochInSeconds(); Keyspace keyspace = Keyspace.openWithoutSSTables(metadata.keyspace); ColumnFamilyStore cfStore = keyspace.getColumnFamilyStore(metadata.name); controller = new PurgingCompactionController(cfStore, CompactionParams.TombstoneOption.NONE); @@ -125,7 +126,8 @@ UnfilteredPartitionIterator initializePartitions() .map(Scannable::scanner) .collect(Collectors.toList()); scanners = new AbstractCompactionStrategy.ScannerList(scannerList); - ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, nowInSec, taskId); + // C* 4.0 CompactionIterator requires int for nowInSec; checked cast will throw after Y2038 + ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, Ints.checkedCast(nowInSec), taskId); return ci; } diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/AbstractCqlType.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/AbstractCqlType.java index e4425d753..dcffcd87b 100644 --- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/AbstractCqlType.java +++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/AbstractCqlType.java @@ -138,7 +138,7 @@ public void addCell(org.apache.cassandra.db.rows.Row.Builder rowBuilder, ColumnMetadata cd, long timestamp, int ttl, - int now, + long now, Object value) { addCell(rowBuilder, cd, timestamp, ttl, now, value, null); @@ -149,7 +149,7 @@ public void addCell(org.apache.cassandra.db.rows.Row.Builder rowBuilder, ColumnMetadata cd, long timestamp, int ttl, - int now, + long now, Object value, CellPath cellPath) { @@ -199,6 +199,7 @@ public void 
addComplexTombstone(org.apache.cassandra.db.rows.Row.Builder rowBuil long deletionTime) { Preconditions.checkArgument(cd.isComplex(), "The method only works with complex columns"); + // C* 4.0 DeletionTime constructor requires int for localDeletionTime rowBuilder.addComplexDeletion(cd, new DeletionTime(deletionTime, (int) TimeUnit.MICROSECONDS.toSeconds(deletionTime))); } } diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java index e91027a68..3912d9a89 100644 --- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java +++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java @@ -31,11 +31,13 @@ public abstract class CqlType extends AbstractCqlType { public static BufferCell tombstone(ColumnMetadata column, long timestamp, long nowInSec, CellPath path) { + // C* 4.0 BufferCell.tombstone requires int for nowInSec; checked cast will throw after Y2038 return BufferCell.tombstone(column, timestamp, Ints.checkedCast(nowInSec), path); } public static BufferCell expiring(ColumnMetadata column, long timestamp, int ttl, long nowInSec, ByteBuffer value, CellPath path) { + // C* 4.0 BufferCell.expiring requires int for nowInSec; checked cast will throw after Y2038 return BufferCell.expiring(column, timestamp, ttl, Ints.checkedCast(nowInSec), value, path); } } diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/AbstractCqlList.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/AbstractCqlList.java index a7cf8bea4..596954216 100644 --- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/AbstractCqlList.java +++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/AbstractCqlList.java @@ -24,6 +24,8 @@ import java.util.stream.Collectors; import 
java.util.stream.IntStream; +import com.google.common.primitives.Ints; + import org.apache.cassandra.bridge.CassandraVersion; import org.apache.cassandra.cql3.functions.types.SettableByIndexData; import org.apache.cassandra.db.marshal.AbstractType; @@ -105,14 +107,15 @@ public void addCell(final org.apache.cassandra.db.rows.Row.Builder rowBuilder, ColumnMetadata cd, long timestamp, int ttl, - int now, + long now, Object value) { for (Object o : (List) value) { if (ttl != NO_TTL) { - rowBuilder.addCell(BufferCell.expiring(cd, timestamp, ttl, now, type().serialize(o), + // C* 4.0 BufferCell.expiring requires int for now; checked cast will throw after Y2038 + rowBuilder.addCell(BufferCell.expiring(cd, timestamp, ttl, Ints.checkedCast(now), type().serialize(o), CellPath.create(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())))); } else diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java index 6d0e49d35..49b6260ce 100644 --- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java +++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java @@ -23,6 +23,8 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import com.google.common.primitives.Ints; + import org.apache.cassandra.bridge.CassandraVersion; import org.apache.cassandra.cql3.functions.types.SettableByIndexData; import org.apache.cassandra.db.marshal.AbstractType; @@ -120,14 +122,15 @@ public void addCell(final org.apache.cassandra.db.rows.Row.Builder rowBuilder, ColumnMetadata cd, long timestamp, int ttl, - int now, + long now, Object value) { for (Map.Entry entry : ((Map) value).entrySet()) { if (ttl != NO_TTL) { - rowBuilder.addCell(BufferCell.expiring(cd, timestamp, ttl, now, valueType().serialize(entry.getValue()), + // C* 4.0 BufferCell.expiring requires int for now; 
checked cast will throw after Y2038 + rowBuilder.addCell(BufferCell.expiring(cd, timestamp, ttl, Ints.checkedCast(now), valueType().serialize(entry.getValue()), CellPath.create(keyType().serialize(entry.getKey())))); } else diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java index c50b80591..8bba65c95 100644 --- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java +++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java @@ -24,6 +24,8 @@ import java.util.Set; import java.util.stream.Collectors; +import com.google.common.primitives.Ints; + import org.apache.cassandra.bridge.CassandraVersion; import org.apache.cassandra.cql3.functions.types.DataType; import org.apache.cassandra.cql3.functions.types.SettableByIndexData; @@ -104,14 +106,15 @@ public void addCell(final org.apache.cassandra.db.rows.Row.Builder rowBuilder, ColumnMetadata cd, long timestamp, int ttl, - int now, + long now, Object value) { for (Object o : (Set) value) { if (ttl != NO_TTL) { - rowBuilder.addCell(BufferCell.expiring(cd, timestamp, ttl, now, ByteBufferUtil.EMPTY_BYTE_BUFFER, + // C* 4.0 BufferCell.expiring requires int for now; checked cast will throw after Y2038 + rowBuilder.addCell(BufferCell.expiring(cd, timestamp, ttl, Ints.checkedCast(now), ByteBufferUtil.EMPTY_BYTE_BUFFER, CellPath.create(type().serialize(o)))); } else From a2180fbc84b04f9a4acb406ae490ede1b45e68f6 Mon Sep 17 00:00:00 2001 From: Shailaja Koppu Date: Tue, 17 Mar 2026 13:37:57 +0000 Subject: [PATCH 2/9] integration tests --- CHANGES.txt | 1 + cassandra-analytics-cdc/build.gradle | 3 + .../cdc/AvroByteRecordTransformerTest.java | 178 +++++++ .../cdc/AvroGenericRecordTransformerTest.java | 502 ++++++++++++++++++ .../apache/cassandra/cdc/Y2038TimeTests.java | 207 -------- 
.../cassandra/cdc/avro/TestSchemaStore.java | 65 +++ .../BulkWriteComplexTypeTtlTest.java | 201 +++++++ .../cassandra/cdc/avro/AvroConstants.java | 2 +- .../io/sstable/SSTableTombstoneWriter.java | 3 +- .../spark/data/CqlTypeY2038Test.java | 95 ++++ .../cassandra/spark/data/AbstractCqlType.java | 3 +- .../spark/data/complex/AbstractCqlList.java | 7 +- .../cassandra/spark/data/complex/CqlMap.java | 7 +- .../cassandra/spark/data/complex/CqlSet.java | 7 +- 14 files changed, 1056 insertions(+), 225 deletions(-) create mode 100644 cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroByteRecordTransformerTest.java create mode 100644 cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroGenericRecordTransformerTest.java delete mode 100644 cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/Y2038TimeTests.java create mode 100644 cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/avro/TestSchemaStore.java create mode 100644 cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteComplexTypeTtlTest.java create mode 100644 cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/data/CqlTypeY2038Test.java diff --git a/CHANGES.txt b/CHANGES.txt index 2ade1c36e..a5bd587e2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.4.0 ----- + * Fix year 2038 problem using long for absolute times and support C* 5.0 extended localDeletionTime * Fix ReadStatusTracker to distinguish clean completion from error termination in BufferingCommitLogReader (CASSANALYTICS-129) * Adding CDC support for Cassandra 5.0 Commit Logs (CASSANALYTICS-60) * Fixing CdcTests.testMockedCdc broken due to incorrect position update in BufferingCommitLogReader (CASSANALYTICS-127) diff --git a/cassandra-analytics-cdc/build.gradle b/cassandra-analytics-cdc/build.gradle index 6fdd7910c..38c284065 100644 --- a/cassandra-analytics-cdc/build.gradle +++ b/cassandra-analytics-cdc/build.gradle @@ -89,6 +89,9 @@ 
dependencies { implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${jacksonVersion}" testImplementation project(":cassandra-analytics-common") + testImplementation project(":cassandra-analytics-cdc-codec") + testImplementation "org.apache.avro:avro:${avroVersion}" + testImplementation "org.apache.kafka:kafka-clients:${kafkaClientVersion}" // pull in cassandra-bridge so we can re-use TestSchema to generate arbitrary schemas for the cdc tests testImplementation project(":cassandra-bridge") diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroByteRecordTransformerTest.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroByteRecordTransformerTest.java new file mode 100644 index 000000000..985b3b1b8 --- /dev/null +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroByteRecordTransformerTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.cdc; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DecoderFactory; +import org.apache.cassandra.bridge.CassandraVersion; +import org.apache.cassandra.bridge.CdcBridgeFactory; +import org.apache.cassandra.cdc.api.KeyspaceTypeKey; +import org.apache.cassandra.cdc.avro.AvroByteRecordTransformer; +import org.apache.cassandra.cdc.avro.AvroConstants; +import org.apache.cassandra.cdc.avro.AvroSchemas; +import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter; +import org.apache.cassandra.cdc.avro.TestSchemaStore; +import org.apache.cassandra.cdc.msg.CdcEvent; +import org.apache.cassandra.cdc.test.CdcTestBase; +import org.apache.cassandra.cdc.test.CdcTester; +import org.apache.cassandra.spark.data.CqlField; +import org.apache.cassandra.spark.data.CqlTable; +import org.apache.cassandra.spark.utils.test.TestSchema; + +import static org.apache.cassandra.cdc.test.CdcTester.testWith; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests that exercise the CDC-to-Avro byte-serialization pipeline ({@code cdc_bytes.avsc}), + */ +public class AvroByteRecordTransformerTest extends CdcTestBase +{ + private static final int NUM_ROWS = 50; + + private CqlToAvroSchemaConverter getConverter(CassandraVersion version) + { + CqlToAvroSchemaConverter converter = CdcBridgeFactory.getCqlToAvroSchemaConverter(version); + assertThat(converter).isNotNull(); + return converter; + } + + /** + * Build the Avro byte transformer by converting the CQL table schema + * into an Avro schema and 
registering it in the test schema store. + */ + private AvroByteRecordTransformer buildTransformer(CqlToAvroSchemaConverter converter, + CqlTable cqlTable, + TestSchemaStore schemaStore) + { + Schema avroSchema = converter.convert(cqlTable); + String namespace = cqlTable.keyspace() + "." + cqlTable.table(); + schemaStore.registerSchema(namespace, avroSchema); + + Function typeLookup = key -> bridge.parseType(key.type); + return new AvroByteRecordTransformer(schemaStore, typeLookup); + } + + /** + * Deserialize a byte payload using the table's Avro schema from the schema store. + */ + private GenericRecord deserializePayload(ByteBuffer payloadBytes, GenericDatumReader reader) throws IOException + { + byte[] bytes = new byte[payloadBytes.remaining()]; + payloadBytes.get(bytes); + return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null)); + } + + @ParameterizedTest + @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") + public void testTtlDeletedAtByteAvroEncoding(CassandraVersion version) + { + AvroSchemas.registerLogicalTypes(); + CqlToAvroSchemaConverter converter = getConverter(version); + + int ttlSeconds = 3600; + long beforeTestEpochSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); + + TestSchema.Builder schemaBuilder = TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("c1", bridge.bigint()) + .withColumn("c2", bridge.text()); + + AtomicReference tableRef = new AtomicReference<>(); + testWith(bridge, cdcBridge, commitLogDir, schemaBuilder) + .withNumRows(NUM_ROWS) + .clearWriters() + .withWriter((tester, rows, writer) -> { + tableRef.set(tester.cqlTable); + for (int i = 0; i < tester.numRows; i++) + { + TestSchema.TestRow testRow = CdcTester.newUniqueRow(tester.schema, rows); + testRow.setTTL(ttlSeconds); + writer.accept(testRow, TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis())); + } + }) + .withCdcEventChecker((testRows, events) -> { + long afterTestEpochSec 
= TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); + assertThat(events).hasSize(NUM_ROWS); + + TestSchemaStore schemaStore = new TestSchemaStore(); + AvroByteRecordTransformer transformer = buildTransformer(converter, tableRef.get(), schemaStore); + String namespace = tableRef.get().keyspace() + "." + tableRef.get().table(); + + for (CdcEvent event : events) + { + GenericData.Record record = transformer.transform(event); + assertThat(record).isNotNull(); + + // Validate header-level fields + assertThat(record.get(AvroConstants.SOURCE_TABLE_KEY).toString()).isEqualTo(event.table); + assertThat(record.get(AvroConstants.SOURCE_KEYSPACE_KEY).toString()).isEqualTo(event.keyspace); + assertThat(record.get(AvroConstants.OPERATION_TYPE_KEY).toString()).isEqualTo("INSERT"); + assertThat(record.get(AvroConstants.TIMESTAMP_KEY)).isNotNull(); + assertThat(record.get(AvroConstants.VERSION_KEY).toString()).isEqualTo(AvroConstants.CURRENT_VERSION); + + // TTL record should be present + Object ttlField = record.get(AvroConstants.TTL_KEY); + assertThat(ttlField).isNotNull(); + GenericRecord ttlRecord = (GenericRecord) ttlField; + + // Validate TTL value + assertThat(ttlRecord.get(AvroConstants.TTL_KEY)).isEqualTo(ttlSeconds); + + // Validate deletedAt is a Long (confirms long type in cdc_bytes.avsc) + Object deletedAt = ttlRecord.get(AvroConstants.DELETED_AT_KEY); + assertThat(deletedAt).isInstanceOf(Long.class); + + // Validate deletedAt is approximately nowInSeconds + TTL + long deletedAtValue = (Long) deletedAt; + assertThat(deletedAtValue) + .as("deletedAt should be approximately nowInSeconds + TTL") + .isBetween(beforeTestEpochSec + ttlSeconds, afterTestEpochSec + ttlSeconds); + + // Validate payload: bytes in the byte schema need deserialization to verify content + Object payloadObj = record.get(AvroConstants.PAYLOAD_KEY); + assertThat(payloadObj).isInstanceOf(ByteBuffer.class); + GenericRecord payloadRecord; + try + { + payloadRecord = 
deserializePayload((ByteBuffer) payloadObj, + schemaStore.getReader(namespace, null)); + } + catch (IOException e) + { + throw new RuntimeException("Failed to deserialize payload", e); + } + assertThat(payloadRecord.get("pk")).isNotNull(); + } + }) + .run(); + } +} diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroGenericRecordTransformerTest.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroGenericRecordTransformerTest.java new file mode 100644 index 000000000..2672e59df --- /dev/null +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroGenericRecordTransformerTest.java @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.cdc; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.IntStream; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.cassandra.bridge.CassandraVersion; +import org.apache.cassandra.bridge.CdcBridgeFactory; +import org.apache.cassandra.cdc.api.KeyspaceTypeKey; +import org.apache.cassandra.cdc.avro.AvroConstants; +import org.apache.cassandra.cdc.avro.AvroGenericRecordTransformer; +import org.apache.cassandra.cdc.avro.AvroSchemas; +import org.apache.cassandra.cdc.avro.CdcEventUtils; +import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter; +import org.apache.cassandra.cdc.avro.TestSchemaStore; +import org.apache.cassandra.cdc.kafka.AvroGenericRecordSerializer; +import org.apache.cassandra.cdc.msg.CdcEvent; +import org.apache.cassandra.cdc.test.CdcTestBase; +import org.apache.cassandra.cdc.test.CdcTester; +import org.apache.cassandra.spark.data.CqlField; +import org.apache.cassandra.spark.data.CqlTable; +import org.apache.cassandra.spark.utils.test.TestSchema; + +import static org.apache.cassandra.cdc.test.CdcTester.newUniquePartitionDeletion; +import static org.apache.cassandra.cdc.test.CdcTester.testWith; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests that exercise the full CDC-to-Avro pipeline: + * write mutations -> read CDC events from commit logs -> convert to Avro GenericRecord -> validate. 
+ */ +@SuppressWarnings("DataFlowIssue") +public class AvroGenericRecordTransformerTest extends CdcTestBase +{ + private static final int NUM_ROWS = 50; + + private CqlToAvroSchemaConverter getConverter(CassandraVersion version) + { + CqlToAvroSchemaConverter converter = CdcBridgeFactory.getCqlToAvroSchemaConverter(version); + assertThat(converter).isNotNull(); + return converter; + } + + /** + * Build the Avro transformer by converting the CQL table schema (already registered by CdcTester) + * into an Avro schema and registering it in the test schema store. + */ + private AvroGenericRecordTransformer buildTransformer(CqlToAvroSchemaConverter converter, + CqlTable cqlTable) + { + TestSchemaStore schemaStore = new TestSchemaStore(); + Schema avroSchema = converter.convert(cqlTable); + String namespace = cqlTable.keyspace() + "." + cqlTable.table(); + schemaStore.registerSchema(namespace, avroSchema); + + Function typeLookup = key -> bridge.parseType(key.type); + return new AvroGenericRecordTransformer(schemaStore, typeLookup, ""); + } + + @ParameterizedTest + @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") + public void testBasicInsertAvroEncoding(CassandraVersion version) + { + AvroSchemas.registerLogicalTypes(); + CqlToAvroSchemaConverter converter = getConverter(version); + + TestSchema.Builder schemaBuilder = TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withClusteringKey("ck", bridge.bigint()) + .withColumn("c1", bridge.bigint()) + .withColumn("c2", bridge.text()); + + // Capture CqlTable from the tester via the writer callback + AtomicReference tableRef = new AtomicReference<>(); + testWith(bridge, cdcBridge, commitLogDir, schemaBuilder) + .withNumRows(NUM_ROWS) + .clearWriters() + .withWriter((tester, rows, writer) -> { + tableRef.set(tester.cqlTable); + long timestampMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()); + IntStream.range(0, tester.numRows) + .forEach(i -> 
writer.accept(CdcTester.newUniqueRow(tester.schema, rows), timestampMicros)); + }) + .withCdcEventChecker((testRows, events) -> { + assertThat(events).hasSize(NUM_ROWS); + AvroGenericRecordTransformer transformer = buildTransformer(converter, tableRef.get()); + + for (CdcEvent event : events) + { + GenericData.Record record = transformer.transform(event); + assertThat(record).isNotNull(); + + // Validate header fields + assertThat(record.get(AvroConstants.SOURCE_TABLE_KEY).toString()).isEqualTo(event.table); + assertThat(record.get(AvroConstants.SOURCE_KEYSPACE_KEY).toString()).isEqualTo(event.keyspace); + assertThat(record.get(AvroConstants.OPERATION_TYPE_KEY).toString()).isEqualTo("INSERT"); + assertThat(record.get(AvroConstants.TIMESTAMP_KEY)).isNotNull(); + assertThat(record.get(AvroConstants.VERSION_KEY).toString()).isEqualTo(AvroConstants.CURRENT_VERSION); + + // Validate payload contains expected fields including clustering key + GenericRecord payload = (GenericRecord) record.get(AvroConstants.PAYLOAD_KEY); + assertThat(payload).isNotNull(); + assertThat(payload.getSchema().getField("pk")).isNotNull(); + assertThat(payload.getSchema().getField("ck")).isNotNull(); + assertThat(payload.getSchema().getField("c1")).isNotNull(); + assertThat(payload.getSchema().getField("c2")).isNotNull(); + // Payload fields should have data + assertThat(payload.get("pk")).isNotNull(); + assertThat(payload.get("ck")).isNotNull(); + assertThat(payload.get("c1")).isNotNull(); + + // Validate updateFields lists all updated column names + List updateFields = CdcEventUtils.updatedFieldNames(event); + assertThat(updateFields).contains("pk", "ck", "c1", "c2"); + } + }) + .run(); + } + + @ParameterizedTest + @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") + public void testCollectionTypesAvroEncoding(CassandraVersion version) + { + AvroSchemas.registerLogicalTypes(); + CqlToAvroSchemaConverter converter = getConverter(version); + + TestSchema.Builder 
schemaBuilder = TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("m", bridge.map(bridge.text(), bridge.text())) + .withColumn("s", bridge.set(bridge.aInt())) + .withColumn("l", bridge.list(bridge.text())); + + AtomicReference tableRef = new AtomicReference<>(); + testWith(bridge, cdcBridge, commitLogDir, schemaBuilder) + .withNumRows(NUM_ROWS) + .clearWriters() + .withWriter((tester, rows, writer) -> { + tableRef.set(tester.cqlTable); + long timestampMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()); + IntStream.range(0, tester.numRows) + .forEach(i -> writer.accept(CdcTester.newUniqueRow(tester.schema, rows), timestampMicros)); + }) + .withCdcEventChecker((testRows, events) -> { + assertThat(events).hasSize(NUM_ROWS); + AvroGenericRecordTransformer transformer = buildTransformer(converter, tableRef.get()); + + for (CdcEvent event : events) + { + GenericData.Record record = transformer.transform(event); + assertThat(record).isNotNull(); + + GenericRecord payload = (GenericRecord) record.get(AvroConstants.PAYLOAD_KEY); + assertThat(payload).isNotNull(); + assertThat(payload.getSchema().getField("m")).isNotNull(); + assertThat(payload.getSchema().getField("s")).isNotNull(); + assertThat(payload.getSchema().getField("l")).isNotNull(); + } + }) + .run(); + } + + @ParameterizedTest + @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") + public void testAvroSerializeDeserializeRoundTrip(CassandraVersion version) + { + AvroSchemas.registerLogicalTypes(); + CqlToAvroSchemaConverter converter = getConverter(version); + + TestSchema.Builder schemaBuilder = TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("c1", bridge.bigint()) + .withColumn("c2", bridge.text()); + + AtomicReference tableRef = new AtomicReference<>(); + testWith(bridge, cdcBridge, commitLogDir, schemaBuilder) + .withNumRows(NUM_ROWS) + .clearWriters() + .withWriter((tester, rows, writer) -> { + 
tableRef.set(tester.cqlTable); + long timestampMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()); + IntStream.range(0, tester.numRows) + .forEach(i -> writer.accept(CdcTester.newUniqueRow(tester.schema, rows), timestampMicros)); + }) + .withCdcEventChecker((testRows, events) -> { + assertThat(events).hasSize(NUM_ROWS); + TestSchemaStore schemaStore = new TestSchemaStore(); + CqlTable cqlTable = tableRef.get(); + Schema avroSchema = converter.convert(cqlTable); + String namespace = cqlTable.keyspace() + "." + cqlTable.table(); + schemaStore.registerSchema(namespace, avroSchema); + + Function typeLookup = key -> bridge.parseType(key.type); + AvroGenericRecordSerializer serializer = new AvroGenericRecordSerializer(schemaStore, typeLookup, ""); + + for (CdcEvent event : events) + { + // Serialize: CdcEvent -> bytes + byte[] bytes = serializer.serialize("test-topic", event); + assertThat(bytes).isNotNull(); + assertThat(bytes.length).isGreaterThan(0); + + // Get the Avro record's schema for deserialization + GenericData.Record record = serializer.getTransformer().transform(event); + Schema recordSchema = record.getSchema(); + + // Deserialize: bytes -> CdcEnvelope + org.apache.cassandra.cdc.avro.msg.CdcEnvelope envelope = + serializer.deserializer().deserialize(event.keyspace, event.table, bytes, recordSchema); + assertThat(envelope).isNotNull(); + assertThat(envelope.header).isNotNull(); + assertThat(envelope.payload).isNotNull(); + + // Validate round-trip preserves operation type and table info + assertThat(envelope.header.get(AvroConstants.SOURCE_TABLE_KEY).toString()).isEqualTo(event.table); + assertThat(envelope.header.get(AvroConstants.SOURCE_KEYSPACE_KEY).toString()).isEqualTo(event.keyspace); + assertThat(envelope.header.get(AvroConstants.OPERATION_TYPE_KEY).toString()).isEqualTo("INSERT"); + + // Validate payload fields survived round-trip + assertThat(envelope.payload.get("pk")).isNotNull(); + 
assertThat(envelope.payload.get("c1")).isNotNull(); + } + }) + .run(); + } + + @ParameterizedTest + @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") + public void testDeleteEventAvroEncoding(CassandraVersion version) + { + AvroSchemas.registerLogicalTypes(); + CqlToAvroSchemaConverter converter = getConverter(version); + + TestSchema.Builder schemaBuilder = TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("c1", bridge.bigint()) + .withColumn("c2", bridge.text()); + + AtomicReference tableRef = new AtomicReference<>(); + testWith(bridge, cdcBridge, commitLogDir, schemaBuilder) + .withNumRows(NUM_ROWS) + .clearWriters() + .withWriter((tester, rows, writer) -> { + tableRef.set(tester.cqlTable); + for (int i = 0; i < tester.numRows; i++) + { + TestSchema.TestRow testRow = CdcTester.newUniqueRow(tester.schema, rows); + testRow = testRow.copy("c1", org.apache.cassandra.bridge.CdcBridge.UNSET_MARKER); + testRow = testRow.copy("c2", null); + writer.accept(testRow, TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis())); + } + }) + .withCdcEventChecker((testRows, events) -> { + assertThat(events).hasSize(NUM_ROWS); + AvroGenericRecordTransformer transformer = buildTransformer(converter, tableRef.get()); + + for (CdcEvent event : events) + { + GenericData.Record record = transformer.transform(event); + assertThat(record).isNotNull(); + + // Verify the Avro record correctly encodes the operation type from the event + CdcEventUtils.OperationType opType = CdcEventUtils.getOperationType(event); + assertThat(record.get(AvroConstants.OPERATION_TYPE_KEY).toString()).isEqualTo(opType.name()); + assertThat(record.get(AvroConstants.SOURCE_TABLE_KEY).toString()).isEqualTo(event.table); + assertThat(record.get(AvroConstants.SOURCE_KEYSPACE_KEY).toString()).isEqualTo(event.keyspace); + + GenericRecord payload = (GenericRecord) record.get(AvroConstants.PAYLOAD_KEY); + assertThat(payload).isNotNull(); + // pk should still 
be present + assertThat(payload.get("pk")).isNotNull(); + // c2 is deleted (null in the payload) + assertThat(payload.get("c2")).isNull(); + } + }) + .run(); + } + + @ParameterizedTest + @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") + public void testMaxSupportedTtlAvroEncoding(CassandraVersion version) + { + AvroSchemas.registerLogicalTypes(); + CqlToAvroSchemaConverter converter = getConverter(version); + + // Use a large TTL value (~20 years). Safe for both 4.0 and 5.0. + int largeTtl = 20 * 365 * 24 * 3600; + long beforeTestEpochSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); + + TestSchema.Builder schemaBuilder = TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("c1", bridge.aInt()); + + AtomicReference tableRef = new AtomicReference<>(); + testWith(bridge, cdcBridge, commitLogDir, schemaBuilder) + .withNumRows(NUM_ROWS) + .clearWriters() + .withWriter((tester, rows, writer) -> { + tableRef.set(tester.cqlTable); + for (int i = 0; i < tester.numRows; i++) + { + TestSchema.TestRow testRow = CdcTester.newUniqueRow(tester.schema, rows); + testRow.setTTL(largeTtl); + writer.accept(testRow, TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis())); + } + }) + .withCdcEventChecker((testRows, events) -> { + long afterTestEpochSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); + assertThat(events).hasSize(NUM_ROWS); + AvroGenericRecordTransformer transformer = buildTransformer(converter, tableRef.get()); + + for (CdcEvent event : events) + { + // Validate CdcEvent-level TTL + assertThat(event.getTtl()).isNotNull(); + assertThat(event.getTtl().ttlInSec).isEqualTo(largeTtl); + long expirationTime = event.getTtl().expirationTimeInSec; + long expectedLower = beforeTestEpochSec + largeTtl; + long expectedUpper = afterTestEpochSec + largeTtl; + if (expirationTime <= Integer.MAX_VALUE && expectedLower > Integer.MAX_VALUE) + { + // Cassandra 4.0 caps the value at 
Integer.MAX_VALUE + assertThat(expirationTime) + .as("expirationTimeInSec should be capped near Integer.MAX_VALUE on Cassandra 4.0") + .isBetween((long) Integer.MAX_VALUE - 1, (long) Integer.MAX_VALUE); + } + else + { + assertThat(expirationTime) + .as("expirationTimeInSec should be approximately nowInSeconds + largeTtl") + .isBetween(expectedLower, expectedUpper); + } + + // Validate Avro-level TTL encoding + GenericData.Record record = transformer.transform(event); + assertThat(record).isNotNull(); + + Object ttlField = record.get(AvroConstants.TTL_KEY); + assertThat(ttlField).isNotNull(); + GenericRecord ttlRecord = (GenericRecord) ttlField; + assertThat(ttlRecord.get(AvroConstants.TTL_KEY)).isEqualTo(largeTtl); + + Object deletedAt = ttlRecord.get(AvroConstants.DELETED_AT_KEY); + assertThat(deletedAt).isInstanceOf(Long.class); + } + }) + .run(); + } + + @ParameterizedTest + @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") + public void testPartitionDeleteAvroEncoding(CassandraVersion version) + { + AvroSchemas.registerLogicalTypes(); + CqlToAvroSchemaConverter converter = getConverter(version); + + long beforeTestMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()); + + TestSchema.Builder schemaBuilder = TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("c1", bridge.aInt()); + + AtomicReference tableRef = new AtomicReference<>(); + testWith(bridge, cdcBridge, commitLogDir, schemaBuilder) + .withNumRows(NUM_ROWS) + .clearWriters() + .withWriter((tester, rows, writer) -> { + tableRef.set(tester.cqlTable); + for (int i = 0; i < tester.numRows; i++) + { + TestSchema.TestRow testRow = newUniquePartitionDeletion(tester.schema, rows); + writer.accept(testRow, TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis())); + } + }) + .withCdcEventChecker((testRows, events) -> { + long afterTestMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()); + 
assertThat(events).hasSize(NUM_ROWS); + AvroGenericRecordTransformer transformer = buildTransformer(converter, tableRef.get()); + + for (CdcEvent event : events) + { + // Validate CdcEvent-level partition delete + assertThat(event.getKind()).isEqualTo(CdcEvent.Kind.PARTITION_DELETE); + long eventTimestampMicros = event.getTimestamp(TimeUnit.MICROSECONDS); + assertThat(eventTimestampMicros) + .as("deletion timestamp should be within the test time window") + .isBetween(beforeTestMicros, afterTestMicros); + + // Validate Avro-level encoding + GenericData.Record record = transformer.transform(event); + assertThat(record).isNotNull(); + assertThat(record.get(AvroConstants.OPERATION_TYPE_KEY).toString()).isEqualTo("DELETE_PARTITION"); + assertThat(record.get(AvroConstants.SOURCE_TABLE_KEY).toString()).isEqualTo(event.table); + assertThat(record.get(AvroConstants.SOURCE_KEYSPACE_KEY).toString()).isEqualTo(event.keyspace); + } + }) + .run(); + } + + @ParameterizedTest + @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") + public void testMixedTtlAndNonTtlAvroEncoding(CassandraVersion version) + { + AvroSchemas.registerLogicalTypes(); + CqlToAvroSchemaConverter converter = getConverter(version); + + int ttlSeconds = 3600; + long beforeTestEpochSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); + + TestSchema.Builder schemaBuilder = TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("c1", bridge.aInt()); + + AtomicReference tableRef = new AtomicReference<>(); + testWith(bridge, cdcBridge, commitLogDir, schemaBuilder) + .withNumRows(NUM_ROWS) + .clearWriters() + .withWriter((tester, rows, writer) -> { + tableRef.set(tester.cqlTable); + for (int i = 0; i < tester.numRows; i++) + { + TestSchema.TestRow testRow = CdcTester.newUniqueRow(tester.schema, rows); + if (i % 2 == 0) + { + testRow.setTTL(ttlSeconds); + } + writer.accept(testRow, TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis())); + } + 
}) + .withCdcEventChecker((testRows, events) -> { + long afterTestEpochSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); + assertThat(events).hasSize(NUM_ROWS); + AvroGenericRecordTransformer transformer = buildTransformer(converter, tableRef.get()); + + int withTtl = 0; + int withoutTtl = 0; + for (CdcEvent event : events) + { + GenericData.Record record = transformer.transform(event); + assertThat(record).isNotNull(); + + if (event.getTtl() != null) + { + withTtl++; + assertThat(event.getTtl().ttlInSec).isEqualTo(ttlSeconds); + assertThat(event.getTtl().expirationTimeInSec).isGreaterThan(0L); + + // Avro TTL field should be present + Object ttlField = record.get(AvroConstants.TTL_KEY); + assertThat(ttlField).as("Avro TTL record should be present for TTL row").isNotNull(); + GenericRecord ttlRecord = (GenericRecord) ttlField; + assertThat(ttlRecord.get(AvroConstants.TTL_KEY)).isEqualTo(ttlSeconds); + + // Validate deletedAt is a Long with expected value + Object deletedAt = ttlRecord.get(AvroConstants.DELETED_AT_KEY); + assertThat(deletedAt).isInstanceOf(Long.class); + long deletedAtValue = (Long) deletedAt; + assertThat(deletedAtValue) + .as("deletedAt should be approximately nowInSeconds + TTL") + .isBetween(beforeTestEpochSec + ttlSeconds, afterTestEpochSec + ttlSeconds); + } + else + { + withoutTtl++; + // Avro TTL field should be absent + assertThat(record.get(AvroConstants.TTL_KEY)) + .as("Avro TTL record should be null for non-TTL row") + .isNull(); + } + } + assertThat(withTtl).isEqualTo(NUM_ROWS / 2); + assertThat(withoutTtl).isEqualTo(NUM_ROWS / 2); + }) + .run(); + } +} diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/Y2038TimeTests.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/Y2038TimeTests.java deleted file mode 100644 index 636e3f5b9..000000000 --- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/Y2038TimeTests.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to 
the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.cassandra.cdc; - -import java.util.concurrent.TimeUnit; - -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; - -import org.apache.cassandra.bridge.CassandraVersion; -import org.apache.cassandra.cdc.msg.CdcEvent; -import org.apache.cassandra.cdc.test.CdcTestBase; -import org.apache.cassandra.cdc.test.CdcTester; -import org.apache.cassandra.spark.utils.test.TestSchema; - -import static org.apache.cassandra.cdc.test.CdcTester.newUniqueRow; -import static org.apache.cassandra.cdc.test.CdcTester.testWith; -import static org.assertj.core.api.Assertions.assertThat; - -/** - * Tests verifying Y2038 time handling in the CDC pipeline. - * Validates that TTL, expiration time, and deletion time values flow correctly - * through the commit log writing and reading pipeline for both Cassandra 4.0 and 5.0. - */ -public class Y2038TimeTests extends CdcTestBase -{ - private static final int TTL_SECONDS = 3600; // 1 hour - - /** - * Verifies that expirationTimeInSec is a valid long value and approximately equals nowInSeconds + ttl. 
- * This ensures the int-to-long widening in the CDC pipeline preserves time values correctly. - */ - @ParameterizedTest - @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") - public void testTtlExpirationTimeIsLong(CassandraVersion version) - { - long beforeTestEpochSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); - testWith(bridge, cdcBridge, commitLogDir, TestSchema.builder(bridge) - .withPartitionKey("pk", bridge.uuid()) - .withColumn("c1", bridge.aInt())) - .clearWriters() - .withNumRows(5) - .withWriter((tester, rows, writer) -> { - for (int i = 0; i < tester.numRows; i++) - { - TestSchema.TestRow testRow = newUniqueRow(tester.schema, rows); - testRow.setTTL(TTL_SECONDS); - writer.accept(testRow, TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis())); - } - }) - .withCdcEventChecker((testRows, events) -> { - long afterTestEpochSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); - for (CdcEvent event : events) - { - assertThat(event.getTtl()).isNotNull(); - assertThat(event.getTtl().ttlInSec).isEqualTo(TTL_SECONDS); - // expirationTimeInSec should be approximately nowInSeconds + TTL - long expirationTime = event.getTtl().expirationTimeInSec; - assertThat(expirationTime) - .as("expirationTimeInSec should be between beforeTest+TTL and afterTest+TTL") - .isBetween(beforeTestEpochSec + TTL_SECONDS, afterTestEpochSec + TTL_SECONDS); - } - }) - .run(); - } - - /** - * Tests with maximum supported TTL value (max int seconds ~ 68 years). - * In Cassandra 5.0, the expiration time (nowInSeconds + ttl) can exceed Integer.MAX_VALUE. - * In Cassandra 4.0, the expiration time is limited to int range. - */ - @ParameterizedTest - @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") - public void testMaxSupportedTtl(CassandraVersion version) - { - // Use a large TTL value. The max TTL Cassandra supports is MAX_DELETION_TIME - nowInSeconds. 
- // We use 20 years which is safe for both 4.0 and 5.0. - int largeTtl = 20 * 365 * 24 * 3600; // ~20 years in seconds - - testWith(bridge, cdcBridge, commitLogDir, TestSchema.builder(bridge) - .withPartitionKey("pk", bridge.uuid()) - .withColumn("c1", bridge.aInt())) - .clearWriters() - .withNumRows(3) - .withWriter((tester, rows, writer) -> { - for (int i = 0; i < tester.numRows; i++) - { - TestSchema.TestRow testRow = newUniqueRow(tester.schema, rows); - testRow.setTTL(largeTtl); - writer.accept(testRow, TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis())); - } - }) - .withCdcEventChecker((testRows, events) -> { - long nowEpochSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); - for (CdcEvent event : events) - { - assertThat(event.getTtl()).isNotNull(); - assertThat(event.getTtl().ttlInSec).isEqualTo(largeTtl); - long expirationTime = event.getTtl().expirationTimeInSec; - // The expiration time should be positive and in the future - assertThat(expirationTime) - .as("expirationTimeInSec should be positive and in the future") - .isGreaterThan(nowEpochSec); - } - }) - .run(); - } - - /** - * Tests that deletion time from partition deletions is handled correctly as a long value. 
- */ - @ParameterizedTest - @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") - public void testDeletionTimeIsLong(CassandraVersion version) - { - long beforeTestMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()); - testWith(bridge, cdcBridge, commitLogDir, TestSchema.builder(bridge) - .withPartitionKey("pk", bridge.uuid()) - .withColumn("c1", bridge.aInt())) - .clearWriters() - .withNumRows(5) - .withWriter((tester, rows, writer) -> { - for (int i = 0; i < tester.numRows; i++) - { - TestSchema.TestRow testRow = CdcTester.newUniquePartitionDeletion(tester.schema, rows); - writer.accept(testRow, TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis())); - } - }) - .withCdcEventChecker((testRows, events) -> { - long afterTestMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()); - for (CdcEvent event : events) - { - assertThat(event.getKind()).isEqualTo(CdcEvent.Kind.PARTITION_DELETE); - // Deletion timestamp should be within the test window - long eventTimestampMicros = event.getTimestamp(TimeUnit.MICROSECONDS); - assertThat(eventTimestampMicros) - .as("deletion timestamp should be within the test time window") - .isBetween(beforeTestMicros, afterTestMicros); - } - }) - .run(); - } - - /** - * Tests that TTL and non-TTL rows can coexist correctly, - * verifying time handling does not corrupt non-TTL data. 
- */ - @ParameterizedTest - @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") - public void testMixedTtlAndNonTtlRows(CassandraVersion version) - { - testWith(bridge, cdcBridge, commitLogDir, TestSchema.builder(bridge) - .withPartitionKey("pk", bridge.uuid()) - .withColumn("c1", bridge.aInt())) - .clearWriters() - .withNumRows(10) - .withWriter((tester, rows, writer) -> { - for (int i = 0; i < tester.numRows; i++) - { - TestSchema.TestRow testRow = newUniqueRow(tester.schema, rows); - // Set TTL on even rows only - if (i % 2 == 0) - { - testRow.setTTL(TTL_SECONDS); - } - writer.accept(testRow, TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis())); - } - }) - .withCdcEventChecker((testRows, events) -> { - int withTtl = 0; - int withoutTtl = 0; - for (CdcEvent event : events) - { - if (event.getTtl() != null) - { - withTtl++; - assertThat(event.getTtl().ttlInSec).isEqualTo(TTL_SECONDS); - assertThat(event.getTtl().expirationTimeInSec).isGreaterThan(0L); - } - else - { - withoutTtl++; - } - } - assertThat(withTtl).isEqualTo(5); - assertThat(withoutTtl).isEqualTo(5); - }) - .run(); - } -} diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/avro/TestSchemaStore.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/avro/TestSchemaStore.java new file mode 100644 index 000000000..b9a5249a1 --- /dev/null +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/avro/TestSchemaStore.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.cdc.avro; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.cassandra.cdc.schemastore.SchemaStore; + +/** + * In-memory {@link SchemaStore} implementation for tests. + * Schemas are registered manually via {@link #registerSchema(String, Schema)}. + */ +public class TestSchemaStore implements SchemaStore +{ + private final Map schemas = new HashMap<>(); + private final Map> writers = new HashMap<>(); + private final Map> readers = new HashMap<>(); + + public void registerSchema(String namespace, Schema schema) + { + schemas.put(namespace, schema); + writers.put(namespace, new GenericDatumWriter<>(schema)); + readers.put(namespace, new GenericDatumReader<>(schema)); + } + + @Override + public Schema getSchema(String namespace, String name) + { + return schemas.get(namespace); + } + + @Override + public GenericDatumWriter getWriter(String namespace, String name) + { + return writers.get(namespace); + } + + @Override + public GenericDatumReader getReader(String namespace, String name) + { + return readers.get(namespace); + } +} diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteComplexTypeTtlTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteComplexTypeTtlTest.java new file mode 100644 index 
000000000..569eb031b --- /dev/null +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteComplexTypeTtlTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.analytics; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.distributed.shared.Uninterruptibles; +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.cassandra.spark.bulkwriter.TTLOption; +import org.apache.cassandra.spark.bulkwriter.WriterOptions; +import org.apache.cassandra.testing.ClusterBuilderConfiguration; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +import static 
org.apache.cassandra.testing.TestUtils.DC1_RF3; +import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; +import static org.apache.spark.sql.types.DataTypes.IntegerType; +import static org.apache.spark.sql.types.DataTypes.StringType; +import static org.apache.spark.sql.types.DataTypes.createArrayType; +import static org.apache.spark.sql.types.DataTypes.createMapType; +import static org.apache.spark.sql.types.DataTypes.createStructType; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests that bulk writes with TTL work correctly for complex types (list, set, map, UDT). + * This ensures the expiring cell path is invoked correctly for collection and composite types. + * TODO: add tuple ttl test after adding support for writing tuples + */ +class BulkWriteComplexTypeTtlTest extends SharedClusterSparkIntegrationTestBase +{ + static final int ROW_COUNT = 100; + + static final QualifiedName LIST_TTL_TABLE = new QualifiedName(TEST_KEYSPACE, "test_list_ttl"); + static final QualifiedName SET_TTL_TABLE = new QualifiedName(TEST_KEYSPACE, "test_set_ttl"); + static final QualifiedName MAP_TTL_TABLE = new QualifiedName(TEST_KEYSPACE, "test_map_ttl"); + static final QualifiedName UDT_TTL_TABLE = new QualifiedName(TEST_KEYSPACE, "test_udt_ttl"); + + static final String SIMPLE_UDT_NAME = "simple_udt"; + + @Test + void testListWithTtl() + { + SparkSession spark = getOrCreateSparkSession(); + StructType schema = new StructType() + .add("id", IntegerType, false) + .add("listdata", createArrayType(IntegerType), false); + + List rows = IntStream.range(0, ROW_COUNT) + .mapToObj(i -> RowFactory.create(i, Arrays.asList(i, i + 1))) + .collect(Collectors.toList()); + Dataset df = spark.createDataFrame(rows, schema); + + bulkWriterDataFrameWriter(df, LIST_TTL_TABLE).option(WriterOptions.TTL.name(), TTLOption.constant(80)) + .save(); + + Dataset preExpiry = bulkReaderDataFrame(LIST_TTL_TABLE).load(); + assertThat(preExpiry.collectAsList()).hasSize(ROW_COUNT); 
+ + Uninterruptibles.sleepUninterruptibly(80, TimeUnit.SECONDS); + Dataset postExpiry = bulkReaderDataFrame(LIST_TTL_TABLE).load(); + assertThat(postExpiry.collectAsList()).isEmpty(); + } + + @Test + void testSetWithTtl() + { + SparkSession spark = getOrCreateSparkSession(); + StructType schema = new StructType() + .add("id", IntegerType, false) + .add("setdata", createArrayType(StringType), false); + + List rows = IntStream.range(0, ROW_COUNT) + .mapToObj(i -> RowFactory.create(i, ImmutableSet.of("item" + i))) + .collect(Collectors.toList()); + Dataset df = spark.createDataFrame(rows, schema); + + bulkWriterDataFrameWriter(df, SET_TTL_TABLE).option(WriterOptions.TTL.name(), TTLOption.constant(80)) + .save(); + + Dataset preExpiry = bulkReaderDataFrame(SET_TTL_TABLE).load(); + assertThat(preExpiry.collectAsList()).hasSize(ROW_COUNT); + + Uninterruptibles.sleepUninterruptibly(80, TimeUnit.SECONDS); + Dataset postExpiry = bulkReaderDataFrame(SET_TTL_TABLE).load(); + assertThat(postExpiry.collectAsList()).isEmpty(); + } + + @Test + void testMapWithTtl() + { + SparkSession spark = getOrCreateSparkSession(); + StructType schema = new StructType() + .add("id", IntegerType, false) + .add("mapdata", createMapType(StringType, IntegerType), false); + + List rows = IntStream.range(0, ROW_COUNT) + .mapToObj(i -> RowFactory.create(i, ImmutableMap.of("key" + i, i))) + .collect(Collectors.toList()); + Dataset df = spark.createDataFrame(rows, schema); + + bulkWriterDataFrameWriter(df, MAP_TTL_TABLE).option(WriterOptions.TTL.name(), TTLOption.constant(80)) + .save(); + + Dataset preExpiry = bulkReaderDataFrame(MAP_TTL_TABLE).load(); + assertThat(preExpiry.collectAsList()).hasSize(ROW_COUNT); + + Uninterruptibles.sleepUninterruptibly(80, TimeUnit.SECONDS); + Dataset postExpiry = bulkReaderDataFrame(MAP_TTL_TABLE).load(); + assertThat(postExpiry.collectAsList()).isEmpty(); + } + + @Test + void testUdtWithTtl() + { + SparkSession spark = getOrCreateSparkSession(); + StructType udtType 
= createStructType(new StructField[]{ + new StructField("f1", StringType, true, Metadata.empty()), + new StructField("f2", IntegerType, true, Metadata.empty()) + }); + StructType schema = new StructType() + .add("id", IntegerType, false) + .add("udtfield", udtType, false); + + List rows = IntStream.range(0, ROW_COUNT) + .mapToObj(i -> RowFactory.create(i, RowFactory.create("course" + i, i))) + .collect(Collectors.toList()); + Dataset df = spark.createDataFrame(rows, schema); + + bulkWriterDataFrameWriter(df, UDT_TTL_TABLE).option(WriterOptions.TTL.name(), TTLOption.constant(80)) + .save(); + + Dataset preExpiry = bulkReaderDataFrame(UDT_TTL_TABLE).load(); + assertThat(preExpiry.collectAsList()).hasSize(ROW_COUNT); + + Uninterruptibles.sleepUninterruptibly(80, TimeUnit.SECONDS); + Dataset postExpiry = bulkReaderDataFrame(UDT_TTL_TABLE).load(); + assertThat(postExpiry.collectAsList()).isEmpty(); + } + + @Override + protected ClusterBuilderConfiguration testClusterConfiguration() + { + return super.testClusterConfiguration() + .nodesPerDc(3); + } + + @Override + protected void initializeSchemaForTest() + { + createTestKeyspace(LIST_TTL_TABLE, DC1_RF3); + + cluster.schemaChangeIgnoringStoppedInstances("CREATE TYPE " + TEST_KEYSPACE + "." 
+ SIMPLE_UDT_NAME + + " (f1 text, f2 int)"); + + cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE " + LIST_TTL_TABLE + " (" + + "id int PRIMARY KEY, " + + "listdata list)"); + + cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE " + SET_TTL_TABLE + " (" + + "id int PRIMARY KEY, " + + "setdata set)"); + + cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE " + MAP_TTL_TABLE + " (" + + "id int PRIMARY KEY, " + + "mapdata map)"); + + cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE " + UDT_TTL_TABLE + " (" + + "id int PRIMARY KEY, " + + "udtfield " + SIMPLE_UDT_NAME + ")"); + } +} diff --git a/cassandra-avro-converter/src/main/java/org/apache/cassandra/cdc/avro/AvroConstants.java b/cassandra-avro-converter/src/main/java/org/apache/cassandra/cdc/avro/AvroConstants.java index f2aa80948..094a798d1 100644 --- a/cassandra-avro-converter/src/main/java/org/apache/cassandra/cdc/avro/AvroConstants.java +++ b/cassandra-avro-converter/src/main/java/org/apache/cassandra/cdc/avro/AvroConstants.java @@ -38,7 +38,7 @@ public final class AvroConstants public static final String FIELD_KEY = "field"; public static final String VALUE_KEY = "value"; - public static final String CURRENT_VERSION = "2"; + public static final String CURRENT_VERSION = "3"; public static final String ARRAY_BASED_MAP_KEY_NAME = "key"; public static final String ARRAY_BASED_MAP_VALUE_NAME = "value"; public static final String ARRAY_BASED_MAP_NAME = "array_map"; diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java index 32ff70533..57868d39f 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java @@ -33,6 +33,7 @@ import java.util.stream.Collectors; import 
com.google.common.annotations.VisibleForTesting; +import com.google.common.primitives.Ints; import org.apache.cassandra.bridge.CassandraSchema; import org.apache.cassandra.config.DatabaseDescriptor; @@ -185,7 +186,7 @@ private void rawAddRow(List values) throws InvalidRequestException, delete.updatedColumns(), options, delete.getTimestamp(TimeUnit.MILLISECONDS.toMicros(now), options), - (int) TimeUnit.MILLISECONDS.toSeconds(now), + Ints.checkedCast(TimeUnit.MILLISECONDS.toSeconds(now)), delete.getTimeToLive(options), Collections.emptyMap()); diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/data/CqlTypeY2038Test.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/data/CqlTypeY2038Test.java new file mode 100644 index 000000000..c2d55c151 --- /dev/null +++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/data/CqlTypeY2038Test.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.spark.data; + +import java.nio.ByteBuffer; + +import com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.bridge.CassandraBridgeImplementation; +import org.apache.cassandra.db.rows.BufferCell; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.spark.data.partitioner.Partitioner; +import org.apache.cassandra.spark.reader.SchemaBuilder; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Tests verifying Y2038 boundary behavior for {@link CqlType#tombstone} and {@link CqlType#expiring} + * in Cassandra 4.0. + */ +class CqlTypeY2038Test +{ + private static final CassandraBridgeImplementation BRIDGE = new CassandraBridgeImplementation(); + private static final int MAX_TTL_SECONDS = 20 * 365 * 24 * 60 * 60; // 20 years, Cassandra's max allowed TTL + private static final long NOW_IN_SEC_AT_BOUNDARY = Integer.MAX_VALUE - (long) MAX_TTL_SECONDS; // last nowInSec where max TTL fits in int + private static final long EXCEEDS_INT_RANGE = Integer.MAX_VALUE + 1L; + private static final long TIMESTAMP = 1000L; + private static final ByteBuffer VALUE = ByteBuffer.wrap(new byte[]{1, 2, 3, 4}); + + private static ColumnMetadata regularColumn() + { + String createStatement = "CREATE TABLE test_ks.test_table (pk int PRIMARY KEY, v int)"; + TableMetadata metadata = new SchemaBuilder(createStatement, + "test_ks", + new ReplicationFactor(ReplicationFactor.ReplicationStrategy.SimpleStrategy, + ImmutableMap.of("replication_factor", 1)), + Partitioner.Murmur3Partitioner).tableMetaData(); + return metadata.getColumn(ByteBuffer.wrap("v".getBytes())); + } + + @Test + void testExpiringWithinIntRange() + { + ColumnMetadata column = regularColumn(); + BufferCell cell = CqlType.expiring(column, TIMESTAMP, MAX_TTL_SECONDS, 
NOW_IN_SEC_AT_BOUNDARY, VALUE, null); + assertThat(cell).isNotNull(); + assertThat(cell.timestamp()).isEqualTo(TIMESTAMP); + assertThat(cell.ttl()).isEqualTo(MAX_TTL_SECONDS); + } + + @Test + void testExpiringExceedsIntRangeThrows() + { + ColumnMetadata column = regularColumn(); + assertThatThrownBy(() -> CqlType.expiring(column, TIMESTAMP, MAX_TTL_SECONDS, EXCEEDS_INT_RANGE, VALUE, null)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void testTombstoneWithinIntRange() + { + ColumnMetadata column = regularColumn(); + BufferCell cell = CqlType.tombstone(column, TIMESTAMP, NOW_IN_SEC_AT_BOUNDARY, null); + assertThat(cell).isNotNull(); + assertThat(cell.timestamp()).isEqualTo(TIMESTAMP); + } + + @Test + void testTombstoneExceedsIntRangeThrows() + { + ColumnMetadata column = regularColumn(); + assertThatThrownBy(() -> CqlType.tombstone(column, TIMESTAMP, EXCEEDS_INT_RANGE, null)) + .isInstanceOf(IllegalArgumentException.class); + } +} diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/AbstractCqlType.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/AbstractCqlType.java index dcffcd87b..008d34cc8 100644 --- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/AbstractCqlType.java +++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/AbstractCqlType.java @@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; import org.apache.cassandra.bridge.CassandraVersion; import org.apache.cassandra.cdc.api.Row; @@ -200,6 +201,6 @@ public void addComplexTombstone(org.apache.cassandra.db.rows.Row.Builder rowBuil { Preconditions.checkArgument(cd.isComplex(), "The method only works with complex columns"); // C* 4.0 DeletionTime constructor requires int for localDeletionTime - rowBuilder.addComplexDeletion(cd, new DeletionTime(deletionTime, (int) 
TimeUnit.MICROSECONDS.toSeconds(deletionTime))); + rowBuilder.addComplexDeletion(cd, new DeletionTime(deletionTime, Ints.checkedCast(TimeUnit.MICROSECONDS.toSeconds(deletionTime)))); } } diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/AbstractCqlList.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/AbstractCqlList.java index 596954216..43619e6f7 100644 --- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/AbstractCqlList.java +++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/AbstractCqlList.java @@ -24,8 +24,6 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; -import com.google.common.primitives.Ints; - import org.apache.cassandra.bridge.CassandraVersion; import org.apache.cassandra.cql3.functions.types.SettableByIndexData; import org.apache.cassandra.db.marshal.AbstractType; @@ -114,9 +112,8 @@ public void addCell(final org.apache.cassandra.db.rows.Row.Builder rowBuilder, { if (ttl != NO_TTL) { - // C* 4.0 BufferCell.expiring requires int for now; checked cast will throw after Y2038 - rowBuilder.addCell(BufferCell.expiring(cd, timestamp, ttl, Ints.checkedCast(now), type().serialize(o), - CellPath.create(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())))); + rowBuilder.addCell(CqlType.expiring(cd, timestamp, ttl, now, type().serialize(o), + CellPath.create(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())))); } else { diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java index 49b6260ce..e9003b09f 100644 --- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java +++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java @@ -23,8 +23,6 @@ import java.util.stream.Collectors; import 
java.util.stream.IntStream; -import com.google.common.primitives.Ints; - import org.apache.cassandra.bridge.CassandraVersion; import org.apache.cassandra.cql3.functions.types.SettableByIndexData; import org.apache.cassandra.db.marshal.AbstractType; @@ -129,9 +127,8 @@ public void addCell(final org.apache.cassandra.db.rows.Row.Builder rowBuilder, { if (ttl != NO_TTL) { - // C* 4.0 BufferCell.expiring requires int for now; checked cast will throw after Y2038 - rowBuilder.addCell(BufferCell.expiring(cd, timestamp, ttl, Ints.checkedCast(now), valueType().serialize(entry.getValue()), - CellPath.create(keyType().serialize(entry.getKey())))); + rowBuilder.addCell(CqlType.expiring(cd, timestamp, ttl, now, valueType().serialize(entry.getValue()), + CellPath.create(keyType().serialize(entry.getKey())))); } else { diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java index 8bba65c95..bbe504ae2 100644 --- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java +++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java @@ -24,8 +24,6 @@ import java.util.Set; import java.util.stream.Collectors; -import com.google.common.primitives.Ints; - import org.apache.cassandra.bridge.CassandraVersion; import org.apache.cassandra.cql3.functions.types.DataType; import org.apache.cassandra.cql3.functions.types.SettableByIndexData; @@ -113,9 +111,8 @@ public void addCell(final org.apache.cassandra.db.rows.Row.Builder rowBuilder, { if (ttl != NO_TTL) { - // C* 4.0 BufferCell.expiring requires int for now; checked cast will throw after Y2038 - rowBuilder.addCell(BufferCell.expiring(cd, timestamp, ttl, Ints.checkedCast(now), ByteBufferUtil.EMPTY_BYTE_BUFFER, - CellPath.create(type().serialize(o)))); + rowBuilder.addCell(CqlType.expiring(cd, timestamp, ttl, now, 
ByteBufferUtil.EMPTY_BYTE_BUFFER, + CellPath.create(type().serialize(o)))); } else { From 7459d2e3ca6e8b49a6a7f067402f4ec436a5afa0 Mon Sep 17 00:00:00 2001 From: Shailaja Koppu Date: Tue, 17 Mar 2026 16:10:09 +0000 Subject: [PATCH 3/9] CI workaround --- .circleci/config.yml | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index ff7eb5002..e9c4c7bcb 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -66,8 +66,19 @@ commands: INTEGRATION_MAX_HEAP_SIZE: "1500M" CASSANDRA_USE_JDK11: <> command: | - # Run compile/unit tests, skipping integration tests - ./gradlew --stacktrace clean assemble check -x cassandra-analytics-integration-tests:test -Dcassandra.analytics.bridges.sstable_format=<> + # Override test JVM settings to fit within CI memory (8GB large resource class) + cat > /tmp/ci-test-settings.gradle \<< 'EOF' + allprojects { + afterEvaluate { + tasks.withType(Test).configureEach { + maxHeapSize = '1536m' + minHeapSize = '512m' + maxParallelForks = Math.min(maxParallelForks, Math.max(Runtime.runtime.availableProcessors(), 2)) + } + } + } + EOF + ./gradlew --stacktrace --init-script /tmp/ci-test-settings.gradle clean assemble check -x cassandra-analytics-integration-tests:test -Dcassandra.analytics.bridges.sstable_format=<> run_integration: parameters: From 2bacb9bcc1faf0f260edc60f630f3478338b53fb Mon Sep 17 00:00:00 2001 From: Shailaja Koppu Date: Tue, 17 Mar 2026 17:14:43 +0000 Subject: [PATCH 4/9] try CI split --- .circleci/config.yml | 21 ++++++--------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index e9c4c7bcb..25f36d1cf 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -66,19 +66,9 @@ commands: INTEGRATION_MAX_HEAP_SIZE: "1500M" CASSANDRA_USE_JDK11: <> command: | - # Override test JVM settings to fit within CI memory (8GB large resource class) - cat > /tmp/ci-test-settings.gradle \<< 
'EOF' - allprojects { - afterEvaluate { - tasks.withType(Test).configureEach { - maxHeapSize = '1536m' - minHeapSize = '512m' - maxParallelForks = Math.min(maxParallelForks, Math.max(Runtime.runtime.availableProcessors(), 2)) - } - } - } - EOF - ./gradlew --stacktrace --init-script /tmp/ci-test-settings.gradle clean assemble check -x cassandra-analytics-integration-tests:test -Dcassandra.analytics.bridges.sstable_format=<> + export GRADLE_OPTS="-Xmx2g -Dorg.gradle.jvmargs=-Xmx2g" + # Run compile/unit tests, skipping integration tests + ./gradlew --no-daemon --max-workers=2 --stacktrace clean assemble check -x cassandra-analytics-integration-tests:test -Dcassandra.analytics.bridges.sstable_format=<> run_integration: parameters: @@ -104,10 +94,11 @@ commands: INTEGRATION_MAX_HEAP_SIZE: "2500M" CASSANDRA_USE_JDK11: <> command: | + export GRADLE_OPTS="-Xmx2g -Dorg.gradle.jvmargs=-Xmx2g" export DTEST_JAR="dtest-<< parameters.cassandra >>.jar" export CASSANDRA_VERSION=$(echo << parameters.cassandra >> | cut -d'.' 
-f 1,2) # Run compile but not unit tests (which are run in run_build) - ./gradlew --stacktrace clean assemble + ./gradlew --no-daemon --max-workers=2 --stacktrace clean assemble # Run integration tests in parallel cd cassandra-analytics-integration-tests/src/test/java # Get list of classnames of tests that should run on this node @@ -143,7 +134,7 @@ jobs: name: Build dependencies for jdk11 builds command: | CASSANDRA_USE_JDK11=true ./scripts/build-dependencies.sh - ./gradlew codeCheckTasks + ./gradlew --no-daemon --max-workers=2 codeCheckTasks - persist_to_workspace: root: dependencies paths: From 4890d64f9fe271789cb0f56c8576646f508e61f4 Mon Sep 17 00:00:00 2001 From: Shailaja Koppu Date: Wed, 18 Mar 2026 10:42:04 +0000 Subject: [PATCH 5/9] don't change avro schema version --- .../main/java/org/apache/cassandra/cdc/avro/AvroConstants.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cassandra-avro-converter/src/main/java/org/apache/cassandra/cdc/avro/AvroConstants.java b/cassandra-avro-converter/src/main/java/org/apache/cassandra/cdc/avro/AvroConstants.java index 094a798d1..f2aa80948 100644 --- a/cassandra-avro-converter/src/main/java/org/apache/cassandra/cdc/avro/AvroConstants.java +++ b/cassandra-avro-converter/src/main/java/org/apache/cassandra/cdc/avro/AvroConstants.java @@ -38,7 +38,7 @@ public final class AvroConstants public static final String FIELD_KEY = "field"; public static final String VALUE_KEY = "value"; - public static final String CURRENT_VERSION = "3"; + public static final String CURRENT_VERSION = "2"; public static final String ARRAY_BASED_MAP_KEY_NAME = "key"; public static final String ARRAY_BASED_MAP_VALUE_NAME = "value"; public static final String ARRAY_BASED_MAP_NAME = "array_map"; From fc56ccab395a87165727f740351b501f47a84943 Mon Sep 17 00:00:00 2001 From: Shailaja Koppu Date: Wed, 18 Mar 2026 14:13:14 +0000 Subject: [PATCH 6/9] reduce gradle test parallelism --- .circleci/config.yml | 1 + 
cassandra-analytics-core/build.gradle | 3 ++- .../java/org/apache/cassandra/spark/data/CqlTypeY2038Test.java | 2 -- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 25f36d1cf..c0415bf53 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -64,6 +64,7 @@ commands: JDK_VERSION: "<>" INTEGRATION_MAX_PARALLEL_FORKS: 1 INTEGRATION_MAX_HEAP_SIZE: "1500M" + CORE_MAX_PARALLEL_FORKS: 2 CASSANDRA_USE_JDK11: <> command: | export GRADLE_OPTS="-Xmx2g -Dorg.gradle.jvmargs=-Xmx2g" diff --git a/cassandra-analytics-core/build.gradle b/cassandra-analytics-core/build.gradle index 259c32528..ff62dffa1 100644 --- a/cassandra-analytics-core/build.gradle +++ b/cassandra-analytics-core/build.gradle @@ -218,7 +218,8 @@ test { systemProperty "cassandra.analytics.bridges.sstable_format", System.getProperty("cassandra.analytics.bridges.sstable_format", "big") minHeapSize = '1024m' maxHeapSize = '3072m' - maxParallelForks = Math.max(Runtime.runtime.availableProcessors() * 2, 8) + maxParallelForks = System.getenv('CORE_MAX_PARALLEL_FORKS')?.toInteger() + ?: Math.max(Runtime.runtime.availableProcessors() * 2, 8) forkEvery = 1 // Enables different end-to-end test classes use Spark contexts with different configurations // Make it so unit tests run on a Jar with Cassandra bridge implementations built in diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/data/CqlTypeY2038Test.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/data/CqlTypeY2038Test.java index c2d55c151..39774487e 100644 --- a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/data/CqlTypeY2038Test.java +++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/data/CqlTypeY2038Test.java @@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Test; -import org.apache.cassandra.bridge.CassandraBridgeImplementation; import 
org.apache.cassandra.db.rows.BufferCell; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.TableMetadata; @@ -40,7 +39,6 @@ */ class CqlTypeY2038Test { - private static final CassandraBridgeImplementation BRIDGE = new CassandraBridgeImplementation(); private static final int MAX_TTL_SECONDS = 20 * 365 * 24 * 60 * 60; // 20 years, Cassandra's max allowed TTL private static final long NOW_IN_SEC_AT_BOUNDARY = Integer.MAX_VALUE - (long) MAX_TTL_SECONDS; // last nowInSec where max TTL fits in int private static final long EXCEEDS_INT_RANGE = Integer.MAX_VALUE + 1L; From 30d895f3ceeb9c8ca00df6408b13d2571db9e367 Mon Sep 17 00:00:00 2001 From: Shailaja Koppu Date: Wed, 18 Mar 2026 15:22:15 +0000 Subject: [PATCH 7/9] try lower heap size for CI --- .circleci/config.yml | 1 + cassandra-analytics-core/build.gradle | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index c0415bf53..0e9f576f1 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -65,6 +65,7 @@ commands: INTEGRATION_MAX_PARALLEL_FORKS: 1 INTEGRATION_MAX_HEAP_SIZE: "1500M" CORE_MAX_PARALLEL_FORKS: 2 + CORE_TEST_MAX_HEAP_SIZE: "2048m" CASSANDRA_USE_JDK11: <> command: | export GRADLE_OPTS="-Xmx2g -Dorg.gradle.jvmargs=-Xmx2g" diff --git a/cassandra-analytics-core/build.gradle b/cassandra-analytics-core/build.gradle index ff62dffa1..ffce66600 100644 --- a/cassandra-analytics-core/build.gradle +++ b/cassandra-analytics-core/build.gradle @@ -182,7 +182,7 @@ tasks.register('testSequential', Test) { systemProperty "cassandra.analytics.bridges.sstable_format", System.getProperty("cassandra.analytics.bridges.sstable_format", "big") minHeapSize = '1024m' - maxHeapSize = '3072m' + maxHeapSize = System.getenv('CORE_TEST_MAX_HEAP_SIZE') ?: '3072m' maxParallelForks = 1 forkEvery = 1 // Enables different end-to-end test classes use Spark contexts with different configurations @@ -217,7 +217,7 @@ 
tasks.register('testSequential', Test) { test { systemProperty "cassandra.analytics.bridges.sstable_format", System.getProperty("cassandra.analytics.bridges.sstable_format", "big") minHeapSize = '1024m' - maxHeapSize = '3072m' + maxHeapSize = System.getenv('CORE_TEST_MAX_HEAP_SIZE') ?: '3072m' maxParallelForks = System.getenv('CORE_MAX_PARALLEL_FORKS')?.toInteger() ?: Math.max(Runtime.runtime.availableProcessors() * 2, 8) forkEvery = 1 // Enables different end-to-end test classes use Spark contexts with different configurations From dc9d1843095ea70f663a1a7132693ab01ac0825e Mon Sep 17 00:00:00 2001 From: Shailaja Koppu Date: Thu, 19 Mar 2026 12:53:24 +0000 Subject: [PATCH 8/9] comments --- CHANGES.txt | 2 +- .../spark/reader/AbstractStreamScanner.java | 5 +- .../spark/data/CqlTypeY2038Test.java | 93 ------------------- 3 files changed, 4 insertions(+), 96 deletions(-) delete mode 100644 cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/data/CqlTypeY2038Test.java diff --git a/CHANGES.txt b/CHANGES.txt index e09435045..35afa6cd2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,6 @@ 0.4.0 ----- - * Fix year 2038 problem using long for absolute times and support C* 5.0 extended localDeletionTime + * Support extended deletion time in CDC for Cassandra 5.0 * Flush event consumer before persisting CDC state to prevent data loss on failure (CASSANALYTICS-126) * Fix ReadStatusTracker to distinguish clean completion from error termination in BufferingCommitLogReader (CASSANALYTICS-129) * Adding CDC support for Cassandra 5.0 Commit Logs (CASSANALYTICS-60) diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java index 25cda1867..0853325cb 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java +++ 
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java @@ -439,13 +439,14 @@ public void consume() { AbstractComplexTypeBuffer buffer = AbstractComplexTypeBuffer.newBuffer(column.type, cellCount); long maxTimestamp = Long.MIN_VALUE; + // C* 4.0 Cell.isLive requires int for nowInSec; checked cast will throw after Y2038 + int referenceEpochInSecondsAsInt = Ints.checkedCast(timeProvider.referenceEpochInSeconds()); while (cells.hasNext()) { Cell cell = cells.next(); // Re: isLive vs. isTombstone - isLive considers TTL so that if a cell is expiring soon, // it is handled as tombstone - // C* 4.0 Cell.isLive requires int for nowInSec; checked cast will throw after Y2038 - if (cell.isLive(Ints.checkedCast(timeProvider.referenceEpochInSeconds()))) + if (cell.isLive(referenceEpochInSecondsAsInt)) { buffer.addCell(cell); } diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/data/CqlTypeY2038Test.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/data/CqlTypeY2038Test.java deleted file mode 100644 index 39774487e..000000000 --- a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/data/CqlTypeY2038Test.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.cassandra.spark.data; - -import java.nio.ByteBuffer; - -import com.google.common.collect.ImmutableMap; -import org.junit.jupiter.api.Test; - -import org.apache.cassandra.db.rows.BufferCell; -import org.apache.cassandra.schema.ColumnMetadata; -import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.spark.data.partitioner.Partitioner; -import org.apache.cassandra.spark.reader.SchemaBuilder; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -/** - * Tests verifying Y2038 boundary behavior for {@link CqlType#tombstone} and {@link CqlType#expiring} - * in Cassandra 4.0. - */ -class CqlTypeY2038Test -{ - private static final int MAX_TTL_SECONDS = 20 * 365 * 24 * 60 * 60; // 20 years, Cassandra's max allowed TTL - private static final long NOW_IN_SEC_AT_BOUNDARY = Integer.MAX_VALUE - (long) MAX_TTL_SECONDS; // last nowInSec where max TTL fits in int - private static final long EXCEEDS_INT_RANGE = Integer.MAX_VALUE + 1L; - private static final long TIMESTAMP = 1000L; - private static final ByteBuffer VALUE = ByteBuffer.wrap(new byte[]{1, 2, 3, 4}); - - private static ColumnMetadata regularColumn() - { - String createStatement = "CREATE TABLE test_ks.test_table (pk int PRIMARY KEY, v int)"; - TableMetadata metadata = new SchemaBuilder(createStatement, - "test_ks", - new ReplicationFactor(ReplicationFactor.ReplicationStrategy.SimpleStrategy, - ImmutableMap.of("replication_factor", 1)), - Partitioner.Murmur3Partitioner).tableMetaData(); - return metadata.getColumn(ByteBuffer.wrap("v".getBytes())); - } - - @Test - void testExpiringWithinIntRange() - { - ColumnMetadata column = regularColumn(); - BufferCell cell = CqlType.expiring(column, TIMESTAMP, MAX_TTL_SECONDS, NOW_IN_SEC_AT_BOUNDARY, VALUE, null); - assertThat(cell).isNotNull(); - 
assertThat(cell.timestamp()).isEqualTo(TIMESTAMP); - assertThat(cell.ttl()).isEqualTo(MAX_TTL_SECONDS); - } - - @Test - void testExpiringExceedsIntRangeThrows() - { - ColumnMetadata column = regularColumn(); - assertThatThrownBy(() -> CqlType.expiring(column, TIMESTAMP, MAX_TTL_SECONDS, EXCEEDS_INT_RANGE, VALUE, null)) - .isInstanceOf(IllegalArgumentException.class); - } - - @Test - void testTombstoneWithinIntRange() - { - ColumnMetadata column = regularColumn(); - BufferCell cell = CqlType.tombstone(column, TIMESTAMP, NOW_IN_SEC_AT_BOUNDARY, null); - assertThat(cell).isNotNull(); - assertThat(cell.timestamp()).isEqualTo(TIMESTAMP); - } - - @Test - void testTombstoneExceedsIntRangeThrows() - { - ColumnMetadata column = regularColumn(); - assertThatThrownBy(() -> CqlType.tombstone(column, TIMESTAMP, EXCEEDS_INT_RANGE, null)) - .isInstanceOf(IllegalArgumentException.class); - } -} From 3c0270545856eeb93353510b295638801eb8af72 Mon Sep 17 00:00:00 2001 From: Shailaja Koppu Date: Tue, 24 Mar 2026 14:08:05 +0000 Subject: [PATCH 9/9] comments --- .../BulkWriteComplexTypeTtlTest.java | 78 +++++++++---------- .../java/org/apache/cassandra/db/DbUtils.java | 6 ++ .../java/org/apache/cassandra/db/DbUtils.java | 8 +- 3 files changed, 49 insertions(+), 43 deletions(-) diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteComplexTypeTtlTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteComplexTypeTtlTest.java index 569eb031b..3ddb0353c 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteComplexTypeTtlTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteComplexTypeTtlTest.java @@ -59,6 +59,7 @@ class BulkWriteComplexTypeTtlTest extends SharedClusterSparkIntegrationTestBase { static final int ROW_COUNT = 100; + static final int TTL_SECONDS = 5; static final QualifiedName 
LIST_TTL_TABLE = new QualifiedName(TEST_KEYSPACE, "test_list_ttl"); static final QualifiedName SET_TTL_TABLE = new QualifiedName(TEST_KEYSPACE, "test_set_ttl"); @@ -68,9 +69,36 @@ class BulkWriteComplexTypeTtlTest extends SharedClusterSparkIntegrationTestBase static final String SIMPLE_UDT_NAME = "simple_udt"; @Test - void testListWithTtl() + void testComplexTypesWithTtl() { SparkSession spark = getOrCreateSparkSession(); + + // Write all complex types with TTL + writeListData(spark); + writeSetData(spark); + writeMapData(spark); + writeUdtData(spark); + + // Wait for TTL to expire (TTL + 1 second margin) + Uninterruptibles.sleepUninterruptibly(TTL_SECONDS + 1, TimeUnit.SECONDS); + + // Verify all types have expired + assertThat(bulkReaderDataFrame(LIST_TTL_TABLE).load().collectAsList()) + .as("list TTL post-expiry") + .isEmpty(); + assertThat(bulkReaderDataFrame(SET_TTL_TABLE).load().collectAsList()) + .as("set TTL post-expiry") + .isEmpty(); + assertThat(bulkReaderDataFrame(MAP_TTL_TABLE).load().collectAsList()) + .as("map TTL post-expiry") + .isEmpty(); + assertThat(bulkReaderDataFrame(UDT_TTL_TABLE).load().collectAsList()) + .as("UDT TTL post-expiry") + .isEmpty(); + } + + private void writeListData(SparkSession spark) + { StructType schema = new StructType() .add("id", IntegerType, false) .add("listdata", createArrayType(IntegerType), false); @@ -80,21 +108,12 @@ void testListWithTtl() .collect(Collectors.toList()); Dataset df = spark.createDataFrame(rows, schema); - bulkWriterDataFrameWriter(df, LIST_TTL_TABLE).option(WriterOptions.TTL.name(), TTLOption.constant(80)) + bulkWriterDataFrameWriter(df, LIST_TTL_TABLE).option(WriterOptions.TTL.name(), TTLOption.constant(TTL_SECONDS)) .save(); - - Dataset preExpiry = bulkReaderDataFrame(LIST_TTL_TABLE).load(); - assertThat(preExpiry.collectAsList()).hasSize(ROW_COUNT); - - Uninterruptibles.sleepUninterruptibly(80, TimeUnit.SECONDS); - Dataset postExpiry = bulkReaderDataFrame(LIST_TTL_TABLE).load(); - 
assertThat(postExpiry.collectAsList()).isEmpty(); } - @Test - void testSetWithTtl() + private void writeSetData(SparkSession spark) { - SparkSession spark = getOrCreateSparkSession(); StructType schema = new StructType() .add("id", IntegerType, false) .add("setdata", createArrayType(StringType), false); @@ -104,21 +123,12 @@ void testSetWithTtl() .collect(Collectors.toList()); Dataset df = spark.createDataFrame(rows, schema); - bulkWriterDataFrameWriter(df, SET_TTL_TABLE).option(WriterOptions.TTL.name(), TTLOption.constant(80)) + bulkWriterDataFrameWriter(df, SET_TTL_TABLE).option(WriterOptions.TTL.name(), TTLOption.constant(TTL_SECONDS)) .save(); - - Dataset preExpiry = bulkReaderDataFrame(SET_TTL_TABLE).load(); - assertThat(preExpiry.collectAsList()).hasSize(ROW_COUNT); - - Uninterruptibles.sleepUninterruptibly(80, TimeUnit.SECONDS); - Dataset postExpiry = bulkReaderDataFrame(SET_TTL_TABLE).load(); - assertThat(postExpiry.collectAsList()).isEmpty(); } - @Test - void testMapWithTtl() + private void writeMapData(SparkSession spark) { - SparkSession spark = getOrCreateSparkSession(); StructType schema = new StructType() .add("id", IntegerType, false) .add("mapdata", createMapType(StringType, IntegerType), false); @@ -128,21 +138,12 @@ void testMapWithTtl() .collect(Collectors.toList()); Dataset df = spark.createDataFrame(rows, schema); - bulkWriterDataFrameWriter(df, MAP_TTL_TABLE).option(WriterOptions.TTL.name(), TTLOption.constant(80)) + bulkWriterDataFrameWriter(df, MAP_TTL_TABLE).option(WriterOptions.TTL.name(), TTLOption.constant(TTL_SECONDS)) .save(); - - Dataset preExpiry = bulkReaderDataFrame(MAP_TTL_TABLE).load(); - assertThat(preExpiry.collectAsList()).hasSize(ROW_COUNT); - - Uninterruptibles.sleepUninterruptibly(80, TimeUnit.SECONDS); - Dataset postExpiry = bulkReaderDataFrame(MAP_TTL_TABLE).load(); - assertThat(postExpiry.collectAsList()).isEmpty(); } - @Test - void testUdtWithTtl() + private void writeUdtData(SparkSession spark) { - SparkSession spark = 
getOrCreateSparkSession(); StructType udtType = createStructType(new StructField[]{ new StructField("f1", StringType, true, Metadata.empty()), new StructField("f2", IntegerType, true, Metadata.empty()) @@ -156,15 +157,8 @@ void testUdtWithTtl() .collect(Collectors.toList()); Dataset df = spark.createDataFrame(rows, schema); - bulkWriterDataFrameWriter(df, UDT_TTL_TABLE).option(WriterOptions.TTL.name(), TTLOption.constant(80)) + bulkWriterDataFrameWriter(df, UDT_TTL_TABLE).option(WriterOptions.TTL.name(), TTLOption.constant(TTL_SECONDS)) .save(); - - Dataset preExpiry = bulkReaderDataFrame(UDT_TTL_TABLE).load(); - assertThat(preExpiry.collectAsList()).hasSize(ROW_COUNT); - - Uninterruptibles.sleepUninterruptibly(80, TimeUnit.SECONDS); - Dataset postExpiry = bulkReaderDataFrame(UDT_TTL_TABLE).load(); - assertThat(postExpiry.collectAsList()).isEmpty(); } @Override diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java index f67446d97..dd0cc1719 100644 --- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java @@ -21,6 +21,8 @@ import java.nio.ByteBuffer; +import com.google.common.annotations.VisibleForTesting; + import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.schema.TableMetadata; @@ -31,21 +33,25 @@ private DbUtils() throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated"); } + @VisibleForTesting public static DeletionTime deletionTime(long markedForDeleteAt, long localDeletionTime) { return DeletionTime.build(markedForDeleteAt, localDeletionTime); } + @VisibleForTesting public static LivenessInfo livenessInfo(long timestamp, long nowInSeconds) { return LivenessInfo.create(timestamp, nowInSeconds); } + @VisibleForTesting public static PartitionUpdate 
fullPartitionDeletion(TableMetadata metadata, ByteBuffer key, long timestamp, long nowInSec) { return PartitionUpdate.fullPartitionDelete(metadata, key, timestamp, nowInSec); } + @VisibleForTesting public static PartitionUpdate.SimpleBuilder partitionUpdateBuilderWithNow(TableMetadata metadata, DecoratedKey key, long nowInSec) { return PartitionUpdate.simpleBuilder(metadata, key).nowInSec(nowInSec); diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java index 283186f9c..4284b0cc1 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; +import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Ints; import org.apache.cassandra.db.partitions.PartitionUpdate; @@ -33,24 +34,29 @@ private DbUtils() throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated"); } - // C* 4.0 DeletionTime constructor requires int for localDeletionTime; checked cast will throw after Y2038 + + @VisibleForTesting public static DeletionTime deletionTime(long markedForDeleteAt, long localDeletionTime) { + // C* 4.0 DeletionTime constructor requires int for localDeletionTime; checked cast will throw after Y2038 return new DeletionTime(markedForDeleteAt, Ints.checkedCast(localDeletionTime)); } + @VisibleForTesting public static LivenessInfo livenessInfo(long timestamp, long nowInSeconds) { // C* 4.0 LivenessInfo.create requires int for nowInSeconds; checked cast will throw after Y2038 return LivenessInfo.create(timestamp, Ints.checkedCast(nowInSeconds)); } + @VisibleForTesting public static PartitionUpdate fullPartitionDeletion(TableMetadata metadata, ByteBuffer key, long timestamp, long nowInSec) { // C* 4.0 fullPartitionDelete requires int for 
nowInSec; checked cast will throw after Y2038 return PartitionUpdate.fullPartitionDelete(metadata, key, timestamp, Ints.checkedCast(nowInSec)); } + @VisibleForTesting public static PartitionUpdate.SimpleBuilder partitionUpdateBuilderWithNow(TableMetadata metadata, DecoratedKey key, long nowInSec) { // C* 4.0 simpleBuilder.nowInSec requires int; checked cast will throw after Y2038