diff --git a/kafka/net/manager.py b/kafka/net/manager.py index 4ab223ac7..f8e901402 100644 --- a/kafka/net/manager.py +++ b/kafka/net/manager.py @@ -77,6 +77,10 @@ def __init__(self, net, **configs): self.cluster.attach(self) self._conns = {} self._backoff = dict() # node_id => (failures, backoff_until) + # Cache the most recent SASL / SSL / auth failure per node so we can + # surface it to the user instead of silently retrying forever. + # Cleared on successful connect. + self._auth_failures = {} # node_id => AuthenticationError self._idle_check_delay = self.config['connections_max_idle_ms'] / 1000 self.close_idle_connections() self.broker_version_data = None @@ -230,12 +234,16 @@ async def _connect(self, node, conn, reset_backoff_on_connect=True): log.error('Connection failed: %s', exc) conn.connection_lost(exc) self.update_backoff(node.node_id) + if isinstance(exc, (Errors.SaslAuthenticationFailedError, + Errors.AuthorizationError)): + self._auth_failures[node.node_id] = exc return if self._sensors: self._sensors.connection_created.record() if reset_backoff_on_connect: self.reset_backoff(node.node_id) + self._auth_failures.pop(node.node_id, None) if conn.broker_version_data is not None: if self.cluster.is_bootstrap(node.node_id): self.broker_version_data = conn.broker_version_data @@ -246,7 +254,8 @@ def get_connection(self, node_id, timeout_ms=None, reset_backoff_on_connect=True): if node_id is None: raise Errors.NodeNotReadyError('No node_id provided') - elif self.connection_delay(node_id) > 0: + self.maybe_raise_auth_failure(node_id) + if self.connection_delay(node_id) > 0: raise Errors.NodeNotReadyError(node_id) elif node_id in self._conns: return self._conns[node_id] @@ -338,6 +347,17 @@ def connection_delay(self, node_id): return 0 return max(0, self._backoff[node_id][1] - time.monotonic()) + def auth_failure(self, node_id): + """Return the most recent auth-class failure for ``node_id``, + or None if there is no sticky failure on record.""" + return self._auth_failures.get(node_id) + + def maybe_raise_auth_failure(self, node_id): + """Raise the cached auth-class failure for ``node_id`` if any.""" + exc = self._auth_failures.get(node_id) + if exc is not None: + raise exc + def close(self, node_id=None, timeout_ms=None): if node_id is not None: conn = self._conns.get(node_id) diff --git a/test/net/test_manager.py b/test/net/test_manager.py index 2f83de816..23c671ed5 100644 --- a/test/net/test_manager.py +++ b/test/net/test_manager.py @@ -115,6 +115,38 @@ def test_reset_backoff_nonexistent(self, manager): assert manager.connection_delay('node-1') == 0 +class TestKafkaConnectionManagerAuthFailure: + def test_no_failure_by_default(self, manager): + assert manager.auth_failure('node-1') is None + manager.maybe_raise_auth_failure('node-1') # no-op + + def test_set_via_direct_dict_then_raise(self, manager): + err = Errors.SaslAuthenticationFailedError('bad creds') + manager._auth_failures['node-1'] = err + assert manager.auth_failure('node-1') is err + with pytest.raises(Errors.SaslAuthenticationFailedError): + manager.maybe_raise_auth_failure('node-1') + + def test_successful_connect_clears_failure(self, manager): + err = Errors.SaslAuthenticationFailedError('bad creds') + manager._auth_failures['node-1'] = err + # Simulate the body of _connect's success path. + manager._auth_failures.pop('node-1', None) + assert manager.auth_failure('node-1') is None + + def test_get_connection_raises_sticky_auth_failure(self, manager): + err = Errors.SaslAuthenticationFailedError('bad creds') + manager._auth_failures['bootstrap-0'] = err + with pytest.raises(Errors.SaslAuthenticationFailedError): + manager.get_connection('bootstrap-0') + + def test_send_propagates_sticky_auth_failure(self, manager): + err = Errors.SaslAuthenticationFailedError('bad creds') + manager._auth_failures['bootstrap-0'] = err + with pytest.raises(Errors.SaslAuthenticationFailedError): + manager.send(MagicMock(), node_id='bootstrap-0') + + class TestKafkaConnectionManagerGetConnection: def test_get_connection_none_raises(self, manager): with pytest.raises(Errors.NodeNotReadyError):