-
Notifications
You must be signed in to change notification settings - Fork 1.5k
kafka.net: Add HttpConnectProxy #2990
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+271
−2
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,144 @@ | ||
| import base64 | ||
| import errno | ||
| import logging | ||
| import random | ||
| import socket | ||
| from urllib.parse import urlparse | ||
|
|
||
| from kafka.errors import KafkaConnectionError | ||
| from kafka.net.inet import KafkaNetSocket | ||
|
|
||
|
|
||
| log = logging.getLogger(__name__) | ||
|
|
||
| _WOULD_BLOCK = {errno.EWOULDBLOCK, errno.EAGAIN} | ||
| _MAX_RESPONSE_SIZE = 65536 | ||
|
|
||
|
|
||
| class _States: | ||
| DISCONNECTED = '<disconnected>' | ||
| CONNECTING = '<connecting>' | ||
| SENDING = '<sending>' | ||
| READING = '<reading>' | ||
| COMPLETE = '<complete>' | ||
|
|
||
|
|
||
| class HttpConnectProxy(KafkaNetSocket): | ||
| """Tunnels broker connections through an HTTP CONNECT proxy (RFC 7231 s4.3.6). | ||
|
|
||
| Registered for the ``http`` scheme -- pass ``proxy_url='http://host:port'`` | ||
| to KafkaConsumer/KafkaProducer/KafkaAdminClient. | ||
|
|
||
| Basic proxy auth is supported via URL credentials: ``http://user:pass@host:8080``. | ||
| Broker hostnames are always forwarded unresolved so the proxy handles DNS. | ||
| """ | ||
|
|
||
| SCHEMES = ('http',) | ||
|
|
||
| def __init__(self, proxy_url): | ||
| self._proxy_url = urlparse(proxy_url) | ||
| self._sock = None | ||
| self._state = _States.DISCONNECTED | ||
| self._send_buf = b'' | ||
| self._recv_buf = b'' | ||
| self._proxy_addr = self._get_proxy_addr() | ||
|
|
||
| def _get_proxy_addr(self): | ||
| addrs = self.dns_lookup(self._proxy_url.hostname, self._proxy_url.port, proxy=True) | ||
| if not addrs: | ||
| raise KafkaConnectionError('Unable to resolve proxy_url via dns') | ||
| return random.choice(addrs) | ||
|
|
||
| def dns_lookup(self, host, port, proxy=False): | ||
| if proxy: | ||
| return super().dns_lookup(host, port, raise_error=True) | ||
| # Always forward broker hostname unresolved; the proxy handles DNS | ||
| return [(socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP, '', (host, port))] | ||
|
|
||
| def socket(self, family=socket.AF_UNSPEC, sock_type=socket.SOCK_STREAM, proto=socket.IPPROTO_TCP): | ||
| self._target_afi = family | ||
| proxy_family, _, _, _, _ = self._proxy_addr | ||
| self._sock = socket.socket(proxy_family, sock_type, proto) | ||
| return self._sock | ||
|
|
||
| def connect_ex(self, sock, addr): | ||
| assert sock is self._sock | ||
|
|
||
| if self._state == _States.DISCONNECTED: | ||
| self._state = _States.CONNECTING | ||
|
|
||
| if self._state == _States.CONNECTING: | ||
| ret = self._do_connecting(addr) | ||
| if ret is not None: | ||
| return ret | ||
|
|
||
| if self._state == _States.SENDING: | ||
| ret = self._do_sending() | ||
| if ret is not None: | ||
| return ret | ||
|
|
||
| if self._state == _States.READING: | ||
| ret = self._do_reading() | ||
| if ret is not None: | ||
| return ret | ||
|
|
||
| if self._state == _States.COMPLETE: | ||
| return 0 | ||
|
|
||
| return errno.ECONNREFUSED | ||
|
|
||
| def _do_connecting(self, addr): | ||
| _, _, _, _, proxy_sockaddr = self._proxy_addr | ||
| ret = self._sock.connect_ex(proxy_sockaddr) | ||
| if ret and ret != errno.EISCONN: | ||
| return ret | ||
| host, port = addr[0], addr[1] | ||
| headers = 'CONNECT {0}:{1} HTTP/1.1\r\nHost: {0}:{1}\r\n'.format(host, port) | ||
| if self._proxy_url.username and self._proxy_url.password: | ||
| credentials = base64.b64encode( | ||
| '{0}:{1}'.format(self._proxy_url.username, self._proxy_url.password).encode() | ||
| ).decode() | ||
| headers += 'Proxy-Authorization: Basic {}\r\n'.format(credentials) | ||
| self._send_buf = (headers + '\r\n').encode() | ||
| self._state = _States.SENDING | ||
| return None | ||
|
|
||
| def _do_sending(self): | ||
| while self._send_buf: | ||
| try: | ||
| sent = self._sock.send(self._send_buf) | ||
| if sent == 0: | ||
| log.error('Proxy closed connection while sending CONNECT request') | ||
| return errno.ECONNREFUSED | ||
| self._send_buf = self._send_buf[sent:] | ||
| except OSError as exc: | ||
| if exc.errno in _WOULD_BLOCK: | ||
| return errno.EWOULDBLOCK | ||
| raise | ||
| self._state = _States.READING | ||
| return None | ||
|
|
||
| def _do_reading(self): | ||
| while b'\r\n\r\n' not in self._recv_buf: | ||
| try: | ||
| chunk = self._sock.recv(4096) | ||
| if not chunk: | ||
| log.error('Proxy closed connection during CONNECT handshake') | ||
| self._sock.close() | ||
| return errno.ECONNREFUSED | ||
| self._recv_buf += chunk | ||
| if len(self._recv_buf) > _MAX_RESPONSE_SIZE: | ||
| log.error('Proxy response exceeded %d bytes without end-of-headers', _MAX_RESPONSE_SIZE) | ||
| self._sock.close() | ||
| return errno.ECONNREFUSED | ||
| except OSError as exc: | ||
| if exc.errno in _WOULD_BLOCK: | ||
| return errno.EWOULDBLOCK | ||
| raise | ||
| first_line = self._recv_buf.split(b'\r\n')[0] | ||
| if b' 200 ' in first_line or first_line.endswith(b' 200'): | ||
| self._state = _States.COMPLETE | ||
| return None | ||
| log.error('HTTP CONNECT to proxy failed: %r', first_line) | ||
| self._sock.close() | ||
| return errno.ECONNREFUSED |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,116 @@ | ||
| import errno | ||
| import socket | ||
| from unittest.mock import MagicMock, patch | ||
|
|
||
| import pytest | ||
|
|
||
| from kafka.net.http_connect import HttpConnectProxy | ||
| from kafka.net.inet import KafkaNetSocket | ||
|
|
||
|
|
||
| _FAKE_PROXY_ADDR = (socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, '', ('1.2.3.4', 8080)) | ||
|
|
||
|
|
||
| def _make_proxy(url='http://proxy:8080'): | ||
| with patch.object(HttpConnectProxy, 'dns_lookup', return_value=[_FAKE_PROXY_ADDR]): | ||
| proxy = HttpConnectProxy(url) | ||
| proxy._sock = MagicMock() | ||
| return proxy | ||
|
|
||
|
|
||
| class TestHttpConnectProxyRegistry: | ||
| def test_registered_for_http_scheme(self): | ||
| with patch.object(HttpConnectProxy, 'dns_lookup', return_value=[_FAKE_PROXY_ADDR]): | ||
| obj = KafkaNetSocket('http://proxy:8080') | ||
| assert isinstance(obj, HttpConnectProxy) | ||
|
|
||
| def test_unregistered_scheme_raises(self): | ||
| with pytest.raises(ValueError, match='Unsupported proxy url scheme'): | ||
| KafkaNetSocket('socks4://proxy:8080') | ||
|
|
||
|
|
||
| class TestHttpConnectProxyDnsLookup: | ||
| def test_broker_lookup_returns_unresolved(self): | ||
| proxy = _make_proxy() | ||
| result = proxy.dns_lookup('broker.kafka.internal', 9092) | ||
| assert result == [(socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP, '', ('broker.kafka.internal', 9092))] | ||
|
|
||
| def test_proxy_lookup_delegates_to_super(self): | ||
| with patch('socket.getaddrinfo', return_value=[_FAKE_PROXY_ADDR]) as mock_gai: | ||
| proxy = _make_proxy() | ||
| proxy.dns_lookup('proxy', 8080, proxy=True) | ||
| mock_gai.assert_called() | ||
|
|
||
|
|
||
| class TestHttpConnectProxySocket: | ||
| def test_uses_proxy_family_not_broker_family(self): | ||
| proxy = _make_proxy() | ||
| with patch('socket.socket') as mock_ctor: | ||
| proxy.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP) | ||
| mock_ctor.assert_called_once_with(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) | ||
|
|
||
|
|
||
| class TestHttpConnectProxyConnectEx: | ||
| def test_success(self): | ||
| proxy = _make_proxy() | ||
| proxy._sock.connect_ex.return_value = 0 | ||
| proxy._sock.send.side_effect = lambda b: len(b) | ||
| proxy._sock.recv.return_value = b'HTTP/1.1 200 Connection Established\r\n\r\n' | ||
| assert proxy.connect_ex(proxy._sock, ('broker', 9092)) == 0 | ||
|
|
||
| def test_success_no_reason_phrase(self): | ||
| proxy = _make_proxy() | ||
| proxy._sock.connect_ex.return_value = 0 | ||
| proxy._sock.send.side_effect = lambda b: len(b) | ||
| proxy._sock.recv.return_value = b'HTTP/1.1 200\r\n\r\n' | ||
| assert proxy.connect_ex(proxy._sock, ('broker', 9092)) == 0 | ||
|
|
||
| def test_basic_auth_header_sent_when_credentials_in_url(self): | ||
| import base64 | ||
| proxy = _make_proxy('http://user:pass@proxy:8080') | ||
| proxy._sock.connect_ex.return_value = 0 | ||
| sent = [] | ||
| proxy._sock.send.side_effect = lambda b: sent.append(b) or len(b) | ||
| proxy._sock.recv.return_value = b'HTTP/1.1 200 Connection Established\r\n\r\n' | ||
| proxy.connect_ex(proxy._sock, ('broker', 9092)) | ||
| request = b''.join(sent).decode() | ||
| expected = base64.b64encode(b'user:pass').decode() | ||
| assert 'Proxy-Authorization: Basic {}'.format(expected) in request | ||
|
|
||
| def test_non_200_response_returns_econnrefused(self): | ||
| proxy = _make_proxy() | ||
| proxy._sock.connect_ex.return_value = 0 | ||
| proxy._sock.send.side_effect = lambda b: len(b) | ||
| proxy._sock.recv.return_value = b'HTTP/1.1 407 Proxy Authentication Required\r\n\r\n' | ||
| assert proxy.connect_ex(proxy._sock, ('broker', 9092)) == errno.ECONNREFUSED | ||
|
|
||
| def test_ewouldblock_while_sending(self): | ||
| proxy = _make_proxy() | ||
| proxy._sock.connect_ex.return_value = 0 | ||
| proxy._sock.send.side_effect = OSError(errno.EWOULDBLOCK, 'would block') | ||
| assert proxy.connect_ex(proxy._sock, ('broker', 9092)) == errno.EWOULDBLOCK | ||
|
|
||
| def test_ewouldblock_while_reading(self): | ||
| proxy = _make_proxy() | ||
| proxy._sock.connect_ex.return_value = 0 | ||
| proxy._sock.send.side_effect = lambda b: len(b) | ||
| proxy._sock.recv.side_effect = OSError(errno.EWOULDBLOCK, 'would block') | ||
| assert proxy.connect_ex(proxy._sock, ('broker', 9092)) == errno.EWOULDBLOCK | ||
|
|
||
| def test_resumes_after_ewouldblock(self): | ||
| proxy = _make_proxy() | ||
| proxy._sock.connect_ex.return_value = 0 | ||
| proxy._sock.send.side_effect = lambda b: len(b) | ||
| proxy._sock.recv.side_effect = [ | ||
| OSError(errno.EWOULDBLOCK, 'would block'), | ||
| b'HTTP/1.1 200 Connection Established\r\n\r\n', | ||
| ] | ||
| assert proxy.connect_ex(proxy._sock, ('broker', 9092)) == errno.EWOULDBLOCK | ||
| assert proxy.connect_ex(proxy._sock, ('broker', 9092)) == 0 | ||
|
|
||
| def test_eof_during_handshake_returns_econnrefused(self): | ||
| proxy = _make_proxy() | ||
| proxy._sock.connect_ex.return_value = 0 | ||
| proxy._sock.send.side_effect = lambda b: len(b) | ||
| proxy._sock.recv.return_value = b'' | ||
| assert proxy.connect_ex(proxy._sock, ('broker', 9092)) == errno.ECONNREFUSED | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test class seems redundant w/ test_inet.py -- I think we can remove