Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,14 @@ def _process_join_group_response(self, response, send_time):
log.debug("Received JoinGroup response: %s", response)
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
# KIP-559: starting with v7 the response carries the protocol_type;
# validate it matches what this member sent (None on older versions).
if response.protocol_type is not None and response.protocol_type != self.protocol_type():
log.error("JoinGroup for group %s returned inconsistent protocol_type %s (expected %s)",
self.group_id, response.protocol_type, self.protocol_type())
raise Errors.InconsistentGroupProtocolError(
"JoinGroupResponse protocol_type %r does not match group protocol_type %r"
% (response.protocol_type, self.protocol_type()))
if self._sensors:
self._sensors.join_latency.record((time.monotonic() - send_time) * 1000)
with self._lock:
Expand Down Expand Up @@ -612,6 +620,23 @@ def _process_sync_group_response(self, response, send_time):
log.debug("Received SyncGroup response: %s", response)
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
# KIP-559: starting with v5 the response carries the protocol_type and
# protocol_name; validate they match what this member is using
# (both None on older versions).
if response.protocol_type is not None and response.protocol_type != self.protocol_type():
log.error("SyncGroup for group %s returned inconsistent protocol_type %s (expected %s)",
self.group_id, response.protocol_type, self.protocol_type())
raise Errors.InconsistentGroupProtocolError(
"SyncGroupResponse protocol_type %r does not match group protocol_type %r"
% (response.protocol_type, self.protocol_type()))
if (response.protocol_name is not None
and self._generation is not Generation.NO_GENERATION
and response.protocol_name != self._generation.protocol):
log.error("SyncGroup for group %s returned inconsistent protocol_name %s (expected %s)",
self.group_id, response.protocol_name, self._generation.protocol)
raise Errors.InconsistentGroupProtocolError(
"SyncGroupResponse protocol_name %r does not match group protocol_name %r"
% (response.protocol_name, self._generation.protocol))
if self._sensors:
self._sensors.sync_latency.record((time.monotonic() - send_time) * 1000)
return response.assignment
Expand Down Expand Up @@ -681,7 +706,7 @@ async def _do_join_and_sync_async(self):
group_instance_id=self.group_instance_id,
protocol_type=self.protocol_type(),
protocols=self.group_protocols(),
max_version=6)
max_version=7)
log.debug("Sending JoinGroup (%s) to coordinator %s",
join_request, self.coordinator_id)
join_send_time = time.monotonic()
Expand All @@ -702,8 +727,10 @@ async def _do_join_and_sync_async(self):
generation_id=self._generation.generation_id,
member_id=self._generation.member_id,
group_instance_id=self.group_instance_id,
protocol_type=self.protocol_type(),
protocol_name=self._generation.protocol,
assignments=group_assignment.items(),
max_version=4)
max_version=5)
log.debug("Sending leader SyncGroup for group %s to coordinator %s: %s",
self.group_id, self.coordinator_id, sync_request)
else:
Expand All @@ -712,8 +739,10 @@ async def _do_join_and_sync_async(self):
generation_id=self._generation.generation_id,
member_id=self._generation.member_id,
group_instance_id=self.group_instance_id,
protocol_type=self.protocol_type(),
protocol_name=self._generation.protocol,
assignments=[],
max_version=4)
max_version=5)
log.debug("Sending follower SyncGroup for group %s to coordinator %s: %s",
self.group_id, self.coordinator_id, sync_request)

Expand Down
47 changes: 43 additions & 4 deletions test/consumer/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -977,24 +977,26 @@ def test_process_sync_group_response(request, coordinator, error_code, error_typ

def _join_response_object(error_code=0, generation_id=42,
member_id='member-1', leader='member-1',
protocol_type='consumer',
protocol_name='range', members=None):
return JoinGroupResponse(
throttle_time_ms=0,
error_code=error_code,
generation_id=generation_id,
protocol_type='consumer',
protocol_type=protocol_type,
protocol_name=protocol_name,
leader=leader,
member_id=member_id,
members=members or [])


def _sync_response_object(error_code=0, assignment=b''):
def _sync_response_object(error_code=0, assignment=b'',
protocol_type='consumer', protocol_name='range'):
return SyncGroupResponse(
throttle_time_ms=0,
error_code=error_code,
protocol_type='consumer',
protocol_name='range',
protocol_type=protocol_type,
protocol_name=protocol_name,
assignment=assignment)


Expand Down Expand Up @@ -1334,3 +1336,40 @@ async def fake_send():
future = coordinator.lookup_coordinator()
coordinator._client.poll(future=future, timeout_ms=1000)
assert future.failed()


def test_do_join_and_sync_async_join_protocol_type_mismatch(request, broker, seeded_coord):
"""KIP-559: JoinGroupResponse with mismatched protocol_type must raise
InconsistentGroupProtocolError."""
request.addfinalizer(lambda: setattr(seeded_coord, 'state', MemberState.UNJOINED))
broker.respond(JoinGroupRequest, _join_response_object(
leader='leader-x', member_id='member-1', members=[],
protocol_type='not-consumer'))

with pytest.raises(Errors.InconsistentGroupProtocolError):
seeded_coord._manager.run(seeded_coord._do_join_and_sync_async)


def test_do_join_and_sync_async_sync_protocol_type_mismatch(request, broker, seeded_coord):
"""KIP-559: SyncGroupResponse with mismatched protocol_type must raise."""
request.addfinalizer(lambda: setattr(seeded_coord, 'state', MemberState.UNJOINED))
broker.respond(JoinGroupRequest, _join_response_object(
leader='leader-x', member_id='member-1', members=[]))
broker.respond(SyncGroupRequest, _sync_response_object(
protocol_type='not-consumer'))

with pytest.raises(Errors.InconsistentGroupProtocolError):
seeded_coord._manager.run(seeded_coord._do_join_and_sync_async)


def test_do_join_and_sync_async_sync_protocol_name_mismatch(request, broker, seeded_coord):
"""KIP-559: SyncGroupResponse with mismatched protocol_name must raise."""
request.addfinalizer(lambda: setattr(seeded_coord, 'state', MemberState.UNJOINED))
broker.respond(JoinGroupRequest, _join_response_object(
leader='leader-x', member_id='member-1', members=[],
protocol_name='range'))
broker.respond(SyncGroupRequest, _sync_response_object(
protocol_name='roundrobin'))

with pytest.raises(Errors.InconsistentGroupProtocolError):
seeded_coord._manager.run(seeded_coord._do_join_and_sync_async)
Loading