From 1d012b46d851d667289a9f19b4d4d1caffaa2e01 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 27 May 2026 20:54:18 -0700 Subject: [PATCH] Consumer: default request_timeout_ms 30s; use request-specific timeout for JoinGroup --- kafka/consumer/group.py | 8 +++++-- kafka/coordinator/base.py | 12 +++++++++- test/consumer/test_coordinator.py | 39 +++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 3 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index a3416d8df..8e7de1ee7 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -298,7 +298,7 @@ class KafkaConsumer: 'fetch_min_bytes': 1, 'fetch_max_bytes': 52428800, 'max_partition_fetch_bytes': 1 * 1024 * 1024, - 'request_timeout_ms': 305000, # chosen to be higher than the default of max_poll_interval_ms + 'request_timeout_ms': 30000, 'retry_backoff_ms': 100, 'reconnect_backoff_ms': 50, 'reconnect_backoff_max_ms': 30000, @@ -353,7 +353,11 @@ class KafkaConsumer: 'socks5_proxy': None, # deprecated 'kafka_client': KafkaNetClient, } - DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000 + # Pre-0.10.1 brokers don't separate session_timeout_ms from + # max_poll_interval_ms; both default to this value when neither is + # user-supplied. Kept under the default request_timeout_ms (30s) so the strict + # request_timeout_ms > session_timeout_ms check below doesn't fire by default. + DEFAULT_SESSION_TIMEOUT_MS_0_9 = 25000 def __init__(self, *topics, **configs): # Only check for extra config keys in top-level class diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index c9e6e841b..c6d94b622 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -710,8 +710,18 @@ async def _do_join_and_sync_async(self): log.debug("Sending JoinGroup (%s) to coordinator %s", join_request, self.coordinator_id) join_send_time = time.monotonic() + # The broker holds JoinGroup open up to rebalance_timeout_ms + # (== max_poll_interval_ms) waiting for every member to join. 
+ # Default request_timeout_ms (30s) would time out a healthy + # rebalance, so override it for this request only. Matches Java's + # joinGroupTimeoutMs = max(request_timeout_ms, rebalance_timeout_ms + 5000 ms). + join_timeout_ms = max( + self.config['request_timeout_ms'], + self.config['max_poll_interval_ms'] + 5000, + ) join_response = await self._manager.send( - join_request, node_id=self.coordinator_id) + join_request, node_id=self.coordinator_id, + request_timeout_ms=join_timeout_ms) # raises on error; mutates self._generation on success self._process_join_group_response(join_response, join_send_time) diff --git a/test/consumer/test_coordinator.py b/test/consumer/test_coordinator.py index 6e755152c..561ac3d63 100644 --- a/test/consumer/test_coordinator.py +++ b/test/consumer/test_coordinator.py @@ -1134,6 +1134,45 @@ def test_join_group_async_happy_path_follower(request, broker, seeded_coord): assert seeded_coord._heartbeat_enabled is True +def test_join_group_uses_extended_per_request_timeout(request, mocker, broker, seeded_coord): + """JoinGroup must be sent with request_timeout_ms = max(request_timeout_ms, + max_poll_interval_ms + 5000) so the client doesn't time out a healthy + rebalance that the broker is legitimately holding open. Matches Java's + joinGroupTimeoutMs override. 
+ """ + request.addfinalizer(lambda: setattr(seeded_coord, 'state', MemberState.UNJOINED)) + seeded_coord.rejoin_needed = True + seeded_coord.state = MemberState.UNJOINED + broker.respond(JoinGroupRequest, _join_response_object( + leader='leader-x', member_id='member-1', members=[])) + broker.respond(SyncGroupRequest, _sync_response_object( + assignment=ConsumerProtocolAssignment(0, [('foobar', [0, 1])], b'').encode())) + + seeded_coord.config['request_timeout_ms'] = 30000 + seeded_coord.config['max_poll_interval_ms'] = 300000 + sends = [] + original_send = seeded_coord._manager.send + + def capturing_send(req, node_id=None, request_timeout_ms=None): + sends.append((type(req).__name__, request_timeout_ms)) + return original_send(req, node_id=node_id, request_timeout_ms=request_timeout_ms) + + mocker.patch.object(seeded_coord._manager, 'send', side_effect=capturing_send) + + seeded_coord._manager.run(seeded_coord.join_group_async, 5000) + + join_sends = [t for name, t in sends if 'JoinGroup' in name] + assert join_sends, "expected at least one JoinGroupRequest send" + # max(30000, 300000 + 5000) = 305000 + assert all(t == 305000 for t in join_sends), \ + "JoinGroup sends used %r, expected 305000" % (join_sends,) + sync_sends = [t for name, t in sends if 'SyncGroup' in name] + assert sync_sends, "expected SyncGroupRequest" + # SyncGroup should NOT get the override - its captured request_timeout_ms stays at the default (None). + assert all(t is None for t in sync_sends), \ + "SyncGroup sends should pass request_timeout_ms=None, got %r" % (sync_sends,) + + def test_join_group_async_retries_on_retriable_error(request, broker, seeded_coord): """First JoinGroup fails with RebalanceInProgress; loop retries and succeeds.""" request.addfinalizer(lambda: setattr(seeded_coord, 'state', MemberState.UNJOINED))