diff --git a/kafka/coordinator/assignors/abstract.py b/kafka/coordinator/assignors/abstract.py index 21b7b366a..8b9aa762b 100644 --- a/kafka/coordinator/assignors/abstract.py +++ b/kafka/coordinator/assignors/abstract.py @@ -1,10 +1,30 @@ import abc +from enum import IntEnum from kafka.protocol.consumer.metadata import ( ConsumerProtocolSubscription, ConsumerProtocolAssignment, ) +class RebalanceProtocol(IntEnum): + """KIP-429: rebalance protocol mode for a partition assignor. + + EAGER - pre-KIP-429 behaviour: every member revokes its full + assignment before JoinGroup, then receives a fresh assignment in + SyncGroup. Simple but causes a "stop the world" pause on every + rebalance. + + COOPERATIVE - KIP-429 incremental rebalance: members keep their + existing assignment across JoinGroup; the leader's assignment + indicates the partitions that need to move; only revoked + partitions are released, and only newly-assigned partitions + invoke the listener. A second rebalance round picks up partitions + that were revoked in round 1. + """ + EAGER = 0 + COOPERATIVE = 1 + + class AbstractPartitionAssignor(metaclass=abc.ABCMeta): """ Abstract assignor implementation which does some common grunt work (in particular collecting @@ -16,6 +36,18 @@ def name(self): """.name should be a string identifying the assignor""" pass + def supported_protocols(self): + """Return the list of :class:`RebalanceProtocol` modes this + assignor supports, in order of preference. + + Default is ``[EAGER]`` - every legacy assignor (Range, + RoundRobin, the original Sticky from KIP-54) behaves this + way. Override in subclasses that participate in KIP-429 + incremental cooperative rebalancing (e.g. + ``CooperativeStickyAssignor``). 
+ """ + return [RebalanceProtocol.EAGER] + @abc.abstractmethod def assign(self, cluster, members): """Perform group assignment given cluster metadata and member subscriptions diff --git a/kafka/coordinator/assignors/cooperative_sticky.py b/kafka/coordinator/assignors/cooperative_sticky.py new file mode 100644 index 000000000..3ef3ba74a --- /dev/null +++ b/kafka/coordinator/assignors/cooperative_sticky.py @@ -0,0 +1,167 @@ +"""KIP-429 cooperative sticky partition assignor. + +Wraps :class:`StickyPartitionAssignor` (KIP-54) with the two-phase +"incremental cooperative" rebalancing protocol: + + * Members keep their assignment across JoinGroup - no global revoke. + * The leader runs the sticky algorithm to compute the *ideal* final + assignment, then identifies any partition that is moving from one + owner to another and *removes it from the new owner's first-round + assignment*. The current owner sees its assignment shrink, revokes + the lost partition, and the broker is signaled (via + ``request_rejoin``) that another rebalance is needed. + * Round two: the freshly-revoked partition is owned by nobody; the + sticky algorithm now gives it to its intended new owner. + +This avoids the "stop the world" pause that EAGER mode imposes - each +member only pauses while it's processing the specific partitions +moving in or out of its own assignment. 
+ +References: + * KIP-429: https://cwiki.apache.org/confluence/x/vAclBg + * Java: org.apache.kafka.clients.consumer.CooperativeStickyAssignor +""" + +from collections import defaultdict + +from kafka.coordinator.assignors.abstract import RebalanceProtocol +from kafka.coordinator.assignors.sticky.sticky_assignor import ( + StickyAssignmentExecutor, + StickyAssignorMemberMetadataV1, + StickyPartitionAssignor, +) +from kafka.protocol.consumer.metadata import ( + ConsumerProtocolAssignment, ConsumerProtocolSubscription, +) +from kafka.structs import TopicPartition + + +# Wire version 1 of ConsumerProtocolSubscription is what KIP-429 added. +# Members advertise their currently-owned partitions in the +# ``owned_partitions`` field so the leader can compute the diff. +_COOPERATIVE_SUBSCRIPTION_VERSION = 1 + + +class CooperativeStickyAssignor(StickyPartitionAssignor): + """KIP-429 cooperative variant of the sticky assignor. + + Behaviorally identical to :class:`StickyPartitionAssignor` for + final partition placement (it inherits the same algorithm) - the + only difference is that movements are staged across two rebalance + rounds so no member ever sees a partition assigned to it while + another member still owns it. + """ + + name = "cooperative-sticky" + # Bump the wire metadata to v1 so OwnedPartitions is encoded on + # JoinGroup. The leader reads .owned_partitions to compute the + # set of partitions that are moving. + version = _COOPERATIVE_SUBSCRIPTION_VERSION + + def supported_protocols(self): + # COOPERATIVE only - mixing this assignor with eager assignors + # in the same consumer is rejected at coordinator init time + # (see ConsumerCoordinator._validate_rebalance_protocol). + return [RebalanceProtocol.COOPERATIVE] + + def metadata(self, topics): + # Encode OwnedPartitions (v1+) so the leader can compute the + # cooperative diff. 
The base class uses StickyAssignorUserData + # for the same purpose in v0 - under cooperative we surface + # the owned set via the dedicated schema field instead. + SubTP = ConsumerProtocolSubscription.TopicPartition + owned_partitions = [] + if self.member_assignment is not None: + by_topic = defaultdict(list) + for tp in self.member_assignment: + by_topic[tp.topic].append(tp.partition) + owned_partitions = [ + SubTP(topic=t, partitions=sorted(parts)) + for t, parts in by_topic.items() + ] + return ConsumerProtocolSubscription( + version=self.version, + topics=sorted(topics), + user_data=b'', + owned_partitions=owned_partitions, + ) + + @classmethod + def parse_member_metadata(cls, metadata): + """Decode a member's ``ConsumerProtocolSubscription``. + + Cooperative members carry owned partitions in the + ``owned_partitions`` schema field (v1+) rather than the + ``user_data`` blob the legacy sticky assignor uses. Returns + the same ``StickyAssignorMemberMetadataV1`` shape so the + underlying sticky algorithm can consume it unchanged. + """ + member_partitions = [] + # owned_partitions is a list of TopicPartition data containers + # (v1+); on v0 metadata the field is absent - treat as empty. + for tp in getattr(metadata, 'owned_partitions', None) or (): + for partition in tp.partitions: + member_partitions.append(TopicPartition(tp.topic, partition)) + + generation = metadata.generation_id + return StickyAssignorMemberMetadataV1( + partitions=member_partitions, + generation=generation, # requires schema v2, defaults to -1 + subscription=list(metadata.topics), + ) + + def assign(self, cluster, members): + """Cooperative two-phase assignment. + + 1. Compute the ideal final sticky assignment. + 2. Build a map of currently-owned partitions across all + members from their ``OwnedPartitions``. + 3. For any partition whose final owner differs from its + current owner, remove it from the new owner's first-round + assignment. 
The current owner sees its assignment shrink, + revokes the partition, and re-joins; on round two the + partition is unowned and the algorithm assigns it. + """ + members_metadata = { + member.member_id: self.parse_member_metadata(member.metadata) + for member in members + } + executor = StickyAssignmentExecutor(cluster, members_metadata) + executor.perform_initial_assignment() + executor.balance() + # Expose for diagnostic tests (matches parent behaviour). + self._latest_partition_movements = executor.partition_movements + + # Map: partition -> current_owner_member_id (None if unowned). + currently_owned = {} + for member_id, parsed in members_metadata.items(): + for tp in parsed.partitions: + currently_owned[tp] = member_id + + # Build the round-1 assignment: drop any partition that's + # moving (final owner != current owner). The current owner + # will revoke it in _on_join_complete; the broker will see + # the consumer re-join and round 2 will land the partition + # on the intended new owner. + # + # ``executor.get_final_assignment`` returns the canonical wire + # shape: ``[(topic, [partition, ...]), ...]``. We rebuild the + # same shape after filtering so the encoder is happy. + cooperative = {} + for member in members: + member_id = member.member_id + kept_by_topic = defaultdict(list) + for topic, parts in executor.get_final_assignment(member_id): + for p in parts: + tp = TopicPartition(topic, p) + current_owner = currently_owned.get(tp) + if current_owner is None or current_owner == member_id: + # Either nobody owned it, or this member + # already owns it. Safe to assign now. + kept_by_topic[topic].append(p) + # else: partition is moving; defer to round 2. 
+ assigned_partitions = sorted( + (t, sorted(ps)) for t, ps in kept_by_topic.items()) + cooperative[member_id] = ConsumerProtocolAssignment( + self.version, assigned_partitions, b'') + return cooperative diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 5d93cedd8..54633f723 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -6,6 +6,7 @@ import time from kafka.coordinator.base import BaseCoordinator, Generation +from kafka.coordinator.assignors.abstract import RebalanceProtocol from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor @@ -97,6 +98,7 @@ def __init__(self, client, subscription, **configs): self._subscription = subscription self._is_leader = False + self._rebalance_protocol = None self._joined_subscription = set() self._metadata_snapshot = self._build_metadata_snapshot(subscription, self._cluster) self._assignment_snapshot = None @@ -141,9 +143,37 @@ def __init__(self, client, subscription, **configs): for klass in self.config['assignors']: assignor = klass() self._assignors[assignor.name] = assignor + # KIP-429: all configured assignors must agree on a single + # RebalanceProtocol mode. Mixing EAGER and COOPERATIVE + # assignors in the same consumer is unsafe - at JoinGroup time + # the broker picks one assignor, and the consumer needs to + # know up front whether to do eager (full) or cooperative + # (incremental) revocation. + self._rebalance_protocol = self._validate_rebalance_protocol() self._cluster.request_update() self._cluster.add_listener(WeakMethod(self._handle_metadata_update)) + def _validate_rebalance_protocol(self): + """Return the single :class:`RebalanceProtocol` mode that all + configured assignors support; raise + :class:`KafkaConfigurationError` if they don't agree. 
+ """ + if not self._assignors: + return RebalanceProtocol.EAGER + common = None + for assignor in self._assignors.values(): + supported = set(assignor.supported_protocols()) + common = supported if common is None else common & supported + if not common: + names = [a.name for a in self._assignors.values()] + raise Errors.KafkaConfigurationError( + "Specified partition_assignment_strategy assignors %s do not" + " support a common RebalanceProtocol. Mixing EAGER and" + " COOPERATIVE assignors in a single consumer is not" + " supported." % (names,)) + # Pick the highest mode they all agree on (EAGER < COOPERATIVE). + return max(common) + @property def _use_offset_apis(self): return self.config['api_version'] >= (0, 8, 1) @@ -265,12 +295,77 @@ async def _on_join_complete_async(self, generation, member_id, protocol, assert assignor, 'Coordinator selected invalid assignment protocol: %s' % (protocol,) assignment = ConsumerProtocolAssignment.decode(member_assignment_bytes) + new_assigned = set(assignment.partitions()) + + # KIP-429: under COOPERATIVE, compute the diff between what we + # currently own and what the leader just assigned. Revoke the + # partitions we lost; only invoke on_partitions_assigned for + # the newly-added ones. If we lost any partitions, request a + # follow-up rebalance so the revoked partitions can land on + # their intended new owner. + if self._rebalance_protocol == RebalanceProtocol.COOPERATIVE: + currently_owned = set(self._subscription.assigned_partitions()) + revoked = currently_owned - new_assigned + added = new_assigned - currently_owned + + try: + self._subscription.assign_from_subscribed(sorted(new_assigned)) + except ValueError as e: + log.warning("Cooperative assignment rejected: %s." + " Probably due to a deleted topic." 
+ " Requesting re-join.", e) + self.request_rejoin() + return + + assignor.on_assignment(assignment, generation) + self.next_auto_commit_deadline = time.monotonic() + self.auto_commit_interval + + log.info("Cooperative rebalance complete for group %s:" + " owned=%s, assigned=%s, revoked=%s, added=%s", + self.group_id, currently_owned, new_assigned, revoked, added) + + if self._subscription.rebalance_listener: + if revoked: + try: + await self._invoke_rebalance_listener_async( + 'on_partitions_revoked', revoked) + except Exception: + log.exception( + "User provided rebalance listener %s for group %s" + " failed on_partitions_revoked: %s", + self._subscription.rebalance_listener, + self.group_id, revoked) + if added: + try: + await self._invoke_rebalance_listener_async( + 'on_partitions_assigned', added) + except Exception: + log.exception( + "User provided rebalance listener %s for group %s" + " failed on_partitions_assigned: %s", + self._subscription.rebalance_listener, + self.group_id, added) + + if revoked: + # Round 2: the partitions we just revoked should now + # be unowned cluster-wide and can be assigned to + # their intended new owners. Trigger a follow-up + # rebalance to surface those assignments. + log.info("Triggering follow-up rebalance for group %s to" + " complete cooperative move of %d partition(s)", + self.group_id, len(revoked)) + self.request_rejoin() + return + # EAGER mode (legacy): replace the full assignment and invoke + # on_partitions_assigned with the entire new set. try: self._subscription.assign_from_subscribed(assignment.partitions()) except ValueError as e: - log.warning("%s. Probably due to a deleted topic. Requesting Re-join" % e) + log.warning("Assignment rejected: %s. Probably due to a" + " deleted topic. 
Requesting re-join.", e) self.request_rejoin() + return # give the assignor a chance to update internal state # based on the received assignment @@ -409,18 +504,30 @@ async def _on_join_prepare_async(self, generation, member_id, timeout_ms=None): log.exception("Pre-rebalance offset commit failed: This is likely" " to cause duplicate message delivery") - # execute the user's callback before rebalance - log.info("Revoking previously assigned partitions %s for group %s", - self._subscription.assigned_partitions(), self.group_id) - if self._subscription.rebalance_listener: - try: - revoked = set(self._subscription.assigned_partitions()) - await self._invoke_rebalance_listener_async( - 'on_partitions_revoked', revoked) - except Exception: - log.exception("User provided subscription rebalance listener %s" - " for group %s failed on_partitions_revoked", - self._subscription.rebalance_listener, self.group_id) + # Under EAGER, notify the user that the full current + # assignment is about to be revoked so they can flush state / + # commit offsets before the rebalance. The partitions remain + # in self._subscription.assignment until _on_join_complete + # replaces it via assign_from_subscribed - this listener call + # is a *notification*, not the actual state mutation. + # + # Under COOPERATIVE we skip the notification entirely: members + # keep their assignment across JoinGroup and only the + # individual partitions that actually moved are revoked in + # _on_join_complete_async (computed from the owned-vs-assigned + # diff). 
+ if self._rebalance_protocol == RebalanceProtocol.EAGER: + log.info("Revoking previously assigned partitions %s for group %s", + self._subscription.assigned_partitions(), self.group_id) + if self._subscription.rebalance_listener: + try: + revoked = set(self._subscription.assigned_partitions()) + await self._invoke_rebalance_listener_async( + 'on_partitions_revoked', revoked) + except Exception: + log.exception("User provided subscription rebalance listener %s" + " for group %s failed on_partitions_revoked", + self._subscription.rebalance_listener, self.group_id) self._is_leader = False self._subscription.reset_group_subscription() diff --git a/test/consumer/test_assignors.py b/test/consumer/test_assignors.py index 427fbecb6..04a07c928 100644 --- a/test/consumer/test_assignors.py +++ b/test/consumer/test_assignors.py @@ -879,3 +879,130 @@ def group_partitions_by_topic(partitions): for p in partitions: result[p.topic].add(p.partition) return result + + +# --------------------------------------------------------------------------- +# KIP-429: CooperativeStickyAssignor +# --------------------------------------------------------------------------- + + +class TestCooperativeStickyAssignor: + """Two-phase incremental cooperative rebalance (KIP-429). + + The leader-side semantic is: compute the final sticky assignment, + then for any partition that's moving from owner A to owner B, + drop it from B's round-1 assignment. A revokes it (sees its + assignment shrink), re-joins, and round 2 lands the partition + on B. 
+ """ + + def _assignor(self): + from kafka.coordinator.assignors.cooperative_sticky import ( + CooperativeStickyAssignor, + ) + return CooperativeStickyAssignor() + + def test_supported_protocols_is_cooperative_only(self): + from kafka.coordinator.assignors.abstract import RebalanceProtocol + a = self._assignor() + assert a.supported_protocols() == [RebalanceProtocol.COOPERATIVE] + + def test_metadata_encodes_owned_partitions(self): + """The wire metadata must carry OwnedPartitions (v1+) so the + leader can compute the cooperative diff.""" + a = self._assignor() + a.member_assignment = [TopicPartition('t', 0), TopicPartition('t', 2)] + meta = a.metadata({'t'}) + assert meta.version == 1 + assert len(meta.owned_partitions) == 1 + assert meta.owned_partitions[0].topic == 't' + assert meta.owned_partitions[0].partitions == [0, 2] + + def test_metadata_no_prior_assignment_empty_owned(self): + a = self._assignor() + meta = a.metadata({'t'}) + assert meta.owned_partitions == [] + + def test_parse_member_metadata_reads_owned_partitions(self): + a = self._assignor() + # Round-trip a v1 subscription with OwnedPartitions through encode/decode. 
+ from kafka.protocol.consumer.metadata import ConsumerProtocolSubscription + SubTP = ConsumerProtocolSubscription.TopicPartition + sub = ConsumerProtocolSubscription( + version=1, topics=['t'], user_data=b'', + owned_partitions=[SubTP(topic='t', partitions=[3, 7])]) + decoded = ConsumerProtocolSubscription.decode(sub.encode(), version=1) + parsed = a.parse_member_metadata(decoded) + assert parsed.partitions == [TopicPartition('t', 3), TopicPartition('t', 7)] + assert parsed.subscription == ['t'] + + def test_assign_fresh_cluster_no_movements(self, mocker): + """First-ever assignment (no prior ownership): the cooperative + path is identical to the eager sticky path - every partition + is unowned, so nothing is "moving".""" + assignor = self._assignor() + members = make_join_group_response_members({ + 'C0': assignor.metadata({'t'}), + 'C1': assignor.metadata({'t'}), + }) + cluster = create_cluster(mocker, {'t'}, topics_partitions={0, 1, 2, 3}) + ret = assignor.assign(cluster, members) + # All 4 partitions get assigned in round 1; nothing deferred. + total = sum(len(a.partitions()) for a in ret.values()) + assert total == 4 + + def test_assign_defers_moved_partitions(self, mocker): + """Round 1: C0 owns t/0, t/1 and t/2; C1 owns only t/3. The + sticky algorithm must move one of C0's partitions to C1, and in + the cooperative round-1 output that partition must NOT appear in + C1's assignment (must wait until C0 revokes it).""" + assignor = self._assignor() + + # Build subscriptions with explicit OwnedPartitions. + from kafka.protocol.consumer.metadata import ConsumerProtocolSubscription + SubTP = ConsumerProtocolSubscription.TopicPartition + + def _sub(owned): + return ConsumerProtocolSubscription( + version=1, topics=['t'], user_data=b'', + owned_partitions=[ + SubTP(topic=t, partitions=sorted(parts)) + for t, parts in owned.items()]) + + # C0 currently owns the first 3; C1 owns the last. The ideal + # balanced assignment is 2/2, so one of C0's partitions must + # move to C1. 
+ members = make_join_group_response_members({ + 'C0': _sub({'t': [0, 1, 2]}), + 'C1': _sub({'t': [3]}), + }) + cluster = create_cluster(mocker, {'t'}, topics_partitions={0, 1, 2, 3}) + ret = assignor.assign(cluster, members) + + c0_owned_round1 = set(ret['C0'].partitions()) + c1_owned_round1 = set(ret['C1'].partitions()) + + # No partition should appear in both members' round-1 output. + assert not (c0_owned_round1 & c1_owned_round1) + # The total number of partitions assigned in round 1 must be + # strictly less than the cluster total - at least one + # partition is deferred because it's moving. + cluster_total = {TopicPartition('t', p) for p in range(4)} + round1_total = c0_owned_round1 | c1_owned_round1 + assert round1_total < cluster_total + # The deferred partition is one that was owned by C0 but is + # being moved to C1 - round 1 dropped it from C1. + deferred = cluster_total - round1_total + assert len(deferred) == 1 + + def test_assign_returns_assignments_for_all_members(self, mocker): + """Even when partitions are deferred, every member must + receive an assignment entry (possibly with shrunken set).""" + assignor = self._assignor() + members = make_join_group_response_members({ + 'C0': assignor.metadata({'t'}), + 'C1': assignor.metadata({'t'}), + }) + cluster = create_cluster(mocker, {'t'}, topics_partitions={0, 1}) + ret = assignor.assign(cluster, members) + assert set(ret) == {'C0', 'C1'} diff --git a/test/consumer/test_coordinator.py b/test/consumer/test_coordinator.py index 7a1e3147a..4cd639739 100644 --- a/test/consumer/test_coordinator.py +++ b/test/consumer/test_coordinator.py @@ -1373,3 +1373,261 @@ def test_do_join_and_sync_async_sync_protocol_name_mismatch(request, broker, see with pytest.raises(Errors.InconsistentGroupProtocolError): seeded_coord._manager.run(seeded_coord._do_join_and_sync_async) + + +# --------------------------------------------------------------------------- +# KIP-429: Incremental Cooperative Rebalancing +# 
--------------------------------------------------------------------------- + + +def _cooperative_coordinator(client, metrics): + """Build a ConsumerCoordinator configured for cooperative rebalance.""" + from kafka.coordinator.assignors.cooperative_sticky import ( + CooperativeStickyAssignor, + ) + return ConsumerCoordinator( + client, SubscriptionState(), + metrics=metrics, + api_version=(2, 4), + max_poll_interval_ms=300000, + session_timeout_ms=10000, + assignors=(CooperativeStickyAssignor,)) + + +class TestKip429RebalanceProtocolValidation: + """All configured assignors must agree on a single RebalanceProtocol.""" + + def test_default_is_eager(self, coordinator): + """Range / RoundRobin / KIP-54 Sticky all support EAGER only.""" + from kafka.coordinator.assignors.abstract import RebalanceProtocol + assert coordinator._rebalance_protocol == RebalanceProtocol.EAGER + + def test_cooperative_when_only_cooperative_sticky(self, client, metrics): + from kafka.coordinator.assignors.abstract import RebalanceProtocol + coord = _cooperative_coordinator(client, metrics) + try: + assert coord._rebalance_protocol == RebalanceProtocol.COOPERATIVE + finally: + coord.close(timeout_ms=0) + + def test_rejects_mixed_protocols(self, client, metrics): + """A consumer configured with both EAGER and COOPERATIVE + assignors must reject the configuration at init - at JoinGroup + the broker picks one assignor and the consumer has no way to + know which protocol mode to use until that decision happens.""" + from kafka.coordinator.assignors.cooperative_sticky import ( + CooperativeStickyAssignor, + ) + with pytest.raises(Errors.KafkaConfigurationError, match='RebalanceProtocol'): + ConsumerCoordinator( + client, SubscriptionState(), + metrics=metrics, + api_version=(2, 4), + max_poll_interval_ms=300000, + session_timeout_ms=10000, + assignors=(RangePartitionAssignor, CooperativeStickyAssignor)) + + +class TestKip429OnJoinPrepare: + """Under COOPERATIVE, _on_join_prepare must NOT globally 
revoke + the assignment - that's the whole point of incremental rebalance.""" + + def test_eager_revokes_everything(self, mocker, coordinator): + coordinator.config['enable_auto_commit'] = False + listener = mocker.MagicMock(spec=ConsumerRebalanceListener) + coordinator._subscription.subscribe(topics=['foobar'], listener=listener) + coordinator._subscription.assign_from_subscribed( + [TopicPartition('foobar', 0), TopicPartition('foobar', 1)]) + coordinator._manager.run(coordinator._on_join_prepare_async, 0, 'member-foo') + listener.on_partitions_revoked.assert_called_once_with( + {TopicPartition('foobar', 0), TopicPartition('foobar', 1)}) + + def test_cooperative_skips_global_revoke(self, mocker, client, metrics): + coord = _cooperative_coordinator(client, metrics) + try: + coord.config['enable_auto_commit'] = False + listener = mocker.MagicMock(spec=ConsumerRebalanceListener) + coord._subscription.subscribe(topics=['foobar'], listener=listener) + coord._subscription.assign_from_subscribed( + [TopicPartition('foobar', 0), TopicPartition('foobar', 1)]) + coord._manager.run(coord._on_join_prepare_async, 0, 'member-foo') + # KIP-429: no global on_partitions_revoked in the prepare + # phase - individual partitions are revoked later in + # _on_join_complete based on the diff. + listener.on_partitions_revoked.assert_not_called() + finally: + coord.close(timeout_ms=0) + + +class TestKip429OnJoinComplete: + """Under COOPERATIVE, _on_join_complete computes the owned-vs- + assigned diff: revoke removed, add new, no churn on stable.""" + + def _make_assignment_bytes(self, topic, partitions): + from kafka.protocol.consumer.metadata import ConsumerProtocolAssignment + a = ConsumerProtocolAssignment( + version=1, + assigned_partitions=[(topic, sorted(partitions))], + user_data=b'') + return a.encode() + + def test_cooperative_revoke_then_assign_diff(self, mocker, client, metrics): + """Member currently owns [0, 1, 2]; new assignment is [1, 2, 3]. 
+ Listener must see revoked=[0] and assigned=[3], not the full sets.""" + coord = _cooperative_coordinator(client, metrics) + try: + coord.config['enable_auto_commit'] = False + listener = mocker.MagicMock(spec=ConsumerRebalanceListener) + coord._subscription.subscribe(topics=['t'], listener=listener) + coord._subscription.assign_from_subscribed([ + TopicPartition('t', 0), TopicPartition('t', 1), + TopicPartition('t', 2)]) + + assignment_bytes = self._make_assignment_bytes('t', [1, 2, 3]) + coord._manager.run( + coord._on_join_complete_async, + 42, 'member-1', 'cooperative-sticky', assignment_bytes) + + listener.on_partitions_revoked.assert_called_once_with({TopicPartition('t', 0)}) + listener.on_partitions_assigned.assert_called_once_with({TopicPartition('t', 3)}) + finally: + coord.close(timeout_ms=0) + + def test_cooperative_stable_assignment_no_listener_calls( + self, mocker, client, metrics): + """Owned == assigned -> neither listener method fires.""" + coord = _cooperative_coordinator(client, metrics) + try: + coord.config['enable_auto_commit'] = False + listener = mocker.MagicMock(spec=ConsumerRebalanceListener) + coord._subscription.subscribe(topics=['t'], listener=listener) + coord._subscription.assign_from_subscribed([ + TopicPartition('t', 0), TopicPartition('t', 1)]) + + assignment_bytes = self._make_assignment_bytes('t', [0, 1]) + coord._manager.run( + coord._on_join_complete_async, + 42, 'member-1', 'cooperative-sticky', assignment_bytes) + + listener.on_partitions_revoked.assert_not_called() + listener.on_partitions_assigned.assert_not_called() + finally: + coord.close(timeout_ms=0) + + def test_cooperative_revoke_triggers_request_rejoin( + self, mocker, client, metrics): + """When any partitions are revoked, request a follow-up + rebalance so the partition lands on its new owner in round 2.""" + coord = _cooperative_coordinator(client, metrics) + try: + coord.config['enable_auto_commit'] = False + coord._subscription.subscribe(topics=['t']) + 
coord._subscription.assign_from_subscribed([ + TopicPartition('t', 0), TopicPartition('t', 1)]) + spy = mocker.spy(coord, 'request_rejoin') + + # New assignment drops partition 1. + assignment_bytes = self._make_assignment_bytes('t', [0]) + coord._manager.run( + coord._on_join_complete_async, + 42, 'member-1', 'cooperative-sticky', assignment_bytes) + + spy.assert_called() + finally: + coord.close(timeout_ms=0) + + def test_cooperative_no_revoke_no_extra_rejoin( + self, mocker, client, metrics): + """If nothing is revoked (assignment only grew or stayed same), + no follow-up rebalance is needed.""" + coord = _cooperative_coordinator(client, metrics) + try: + coord.config['enable_auto_commit'] = False + coord._subscription.subscribe(topics=['t']) + coord._subscription.assign_from_subscribed([TopicPartition('t', 0)]) + spy = mocker.spy(coord, 'request_rejoin') + + assignment_bytes = self._make_assignment_bytes('t', [0, 1]) + coord._manager.run( + coord._on_join_complete_async, + 42, 'member-1', 'cooperative-sticky', assignment_bytes) + + spy.assert_not_called() + finally: + coord.close(timeout_ms=0) + + def test_cooperative_assignment_for_unsubscribed_topic_bails( + self, mocker, client, metrics): + """If the leader hands us a partition for a topic we're not + subscribed to (e.g. the topic was deleted under us), + ``assign_from_subscribed`` raises ValueError. 
We must bail + cleanly: request re-join and skip the listener invocation, + the assignor state update, and the auto-commit deadline reset + - otherwise the user's listener acts on partitions that aren't + actually in our SubscriptionState.""" + coord = _cooperative_coordinator(client, metrics) + try: + coord.config['enable_auto_commit'] = False + listener = mocker.MagicMock(spec=ConsumerRebalanceListener) + coord._subscription.subscribe(topics=['t'], listener=listener) + coord._subscription.assign_from_subscribed([TopicPartition('t', 0)]) + assignor_spy = mocker.spy(coord._assignors['cooperative-sticky'], 'on_assignment') + rejoin_spy = mocker.spy(coord, 'request_rejoin') + old_deadline = coord.next_auto_commit_deadline + + # Assignment names a topic the consumer isn't subscribed to. + bad_assignment = self._make_assignment_bytes('other-topic', [0]) + coord._manager.run( + coord._on_join_complete_async, + 42, 'member-1', 'cooperative-sticky', bad_assignment) + + # Re-join requested, but no side effects from the rest of + # the cooperative happy path. + rejoin_spy.assert_called_once() + listener.on_partitions_revoked.assert_not_called() + listener.on_partitions_assigned.assert_not_called() + assignor_spy.assert_not_called() + assert coord.next_auto_commit_deadline == old_deadline + # Original assignment is preserved. 
+ assert set(coord._subscription.assigned_partitions()) == {TopicPartition('t', 0)} + finally: + coord.close(timeout_ms=0) + + +class TestOnJoinCompleteBailOnInvalidAssignment: + """The EAGER branch has the same bail-on-ValueError behaviour as + COOPERATIVE - if the leader hands us a topic we don't subscribe + to, we must request re-join without firing the listener or + updating assignor state.""" + + def _make_assignment_bytes(self, topic, partitions): + from kafka.protocol.consumer.metadata import ConsumerProtocolAssignment + a = ConsumerProtocolAssignment( + version=0, + assigned_partitions=[(topic, sorted(partitions))], + user_data=b'') + return a.encode() + + def test_eager_assignment_for_unsubscribed_topic_bails( + self, mocker, coordinator): + coordinator.config['enable_auto_commit'] = False + listener = mocker.MagicMock(spec=ConsumerRebalanceListener) + coordinator._subscription.subscribe(topics=['t'], listener=listener) + coordinator._subscription.assign_from_subscribed([TopicPartition('t', 0)]) + # Pick whichever assignor name is in the default fixture + # (RangePartitionAssignor). + assignor_name = next(iter(coordinator._assignors)) + assignor_spy = mocker.spy( + coordinator._assignors[assignor_name], 'on_assignment') + rejoin_spy = mocker.spy(coordinator, 'request_rejoin') + old_deadline = coordinator.next_auto_commit_deadline + + bad_assignment = self._make_assignment_bytes('other-topic', [0]) + coordinator._manager.run( + coordinator._on_join_complete_async, + 42, 'member-1', assignor_name, bad_assignment) + + rejoin_spy.assert_called_once() + listener.on_partitions_assigned.assert_not_called() + assignor_spy.assert_not_called() + assert coordinator.next_auto_commit_deadline == old_deadline