Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion kafka/net/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ def __init__(self, net, **configs):
self.cluster.attach(self)
self._conns = {}
self._backoff = dict() # node_id => (failures, backoff_until)
# Cache the most recent SASL / SSL / auth failure per node so we can
# surface it to the user instead of silently retrying forever.
# Cleared on successful connect.
self._auth_failures = {} # node_id => AuthenticationError
self._idle_check_delay = self.config['connections_max_idle_ms'] / 1000
self.close_idle_connections()
self.broker_version_data = None
Expand Down Expand Up @@ -230,12 +234,16 @@ async def _connect(self, node, conn, reset_backoff_on_connect=True):
log.error('Connection failed: %s', exc)
conn.connection_lost(exc)
self.update_backoff(node.node_id)
if isinstance(exc, (Errors.SaslAuthenticationFailedError,
Errors.AuthorizationError)):
self._auth_failures[node.node_id] = exc
return

if self._sensors:
self._sensors.connection_created.record()
if reset_backoff_on_connect:
self.reset_backoff(node.node_id)
self._auth_failures.pop(node.node_id, None)
if conn.broker_version_data is not None:
if self.cluster.is_bootstrap(node.node_id):
self.broker_version_data = conn.broker_version_data
Expand All @@ -246,7 +254,8 @@ def get_connection(self, node_id, timeout_ms=None,
reset_backoff_on_connect=True):
if node_id is None:
raise Errors.NodeNotReadyError('No node_id provided')
elif self.connection_delay(node_id) > 0:
self.maybe_raise_auth_failure(node_id)
if self.connection_delay(node_id) > 0:
raise Errors.NodeNotReadyError(node_id)
elif node_id in self._conns:
return self._conns[node_id]
Expand Down Expand Up @@ -338,6 +347,17 @@ def connection_delay(self, node_id):
return 0
return max(0, self._backoff[node_id][1] - time.monotonic())

def auth_failure(self, node_id):
"""Return the most recent auth-class failure for ``node_id``,
or None if there is no sticky failure on record."""
return self._auth_failures.get(node_id)

def maybe_raise_auth_failure(self, node_id):
"""Raise the cached auth-class failure for ``node_id`` if any."""
exc = self._auth_failures.get(node_id)
if exc is not None:
raise exc

def close(self, node_id=None, timeout_ms=None):
if node_id is not None:
conn = self._conns.get(node_id)
Expand Down
32 changes: 32 additions & 0 deletions test/net/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,38 @@ def test_reset_backoff_nonexistent(self, manager):
assert manager.connection_delay('node-1') == 0


class TestKafkaConnectionManagerAuthFailure:
def test_no_failure_by_default(self, manager):
assert manager.auth_failure('node-1') is None
manager.maybe_raise_auth_failure('node-1') # no-op

def test_set_via_direct_dict_then_raise(self, manager):
err = Errors.SaslAuthenticationFailedError('bad creds')
manager._auth_failures['node-1'] = err
assert manager.auth_failure('node-1') is err
with pytest.raises(Errors.SaslAuthenticationFailedError):
manager.maybe_raise_auth_failure('node-1')

def test_successful_connect_clears_failure(self, manager):
err = Errors.SaslAuthenticationFailedError('bad creds')
manager._auth_failures['node-1'] = err
# Simulate the body of _connect's success path.
manager._auth_failures.pop('node-1', None)
assert manager.auth_failure('node-1') is None

def test_get_connection_raises_sticky_auth_failure(self, manager):
err = Errors.SaslAuthenticationFailedError('bad creds')
manager._auth_failures['bootstrap-0'] = err
with pytest.raises(Errors.SaslAuthenticationFailedError):
manager.get_connection('bootstrap-0')

def test_send_propagates_sticky_auth_failure(self, manager):
err = Errors.SaslAuthenticationFailedError('bad creds')
manager._auth_failures['bootstrap-0'] = err
with pytest.raises(Errors.SaslAuthenticationFailedError):
manager.send(MagicMock(), node_id='bootstrap-0')


class TestKafkaConnectionManagerGetConnection:
def test_get_connection_none_raises(self, manager):
with pytest.raises(Errors.NodeNotReadyError):
Expand Down
Loading