From 09b8e28ecd83c9f1bafce6f022d0e47e80e3d953 Mon Sep 17 00:00:00 2001 From: yafeng Date: Thu, 26 Mar 2026 11:11:59 -0700 Subject: [PATCH 1/6] CASSANALYTICS-130 Support per-instance sidecar port resolution in CDC client --- CHANGES.txt | 1 + .../cdc/sidecar/SidecarCdcBuilder.java | 25 +++++ .../cdc/sidecar/SidecarCdcClient.java | 15 ++- .../cassandra/cdc/sidecar/SidecarCdcTest.java | 106 ++++++++++++++++++ 4 files changed, 146 insertions(+), 1 deletion(-) diff --git a/CHANGES.txt b/CHANGES.txt index ce89887d..0d24aa57 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.4.0 ----- + * Support per-instance sidecar port resolution in CDC client (CASSANALYTICS-130) * Pass SidecarCdcClient as a constructor parameter to avoid thread/resource leaks (CASSANALYTICS-142) * Support extended deletion time in CDC for Cassandra 5.0 * Flush event consumer before persisting CDC state to prevent data loss on failure (CASSANALYTICS-126) diff --git a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java index 4027db2d..8ce87916 100644 --- a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java +++ b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java @@ -19,6 +19,8 @@ package org.apache.cassandra.cdc.sidecar; +import java.util.function.Function; + import com.google.common.base.Preconditions; import org.apache.cassandra.cdc.CdcBuilder; @@ -35,6 +37,8 @@ public class SidecarCdcBuilder extends CdcBuilder { protected ClusterConfigProvider clusterConfigProvider; protected SidecarCdcClient sidecarCdcClient; + @NotNull + protected Function portResolver; protected SidecarDownMonitor downMonitor = SidecarDownMonitor.STUB; protected ReplicationFactorSupplier replicationFactorSupplier = ReplicationFactorSupplier.DEFAULT; protected SidecarCdcStats sidecarCdcStats = SidecarCdcStats.STUB; @@ -49,15 +53,36 @@ public class SidecarCdcBuilder extends CdcBuilder SchemaSupplier schemaSupplier, TokenRangeSupplier tokenRangeSupplier, SidecarCdcClient sidecarCdcClient, + @NotNull Function portResolver, ICdcStats cdcStats) { super(jobId, partitionId, eventConsumer, schemaSupplier); this.clusterConfigProvider = clusterConfigProvider; this.sidecarCdcClient = sidecarCdcClient; + this.portResolver = portResolver; withCdcOptions(cdcOptions); withTokenRangeSupplier(tokenRangeSupplier); } + /** + * Builds a port resolver function from the {@link CdcSidecarInstancesProvider}. The resolver looks up + * the port for a given hostname from the provider, falling back to the configured effective port. + */ + static Function buildPortResolver(@NotNull CdcSidecarInstancesProvider provider, + @NotNull SidecarCdcClient.ClientConfig clientConfig) + { + return hostname -> { + for (CdcSidecarInstance instance : provider.instances()) + { + if (hostname.equals(instance.hostname())) + { + return instance.port(); + } + } + return clientConfig.effectivePort(); + }; + } + public SidecarCdcBuilder withClusterConfigProvider(ClusterConfigProvider clusterConfigProvider) { this.clusterConfigProvider = clusterConfigProvider; diff --git a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java index d8aa09ef..17b97893 100644 --- a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java +++ b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.Function; import java.util.stream.Collectors; import o.a.c.sidecar.client.shaded.common.utils.HttpRange; @@ -45,6 +46,7 @@ import org.apache.cassandra.spark.utils.MapUtils; import org.apache.cassandra.spark.utils.ThrowableUtils; import org.apache.cassandra.spark.utils.streaming.StreamConsumer; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_BUFFER_OVERRIDE; @@ -61,6 +63,8 @@ public class SidecarCdcClient implements AutoCloseable final ClientConfig config; final SidecarClient sidecarClient; final ICdcStats stats; + @NotNull + final Function portResolver; public SidecarCdcClient(ClientConfig clientConfig, CdcSidecarInstancesProvider instancesProvider, @@ -80,10 +84,19 @@ public SidecarCdcClient(ClientConfig clientConfig, private SidecarCdcClient(ClientConfig config, SidecarClient sidecarClient, ICdcStats stats) + { + this(config, sidecarClient, stats, hostname -> config.effectivePort()); + } + + public SidecarCdcClient(ClientConfig config, + SidecarClient sidecarClient, + ICdcStats stats, + @NotNull Function portResolver) { this.config = config; this.sidecarClient = sidecarClient; this.stats = stats; + this.portResolver = portResolver; } /** @@ -190,7 +203,7 @@ protected SidecarInstance toSidecarInstance(CassandraInstance instance) @Override public int port() { - return config.effectivePort(); + return portResolver.apply(instance.nodeName()); } @Override diff --git a/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java index b8913492..1b6871cf 100644 --- a/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java +++ b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java @@ -19,6 +19,12 @@ package org.apache.cassandra.cdc.sidecar; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + import org.junit.jupiter.api.Test; import org.apache.cassandra.cdc.api.CdcOptions; @@ -26,6 +32,7 @@ import org.apache.cassandra.cdc.api.SchemaSupplier; import org.apache.cassandra.cdc.api.TokenRangeSupplier; import org.apache.cassandra.cdc.stats.ICdcStats; +import org.apache.cassandra.spark.data.partitioner.CassandraInstance; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; @@ -65,4 +72,103 @@ public void testBuilderMethodCreatesValidBuilder() assertThat(builder.clusterConfigProvider).isEqualTo(clusterConfigProvider); assertThat(builder.sidecarCdcClient).isEqualTo(mockSidecarCdcClient); } + + @Test + public void testPerInstancePortResolution() + { + Map portMapping = new HashMap<>(); + portMapping.put("host1", 9043); + portMapping.put("host2", 9044); + portMapping.put("host3", 9045); + Function portResolver = hostname -> portMapping.getOrDefault(hostname, 9043); + + SidecarCdcClient.ClientConfig clientConfig = SidecarCdcClient.ClientConfig.create(); + SidecarClient mockSidecarClient = mock(SidecarClient.class); + ICdcStats cdcStats = mock(ICdcStats.class); + + SidecarCdcClient client = new SidecarCdcClient(clientConfig, mockSidecarClient, cdcStats, portResolver); + + SidecarInstance si1 = client.toSidecarInstance(new CassandraInstance("0", "host1", "DC1")); + assertThat(si1.port()).isEqualTo(9043); + assertThat(si1.hostname()).isEqualTo("host1"); + + SidecarInstance si2 = client.toSidecarInstance(new CassandraInstance("100", "host2", "DC1")); + assertThat(si2.port()).isEqualTo(9044); + assertThat(si2.hostname()).isEqualTo("host2"); + + SidecarInstance si3 = client.toSidecarInstance(new CassandraInstance("200", "host3", "DC1")); + assertThat(si3.port()).isEqualTo(9045); + assertThat(si3.hostname()).isEqualTo("host3"); + } + + @Test + public void testFallbackToEffectivePortWhenHostNotFound() + { + Map portMapping = new HashMap<>(); + portMapping.put("host1", 9043); + SidecarCdcClient.ClientConfig clientConfig = SidecarCdcClient.ClientConfig.create(8888, 3, 100L); + Function portResolver = hostname -> portMapping.getOrDefault(hostname, clientConfig.effectivePort()); + + SidecarClient mockSidecarClient = mock(SidecarClient.class); + ICdcStats cdcStats = mock(ICdcStats.class); + + SidecarCdcClient client = new SidecarCdcClient(clientConfig, mockSidecarClient, cdcStats, portResolver); + + SidecarInstance si1 = client.toSidecarInstance(new CassandraInstance("0", "host1", "DC1")); + assertThat(si1.port()).isEqualTo(9043); + + SidecarInstance si2 = client.toSidecarInstance(new CassandraInstance("100", "unknown-host", "DC1")); + assertThat(si2.port()).isEqualTo(8888); + } + + @Test + public void testDefaultPortResolverUsesEffectivePort() + { + SidecarCdcClient.ClientConfig clientConfig = SidecarCdcClient.ClientConfig.create(7777, 3, 100L); + SidecarClient mockSidecarClient = mock(SidecarClient.class); + ICdcStats cdcStats = mock(ICdcStats.class); + + SidecarCdcClient client = new SidecarCdcClient(clientConfig, mockSidecarClient, cdcStats); + + SidecarInstance si = client.toSidecarInstance(new CassandraInstance("0", "host1", "DC1")); + assertThat(si.port()).isEqualTo(7777); + } + + @Test + public void testBuildPortResolverFromProvider() + { + List instances = Arrays.asList( + cdcSidecarInstance("host1", 9043), + cdcSidecarInstance("host2", 9044), + cdcSidecarInstance("host3", 9045) + ); + CdcSidecarInstancesProvider provider = () -> instances; + SidecarCdcClient.ClientConfig clientConfig = SidecarCdcClient.ClientConfig.create(5555, 3, 100L); + + Function resolver = SidecarCdcBuilder.buildPortResolver(provider, clientConfig); + + assertThat(resolver.apply("host1")).isEqualTo(9043); + assertThat(resolver.apply("host2")).isEqualTo(9044); + assertThat(resolver.apply("host3")).isEqualTo(9045); + // Unknown host falls back to clientConfig.effectivePort() + assertThat(resolver.apply("unknown-host")).isEqualTo(5555); + } + + private static CdcSidecarInstance cdcSidecarInstance(String hostname, int port) + { + return new CdcSidecarInstance() + { + @Override + public int port() + { + return port; + } + + @Override + public String hostname() + { + return hostname; + } + }; + } } From 1419fdb0959ec7640af0b6c5c6578907a3b1ff58 Mon Sep 17 00:00:00 2001 From: yafeng Date: Wed, 1 Apr 2026 15:42:47 -0700 Subject: [PATCH 2/6] refactor based on comments --- .../cdc/sidecar/SidecarCdcBuilder.java | 83 ++++++++++--- .../cdc/sidecar/SidecarCdcClient.java | 8 +- .../cassandra/cdc/sidecar/SidecarCdcTest.java | 116 ++++++++---------- 3 files changed, 125 insertions(+), 82 deletions(-) diff --git a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java index 8ce87916..d32b96b2 100644 --- a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java +++ b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java @@ -19,6 +19,8 @@ package org.apache.cassandra.cdc.sidecar; +import java.io.IOException; +import java.util.Map; import java.util.function.Function; import com.google.common.base.Preconditions; @@ -29,16 +31,22 @@ import org.apache.cassandra.cdc.api.SchemaSupplier; import org.apache.cassandra.cdc.api.TokenRangeSupplier; import org.apache.cassandra.cdc.stats.ICdcStats; +import org.apache.cassandra.clients.Sidecar; +import org.apache.cassandra.secrets.SecretsProvider; +import o.a.c.sidecar.client.shaded.client.SidecarClient; +import org.apache.cassandra.spark.data.partitioner.CassandraInstance; import org.apache.cassandra.spark.utils.AsyncExecutor; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; @SuppressWarnings("unused") public class SidecarCdcBuilder extends CdcBuilder { protected ClusterConfigProvider clusterConfigProvider; protected SidecarCdcClient sidecarCdcClient; + protected SidecarCdcClient.ClientConfig clientConfig; @NotNull - protected Function portResolver; + protected Function portResolver; protected SidecarDownMonitor downMonitor = SidecarDownMonitor.STUB; protected ReplicationFactorSupplier replicationFactorSupplier = ReplicationFactorSupplier.DEFAULT; protected SidecarCdcStats sidecarCdcStats = SidecarCdcStats.STUB; @@ -54,33 +62,63 @@ public class SidecarCdcBuilder extends CdcBuilder TokenRangeSupplier tokenRangeSupplier, SidecarCdcClient sidecarCdcClient, @NotNull Function portResolver, + CdcSidecarInstancesProvider sidecarInstancesProvider, + SidecarCdcClient.ClientConfig clientConfig, + SecretsProvider secretsProvider, + ICdcStats cdcStats) throws IOException + { + this( + jobId, + partitionId, + cdcOptions, + clusterConfigProvider, + eventConsumer, + schemaSupplier, + tokenRangeSupplier, + buildPortResolver(sidecarInstancesProvider, clientConfig), + clientConfig, + Sidecar.from(new SimpleSidecarInstancesProvider(sidecarInstancesProvider.instances().stream() + .map(i -> new SidecarInstanceImpl(i.hostname(), i.port())) + .collect(Collectors.toList())), + clientConfig.toGenericSidecarConfig(), secretsProvider), + cdcStats + ); + } + + SidecarCdcBuilder(@NotNull String jobId, + int partitionId, + CdcOptions cdcOptions, + ClusterConfigProvider clusterConfigProvider, + EventConsumer eventConsumer, + SchemaSupplier schemaSupplier, + TokenRangeSupplier tokenRangeSupplier, + @Nullable Function portResolver, + SidecarCdcClient.ClientConfig clientConfig, + SidecarClient sidecarClient, ICdcStats cdcStats) { super(jobId, partitionId, eventConsumer, schemaSupplier); this.clusterConfigProvider = clusterConfigProvider; this.sidecarCdcClient = sidecarCdcClient; this.portResolver = portResolver; + this.clientConfig = clientConfig; + this.portResolver = portResolver != null ? portResolver : ignored -> clientConfig.effectivePort(); + this.sidecarCdcClient = new SidecarCdcClient(clientConfig, sidecarClient, cdcStats, this.portResolver); withCdcOptions(cdcOptions); withTokenRangeSupplier(tokenRangeSupplier); } /** - * Builds a port resolver function from the {@link CdcSidecarInstancesProvider}. The resolver looks up - * the port for a given hostname from the provider, falling back to the configured effective port. + * Builds a port resolver function from the {@link CdcSidecarInstancesProvider}. The resolver performs + * an O(1) map lookup keyed by hostname, falling back to the configured effective port. */ - static Function buildPortResolver(@NotNull CdcSidecarInstancesProvider provider, - @NotNull SidecarCdcClient.ClientConfig clientConfig) + static Function buildPortResolver(@NotNull CdcSidecarInstancesProvider provider, + @NotNull SidecarCdcClient.ClientConfig clientConfig) { - return hostname -> { - for (CdcSidecarInstance instance : provider.instances()) - { - if (hostname.equals(instance.hostname())) - { - return instance.port(); - } - } - return clientConfig.effectivePort(); - }; + Map portMap = provider.instances().stream() + .collect(Collectors.toMap(CdcSidecarInstance::hostname, + CdcSidecarInstance::port)); + return instance -> portMap.getOrDefault(instance.nodeName(), clientConfig.effectivePort()); } public SidecarCdcBuilder withClusterConfigProvider(ClusterConfigProvider clusterConfigProvider) @@ -95,6 +133,21 @@ public SidecarCdcBuilder withDownMonitor(SidecarDownMonitor downMonitor) return this; } + public SidecarCdcBuilder withPortResolver(@NotNull Function portResolver) + { + this.portResolver = portResolver; + return this; + } + + public SidecarCdcBuilder withSidecarClient(SidecarCdcClient.ClientConfig clientConfig, + SidecarClient sidecarClient, + ICdcStats cdcStats) + { + this.clientConfig = clientConfig; + this.sidecarCdcClient = new SidecarCdcClient(clientConfig, sidecarClient, cdcStats, portResolver); + return this; + } + public SidecarCdcBuilder withReplicationFactorSupplier(ReplicationFactorSupplier replicationFactorSupplier) { this.replicationFactorSupplier = replicationFactorSupplier; diff --git a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java index 17b97893..c6c96a8d 100644 --- a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java +++ b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java @@ -64,7 +64,7 @@ public class SidecarCdcClient implements AutoCloseable final SidecarClient sidecarClient; final ICdcStats stats; @NotNull - final Function portResolver; + final Function portResolver; public SidecarCdcClient(ClientConfig clientConfig, CdcSidecarInstancesProvider instancesProvider, @@ -85,13 +85,13 @@ private SidecarCdcClient(ClientConfig config, SidecarClient sidecarClient, ICdcStats stats) { - this(config, sidecarClient, stats, hostname -> config.effectivePort()); + this(config, sidecarClient, stats, ignored -> config.effectivePort()); } public SidecarCdcClient(ClientConfig config, SidecarClient sidecarClient, ICdcStats stats, - @NotNull Function portResolver) + @NotNull Function portResolver) { this.config = config; this.sidecarClient = sidecarClient; @@ -203,7 +203,7 @@ protected SidecarInstance toSidecarInstance(CassandraInstance instance) @Override public int port() { - return portResolver.apply(instance.nodeName()); + return portResolver.apply(instance); } @Override diff --git a/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java index 1b6871cf..63b5feb3 100644 --- a/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java +++ b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java @@ -19,12 +19,10 @@ package org.apache.cassandra.cdc.sidecar; -import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.Function; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.apache.cassandra.cdc.api.CdcOptions; @@ -42,8 +40,11 @@ */ public class SidecarCdcTest { - @Test - public void testBuilderMethodCreatesValidBuilder() + private SidecarClient mockSidecarClient; + private ICdcStats cdcStats; + + @BeforeEach + public void setup() { String jobId = "test-job-123"; int partitionId = 0; @@ -71,104 +72,93 @@ public void testBuilderMethodCreatesValidBuilder() assertThat(builder).isInstanceOf(SidecarCdcBuilder.class); assertThat(builder.clusterConfigProvider).isEqualTo(clusterConfigProvider); assertThat(builder.sidecarCdcClient).isEqualTo(mockSidecarCdcClient); + mockSidecarClient = mock(SidecarClient.class); + cdcStats = mock(ICdcStats.class); } + @Test public void testPerInstancePortResolution() { - Map portMapping = new HashMap<>(); - portMapping.put("host1", 9043); - portMapping.put("host2", 9044); - portMapping.put("host3", 9045); - Function portResolver = hostname -> portMapping.getOrDefault(hostname, 9043); - + Map portMapping = Map.of("host1", 9043, "host2", 9044, "host3", 9045); SidecarCdcClient.ClientConfig clientConfig = SidecarCdcClient.ClientConfig.create(); - SidecarClient mockSidecarClient = mock(SidecarClient.class); - ICdcStats cdcStats = mock(ICdcStats.class); - SidecarCdcClient client = new SidecarCdcClient(clientConfig, mockSidecarClient, cdcStats, portResolver); + SidecarCdcClient client = new SidecarCdcClient(clientConfig, mockSidecarClient, cdcStats, + instance -> portMapping.getOrDefault(instance.nodeName(), 9043)); SidecarInstance si1 = client.toSidecarInstance(new CassandraInstance("0", "host1", "DC1")); - assertThat(si1.port()).isEqualTo(9043); assertThat(si1.hostname()).isEqualTo("host1"); + assertThat(si1.port()).isEqualTo(9043); SidecarInstance si2 = client.toSidecarInstance(new CassandraInstance("100", "host2", "DC1")); - assertThat(si2.port()).isEqualTo(9044); assertThat(si2.hostname()).isEqualTo("host2"); + assertThat(si2.port()).isEqualTo(9044); SidecarInstance si3 = client.toSidecarInstance(new CassandraInstance("200", "host3", "DC1")); - assertThat(si3.port()).isEqualTo(9045); assertThat(si3.hostname()).isEqualTo("host3"); + assertThat(si3.port()).isEqualTo(9045); } @Test public void testFallbackToEffectivePortWhenHostNotFound() { - Map portMapping = new HashMap<>(); - portMapping.put("host1", 9043); SidecarCdcClient.ClientConfig clientConfig = SidecarCdcClient.ClientConfig.create(8888, 3, 100L); - Function portResolver = hostname -> portMapping.getOrDefault(hostname, clientConfig.effectivePort()); + Map portMapping = Map.of("host1", 9043); - SidecarClient mockSidecarClient = mock(SidecarClient.class); - ICdcStats cdcStats = mock(ICdcStats.class); + SidecarCdcClient client = new SidecarCdcClient(clientConfig, mockSidecarClient, cdcStats, + instance -> portMapping.getOrDefault(instance.nodeName(), + clientConfig.effectivePort())); - SidecarCdcClient client = new SidecarCdcClient(clientConfig, mockSidecarClient, cdcStats, portResolver); - - SidecarInstance si1 = client.toSidecarInstance(new CassandraInstance("0", "host1", "DC1")); - assertThat(si1.port()).isEqualTo(9043); - - SidecarInstance si2 = client.toSidecarInstance(new CassandraInstance("100", "unknown-host", "DC1")); - assertThat(si2.port()).isEqualTo(8888); + assertThat(client.toSidecarInstance(new CassandraInstance("0", "host1", "DC1")).port()).isEqualTo(9043); + assertThat(client.toSidecarInstance(new CassandraInstance("100", "unknown-host", "DC1")).port()).isEqualTo(8888); } @Test public void testDefaultPortResolverUsesEffectivePort() { SidecarCdcClient.ClientConfig clientConfig = SidecarCdcClient.ClientConfig.create(7777, 3, 100L); - SidecarClient mockSidecarClient = mock(SidecarClient.class); - ICdcStats cdcStats = mock(ICdcStats.class); SidecarCdcClient client = new SidecarCdcClient(clientConfig, mockSidecarClient, cdcStats); - SidecarInstance si = client.toSidecarInstance(new CassandraInstance("0", "host1", "DC1")); - assertThat(si.port()).isEqualTo(7777); + assertThat(client.toSidecarInstance(new CassandraInstance("0", "host1", "DC1")).port()).isEqualTo(7777); } @Test - public void testBuildPortResolverFromProvider() + public void testWithPortResolverOverridesDefault() { - List instances = Arrays.asList( - cdcSidecarInstance("host1", 9043), - cdcSidecarInstance("host2", 9044), - cdcSidecarInstance("host3", 9045) + SidecarCdcClient.ClientConfig clientConfig = SidecarCdcClient.ClientConfig.create(9042, 3, 100L); + + SidecarCdcBuilder builder = new SidecarCdcBuilder( + "test-job", + 0, + mock(CdcOptions.class), + mock(ClusterConfigProvider.class), + mock(EventConsumer.class), + mock(SchemaSupplier.class), + mock(TokenRangeSupplier.class), + null, // use default portResolver: ignored -> clientConfig.effectivePort() + clientConfig, + mockSidecarClient, + cdcStats ); - CdcSidecarInstancesProvider provider = () -> instances; - SidecarCdcClient.ClientConfig clientConfig = SidecarCdcClient.ClientConfig.create(5555, 3, 100L); - Function resolver = SidecarCdcBuilder.buildPortResolver(provider, clientConfig); + // Before override: default resolver returns effectivePort for every host + assertThat(builder.sidecarCdcClient.toSidecarInstance(new CassandraInstance("0", "host1", "DC1")).port()) + .isEqualTo(9042); - assertThat(resolver.apply("host1")).isEqualTo(9043); - assertThat(resolver.apply("host2")).isEqualTo(9044); - assertThat(resolver.apply("host3")).isEqualTo(9045); - // Unknown host falls back to clientConfig.effectivePort() - assertThat(resolver.apply("unknown-host")).isEqualTo(5555); - } + // Apply custom resolver then rebuild the client so it picks up the new resolver + Map portMapping = Map.of("host1", 9100, "host2", 9200); + builder.withPortResolver(instance -> portMapping.getOrDefault(instance.nodeName(), clientConfig.effectivePort())) + .withSidecarClient(clientConfig, mockSidecarClient, cdcStats); - private static CdcSidecarInstance cdcSidecarInstance(String hostname, int port) - { - return new CdcSidecarInstance() - { - @Override - public int port() - { - return port; - } - - @Override - public String hostname() - { - return hostname; - } - }; + // Known hosts now resolve to their mapped ports + assertThat(builder.sidecarCdcClient.toSidecarInstance(new CassandraInstance("0", "host1", "DC1")).port()) + .isEqualTo(9100); + assertThat(builder.sidecarCdcClient.toSidecarInstance(new CassandraInstance("100", "host2", "DC1")).port()) + .isEqualTo(9200); + + // Unknown host still falls back to effectivePort + assertThat(builder.sidecarCdcClient.toSidecarInstance(new CassandraInstance("200", "unknown-host", "DC1")).port()) + .isEqualTo(9042); } } From 347a59cbc8134f3c79f77f2dfc255515bc21a20b Mon Sep 17 00:00:00 2001 From: yafeng Date: Wed, 1 Apr 2026 16:49:16 -0700 Subject: [PATCH 3/6] rebase on the latest and add the changes --- .../cdc/sidecar/SidecarCdcBuilder.java | 78 ---------------- .../cdc/sidecar/SidecarCdcClient.java | 40 +++++--- .../cassandra/cdc/sidecar/SidecarCdcTest.java | 93 ++++++++----------- 3 files changed, 68 insertions(+), 143 deletions(-) diff --git a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java index d32b96b2..4027db2d 100644 --- a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java +++ b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java @@ -19,10 +19,6 @@ package org.apache.cassandra.cdc.sidecar; -import java.io.IOException; -import java.util.Map; -import java.util.function.Function; - import com.google.common.base.Preconditions; import org.apache.cassandra.cdc.CdcBuilder; @@ -31,22 +27,14 @@ import org.apache.cassandra.cdc.api.SchemaSupplier; import org.apache.cassandra.cdc.api.TokenRangeSupplier; import org.apache.cassandra.cdc.stats.ICdcStats; -import org.apache.cassandra.clients.Sidecar; -import org.apache.cassandra.secrets.SecretsProvider; -import o.a.c.sidecar.client.shaded.client.SidecarClient; -import org.apache.cassandra.spark.data.partitioner.CassandraInstance; import org.apache.cassandra.spark.utils.AsyncExecutor; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; @SuppressWarnings("unused") public class SidecarCdcBuilder extends CdcBuilder { protected ClusterConfigProvider clusterConfigProvider; protected SidecarCdcClient sidecarCdcClient; - protected SidecarCdcClient.ClientConfig clientConfig; - @NotNull - protected Function portResolver; protected SidecarDownMonitor downMonitor = SidecarDownMonitor.STUB; protected ReplicationFactorSupplier replicationFactorSupplier = ReplicationFactorSupplier.DEFAULT; protected SidecarCdcStats sidecarCdcStats = SidecarCdcStats.STUB; @@ -61,66 +49,15 @@ public class SidecarCdcBuilder extends CdcBuilder SchemaSupplier schemaSupplier, TokenRangeSupplier tokenRangeSupplier, SidecarCdcClient sidecarCdcClient, - @NotNull Function portResolver, - CdcSidecarInstancesProvider sidecarInstancesProvider, - SidecarCdcClient.ClientConfig clientConfig, - SecretsProvider secretsProvider, - ICdcStats cdcStats) throws IOException - { - this( - jobId, - partitionId, - cdcOptions, - clusterConfigProvider, - eventConsumer, - schemaSupplier, - tokenRangeSupplier, - buildPortResolver(sidecarInstancesProvider, clientConfig), - clientConfig, - Sidecar.from(new SimpleSidecarInstancesProvider(sidecarInstancesProvider.instances().stream() - .map(i -> new SidecarInstanceImpl(i.hostname(), i.port())) - .collect(Collectors.toList())), - clientConfig.toGenericSidecarConfig(), secretsProvider), - cdcStats - ); - } - - SidecarCdcBuilder(@NotNull String jobId, - int partitionId, - CdcOptions cdcOptions, - ClusterConfigProvider clusterConfigProvider, - EventConsumer eventConsumer, - SchemaSupplier schemaSupplier, - TokenRangeSupplier tokenRangeSupplier, - @Nullable Function portResolver, - SidecarCdcClient.ClientConfig clientConfig, - SidecarClient sidecarClient, ICdcStats cdcStats) { super(jobId, partitionId, eventConsumer, schemaSupplier); this.clusterConfigProvider = clusterConfigProvider; this.sidecarCdcClient = sidecarCdcClient; - this.portResolver = portResolver; - this.clientConfig = clientConfig; - this.portResolver = portResolver != null ? portResolver : ignored -> clientConfig.effectivePort(); - this.sidecarCdcClient = new SidecarCdcClient(clientConfig, sidecarClient, cdcStats, this.portResolver); withCdcOptions(cdcOptions); withTokenRangeSupplier(tokenRangeSupplier); } - /** - * Builds a port resolver function from the {@link CdcSidecarInstancesProvider}. The resolver performs - * an O(1) map lookup keyed by hostname, falling back to the configured effective port. - */ - static Function buildPortResolver(@NotNull CdcSidecarInstancesProvider provider, - @NotNull SidecarCdcClient.ClientConfig clientConfig) - { - Map portMap = provider.instances().stream() - .collect(Collectors.toMap(CdcSidecarInstance::hostname, - CdcSidecarInstance::port)); - return instance -> portMap.getOrDefault(instance.nodeName(), clientConfig.effectivePort()); - } - public SidecarCdcBuilder withClusterConfigProvider(ClusterConfigProvider clusterConfigProvider) { this.clusterConfigProvider = clusterConfigProvider; @@ -133,21 +70,6 @@ public SidecarCdcBuilder withDownMonitor(SidecarDownMonitor downMonitor) return this; } - public SidecarCdcBuilder withPortResolver(@NotNull Function portResolver) - { - this.portResolver = portResolver; - return this; - } - - public SidecarCdcBuilder withSidecarClient(SidecarCdcClient.ClientConfig clientConfig, - SidecarClient sidecarClient, - ICdcStats cdcStats) - { - this.clientConfig = clientConfig; - this.sidecarCdcClient = new SidecarCdcClient(clientConfig, sidecarClient, cdcStats, portResolver); - return this; - } - public SidecarCdcBuilder withReplicationFactorSupplier(ReplicationFactorSupplier replicationFactorSupplier) { this.replicationFactorSupplier = replicationFactorSupplier; diff --git a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java index c6c96a8d..18201ba0 100644 --- a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java +++ b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java @@ -70,6 +70,15 @@ public SidecarCdcClient(ClientConfig clientConfig, CdcSidecarInstancesProvider instancesProvider, SecretsProvider secretsProvider, ICdcStats cdcStats) throws IOException + { + this(clientConfig, instancesProvider, secretsProvider, cdcStats, null); + } + + public SidecarCdcClient(ClientConfig clientConfig, + CdcSidecarInstancesProvider instancesProvider, + SecretsProvider secretsProvider, + ICdcStats cdcStats, + @Nullable Function portResolver) throws IOException { this(clientConfig, Sidecar.from(new SimpleSidecarInstancesProvider(instancesProvider.instances() @@ -78,20 +87,14 @@ public SidecarCdcClient(ClientConfig clientConfig, .collect(Collectors.toList())), clientConfig.toGenericSidecarConfig(), secretsProvider), - cdcStats); - } - - private SidecarCdcClient(ClientConfig config, - SidecarClient sidecarClient, - ICdcStats stats) - { - this(config, sidecarClient, stats, ignored -> config.effectivePort()); + cdcStats, + portResolver != null ? portResolver : buildPortResolver(instancesProvider, clientConfig)); } - public SidecarCdcClient(ClientConfig config, - SidecarClient sidecarClient, - ICdcStats stats, - @NotNull Function portResolver) + SidecarCdcClient(ClientConfig config, + SidecarClient sidecarClient, + ICdcStats stats, + @NotNull Function portResolver) { this.config = config; this.sidecarClient = sidecarClient; @@ -99,6 +102,19 @@ public SidecarCdcClient(ClientConfig config, this.portResolver = portResolver; } + /** + * Builds a port resolver function from the {@link CdcSidecarInstancesProvider}. The resolver performs + * an O(1) map lookup keyed by hostname, falling back to the configured effective port. + */ + static Function buildPortResolver(@NotNull CdcSidecarInstancesProvider provider, + @NotNull ClientConfig clientConfig) + { + Map portMap = provider.instances().stream() + .collect(Collectors.toMap(CdcSidecarInstance::hostname, + CdcSidecarInstance::port)); + return instance -> portMap.getOrDefault(instance.nodeName(), clientConfig.effectivePort()); + } + /** * Closes the underlying {@link SidecarClient} and releases associated resources (e.g. thread pools, * HTTP connections). diff --git a/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java index 63b5feb3..f0fd3ccd 100644 --- a/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java +++ b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java @@ -25,6 +25,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import o.a.c.sidecar.client.shaded.client.SidecarClient; +import o.a.c.sidecar.client.shaded.client.SidecarInstance; import org.apache.cassandra.cdc.api.CdcOptions; import org.apache.cassandra.cdc.api.EventConsumer; import org.apache.cassandra.cdc.api.SchemaSupplier; @@ -46,37 +48,32 @@ public class SidecarCdcTest @BeforeEach public void setup() { - String jobId = "test-job-123"; - int partitionId = 0; - CdcOptions cdcOptions = mock(CdcOptions.class); - ClusterConfigProvider clusterConfigProvider = mock(ClusterConfigProvider.class); - EventConsumer eventConsumer = mock(EventConsumer.class); - SchemaSupplier schemaSupplier = mock(SchemaSupplier.class); - TokenRangeSupplier tokenRangeSupplier = mock(TokenRangeSupplier.class); + mockSidecarClient = mock(SidecarClient.class); + cdcStats = mock(ICdcStats.class); + } + + @Test + public void testBuilderMethodCreatesValidBuilder() + { SidecarCdcClient mockSidecarCdcClient = mock(SidecarCdcClient.class); - ICdcStats cdcStats = mock(ICdcStats.class); SidecarCdcBuilder builder = new SidecarCdcBuilder( - jobId, - partitionId, - cdcOptions, - clusterConfigProvider, - eventConsumer, - schemaSupplier, - tokenRangeSupplier, + "test-job-123", + 0, + mock(CdcOptions.class), + mock(ClusterConfigProvider.class), + mock(EventConsumer.class), + mock(SchemaSupplier.class), + mock(TokenRangeSupplier.class), mockSidecarCdcClient, - cdcStats + mock(ICdcStats.class) ); - assertThat(builder).isNotNull(); assertThat(builder).isInstanceOf(SidecarCdcBuilder.class); - assertThat(builder.clusterConfigProvider).isEqualTo(clusterConfigProvider); + assertThat(builder.clusterConfigProvider).isNotNull(); assertThat(builder.sidecarCdcClient).isEqualTo(mockSidecarCdcClient); - mockSidecarClient = mock(SidecarClient.class); - cdcStats = mock(ICdcStats.class); } - @Test public void testPerInstancePortResolution() { @@ -118,47 +115,37 @@ public void testDefaultPortResolverUsesEffectivePort() { SidecarCdcClient.ClientConfig clientConfig = SidecarCdcClient.ClientConfig.create(7777, 3, 100L); - SidecarCdcClient client = new SidecarCdcClient(clientConfig, mockSidecarClient, cdcStats); + SidecarCdcClient client = new SidecarCdcClient(clientConfig, mockSidecarClient, cdcStats, + ignored -> clientConfig.effectivePort()); assertThat(client.toSidecarInstance(new CassandraInstance("0", "host1", "DC1")).port()).isEqualTo(7777); } @Test - public void testWithPortResolverOverridesDefault() + public void testBuildPortResolverFromProvider() { - SidecarCdcClient.ClientConfig clientConfig = SidecarCdcClient.ClientConfig.create(9042, 3, 100L); - - SidecarCdcBuilder builder = new SidecarCdcBuilder( - "test-job", - 0, - mock(CdcOptions.class), - mock(ClusterConfigProvider.class), - mock(EventConsumer.class), - mock(SchemaSupplier.class), - mock(TokenRangeSupplier.class), - null, // use default portResolver: ignored -> clientConfig.effectivePort() - clientConfig, - mockSidecarClient, - cdcStats + List instances = List.of( + cdcSidecarInstance("host1", 9043), + cdcSidecarInstance("host2", 9044), + cdcSidecarInstance("host3", 9045) ); + SidecarCdcClient.ClientConfig clientConfig = SidecarCdcClient.ClientConfig.create(5555, 3, 100L); - // Before override: default resolver returns effectivePort for every host - assertThat(builder.sidecarCdcClient.toSidecarInstance(new CassandraInstance("0", "host1", "DC1")).port()) - .isEqualTo(9042); + var resolver = SidecarCdcClient.buildPortResolver(() -> instances, clientConfig); - // Apply custom resolver then rebuild the client so it picks up the new resolver - Map portMapping = Map.of("host1", 9100, "host2", 9200); - builder.withPortResolver(instance -> portMapping.getOrDefault(instance.nodeName(), clientConfig.effectivePort())) - .withSidecarClient(clientConfig, mockSidecarClient, cdcStats); - - // Known hosts now resolve to their mapped ports - assertThat(builder.sidecarCdcClient.toSidecarInstance(new CassandraInstance("0", "host1", "DC1")).port()) - .isEqualTo(9100); - assertThat(builder.sidecarCdcClient.toSidecarInstance(new CassandraInstance("100", "host2", "DC1")).port()) - .isEqualTo(9200); + assertThat(resolver.apply(new CassandraInstance("0", "host1", "DC1"))).isEqualTo(9043); + assertThat(resolver.apply(new CassandraInstance("100", "host2", "DC1"))).isEqualTo(9044); + assertThat(resolver.apply(new CassandraInstance("200", "host3", "DC1"))).isEqualTo(9045); + // Unknown host falls back to clientConfig.effectivePort() + assertThat(resolver.apply(new CassandraInstance("300", "unknown-host", "DC1"))).isEqualTo(5555); + } - // Unknown host still falls back to effectivePort - assertThat(builder.sidecarCdcClient.toSidecarInstance(new CassandraInstance("200", "unknown-host", "DC1")).port()) - .isEqualTo(9042); + private static CdcSidecarInstance cdcSidecarInstance(String hostname, int port) + { + return new CdcSidecarInstance() + { + public int port() { return port; } + public String hostname() { return hostname; } + }; } } From f8ac295cf0cb3a3c458a6416bac175895f041513 Mon Sep 17 00:00:00 2001 From: yafeng Date: Wed, 1 Apr 2026 17:52:56 -0700 Subject: [PATCH 4/6] add more tests --- .../cdc/sidecar/SidecarCdcClient.java | 5 +- .../cassandra/cdc/sidecar/SidecarCdcTest.java | 67 +++++-------------- 2 files changed, 21 insertions(+), 51 deletions(-) diff --git a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java index 18201ba0..f33fa93b 100644 --- a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java +++ b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java @@ -103,8 +103,9 @@ public SidecarCdcClient(ClientConfig clientConfig, } /** - * Builds a port resolver function from the {@link CdcSidecarInstancesProvider}. The resolver performs - * an O(1) map lookup keyed by hostname, falling back to the configured effective port. + * Builds a port resolver function from the {@link CdcSidecarInstancesProvider}. The instances are + * indexed into a hostname-keyed map once (O(n)) at construction time, so that each subsequent + * resolution call is O(1). Unknown hostnames fall back to the configured effective port. */ static Function buildPortResolver(@NotNull CdcSidecarInstancesProvider provider, @NotNull ClientConfig clientConfig) diff --git a/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java index f0fd3ccd..3e586c0d 100644 --- a/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java +++ b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java @@ -19,7 +19,6 @@ package org.apache.cassandra.cdc.sidecar; -import java.util.List; import java.util.Map; import org.junit.jupiter.api.BeforeEach; @@ -55,22 +54,31 @@ public void setup() @Test public void testBuilderMethodCreatesValidBuilder() { + String jobId = "test-job-123"; + int partitionId = 0; + CdcOptions cdcOptions = mock(CdcOptions.class); + ClusterConfigProvider clusterConfigProvider = mock(ClusterConfigProvider.class); + EventConsumer eventConsumer = mock(EventConsumer.class); + SchemaSupplier schemaSupplier = mock(SchemaSupplier.class); + TokenRangeSupplier tokenRangeSupplier = mock(TokenRangeSupplier.class); SidecarCdcClient mockSidecarCdcClient = mock(SidecarCdcClient.class); + ICdcStats cdcStats = mock(ICdcStats.class); SidecarCdcBuilder builder = new SidecarCdcBuilder( - "test-job-123", - 0, - mock(CdcOptions.class), - mock(ClusterConfigProvider.class), - mock(EventConsumer.class), - mock(SchemaSupplier.class), - mock(TokenRangeSupplier.class), + jobId, + partitionId, + cdcOptions, + clusterConfigProvider, + eventConsumer, + schemaSupplier, + tokenRangeSupplier, mockSidecarCdcClient, - mock(ICdcStats.class) + cdcStats ); + assertThat(builder).isNotNull(); assertThat(builder).isInstanceOf(SidecarCdcBuilder.class); - assertThat(builder.clusterConfigProvider).isNotNull(); + assertThat(builder.clusterConfigProvider).isEqualTo(clusterConfigProvider); assertThat(builder.sidecarCdcClient).isEqualTo(mockSidecarCdcClient); } @@ -109,43 +117,4 @@ public void testFallbackToEffectivePortWhenHostNotFound() assertThat(client.toSidecarInstance(new CassandraInstance("0", "host1", "DC1")).port()).isEqualTo(9043); assertThat(client.toSidecarInstance(new CassandraInstance("100", "unknown-host", "DC1")).port()).isEqualTo(8888); } - - @Test - public void testDefaultPortResolverUsesEffectivePort() - { - SidecarCdcClient.ClientConfig clientConfig = SidecarCdcClient.ClientConfig.create(7777, 3, 100L); - - SidecarCdcClient client = new SidecarCdcClient(clientConfig, mockSidecarClient, cdcStats, - ignored -> clientConfig.effectivePort()); - - assertThat(client.toSidecarInstance(new CassandraInstance("0", "host1", "DC1")).port()).isEqualTo(7777); - } - - @Test - public void testBuildPortResolverFromProvider() - { - List instances = List.of( - cdcSidecarInstance("host1", 9043), - cdcSidecarInstance("host2", 9044), - cdcSidecarInstance("host3", 9045) - ); - SidecarCdcClient.ClientConfig clientConfig = SidecarCdcClient.ClientConfig.create(5555, 3, 100L); - - var resolver = SidecarCdcClient.buildPortResolver(() -> instances, clientConfig); - - assertThat(resolver.apply(new CassandraInstance("0", "host1", "DC1"))).isEqualTo(9043); - assertThat(resolver.apply(new CassandraInstance("100", "host2", "DC1"))).isEqualTo(9044); - assertThat(resolver.apply(new CassandraInstance("200", "host3", "DC1"))).isEqualTo(9045); - // Unknown host falls back to clientConfig.effectivePort() - assertThat(resolver.apply(new CassandraInstance("300", "unknown-host", "DC1"))).isEqualTo(5555); - } - - private static CdcSidecarInstance cdcSidecarInstance(String hostname, int port) - { - return new CdcSidecarInstance() - { - public int port() { return port; } - public String hostname() { return hostname; } - }; - } } From 9e728e3c40f91c718f582df5a42c283381a2be0d Mon Sep 17 00:00:00 2001 From: yafeng Date: Wed, 1 Apr 2026 21:51:07 -0700 Subject: [PATCH 5/6] add with withPortResolver() --- .../cdc/sidecar/SidecarCdcBuilder.java | 8 ++++ .../cdc/sidecar/SidecarCdcClient.java | 30 ++++++------- .../cassandra/cdc/sidecar/SidecarCdcTest.java | 44 +++++++++++++++++-- 3 files changed, 64 insertions(+), 18 deletions(-) diff --git a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java index 4027db2d..502a5194 100644 --- a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java +++ b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java @@ -19,6 +19,8 @@ package org.apache.cassandra.cdc.sidecar; +import java.util.function.Function; + import com.google.common.base.Preconditions; import org.apache.cassandra.cdc.CdcBuilder; @@ -64,6 +66,12 @@ public SidecarCdcBuilder withClusterConfigProvider(ClusterConfigProvider cluster return this; } + public SidecarCdcBuilder withPortResolver(@NotNull Function portResolver) + { + this.sidecarCdcClient = sidecarCdcClient.withPortResolver(portResolver); + return this; + } + public SidecarCdcBuilder withDownMonitor(SidecarDownMonitor downMonitor) { this.downMonitor = downMonitor; diff --git a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java index f33fa93b..49b6ae29 100644 --- a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java +++ b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java @@ -64,21 +64,22 @@ public class SidecarCdcClient implements AutoCloseable final SidecarClient sidecarClient; final ICdcStats stats; @NotNull - final Function portResolver; + final Function portResolver; public SidecarCdcClient(ClientConfig clientConfig, CdcSidecarInstancesProvider instancesProvider, SecretsProvider secretsProvider, ICdcStats cdcStats) throws IOException { - this(clientConfig, instancesProvider, secretsProvider, cdcStats, null); + this(clientConfig, instancesProvider, secretsProvider, cdcStats, + ignored -> clientConfig.effectivePort()); } public SidecarCdcClient(ClientConfig clientConfig, CdcSidecarInstancesProvider instancesProvider, SecretsProvider secretsProvider, ICdcStats cdcStats, - @Nullable Function portResolver) throws IOException + @NotNull Function portResolver) throws IOException { this(clientConfig, Sidecar.from(new SimpleSidecarInstancesProvider(instancesProvider.instances() @@ -88,13 +89,13 @@ public SidecarCdcClient(ClientConfig clientConfig, clientConfig.toGenericSidecarConfig(), secretsProvider), cdcStats, - portResolver != null ? portResolver : buildPortResolver(instancesProvider, clientConfig)); + portResolver); } SidecarCdcClient(ClientConfig config, SidecarClient sidecarClient, ICdcStats stats, - @NotNull Function portResolver) + @NotNull Function portResolver) { this.config = config; this.sidecarClient = sidecarClient; @@ -103,17 +104,16 @@ public SidecarCdcClient(ClientConfig clientConfig, } /** - * Builds a port resolver function from the {@link CdcSidecarInstancesProvider}. The instances are - * indexed into a hostname-keyed map once (O(n)) at construction time, so that each subsequent - * resolution call is O(1). Unknown hostnames fall back to the configured effective port. + * Returns a new {@link SidecarCdcClient} that is identical to this one except that it uses + * the supplied {@code portResolver}. The underlying {@link SidecarClient} and all other + * configuration are reused, so no new HTTP connections or thread pools are created. + * + * @param portResolver function that maps a Sidecar node hostname to its port + * @return a new client with the given port resolver */ - static Function buildPortResolver(@NotNull CdcSidecarInstancesProvider provider, - @NotNull ClientConfig clientConfig) + public SidecarCdcClient withPortResolver(@NotNull Function portResolver) { - Map portMap = provider.instances().stream() - .collect(Collectors.toMap(CdcSidecarInstance::hostname, - CdcSidecarInstance::port)); - return instance -> portMap.getOrDefault(instance.nodeName(), clientConfig.effectivePort()); + return new SidecarCdcClient(config, sidecarClient, stats, portResolver); } /** @@ -220,7 +220,7 @@ protected SidecarInstance toSidecarInstance(CassandraInstance instance) @Override public int port() { - return portResolver.apply(instance); + return portResolver.apply(instance.nodeName()); } @Override diff --git a/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java index 3e586c0d..c12ead8c 100644 --- a/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java +++ b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java @@ -62,7 +62,6 @@ public void testBuilderMethodCreatesValidBuilder() SchemaSupplier schemaSupplier = mock(SchemaSupplier.class); TokenRangeSupplier tokenRangeSupplier = mock(TokenRangeSupplier.class); SidecarCdcClient mockSidecarCdcClient = mock(SidecarCdcClient.class); - ICdcStats cdcStats = mock(ICdcStats.class); SidecarCdcBuilder builder = new SidecarCdcBuilder( jobId, @@ -89,7 +88,7 @@ public void testPerInstancePortResolution() SidecarCdcClient.ClientConfig clientConfig = SidecarCdcClient.ClientConfig.create(); SidecarCdcClient client = new SidecarCdcClient(clientConfig, mockSidecarClient, cdcStats, - instance -> portMapping.getOrDefault(instance.nodeName(), 9043)); + hostname -> portMapping.getOrDefault(hostname, 9043)); SidecarInstance si1 = client.toSidecarInstance(new CassandraInstance("0", "host1", "DC1")); assertThat(si1.hostname()).isEqualTo("host1"); @@ -104,6 +103,45 @@ public void testPerInstancePortResolution() assertThat(si3.port()).isEqualTo(9045); } + @Test + public void testWithPortResolverOnBuilder() + { + SidecarCdcClient.ClientConfig clientConfig = SidecarCdcClient.ClientConfig.create(9042, 3, 100L); + Map portMapping = Map.of("host1", 9100, "host2", 9200); + SidecarCdcClient baseClient = new SidecarCdcClient(clientConfig, mockSidecarClient, cdcStats, + ignored -> clientConfig.effectivePort()); + + SidecarCdcBuilder builder = new SidecarCdcBuilder( + "test-job", + 0, + mock(CdcOptions.class), + mock(ClusterConfigProvider.class), + mock(EventConsumer.class), + mock(SchemaSupplier.class), + mock(TokenRangeSupplier.class), + baseClient, + cdcStats + ); + + // Before override: every host resolves to effectivePort + assertThat(builder.sidecarCdcClient.toSidecarInstance(new CassandraInstance("0", "host1", "DC1")).port()) + .isEqualTo(9042); + + // Override via builder chain + builder.withPortResolver(hostname -> portMapping.getOrDefault(hostname, + clientConfig.effectivePort())); + + // Known hosts resolve to their mapped ports after override + assertThat(builder.sidecarCdcClient.toSidecarInstance(new CassandraInstance("0", "host1", "DC1")).port()) + .isEqualTo(9100); + assertThat(builder.sidecarCdcClient.toSidecarInstance(new CassandraInstance("100", "host2", "DC1")).port()) + .isEqualTo(9200); + + // Unknown host falls back to effectivePort + assertThat(builder.sidecarCdcClient.toSidecarInstance(new CassandraInstance("200", "unknown", "DC1")).port()) + .isEqualTo(9042); + } + @Test public void testFallbackToEffectivePortWhenHostNotFound() { @@ -111,7 +149,7 @@ public void testFallbackToEffectivePortWhenHostNotFound() Map portMapping = Map.of("host1", 9043); SidecarCdcClient client = new SidecarCdcClient(clientConfig, mockSidecarClient, cdcStats, - instance -> portMapping.getOrDefault(instance.nodeName(), + hostname -> portMapping.getOrDefault(hostname, clientConfig.effectivePort())); assertThat(client.toSidecarInstance(new CassandraInstance("0", "host1", "DC1")).port()).isEqualTo(9043); From 0b7c42b9352520f42ebc04135d41b507460cea2d Mon Sep 17 00:00:00 2001 From: yafeng Date: Thu, 2 Apr 2026 08:46:56 -0700 Subject: [PATCH 6/6] remove the withPortResolver in SidecarCdcBuilder --- .../cdc/sidecar/SidecarCdcBuilder.java | 8 ---- .../cdc/sidecar/SidecarCdcClient.java | 13 ------- .../cassandra/cdc/sidecar/SidecarCdcTest.java | 39 ------------------- 3 files changed, 60 deletions(-) diff --git a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java index 502a5194..4027db2d 100644 --- a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java +++ b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java @@ -19,8 +19,6 @@ package org.apache.cassandra.cdc.sidecar; -import java.util.function.Function; - import com.google.common.base.Preconditions; import org.apache.cassandra.cdc.CdcBuilder; @@ -66,12 +64,6 @@ public SidecarCdcBuilder withClusterConfigProvider(ClusterConfigProvider cluster return this; } - public SidecarCdcBuilder withPortResolver(@NotNull Function portResolver) - { - this.sidecarCdcClient = sidecarCdcClient.withPortResolver(portResolver); - return this; - } - public SidecarCdcBuilder withDownMonitor(SidecarDownMonitor downMonitor) { this.downMonitor = downMonitor; diff --git a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java index 49b6ae29..1a4a34ff 100644 --- a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java +++ b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java @@ -103,19 +103,6 @@ public SidecarCdcClient(ClientConfig clientConfig, this.portResolver = portResolver; } - /** - * Returns a new {@link SidecarCdcClient} that is identical to this one except that it uses - * the supplied {@code portResolver}. The underlying {@link SidecarClient} and all other - * configuration are reused, so no new HTTP connections or thread pools are created. - * - * @param portResolver function that maps a Sidecar node hostname to its port - * @return a new client with the given port resolver - */ - public SidecarCdcClient withPortResolver(@NotNull Function portResolver) - { - return new SidecarCdcClient(config, sidecarClient, stats, portResolver); - } - /** * Closes the underlying {@link SidecarClient} and releases associated resources (e.g. thread pools, * HTTP connections). diff --git a/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java index c12ead8c..77d540dc 100644 --- a/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java +++ b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java @@ -103,45 +103,6 @@ public void testPerInstancePortResolution() assertThat(si3.port()).isEqualTo(9045); } - @Test - public void testWithPortResolverOnBuilder() - { - SidecarCdcClient.ClientConfig clientConfig = SidecarCdcClient.ClientConfig.create(9042, 3, 100L); - Map portMapping = Map.of("host1", 9100, "host2", 9200); - SidecarCdcClient baseClient = new SidecarCdcClient(clientConfig, mockSidecarClient, cdcStats, - ignored -> clientConfig.effectivePort()); - - SidecarCdcBuilder builder = new SidecarCdcBuilder( - "test-job", - 0, - mock(CdcOptions.class), - mock(ClusterConfigProvider.class), - mock(EventConsumer.class), - mock(SchemaSupplier.class), - mock(TokenRangeSupplier.class), - baseClient, - cdcStats - ); - - // Before override: every host resolves to effectivePort - assertThat(builder.sidecarCdcClient.toSidecarInstance(new CassandraInstance("0", "host1", "DC1")).port()) - .isEqualTo(9042); - - // Override via builder chain - builder.withPortResolver(hostname -> portMapping.getOrDefault(hostname, - clientConfig.effectivePort())); - - // Known hosts resolve to their mapped ports after override - assertThat(builder.sidecarCdcClient.toSidecarInstance(new CassandraInstance("0", "host1", "DC1")).port()) - .isEqualTo(9100); - assertThat(builder.sidecarCdcClient.toSidecarInstance(new CassandraInstance("100", "host2", "DC1")).port()) - .isEqualTo(9200); - - // Unknown host falls back to effectivePort - assertThat(builder.sidecarCdcClient.toSidecarInstance(new CassandraInstance("200", "unknown", "DC1")).port()) - .isEqualTo(9042); - } - @Test public void testFallbackToEffectivePortWhenHostNotFound() {