diff --git a/CHANGES.txt b/CHANGES.txt index 35afa6cd2..ce89887db 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.4.0 ----- + * Pass SidecarCdcClient as a constructor parameter to avoid thread/resource leaks (CASSANALYTICS-142) * Support extended deletion time in CDC for Cassandra 5.0 * Flush event consumer before persisting CDC state to prevent data loss on failure (CASSANALYTICS-126) * Fix ReadStatusTracker to distinguish clean completion from error termination in BufferingCommitLogReader (CASSANALYTICS-129) diff --git a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdc.java b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdc.java index e5ac33b6c..f7e0513cc 100644 --- a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdc.java +++ b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdc.java @@ -19,7 +19,6 @@ package org.apache.cassandra.cdc.sidecar; -import java.io.IOException; import java.util.Comparator; import java.util.Optional; import java.util.Set; @@ -30,7 +29,6 @@ import org.apache.cassandra.cdc.api.SchemaSupplier; import org.apache.cassandra.cdc.api.TokenRangeSupplier; import org.apache.cassandra.cdc.stats.ICdcStats; -import org.apache.cassandra.secrets.SecretsProvider; import org.apache.cassandra.spark.data.CqlTable; import org.apache.cassandra.spark.data.ReplicationFactor; import org.apache.cassandra.spark.utils.FutureUtils; @@ -50,6 +48,27 @@ protected SidecarCdc(@NotNull SidecarCdcBuilder builder) initSchema(); } + /** + * Creates a new {@link SidecarCdcBuilder} pre-configured with the supplied parameters. + * + *

Lifecycle of {@code sidecarCdcClient}: the supplied {@link SidecarCdcClient} is treated as + * an externally managed singleton. Neither the returned builder nor the {@link SidecarCdc} instance it + * produces will close the client. The caller is solely responsible for closing the + * {@code SidecarCdcClient} (e.g. during application shutdown) to release underlying resources such as + * thread pools and HTTP connections. + * + * @param jobId unique identifier for the CDC job + * @param partitionId partition index within the job + * @param cdcOptions CDC processing options + * @param clusterConfigProvider provider for cluster configuration (e.g. datacenter, hosts) + * @param eventConsumer consumer that receives CDC change events + * @param schemaSupplier supplier for CDC-enabled table schemas + * @param tokenRangeSupplier supplier for the token ranges assigned to this partition + * @param sidecarCdcClient externally managed Sidecar HTTP client; not closed by + * {@code SidecarCdc} or {@code SidecarCdcBuilder} + * @param cdcStats CDC statistics collector + * @return a new {@link SidecarCdcBuilder} + */ public static SidecarCdcBuilder builder(@NotNull String jobId, int partitionId, CdcOptions cdcOptions, @@ -57,10 +76,8 @@ public static SidecarCdcBuilder builder(@NotNull String jobId, EventConsumer eventConsumer, SchemaSupplier schemaSupplier, TokenRangeSupplier tokenRangeSupplier, - CdcSidecarInstancesProvider sidecarInstancesProvider, - SidecarCdcClient.ClientConfig clientConfig, - SecretsProvider secretsProvider, - ICdcStats cdcStats) throws IOException + SidecarCdcClient sidecarCdcClient, + ICdcStats cdcStats) { return new SidecarCdcBuilder(jobId, partitionId, @@ -69,9 +86,7 @@ public static SidecarCdcBuilder builder(@NotNull String jobId, eventConsumer, schemaSupplier, tokenRangeSupplier, - sidecarInstancesProvider, - clientConfig, - secretsProvider, + sidecarCdcClient, cdcStats); } diff --git a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java index ecc5fa933..4027db2db 100644 --- a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java +++ b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java @@ -19,22 +19,14 @@ package org.apache.cassandra.cdc.sidecar; -import java.io.IOException; -import java.util.stream.Collectors; - import com.google.common.base.Preconditions; -import o.a.c.sidecar.client.shaded.client.SidecarInstanceImpl; -import o.a.c.sidecar.client.shaded.client.SimpleSidecarInstancesProvider; import org.apache.cassandra.cdc.CdcBuilder; import org.apache.cassandra.cdc.api.CdcOptions; import org.apache.cassandra.cdc.api.EventConsumer; import org.apache.cassandra.cdc.api.SchemaSupplier; import org.apache.cassandra.cdc.api.TokenRangeSupplier; import org.apache.cassandra.cdc.stats.ICdcStats; -import org.apache.cassandra.clients.Sidecar; -import org.apache.cassandra.secrets.SecretsProvider; -import o.a.c.sidecar.client.shaded.client.SidecarClient; import org.apache.cassandra.spark.utils.AsyncExecutor; import org.jetbrains.annotations.NotNull; @@ -56,42 +48,12 @@ public class SidecarCdcBuilder extends CdcBuilder EventConsumer eventConsumer, SchemaSupplier schemaSupplier, TokenRangeSupplier tokenRangeSupplier, - CdcSidecarInstancesProvider sidecarInstancesProvider, - SidecarCdcClient.ClientConfig clientConfig, - SecretsProvider secretsProvider, - ICdcStats cdcStats) throws IOException - { - this( - jobId, - partitionId, - cdcOptions, - clusterConfigProvider, - eventConsumer, - schemaSupplier, - tokenRangeSupplier, - clientConfig, - Sidecar.from(new SimpleSidecarInstancesProvider(sidecarInstancesProvider.instances().stream() - .map(i -> new SidecarInstanceImpl(i.hostname(), i.port())) - .collect(Collectors.toList())), - clientConfig.toGenericSidecarConfig(), secretsProvider), - cdcStats - ); - } - - SidecarCdcBuilder(@NotNull String jobId, - int partitionId, - CdcOptions cdcOptions, - ClusterConfigProvider clusterConfigProvider, - EventConsumer eventConsumer, - SchemaSupplier schemaSupplier, - TokenRangeSupplier tokenRangeSupplier, - SidecarCdcClient.ClientConfig clientConfig, - SidecarClient sidecarClient, + SidecarCdcClient sidecarCdcClient, ICdcStats cdcStats) { super(jobId, partitionId, eventConsumer, schemaSupplier); this.clusterConfigProvider = clusterConfigProvider; - this.sidecarCdcClient = new SidecarCdcClient(clientConfig, sidecarClient, cdcStats); + this.sidecarCdcClient = sidecarCdcClient; withCdcOptions(cdcOptions); withTokenRangeSupplier(tokenRangeSupplier); } @@ -108,14 +70,6 @@ public SidecarCdcBuilder withDownMonitor(SidecarDownMonitor downMonitor) return this; } - public SidecarCdcBuilder withSidecarClient(SidecarCdcClient.ClientConfig clientConfig, - SidecarClient sidecarClient, - ICdcStats cdcStats) - { - this.sidecarCdcClient = new SidecarCdcClient(clientConfig, sidecarClient, cdcStats); - return this; - } - public SidecarCdcBuilder withReplicationFactorSupplier(ReplicationFactorSupplier replicationFactorSupplier) { this.replicationFactorSupplier = replicationFactorSupplier; diff --git a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java index 2dc988038..d8aa09efa 100644 --- a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java +++ b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java @@ -19,6 +19,7 @@ package org.apache.cassandra.cdc.sidecar; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; @@ -32,8 +33,11 @@ import org.apache.cassandra.cdc.api.CommitLog; import org.apache.cassandra.cdc.stats.ICdcStats; import org.apache.cassandra.clients.Sidecar; +import org.apache.cassandra.secrets.SecretsProvider; import o.a.c.sidecar.client.shaded.client.SidecarClient; import o.a.c.sidecar.client.shaded.client.SidecarInstance; +import o.a.c.sidecar.client.shaded.client.SidecarInstanceImpl; +import o.a.c.sidecar.client.shaded.client.SimpleSidecarInstancesProvider; import o.a.c.sidecar.client.shaded.client.StreamBuffer; import org.apache.cassandra.spark.data.FileType; import org.apache.cassandra.spark.data.partitioner.CassandraInstance; @@ -52,21 +56,52 @@ import static org.apache.cassandra.spark.utils.Properties.DEFAULT_SIDECAR_PORT; import static org.apache.cassandra.spark.utils.Properties.DEFAULT_TIMEOUT_SECONDS; -public class SidecarCdcClient +public class SidecarCdcClient implements AutoCloseable { final ClientConfig config; final SidecarClient sidecarClient; final ICdcStats stats; - public SidecarCdcClient(ClientConfig config, - SidecarClient sidecarClient, - ICdcStats stats) + public SidecarCdcClient(ClientConfig clientConfig, + CdcSidecarInstancesProvider instancesProvider, + SecretsProvider secretsProvider, + ICdcStats cdcStats) throws IOException + { + this(clientConfig, + Sidecar.from(new SimpleSidecarInstancesProvider(instancesProvider.instances() + .stream() + .map(i -> new SidecarInstanceImpl(i.hostname(), i.port())) + .collect(Collectors.toList())), + clientConfig.toGenericSidecarConfig(), + secretsProvider), + cdcStats); + } + + private SidecarCdcClient(ClientConfig config, + SidecarClient sidecarClient, + ICdcStats stats) { this.config = config; this.sidecarClient = sidecarClient; this.stats = stats; } + /** + * Closes the underlying {@link SidecarClient} and releases associated resources (e.g. thread pools, + * HTTP connections). + * + *

{@code SidecarCdcClient} is intended to be used as a singleton whose lifecycle is managed by the + * enclosing component. Callers should not create per-request instances; instead, a single instance + * should be constructed at startup and closed during shutdown to avoid thread and resource leaks. + * + * @throws Exception if the underlying client throws while closing + */ + @Override + public void close() throws Exception + { + sidecarClient.close(); + } + public CompletableFuture> listCdcCommitLogSegments(CassandraInstance instance) { return sidecarClient.listCdcSegments(toSidecarInstance(instance)) diff --git a/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java index 3ad306d8e..b89134922 100644 --- a/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java +++ b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java @@ -21,7 +21,6 @@ import org.junit.jupiter.api.Test; -import o.a.c.sidecar.client.shaded.client.SidecarClient; import org.apache.cassandra.cdc.api.CdcOptions; import org.apache.cassandra.cdc.api.EventConsumer; import org.apache.cassandra.cdc.api.SchemaSupplier; @@ -46,8 +45,7 @@ public void testBuilderMethodCreatesValidBuilder() EventConsumer eventConsumer = mock(EventConsumer.class); SchemaSupplier schemaSupplier = mock(SchemaSupplier.class); TokenRangeSupplier tokenRangeSupplier = mock(TokenRangeSupplier.class); - SidecarCdcClient.ClientConfig clientConfig = SidecarCdcClient.ClientConfig.create(); - SidecarClient mockSidecarClient = mock(SidecarClient.class); + SidecarCdcClient mockSidecarCdcClient = mock(SidecarCdcClient.class); ICdcStats cdcStats = mock(ICdcStats.class); SidecarCdcBuilder builder = new SidecarCdcBuilder( @@ -58,22 +56,13 @@ public void testBuilderMethodCreatesValidBuilder() eventConsumer, schemaSupplier, tokenRangeSupplier, - clientConfig, - mockSidecarClient, + mockSidecarCdcClient, cdcStats ); - // Verify the builder is properly created and configured assertThat(builder).isNotNull(); assertThat(builder).isInstanceOf(SidecarCdcBuilder.class); - - // Verify the builder has the cluster config provider set assertThat(builder.clusterConfigProvider).isEqualTo(clusterConfigProvider); - - // Verify the builder has a sidecar CDC client configured - assertThat(builder.sidecarCdcClient).isNotNull(); - assertThat(builder.sidecarCdcClient.sidecarClient).isEqualTo(mockSidecarClient); - assertThat(builder.sidecarCdcClient.config).isEqualTo(clientConfig); - assertThat(builder.sidecarCdcClient.stats).isEqualTo(cdcStats); + assertThat(builder.sidecarCdcClient).isEqualTo(mockSidecarCdcClient); } }