Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
0.4.0
-----
* Support per-instance sidecar port resolution in CDC client (CASSANALYTICS-130)
* Pass SidecarCdcClient as a constructor parameter to avoid thread/resource leaks (CASSANALYTICS-142)
* Support extended deletion time in CDC for Cassandra 5.0
* Flush event consumer before persisting CDC state to prevent data loss on failure (CASSANALYTICS-126)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

import o.a.c.sidecar.client.shaded.common.utils.HttpRange;
Expand All @@ -45,6 +46,7 @@
import org.apache.cassandra.spark.utils.MapUtils;
import org.apache.cassandra.spark.utils.ThrowableUtils;
import org.apache.cassandra.spark.utils.streaming.StreamConsumer;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_BUFFER_OVERRIDE;
Expand All @@ -61,11 +63,23 @@ public class SidecarCdcClient implements AutoCloseable
final ClientConfig config;
final SidecarClient sidecarClient;
final ICdcStats stats;
@NotNull
final Function<String, Integer> portResolver;

public SidecarCdcClient(ClientConfig clientConfig,
CdcSidecarInstancesProvider instancesProvider,
SecretsProvider secretsProvider,
ICdcStats cdcStats) throws IOException
{
this(clientConfig, instancesProvider, secretsProvider, cdcStats,
ignored -> clientConfig.effectivePort());
}

public SidecarCdcClient(ClientConfig clientConfig,
CdcSidecarInstancesProvider instancesProvider,
SecretsProvider secretsProvider,
ICdcStats cdcStats,
@NotNull Function<String, Integer> portResolver) throws IOException
{
this(clientConfig,
Sidecar.from(new SimpleSidecarInstancesProvider(instancesProvider.instances()
Expand All @@ -74,16 +88,19 @@ public SidecarCdcClient(ClientConfig clientConfig,
.collect(Collectors.toList())),
clientConfig.toGenericSidecarConfig(),
secretsProvider),
cdcStats);
cdcStats,
portResolver);
}

private SidecarCdcClient(ClientConfig config,
SidecarClient sidecarClient,
ICdcStats stats)
SidecarCdcClient(ClientConfig config,
SidecarClient sidecarClient,
ICdcStats stats,
@NotNull Function<String, Integer> portResolver)
{
this.config = config;
this.sidecarClient = sidecarClient;
this.stats = stats;
this.portResolver = portResolver;
}

/**
Expand Down Expand Up @@ -190,7 +207,7 @@ protected SidecarInstance toSidecarInstance(CassandraInstance instance)
@Override
public int port()
{
return config.effectivePort();
return portResolver.apply(instance.nodeName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,19 @@

package org.apache.cassandra.cdc.sidecar;

import java.util.Map;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import o.a.c.sidecar.client.shaded.client.SidecarClient;
import o.a.c.sidecar.client.shaded.client.SidecarInstance;
import org.apache.cassandra.cdc.api.CdcOptions;
import org.apache.cassandra.cdc.api.EventConsumer;
import org.apache.cassandra.cdc.api.SchemaSupplier;
import org.apache.cassandra.cdc.api.TokenRangeSupplier;
import org.apache.cassandra.cdc.stats.ICdcStats;
import org.apache.cassandra.spark.data.partitioner.CassandraInstance;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
Expand All @@ -35,6 +41,16 @@
*/
public class SidecarCdcTest
{
private SidecarClient mockSidecarClient;
private ICdcStats cdcStats;

@BeforeEach
public void setup()
{
mockSidecarClient = mock(SidecarClient.class);
cdcStats = mock(ICdcStats.class);
}

@Test
public void testBuilderMethodCreatesValidBuilder()
{
Expand All @@ -46,7 +62,6 @@ public void testBuilderMethodCreatesValidBuilder()
SchemaSupplier schemaSupplier = mock(SchemaSupplier.class);
TokenRangeSupplier tokenRangeSupplier = mock(TokenRangeSupplier.class);
SidecarCdcClient mockSidecarCdcClient = mock(SidecarCdcClient.class);
ICdcStats cdcStats = mock(ICdcStats.class);

SidecarCdcBuilder builder = new SidecarCdcBuilder(
jobId,
Expand All @@ -65,4 +80,40 @@ public void testBuilderMethodCreatesValidBuilder()
assertThat(builder.clusterConfigProvider).isEqualTo(clusterConfigProvider);
assertThat(builder.sidecarCdcClient).isEqualTo(mockSidecarCdcClient);
}

@Test
public void testPerInstancePortResolution()
{
Map<String, Integer> portMapping = Map.of("host1", 9043, "host2", 9044, "host3", 9045);
SidecarCdcClient.ClientConfig clientConfig = SidecarCdcClient.ClientConfig.create();

SidecarCdcClient client = new SidecarCdcClient(clientConfig, mockSidecarClient, cdcStats,
hostname -> portMapping.getOrDefault(hostname, 9043));

SidecarInstance si1 = client.toSidecarInstance(new CassandraInstance("0", "host1", "DC1"));
assertThat(si1.hostname()).isEqualTo("host1");
assertThat(si1.port()).isEqualTo(9043);

SidecarInstance si2 = client.toSidecarInstance(new CassandraInstance("100", "host2", "DC1"));
assertThat(si2.hostname()).isEqualTo("host2");
assertThat(si2.port()).isEqualTo(9044);

SidecarInstance si3 = client.toSidecarInstance(new CassandraInstance("200", "host3", "DC1"));
assertThat(si3.hostname()).isEqualTo("host3");
assertThat(si3.port()).isEqualTo(9045);
}

@Test
public void testFallbackToEffectivePortWhenHostNotFound()
{
SidecarCdcClient.ClientConfig clientConfig = SidecarCdcClient.ClientConfig.create(8888, 3, 100L);
Map<String, Integer> portMapping = Map.of("host1", 9043);

SidecarCdcClient client = new SidecarCdcClient(clientConfig, mockSidecarClient, cdcStats,
hostname -> portMapping.getOrDefault(hostname,
clientConfig.effectivePort()));

assertThat(client.toSidecarInstance(new CassandraInstance("0", "host1", "DC1")).port()).isEqualTo(9043);
assertThat(client.toSidecarInstance(new CassandraInstance("100", "unknown-host", "DC1")).port()).isEqualTo(8888);
}
}