Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions kafka/coordinator/assignors/abstract.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,30 @@
import abc
from enum import IntEnum

from kafka.protocol.consumer.metadata import (
ConsumerProtocolSubscription, ConsumerProtocolAssignment,
)


class RebalanceProtocol(IntEnum):
"""KIP-429: rebalance protocol mode for a partition assignor.

EAGER - pre-KIP-429 behaviour: every member revokes its full
assignment before JoinGroup, then receives a fresh assignment in
SyncGroup. Simple but causes a "stop the world" pause on every
rebalance.

COOPERATIVE - KIP-429 incremental rebalance: members keep their
existing assignment across JoinGroup; the leader's assignment
indicates the partitions that need to move; only revoked
partitions are released, and only newly-assigned partitions
invoke the listener. A second rebalance round picks up partitions
that were revoked in round 1.
"""
EAGER = 0
COOPERATIVE = 1


class AbstractPartitionAssignor(metaclass=abc.ABCMeta):
"""
Abstract assignor implementation which does some common grunt work (in particular collecting
Expand All @@ -16,6 +36,18 @@ def name(self):
""".name should be a string identifying the assignor"""
pass

def supported_protocols(self):
"""Return the list of :class:`RebalanceProtocol` modes this
assignor supports, in order of preference.

Default is ``[EAGER]`` - every legacy assignor (Range,
RoundRobin, the original Sticky from KIP-54) behaves this
way. Override in subclasses that participate in KIP-429
incremental cooperative rebalancing (e.g.
``CooperativeStickyAssignor``).
"""
return [RebalanceProtocol.EAGER]

@abc.abstractmethod
def assign(self, cluster, members):
"""Perform group assignment given cluster metadata and member subscriptions
Expand Down
167 changes: 167 additions & 0 deletions kafka/coordinator/assignors/cooperative_sticky.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
"""KIP-429 cooperative sticky partition assignor.

Wraps :class:`StickyPartitionAssignor` (KIP-54) with the two-phase
"incremental cooperative" rebalancing protocol:

* Members keep their assignment across JoinGroup - no global revoke.
* The leader runs the sticky algorithm to compute the *ideal* final
assignment, then identifies any partition that is moving from one
owner to another and *removes it from the new owner's first-round
assignment*. The current owner sees its assignment shrink, revokes
the lost partition, and the broker is signaled (via
``request_rejoin``) that another rebalance is needed.
* Round two: the freshly-revoked partition is owned by nobody; the
sticky algorithm now gives it to its intended new owner.

This avoids the "stop the world" pause that EAGER mode imposes - each
member only pauses while it's processing the specific partitions
moving in or out of its own assignment.

References:
* KIP-429: https://cwiki.apache.org/confluence/x/vAclBg
* Java: org.apache.kafka.clients.consumer.CooperativeStickyAssignor
"""

from collections import defaultdict

from kafka.coordinator.assignors.abstract import RebalanceProtocol
from kafka.coordinator.assignors.sticky.sticky_assignor import (
StickyAssignmentExecutor,
StickyAssignorMemberMetadataV1,
StickyPartitionAssignor,
)
from kafka.protocol.consumer.metadata import (
ConsumerProtocolAssignment, ConsumerProtocolSubscription,
)
from kafka.structs import TopicPartition


# Wire version 1 of ConsumerProtocolSubscription is what KIP-429 added.
# Members advertise their currently-owned partitions in the
# ``owned_partitions`` field so the leader can compute the diff.
_COOPERATIVE_SUBSCRIPTION_VERSION = 1


class CooperativeStickyAssignor(StickyPartitionAssignor):
"""KIP-429 cooperative variant of the sticky assignor.

Behaviorally identical to :class:`StickyPartitionAssignor` for
final partition placement (it inherits the same algorithm) - the
only difference is that movements are staged across two rebalance
rounds so no member ever sees a partition assigned to it while
another member still owns it.
"""

name = "cooperative-sticky"
# Bump the wire metadata to v1 so OwnedPartitions is encoded on
# JoinGroup. The leader reads .owned_partitions to compute the
# set of partitions that are moving.
version = _COOPERATIVE_SUBSCRIPTION_VERSION

def supported_protocols(self):
# COOPERATIVE only - mixing this assignor with eager assignors
# in the same consumer is rejected at consumer init time
# (see KafkaConsumer.__init__ validation).
return [RebalanceProtocol.COOPERATIVE]

def metadata(self, topics):
# Encode OwnedPartitions (v1+) so the leader can compute the
# cooperative diff. The base class uses StickyAssignorUserData
# for the same purpose in v0 - under cooperative we surface
# the owned set via the dedicated schema field instead.
SubTP = ConsumerProtocolSubscription.TopicPartition
owned_partitions = []
if self.member_assignment is not None:
by_topic = defaultdict(list)
for tp in self.member_assignment:
by_topic[tp.topic].append(tp.partition)
owned_partitions = [
SubTP(topic=t, partitions=sorted(parts))
for t, parts in by_topic.items()
]
return ConsumerProtocolSubscription(
version=self.version,
topics=sorted(topics),
user_data=b'',
owned_partitions=owned_partitions,
)

@classmethod
def parse_member_metadata(cls, metadata):
"""Decode a member's ``ConsumerProtocolSubscription``.

Cooperative members carry owned partitions in the
``owned_partitions`` schema field (v1+) rather than the
``user_data`` blob the legacy sticky assignor uses. Returns
the same ``StickyAssignorMemberMetadataV1`` shape so the
underlying sticky algorithm can consume it unchanged.
"""
member_partitions = []
# owned_partitions is a list of TopicPartition data containers
# (v1+); on v0 metadata the field is absent - treat as empty.
for tp in getattr(metadata, 'owned_partitions', None) or ():
for partition in tp.partitions:
member_partitions.append(TopicPartition(tp.topic, partition))

generation = metadata.generation_id
return StickyAssignorMemberMetadataV1(
partitions=member_partitions,
generation=metadata.generation_id, # requires schema v2, defaults to -1
subscription=list(metadata.topics),
)

def assign(self, cluster, members):
"""Cooperative two-phase assignment.

1. Compute the ideal final sticky assignment.
2. Build a map of currently-owned partitions across all
members from their ``OwnedPartitions``.
3. For any partition whose final owner differs from its
current owner, remove it from the new owner's first-round
assignment. The current owner sees its assignment shrink,
revokes the partition, and re-joins; on round two the
partition is unowned and the algorithm assigns it.
"""
members_metadata = {
member.member_id: self.parse_member_metadata(member.metadata)
for member in members
}
executor = StickyAssignmentExecutor(cluster, members_metadata)
executor.perform_initial_assignment()
executor.balance()
# Expose for diagnostic tests (matches parent behaviour).
self._latest_partition_movements = executor.partition_movements

# Map: partition -> current_owner_member_id (None if unowned).
currently_owned = {}
for member_id, parsed in members_metadata.items():
for tp in parsed.partitions:
currently_owned[tp] = member_id

# Build the round-1 assignment: drop any partition that's
# moving (final owner != current owner). The current owner
# will revoke it in _on_join_complete; the broker will see
# the consumer re-join and round 2 will land the partition
# on the intended new owner.
#
# ``executor.get_final_assignment`` returns the canonical wire
# shape: ``[(topic, [partition, ...]), ...]``. We rebuild the
# same shape after filtering so the encoder is happy.
cooperative = {}
for member in members:
member_id = member.member_id
kept_by_topic = defaultdict(list)
for topic, parts in executor.get_final_assignment(member_id):
for p in parts:
tp = TopicPartition(topic, p)
current_owner = currently_owned.get(tp)
if current_owner is None or current_owner == member_id:
# Either nobody owned it, or this member
# already owns it. Safe to assign now.
kept_by_topic[topic].append(p)
# else: partition is moving; defer to round 2.
assigned_partitions = sorted(
(t, sorted(ps)) for t, ps in kept_by_topic.items())
cooperative[member_id] = ConsumerProtocolAssignment(
self.version, assigned_partitions, b'')
return cooperative
133 changes: 120 additions & 13 deletions kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import time

from kafka.coordinator.base import BaseCoordinator, Generation
from kafka.coordinator.assignors.abstract import RebalanceProtocol
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor
Expand Down Expand Up @@ -97,6 +98,7 @@ def __init__(self, client, subscription, **configs):

self._subscription = subscription
self._is_leader = False
self._rebalance_protocol = None
self._joined_subscription = set()
self._metadata_snapshot = self._build_metadata_snapshot(subscription, self._cluster)
self._assignment_snapshot = None
Expand Down Expand Up @@ -141,9 +143,37 @@ def __init__(self, client, subscription, **configs):
for klass in self.config['assignors']:
assignor = klass()
self._assignors[assignor.name] = assignor
# KIP-429: all configured assignors must agree on a single
# RebalanceProtocol mode. Mixing EAGER and COOPERATIVE
# assignors in the same consumer is unsafe - at JoinGroup time
# the broker picks one assignor, and the consumer needs to
# know up front whether to do eager (full) or cooperative
# (incremental) revocation.
self._rebalance_protocol = self._validate_rebalance_protocol()
self._cluster.request_update()
self._cluster.add_listener(WeakMethod(self._handle_metadata_update))

def _validate_rebalance_protocol(self):
"""Return the single :class:`RebalanceProtocol` mode that all
configured assignors support; raise
:class:`KafkaConfigurationError` if they don't agree.
"""
if not self._assignors:
return RebalanceProtocol.EAGER
common = None
for assignor in self._assignors.values():
supported = set(assignor.supported_protocols())
common = supported if common is None else common & supported
if not common:
names = [a.name for a in self._assignors.values()]
raise Errors.KafkaConfigurationError(
"Specified partition_assignment_strategy assignors %s do not"
" support a common RebalanceProtocol. Mixing EAGER and"
" COOPERATIVE assignors in a single consumer is not"
" supported." % (names,))
# Pick the highest mode they all agree on (EAGER < COOPERATIVE).
return max(common)

@property
def _use_offset_apis(self):
return self.config['api_version'] >= (0, 8, 1)
Expand Down Expand Up @@ -265,12 +295,77 @@ async def _on_join_complete_async(self, generation, member_id, protocol,
assert assignor, 'Coordinator selected invalid assignment protocol: %s' % (protocol,)

assignment = ConsumerProtocolAssignment.decode(member_assignment_bytes)
new_assigned = set(assignment.partitions())

# KIP-429: under COOPERATIVE, compute the diff between what we
# currently own and what the leader just assigned. Revoke the
# partitions we lost; only invoke on_partitions_assigned for
# the newly-added ones. If we lost any partitions, request a
# follow-up rebalance so the revoked partitions can land on
# their intended new owner.
if self._rebalance_protocol == RebalanceProtocol.COOPERATIVE:
currently_owned = set(self._subscription.assigned_partitions())
revoked = currently_owned - new_assigned
added = new_assigned - currently_owned

try:
self._subscription.assign_from_subscribed(sorted(new_assigned))
except ValueError as e:
log.warning("Cooperative assignment rejected: %s."
" Probably due to a deleted topic."
" Requesting re-join.", e)
self.request_rejoin()
return

assignor.on_assignment(assignment, generation)
self.next_auto_commit_deadline = time.monotonic() + self.auto_commit_interval

log.info("Cooperative rebalance complete for group %s:"
" owned=%s, assigned=%s, revoked=%s, added=%s",
self.group_id, currently_owned, new_assigned, revoked, added)

if self._subscription.rebalance_listener:
if revoked:
try:
await self._invoke_rebalance_listener_async(
'on_partitions_revoked', revoked)
except Exception:
log.exception(
"User provided rebalance listener %s for group %s"
" failed on_partitions_revoked: %s",
self._subscription.rebalance_listener,
self.group_id, revoked)
if added:
try:
await self._invoke_rebalance_listener_async(
'on_partitions_assigned', added)
except Exception:
log.exception(
"User provided rebalance listener %s for group %s"
" failed on_partitions_assigned: %s",
self._subscription.rebalance_listener,
self.group_id, added)

if revoked:
# Round 2: the partitions we just revoked should now
# be unowned cluster-wide and can be assigned to
# their intended new owners. Trigger a follow-up
# rebalance to surface those assignments.
log.info("Triggering follow-up rebalance for group %s to"
" complete cooperative move of %d partition(s)",
self.group_id, len(revoked))
self.request_rejoin()
return

# EAGER mode (legacy): replace the full assignment and invoke
# on_partitions_assigned with the entire new set.
try:
self._subscription.assign_from_subscribed(assignment.partitions())
except ValueError as e:
log.warning("%s. Probably due to a deleted topic. Requesting Re-join" % e)
log.warning("Assignment rejected: %s. Probably due to a"
" deleted topic. Requesting re-join.", e)
self.request_rejoin()
return

# give the assignor a chance to update internal state
# based on the received assignment
Expand Down Expand Up @@ -409,18 +504,30 @@ async def _on_join_prepare_async(self, generation, member_id, timeout_ms=None):
log.exception("Pre-rebalance offset commit failed: This is likely"
" to cause duplicate message delivery")

# execute the user's callback before rebalance
log.info("Revoking previously assigned partitions %s for group %s",
self._subscription.assigned_partitions(), self.group_id)
if self._subscription.rebalance_listener:
try:
revoked = set(self._subscription.assigned_partitions())
await self._invoke_rebalance_listener_async(
'on_partitions_revoked', revoked)
except Exception:
log.exception("User provided subscription rebalance listener %s"
" for group %s failed on_partitions_revoked",
self._subscription.rebalance_listener, self.group_id)
# Under EAGER, notify the user that the full current
# assignment is about to be revoked so they can flush state /
# commit offsets before the rebalance. The partitions remain
# in self._subscription.assignment until _on_join_complete
# replaces it via assign_from_subscribed - this listener call
# is a *notification*, not the actual state mutation.
#
# Under COOPERATIVE we skip the notification entirely: members
# keep their assignment across JoinGroup and only the
# individual partitions that actually moved are revoked in
# _on_join_complete_async (computed from the owned-vs-assigned
# diff).
if self._rebalance_protocol == RebalanceProtocol.EAGER:
log.info("Revoking previously assigned partitions %s for group %s",
self._subscription.assigned_partitions(), self.group_id)
if self._subscription.rebalance_listener:
try:
revoked = set(self._subscription.assigned_partitions())
await self._invoke_rebalance_listener_async(
'on_partitions_revoked', revoked)
except Exception:
log.exception("User provided subscription rebalance listener %s"
" for group %s failed on_partitions_revoked",
self._subscription.rebalance_listener, self.group_id)

self._is_leader = False
self._subscription.reset_group_subscription()
Expand Down
Loading
Loading