Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ class KafkaConsumer:
'fetch_min_bytes': 1,
'fetch_max_bytes': 52428800,
'max_partition_fetch_bytes': 1 * 1024 * 1024,
'request_timeout_ms': 305000, # chosen to be higher than the default of max_poll_interval_ms
'request_timeout_ms': 30000,
'retry_backoff_ms': 100,
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 30000,
Expand Down Expand Up @@ -353,7 +353,11 @@ class KafkaConsumer:
'socks5_proxy': None, # deprecated
'kafka_client': KafkaNetClient,
}
DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000
# Pre-0.10.1 brokers don't separate session_timeout_ms from
# max_poll_interval_ms; both default to this value when neither is
# user-supplied. Kept under request_timeout_ms (30s) so the strict
# request > session check below doesn't fire on the default path.
DEFAULT_SESSION_TIMEOUT_MS_0_9 = 25000

def __init__(self, *topics, **configs):
# Only check for extra config keys in top-level class
Expand Down
12 changes: 11 additions & 1 deletion kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -710,8 +710,18 @@ async def _do_join_and_sync_async(self):
log.debug("Sending JoinGroup (%s) to coordinator %s",
join_request, self.coordinator_id)
join_send_time = time.monotonic()
# The broker holds JoinGroup open up to rebalance_timeout_ms
# (== max_poll_interval_ms) waiting for every member to join.
# Default request_timeout_ms (30s) would time out a healthy
# rebalance, so override per-request. Matches Java's
# joinGroupTimeoutMs = max(request_timeout_ms, rebalance_timeout_ms + 5s).
join_timeout_ms = max(
self.config['request_timeout_ms'],
self.config['max_poll_interval_ms'] + 5000,
)
join_response = await self._manager.send(
join_request, node_id=self.coordinator_id)
join_request, node_id=self.coordinator_id,
request_timeout_ms=join_timeout_ms)
# raises on error; mutates self._generation on success
self._process_join_group_response(join_response, join_send_time)

Expand Down
39 changes: 39 additions & 0 deletions test/consumer/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,45 @@ def test_join_group_async_happy_path_follower(request, broker, seeded_coord):
assert seeded_coord._heartbeat_enabled is True


def test_join_group_uses_extended_per_request_timeout(request, mocker, broker, seeded_coord):
"""JoinGroup must be sent with request_timeout_ms = max(request_timeout_ms,
max_poll_interval_ms + 5000) so the client doesn't time out a healthy
rebalance that the broker is legitimately holding open. Matches Java's
joinGroupTimeoutMs override.
"""
request.addfinalizer(lambda: setattr(seeded_coord, 'state', MemberState.UNJOINED))
seeded_coord.rejoin_needed = True
seeded_coord.state = MemberState.UNJOINED
broker.respond(JoinGroupRequest, _join_response_object(
leader='leader-x', member_id='member-1', members=[]))
broker.respond(SyncGroupRequest, _sync_response_object(
assignment=ConsumerProtocolAssignment(0, [('foobar', [0, 1])], b'').encode()))

seeded_coord.config['request_timeout_ms'] = 30000
seeded_coord.config['max_poll_interval_ms'] = 300000
sends = []
original_send = seeded_coord._manager.send

def capturing_send(req, node_id=None, request_timeout_ms=None):
sends.append((type(req).__name__, request_timeout_ms))
return original_send(req, node_id=node_id, request_timeout_ms=request_timeout_ms)

mocker.patch.object(seeded_coord._manager, 'send', side_effect=capturing_send)

seeded_coord._manager.run(seeded_coord.join_group_async, 5000)

join_sends = [t for name, t in sends if 'JoinGroup' in name]
assert join_sends, "expected at least one JoinGroupRequest send"
# max(30000, 300000 + 5000) = 305000
assert all(t == 305000 for t in join_sends), \
"JoinGroup sends used %r, expected 305000" % (join_sends,)
sync_sends = [t for name, t in sends if 'SyncGroup' in name]
assert sync_sends, "expected SyncGroupRequest"
# SyncGroup should NOT get the override - it uses default (None).
assert all(t is None for t in sync_sends), \
"SyncGroup sends should pass request_timeout_ms=None, got %r" % (sync_sends,)


def test_join_group_async_retries_on_retriable_error(request, broker, seeded_coord):
"""First JoinGroup fails with RebalanceInProgress; loop retries and succeeds."""
request.addfinalizer(lambda: setattr(seeded_coord, 'state', MemberState.UNJOINED))
Expand Down
Loading