diff --git a/kafka/cluster.py b/kafka/cluster.py
index ed07f54dd..a44dc076c 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -298,6 +298,41 @@ def leader_for_partition(self, partition):
             return None
         return self._partitions[partition.topic][partition.partition].leader_id
 
+    def is_replica_node(self, partition, node_id):
+        """Return MetadataResponseBroker for ``node_id`` only when it is
+        known AND an online replica of ``partition`` (KIP-392).
+
+        Used by the consumer's preferred-read-replica routing to avoid
+        sending fetches to a broker that has been demoted out of the
+        partition's replica set, or that the partition metadata lists
+        as an offline replica, even though it still exists as a node.
+
+        Arguments:
+            partition (TopicPartition): topic / partition to look up.
+            node_id (int): broker id to validate.
+
+        Returns:
+            MetadataResponseBroker if the node exists in cluster metadata,
+            is currently listed as a replica of ``partition`` and is not
+            listed among its offline replicas; otherwise None.
+        """
+        broker = self.broker_metadata(node_id)
+        if broker is None:
+            return None
+        if partition.topic not in self._partitions:
+            return None
+        partition_data = self._partitions[partition.topic].get(partition.partition)
+        if partition_data is None:
+            return None
+        if node_id not in partition_data.replica_nodes:
+            return None
+        # A replica that the metadata lists as offline cannot serve fetches.
+        # ``offline_replicas`` may be missing or None for older metadata versions.
+        offline = getattr(partition_data, 'offline_replicas', None) or ()
+        if node_id in offline:
+            return None
+        return broker
+
     def leader_epoch_for_partition(self, partition):
         """Return leader_epoch for partition, or None if topic/partition is unknown."""
         if partition.topic not in self._partitions:
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 947b0a7e0..da48cdf84 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -1141,25 +1141,22 @@ def _fetchable_partitions(self):
 
     def _select_read_replica(self, tp):
         """Pick the node to fetch from for ``tp``: a cached preferred read
-        replica (KIP-392) when valid and known to the cluster, otherwise the
-        partition leader. An unknown / unreachable preferred replica is
-        cleared so the next fetch goes to the leader."""
+        replica (KIP-392) when valid and *still an online replica of
+        ``tp``*, otherwise the partition leader. A preferred replica that
+        is offline, has been demoted out of the partition's replica set,
+        or fell out of cluster metadata entirely is cleared so the next
+        fetch goes to the leader.
+        """
         preferred = self._subscriptions.assignment[tp].preferred_read_replica()
         if preferred is None:
-            leader = self._manager.cluster.leader_for_partition(tp)
-            log.debug("Selecting leader %s as read replica for partition %s",
-                      leader, tp)
-            return leader
-        # If the preferred node fell out of cluster metadata, fall back to leader.
-        if self._manager.cluster.broker_metadata(preferred) is None:
+            return self._manager.cluster.leader_for_partition(tp)
+        if self._manager.cluster.is_replica_node(tp, preferred) is None:
             self._subscriptions.assignment[tp].clear_preferred_read_replica()
             leader = self._manager.cluster.leader_for_partition(tp)
-            log.debug("Preferred read replica %s for partition %s no longer in"
-                      " cluster metadata; falling back to leader %s",
+            log.debug("Preferred read replica %s for partition %s no longer"
+                      " online or no longer a replica; falling back to leader %s",
                       preferred, tp, leader)
             return leader
-        log.debug("Selecting preferred read replica %s for partition %s",
-                  preferred, tp)
         return preferred
 
     def _create_fetch_requests(self):
diff --git a/test/consumer/test_fetcher.py b/test/consumer/test_fetcher.py
index 3e6ad9e8a..66b1f6e0c 100644
--- a/test/consumer/test_fetcher.py
+++ b/test/consumer/test_fetcher.py
@@ -1523,22 +1523,23 @@ def test_select_read_replica_uses_cache_when_valid(
             3, time.monotonic() + 60)
         mocker.patch.object(
             fetcher._manager.cluster, 'leader_for_partition', return_value=7)
-        # broker_metadata returns something truthy so we don't fall back.
+        # is_replica_node returns truthy -> preferred replica is still valid.
         mocker.patch.object(
-            fetcher._manager.cluster, 'broker_metadata', return_value=MagicMock())
+            fetcher._manager.cluster, 'is_replica_node', return_value=MagicMock())
         assert fetcher._select_read_replica(tp) == 3
 
     def test_select_read_replica_falls_back_when_replica_unknown(
             self, fetcher, topic, mocker):
-        """Cached preferred replica not in cluster metadata -> fall back to
-        leader AND clear the cache so we re-learn next time."""
+        """Cached preferred replica no longer in the partition's replica
+        set (per ``is_replica_node``) -> fall back to leader AND clear
+        the cache so we re-learn next time."""
         tp = TopicPartition(topic, 0)
         fetcher._subscriptions.assignment[tp].update_preferred_read_replica(
             3, time.monotonic() + 60)
         mocker.patch.object(
             fetcher._manager.cluster, 'leader_for_partition', return_value=7)
         mocker.patch.object(
-            fetcher._manager.cluster, 'broker_metadata', return_value=None)
+            fetcher._manager.cluster, 'is_replica_node', return_value=None)
         assert fetcher._select_read_replica(tp) == 7
         assert fetcher._subscriptions.assignment[tp].preferred_read_replica() is None
 
@@ -1684,3 +1685,64 @@ def test_reset_clears_preferred_replica(self, topic):
         state.update_preferred_read_replica(3, time.monotonic() + 60)
         state.reset(OffsetResetStrategy.LATEST)
         assert state.preferred_read_replica() is None
+
+    def test_is_replica_node(self):
+        """``ClusterMetadata.is_replica_node`` returns None when the broker
+        exists but is not an online replica of the partition."""
+        from kafka.cluster import ClusterMetadata
+        from kafka.protocol.metadata import MetadataResponse
+
+        cluster = ClusterMetadata()
+        Broker = MetadataResponse.MetadataResponseBroker
+        Topic = MetadataResponse.MetadataResponseTopic
+        Partition = Topic.MetadataResponsePartition
+        v = 7
+        cluster.update_metadata(MetadataResponse(
+            version=v, throttle_time_ms=0,
+            brokers=[Broker(node_id=1, host='h', port=9092, rack=None, version=v),
+                     Broker(node_id=2, host='h', port=9092, rack=None, version=v),
+                     Broker(node_id=3, host='h', port=9092, rack=None, version=v)],
+            cluster_id='c', controller_id=1,
+            topics=[Topic(version=v, error_code=0, name='t', is_internal=False,
+                          partitions=[
+                              Partition(
+                                  version=v, error_code=0, partition_index=0,
+                                  leader_id=1, leader_epoch=0,
+                                  replica_nodes=[1, 2], isr_nodes=[1, 2],
+                                  offline_replicas=[]),
+                              Partition(
+                                  version=v, error_code=0, partition_index=1,
+                                  leader_id=1, leader_epoch=0,
+                                  replica_nodes=[1, 3], isr_nodes=[1],
+                                  offline_replicas=[3]),
+                          ])]))
+        tp = TopicPartition('t', 0)
+
+        # Node 1 is the leader and a replica -> online.
+        assert cluster.is_replica_node(tp, 1) is not None
+        # Node 2 is a replica -> online.
+        assert cluster.is_replica_node(tp, 2) is not None
+        # Node 3 is a known broker but NOT a replica of this partition.
+        assert cluster.is_replica_node(tp, 3) is None
+        # Node 99 doesn't exist.
+        assert cluster.is_replica_node(tp, 99) is None
+        # Unknown topic.
+        assert cluster.is_replica_node(TopicPartition('other', 0), 1) is None
+        # Node 3 is a replica of partition 1 but is listed as offline there.
+        assert cluster.is_replica_node(TopicPartition('t', 1), 3) is None
+        # Node 1 is an online replica of partition 1.
+        assert cluster.is_replica_node(TopicPartition('t', 1), 1) is not None
+
+    def test_select_read_replica_falls_back_when_node_demoted(
+            self, fetcher, topic, mocker):
+        tp = TopicPartition(topic, 0)
+        fetcher._subscriptions.assignment[tp].update_preferred_read_replica(
+            3, time.monotonic() + 60)
+        mocker.patch.object(
+            fetcher._manager.cluster, 'leader_for_partition', return_value=7)
+        # The broker may still be known to the cluster, but is_replica_node
+        # reports that it is no longer a usable replica of the partition.
+        mocker.patch.object(
+            fetcher._manager.cluster, 'is_replica_node', return_value=None)
+        assert fetcher._select_read_replica(tp) == 7
+        assert fetcher._subscriptions.assignment[tp].preferred_read_replica() is None