From 388f906399df5ba6b720e9e3c7e98c4f2e0cce01 Mon Sep 17 00:00:00 2001 From: jkonisa Date: Fri, 27 Mar 2026 22:30:12 -0700 Subject: [PATCH 1/5] CASSSIDECAR-417 : CdcManager.getInstanceId(instanceIp) returns -1 as it resolves ipAddress to null --- .../cassandra/sidecar/cdc/CdcManager.java | 24 ++-- .../cassandra/sidecar/cdc/CdcManagerTest.java | 112 +++++++++++++----- 2 files changed, 96 insertions(+), 40 deletions(-) diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java index 215913540..9ce2b74b0 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java @@ -28,6 +28,8 @@ import java.util.Set; import java.util.stream.Collectors; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.jetbrains.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,10 +45,10 @@ import org.apache.cassandra.cdc.sidecar.SidecarStatePersister; import org.apache.cassandra.cdc.stats.ICdcStats; import org.apache.cassandra.secrets.SecretsProvider; -import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange; import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; import org.apache.cassandra.sidecar.coordination.RangeManager; +import org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException; import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; import org.apache.cassandra.spark.utils.AsyncExecutor; @@ -207,20 +209,18 @@ public void stopConsumers() consumers.forEach(SidecarCdc::stop); } - private Integer getInstanceId(String instanceIp) + @VisibleForTesting + Integer getInstanceId(String instanceIp) { - for (InstanceMetadata instance : instanceFetcher.allLocalInstances()) + try { - String configuredIpAddress = instance.ipAddress(); - - // Option 1a: Normalize both to InetAddress and compare - if (resolveToSameAddress(instanceIp, configuredIpAddress)) - { - return instance.id(); - } + return instanceFetcher.instance(instanceIp).id(); + } + catch (NoSuchCassandraInstanceException e) + { + LOGGER.warn("Requested IP {} does not match with any instances", instanceIp); + return -1; } - LOGGER.warn("Requested IP {} does not match with any instances", instanceIp); - return -1; } public static boolean resolveToSameAddress(String address1, String address2) diff --git a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcManagerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcManagerTest.java index 3fd958ef3..a0c31876e 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcManagerTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcManagerTest.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.math.BigInteger; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -44,6 +43,7 @@ import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; import org.apache.cassandra.sidecar.coordination.RangeManager; import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor; +import org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -52,10 +52,12 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** @@ -144,10 +146,9 @@ void testSingleInstanceSingleRangeCreatesOneConsumer() throws IOException InstanceMetadata instance = mockInstance(instanceId, instanceIp); when(rangeManager.ownedTokenRanges()).thenReturn(ownedRanges); - when(instanceFetcher.allLocalInstances()).thenReturn(Collections.singletonList(instance)); + when(instanceFetcher.instance(instanceIp)).thenReturn(instance); when(cdcConfig.jobId()).thenReturn("test-job"); - // Spy to mock loadOrBuildCdcConsumer CdcManager spyManager = spy(cdcManager); SidecarCdc mockConsumer = mock(SidecarCdc.class); doReturn(mockConsumer).when(spyManager).loadOrBuildCdcConsumer( @@ -156,7 +157,6 @@ void testSingleInstanceSingleRangeCreatesOneConsumer() throws IOException List consumers = spyManager.buildCdcConsumers(); - // Assert assertThat(consumers).hasSize(1); } @@ -177,10 +177,9 @@ void testSingleInstanceMultipleRangesCreatesMultipleConsumers() throws IOExcepti InstanceMetadata instance = mockInstance(instanceId, instanceIp); when(rangeManager.ownedTokenRanges()).thenReturn(ownedRanges); - when(instanceFetcher.allLocalInstances()).thenReturn(Collections.singletonList(instance)); + when(instanceFetcher.instance(instanceIp)).thenReturn(instance); when(cdcConfig.jobId()).thenReturn("test-job"); - // Spy to mock loadOrBuildCdcConsumer CdcManager spyManager = spy(cdcManager); SidecarCdc mockConsumer1 = mock(SidecarCdc.class); SidecarCdc mockConsumer2 = mock(SidecarCdc.class); @@ -190,7 +189,6 @@ void testSingleInstanceMultipleRangesCreatesMultipleConsumers() throws IOExcepti List consumers = spyManager.buildCdcConsumers(); - // Assert assertThat(consumers).hasSize(2); } @@ -212,15 +210,11 @@ void testMultipleInstancesMultipleRangesCreatesConsumers() throws IOException InstanceMetadata instance1 = mockInstance(instance1Id, instance1Ip); InstanceMetadata instance2 = mockInstance(instance2Id, instance2Ip); - List instances = new ArrayList<>(); - instances.add(instance1); - instances.add(instance2); - when(rangeManager.ownedTokenRanges()).thenReturn(ownedRanges); - when(instanceFetcher.allLocalInstances()).thenReturn(instances); + when(instanceFetcher.instance(instance1Ip)).thenReturn(instance1); + when(instanceFetcher.instance(instance2Ip)).thenReturn(instance2); when(cdcConfig.jobId()).thenReturn("test-job"); - // Spy to mock loadOrBuildCdcConsumer CdcManager spyManager = spy(cdcManager); SidecarCdc mockConsumer1 = mock(SidecarCdc.class); SidecarCdc mockConsumer2 = mock(SidecarCdc.class); @@ -230,7 +224,6 @@ void testMultipleInstancesMultipleRangesCreatesConsumers() throws IOException List consumers = spyManager.buildCdcConsumers(); - // Assert assertThat(consumers).hasSize(2); } @@ -240,7 +233,6 @@ void testDuplicateRangesDeduplicates() throws IOException String instanceIp = "127.0.0.1"; int instanceId = 1; - // Create two identical ranges TokenRange range1 = mockTokenRange(BigInteger.ZERO, BigInteger.TEN); TokenRange range2 = mockTokenRange(BigInteger.ZERO, BigInteger.TEN); @@ -253,10 +245,9 @@ void testDuplicateRangesDeduplicates() throws IOException InstanceMetadata instance = mockInstance(instanceId, instanceIp); when(rangeManager.ownedTokenRanges()).thenReturn(ownedRanges); - when(instanceFetcher.allLocalInstances()).thenReturn(Collections.singletonList(instance)); + when(instanceFetcher.instance(instanceIp)).thenReturn(instance); when(cdcConfig.jobId()).thenReturn("test-job"); - // Spy to mock loadOrBuildCdcConsumer CdcManager spyManager = spy(cdcManager); SidecarCdc mockConsumer = mock(SidecarCdc.class); doReturn(mockConsumer).when(spyManager).loadOrBuildCdcConsumer( @@ -265,7 +256,6 @@ void testDuplicateRangesDeduplicates() throws IOException List consumers = spyManager.buildCdcConsumers(); - // Assert - Should deduplicate to 1 consumer assertThat(consumers).hasSize(1); } @@ -277,12 +267,10 @@ void testUnknownInstanceHandlesGracefully() throws IOException TokenRange range = mockTokenRange(BigInteger.ZERO, BigInteger.TEN); Map> ownedRanges = Collections.singletonMap(unknownIp, Collections.singleton(range)); - // No matching instances when(rangeManager.ownedTokenRanges()).thenReturn(ownedRanges); - when(instanceFetcher.allLocalInstances()).thenReturn(Collections.emptyList()); + when(instanceFetcher.instance(unknownIp)).thenThrow(new NoSuchCassandraInstanceException("Instance not found: " + unknownIp)); when(cdcConfig.jobId()).thenReturn("test-job"); - // Spy to mock loadOrBuildCdcConsumer - will be called with instanceId = -1 CdcManager spyManager = spy(cdcManager); SidecarCdc mockConsumer = mock(SidecarCdc.class); doReturn(mockConsumer).when(spyManager).loadOrBuildCdcConsumer( @@ -291,24 +279,92 @@ void testUnknownInstanceHandlesGracefully() throws IOException List consumers = spyManager.buildCdcConsumers(); - // Assert - Should still create consumer with instanceId = -1 assertThat(consumers).hasSize(1); } @Test void testResolveToSameAddressTrue() { - String address1 = "127.0.0.1"; - String address2 = "localhost"; - assertThat(CdcManager.resolveToSameAddress(address1, address2)).isTrue(); + assertThat(CdcManager.resolveToSameAddress("127.0.0.1", "localhost")).isTrue(); } @Test void testResolveToSameAddressFalse() { - String address1 = "127.0.0.1"; - String address2 = "127.0.0.2"; - assertThat(CdcManager.resolveToSameAddress(address1, address2)).isFalse(); + assertThat(CdcManager.resolveToSameAddress("127.0.0.1", "127.0.0.2")).isFalse(); + } + + @Test + void testGetInstanceIdResolvesCorrectlyWhenIpAddressIsNull() throws IOException + { + String instanceIp = "172.19.0.5"; + int instanceId = 1000; + + TokenRange range = mockTokenRange(BigInteger.ZERO, BigInteger.TEN); + Map> ownedRanges = Collections.singletonMap(instanceIp, Collections.singleton(range)); + + InstanceMetadata instance = mock(InstanceMetadata.class, RETURNS_DEEP_STUBS); + when(instance.id()).thenReturn(instanceId); + when(instance.ipAddress()).thenReturn(null); + + when(rangeManager.ownedTokenRanges()).thenReturn(ownedRanges); + when(instanceFetcher.instance(instanceIp)).thenReturn(instance); + when(cdcConfig.jobId()).thenReturn("test-job"); + + CdcManager spyManager = spy(cdcManager); + SidecarCdc mockConsumer = mock(SidecarCdc.class); + doReturn(mockConsumer).when(spyManager).loadOrBuildCdcConsumer( + anyInt(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any() + ); + + List consumers = spyManager.buildCdcConsumers(); + + assertThat(consumers).hasSize(1); + verify(spyManager).loadOrBuildCdcConsumer( + eq(instanceId), any(), any(), any(), any(), any(), any(), any(), any(), any(), any() + ); + } + + /** + * Verifies that getInstanceId returns the correct id even when ipAddress() is null. + * + *

Old code (broken): allLocalInstances() returns the instance; ipAddress()=null; + * resolveToSameAddress("172.19.0.5", null) calls InetAddress.getByName(null) which + * returns 127.0.0.1 ≠ 172.19.0.5 → returns -1. Test FAILS with old code. + * + *

Fixed code: instanceFetcher.instance(ip) resolves via instanceFromHost refresh + * path → returns instance with id=1. Test PASSES with fix. + */ + @Test + void testGetInstanceIdReturnsCorrectIdWhenIpAddressIsNull() + { + String instanceIp = "172.19.0.5"; + int instanceId = 1; + + InstanceMetadata instance = mock(InstanceMetadata.class); + when(instance.id()).thenReturn(instanceId); + when(instance.ipAddress()).thenReturn(null); // not yet refreshed — the key precondition + + when(instanceFetcher.allLocalInstances()).thenReturn(Collections.singletonList(instance)); + when(instanceFetcher.instance(instanceIp)).thenReturn(instance); + + assertThat(cdcManager.getInstanceId(instanceIp)).isEqualTo(instanceId); + } + + /** + * Verifies that getInstanceId returns -1 when the IP is not known to any local instance. + * Both old and new code produce -1 here, but via different mechanisms. + */ + @Test + void testGetInstanceIdReturnsMinusOneWhenInstanceNotFound() + { + String unknownIp = "192.168.1.100"; + + when(instanceFetcher.allLocalInstances()).thenReturn(Collections.emptyList()); + when(instanceFetcher.instance(unknownIp)) + .thenThrow(new NoSuchCassandraInstanceException("Instance not found: " + unknownIp)); + + assertThat(cdcManager.getInstanceId(unknownIp)).isEqualTo(-1); } // Helper methods From 21125b7c1542a8a923221503f4d7ab301d9ee570 Mon Sep 17 00:00:00 2001 From: jkonisa Date: Tue, 31 Mar 2026 06:32:05 -0700 Subject: [PATCH 2/5] Addressing comments --- .../cassandra/sidecar/cdc/CdcManager.java | 21 +------------------ .../cassandra/sidecar/cdc/CdcManagerTest.java | 20 ++++++++++++++++-- 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java index 9ce2b74b0..c98295f33 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java @@ -19,8 +19,6 @@ package org.apache.cassandra.sidecar.cdc; import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -28,7 +26,6 @@ import java.util.Set; import java.util.stream.Collectors; -import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.jetbrains.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +45,6 @@ import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange; import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; import org.apache.cassandra.sidecar.coordination.RangeManager; -import org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException; import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; import org.apache.cassandra.spark.utils.AsyncExecutor; @@ -216,28 +212,13 @@ Integer getInstanceId(String instanceIp) { return instanceFetcher.instance(instanceIp).id(); } - catch (NoSuchCassandraInstanceException e) + catch (Exception e) { LOGGER.warn("Requested IP {} does not match with any instances", instanceIp); return -1; } } - public static boolean resolveToSameAddress(String address1, String address2) - { - try - { - InetAddress addr1 = InetAddress.getByName(address1); - InetAddress addr2 = InetAddress.getByName(address2); - return addr1.equals(addr2); - } - catch (UnknownHostException e) - { - LOGGER.warn("Could not resolve hostname: {}", e.getMessage()); - return address1.equals(address2); // Fallback to string comparison - } - } - public SidecarCdc buildConsumer(@NotNull String jobId, int partitionId, diff --git a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcManagerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcManagerTest.java index a0c31876e..c1c011a47 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcManagerTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcManagerTest.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.math.BigInteger; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -285,13 +287,13 @@ void testUnknownInstanceHandlesGracefully() throws IOException @Test void testResolveToSameAddressTrue() { - assertThat(CdcManager.resolveToSameAddress("127.0.0.1", "localhost")).isTrue(); + assertThat(resolveToSameAddress("127.0.0.1", "localhost")).isTrue(); } @Test void testResolveToSameAddressFalse() { - assertThat(CdcManager.resolveToSameAddress("127.0.0.1", "127.0.0.2")).isFalse(); + assertThat(resolveToSameAddress("127.0.0.1", "127.0.0.2")).isFalse(); } @Test @@ -384,4 +386,18 @@ private InstanceMetadata mockInstance(int id, String ipAddress) when(instance.ipAddress()).thenReturn(ipAddress); return instance; } + + private static boolean resolveToSameAddress(String address1, String address2) + { + try + { + InetAddress addr1 = InetAddress.getByName(address1); + InetAddress addr2 = InetAddress.getByName(address2); + return addr1.equals(addr2); + } + catch (UnknownHostException e) + { + return address1.equals(address2); + } + } } From 7a126f8c36595e6aabd569d104789425fe1c1f1b Mon Sep 17 00:00:00 2001 From: jkonisa Date: Wed, 1 Apr 2026 11:42:52 -0700 Subject: [PATCH 3/5] Adding comment to tests --- .../cassandra/sidecar/cdc/CdcManagerTest.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcManagerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcManagerTest.java index c1c011a47..48fe70b5d 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcManagerTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcManagerTest.java @@ -296,6 +296,12 @@ void testResolveToSameAddressFalse() assertThat(resolveToSameAddress("127.0.0.1", "127.0.0.2")).isFalse(); } + /** + * Verifies that the correct instanceId is propagated into {@code loadOrBuildCdcConsumer} + * during the full {@code buildCdcConsumers()} flow when {@code ipAddress()} is null. + * Complements {@code testGetInstanceIdReturnsCorrectIdWhenIpAddressIsNull}, which tests + * {@code getInstanceId} in isolation; this test confirms the fix is effective end-to-end. + */ @Test void testGetInstanceIdResolvesCorrectlyWhenIpAddressIsNull() throws IOException { @@ -328,14 +334,10 @@ void testGetInstanceIdResolvesCorrectlyWhenIpAddressIsNull() throws IOException } /** - * Verifies that getInstanceId returns the correct id even when ipAddress() is null. - * - *

Old code (broken): allLocalInstances() returns the instance; ipAddress()=null; - * resolveToSameAddress("172.19.0.5", null) calls InetAddress.getByName(null) which - * returns 127.0.0.1 ≠ 172.19.0.5 → returns -1. Test FAILS with old code. - * - *

Fixed code: instanceFetcher.instance(ip) resolves via instanceFromHost refresh - * path → returns instance with id=1. Test PASSES with fix. + * Unit test for the CASSSIDECAR-417 bug fix: {@code getInstanceId} must return the correct id + * even when {@code ipAddress()} is null (not yet refreshed). The old code passed {@code null} + * to {@code resolveToSameAddress}, which resolved to {@code 127.0.0.1} and returned {@code -1}. + * The fix resolves the instance via {@code instanceFetcher.instance(ip)} instead. */ @Test void testGetInstanceIdReturnsCorrectIdWhenIpAddressIsNull() From 9b8f166832ad5cee711f0dade18653c6ef77b740 Mon Sep 17 00:00:00 2001 From: jkonisa Date: Wed, 1 Apr 2026 11:45:05 -0700 Subject: [PATCH 4/5] changes.txt --- CHANGES.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.txt b/CHANGES.txt index 45666644d..8f05b92fd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.4.0 ----- + * CdcManager.getInstanceId(instanceIp) returns -1 as it resolves ipAddress to null (CASSSIDECAR-417) * Add JDK11_OPTIONS to the startup script (CASSSIDECAR-416) * Add safety check to Live Migration data copy task endpoint (CASSSIDECAR-409) * Define common operational job tracking interface and refactor current operational job tracker (CASSSIDECAR-372) From 899ad9a78af54b90b84673512e62574d4048f454 Mon Sep 17 00:00:00 2001 From: jkonisa Date: Mon, 6 Apr 2026 12:52:06 -0700 Subject: [PATCH 5/5] fixing checkstyle and addressing comments --- .../java/org/apache/cassandra/sidecar/cdc/CdcManager.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java index c98295f33..d5bb24e47 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java @@ -26,7 +26,6 @@ import java.util.Set; import java.util.stream.Collectors; -import org.jetbrains.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +48,7 @@ import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; import org.apache.cassandra.spark.utils.AsyncExecutor; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.VisibleForTesting; /** @@ -76,6 +76,7 @@ public class CdcManager { private static final Logger LOGGER = LoggerFactory.getLogger(CdcManager.class); + private static final int UNKNOWN_INSTANCE = -1; private final CdcConfig conf; private final RangeManager rangeManager; private final InstanceMetadataFetcher instanceFetcher; @@ -215,7 +216,7 @@ Integer getInstanceId(String instanceIp) catch (Exception e) { LOGGER.warn("Requested IP {} does not match with any instances", instanceIp); - return -1; + return UNKNOWN_INSTANCE; } }