Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
0.4.0
-----
* Pass SidecarCdcClient as a constructor parameter to avoid thread/resource leaks (CASSANALYTICS-142)
* Support extended deletion time in CDC for Cassandra 5.0
* Flush event consumer before persisting CDC state to prevent data loss on failure (CASSANALYTICS-126)
* Fix ReadStatusTracker to distinguish clean completion from error termination in BufferingCommitLogReader (CASSANALYTICS-129)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.cassandra.cdc.sidecar;

import java.io.IOException;
import java.util.Comparator;
import java.util.Optional;
import java.util.Set;
Expand All @@ -30,7 +29,6 @@
import org.apache.cassandra.cdc.api.SchemaSupplier;
import org.apache.cassandra.cdc.api.TokenRangeSupplier;
import org.apache.cassandra.cdc.stats.ICdcStats;
import org.apache.cassandra.secrets.SecretsProvider;
import org.apache.cassandra.spark.data.CqlTable;
import org.apache.cassandra.spark.data.ReplicationFactor;
import org.apache.cassandra.spark.utils.FutureUtils;
Expand All @@ -50,17 +48,36 @@ protected SidecarCdc(@NotNull SidecarCdcBuilder builder)
initSchema();
}

/**
* Creates a new {@link SidecarCdcBuilder} pre-configured with the supplied parameters.
*
* <p><b>Lifecycle of {@code sidecarCdcClient}:</b> the supplied {@link SidecarCdcClient} is treated as
* an externally managed singleton. Neither the returned builder nor the {@link SidecarCdc} instance it
* produces will close the client. The caller is solely responsible for closing the
* {@code SidecarCdcClient} (e.g. during application shutdown) to release underlying resources such as
* thread pools and HTTP connections.
*
* @param jobId unique identifier for the CDC job
* @param partitionId partition index within the job
* @param cdcOptions CDC processing options
* @param clusterConfigProvider provider for cluster configuration (e.g. datacenter, hosts)
* @param eventConsumer consumer that receives CDC change events
* @param schemaSupplier supplier for CDC-enabled table schemas
* @param tokenRangeSupplier supplier for the token ranges assigned to this partition
* @param sidecarCdcClient externally managed Sidecar HTTP client; <em>not</em> closed by
* {@code SidecarCdc} or {@code SidecarCdcBuilder}
* @param cdcStats CDC statistics collector
* @return a new {@link SidecarCdcBuilder}
*/
public static SidecarCdcBuilder builder(@NotNull String jobId,
int partitionId,
CdcOptions cdcOptions,
ClusterConfigProvider clusterConfigProvider,
EventConsumer eventConsumer,
SchemaSupplier schemaSupplier,
TokenRangeSupplier tokenRangeSupplier,
CdcSidecarInstancesProvider sidecarInstancesProvider,
SidecarCdcClient.ClientConfig clientConfig,
SecretsProvider secretsProvider,
ICdcStats cdcStats) throws IOException
SidecarCdcClient sidecarCdcClient,
ICdcStats cdcStats)
{
return new SidecarCdcBuilder(jobId,
partitionId,
Expand All @@ -69,9 +86,7 @@ public static SidecarCdcBuilder builder(@NotNull String jobId,
eventConsumer,
schemaSupplier,
tokenRangeSupplier,
sidecarInstancesProvider,
clientConfig,
secretsProvider,
sidecarCdcClient,
cdcStats);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,14 @@

package org.apache.cassandra.cdc.sidecar;

import java.io.IOException;
import java.util.stream.Collectors;

import com.google.common.base.Preconditions;

import o.a.c.sidecar.client.shaded.client.SidecarInstanceImpl;
import o.a.c.sidecar.client.shaded.client.SimpleSidecarInstancesProvider;
import org.apache.cassandra.cdc.CdcBuilder;
import org.apache.cassandra.cdc.api.CdcOptions;
import org.apache.cassandra.cdc.api.EventConsumer;
import org.apache.cassandra.cdc.api.SchemaSupplier;
import org.apache.cassandra.cdc.api.TokenRangeSupplier;
import org.apache.cassandra.cdc.stats.ICdcStats;
import org.apache.cassandra.clients.Sidecar;
import org.apache.cassandra.secrets.SecretsProvider;
import o.a.c.sidecar.client.shaded.client.SidecarClient;
import org.apache.cassandra.spark.utils.AsyncExecutor;
import org.jetbrains.annotations.NotNull;

Expand All @@ -56,42 +48,12 @@ public class SidecarCdcBuilder extends CdcBuilder
EventConsumer eventConsumer,
SchemaSupplier schemaSupplier,
TokenRangeSupplier tokenRangeSupplier,
CdcSidecarInstancesProvider sidecarInstancesProvider,
SidecarCdcClient.ClientConfig clientConfig,
SecretsProvider secretsProvider,
ICdcStats cdcStats) throws IOException
{
this(
jobId,
partitionId,
cdcOptions,
clusterConfigProvider,
eventConsumer,
schemaSupplier,
tokenRangeSupplier,
clientConfig,
Sidecar.from(new SimpleSidecarInstancesProvider(sidecarInstancesProvider.instances().stream()
.map(i -> new SidecarInstanceImpl(i.hostname(), i.port()))
.collect(Collectors.toList())),
clientConfig.toGenericSidecarConfig(), secretsProvider),
cdcStats
);
}

SidecarCdcBuilder(@NotNull String jobId,
int partitionId,
CdcOptions cdcOptions,
ClusterConfigProvider clusterConfigProvider,
EventConsumer eventConsumer,
SchemaSupplier schemaSupplier,
TokenRangeSupplier tokenRangeSupplier,
SidecarCdcClient.ClientConfig clientConfig,
SidecarClient sidecarClient,
SidecarCdcClient sidecarCdcClient,
ICdcStats cdcStats)
{
super(jobId, partitionId, eventConsumer, schemaSupplier);
this.clusterConfigProvider = clusterConfigProvider;
this.sidecarCdcClient = new SidecarCdcClient(clientConfig, sidecarClient, cdcStats);
this.sidecarCdcClient = sidecarCdcClient;
withCdcOptions(cdcOptions);
withTokenRangeSupplier(tokenRangeSupplier);
}
Expand All @@ -108,14 +70,6 @@ public SidecarCdcBuilder withDownMonitor(SidecarDownMonitor downMonitor)
return this;
}

/**
 * Replaces this builder's Sidecar CDC client with a new {@link SidecarCdcClient} constructed from the
 * supplied configuration, Sidecar client and stats collector.
 *
 * @param clientConfig  configuration handed to the new {@link SidecarCdcClient}
 * @param sidecarClient Sidecar client handed to the new {@link SidecarCdcClient}
 * @param cdcStats      stats collector handed to the new {@link SidecarCdcClient}
 * @return this builder, to allow call chaining
 */
public SidecarCdcBuilder withSidecarClient(SidecarCdcClient.ClientConfig clientConfig,
                                           SidecarClient sidecarClient,
                                           ICdcStats cdcStats)
{
    final SidecarCdcClient replacement = new SidecarCdcClient(clientConfig, sidecarClient, cdcStats);
    this.sidecarCdcClient = replacement;
    return this;
}

public SidecarCdcBuilder withReplicationFactorSupplier(ReplicationFactorSupplier replicationFactorSupplier)
{
this.replicationFactorSupplier = replicationFactorSupplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.cassandra.cdc.sidecar;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -32,8 +33,11 @@
import org.apache.cassandra.cdc.api.CommitLog;
import org.apache.cassandra.cdc.stats.ICdcStats;
import org.apache.cassandra.clients.Sidecar;
import org.apache.cassandra.secrets.SecretsProvider;
import o.a.c.sidecar.client.shaded.client.SidecarClient;
import o.a.c.sidecar.client.shaded.client.SidecarInstance;
import o.a.c.sidecar.client.shaded.client.SidecarInstanceImpl;
import o.a.c.sidecar.client.shaded.client.SimpleSidecarInstancesProvider;
import o.a.c.sidecar.client.shaded.client.StreamBuffer;
import org.apache.cassandra.spark.data.FileType;
import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
Expand All @@ -52,21 +56,52 @@
import static org.apache.cassandra.spark.utils.Properties.DEFAULT_SIDECAR_PORT;
import static org.apache.cassandra.spark.utils.Properties.DEFAULT_TIMEOUT_SECONDS;

public class SidecarCdcClient
public class SidecarCdcClient implements AutoCloseable
{
final ClientConfig config;
final SidecarClient sidecarClient;
final ICdcStats stats;

public SidecarCdcClient(ClientConfig config,
SidecarClient sidecarClient,
ICdcStats stats)
/**
* Creates a client by first building a {@link SidecarClient} and then delegating to the private
* three-argument constructor.
*
* <p>Each instance returned by {@code instancesProvider.instances()} is mapped to a
* {@link SidecarInstanceImpl} using its hostname and port. The resulting list is wrapped in a
* {@link SimpleSidecarInstancesProvider} and passed to {@code Sidecar.from(...)} together with
* {@code clientConfig.toGenericSidecarConfig()} and {@code secretsProvider}.
*
* @param clientConfig client configuration; also converted to the generic Sidecar configuration
* @param instancesProvider supplies the instances whose hostname and port are used
* @param secretsProvider passed through unchanged to {@code Sidecar.from(...)}
* @param cdcStats stats collector stored on the client
* @throws IOException declared by this constructor; presumably propagated from
* {@code Sidecar.from(...)} (TODO confirm)
*/
public SidecarCdcClient(ClientConfig clientConfig,
CdcSidecarInstancesProvider instancesProvider,
SecretsProvider secretsProvider,
ICdcStats cdcStats) throws IOException
{
// Constructor delegation must be the first statement, so the SidecarClient is built inline.
this(clientConfig,
Sidecar.from(new SimpleSidecarInstancesProvider(instancesProvider.instances()
.stream()
.map(i -> new SidecarInstanceImpl(i.hostname(), i.port()))
.collect(Collectors.toList())),
clientConfig.toGenericSidecarConfig(),
secretsProvider),
cdcStats);
}

/**
 * Stores the supplied collaborators on this client; no additional work is performed here.
 *
 * @param config        client configuration
 * @param sidecarClient Sidecar client that this instance issues its requests through
 * @param stats         CDC stats collector
 */
private SidecarCdcClient(ClientConfig config,
                         SidecarClient sidecarClient,
                         ICdcStats stats)
{
    this.stats = stats;
    this.sidecarClient = sidecarClient;
    this.config = config;
}

/**
 * Shuts this client down by closing the wrapped {@link SidecarClient}, which releases the resources
 * associated with it (e.g. thread pools, HTTP connections).
 *
 * <p>A {@code SidecarCdcClient} is meant to be a singleton owned by the enclosing component: build one
 * instance at startup and close it at shutdown. Creating an instance per request risks leaking
 * threads and other resources.
 *
 * @throws Exception if closing the underlying {@link SidecarClient} fails
 */
@Override
public void close() throws Exception
{
    this.sidecarClient.close();
}

public CompletableFuture<List<CommitLog>> listCdcCommitLogSegments(CassandraInstance instance)
{
return sidecarClient.listCdcSegments(toSidecarInstance(instance))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.junit.jupiter.api.Test;

import o.a.c.sidecar.client.shaded.client.SidecarClient;
import org.apache.cassandra.cdc.api.CdcOptions;
import org.apache.cassandra.cdc.api.EventConsumer;
import org.apache.cassandra.cdc.api.SchemaSupplier;
Expand All @@ -46,8 +45,7 @@ public void testBuilderMethodCreatesValidBuilder()
EventConsumer eventConsumer = mock(EventConsumer.class);
SchemaSupplier schemaSupplier = mock(SchemaSupplier.class);
TokenRangeSupplier tokenRangeSupplier = mock(TokenRangeSupplier.class);
SidecarCdcClient.ClientConfig clientConfig = SidecarCdcClient.ClientConfig.create();
SidecarClient mockSidecarClient = mock(SidecarClient.class);
SidecarCdcClient mockSidecarCdcClient = mock(SidecarCdcClient.class);
ICdcStats cdcStats = mock(ICdcStats.class);

SidecarCdcBuilder builder = new SidecarCdcBuilder(
Expand All @@ -58,22 +56,13 @@ public void testBuilderMethodCreatesValidBuilder()
eventConsumer,
schemaSupplier,
tokenRangeSupplier,
clientConfig,
mockSidecarClient,
mockSidecarCdcClient,
cdcStats
);

// Verify the builder is properly created and configured
assertThat(builder).isNotNull();
assertThat(builder).isInstanceOf(SidecarCdcBuilder.class);

// Verify the builder has the cluster config provider set
assertThat(builder.clusterConfigProvider).isEqualTo(clusterConfigProvider);

// Verify the builder has a sidecar CDC client configured
assertThat(builder.sidecarCdcClient).isNotNull();
assertThat(builder.sidecarCdcClient.sidecarClient).isEqualTo(mockSidecarClient);
assertThat(builder.sidecarCdcClient.config).isEqualTo(clientConfig);
assertThat(builder.sidecarCdcClient.stats).isEqualTo(cdcStats);
assertThat(builder.sidecarCdcClient).isEqualTo(mockSidecarCdcClient);
}
}
Loading