Skip to content

CASSANALYTICS-24 and CASSANALYTICS-25 SSTable version based bridge determination#169

Open
skoppu22 wants to merge 13 commits intoapache:trunkfrom
skoppu22:bridged
Open

CASSANALYTICS-24 and CASSANALYTICS-25 SSTable version based bridge determination#169
skoppu22 wants to merge 13 commits intoapache:trunkfrom
skoppu22:bridged

Conversation

@skoppu22
Copy link
Copy Markdown
Contributor

Jira tasks: CASSANALYTICS-24 and CASSANALYTICS-25

This PR adds sstable version based bridge determination feature to C* analytics. This feature allows C* analytics to choose bridge (which C* version jar to load) based on sstable versions existing on the cluster, instead of strictly depending on running C* versions. This is needed especially to support C* 5.0, which supports multiple compatibility modes.

CI run: https://app.circleci.com/pipelines/github/skoppu22/cassandra-analytics/37/workflows/da025a29-00c0-41f2-95e8-63c2b7f1f276

Copy link
Copy Markdown
Contributor

@sarankk sarankk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Shailaja, took a first pass. Reviewed half the files.

this.name = name;
this.jarBaseName = jarBaseName;
this.sstableFormats = new HashSet<>(Arrays.asList(sstableFormats));
this.nativeSStableVersions = Collections.unmodifiableList(Arrays.asList(nativeSStableVersions));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit

Suggested change
this.nativeSStableVersions = Collections.unmodifiableList(Arrays.asList(nativeSStableVersions));
this.nativeSStableVersions = List.of(nativeSStableVersions);

THREEZERO(30, "3.0", "three-zero", new String[]{"big"},
new String[]{
// Cassandra 3.x native sstable versions
// order is important, used to determine the latest version
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Array based ordering will be hard to debug in case of issues and hard to maintain as well.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After reading SSTableVersionAnalyzer we could do away without the internal ordering between the sstable versions. Since we are still interested in the associated cassandra version. We could treat the sstable versions within the Cassandra version on same level. Wdyt?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, regardless of which sstable version found in a given Cassandra version, we choose that cassandra version as long as there is no sstable version from higher cassandra version. So the order of sstable versions within a cassandra version doesn't matter. Updated the sorting logic not to depend on array index.


/**
* Analyzes SSTable versions on a cluster to determine the appropriate
* Cassandra bridge to load for bulk write operations.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are using version analyzer for both read and write

Suggested change
* Cassandra bridge to load for bulk write operations.
* Cassandra bridge to load for bulk read/write operations.

if (!v2Opt.isPresent())
{
throw new IllegalArgumentException(
String.format("Unknown SSTable version: %s. Cannot determine Cassandra version.", v2));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of throwing even if one format is incorrect, we could log these as errors and throw in the end if no max version is found.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should error and abort job if encountered unrecognized sstable versions. We can log error and ask the user to run the job using fallback mechanism if the user thinks it is safe to run the job. I added error to log this.


// Calculate previous major version: (majorVersion - 1) * 10
// E.g., 5 -> 40, 4 -> 30, 3 -> 20
return (majorVersion - 1) * 10;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: How about we keep an ordered list of CassandraVersion enums and return the previous one, instead of computing the version.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C* 5.0 supports can read C* 4.0, 4.1, 4.2 ...
C* 4.x can read C* 3.0, 3.1, ..
We cannot add all minor versions and won't be able to keep up with C* release cycle if we do so. Also this PR goal is to make C* analytics independent of C* version numbers. Hence dynamically calculating this makes analytics independent of C* releases.

void testGetSSTableVersionIndexValidVersion()
{
int index = CassandraVersion.FOURZERO.getSSTableVersionIndex("big-na");
assertThat(index).isEqualTo(0);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could avoid the index checks if we treat the SSTable versions within a Cassandra version on the same level.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this test

public class SSTableVersionAnalyzerTest
{
@Test
void testDetermineBridgeVersionForWriteFallbackDisabledSingleVersion()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Could we make these tests parameterized, passing in expected result as well?

if (sstableVersionsOnCluster == null || sstableVersionsOnCluster.isEmpty())
{
throw new IllegalStateException(String.format(
"Unable to retrieve SSTable versions from cluster. " +
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not support fallback here and return Cassandra version based bridge?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we are asking user to run the job using fallback mechanism if they thinks that's appropriate. We do not use fallback mechanism when spark.cassandra_analytics.bridge.disable_sstable_version_based is false. There must be a reason why sstable version based bridge selection is failing. User can evaluate the cluster state and try with fallback mechanism only if they think that's fine for their use case or cluster situation.

Set<SidecarInstance> instances)
{
return instances.stream()
.map(instance -> client
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: alignment

* @param instances all Sidecar instances
* @return completable futures with GossipInfoResponse
*/
public static List<CompletableFuture<GossipInfoResponse>> gossipInfoFromAllNodes(SidecarClient client,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not for addressing in this PR, but would be good to have a cluster wide gossip call available in Sidecar.

Comment on lines +63 to +75
return CassandraClusterInfo.getLowestCassandraVersion(conf, null);
}

@Override
protected Set<String> getSSTableVersionsOnCluster(@NotNull BulkSparkConf conf)
{
return CassandraClusterInfo.getSSTableVersionsOnCluster(conf, null);
}

@Override
protected ClusterInfo buildClusterInfo(CassandraVersion bridgeVersion)
{
return new CassandraClusterInfo(bulkSparkConf(), bridgeVersion);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We seem to buildCassandraContext thrice among these 3 method, each cassandra context initializes its own sidecar client. We should avoid this and combine them.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking into the possibility, seems difficult as we need to build context once and store somewhere to reuse it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed this. There is a chicken and egg problem here. Context is needed to determine the bridge, and context contains determined bridge version. Hence creating preliminary context initially (with null bridge version) which is used for bridge determination, then we create final context with the bridge value. Also made changes to reuse nodesettings.

for (String clusterId : coordinatedWriteConf.clusters().keySet())
{
Set<String> sstableVersions = CassandraClusterInfo.getSSTableVersionsOnCluster(conf, clusterId);
aggregatedSSTableVersions.addAll(sstableVersions);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the check we do comparing lowest and highest Cassandra version in getLowestCassandraVersion is missing when we get bridge based on SSTable versions.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do that above in getLowestCassandraVersion function, line 393 sorts versions, line 396 picks the lowest and line 397 picks the highest.
Whereas this function is getSSTableVersionsOnCluster, to get all sstable versions on the cluster. Then bridge determination logic sorts them, picks highest version and determines bridge version accordingly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants