From 97d46ed9242fd370b4c34610af512185ccb67add Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 26 May 2026 16:26:31 +0000 Subject: [PATCH] KIP-559: bump JoinGroup v7 / SyncGroup v5; verify protocol_type/name MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit KIP-559 makes the Kafka protocol friendlier with L7 proxies by ensuring that JoinGroup and SyncGroup messages explicitly carry the group's ProtocolType / ProtocolName, so each message can be parsed independently. The wire schemas already covered the new fields (JoinGroup v7, SyncGroup v5 — both via generated stubs). Apply the client-side bits: - coordinator/base: send JoinGroupRequest max_version=7, SyncGroupRequest max_version=5, populating protocol_type and protocol_name on the SyncGroupRequest. - coordinator/base: validate response.protocol_type on the JoinGroupResponse and both protocol_type and protocol_name on the SyncGroupResponse; raise InconsistentGroupProtocolError on mismatch (treated equivalently to a broker-reported INCONSISTENT_GROUP_PROTOCOL). - tests: cover join/sync protocol_type and sync protocol_name mismatches. Co-authored-by: Shelley --- kafka/coordinator/base.py | 35 +++++++++++++++++++++-- test/consumer/test_coordinator.py | 47 ++++++++++++++++++++++++++++--- 2 files changed, 75 insertions(+), 7 deletions(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 654c5f038..c9e6e841b 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -537,6 +537,14 @@ def _process_join_group_response(self, response, send_time): log.debug("Received JoinGroup response: %s", response) error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: + # KIP-559: starting with v7 the response carries the protocol_type; + # validate it matches what this member sent (None on older versions). + if response.protocol_type is not None and response.protocol_type != self.protocol_type(): + log.error("JoinGroup for group %s returned inconsistent protocol_type %s (expected %s)", + self.group_id, response.protocol_type, self.protocol_type()) + raise Errors.InconsistentGroupProtocolError( + "JoinGroupResponse protocol_type %r does not match group protocol_type %r" + % (response.protocol_type, self.protocol_type())) if self._sensors: self._sensors.join_latency.record((time.monotonic() - send_time) * 1000) with self._lock: @@ -612,6 +620,23 @@ def _process_sync_group_response(self, response, send_time): log.debug("Received SyncGroup response: %s", response) error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: + # KIP-559: starting with v5 the response carries the protocol_type and + # protocol_name; validate they match what this member is using + # (both None on older versions). + if response.protocol_type is not None and response.protocol_type != self.protocol_type(): + log.error("SyncGroup for group %s returned inconsistent protocol_type %s (expected %s)", + self.group_id, response.protocol_type, self.protocol_type()) + raise Errors.InconsistentGroupProtocolError( + "SyncGroupResponse protocol_type %r does not match group protocol_type %r" + % (response.protocol_type, self.protocol_type())) + if (response.protocol_name is not None + and self._generation is not Generation.NO_GENERATION + and response.protocol_name != self._generation.protocol): + log.error("SyncGroup for group %s returned inconsistent protocol_name %s (expected %s)", + self.group_id, response.protocol_name, self._generation.protocol) + raise Errors.InconsistentGroupProtocolError( + "SyncGroupResponse protocol_name %r does not match group protocol_name %r" + % (response.protocol_name, self._generation.protocol)) if self._sensors: self._sensors.sync_latency.record((time.monotonic() - send_time) * 1000) return response.assignment @@ -681,7 +706,7 @@ async def _do_join_and_sync_async(self): group_instance_id=self.group_instance_id, protocol_type=self.protocol_type(), protocols=self.group_protocols(), - max_version=6) + max_version=7) log.debug("Sending JoinGroup (%s) to coordinator %s", join_request, self.coordinator_id) join_send_time = time.monotonic() @@ -702,8 +727,10 @@ async def _do_join_and_sync_async(self): generation_id=self._generation.generation_id, member_id=self._generation.member_id, group_instance_id=self.group_instance_id, + protocol_type=self.protocol_type(), + protocol_name=self._generation.protocol, assignments=group_assignment.items(), - max_version=4) + max_version=5) log.debug("Sending leader SyncGroup for group %s to coordinator %s: %s", self.group_id, self.coordinator_id, sync_request) else: @@ -712,8 +739,10 @@ async def _do_join_and_sync_async(self): generation_id=self._generation.generation_id, member_id=self._generation.member_id, group_instance_id=self.group_instance_id, + protocol_type=self.protocol_type(), + protocol_name=self._generation.protocol, assignments=[], - max_version=4) + max_version=5) log.debug("Sending follower SyncGroup for group %s to coordinator %s: %s", self.group_id, self.coordinator_id, sync_request) diff --git a/test/consumer/test_coordinator.py b/test/consumer/test_coordinator.py index 431d163f4..7a1e3147a 100644 --- a/test/consumer/test_coordinator.py +++ b/test/consumer/test_coordinator.py @@ -977,24 +977,26 @@ def test_process_sync_group_response(request, coordinator, error_code, error_typ def _join_response_object(error_code=0, generation_id=42, member_id='member-1', leader='member-1', + protocol_type='consumer', protocol_name='range', members=None): return JoinGroupResponse( throttle_time_ms=0, error_code=error_code, generation_id=generation_id, - protocol_type='consumer', + protocol_type=protocol_type, protocol_name=protocol_name, leader=leader, member_id=member_id, members=members or []) -def _sync_response_object(error_code=0, assignment=b''): +def _sync_response_object(error_code=0, assignment=b'', + protocol_type='consumer', protocol_name='range'): return SyncGroupResponse( throttle_time_ms=0, error_code=error_code, - protocol_type='consumer', - protocol_name='range', + protocol_type=protocol_type, + protocol_name=protocol_name, assignment=assignment) @@ -1334,3 +1336,40 @@ async def fake_send(): future = coordinator.lookup_coordinator() coordinator._client.poll(future=future, timeout_ms=1000) assert future.failed() + + +def test_do_join_and_sync_async_join_protocol_type_mismatch(request, broker, seeded_coord): + """KIP-559: JoinGroupResponse with mismatched protocol_type must raise + InconsistentGroupProtocolError.""" + request.addfinalizer(lambda: setattr(seeded_coord, 'state', MemberState.UNJOINED)) + broker.respond(JoinGroupRequest, _join_response_object( + leader='leader-x', member_id='member-1', members=[], + protocol_type='not-consumer')) + + with pytest.raises(Errors.InconsistentGroupProtocolError): + seeded_coord._manager.run(seeded_coord._do_join_and_sync_async) + + +def test_do_join_and_sync_async_sync_protocol_type_mismatch(request, broker, seeded_coord): + """KIP-559: SyncGroupResponse with mismatched protocol_type must raise.""" + request.addfinalizer(lambda: setattr(seeded_coord, 'state', MemberState.UNJOINED)) + broker.respond(JoinGroupRequest, _join_response_object( + leader='leader-x', member_id='member-1', members=[])) + broker.respond(SyncGroupRequest, _sync_response_object( + protocol_type='not-consumer')) + + with pytest.raises(Errors.InconsistentGroupProtocolError): + seeded_coord._manager.run(seeded_coord._do_join_and_sync_async) + + +def test_do_join_and_sync_async_sync_protocol_name_mismatch(request, broker, seeded_coord): + """KIP-559: SyncGroupResponse with mismatched protocol_name must raise.""" + request.addfinalizer(lambda: setattr(seeded_coord, 'state', MemberState.UNJOINED)) + broker.respond(JoinGroupRequest, _join_response_object( + leader='leader-x', member_id='member-1', members=[], + protocol_name='range')) + broker.respond(SyncGroupRequest, _sync_response_object( + protocol_name='roundrobin')) + + with pytest.raises(Errors.InconsistentGroupProtocolError): + seeded_coord._manager.run(seeded_coord._do_join_and_sync_async)