diff --git a/CHANGES.txt b/CHANGES.txt index 45666644d..8f05b92fd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.4.0 ----- + * CdcManager.getInstanceId(instanceIp) returns -1 as it resolves ipAddress to null (CASSSIDECAR-417) * Add JDK11_OPTIONS to the startup script (CASSSIDECAR-416) * Add safety check to Live Migration data copy task endpoint (CASSSIDECAR-409) * Define common operational job tracking interface and refactor current operational job tracker (CASSSIDECAR-372) diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java index 215913540..d5bb24e47 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java @@ -19,8 +19,6 @@ package org.apache.cassandra.sidecar.cdc; import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -43,7 +41,6 @@ import org.apache.cassandra.cdc.sidecar.SidecarStatePersister; import org.apache.cassandra.cdc.stats.ICdcStats; import org.apache.cassandra.secrets.SecretsProvider; -import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange; import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; import org.apache.cassandra.sidecar.coordination.RangeManager; @@ -51,6 +48,7 @@ import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; import org.apache.cassandra.spark.utils.AsyncExecutor; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.VisibleForTesting; /** @@ -78,6 +76,7 @@ public class CdcManager { private static final Logger LOGGER = LoggerFactory.getLogger(CdcManager.class); + private static final int UNKNOWN_INSTANCE = -1; private final CdcConfig conf; private final RangeManager rangeManager; private final InstanceMetadataFetcher instanceFetcher; @@ -207,34 +206,17 @@ public void stopConsumers() consumers.forEach(SidecarCdc::stop); } - private Integer getInstanceId(String instanceIp) - { - for (InstanceMetadata instance : instanceFetcher.allLocalInstances()) - { - String configuredIpAddress = instance.ipAddress(); - - // Option 1a: Normalize both to InetAddress and compare - if (resolveToSameAddress(instanceIp, configuredIpAddress)) - { - return instance.id(); - } - } - LOGGER.warn("Requested IP {} does not match with any instances", instanceIp); - return -1; - } - - public static boolean resolveToSameAddress(String address1, String address2) + @VisibleForTesting + Integer getInstanceId(String instanceIp) { try { - InetAddress addr1 = InetAddress.getByName(address1); - InetAddress addr2 = InetAddress.getByName(address2); - return addr1.equals(addr2); + return instanceFetcher.instance(instanceIp).id(); } - catch (UnknownHostException e) + catch (Exception e) { - LOGGER.warn("Could not resolve hostname: {}", e.getMessage()); - return address1.equals(address2); // Fallback to string comparison + LOGGER.warn("Requested IP {} does not match with any instances", instanceIp); + return UNKNOWN_INSTANCE; } } diff --git a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcManagerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcManagerTest.java index 3fd958ef3..48fe70b5d 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcManagerTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcManagerTest.java @@ -20,7 +20,8 @@ import java.io.IOException; import java.math.BigInteger; -import java.util.ArrayList; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -44,6 +45,7 @@ import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; import org.apache.cassandra.sidecar.coordination.RangeManager; import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor; +import org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException; import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -52,10 +54,12 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** @@ -144,10 +148,9 @@ void testSingleInstanceSingleRangeCreatesOneConsumer() throws IOException InstanceMetadata instance = mockInstance(instanceId, instanceIp); when(rangeManager.ownedTokenRanges()).thenReturn(ownedRanges); - when(instanceFetcher.allLocalInstances()).thenReturn(Collections.singletonList(instance)); + when(instanceFetcher.instance(instanceIp)).thenReturn(instance); when(cdcConfig.jobId()).thenReturn("test-job"); - // Spy to mock loadOrBuildCdcConsumer CdcManager spyManager = spy(cdcManager); SidecarCdc mockConsumer = mock(SidecarCdc.class); doReturn(mockConsumer).when(spyManager).loadOrBuildCdcConsumer( @@ -156,7 +159,6 @@ void testSingleInstanceSingleRangeCreatesOneConsumer() throws IOException List consumers = spyManager.buildCdcConsumers(); - // Assert assertThat(consumers).hasSize(1); } @@ -177,10 +179,9 @@ void testSingleInstanceMultipleRangesCreatesMultipleConsumers() throws IOExcepti InstanceMetadata instance = mockInstance(instanceId, instanceIp); when(rangeManager.ownedTokenRanges()).thenReturn(ownedRanges); - when(instanceFetcher.allLocalInstances()).thenReturn(Collections.singletonList(instance)); + when(instanceFetcher.instance(instanceIp)).thenReturn(instance); when(cdcConfig.jobId()).thenReturn("test-job"); - // Spy to mock loadOrBuildCdcConsumer CdcManager spyManager = spy(cdcManager); SidecarCdc mockConsumer1 = mock(SidecarCdc.class); SidecarCdc mockConsumer2 = mock(SidecarCdc.class); @@ -190,7 +191,6 @@ void testSingleInstanceMultipleRangesCreatesMultipleConsumers() throws IOExcepti List consumers = spyManager.buildCdcConsumers(); - // Assert assertThat(consumers).hasSize(2); } @@ -212,15 +212,11 @@ void testMultipleInstancesMultipleRangesCreatesConsumers() throws IOException InstanceMetadata instance1 = mockInstance(instance1Id, instance1Ip); InstanceMetadata instance2 = mockInstance(instance2Id, instance2Ip); - List instances = new ArrayList<>(); - instances.add(instance1); - instances.add(instance2); - when(rangeManager.ownedTokenRanges()).thenReturn(ownedRanges); - when(instanceFetcher.allLocalInstances()).thenReturn(instances); + when(instanceFetcher.instance(instance1Ip)).thenReturn(instance1); + when(instanceFetcher.instance(instance2Ip)).thenReturn(instance2); when(cdcConfig.jobId()).thenReturn("test-job"); - // Spy to mock loadOrBuildCdcConsumer CdcManager spyManager = spy(cdcManager); SidecarCdc mockConsumer1 = mock(SidecarCdc.class); SidecarCdc mockConsumer2 = mock(SidecarCdc.class); @@ -230,7 +226,6 @@ void testMultipleInstancesMultipleRangesCreatesConsumers() throws IOException List consumers = spyManager.buildCdcConsumers(); - // Assert assertThat(consumers).hasSize(2); } @@ -240,7 +235,6 @@ void testDuplicateRangesDeduplicates() throws IOException String instanceIp = "127.0.0.1"; int instanceId = 1; - // Create two identical ranges TokenRange range1 = mockTokenRange(BigInteger.ZERO, BigInteger.TEN); TokenRange range2 = mockTokenRange(BigInteger.ZERO, BigInteger.TEN); @@ -253,10 +247,9 @@ void testDuplicateRangesDeduplicates() throws IOException InstanceMetadata instance = mockInstance(instanceId, instanceIp); when(rangeManager.ownedTokenRanges()).thenReturn(ownedRanges); - when(instanceFetcher.allLocalInstances()).thenReturn(Collections.singletonList(instance)); + when(instanceFetcher.instance(instanceIp)).thenReturn(instance); when(cdcConfig.jobId()).thenReturn("test-job"); - // Spy to mock loadOrBuildCdcConsumer CdcManager spyManager = spy(cdcManager); SidecarCdc mockConsumer = mock(SidecarCdc.class); doReturn(mockConsumer).when(spyManager).loadOrBuildCdcConsumer( @@ -265,7 +258,6 @@ void testDuplicateRangesDeduplicates() throws IOException List consumers = spyManager.buildCdcConsumers(); - // Assert - Should deduplicate to 1 consumer assertThat(consumers).hasSize(1); } @@ -277,12 +269,10 @@ void testUnknownInstanceHandlesGracefully() throws IOException TokenRange range = mockTokenRange(BigInteger.ZERO, BigInteger.TEN); Map> ownedRanges = Collections.singletonMap(unknownIp, Collections.singleton(range)); - // No matching instances when(rangeManager.ownedTokenRanges()).thenReturn(ownedRanges); - when(instanceFetcher.allLocalInstances()).thenReturn(Collections.emptyList()); + when(instanceFetcher.instance(unknownIp)).thenThrow(new NoSuchCassandraInstanceException("Instance not found: " + unknownIp)); when(cdcConfig.jobId()).thenReturn("test-job"); - // Spy to mock loadOrBuildCdcConsumer - will be called with instanceId = -1 CdcManager spyManager = spy(cdcManager); SidecarCdc mockConsumer = mock(SidecarCdc.class); doReturn(mockConsumer).when(spyManager).loadOrBuildCdcConsumer( @@ -291,24 +281,94 @@ void testUnknownInstanceHandlesGracefully() throws IOException List consumers = spyManager.buildCdcConsumers(); - // Assert - Should still create consumer with instanceId = -1 assertThat(consumers).hasSize(1); } @Test void testResolveToSameAddressTrue() { - String address1 = "127.0.0.1"; - String address2 = "localhost"; - assertThat(CdcManager.resolveToSameAddress(address1, address2)).isTrue(); + assertThat(resolveToSameAddress("127.0.0.1", "localhost")).isTrue(); } @Test void testResolveToSameAddressFalse() { - String address1 = "127.0.0.1"; - String address2 = "127.0.0.2"; - assertThat(CdcManager.resolveToSameAddress(address1, address2)).isFalse(); + assertThat(resolveToSameAddress("127.0.0.1", "127.0.0.2")).isFalse(); + } + + /** + * Verifies that the correct instanceId is propagated into {@code loadOrBuildCdcConsumer} + * during the full {@code buildCdcConsumers()} flow when {@code ipAddress()} is null. + * Complements {@code testGetInstanceIdReturnsCorrectIdWhenIpAddressIsNull}, which tests + * {@code getInstanceId} in isolation; this test confirms the fix is effective end-to-end. + */ + @Test + void testGetInstanceIdResolvesCorrectlyWhenIpAddressIsNull() throws IOException + { + String instanceIp = "172.19.0.5"; + int instanceId = 1000; + + TokenRange range = mockTokenRange(BigInteger.ZERO, BigInteger.TEN); + Map> ownedRanges = Collections.singletonMap(instanceIp, Collections.singleton(range)); + + InstanceMetadata instance = mock(InstanceMetadata.class, RETURNS_DEEP_STUBS); + when(instance.id()).thenReturn(instanceId); + when(instance.ipAddress()).thenReturn(null); + + when(rangeManager.ownedTokenRanges()).thenReturn(ownedRanges); + when(instanceFetcher.instance(instanceIp)).thenReturn(instance); + when(cdcConfig.jobId()).thenReturn("test-job"); + + CdcManager spyManager = spy(cdcManager); + SidecarCdc mockConsumer = mock(SidecarCdc.class); + doReturn(mockConsumer).when(spyManager).loadOrBuildCdcConsumer( + anyInt(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any() + ); + + List consumers = spyManager.buildCdcConsumers(); + + assertThat(consumers).hasSize(1); + verify(spyManager).loadOrBuildCdcConsumer( + eq(instanceId), any(), any(), any(), any(), any(), any(), any(), any(), any(), any() + ); + } + + /** + * Unit test for the CASSSIDECAR-417 bug fix: {@code getInstanceId} must return the correct id + * even when {@code ipAddress()} is null (not yet refreshed). The old code passed {@code null} + * to {@code resolveToSameAddress}, which resolved to {@code 127.0.0.1} and returned {@code -1}. + * The fix resolves the instance via {@code instanceFetcher.instance(ip)} instead. + */ + @Test + void testGetInstanceIdReturnsCorrectIdWhenIpAddressIsNull() + { + String instanceIp = "172.19.0.5"; + int instanceId = 1; + + InstanceMetadata instance = mock(InstanceMetadata.class); + when(instance.id()).thenReturn(instanceId); + when(instance.ipAddress()).thenReturn(null); // not yet refreshed — the key precondition + + when(instanceFetcher.allLocalInstances()).thenReturn(Collections.singletonList(instance)); + when(instanceFetcher.instance(instanceIp)).thenReturn(instance); + + assertThat(cdcManager.getInstanceId(instanceIp)).isEqualTo(instanceId); + } + + /** + * Verifies that getInstanceId returns -1 when the IP is not known to any local instance. + * Both old and new code produce -1 here, but via different mechanisms. + */ + @Test + void testGetInstanceIdReturnsMinusOneWhenInstanceNotFound() + { + String unknownIp = "192.168.1.100"; + + when(instanceFetcher.allLocalInstances()).thenReturn(Collections.emptyList()); + when(instanceFetcher.instance(unknownIp)) + .thenThrow(new NoSuchCassandraInstanceException("Instance not found: " + unknownIp)); + + assertThat(cdcManager.getInstanceId(unknownIp)).isEqualTo(-1); } // Helper methods @@ -328,4 +388,18 @@ private InstanceMetadata mockInstance(int id, String ipAddress) when(instance.ipAddress()).thenReturn(ipAddress); return instance; } + + private static boolean resolveToSameAddress(String address1, String address2) + { + try + { + InetAddress addr1 = InetAddress.getByName(address1); + InetAddress addr2 = InetAddress.getByName(address2); + return addr1.equals(addr2); + } + catch (UnknownHostException e) + { + return address1.equals(address2); + } + } }