Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions kafka/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,35 @@ def leader_for_partition(self, partition):
return None
return self._partitions[partition.topic][partition.partition].leader_id

def is_replica_node(self, partition, node_id):
"""Return MetadataResponseBroker for ``node_id`` only when it is
known AND still listed as a replica of ``partition`` (KIP-392).

Used by the consumer's preferred-read-replica routing to avoid
sending fetches to a broker that has been demoted out of the
partition's replica set even though it still exists as a node.

Arguments:
partition (TopicPartition): topic / partition to look up.
node_id (int): broker id to validate.

Returns:
MetadataResponseBroker if the node exists in cluster metadata
and is currently listed as a replica of ``partition``;
otherwise None.
"""
broker = self.broker_metadata(node_id)
if broker is None:
return None
if partition.topic not in self._partitions:
return None
partition_data = self._partitions[partition.topic].get(partition.partition)
if partition_data is None:
return None
if node_id not in partition_data.replica_nodes:
return None
return broker

def leader_epoch_for_partition(self, partition):
"""Return leader_epoch for partition, or None if topic/partition is unknown."""
if partition.topic not in self._partitions:
Expand Down
23 changes: 10 additions & 13 deletions kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -1141,25 +1141,22 @@ def _fetchable_partitions(self):

def _select_read_replica(self, tp):
"""Pick the node to fetch from for ``tp``: a cached preferred read
replica (KIP-392) when valid and known to the cluster, otherwise the
partition leader. An unknown / unreachable preferred replica is
cleared so the next fetch goes to the leader."""
replica (KIP-392) when valid and *still listed as a replica of
``tp``*, otherwise the partition leader. A preferred replica that
has been demoted out of the partition's replica set (or fell out
of cluster metadata entirely) is cleared so the next fetch goes
to the leader.
"""
preferred = self._subscriptions.assignment[tp].preferred_read_replica()
if preferred is None:
leader = self._manager.cluster.leader_for_partition(tp)
log.debug("Selecting leader %s as read replica for partition %s",
leader, tp)
return leader
# If the preferred node fell out of cluster metadata, fall back to leader.
if self._manager.cluster.broker_metadata(preferred) is None:
return self._manager.cluster.leader_for_partition(tp)
if not self._manager.cluster.is_replica_node(tp, preferred):
self._subscriptions.assignment[tp].clear_preferred_read_replica()
leader = self._manager.cluster.leader_for_partition(tp)
log.debug("Preferred read replica %s for partition %s no longer in"
" cluster metadata; falling back to leader %s",
log.debug("Preferred read replica %s for partition %s no longer"
" online or no longer a replica; falling back to leader %s",
preferred, tp, leader)
return leader
log.debug("Selecting preferred read replica %s for partition %s",
preferred, tp)
return preferred

def _create_fetch_requests(self):
Expand Down
63 changes: 58 additions & 5 deletions test/consumer/test_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -1523,22 +1523,23 @@ def test_select_read_replica_uses_cache_when_valid(
3, time.monotonic() + 60)
mocker.patch.object(
fetcher._manager.cluster, 'leader_for_partition', return_value=7)
# broker_metadata returns something truthy so we don't fall back.
# is_replica_node returns truthy -> preferred replica is still valid.
mocker.patch.object(
fetcher._manager.cluster, 'broker_metadata', return_value=MagicMock())
fetcher._manager.cluster, 'is_replica_node', return_value=MagicMock())
assert fetcher._select_read_replica(tp) == 3

def test_select_read_replica_falls_back_when_replica_unknown(
self, fetcher, topic, mocker):
"""Cached preferred replica not in cluster metadata -> fall back to
leader AND clear the cache so we re-learn next time."""
"""Cached preferred replica no longer in the partition's replica
set (per ``is_replica_node``) -> fall back to leader AND clear
the cache so we re-learn next time."""
tp = TopicPartition(topic, 0)
fetcher._subscriptions.assignment[tp].update_preferred_read_replica(
3, time.monotonic() + 60)
mocker.patch.object(
fetcher._manager.cluster, 'leader_for_partition', return_value=7)
mocker.patch.object(
fetcher._manager.cluster, 'broker_metadata', return_value=None)
fetcher._manager.cluster, 'is_replica_node', return_value=None)
assert fetcher._select_read_replica(tp) == 7
assert fetcher._subscriptions.assignment[tp].preferred_read_replica() is None

Expand Down Expand Up @@ -1684,3 +1685,55 @@ def test_reset_clears_preferred_replica(self, topic):
state.update_preferred_read_replica(3, time.monotonic() + 60)
state.reset(OffsetResetStrategy.LATEST)
assert state.preferred_read_replica() is None

def test_is_replica_node(self):
"""``ClusterMetadata.is_replica_node`` returns None when the broker
exists but is no longer listed as a replica of the partition."""
from kafka.cluster import ClusterMetadata
from kafka.protocol.metadata import MetadataResponse

cluster = ClusterMetadata()
Broker = MetadataResponse.MetadataResponseBroker
Topic = MetadataResponse.MetadataResponseTopic
Partition = Topic.MetadataResponsePartition
v = 7
cluster.update_metadata(MetadataResponse(
version=v, throttle_time_ms=0,
brokers=[Broker(node_id=1, host='h', port=9092, rack=None, version=v),
Broker(node_id=2, host='h', port=9092, rack=None, version=v),
Broker(node_id=3, host='h', port=9092, rack=None, version=v)],
cluster_id='c', controller_id=1,
topics=[Topic(version=v, error_code=0, name='t', is_internal=False,
partitions=[Partition(
version=v, error_code=0, partition_index=0,
leader_id=1, leader_epoch=0,
replica_nodes=[1, 2], isr_nodes=[1, 2],
offline_replicas=[])])]))
tp = TopicPartition('t', 0)

# Node 1 is the leader and a replica -> online.
assert cluster.is_replica_node(tp, 1) is not None
# Node 2 is a replica -> online.
assert cluster.is_replica_node(tp, 2) is not None
# Node 3 is a known broker but NOT a replica of this partition.
assert cluster.is_replica_node(tp, 3) is None
# Node 99 doesn't exist.
assert cluster.is_replica_node(tp, 99) is None
# Unknown topic.
assert cluster.is_replica_node(TopicPartition('other', 0), 1) is None

def test_select_read_replica_falls_back_when_node_demoted(
self, fetcher, topic, mocker):
tp = TopicPartition(topic, 0)
fetcher._subscriptions.assignment[tp].update_preferred_read_replica(
3, time.monotonic() + 60)
mocker.patch.object(
fetcher._manager.cluster, 'leader_for_partition', return_value=7)
# broker_metadata still returns the node (it's a known broker)...
mocker.patch.object(
fetcher._manager.cluster, 'broker_metadata', return_value=MagicMock())
# ...but is_replica_node sees that it's no longer a replica.
mocker.patch.object(
fetcher._manager.cluster, 'is_replica_node', return_value=None)
assert fetcher._select_read_replica(tp) == 7
assert fetcher._subscriptions.assignment[tp].preferred_read_replica() is None
Loading