Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
0.5.0
-----
* Exclude IP address from RingInstance equality so node replacement does not fail bulk write jobs (CASSANALYTICS-175)
* Regenerate bloom filters for CQLSSTableWriter (CASSANALYTICS-167)
* Avoid Spark 4 partitioning warnings during bulk reads (CASSANALYTICS-171)
* Spark 4.0 Support (CASSANALYTICS-34)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,12 @@ public NodeStatus nodeStatus()
}

/**
* Custom equality that compares the token, fully qualified domain name, the port, the datacenter and the clusterId
* Custom equality that compares the token, fully qualified domain name, the rack, the port, the datacenter
* and the clusterId
*
* Note that node state, status, are not part of the calculation.
* Note that node state, status and IP address are not part of the calculation. The IP address is excluded
* because a node can come back with a different IP address (e.g. a pod replacement in Kubernetes) while
* remaining the same logical instance.
*
* @param other the other instance
* @return true if both instances are equal, false otherwise
Expand All @@ -147,22 +150,22 @@ public boolean equals(@Nullable Object other)
&& Objects.equals(ringEntry.token(), that.ringEntry.token())
&& Objects.equals(ringEntry.fqdn(), that.ringEntry.fqdn())
&& Objects.equals(ringEntry.rack(), that.ringEntry.rack())
&& Objects.equals(ringEntry.address(), that.ringEntry.address())
&& ringEntry.port() == that.ringEntry.port()
&& Objects.equals(ringEntry.datacenter(), that.ringEntry.datacenter());
}

/**
* Custom hashCode that compares the token, fully qualified domain name, the port, and the datacenter
* Custom hashCode that hashes the token, fully qualified domain name, the rack, the port, the datacenter
* and the clusterId
*
* Note that node state and status are not part of the calculation.
* Note that node state, status and IP address are not part of the calculation.
*
* @return The hashcode of this instance based on the important fields
*/
@Override
public int hashCode()
{
return Objects.hash(clusterId, ringEntry.token(), ringEntry.fqdn(), ringEntry.rack(), ringEntry.port(), ringEntry.datacenter(), ringEntry.address());
return Objects.hash(clusterId, ringEntry.token(), ringEntry.fqdn(), ringEntry.rack(), ringEntry.port(), ringEntry.datacenter());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,21 @@ public void testEqualsAndHashcodeIgnoreNonCriticalFields()
assertThat(instance1.hashCode()).isEqualTo(instance2.hashCode());
}

@Test
public void testEqualsAndHashcodeIgnoreIpAddress()
{
// The same logical instance can come back with a different IP address,
// e.g. when Kubernetes replaces a pod; it should compare equal
RingInstance instance1 = new RingInstance(mockRingEntryBuilder()
.address("127.0.0.1")
.build());
RingInstance instance2 = new RingInstance(mockRingEntryBuilder()
.address("127.0.0.2")
.build());
assertThat(instance1).isEqualTo(instance2);
assertThat(instance1.hashCode()).isEqualTo(instance2.hashCode());
}

@Test
public void testEqualsAndHashcodeConsidersClusterId()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,44 @@ public static TokenRangeMapping<RingInstance> buildTokenRangeMappingWithFailures
new HashSet<>(instances));
}

/**
* Builds the same topology as {@link #buildTokenRangeMapping(int, ImmutableMap, int)}, but every instance
* has a different IP address, simulating nodes coming back with new IPs, e.g. pod replacement in Kubernetes
*/
public static TokenRangeMapping<RingInstance> buildTokenRangeMappingWithChangedIpAddresses(int initialToken,
ImmutableMap<String, Integer> rfByDC,
int instancesPerDC)
{
List<RingInstance> instances = getInstances(initialToken, rfByDC, instancesPerDC)
.stream()
.map(TokenRangeMappingUtils::withChangedIpAddress)
.collect(Collectors.toList());
ReplicationFactor replicationFactor = getReplicationFactor(rfByDC);
Multimap<RingInstance, Range<BigInteger>> tokenRanges = setupTokenRangeMap(Partitioner.Murmur3Partitioner, replicationFactor, instances);
return new TokenRangeMapping<>(Partitioner.Murmur3Partitioner,
tokenRanges,
new HashSet<>(instances));
}

private static RingInstance withChangedIpAddress(RingInstance instance)
{
RingEntry entry = instance.ringEntry();
RingEntry newEntry = new RingEntry.Builder()
.datacenter(entry.datacenter())
.port(entry.port())
.address(entry.address().replace("127.", "10."))
.status(entry.status())
.state(entry.state())
.token(entry.token())
.fqdn(entry.fqdn())
.rack(entry.rack())
.owns(entry.owns())
.load(entry.load())
.hostId(entry.hostId())
.build();
return new RingInstance(newEntry);
}

public static TokenRangeMapping<RingInstance> buildTokenRangeMapping(int initialToken,
ImmutableMap<String, Integer> rfByDC,
int instancesPerDC,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,31 @@ void testTopologyChanged()
assertThat(noChange.get()).isFalse();
}

// In environments where nodes are not bound to fixed IP addresses, a node that goes down can
// come back with a new IP while remaining the same logical instance — same hostname, tokens and
// data. This is routine in Kubernetes: a rescheduled pod keeps its identity but gets a new IP.
// The write is still correct, so the monitor must not report a topology change and cancel the
// job when instances differ only by IP address.
@Test
void testIpAddressChangeIsNotTopologyChange()
{
ClusterInfo mockClusterInfo = mock(ClusterInfo.class);
when(mockClusterInfo.getTokenRangeMapping(false))
.thenReturn(buildTopology(10))
.thenReturn(buildTopologyWithChangedIpAddresses(10)); // same instances, new IP addresses
AtomicBoolean noChange = new AtomicBoolean(true);
CassandraTopologyMonitor monitor = new CassandraTopologyMonitor(mockClusterInfo, event -> noChange.set(false));
monitor.checkTopologyOnDemand();
assertThat(noChange.get()).isTrue();
}

private TokenRangeMapping<RingInstance> buildTopology(int instancesCount)
{
return TokenRangeMappingUtils.buildTokenRangeMapping(0, ImmutableMap.of("DC1", 3), instancesCount);
}

private TokenRangeMapping<RingInstance> buildTopologyWithChangedIpAddresses(int instancesCount)
{
return TokenRangeMappingUtils.buildTokenRangeMappingWithChangedIpAddresses(0, ImmutableMap.of("DC1", 3), instancesCount);
}
}
Loading