diff --git a/.circleci/config.yml b/.circleci/config.yml
index ff7eb5002..0e9f576f1 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -64,10 +64,13 @@ commands:
           JDK_VERSION: "<>"
           INTEGRATION_MAX_PARALLEL_FORKS: 1
           INTEGRATION_MAX_HEAP_SIZE: "1500M"
+          CORE_MAX_PARALLEL_FORKS: 2
+          CORE_TEST_MAX_HEAP_SIZE: "2048m"
           CASSANDRA_USE_JDK11: <>
         command: |
+          export GRADLE_OPTS="-Xmx2g -Dorg.gradle.jvmargs=-Xmx2g"
           # Run compile/unit tests, skipping integration tests
-          ./gradlew --stacktrace clean assemble check -x cassandra-analytics-integration-tests:test -Dcassandra.analytics.bridges.sstable_format=<>
+          ./gradlew --no-daemon --max-workers=2 --stacktrace clean assemble check -x cassandra-analytics-integration-tests:test -Dcassandra.analytics.bridges.sstable_format=<>

   run_integration:
     parameters:
@@ -93,10 +96,11 @@ commands:
           INTEGRATION_MAX_HEAP_SIZE: "2500M"
           CASSANDRA_USE_JDK11: <>
         command: |
+          export GRADLE_OPTS="-Xmx2g -Dorg.gradle.jvmargs=-Xmx2g"
           export DTEST_JAR="dtest-<< parameters.cassandra >>.jar"
           export CASSANDRA_VERSION=$(echo << parameters.cassandra >> | cut -d'.' -f 1,2)
           # Run compile but not unit tests (which are run in run_build)
-          ./gradlew --stacktrace clean assemble
+          ./gradlew --no-daemon --max-workers=2 --stacktrace clean assemble
           # Run integration tests in parallel
           cd cassandra-analytics-integration-tests/src/test/java
           # Get list of classnames of tests that should run on this node
@@ -132,7 +136,7 @@ jobs:
           name: Build dependencies for jdk11 builds
           command: |
             CASSANDRA_USE_JDK11=true ./scripts/build-dependencies.sh
-            ./gradlew codeCheckTasks
+            ./gradlew --no-daemon --max-workers=2 codeCheckTasks
       - persist_to_workspace:
           root: dependencies
           paths:
diff --git a/CHANGES.txt b/CHANGES.txt
index b2e71b6e4..35afa6cd2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,6 @@
 0.4.0
 -----
- * Setup CI Pipeline with GitHub Actions (CASSANALYTICS-106)
+ * Support extended deletion time in CDC for Cassandra 5.0
  * Flush event consumer before persisting CDC state to prevent data loss on failure (CASSANALYTICS-126)
  * Fix ReadStatusTracker to distinguish clean completion from error termination in BufferingCommitLogReader (CASSANALYTICS-129)
  * Adding CDC support for Cassandra 5.0 Commit Logs (CASSANALYTICS-60)
diff --git a/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/avro/CdcEventUtils.java b/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/avro/CdcEventUtils.java
index ddb1b48a8..fef91ab6b 100644
--- a/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/avro/CdcEventUtils.java
+++ b/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/avro/CdcEventUtils.java
@@ -281,14 +281,14 @@ public static GenericData.Record getTTLAvro(CdcEvent event, Schema ttlSchema)
         return ttlRecord;
     }

-    public static Map<String, Integer> getTTL(CdcEvent event)
+    public static Map<String, Long> getTTL(CdcEvent event)
     {
         CdcEvent.TimeToLive ttl = event.getTtl();
         if (ttl == null)
         {
             return null;
         }
-        return mapOf(AvroConstants.TTL_KEY, ttl.ttlInSec, AvroConstants.DELETED_AT_KEY, ttl.expirationTimeInSec);
+        return mapOf(AvroConstants.TTL_KEY, (long) ttl.ttlInSec, AvroConstants.DELETED_AT_KEY, ttl.expirationTimeInSec);
     }

     public static UpdatedEvent getUpdatedEvent(CdcEvent event,
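// Note on the getTTL() change above: once expirationTimeInSec is a long, the TTL
// map's value type widens from Integer to Long, so ttlInSec (still an int) needs the
// explicit (long) cast -- Java will not autobox an int into a Long. A minimal,
// self-contained sketch, assuming a generic mapOf(K, V, K, V) helper like the one
// CdcEventUtils uses (the helper here is illustrative, not the project's exact code):

import java.util.HashMap;
import java.util.Map;

class TtlWideningSketch
{
    static <K, V> Map<K, V> mapOf(K k1, V v1, K k2, V v2)
    {
        Map<K, V> map = new HashMap<>();
        map.put(k1, v1);
        map.put(k2, v2);
        return map;
    }

    static Map<String, Long> ttlMap(int ttlInSec, long expirationTimeInSec)
    {
        // Without the (long) cast, ttlInSec would infer as Integer and the call
        // could not produce a Map<String, Long>:
        return mapOf("ttl", (long) ttlInSec, "deletedAt", expirationTimeInSec);
    }
}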
diff --git a/cassandra-analytics-cdc-codec/src/main/resources/cdc_bytes.avsc b/cassandra-analytics-cdc-codec/src/main/resources/cdc_bytes.avsc
index 8609b7bf3..551042987 100644
--- a/cassandra-analytics-cdc-codec/src/main/resources/cdc_bytes.avsc
+++ b/cassandra-analytics-cdc-codec/src/main/resources/cdc_bytes.avsc
@@ -145,7 +145,7 @@
         },
         {
           "name": "deletedAt",
-          "type": "int",
+          "type": "long",
           "doc": "Future timestamp in seconds"
         }
       ]
diff --git a/cassandra-analytics-cdc-codec/src/main/resources/cdc_generic_record.avsc b/cassandra-analytics-cdc-codec/src/main/resources/cdc_generic_record.avsc
index 9f4a3a47e..4535fe1dd 100644
--- a/cassandra-analytics-cdc-codec/src/main/resources/cdc_generic_record.avsc
+++ b/cassandra-analytics-cdc-codec/src/main/resources/cdc_generic_record.avsc
@@ -145,7 +145,7 @@
         },
         {
           "name": "deletedAt",
-          "type": "int",
+          "type": "long",
           "doc": "Future timestamp in seconds"
         }
       ]
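// Schema-evolution note for the two .avsc changes above: Avro resolves a writer
// schema whose deletedAt is "int" against a reader schema where it is "long" by
// promoting int to long, so payloads encoded before this change stay readable as
// long as the reader also has the writer schema (which the schema store supplies).
// A minimal, self-contained sketch; the one-field "ttl" record here is illustrative,
// not the full cdc_bytes.avsc schema:

import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

class IntToLongPromotionSketch
{
    public static void main(String[] args) throws Exception
    {
        Schema writerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"ttl\",\"fields\":[{\"name\":\"deletedAt\",\"type\":\"int\"}]}");
        Schema readerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"ttl\",\"fields\":[{\"name\":\"deletedAt\",\"type\":\"long\"}]}");

        // Encode a record with the old (int) writer schema
        GenericRecord record = new GenericData.Record(writerSchema);
        record.put("deletedAt", 1658269);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(writerSchema).write(record, encoder);
        encoder.flush();

        // Decode with the new (long) reader schema; Avro promotes int -> long
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
        GenericRecord decoded = reader.read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
        System.out.println(decoded.get("deletedAt")); // 1658269, now a Long
    }
}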
project(":cassandra-bridge") diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroByteRecordTransformerTest.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroByteRecordTransformerTest.java new file mode 100644 index 000000000..985b3b1b8 --- /dev/null +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroByteRecordTransformerTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.cdc; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DecoderFactory; +import org.apache.cassandra.bridge.CassandraVersion; +import org.apache.cassandra.bridge.CdcBridgeFactory; +import org.apache.cassandra.cdc.api.KeyspaceTypeKey; +import org.apache.cassandra.cdc.avro.AvroByteRecordTransformer; +import org.apache.cassandra.cdc.avro.AvroConstants; +import org.apache.cassandra.cdc.avro.AvroSchemas; +import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter; +import org.apache.cassandra.cdc.avro.TestSchemaStore; +import org.apache.cassandra.cdc.msg.CdcEvent; +import org.apache.cassandra.cdc.test.CdcTestBase; +import org.apache.cassandra.cdc.test.CdcTester; +import org.apache.cassandra.spark.data.CqlField; +import org.apache.cassandra.spark.data.CqlTable; +import org.apache.cassandra.spark.utils.test.TestSchema; + +import static org.apache.cassandra.cdc.test.CdcTester.testWith; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests that exercise the CDC-to-Avro byte-serialization pipeline ({@code cdc_bytes.avsc}), + */ +public class AvroByteRecordTransformerTest extends CdcTestBase +{ + private static final int NUM_ROWS = 50; + + private CqlToAvroSchemaConverter getConverter(CassandraVersion version) + { + CqlToAvroSchemaConverter converter = CdcBridgeFactory.getCqlToAvroSchemaConverter(version); + assertThat(converter).isNotNull(); + return converter; + } + + /** + * Build the Avro byte transformer by converting the CQL table schema + * into an Avro schema and registering it in the test schema store. + */ + private AvroByteRecordTransformer buildTransformer(CqlToAvroSchemaConverter converter, + CqlTable cqlTable, + TestSchemaStore schemaStore) + { + Schema avroSchema = converter.convert(cqlTable); + String namespace = cqlTable.keyspace() + "." 
+ cqlTable.table(); + schemaStore.registerSchema(namespace, avroSchema); + + Function typeLookup = key -> bridge.parseType(key.type); + return new AvroByteRecordTransformer(schemaStore, typeLookup); + } + + /** + * Deserialize a byte payload using the table's Avro schema from the schema store. + */ + private GenericRecord deserializePayload(ByteBuffer payloadBytes, GenericDatumReader reader) throws IOException + { + byte[] bytes = new byte[payloadBytes.remaining()]; + payloadBytes.get(bytes); + return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null)); + } + + @ParameterizedTest + @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") + public void testTtlDeletedAtByteAvroEncoding(CassandraVersion version) + { + AvroSchemas.registerLogicalTypes(); + CqlToAvroSchemaConverter converter = getConverter(version); + + int ttlSeconds = 3600; + long beforeTestEpochSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); + + TestSchema.Builder schemaBuilder = TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("c1", bridge.bigint()) + .withColumn("c2", bridge.text()); + + AtomicReference tableRef = new AtomicReference<>(); + testWith(bridge, cdcBridge, commitLogDir, schemaBuilder) + .withNumRows(NUM_ROWS) + .clearWriters() + .withWriter((tester, rows, writer) -> { + tableRef.set(tester.cqlTable); + for (int i = 0; i < tester.numRows; i++) + { + TestSchema.TestRow testRow = CdcTester.newUniqueRow(tester.schema, rows); + testRow.setTTL(ttlSeconds); + writer.accept(testRow, TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis())); + } + }) + .withCdcEventChecker((testRows, events) -> { + long afterTestEpochSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); + assertThat(events).hasSize(NUM_ROWS); + + TestSchemaStore schemaStore = new TestSchemaStore(); + AvroByteRecordTransformer transformer = buildTransformer(converter, tableRef.get(), schemaStore); + String namespace = tableRef.get().keyspace() + "." 
+ tableRef.get().table(); + + for (CdcEvent event : events) + { + GenericData.Record record = transformer.transform(event); + assertThat(record).isNotNull(); + + // Validate header-level fields + assertThat(record.get(AvroConstants.SOURCE_TABLE_KEY).toString()).isEqualTo(event.table); + assertThat(record.get(AvroConstants.SOURCE_KEYSPACE_KEY).toString()).isEqualTo(event.keyspace); + assertThat(record.get(AvroConstants.OPERATION_TYPE_KEY).toString()).isEqualTo("INSERT"); + assertThat(record.get(AvroConstants.TIMESTAMP_KEY)).isNotNull(); + assertThat(record.get(AvroConstants.VERSION_KEY).toString()).isEqualTo(AvroConstants.CURRENT_VERSION); + + // TTL record should be present + Object ttlField = record.get(AvroConstants.TTL_KEY); + assertThat(ttlField).isNotNull(); + GenericRecord ttlRecord = (GenericRecord) ttlField; + + // Validate TTL value + assertThat(ttlRecord.get(AvroConstants.TTL_KEY)).isEqualTo(ttlSeconds); + + // Validate deletedAt is a Long (confirms long type in cdc_bytes.avsc) + Object deletedAt = ttlRecord.get(AvroConstants.DELETED_AT_KEY); + assertThat(deletedAt).isInstanceOf(Long.class); + + // Validate deletedAt is approximately nowInSeconds + TTL + long deletedAtValue = (Long) deletedAt; + assertThat(deletedAtValue) + .as("deletedAt should be approximately nowInSeconds + TTL") + .isBetween(beforeTestEpochSec + ttlSeconds, afterTestEpochSec + ttlSeconds); + + // Validate payload: bytes in the byte schema need deserialization to verify content + Object payloadObj = record.get(AvroConstants.PAYLOAD_KEY); + assertThat(payloadObj).isInstanceOf(ByteBuffer.class); + GenericRecord payloadRecord; + try + { + payloadRecord = deserializePayload((ByteBuffer) payloadObj, + schemaStore.getReader(namespace, null)); + } + catch (IOException e) + { + throw new RuntimeException("Failed to deserialize payload", e); + } + assertThat(payloadRecord.get("pk")).isNotNull(); + } + }) + .run(); + } +} diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroGenericRecordTransformerTest.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroGenericRecordTransformerTest.java new file mode 100644 index 000000000..2672e59df --- /dev/null +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/AvroGenericRecordTransformerTest.java @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.cdc; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.IntStream; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.cassandra.bridge.CassandraVersion; +import org.apache.cassandra.bridge.CdcBridgeFactory; +import org.apache.cassandra.cdc.api.KeyspaceTypeKey; +import org.apache.cassandra.cdc.avro.AvroConstants; +import org.apache.cassandra.cdc.avro.AvroGenericRecordTransformer; +import org.apache.cassandra.cdc.avro.AvroSchemas; +import org.apache.cassandra.cdc.avro.CdcEventUtils; +import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter; +import org.apache.cassandra.cdc.avro.TestSchemaStore; +import org.apache.cassandra.cdc.kafka.AvroGenericRecordSerializer; +import org.apache.cassandra.cdc.msg.CdcEvent; +import org.apache.cassandra.cdc.test.CdcTestBase; +import org.apache.cassandra.cdc.test.CdcTester; +import org.apache.cassandra.spark.data.CqlField; +import org.apache.cassandra.spark.data.CqlTable; +import org.apache.cassandra.spark.utils.test.TestSchema; + +import static org.apache.cassandra.cdc.test.CdcTester.newUniquePartitionDeletion; +import static org.apache.cassandra.cdc.test.CdcTester.testWith; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests that exercise the full CDC-to-Avro pipeline: + * write mutations -> read CDC events from commit logs -> convert to Avro GenericRecord -> validate. + */ +@SuppressWarnings("DataFlowIssue") +public class AvroGenericRecordTransformerTest extends CdcTestBase +{ + private static final int NUM_ROWS = 50; + + private CqlToAvroSchemaConverter getConverter(CassandraVersion version) + { + CqlToAvroSchemaConverter converter = CdcBridgeFactory.getCqlToAvroSchemaConverter(version); + assertThat(converter).isNotNull(); + return converter; + } + + /** + * Build the Avro transformer by converting the CQL table schema (already registered by CdcTester) + * into an Avro schema and registering it in the test schema store. + */ + private AvroGenericRecordTransformer buildTransformer(CqlToAvroSchemaConverter converter, + CqlTable cqlTable) + { + TestSchemaStore schemaStore = new TestSchemaStore(); + Schema avroSchema = converter.convert(cqlTable); + String namespace = cqlTable.keyspace() + "." 
+ cqlTable.table(); + schemaStore.registerSchema(namespace, avroSchema); + + Function typeLookup = key -> bridge.parseType(key.type); + return new AvroGenericRecordTransformer(schemaStore, typeLookup, ""); + } + + @ParameterizedTest + @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") + public void testBasicInsertAvroEncoding(CassandraVersion version) + { + AvroSchemas.registerLogicalTypes(); + CqlToAvroSchemaConverter converter = getConverter(version); + + TestSchema.Builder schemaBuilder = TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withClusteringKey("ck", bridge.bigint()) + .withColumn("c1", bridge.bigint()) + .withColumn("c2", bridge.text()); + + // Capture CqlTable from the tester via the writer callback + AtomicReference tableRef = new AtomicReference<>(); + testWith(bridge, cdcBridge, commitLogDir, schemaBuilder) + .withNumRows(NUM_ROWS) + .clearWriters() + .withWriter((tester, rows, writer) -> { + tableRef.set(tester.cqlTable); + long timestampMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()); + IntStream.range(0, tester.numRows) + .forEach(i -> writer.accept(CdcTester.newUniqueRow(tester.schema, rows), timestampMicros)); + }) + .withCdcEventChecker((testRows, events) -> { + assertThat(events).hasSize(NUM_ROWS); + AvroGenericRecordTransformer transformer = buildTransformer(converter, tableRef.get()); + + for (CdcEvent event : events) + { + GenericData.Record record = transformer.transform(event); + assertThat(record).isNotNull(); + + // Validate header fields + assertThat(record.get(AvroConstants.SOURCE_TABLE_KEY).toString()).isEqualTo(event.table); + assertThat(record.get(AvroConstants.SOURCE_KEYSPACE_KEY).toString()).isEqualTo(event.keyspace); + assertThat(record.get(AvroConstants.OPERATION_TYPE_KEY).toString()).isEqualTo("INSERT"); + assertThat(record.get(AvroConstants.TIMESTAMP_KEY)).isNotNull(); + assertThat(record.get(AvroConstants.VERSION_KEY).toString()).isEqualTo(AvroConstants.CURRENT_VERSION); + + // Validate payload contains expected fields including clustering key + GenericRecord payload = (GenericRecord) record.get(AvroConstants.PAYLOAD_KEY); + assertThat(payload).isNotNull(); + assertThat(payload.getSchema().getField("pk")).isNotNull(); + assertThat(payload.getSchema().getField("ck")).isNotNull(); + assertThat(payload.getSchema().getField("c1")).isNotNull(); + assertThat(payload.getSchema().getField("c2")).isNotNull(); + // Payload fields should have data + assertThat(payload.get("pk")).isNotNull(); + assertThat(payload.get("ck")).isNotNull(); + assertThat(payload.get("c1")).isNotNull(); + + // Validate updateFields lists all updated column names + List updateFields = CdcEventUtils.updatedFieldNames(event); + assertThat(updateFields).contains("pk", "ck", "c1", "c2"); + } + }) + .run(); + } + + @ParameterizedTest + @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") + public void testCollectionTypesAvroEncoding(CassandraVersion version) + { + AvroSchemas.registerLogicalTypes(); + CqlToAvroSchemaConverter converter = getConverter(version); + + TestSchema.Builder schemaBuilder = TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("m", bridge.map(bridge.text(), bridge.text())) + .withColumn("s", bridge.set(bridge.aInt())) + .withColumn("l", bridge.list(bridge.text())); + + AtomicReference tableRef = new AtomicReference<>(); + testWith(bridge, cdcBridge, commitLogDir, schemaBuilder) + .withNumRows(NUM_ROWS) + .clearWriters() + 
.withWriter((tester, rows, writer) -> { + tableRef.set(tester.cqlTable); + long timestampMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()); + IntStream.range(0, tester.numRows) + .forEach(i -> writer.accept(CdcTester.newUniqueRow(tester.schema, rows), timestampMicros)); + }) + .withCdcEventChecker((testRows, events) -> { + assertThat(events).hasSize(NUM_ROWS); + AvroGenericRecordTransformer transformer = buildTransformer(converter, tableRef.get()); + + for (CdcEvent event : events) + { + GenericData.Record record = transformer.transform(event); + assertThat(record).isNotNull(); + + GenericRecord payload = (GenericRecord) record.get(AvroConstants.PAYLOAD_KEY); + assertThat(payload).isNotNull(); + assertThat(payload.getSchema().getField("m")).isNotNull(); + assertThat(payload.getSchema().getField("s")).isNotNull(); + assertThat(payload.getSchema().getField("l")).isNotNull(); + } + }) + .run(); + } + + @ParameterizedTest + @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") + public void testAvroSerializeDeserializeRoundTrip(CassandraVersion version) + { + AvroSchemas.registerLogicalTypes(); + CqlToAvroSchemaConverter converter = getConverter(version); + + TestSchema.Builder schemaBuilder = TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("c1", bridge.bigint()) + .withColumn("c2", bridge.text()); + + AtomicReference tableRef = new AtomicReference<>(); + testWith(bridge, cdcBridge, commitLogDir, schemaBuilder) + .withNumRows(NUM_ROWS) + .clearWriters() + .withWriter((tester, rows, writer) -> { + tableRef.set(tester.cqlTable); + long timestampMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()); + IntStream.range(0, tester.numRows) + .forEach(i -> writer.accept(CdcTester.newUniqueRow(tester.schema, rows), timestampMicros)); + }) + .withCdcEventChecker((testRows, events) -> { + assertThat(events).hasSize(NUM_ROWS); + TestSchemaStore schemaStore = new TestSchemaStore(); + CqlTable cqlTable = tableRef.get(); + Schema avroSchema = converter.convert(cqlTable); + String namespace = cqlTable.keyspace() + "." 
+ cqlTable.table(); + schemaStore.registerSchema(namespace, avroSchema); + + Function typeLookup = key -> bridge.parseType(key.type); + AvroGenericRecordSerializer serializer = new AvroGenericRecordSerializer(schemaStore, typeLookup, ""); + + for (CdcEvent event : events) + { + // Serialize: CdcEvent -> bytes + byte[] bytes = serializer.serialize("test-topic", event); + assertThat(bytes).isNotNull(); + assertThat(bytes.length).isGreaterThan(0); + + // Get the Avro record's schema for deserialization + GenericData.Record record = serializer.getTransformer().transform(event); + Schema recordSchema = record.getSchema(); + + // Deserialize: bytes -> CdcEnvelope + org.apache.cassandra.cdc.avro.msg.CdcEnvelope envelope = + serializer.deserializer().deserialize(event.keyspace, event.table, bytes, recordSchema); + assertThat(envelope).isNotNull(); + assertThat(envelope.header).isNotNull(); + assertThat(envelope.payload).isNotNull(); + + // Validate round-trip preserves operation type and table info + assertThat(envelope.header.get(AvroConstants.SOURCE_TABLE_KEY).toString()).isEqualTo(event.table); + assertThat(envelope.header.get(AvroConstants.SOURCE_KEYSPACE_KEY).toString()).isEqualTo(event.keyspace); + assertThat(envelope.header.get(AvroConstants.OPERATION_TYPE_KEY).toString()).isEqualTo("INSERT"); + + // Validate payload fields survived round-trip + assertThat(envelope.payload.get("pk")).isNotNull(); + assertThat(envelope.payload.get("c1")).isNotNull(); + } + }) + .run(); + } + + @ParameterizedTest + @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") + public void testDeleteEventAvroEncoding(CassandraVersion version) + { + AvroSchemas.registerLogicalTypes(); + CqlToAvroSchemaConverter converter = getConverter(version); + + TestSchema.Builder schemaBuilder = TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("c1", bridge.bigint()) + .withColumn("c2", bridge.text()); + + AtomicReference tableRef = new AtomicReference<>(); + testWith(bridge, cdcBridge, commitLogDir, schemaBuilder) + .withNumRows(NUM_ROWS) + .clearWriters() + .withWriter((tester, rows, writer) -> { + tableRef.set(tester.cqlTable); + for (int i = 0; i < tester.numRows; i++) + { + TestSchema.TestRow testRow = CdcTester.newUniqueRow(tester.schema, rows); + testRow = testRow.copy("c1", org.apache.cassandra.bridge.CdcBridge.UNSET_MARKER); + testRow = testRow.copy("c2", null); + writer.accept(testRow, TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis())); + } + }) + .withCdcEventChecker((testRows, events) -> { + assertThat(events).hasSize(NUM_ROWS); + AvroGenericRecordTransformer transformer = buildTransformer(converter, tableRef.get()); + + for (CdcEvent event : events) + { + GenericData.Record record = transformer.transform(event); + assertThat(record).isNotNull(); + + // Verify the Avro record correctly encodes the operation type from the event + CdcEventUtils.OperationType opType = CdcEventUtils.getOperationType(event); + assertThat(record.get(AvroConstants.OPERATION_TYPE_KEY).toString()).isEqualTo(opType.name()); + assertThat(record.get(AvroConstants.SOURCE_TABLE_KEY).toString()).isEqualTo(event.table); + assertThat(record.get(AvroConstants.SOURCE_KEYSPACE_KEY).toString()).isEqualTo(event.keyspace); + + GenericRecord payload = (GenericRecord) record.get(AvroConstants.PAYLOAD_KEY); + assertThat(payload).isNotNull(); + // pk should still be present + assertThat(payload.get("pk")).isNotNull(); + // c2 is deleted (null in the payload) + 
assertThat(payload.get("c2")).isNull(); + } + }) + .run(); + } + + @ParameterizedTest + @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") + public void testMaxSupportedTtlAvroEncoding(CassandraVersion version) + { + AvroSchemas.registerLogicalTypes(); + CqlToAvroSchemaConverter converter = getConverter(version); + + // Use a large TTL value (~20 years). Safe for both 4.0 and 5.0. + int largeTtl = 20 * 365 * 24 * 3600; + long beforeTestEpochSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); + + TestSchema.Builder schemaBuilder = TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("c1", bridge.aInt()); + + AtomicReference tableRef = new AtomicReference<>(); + testWith(bridge, cdcBridge, commitLogDir, schemaBuilder) + .withNumRows(NUM_ROWS) + .clearWriters() + .withWriter((tester, rows, writer) -> { + tableRef.set(tester.cqlTable); + for (int i = 0; i < tester.numRows; i++) + { + TestSchema.TestRow testRow = CdcTester.newUniqueRow(tester.schema, rows); + testRow.setTTL(largeTtl); + writer.accept(testRow, TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis())); + } + }) + .withCdcEventChecker((testRows, events) -> { + long afterTestEpochSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); + assertThat(events).hasSize(NUM_ROWS); + AvroGenericRecordTransformer transformer = buildTransformer(converter, tableRef.get()); + + for (CdcEvent event : events) + { + // Validate CdcEvent-level TTL + assertThat(event.getTtl()).isNotNull(); + assertThat(event.getTtl().ttlInSec).isEqualTo(largeTtl); + long expirationTime = event.getTtl().expirationTimeInSec; + long expectedLower = beforeTestEpochSec + largeTtl; + long expectedUpper = afterTestEpochSec + largeTtl; + if (expirationTime <= Integer.MAX_VALUE && expectedLower > Integer.MAX_VALUE) + { + // Cassandra 4.0 caps the value at Integer.MAX_VALUE + assertThat(expirationTime) + .as("expirationTimeInSec should be capped near Integer.MAX_VALUE on Cassandra 4.0") + .isBetween((long) Integer.MAX_VALUE - 1, (long) Integer.MAX_VALUE); + } + else + { + assertThat(expirationTime) + .as("expirationTimeInSec should be approximately nowInSeconds + largeTtl") + .isBetween(expectedLower, expectedUpper); + } + + // Validate Avro-level TTL encoding + GenericData.Record record = transformer.transform(event); + assertThat(record).isNotNull(); + + Object ttlField = record.get(AvroConstants.TTL_KEY); + assertThat(ttlField).isNotNull(); + GenericRecord ttlRecord = (GenericRecord) ttlField; + assertThat(ttlRecord.get(AvroConstants.TTL_KEY)).isEqualTo(largeTtl); + + Object deletedAt = ttlRecord.get(AvroConstants.DELETED_AT_KEY); + assertThat(deletedAt).isInstanceOf(Long.class); + } + }) + .run(); + } + + @ParameterizedTest + @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") + public void testPartitionDeleteAvroEncoding(CassandraVersion version) + { + AvroSchemas.registerLogicalTypes(); + CqlToAvroSchemaConverter converter = getConverter(version); + + long beforeTestMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()); + + TestSchema.Builder schemaBuilder = TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("c1", bridge.aInt()); + + AtomicReference tableRef = new AtomicReference<>(); + testWith(bridge, cdcBridge, commitLogDir, schemaBuilder) + .withNumRows(NUM_ROWS) + .clearWriters() + .withWriter((tester, rows, writer) -> { + tableRef.set(tester.cqlTable); + for (int i = 0; i < tester.numRows; i++) + { + 
TestSchema.TestRow testRow = newUniquePartitionDeletion(tester.schema, rows); + writer.accept(testRow, TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis())); + } + }) + .withCdcEventChecker((testRows, events) -> { + long afterTestMicros = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()); + assertThat(events).hasSize(NUM_ROWS); + AvroGenericRecordTransformer transformer = buildTransformer(converter, tableRef.get()); + + for (CdcEvent event : events) + { + // Validate CdcEvent-level partition delete + assertThat(event.getKind()).isEqualTo(CdcEvent.Kind.PARTITION_DELETE); + long eventTimestampMicros = event.getTimestamp(TimeUnit.MICROSECONDS); + assertThat(eventTimestampMicros) + .as("deletion timestamp should be within the test time window") + .isBetween(beforeTestMicros, afterTestMicros); + + // Validate Avro-level encoding + GenericData.Record record = transformer.transform(event); + assertThat(record).isNotNull(); + assertThat(record.get(AvroConstants.OPERATION_TYPE_KEY).toString()).isEqualTo("DELETE_PARTITION"); + assertThat(record.get(AvroConstants.SOURCE_TABLE_KEY).toString()).isEqualTo(event.table); + assertThat(record.get(AvroConstants.SOURCE_KEYSPACE_KEY).toString()).isEqualTo(event.keyspace); + } + }) + .run(); + } + + @ParameterizedTest + @MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions") + public void testMixedTtlAndNonTtlAvroEncoding(CassandraVersion version) + { + AvroSchemas.registerLogicalTypes(); + CqlToAvroSchemaConverter converter = getConverter(version); + + int ttlSeconds = 3600; + long beforeTestEpochSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); + + TestSchema.Builder schemaBuilder = TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("c1", bridge.aInt()); + + AtomicReference tableRef = new AtomicReference<>(); + testWith(bridge, cdcBridge, commitLogDir, schemaBuilder) + .withNumRows(NUM_ROWS) + .clearWriters() + .withWriter((tester, rows, writer) -> { + tableRef.set(tester.cqlTable); + for (int i = 0; i < tester.numRows; i++) + { + TestSchema.TestRow testRow = CdcTester.newUniqueRow(tester.schema, rows); + if (i % 2 == 0) + { + testRow.setTTL(ttlSeconds); + } + writer.accept(testRow, TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis())); + } + }) + .withCdcEventChecker((testRows, events) -> { + long afterTestEpochSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); + assertThat(events).hasSize(NUM_ROWS); + AvroGenericRecordTransformer transformer = buildTransformer(converter, tableRef.get()); + + int withTtl = 0; + int withoutTtl = 0; + for (CdcEvent event : events) + { + GenericData.Record record = transformer.transform(event); + assertThat(record).isNotNull(); + + if (event.getTtl() != null) + { + withTtl++; + assertThat(event.getTtl().ttlInSec).isEqualTo(ttlSeconds); + assertThat(event.getTtl().expirationTimeInSec).isGreaterThan(0L); + + // Avro TTL field should be present + Object ttlField = record.get(AvroConstants.TTL_KEY); + assertThat(ttlField).as("Avro TTL record should be present for TTL row").isNotNull(); + GenericRecord ttlRecord = (GenericRecord) ttlField; + assertThat(ttlRecord.get(AvroConstants.TTL_KEY)).isEqualTo(ttlSeconds); + + // Validate deletedAt is a Long with expected value + Object deletedAt = ttlRecord.get(AvroConstants.DELETED_AT_KEY); + assertThat(deletedAt).isInstanceOf(Long.class); + long deletedAtValue = (Long) deletedAt; + assertThat(deletedAtValue) + .as("deletedAt should be approximately nowInSeconds + TTL") + 
.isBetween(beforeTestEpochSec + ttlSeconds, afterTestEpochSec + ttlSeconds); + } + else + { + withoutTtl++; + // Avro TTL field should be absent + assertThat(record.get(AvroConstants.TTL_KEY)) + .as("Avro TTL record should be null for non-TTL row") + .isNull(); + } + } + assertThat(withTtl).isEqualTo(NUM_ROWS / 2); + assertThat(withoutTtl).isEqualTo(NUM_ROWS / 2); + }) + .run(); + } +} diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/avro/TestSchemaStore.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/avro/TestSchemaStore.java new file mode 100644 index 000000000..b9a5249a1 --- /dev/null +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/avro/TestSchemaStore.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.cdc.avro; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.cassandra.cdc.schemastore.SchemaStore; + +/** + * In-memory {@link SchemaStore} implementation for tests. + * Schemas are registered manually via {@link #registerSchema(String, Schema)}. 
+ */ +public class TestSchemaStore implements SchemaStore +{ + private final Map schemas = new HashMap<>(); + private final Map> writers = new HashMap<>(); + private final Map> readers = new HashMap<>(); + + public void registerSchema(String namespace, Schema schema) + { + schemas.put(namespace, schema); + writers.put(namespace, new GenericDatumWriter<>(schema)); + readers.put(namespace, new GenericDatumReader<>(schema)); + } + + @Override + public Schema getSchema(String namespace, String name) + { + return schemas.get(namespace); + } + + @Override + public GenericDatumWriter getWriter(String namespace, String name) + { + return writers.get(namespace); + } + + @Override + public GenericDatumReader getReader(String namespace, String name) + { + return readers.get(namespace); + } +} diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEvent.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEvent.java index d3b5647c4..ae3687be4 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEvent.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEvent.java @@ -50,9 +50,9 @@ public enum Kind public static class TimeToLive { public final int ttlInSec; - public final int expirationTimeInSec; + public final long expirationTimeInSec; - public TimeToLive(int ttlInSec, int expirationTimeInSec) + public TimeToLive(int ttlInSec, long expirationTimeInSec) { this.ttlInSec = ttlInSec; this.expirationTimeInSec = expirationTimeInSec; diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEventBuilder.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEventBuilder.java index abf661510..60c760b02 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEventBuilder.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/CdcEventBuilder.java @@ -107,7 +107,7 @@ public void setMaxTimestampMicros(long maxTimestampMicros) this.maxTimestampMicros = maxTimestampMicros; } - public void setTTL(int ttlInSec, int expirationTimeInSec) + public void setTTL(int ttlInSec, long expirationTimeInSec) { // Skip updating TTL if it already has been set. // For the same row, the upsert query can only set one TTL value. 
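// The int -> long widening of expirationTimeInSec above is what the new
// testJsonSerializerWithLongExpirationTime exercises: 2_147_483_648L is
// Integer.MAX_VALUE + 1, a value the old int field could not represent.
// A minimal sketch in plain Java arithmetic:

class ExpirationOverflowSketch
{
    public static void main(String[] args)
    {
        long expirationTimeInSec = 2_147_483_648L; // Integer.MAX_VALUE + 1
        int truncated = (int) expirationTimeInSec; // what an int field would hold
        System.out.println(expirationTimeInSec);   // 2147483648
        System.out.println(truncated);             // -2147483648: wrapped negative
    }
}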
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ReaderTimeProvider.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ReaderTimeProvider.java
index cbc175179..c6189bbb7 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ReaderTimeProvider.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/ReaderTimeProvider.java
@@ -24,7 +24,7 @@
  */
 public class ReaderTimeProvider implements TimeProvider
 {
-    private final int referenceEpochInSeconds;
+    private final long referenceEpochInSeconds;

     public ReaderTimeProvider()
     {
@@ -35,13 +35,13 @@ public ReaderTimeProvider()
      * Constructor used for deserialization
      * @param referenceEpochInSeconds reference epoch to set
      */
-    public ReaderTimeProvider(int referenceEpochInSeconds)
+    public ReaderTimeProvider(long referenceEpochInSeconds)
     {
         this.referenceEpochInSeconds = referenceEpochInSeconds;
     }

     @Override
-    public int referenceEpochInSeconds()
+    public long referenceEpochInSeconds()
     {
         return referenceEpochInSeconds;
     }
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java
index 40a251254..1ea9db287 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/TimeProvider.java
@@ -38,10 +38,10 @@ public interface TimeProvider
     @VisibleForTesting
     TimeProvider DEFAULT = new TimeProvider()
     {
-        private final int referenceEpochInSeconds = nowInSeconds();
+        private final long referenceEpochInSeconds = nowInSeconds();

         @Override
-        public int referenceEpochInSeconds()
+        public long referenceEpochInSeconds()
         {
             return referenceEpochInSeconds;
         }
@@ -50,9 +50,9 @@ public int referenceEpochInSeconds()
     /**
      * @return current time in seconds
      */
-    default int nowInSeconds()
+    default long nowInSeconds()
     {
-        return (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
+        return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
     }

     /**
@@ -62,5 +62,5 @@ default int nowInSeconds()
      * <p>
      * Note that the actual constant value returned is implementation dependent
      */
-    int referenceEpochInSeconds();
+    long referenceEpochInSeconds();
 }
diff --git a/cassandra-analytics-core/build.gradle b/cassandra-analytics-core/build.gradle
index 259c32528..ffce66600 100644
--- a/cassandra-analytics-core/build.gradle
+++ b/cassandra-analytics-core/build.gradle
@@ -182,7 +182,7 @@ tasks.register('testSequential', Test) {
    systemProperty "cassandra.analytics.bridges.sstable_format", System.getProperty("cassandra.analytics.bridges.sstable_format", "big")
    minHeapSize = '1024m'
-   maxHeapSize = '3072m'
+   maxHeapSize = System.getenv('CORE_TEST_MAX_HEAP_SIZE') ?: '3072m'
    maxParallelForks = 1
    forkEvery = 1 // Enables different end-to-end test classes use Spark contexts with different configurations
@@ -217,8 +217,9 @@ tasks.register('testSequential', Test) {
 test {
    systemProperty "cassandra.analytics.bridges.sstable_format", System.getProperty("cassandra.analytics.bridges.sstable_format", "big")
    minHeapSize = '1024m'
-   maxHeapSize = '3072m'
-   maxParallelForks = Math.max(Runtime.runtime.availableProcessors() * 2, 8)
+   maxHeapSize = System.getenv('CORE_TEST_MAX_HEAP_SIZE') ?: '3072m'
+   maxParallelForks = System.getenv('CORE_MAX_PARALLEL_FORKS')?.toInteger()
+                      ?: Math.max(Runtime.runtime.availableProcessors() * 2, 8)
    forkEvery = 1 // Enables different end-to-end test classes use Spark contexts with different configurations
    // Make it so unit tests run on a Jar with Cassandra bridge implementations built in
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index b6e84c6f5..99cbc6827 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -771,7 +771,7 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException
             aliasLastModifiedTimestamp(this.requestedFeatures, this.lastModifiedTimestampField);
         }
         this.rfMap = (Map) in.readObject();
-        this.timeProvider = new ReaderTimeProvider(in.readInt());
+        this.timeProvider = new ReaderTimeProvider(in.readLong());
         this.sstableTimeRangeFilter = (SSTableTimeRangeFilter) in.readObject();
         this.maybeQuoteKeyspaceAndTable();
         this.initSidecarClient();
@@ -817,7 +817,7 @@ private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFoundException
             out.writeUTF(feature.optionName());
         }
         out.writeObject(this.rfMap);
-        out.writeInt(timeProvider.referenceEpochInSeconds());
+        out.writeLong(timeProvider.referenceEpochInSeconds());
         out.writeObject(this.sstableTimeRangeFilter);
     }
@@ -893,7 +893,7 @@ public void write(Kryo kryo, Output out, CassandraDataLayer dataLayer)
                 .collect(Collectors.toList());
             kryo.writeObject(out, listWrapper);
             kryo.writeObject(out, dataLayer.rfMap);
-            out.writeInt(dataLayer.timeProvider.referenceEpochInSeconds());
+            out.writeLong(dataLayer.timeProvider.referenceEpochInSeconds());
             kryo.writeObject(out, dataLayer.sstableTimeRangeFilter);
         }
@@ -936,7 +936,7 @@ public CassandraDataLayer read(Kryo kryo, Input in, Class<CassandraDataLayer> type)
                    in.readString(),
                    kryo.readObject(in, SchemaFeaturesListWrapper.class).toList(),
                    kryo.readObject(in, HashMap.class),
-                   new ReaderTimeProvider(in.readInt()),
+                   new ReaderTimeProvider(in.readLong()),
                    kryo.readObject(in, SSTableTimeRangeFilter.class));
     }
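// Why the (int) TimeUnit.MILLISECONDS.toSeconds(...) cast had to go from
// TimeProvider above: epoch seconds exceed Integer.MAX_VALUE at
// 2038-01-19T03:14:08Z, after which the cast wraps negative. The matching
// readInt/writeInt -> readLong/writeLong switch in CassandraDataLayer keeps the
// serialized form in step, which also means writer and reader must be upgraded
// together. A minimal sketch of the overflow itself:

import java.util.concurrent.TimeUnit;

class EpochSecondsOverflowSketch
{
    public static void main(String[] args)
    {
        long justPast2038Millis = 2_147_483_648_000L; // one second past the int-seconds limit
        long seconds = TimeUnit.MILLISECONDS.toSeconds(justPast2038Millis);
        System.out.println(seconds);       // 2147483648
        System.out.println((int) seconds); // -2147483648: the old behavior
    }
}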
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/SSTableReaderTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/SSTableReaderTests.java
index 1bec25eaf..53072e172 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/SSTableReaderTests.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/SSTableReaderTests.java
@@ -25,7 +25,7 @@
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;

 import com.google.common.util.concurrent.Uninterruptibles;
@@ -73,7 +73,7 @@ private void testTtlUsingConstantReferenceTimeHelper(CassandraBridge bridgeForTest,
                                                          int rows,
                                                          int expectedValues)
     {
-        AtomicInteger referenceEpoch = new AtomicInteger(0);
+        AtomicLong referenceEpoch = new AtomicLong(0);
         TimeProvider navigatableTimeProvider = referenceEpoch::get;
         Set<Integer> expectedColValue = new HashSet<>(Arrays.asList(1, 2, 3));
@@ -90,7 +90,7 @@ private void testTtlUsingConstantReferenceTimeHelper(CassandraBridge bridgeForTest,
             }
             Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
         });
-        int t1 = navigatableTimeProvider.nowInSeconds();
+        long t1 = navigatableTimeProvider.nowInSeconds();
         assertThat(countSSTables(dir)).isEqualTo(1);

         // open CompactionStreamScanner over SSTables
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteComplexTypeTtlTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteComplexTypeTtlTest.java
new file mode 100644
index 000000000..3ddb0353c
--- /dev/null
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteComplexTypeTtlTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.distributed.shared.Uninterruptibles;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.cassandra.spark.bulkwriter.TTLOption;
+import org.apache.cassandra.spark.bulkwriter.WriterOptions;
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.cassandra.testing.TestUtils.DC1_RF3;
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+import static org.apache.spark.sql.types.DataTypes.createArrayType;
+import static org.apache.spark.sql.types.DataTypes.createMapType;
+import static org.apache.spark.sql.types.DataTypes.createStructType;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests that bulk writes with TTL work correctly for complex types (list, set, map, UDT).
+ * This ensures the expiring cell path is invoked correctly for collection and composite types.
+ * TODO: add tuple ttl test after adding support for writing tuples
+ */
+class BulkWriteComplexTypeTtlTest extends SharedClusterSparkIntegrationTestBase
+{
+    static final int ROW_COUNT = 100;
+    static final int TTL_SECONDS = 5;
+
+    static final QualifiedName LIST_TTL_TABLE = new QualifiedName(TEST_KEYSPACE, "test_list_ttl");
+    static final QualifiedName SET_TTL_TABLE = new QualifiedName(TEST_KEYSPACE, "test_set_ttl");
+    static final QualifiedName MAP_TTL_TABLE = new QualifiedName(TEST_KEYSPACE, "test_map_ttl");
+    static final QualifiedName UDT_TTL_TABLE = new QualifiedName(TEST_KEYSPACE, "test_udt_ttl");
+
+    static final String SIMPLE_UDT_NAME = "simple_udt";
+
+    @Test
+    void testComplexTypesWithTtl()
+    {
+        SparkSession spark = getOrCreateSparkSession();
+
+        // Write all complex types with TTL
+        writeListData(spark);
+        writeSetData(spark);
+        writeMapData(spark);
+        writeUdtData(spark);
+
+        // Wait for TTL to expire (TTL + 1 second margin)
+        Uninterruptibles.sleepUninterruptibly(TTL_SECONDS + 1, TimeUnit.SECONDS);
+
+        // Verify all types have expired
+        assertThat(bulkReaderDataFrame(LIST_TTL_TABLE).load().collectAsList())
+        .as("list TTL post-expiry")
+        .isEmpty();
+        assertThat(bulkReaderDataFrame(SET_TTL_TABLE).load().collectAsList())
+        .as("set TTL post-expiry")
+        .isEmpty();
+        assertThat(bulkReaderDataFrame(MAP_TTL_TABLE).load().collectAsList())
+        .as("map TTL post-expiry")
+        .isEmpty();
+        assertThat(bulkReaderDataFrame(UDT_TTL_TABLE).load().collectAsList())
+        .as("UDT TTL post-expiry")
+        .isEmpty();
+    }
+
+    private void writeListData(SparkSession spark)
+    {
+        StructType schema = new StructType()
+                            .add("id", IntegerType, false)
+                            .add("listdata", createArrayType(IntegerType), false);
+
+        List<Row> rows = IntStream.range(0, ROW_COUNT)
+                                  .mapToObj(i -> RowFactory.create(i, Arrays.asList(i, i + 1)))
+                                  .collect(Collectors.toList());
+        Dataset<Row> df = spark.createDataFrame(rows, schema);
+
+        bulkWriterDataFrameWriter(df, LIST_TTL_TABLE).option(WriterOptions.TTL.name(), TTLOption.constant(TTL_SECONDS))
+                                                     .save();
+    }
+
+    private void writeSetData(SparkSession spark)
+    {
+        StructType schema = new StructType()
+                            .add("id", IntegerType, false)
+                            .add("setdata", createArrayType(StringType), false);
+
+        List<Row> rows = IntStream.range(0, ROW_COUNT)
+                                  .mapToObj(i -> RowFactory.create(i, ImmutableSet.of("item" + i)))
+                                  .collect(Collectors.toList());
+        Dataset<Row> df = spark.createDataFrame(rows, schema);
+
+        bulkWriterDataFrameWriter(df, SET_TTL_TABLE).option(WriterOptions.TTL.name(), TTLOption.constant(TTL_SECONDS))
+                                                    .save();
+    }
+
+    private void writeMapData(SparkSession spark)
+    {
+        StructType schema = new StructType()
+                            .add("id", IntegerType, false)
+                            .add("mapdata", createMapType(StringType, IntegerType), false);
+
+        List<Row> rows = IntStream.range(0, ROW_COUNT)
+                                  .mapToObj(i -> RowFactory.create(i, ImmutableMap.of("key" + i, i)))
+                                  .collect(Collectors.toList());
+        Dataset<Row> df = spark.createDataFrame(rows, schema);
+
+        bulkWriterDataFrameWriter(df, MAP_TTL_TABLE).option(WriterOptions.TTL.name(), TTLOption.constant(TTL_SECONDS))
+                                                    .save();
+    }
+
+    private void writeUdtData(SparkSession spark)
+    {
+        StructType udtType = createStructType(new StructField[]{
+            new StructField("f1", StringType, true, Metadata.empty()),
+            new StructField("f2", IntegerType, true, Metadata.empty())
+        });
+        StructType schema = new StructType()
+                            .add("id", IntegerType, false)
+                            .add("udtfield", udtType, false);
+
+        List<Row> rows = IntStream.range(0, ROW_COUNT)
+                                  .mapToObj(i -> RowFactory.create(i, RowFactory.create("course" + i, i)))
+                                  .collect(Collectors.toList());
+        Dataset<Row> df = spark.createDataFrame(rows, schema);
+
+        bulkWriterDataFrameWriter(df, UDT_TTL_TABLE).option(WriterOptions.TTL.name(), TTLOption.constant(TTL_SECONDS))
+                                                    .save();
+    }
+
+    @Override
+    protected ClusterBuilderConfiguration testClusterConfiguration()
+    {
+        return super.testClusterConfiguration()
+                    .nodesPerDc(3);
+    }
+
+    @Override
+    protected void initializeSchemaForTest()
+    {
+        createTestKeyspace(LIST_TTL_TABLE, DC1_RF3);
+
+        cluster.schemaChangeIgnoringStoppedInstances("CREATE TYPE " + TEST_KEYSPACE + "." + SIMPLE_UDT_NAME +
+                                                     " (f1 text, f2 int)");
+
+        cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE " + LIST_TTL_TABLE + " (" +
+                                                     "id int PRIMARY KEY, " +
+                                                     "listdata list<int>)");
+
+        cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE " + SET_TTL_TABLE + " (" +
+                                                     "id int PRIMARY KEY, " +
+                                                     "setdata set<text>)");
+
+        cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE " + MAP_TTL_TABLE + " (" +
+                                                     "id int PRIMARY KEY, " +
+                                                     "mapdata map<text, int>)");
+
+        cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE " + UDT_TTL_TABLE + " (" +
+                                                     "id int PRIMARY KEY, " +
+                                                     "udtfield " + SIMPLE_UDT_NAME + ")");
+    }
+}
diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/FourZeroCdcEventBuilder.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/FourZeroCdcEventBuilder.java
index b4e7d8b1c..62c9d6854 100644
--- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/FourZeroCdcEventBuilder.java
+++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/FourZeroCdcEventBuilder.java
@@ -242,9 +242,7 @@ else if (complex.cellsCount() > 0)
                 holder.add(makeValue(cell.buffer(), cell.column()));
                 if (cell.isExpiring())
                 {
-                    // TODO: CASSANDRA-14227 Support unit interpretation,
-                    // so that TTL does not overflow and become negative.
-                    setTTL(cell.ttl(), Cell.deletionTimeLongToUnsignedInteger(cell.localDeletionTime()));
+                    setTTL(cell.ttl(), cell.localDeletionTime());
                 }
             }
         }
@@ -274,7 +272,7 @@ private void processComplexData(List holder, ComplexColumnData complex)
             buffer.addCell(cell);
             if (cell.isExpiring())
             {
-                setTTL(cell.ttl(), Cell.deletionTimeLongToUnsignedInteger(cell.localDeletionTime()));
+                setTTL(cell.ttl(), cell.localDeletionTime());
             }
         }
     }
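// Context for the FourZeroCdcEventBuilder change above: on the Cassandra 5.0 bridge,
// Cell.localDeletionTime() already returns a long, so it flows into setTTL(int, long)
// directly instead of being squeezed through Cell.deletionTimeLongToUnsignedInteger.
// On the 4.0 line, large expiration times still saturate near Integer.MAX_VALUE (see
// testMaxSupportedTtlAvroEncoding earlier in this patch). A hedged sketch of that
// difference; capOn40Path is an illustrative stand-in, not a Cassandra API:

class DeletionTimeWidthSketch
{
    // Illustrative only: 4.0-era code paths cannot represent expirations past
    // Integer.MAX_VALUE seconds, so they saturate instead of widening.
    static long capOn40Path(long localDeletionTimeInSec)
    {
        return Math.min(localDeletionTimeInSec, Integer.MAX_VALUE);
    }

    public static void main(String[] args)
    {
        long farFuture = 2_147_483_648L;            // past the 2038 int limit
        System.out.println(capOn40Path(farFuture)); // 2147483647 on the 4.0 path
        System.out.println(farFuture);              // 2147483648 flows through on 5.0
    }
}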
diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java
index 963f3454b..dd0cc1719 100644
--- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java
+++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java
@@ -21,6 +21,8 @@

 import java.nio.ByteBuffer;

+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.schema.TableMetadata;

@@ -31,21 +33,25 @@ private DbUtils()
         throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated");
     }

-    public static DeletionTime deletionTime(long markedForDeleteAt, int localDeletionTime)
+    @VisibleForTesting
+    public static DeletionTime deletionTime(long markedForDeleteAt, long localDeletionTime)
     {
         return DeletionTime.build(markedForDeleteAt, localDeletionTime);
     }

+    @VisibleForTesting
     public static LivenessInfo livenessInfo(long timestamp, long nowInSeconds)
     {
         return LivenessInfo.create(timestamp, nowInSeconds);
     }

+    @VisibleForTesting
     public static PartitionUpdate fullPartitionDeletion(TableMetadata metadata, ByteBuffer key, long timestamp, long nowInSec)
     {
         return PartitionUpdate.fullPartitionDelete(metadata, key, timestamp, nowInSec);
     }

+    @VisibleForTesting
     public static PartitionUpdate.SimpleBuilder partitionUpdateBuilderWithNow(TableMetadata metadata, DecoratedKey key, long nowInSec)
     {
         return PartitionUpdate.simpleBuilder(metadata, key).nowInSec(nowInSec);
diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java
index 63ab2fdbc..dfd18bb3e 100644
--- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java
+++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java
@@ -187,7 +187,7 @@ private void rawAddRow(List values) throws InvalidRequestException,
                                        ClientState.forInternalCalls(),
                                        options,
                                        delete.getTimestamp(TimeUnit.MILLISECONDS.toMicros(now), options),
-                                       (int) TimeUnit.MILLISECONDS.toSeconds(now),
+                                       TimeUnit.MILLISECONDS.toSeconds(now),
                                        delete.getTimeToLive(options),
                                        Collections.emptyMap());
timeProvider.referenceEpochInSeconds();
         Keyspace keyspace = Keyspace.openWithoutSSTables(metadata.keyspace);
         ColumnFamilyStore cfStore = keyspace.getColumnFamilyStore(metadata.name);
         controller = new PurgingCompactionController(cfStore, CompactionParams.TombstoneOption.NONE);
diff --git a/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java
index 8dffb2960..72a656a39 100644
--- a/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java
+++ b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java
@@ -48,7 +48,7 @@ public void addComplexTombstone(org.apache.cassandra.db.rows.Row.Builder rowBuilder,
                                     long deletionTime)
     {
         Preconditions.checkArgument(cd.isComplex(), "The method only works with complex columns");
-        rowBuilder.addComplexDeletion(cd, DeletionTime.build(deletionTime, (int) TimeUnit.MICROSECONDS.toSeconds(deletionTime)));
+        rowBuilder.addComplexDeletion(cd, DeletionTime.build(deletionTime, TimeUnit.MICROSECONDS.toSeconds(deletionTime)));
     }
 
     public static BufferCell tombstone(ColumnMetadata column, long timestamp, long nowInSec, CellPath path)
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java
index 7965272e3..4284b0cc1 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java
@@ -21,6 +21,7 @@
 
 import java.nio.ByteBuffer;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
 
 import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -33,23 +34,32 @@ private DbUtils()
         throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated");
     }
 
-    public static DeletionTime deletionTime(long markedForDeleteAt, int localDeletionTime)
+
+    @VisibleForTesting
+    public static DeletionTime deletionTime(long markedForDeleteAt, long localDeletionTime)
     {
-        return new DeletionTime(markedForDeleteAt, localDeletionTime);
+        // C* 4.0 DeletionTime constructor requires int for localDeletionTime; checked cast will throw after Y2038
+        return new DeletionTime(markedForDeleteAt, Ints.checkedCast(localDeletionTime));
     }
 
+    @VisibleForTesting
     public static LivenessInfo livenessInfo(long timestamp, long nowInSeconds)
     {
+        // C* 4.0 LivenessInfo.create requires int for nowInSeconds; checked cast will throw after Y2038
         return LivenessInfo.create(timestamp, Ints.checkedCast(nowInSeconds));
     }
 
+    @VisibleForTesting
     public static PartitionUpdate fullPartitionDeletion(TableMetadata metadata, ByteBuffer key, long timestamp, long nowInSec)
     {
+        // C* 4.0 fullPartitionDelete requires int for nowInSec; checked cast will throw after Y2038
         return PartitionUpdate.fullPartitionDelete(metadata, key, timestamp, Ints.checkedCast(nowInSec));
     }
 
+    @VisibleForTesting
     public static PartitionUpdate.SimpleBuilder partitionUpdateBuilderWithNow(TableMetadata metadata, DecoratedKey key, long nowInSec)
     {
+        // C* 4.0 simpleBuilder.nowInSec requires int; checked cast will throw after Y2038
         return PartitionUpdate.simpleBuilder(metadata, key).nowInSec(Ints.checkedCast(nowInSec));
     }
 }
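
The Y2038 comments above rely on Guava's fail-fast narrowing; a tiny standalone demo of the difference between the old silent cast and the checked cast (class and variable names are ours):

    import com.google.common.primitives.Ints;

    public final class CheckedCastDemo
    {
        public static void main(String[] args)
        {
            long nowInSec = 2_147_483_648L;     // one second past 2038-01-19T03:14:07Z
            System.out.println((int) nowInSec); // -2147483648: the old (int) cast wraps silently
            Ints.checkedCast(nowInSec);         // throws IllegalArgumentException: overflow cannot pass unnoticed
        }
    }
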
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java
index 32ff70533..57868d39f 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java
@@ -33,6 +33,7 @@ import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
 
 import org.apache.cassandra.bridge.CassandraSchema;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -185,7 +186,7 @@ private void rawAddRow(List<ByteBuffer> values) throws InvalidRequestException,
                                         delete.updatedColumns(),
                                         options,
                                         delete.getTimestamp(TimeUnit.MILLISECONDS.toMicros(now), options),
-                                        (int) TimeUnit.MILLISECONDS.toSeconds(now),
+                                        Ints.checkedCast(TimeUnit.MILLISECONDS.toSeconds(now)),
                                         delete.getTimeToLive(options),
                                         Collections.emptyMap());
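
Note the asymmetry the patch preserves: `now` here is wall-clock time, so the checked cast cannot throw before 2038, whereas a cell's expiration time is now plus TTL and can exceed the int range already today; that is why expiration and deletion times were widened to long while wall-clock-derived nowInSec values merely gained a fail-fast cast. A quick check of the remaining headroom (class name is ours):

    import java.util.concurrent.TimeUnit;

    public final class NowInSecHeadroom
    {
        public static void main(String[] args)
        {
            long nowInSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
            // Seconds left until Integer.MAX_VALUE (2038-01-19T03:14:07Z), when the checked cast starts throwing
            System.out.println(Integer.MAX_VALUE - nowInSec);
        }
    }
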
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
index 05d9d45ec..0853325cb 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 
 import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
 
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.ClusteringPrefix;
@@ -438,12 +439,14 @@ public void consume()
             {
                 AbstractComplexTypeBuffer buffer = AbstractComplexTypeBuffer.newBuffer(column.type, cellCount);
                 long maxTimestamp = Long.MIN_VALUE;
+                // C* 4.0 Cell.isLive requires int for nowInSec; checked cast will throw after Y2038
+                int referenceEpochInSecondsAsInt = Ints.checkedCast(timeProvider.referenceEpochInSeconds());
                 while (cells.hasNext())
                 {
                     Cell cell = cells.next();
                     // Re: isLive vs. isTombstone - isLive considers TTL so that if a cell is expiring soon,
                     // it is handled as tombstone
-                    if (cell.isLive(timeProvider.referenceEpochInSeconds()))
+                    if (cell.isLive(referenceEpochInSecondsAsInt))
                     {
                         buffer.addCell(cell);
                     }
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java
index 89a2df3f2..2925eb590 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/CompactionStreamScanner.java
@@ -28,6 +28,7 @@ import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
 
 import org.apache.cassandra.db.AbstractCompactionController;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -117,7 +118,7 @@ protected void handleCellTombstoneInComplex(BigInteger token, Cell cell)
     @Override
     UnfilteredPartitionIterator initializePartitions()
     {
-        int nowInSec = timeProvider.referenceEpochInSeconds();
+        long nowInSec = timeProvider.referenceEpochInSeconds();
         Keyspace keyspace = Keyspace.openWithoutSSTables(metadata.keyspace);
         ColumnFamilyStore cfStore = keyspace.getColumnFamilyStore(metadata.name);
         controller = new PurgingCompactionController(cfStore, CompactionParams.TombstoneOption.NONE);
@@ -125,7 +126,8 @@ UnfilteredPartitionIterator initializePartitions()
                                                          .map(Scannable::scanner)
                                                          .collect(Collectors.toList());
         scanners = new AbstractCompactionStrategy.ScannerList(scannerList);
-        ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, nowInSec, taskId);
+        // C* 4.0 CompactionIterator requires int for nowInSec; checked cast will throw after Y2038
+        ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, Ints.checkedCast(nowInSec), taskId);
         return ci;
     }
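
Both scanner changes follow the same pattern: when a long-based time source feeds an int-based 4.0 API inside a loop, narrow once at the boundary so an out-of-range epoch fails immediately rather than part-way through the iteration. A generic sketch of the pattern (names are illustrative, not from the patch):

    import com.google.common.primitives.Ints;
    import java.util.List;

    final class NarrowOnceAtBoundary
    {
        static int countLive(List<Long> expirationTimesInSec, long nowInSec)
        {
            // One checked narrowing before the loop; the loop body uses the int the API expects
            int nowAsInt = Ints.checkedCast(nowInSec);
            int live = 0;
            for (long expiresAt : expirationTimesInSec)
            {
                if (expiresAt > nowAsInt)
                {
                    live++;
                }
            }
            return live;
        }
    }
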
rowBuilder.addComplexDeletion(cd, new DeletionTime(deletionTime, Ints.checkedCast(TimeUnit.MICROSECONDS.toSeconds(deletionTime))));
     }
 }
diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java
index e91027a68..3912d9a89 100644
--- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java
+++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java
@@ -31,11 +31,13 @@ public abstract class CqlType extends AbstractCqlType
 {
     public static BufferCell tombstone(ColumnMetadata column, long timestamp, long nowInSec, CellPath path)
     {
+        // C* 4.0 BufferCell.tombstone requires int for nowInSec; checked cast will throw after Y2038
         return BufferCell.tombstone(column, timestamp, Ints.checkedCast(nowInSec), path);
     }
 
     public static BufferCell expiring(ColumnMetadata column, long timestamp, int ttl, long nowInSec, ByteBuffer value, CellPath path)
     {
+        // C* 4.0 BufferCell.expiring requires int for nowInSec; checked cast will throw after Y2038
         return BufferCell.expiring(column, timestamp, ttl, Ints.checkedCast(nowInSec), value, path);
     }
 }
diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/AbstractCqlList.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/AbstractCqlList.java
index a7cf8bea4..43619e6f7 100644
--- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/AbstractCqlList.java
+++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/AbstractCqlList.java
@@ -105,15 +105,15 @@ public void addCell(final org.apache.cassandra.db.rows.Row.Builder rowBuilder,
                         ColumnMetadata cd,
                         long timestamp,
                         int ttl,
-                        int now,
+                        long now,
                         Object value)
     {
         for (Object o : (List) value)
         {
             if (ttl != NO_TTL)
             {
-                rowBuilder.addCell(BufferCell.expiring(cd, timestamp, ttl, now, type().serialize(o),
-                                                       CellPath.create(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes()))));
+                rowBuilder.addCell(CqlType.expiring(cd, timestamp, ttl, now, type().serialize(o),
+                                                    CellPath.create(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes()))));
             }
             else
             {
diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java
index 6d0e49d35..e9003b09f 100644
--- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java
+++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java
@@ -120,15 +120,15 @@ public void addCell(final org.apache.cassandra.db.rows.Row.Builder rowBuilder,
                         ColumnMetadata cd,
                         long timestamp,
                         int ttl,
-                        int now,
+                        long now,
                         Object value)
     {
         for (Map.Entry entry : ((Map) value).entrySet())
         {
             if (ttl != NO_TTL)
             {
-                rowBuilder.addCell(BufferCell.expiring(cd, timestamp, ttl, now, valueType().serialize(entry.getValue()),
-                                                       CellPath.create(keyType().serialize(entry.getKey()))));
+                rowBuilder.addCell(CqlType.expiring(cd, timestamp, ttl, now, valueType().serialize(entry.getValue()),
+                                                    CellPath.create(keyType().serialize(entry.getKey()))));
             }
             else
             {
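
Routing the list, map, and set codecs through CqlType.expiring, rather than calling BufferCell.expiring directly, leaves exactly one choke point that owns the int narrowing. A sketch of what a caller now looks like (variable names are illustrative):

    // The codec keeps the widened long end-to-end and delegates the narrowing:
    long now = timeProvider.referenceEpochInSeconds();
    rowBuilder.addCell(CqlType.expiring(cd, timestamp, ttl, now, serializedValue, cellPath));
    // If now ever exceeds the int range, the failure surfaces as a single
    // IllegalArgumentException inside CqlType.expiring instead of being
    // re-implemented (or missed) in each collection codec.
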
diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java
index c50b80591..bbe504ae2 100644
--- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java
+++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java
@@ -104,15 +104,15 @@ public void addCell(final org.apache.cassandra.db.rows.Row.Builder rowBuilder,
                         ColumnMetadata cd,
                         long timestamp,
                         int ttl,
-                        int now,
+                        long now,
                         Object value)
     {
         for (Object o : (Set) value)
         {
             if (ttl != NO_TTL)
             {
-                rowBuilder.addCell(BufferCell.expiring(cd, timestamp, ttl, now, ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                       CellPath.create(type().serialize(o))));
+                rowBuilder.addCell(CqlType.expiring(cd, timestamp, ttl, now, ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                    CellPath.create(type().serialize(o))));
             }
             else
             {