Merged
10 changes: 7 additions & 3 deletions .circleci/config.yml
Contributor

Do you want to revert these changes, or are they intentional?

Contributor Author

Intentional; the CI pipelines fail otherwise.

@@ -64,10 +64,13 @@ commands:
JDK_VERSION: "<<parameters.jdk>>"
INTEGRATION_MAX_PARALLEL_FORKS: 1
INTEGRATION_MAX_HEAP_SIZE: "1500M"
CORE_MAX_PARALLEL_FORKS: 2
CORE_TEST_MAX_HEAP_SIZE: "2048m"
CASSANDRA_USE_JDK11: <<parameters.use_jdk11>>
command: |
export GRADLE_OPTS="-Xmx2g -Dorg.gradle.jvmargs=-Xmx2g"
# Run compile/unit tests, skipping integration tests
./gradlew --stacktrace clean assemble check -x cassandra-analytics-integration-tests:test -Dcassandra.analytics.bridges.sstable_format=<<parameters.sstable_format>>
./gradlew --no-daemon --max-workers=2 --stacktrace clean assemble check -x cassandra-analytics-integration-tests:test -Dcassandra.analytics.bridges.sstable_format=<<parameters.sstable_format>>
Comment on lines +67 to +73
Contributor

Instead of making CI pipeline changes, can you try to tag the relevant tests with @Tag("Sequential")?

--no-daemon flag makes sense.
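
The tagging approach suggested here could look like the following sketch — a hypothetical JUnit 5 test class (the `Sequential` tag name matches the suggestion; the class and method names are illustrative, not from this patch):

```java
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

// Tagging lets the build route resource-hungry tests into a separate,
// non-parallel Gradle test task (e.g. via useJUnitPlatform includeTags/excludeTags),
// instead of lowering parallelism for the whole CI pipeline.
@Tag("Sequential")
class HeavyIntegrationTest
{
    @Test
    void bulkWriteUnderMemoryPressure()
    {
        // resource-consuming test body
    }
}
```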

Contributor Author

  • We don't know which tests are causing this, so we can't tag them. Gradle is getting killed while executing the pipelines starting with spark3-*, which run the unit tests (gradlew check). Pipelines starting with int-* are running fine. This problem exists in trunk and was not created by this PR. Running CI with --info didn't give any clues; it bloated the logs and CI killed Gradle.
  • And we will have the same problem of finding the culprit every time CI is broken.

Contributor

This problem exists in trunk, not created by this PR

All commits are made only after CI is green. I do not think the problem pre-exists in trunk. Likely relevant to the new tests. You will have to dig into the details to determine which test should be tagged with Sequential. The general idea is to run the resource consuming tests sequentially. @lukasz-antoniak, might have some tips.

we will have the same problem of finding the culprit everytime CI is broken

I share the same concern. This is why I am conservative on modifying the CI configuration, given that the CI issue is probably relevant to the current patch.

Contributor Author

  • I see the same failures on trunk as well, without any changes: https://app.circleci.com/pipelines/github/skoppu22/cassandra-analytics?branch=trunk
    I am seeing this consistently on trunk; I'm not sure if the Circle CI backend for the Europe region has a different setup by any chance.

  • Finding the test that went into trunk and caused this will be a separate task and will take time. We won't be able to merge this PR until we do, as my trunk pipelines also never turn green without this fix.

Contributor

I'm with @skoppu22 on this one; we should band-aid here to get things working again and open another JIRA to follow up and fix it the right way. Blocking all work on root-causing a regression from another commit should be solved by either:
a. a git revert of that commit, or
b. a band-aid workaround with a follow-up JIRA to unblock work

In my humble opinion. :)

Contributor

I am good with having follow up work to address the root cause.

My reason was that all the prior commits were only merged with green CI. This is the policy we have been following. Admittedly, there could be reruns and some flakiness. If the build starts to fail consistently, it could mean something specific to this patch.

Contributor

Did the 5.0 support w/parameterized testing go in w/out green CI?

Contributor Author

@yifan-c definitely something went in without green CI, or something changed on the Circle CI side. As I shared in the link above, trunk pipelines are consistently failing with the same problem.

Contributor Author

Something that went in between March 2nd and March 13th broke this. My trunk pipeline didn't have this problem on March 2nd, then has been consistently failing since March 13th.


run_integration:
parameters:
@@ -93,10 +96,11 @@ commands:
INTEGRATION_MAX_HEAP_SIZE: "2500M"
CASSANDRA_USE_JDK11: <<parameters.use_jdk11>>
command: |
export GRADLE_OPTS="-Xmx2g -Dorg.gradle.jvmargs=-Xmx2g"
export DTEST_JAR="dtest-<< parameters.cassandra >>.jar"
export CASSANDRA_VERSION=$(echo << parameters.cassandra >> | cut -d'.' -f 1,2)
# Run compile but not unit tests (which are run in run_build)
./gradlew --stacktrace clean assemble
./gradlew --no-daemon --max-workers=2 --stacktrace clean assemble
# Run integration tests in parallel
cd cassandra-analytics-integration-tests/src/test/java
# Get list of classnames of tests that should run on this node
@@ -132,7 +136,7 @@ jobs:
name: Build dependencies for jdk11 builds
command: |
CASSANDRA_USE_JDK11=true ./scripts/build-dependencies.sh
./gradlew codeCheckTasks
./gradlew --no-daemon --max-workers=2 codeCheckTasks
- persist_to_workspace:
root: dependencies
paths:
2 changes: 1 addition & 1 deletion CHANGES.txt
@@ -1,6 +1,6 @@
0.4.0
-----
* Setup CI Pipeline with GitHub Actions (CASSANALYTICS-106)
* Support extended deletion time in CDC for Cassandra 5.0
Contributor

👍 to the removal. GitHub Actions is not user-facing.

* Flush event consumer before persisting CDC state to prevent data loss on failure (CASSANALYTICS-126)
* Fix ReadStatusTracker to distinguish clean completion from error termination in BufferingCommitLogReader (CASSANALYTICS-129)
* Adding CDC support for Cassandra 5.0 Commit Logs (CASSANALYTICS-60)
@@ -281,14 +281,14 @@ public static GenericData.Record getTTLAvro(CdcEvent event, Schema ttlSchema)
return ttlRecord;
}

public static Map<String, Integer> getTTL(CdcEvent event)
public static Map<String, Long> getTTL(CdcEvent event)
{
CdcEvent.TimeToLive ttl = event.getTtl();
if (ttl == null)
{
return null;
}
return mapOf(AvroConstants.TTL_KEY, ttl.ttlInSec, AvroConstants.DELETED_AT_KEY, ttl.expirationTimeInSec);
return mapOf(AvroConstants.TTL_KEY, (long) ttl.ttlInSec, AvroConstants.DELETED_AT_KEY, ttl.expirationTimeInSec);
Comment on lines +284 to +291
Contributor

Why convert the TTL to long when the schema has it as int?
The change is probably unnecessary.

Contributor Author

Here we are creating a map with one entry as (TTL_KEY, TTL value as int) and a second entry as (DELETED_AT_KEY, expirationTimeInSec value as long), so we have to use the wider of int and long. Otherwise we would need to return Map&lt;String, Number&gt; so that Number can represent both int and long.
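
The constraint described here can be sketched in isolation — a minimal, self-contained illustration (using a plain HashMap rather than the project's mapOf helper) of why widening the int TTL to long lets a single map hold both values:

```java
import java.util.HashMap;
import java.util.Map;

public class TtlWidening
{
    public static void main(String[] args)
    {
        int ttlInSec = 10;
        // Epoch seconds past Integer.MAX_VALUE, i.e. dates after January 2038
        long expirationTimeInSec = 2_147_483_648L;

        // A Map<String, Integer> could not hold expirationTimeInSec,
        // so the int TTL is widened to long instead of narrowing the timestamp.
        Map<String, Long> ttl = new HashMap<>();
        ttl.put("ttl", (long) ttlInSec);
        ttl.put("deletedAt", expirationTimeInSec);

        System.out.println(ttl.get("ttl"));       // 10
        System.out.println(ttl.get("deletedAt")); // 2147483648
    }
}
```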

}

public static UpdatedEvent getUpdatedEvent(CdcEvent event,
@@ -145,7 +145,7 @@
},
{
"name": "deletedAt",
"type": "int",
"type": "long",
"doc": "Future timestamp in seconds"
}
]
@@ -145,7 +145,7 @@
},
{
"name": "deletedAt",
"type": "int",
"type": "long",
"doc": "Future timestamp in seconds"
}
]
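
As context for the int → long change in these schema files: Avro's schema-resolution rules allow an int writer type to be promoted to a long reader type, so records already encoded with the old int deletedAt remain readable under the widened schema. The fragment below mirrors the diff, with an illustrative doc string (not the patch's wording):

```
{
  "name": "deletedAt",
  "type": "long",
  "doc": "Future timestamp in seconds (widened from int so post-2038 epoch seconds fit)"
}
```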
@@ -101,7 +101,7 @@ public void testJsonSerializer() throws IOException
assertThat(root.has(AvroConstants.TTL_KEY)).isTrue();
JsonNode ttl = root.get(AvroConstants.TTL_KEY);
assertThat(ttl.get(AvroConstants.TTL_KEY).asInt()).isEqualTo(10);
assertThat(ttl.get(AvroConstants.DELETED_AT_KEY).asInt()).isEqualTo(1658269);
assertThat(ttl.get(AvroConstants.DELETED_AT_KEY).asLong()).isEqualTo(1658269);
}

@Test
@@ -138,4 +138,29 @@ public void testJsonBinary() throws IOException
InetAddress address = InetAddress.getByAddress(Base64.getDecoder().decode(base64Str));
assertThat(address).isEqualTo(InetAddress.getByName("127.0.0.1"));
}

@Test
public void testJsonSerializerWithLongExpirationTime() throws IOException
{
long expirationTimePastIntMax = 2_147_483_648L;
CdcEventBuilder eventBuilder = CdcEventBuilder.of(CdcEvent.Kind.INSERT, TEST_KS, TEST_TBL_BASIC);
eventBuilder.setPartitionKeys(listOf(Value.of(TEST_KS, "a", "int", TYPES.aInt().serialize(1))));
eventBuilder.setValueColumns(listOf(
Value.of(TEST_KS, "b", "int", TYPES.aInt().serialize(2))
));
eventBuilder.setMaxTimestampMicros(TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()));
eventBuilder.setTimeToLive(new CdcEvent.TimeToLive(10, expirationTimePastIntMax));

byte[] ar;
try (JsonSerializer serializer = new JsonSerializer(TYPE_LOOKUP))
{
ar = serializer.serialize("topic", eventBuilder.build());
}
assertThat(ar).isNotNull();

JsonNode root = MAPPER.readTree(ar);
JsonNode ttl = root.get(AvroConstants.TTL_KEY);
assertThat(ttl.get(AvroConstants.TTL_KEY).asInt()).isEqualTo(10);
assertThat(ttl.get(AvroConstants.DELETED_AT_KEY).asLong()).isEqualTo(expirationTimePastIntMax);
}
}
3 changes: 3 additions & 0 deletions cassandra-analytics-cdc/build.gradle
@@ -89,6 +89,9 @@ dependencies {
implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${jacksonVersion}"

testImplementation project(":cassandra-analytics-common")
testImplementation project(":cassandra-analytics-cdc-codec")
testImplementation "org.apache.avro:avro:${avroVersion}"
testImplementation "org.apache.kafka:kafka-clients:${kafkaClientVersion}"

// pull in cassandra-bridge so we can re-use TestSchema to generate arbitrary schemas for the cdc tests
testImplementation project(":cassandra-bridge")
@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.cassandra.cdc;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.cassandra.bridge.CassandraVersion;
import org.apache.cassandra.bridge.CdcBridgeFactory;
import org.apache.cassandra.cdc.api.KeyspaceTypeKey;
import org.apache.cassandra.cdc.avro.AvroByteRecordTransformer;
import org.apache.cassandra.cdc.avro.AvroConstants;
import org.apache.cassandra.cdc.avro.AvroSchemas;
import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter;
import org.apache.cassandra.cdc.avro.TestSchemaStore;
import org.apache.cassandra.cdc.msg.CdcEvent;
import org.apache.cassandra.cdc.test.CdcTestBase;
import org.apache.cassandra.cdc.test.CdcTester;
import org.apache.cassandra.spark.data.CqlField;
import org.apache.cassandra.spark.data.CqlTable;
import org.apache.cassandra.spark.utils.test.TestSchema;

import static org.apache.cassandra.cdc.test.CdcTester.testWith;
import static org.assertj.core.api.Assertions.assertThat;

/**
* Tests that exercise the CDC-to-Avro byte-serialization pipeline ({@code cdc_bytes.avsc}).
*/
public class AvroByteRecordTransformerTest extends CdcTestBase
{
private static final int NUM_ROWS = 50;

private CqlToAvroSchemaConverter getConverter(CassandraVersion version)
{
CqlToAvroSchemaConverter converter = CdcBridgeFactory.getCqlToAvroSchemaConverter(version);
assertThat(converter).isNotNull();
return converter;
}

/**
* Build the Avro byte transformer by converting the CQL table schema
* into an Avro schema and registering it in the test schema store.
*/
private AvroByteRecordTransformer buildTransformer(CqlToAvroSchemaConverter converter,
CqlTable cqlTable,
TestSchemaStore schemaStore)
{
Schema avroSchema = converter.convert(cqlTable);
String namespace = cqlTable.keyspace() + "." + cqlTable.table();
schemaStore.registerSchema(namespace, avroSchema);

Function<KeyspaceTypeKey, CqlField.CqlType> typeLookup = key -> bridge.parseType(key.type);
return new AvroByteRecordTransformer(schemaStore, typeLookup);
}

/**
* Deserialize a byte payload using the table's Avro schema from the schema store.
*/
private GenericRecord deserializePayload(ByteBuffer payloadBytes, GenericDatumReader<GenericRecord> reader) throws IOException
{
byte[] bytes = new byte[payloadBytes.remaining()];
payloadBytes.get(bytes);
return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));
}

@ParameterizedTest
@MethodSource("org.apache.cassandra.cdc.test.TestVersionSupplier#testVersions")
public void testTtlDeletedAtByteAvroEncoding(CassandraVersion version)
{
AvroSchemas.registerLogicalTypes();
CqlToAvroSchemaConverter converter = getConverter(version);

int ttlSeconds = 3600;
long beforeTestEpochSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());

TestSchema.Builder schemaBuilder = TestSchema.builder(bridge)
.withPartitionKey("pk", bridge.uuid())
.withColumn("c1", bridge.bigint())
.withColumn("c2", bridge.text());

AtomicReference<CqlTable> tableRef = new AtomicReference<>();
testWith(bridge, cdcBridge, commitLogDir, schemaBuilder)
.withNumRows(NUM_ROWS)
.clearWriters()
.withWriter((tester, rows, writer) -> {
tableRef.set(tester.cqlTable);
for (int i = 0; i < tester.numRows; i++)
{
TestSchema.TestRow testRow = CdcTester.newUniqueRow(tester.schema, rows);
testRow.setTTL(ttlSeconds);
writer.accept(testRow, TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()));
}
})
.withCdcEventChecker((testRows, events) -> {
long afterTestEpochSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
assertThat(events).hasSize(NUM_ROWS);

TestSchemaStore schemaStore = new TestSchemaStore();
AvroByteRecordTransformer transformer = buildTransformer(converter, tableRef.get(), schemaStore);
String namespace = tableRef.get().keyspace() + "." + tableRef.get().table();

for (CdcEvent event : events)
{
GenericData.Record record = transformer.transform(event);
assertThat(record).isNotNull();

// Validate header-level fields
assertThat(record.get(AvroConstants.SOURCE_TABLE_KEY).toString()).isEqualTo(event.table);
assertThat(record.get(AvroConstants.SOURCE_KEYSPACE_KEY).toString()).isEqualTo(event.keyspace);
assertThat(record.get(AvroConstants.OPERATION_TYPE_KEY).toString()).isEqualTo("INSERT");
assertThat(record.get(AvroConstants.TIMESTAMP_KEY)).isNotNull();
assertThat(record.get(AvroConstants.VERSION_KEY).toString()).isEqualTo(AvroConstants.CURRENT_VERSION);

// TTL record should be present
Object ttlField = record.get(AvroConstants.TTL_KEY);
assertThat(ttlField).isNotNull();
GenericRecord ttlRecord = (GenericRecord) ttlField;

// Validate TTL value
assertThat(ttlRecord.get(AvroConstants.TTL_KEY)).isEqualTo(ttlSeconds);

// Validate deletedAt is a Long (confirms long type in cdc_bytes.avsc)
Object deletedAt = ttlRecord.get(AvroConstants.DELETED_AT_KEY);
assertThat(deletedAt).isInstanceOf(Long.class);

// Validate deletedAt is approximately nowInSeconds + TTL
long deletedAtValue = (Long) deletedAt;
assertThat(deletedAtValue)
.as("deletedAt should be approximately nowInSeconds + TTL")
.isBetween(beforeTestEpochSec + ttlSeconds, afterTestEpochSec + ttlSeconds);

// Validate payload: bytes in the byte schema need deserialization to verify content
Object payloadObj = record.get(AvroConstants.PAYLOAD_KEY);
assertThat(payloadObj).isInstanceOf(ByteBuffer.class);
GenericRecord payloadRecord;
try
{
payloadRecord = deserializePayload((ByteBuffer) payloadObj,
schemaStore.getReader(namespace, null));
}
catch (IOException e)
{
throw new RuntimeException("Failed to deserialize payload", e);
}
assertThat(payloadRecord.get("pk")).isNotNull();
}
})
.run();
}
}