Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
0.4.0
-----
* CdcManager.getInstanceId(instanceIp) returns -1 as it resolves ipAddress to null (CASSSIDECAR-417)
* Add JDK11_OPTIONS to the startup script (CASSSIDECAR-416)
* Add safety check to Live Migration data copy task endpoint (CASSSIDECAR-409)
* Define common operational job tracking interface and refactor current operational job tracker (CASSSIDECAR-372)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
package org.apache.cassandra.sidecar.cdc;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -43,14 +41,14 @@
import org.apache.cassandra.cdc.sidecar.SidecarStatePersister;
import org.apache.cassandra.cdc.stats.ICdcStats;
import org.apache.cassandra.secrets.SecretsProvider;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
import org.apache.cassandra.sidecar.coordination.RangeManager;
import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
import org.apache.cassandra.spark.utils.AsyncExecutor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.VisibleForTesting;


/**
Expand Down Expand Up @@ -78,6 +76,7 @@
public class CdcManager
{
private static final Logger LOGGER = LoggerFactory.getLogger(CdcManager.class);
private static final int UNKNOWN_INSTANCE = -1;
private final CdcConfig conf;
private final RangeManager rangeManager;
private final InstanceMetadataFetcher instanceFetcher;
Expand Down Expand Up @@ -207,34 +206,17 @@ public void stopConsumers()
consumers.forEach(SidecarCdc::stop);
}

private Integer getInstanceId(String instanceIp)
{
for (InstanceMetadata instance : instanceFetcher.allLocalInstances())
{
String configuredIpAddress = instance.ipAddress();

// Option 1a: Normalize both to InetAddress and compare
if (resolveToSameAddress(instanceIp, configuredIpAddress))
{
return instance.id();
}
}
LOGGER.warn("Requested IP {} does not match with any instances", instanceIp);
return -1;
}

public static boolean resolveToSameAddress(String address1, String address2)
@VisibleForTesting
Integer getInstanceId(String instanceIp)
{
try
{
InetAddress addr1 = InetAddress.getByName(address1);
InetAddress addr2 = InetAddress.getByName(address2);
return addr1.equals(addr2);
return instanceFetcher.instance(instanceIp).id();
}
catch (UnknownHostException e)
catch (Exception e)
{
LOGGER.warn("Could not resolve hostname: {}", e.getMessage());
return address1.equals(address2); // Fallback to string comparison
LOGGER.warn("Requested IP {} does not match with any instances", instanceIp);
return UNKNOWN_INSTANCE;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -44,6 +45,7 @@
import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
import org.apache.cassandra.sidecar.coordination.RangeManager;
import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
import org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
Expand All @@ -52,10 +54,12 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
Expand Down Expand Up @@ -144,10 +148,9 @@ void testSingleInstanceSingleRangeCreatesOneConsumer() throws IOException
InstanceMetadata instance = mockInstance(instanceId, instanceIp);

when(rangeManager.ownedTokenRanges()).thenReturn(ownedRanges);
when(instanceFetcher.allLocalInstances()).thenReturn(Collections.singletonList(instance));
when(instanceFetcher.instance(instanceIp)).thenReturn(instance);
when(cdcConfig.jobId()).thenReturn("test-job");

// Spy to mock loadOrBuildCdcConsumer
CdcManager spyManager = spy(cdcManager);
SidecarCdc mockConsumer = mock(SidecarCdc.class);
doReturn(mockConsumer).when(spyManager).loadOrBuildCdcConsumer(
Expand All @@ -156,7 +159,6 @@ void testSingleInstanceSingleRangeCreatesOneConsumer() throws IOException

List<SidecarCdc> consumers = spyManager.buildCdcConsumers();

// Assert
assertThat(consumers).hasSize(1);
}

Expand All @@ -177,10 +179,9 @@ void testSingleInstanceMultipleRangesCreatesMultipleConsumers() throws IOExcepti
InstanceMetadata instance = mockInstance(instanceId, instanceIp);

when(rangeManager.ownedTokenRanges()).thenReturn(ownedRanges);
when(instanceFetcher.allLocalInstances()).thenReturn(Collections.singletonList(instance));
when(instanceFetcher.instance(instanceIp)).thenReturn(instance);
when(cdcConfig.jobId()).thenReturn("test-job");

// Spy to mock loadOrBuildCdcConsumer
CdcManager spyManager = spy(cdcManager);
SidecarCdc mockConsumer1 = mock(SidecarCdc.class);
SidecarCdc mockConsumer2 = mock(SidecarCdc.class);
Expand All @@ -190,7 +191,6 @@ void testSingleInstanceMultipleRangesCreatesMultipleConsumers() throws IOExcepti

List<SidecarCdc> consumers = spyManager.buildCdcConsumers();

// Assert
assertThat(consumers).hasSize(2);
}

Expand All @@ -212,15 +212,11 @@ void testMultipleInstancesMultipleRangesCreatesConsumers() throws IOException
InstanceMetadata instance1 = mockInstance(instance1Id, instance1Ip);
InstanceMetadata instance2 = mockInstance(instance2Id, instance2Ip);

List<InstanceMetadata> instances = new ArrayList<>();
instances.add(instance1);
instances.add(instance2);

when(rangeManager.ownedTokenRanges()).thenReturn(ownedRanges);
when(instanceFetcher.allLocalInstances()).thenReturn(instances);
when(instanceFetcher.instance(instance1Ip)).thenReturn(instance1);
when(instanceFetcher.instance(instance2Ip)).thenReturn(instance2);
when(cdcConfig.jobId()).thenReturn("test-job");

// Spy to mock loadOrBuildCdcConsumer
CdcManager spyManager = spy(cdcManager);
SidecarCdc mockConsumer1 = mock(SidecarCdc.class);
SidecarCdc mockConsumer2 = mock(SidecarCdc.class);
Expand All @@ -230,7 +226,6 @@ void testMultipleInstancesMultipleRangesCreatesConsumers() throws IOException

List<SidecarCdc> consumers = spyManager.buildCdcConsumers();

// Assert
assertThat(consumers).hasSize(2);
}

Expand All @@ -240,7 +235,6 @@ void testDuplicateRangesDeduplicates() throws IOException
String instanceIp = "127.0.0.1";
int instanceId = 1;

// Create two identical ranges
TokenRange range1 = mockTokenRange(BigInteger.ZERO, BigInteger.TEN);
TokenRange range2 = mockTokenRange(BigInteger.ZERO, BigInteger.TEN);

Expand All @@ -253,10 +247,9 @@ void testDuplicateRangesDeduplicates() throws IOException
InstanceMetadata instance = mockInstance(instanceId, instanceIp);

when(rangeManager.ownedTokenRanges()).thenReturn(ownedRanges);
when(instanceFetcher.allLocalInstances()).thenReturn(Collections.singletonList(instance));
when(instanceFetcher.instance(instanceIp)).thenReturn(instance);
when(cdcConfig.jobId()).thenReturn("test-job");

// Spy to mock loadOrBuildCdcConsumer
CdcManager spyManager = spy(cdcManager);
SidecarCdc mockConsumer = mock(SidecarCdc.class);
doReturn(mockConsumer).when(spyManager).loadOrBuildCdcConsumer(
Expand All @@ -265,7 +258,6 @@ void testDuplicateRangesDeduplicates() throws IOException

List<SidecarCdc> consumers = spyManager.buildCdcConsumers();

// Assert - Should deduplicate to 1 consumer
assertThat(consumers).hasSize(1);
}

Expand All @@ -277,12 +269,10 @@ void testUnknownInstanceHandlesGracefully() throws IOException
TokenRange range = mockTokenRange(BigInteger.ZERO, BigInteger.TEN);
Map<String, Set<TokenRange>> ownedRanges = Collections.singletonMap(unknownIp, Collections.singleton(range));

// No matching instances
when(rangeManager.ownedTokenRanges()).thenReturn(ownedRanges);
when(instanceFetcher.allLocalInstances()).thenReturn(Collections.emptyList());
when(instanceFetcher.instance(unknownIp)).thenThrow(new NoSuchCassandraInstanceException("Instance not found: " + unknownIp));
when(cdcConfig.jobId()).thenReturn("test-job");

// Spy to mock loadOrBuildCdcConsumer - will be called with instanceId = -1
CdcManager spyManager = spy(cdcManager);
SidecarCdc mockConsumer = mock(SidecarCdc.class);
doReturn(mockConsumer).when(spyManager).loadOrBuildCdcConsumer(
Expand All @@ -291,24 +281,94 @@ void testUnknownInstanceHandlesGracefully() throws IOException

List<SidecarCdc> consumers = spyManager.buildCdcConsumers();

// Assert - Should still create consumer with instanceId = -1
assertThat(consumers).hasSize(1);
}

@Test
void testResolveToSameAddressTrue()
{
String address1 = "127.0.0.1";
String address2 = "localhost";
assertThat(CdcManager.resolveToSameAddress(address1, address2)).isTrue();
assertThat(resolveToSameAddress("127.0.0.1", "localhost")).isTrue();
}

@Test
void testResolveToSameAddressFalse()
{
String address1 = "127.0.0.1";
String address2 = "127.0.0.2";
assertThat(CdcManager.resolveToSameAddress(address1, address2)).isFalse();
assertThat(resolveToSameAddress("127.0.0.1", "127.0.0.2")).isFalse();
}

/**
* Verifies that the correct instanceId is propagated into {@code loadOrBuildCdcConsumer}
* during the full {@code buildCdcConsumers()} flow when {@code ipAddress()} is null.
* Complements {@code testGetInstanceIdReturnsCorrectIdWhenIpAddressIsNull}, which tests
* {@code getInstanceId} in isolation; this test confirms the fix is effective end-to-end.
*/
@Test
void testGetInstanceIdResolvesCorrectlyWhenIpAddressIsNull() throws IOException
{
String instanceIp = "172.19.0.5";
int instanceId = 1000;

TokenRange range = mockTokenRange(BigInteger.ZERO, BigInteger.TEN);
Map<String, Set<TokenRange>> ownedRanges = Collections.singletonMap(instanceIp, Collections.singleton(range));

InstanceMetadata instance = mock(InstanceMetadata.class, RETURNS_DEEP_STUBS);
when(instance.id()).thenReturn(instanceId);
when(instance.ipAddress()).thenReturn(null);

when(rangeManager.ownedTokenRanges()).thenReturn(ownedRanges);
when(instanceFetcher.instance(instanceIp)).thenReturn(instance);
when(cdcConfig.jobId()).thenReturn("test-job");

CdcManager spyManager = spy(cdcManager);
SidecarCdc mockConsumer = mock(SidecarCdc.class);
doReturn(mockConsumer).when(spyManager).loadOrBuildCdcConsumer(
anyInt(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any()
);

List<SidecarCdc> consumers = spyManager.buildCdcConsumers();

assertThat(consumers).hasSize(1);
verify(spyManager).loadOrBuildCdcConsumer(
eq(instanceId), any(), any(), any(), any(), any(), any(), any(), any(), any(), any()
);
}

/**
* Unit test for the CASSSIDECAR-417 bug fix: {@code getInstanceId} must return the correct id
* even when {@code ipAddress()} is null (not yet refreshed). The old code passed {@code null}
* to {@code resolveToSameAddress}, which resolved to {@code 127.0.0.1} and returned {@code -1}.
* The fix resolves the instance via {@code instanceFetcher.instance(ip)} instead.
*/
@Test
void testGetInstanceIdReturnsCorrectIdWhenIpAddressIsNull()
{
String instanceIp = "172.19.0.5";
int instanceId = 1;

InstanceMetadata instance = mock(InstanceMetadata.class);
when(instance.id()).thenReturn(instanceId);
when(instance.ipAddress()).thenReturn(null); // not yet refreshed — the key precondition

when(instanceFetcher.allLocalInstances()).thenReturn(Collections.singletonList(instance));
when(instanceFetcher.instance(instanceIp)).thenReturn(instance);

assertThat(cdcManager.getInstanceId(instanceIp)).isEqualTo(instanceId);
}

/**
* Verifies that getInstanceId returns -1 when the IP is not known to any local instance.
* Both old and new code produce -1 here, but via different mechanisms.
*/
@Test
void testGetInstanceIdReturnsMinusOneWhenInstanceNotFound()
{
String unknownIp = "192.168.1.100";

when(instanceFetcher.allLocalInstances()).thenReturn(Collections.emptyList());
when(instanceFetcher.instance(unknownIp))
.thenThrow(new NoSuchCassandraInstanceException("Instance not found: " + unknownIp));

assertThat(cdcManager.getInstanceId(unknownIp)).isEqualTo(-1);
}

// Helper methods
Expand All @@ -328,4 +388,18 @@ private InstanceMetadata mockInstance(int id, String ipAddress)
when(instance.ipAddress()).thenReturn(ipAddress);
return instance;
}

private static boolean resolveToSameAddress(String address1, String address2)
{
try
{
InetAddress addr1 = InetAddress.getByName(address1);
InetAddress addr2 = InetAddress.getByName(address2);
return addr1.equals(addr2);
}
catch (UnknownHostException e)
{
return address1.equals(address2);
}
}
}