Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ jobs:
scala: "2.13"
jdk: "11"
use_jdk11: "true"
cassandra: "5.0.5"
cassandra: "5.0.7"

- store_artifacts:
path: build/test-reports
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,12 @@ jobs:
strategy:
matrix:
scala: [ '2.12', '2.13' ]
cassandra: [ '4.0.17', '4.1.4', '5.0.5' ]
cassandra: [ '4.0.17', '4.1.4', '5.0.7' ]
job_index: [ 0, 1, 2, 3, 4 ]
job_total: [ 5 ]
exclude:
- scala: "2.12"
cassandra: "5.0.5"
cassandra: "5.0.7"
- scala: "2.12"
cassandra: "4.1.4"
- scala: "2.13"
Expand Down
2 changes: 1 addition & 1 deletion cassandra-analytics-cdc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ jar {
}

test {
systemProperty "cassandra.sidecar.versions_to_test", (System.getenv("CASSANDRA_VERSION") ?: "4.0.17,5.0.5")
systemProperty "cassandra.sidecar.versions_to_test", (System.getenv("CASSANDRA_VERSION") ?: "4.0.17,5.0.7")

minHeapSize = '1024m'
maxHeapSize = '3072m'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ private TestVersionSupplier()

public static Stream<CassandraVersion> testVersions()
{
String versions = System.getProperty("cassandra.sidecar.versions_to_test", "4.0.17,5.0.5");
String versions = System.getProperty("cassandra.sidecar.versions_to_test", "4.0.17,5.0.7");
return Arrays.stream(versions.split(","))
.map(String::trim)
.map(v -> CassandraVersion.fromVersion(v).orElseThrow(() -> new IllegalArgumentException("Unsupported version: " + v)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public String jarBaseName()
.toArray(CassandraVersion[]::new);

String providedSupportedVersionsOrDefault = System.getProperty("cassandra.analytics.bridges.supported_versions",
"cassandra-4.0.17,cassandra-5.0.5");
"cassandra-4.0.17,cassandra-5.0.7");
supportedVersions = Arrays.stream(providedSupportedVersionsOrDefault.split(","))
.filter(version -> CassandraVersion.fromVersion(version)
.filter(v -> v.sstableFormats.contains(sstableFormat))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
public abstract class CassandraTypes
{
public static final Pattern COLLECTION_PATTERN = Pattern.compile("^(set|list|map|tuple)<(.+)>$", Pattern.CASE_INSENSITIVE);
public static final Pattern VECTOR_PATTERN = Pattern.compile("^(vector)<(.+),(.+)>$", Pattern.CASE_INSENSITIVE);
public static final Pattern FROZEN_PATTERN = Pattern.compile("^frozen<(.*)>$", Pattern.CASE_INSENSITIVE);

private final UDTs udts = new UDTs();
Expand Down Expand Up @@ -133,6 +134,8 @@ public List<CqlField.NativeType> supportedTypes()

public abstract CqlField.CqlList list(CqlField.CqlType type);

public abstract CqlField.CqlVector vector(CqlField.CqlType type, int dimensions);

public abstract CqlField.CqlSet set(CqlField.CqlType type);

public abstract CqlField.CqlMap map(CqlField.CqlType keyType, CqlField.CqlType valueType);
Expand Down Expand Up @@ -189,6 +192,14 @@ public CqlField.CqlType parseType(String type, Map<String, CqlField.CqlUdt> udts
.map(collectionType -> parseType(collectionType, udts))
.toArray(CqlField.CqlType[]::new));
}
Matcher vectorMatcher = VECTOR_PATTERN.matcher(type);
if (vectorMatcher.find())
{
// CQL vector
String subType = vectorMatcher.group(2);
int dimensions = Integer.parseInt(vectorMatcher.group(3).trim());
return vector(parseType(subType, udts), dimensions);
}
Matcher frozenMatcher = FROZEN_PATTERN.matcher(type);
if (frozenMatcher.find())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public interface CqlType extends Serializable
{
enum InternalType
{
NativeCql, Set, List, Map, Frozen, Udt, Tuple;
NativeCql, Set, List, Map, Frozen, Udt, Tuple, Vector;

public static InternalType fromString(String name)
{
Expand All @@ -77,6 +77,8 @@ public static InternalType fromString(String name)
return Set;
case "list":
return List;
case "vector":
return Vector;
case "map":
return Map;
case "tuple":
Expand Down Expand Up @@ -237,6 +239,10 @@ public interface CqlList extends CqlCollection
{
}

// Marker type for the CQL vector<type, dimensions> column type (appears to target
// Cassandra 5.0+ — the accompanying tests gate on CassandraVersion.FIVEZERO).
// Shares the CqlCollection contract with list/set/map/tuple; the element type and
// dimension count are supplied via CassandraTypes.vector(type, dimensions).
public interface CqlVector extends CqlCollection
{
}

public interface CqlTuple extends CqlCollection
{
ByteBuffer serializeTuple(Object[] values);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public interface CommitResultSupplier extends BiFunction<List<String>, String, D
{
}

public static final String DEFAULT_CASSANDRA_VERSION = "cassandra-5.0.5";
public static final String DEFAULT_CASSANDRA_VERSION = "cassandra-5.0.7";

private final UUID jobId;
private boolean skipClean = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ void testWriteWithSubRanges(String version)
@MethodSource("data")
void testWriteWithDataInMultipleSubRanges(String version)
{
version = "cassandra-5.0.5";
version = "cassandra-5.0.7";
setUp(version);
MockBulkWriterContext m = Mockito.spy(writerContext);
TokenPartitioner mtp = Mockito.mock(TokenPartitioner.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private void setup(ConsistencyLevel.CL consistencyLevel)
{
digestAlgorithm = new XXHash32DigestAlgorithm();
tableWriter = new MockTableWriter(folder);
writerContext = new MockBulkWriterContext(TOKEN_RANGE_MAPPING, "cassandra-5.0.5", consistencyLevel);
writerContext = new MockBulkWriterContext(TOKEN_RANGE_MAPPING, "cassandra-5.0.7", consistencyLevel);
writerContext.setReplicationFactor(new ReplicationFactor(NetworkTopologyStrategy, rfOptions));
transportContext = (TransportContext.DirectDataBulkWriterContext) writerContext.transportContext();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,21 @@
import org.junit.jupiter.params.provider.MethodSource;

import org.apache.cassandra.bridge.CassandraBridge;
import org.apache.cassandra.bridge.CassandraVersion;
import org.apache.cassandra.spark.TestUtils;
import org.apache.cassandra.spark.Tester;
import org.apache.cassandra.spark.data.CqlField;
import org.apache.cassandra.spark.utils.RandomUtils;
import org.apache.cassandra.spark.utils.test.TestSchema;
import org.apache.spark.sql.Row;
import org.quicktheories.core.Gen;
import scala.collection.mutable.AbstractSeq;

import static org.apache.cassandra.spark.utils.ScalaConversionUtils.mutableSeqAsJavaList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.quicktheories.QuickTheory.qt;
import static org.quicktheories.generators.SourceDSL.arbitrary;

@Tag("Sequential")
public class DataTypeTests
Expand Down Expand Up @@ -103,6 +107,103 @@ public void testSet(CassandraBridge bridge)
);
}

@ParameterizedTest
@MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
public void testVector(CassandraBridge bridge)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this test uses the bridge to write sstables to disk and doesn't exercise the complete bulk writer and bulk reader path. Can you add at least one dtest using bulkWriterDataFrameWriter and bulkReaderDataFrame to test end-to-end writing and reading of vectors?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure whether we need a vector converter in SqlToCqlTypeConverter.java, similar to what we did for tuples in this PR: https://github.com/apache/cassandra-analytics/pull/174/changes. A dtest using the bulk writer and reader will expose whether a converter is needed.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good catch! Still working on it.

{
assumeThat(bridge.getVersion().versionNumber()).isGreaterThanOrEqualTo(CassandraVersion.FIVEZERO.versionNumber());
qt().forAll(supportedVectorTypes(bridge))
.checkAssert(type ->
Tester.builder(TestSchema.builder(bridge)
.withPartitionKey("pk", bridge.uuid())
.withColumn("a", bridge.vector(type, 10)))
.withExpectedRowCountPerSSTable(Tester.DEFAULT_NUM_ROWS)
.run(bridge.getVersion())
);
}

@ParameterizedTest
@MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
public void testVectorVector(CassandraBridge bridge)
{
    // Nested vectors: a vector<vector<type, 2>, 5>. Vectors require Cassandra 5.0+.
    assumeThat(bridge.getVersion().versionNumber()).isGreaterThanOrEqualTo(CassandraVersion.FIVEZERO.versionNumber());
    qt().forAll(supportedVectorTypes(bridge))
        .checkAssert(elementType -> {
            // Build the inner and outer vector types separately for readability.
            CqlField.CqlType inner = bridge.vector(elementType, 2);
            CqlField.CqlType outer = bridge.vector(inner, 5);
            Tester.builder(TestSchema.builder(bridge)
                                     .withPartitionKey("pk", bridge.uuid())
                                     .withColumn("a", outer))
                  .withExpectedRowCountPerSSTable(Tester.DEFAULT_NUM_ROWS)
                  .run(bridge.getVersion());
        });
}

@ParameterizedTest
@MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
public void testVectorList(CassandraBridge bridge)
{
    // A list nested inside a vector: a vector<list<type>, 3>. Vectors require Cassandra 5.0+.
    assumeThat(bridge.getVersion().versionNumber()).isGreaterThanOrEqualTo(CassandraVersion.FIVEZERO.versionNumber());
    qt().forAll(supportedVectorTypes(bridge))
        .checkAssert(elementType -> {
            CqlField.CqlType listOfElements = bridge.list(elementType);
            Tester.builder(TestSchema.builder(bridge)
                                     .withPartitionKey("pk", bridge.uuid())
                                     .withColumn("a", bridge.vector(listOfElements, 3)))
                  .withExpectedRowCountPerSSTable(Tester.DEFAULT_NUM_ROWS)
                  .run(bridge.getVersion());
        });
}

@ParameterizedTest
@MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
public void testVectorUDT(CassandraBridge bridge)
{
    // Schema under test: pk -> a vector<frozen<nested_udt<x int, y type, z int>>, 10>.
    // Exercises a user-defined type as the vector element. Vectors require Cassandra 5.0+.
    assumeThat(bridge.getVersion().versionNumber()).isGreaterThanOrEqualTo(CassandraVersion.FIVEZERO.versionNumber());
    qt().withExamples(10)
        .forAll(supportedVectorTypes(bridge))
        .checkAssert(fieldType -> {
            // The UDT must be frozen to be usable as a vector element.
            CqlField.CqlType frozenUdt = bridge.udt("keyspace", "nested_udt")
                                               .withField("x", bridge.aInt())
                                               .withField("y", fieldType)
                                               .withField("z", bridge.aInt())
                                               .build()
                                               .frozen();
            Tester.builder(TestSchema.builder(bridge)
                                     .withPartitionKey("pk", bridge.uuid())
                                     .withColumn("a", bridge.vector(frozenUdt, 10)))
                  .run(bridge.getVersion());
        });
}

@ParameterizedTest
@MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
public void testVectorTuple(CassandraBridge bridge)
{
    // Schema under test: pk -> a vector<frozen<tuple<type, float, text>>, 7>.
    // Exercises a tuple nested inside a vector. Vectors require Cassandra 5.0+.
    assumeThat(bridge.getVersion().versionNumber()).isGreaterThanOrEqualTo(CassandraVersion.FIVEZERO.versionNumber());
    qt().withExamples(10)
        .forAll(supportedVectorTypes(bridge))
        .checkAssert(firstFieldType -> {
            // The tuple must be frozen to be usable as a vector element.
            CqlField.CqlType frozenTuple = bridge.tuple(firstFieldType,
                                                        bridge.aFloat(),
                                                        bridge.text())
                                                 .frozen();
            Tester.builder(TestSchema.builder(bridge)
                                     .withPartitionKey("pk", bridge.uuid())
                                     .withColumn("a", bridge.vector(frozenTuple, 7)))
                  .run(bridge.getVersion());
        });
}

/**
 * Generator over the native CQL types that can safely be used as vector elements in these tests.
 *
 * TODO: vectors of lists of durations fail, because we cannot replace DurationSerializer with
 * AnalyticsDurationSerializer across all serializers used by VectorType — so duration is excluded.
 */
private static Gen<CqlField.NativeType> supportedVectorTypes(CassandraBridge bridge)
{
    // Hoist the excluded type out of the filter predicate.
    CqlField.NativeType excluded = bridge.duration();
    List<CqlField.NativeType> elementTypes = bridge.supportedTypes()
                                                   .stream()
                                                   .filter(nativeType -> !nativeType.equals(excluded))
                                                   .collect(Collectors.toList());
    return arbitrary().pick(elementTypes);
}

@ParameterizedTest
@MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
public void testList(CassandraBridge bridge)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public void testPartialRowClusteringKeys(CassandraBridge bridge)
public void testQuotedKeyspaceName(CassandraBridge bridge)
{
Tester.builder(keyspace1 -> TestSchema.builder(bridge)
.withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", "_"))
.withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", ""))
.withPartitionKey("pk", bridge.uuid())
.withColumn("c1", bridge.varint())
.withColumn("c2", bridge.text())
Expand All @@ -184,7 +184,7 @@ public void testReservedWordKeyspaceName(CassandraBridge bridge)
public void testQuotedTableName(CassandraBridge bridge)
{
Tester.builder(keyspace1 -> TestSchema.builder(bridge)
.withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", "_"))
.withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", ""))
.withTable("Quoted_Table_" + UUID.randomUUID().toString().replaceAll("-", "_"))
.withPartitionKey("pk", bridge.uuid())
.withColumn("c1", bridge.varint())
Expand All @@ -198,7 +198,7 @@ public void testQuotedTableName(CassandraBridge bridge)
public void testReservedWordTableName(CassandraBridge bridge)
{
Tester.builder(keyspace1 -> TestSchema.builder(bridge)
.withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", "_"))
.withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", ""))
.withTable("table")
.withPartitionKey("pk", bridge.uuid())
.withColumn("c1", bridge.varint())
Expand All @@ -212,7 +212,7 @@ public void testReservedWordTableName(CassandraBridge bridge)
public void testQuotedPartitionKey(CassandraBridge bridge)
{
Tester.builder(keyspace1 -> TestSchema.builder(bridge)
.withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", "_"))
.withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", ""))
.withTable("Quoted_Table_" + UUID.randomUUID().toString().replaceAll("-", "_"))
.withPartitionKey("Partition_Key_0", bridge.uuid())
.withColumn("c1", bridge.varint())
Expand All @@ -226,7 +226,7 @@ public void testQuotedPartitionKey(CassandraBridge bridge)
public void testMultipleQuotedPartitionKeys(CassandraBridge bridge)
{
Tester.builder(keyspace1 -> TestSchema.builder(bridge)
.withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", "_"))
.withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", ""))
.withTable("Quoted_Table_" + UUID.randomUUID().toString().replaceAll("-", "_"))
.withPartitionKey("Partition_Key_0", bridge.uuid())
.withPartitionKey("Partition_Key_1", bridge.bigint())
Expand All @@ -243,7 +243,7 @@ public void testMultipleQuotedPartitionKeys(CassandraBridge bridge)
public void testQuotedPartitionClusteringKeys(CassandraBridge bridge)
{
Tester.builder(keyspace1 -> TestSchema.builder(bridge)
.withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", "_"))
.withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", ""))
.withTable("Quoted_Table_" + UUID.randomUUID().toString().replaceAll("-", "_"))
.withPartitionKey("a", bridge.uuid())
.withClusteringKey("Clustering_Key_0", bridge.bigint())
Expand All @@ -258,7 +258,7 @@ public void testQuotedPartitionClusteringKeys(CassandraBridge bridge)
public void testQuotedColumnNames(CassandraBridge bridge)
{
Tester.builder(keyspace1 -> TestSchema.builder(bridge)
.withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", "_"))
.withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", ""))
.withTable("Quoted_Table_" + UUID.randomUUID().toString().replaceAll("-", "_"))
.withPartitionKey("Partition_Key_0", bridge.uuid())
.withColumn("Column_1", bridge.varint())
Expand All @@ -272,7 +272,7 @@ public void testQuotedColumnNames(CassandraBridge bridge)
public void testQuotedColumnNamesWithColumnFilter(CassandraBridge bridge)
{
Tester.builder(keyspace1 -> TestSchema.builder(bridge)
.withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", "_"))
.withKeyspace("Quoted_Keyspace_" + UUID.randomUUID().toString().replaceAll("-", ""))
.withTable("Quoted_Table_" + UUID.randomUUID().toString().replaceAll("-", "_"))
.withPartitionKey("Partition_Key_0", bridge.uuid())
.withColumn("Column_1", bridge.varint())
Expand Down
4 changes: 2 additions & 2 deletions cassandra-analytics-integration-framework/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ if (propertyWithDefault("artifactType", null) == "spark")
apply from: "$rootDir/gradle/common/publishing.gradle"
}

ext.dtestJar = System.getenv("DTEST_JAR") ?: "dtest-5.0.5.jar" // latest supported Cassandra build is 5.0
ext.dtestJar = System.getenv("DTEST_JAR") ?: "dtest-5.0.7.jar" // latest supported Cassandra build is 5.0
def dtestJarFullPath = "${dependencyLocation}${ext.dtestJar}"

test {
Expand All @@ -50,7 +50,7 @@ dependencies {
// classpath while running integration tests. Instead, a dedicated classloader will load the
// dtest jar while provisioning the in-jvm dtest Cassandra cluster
compileOnly(files("${dtestJarFullPath}"))
api("org.apache.cassandra:dtest-api:0.0.16")
api("org.apache.cassandra:dtest-api:0.0.18")
// Needed by the Cassandra dtest framework
// JUnit
api("org.junit.jupiter:junit-jupiter-api:${project.junitVersion}")
Expand Down
Loading
Loading