From a58e17395c812a84074fd4f45eb70bbe79ca2105 Mon Sep 17 00:00:00 2001 From: zivbleich Date: Wed, 27 May 2026 11:15:06 +0300 Subject: [PATCH] feat: Add HTTP CONNECT proxy support (RFC 7231 s4.3.6) Adds HttpConnectProxy, registered for the `http` scheme, which tunnels broker connections through an HTTP CONNECT proxy. Pass proxy_url='http://host:port' (or 'http://user:pass@host:port' for basic auth) to KafkaConsumer/KafkaProducer/KafkaAdminClient. Co-Authored-By: Claude Sonnet 4.6 --- CHANGES.md | 1 + kafka/net/__init__.py | 3 +- kafka/net/http_connect.py | 144 ++++++++++++++++++++++++++++++++++ test/net/test_http_connect.py | 116 +++++++++++++++++++++++++++ test/net/test_inet.py | 9 ++- 5 files changed, 271 insertions(+), 2 deletions(-) create mode 100644 kafka/net/http_connect.py create mode 100644 test/net/test_http_connect.py diff --git a/CHANGES.md b/CHANGES.md index 17da77790..141f3f2ba 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -22,6 +22,7 @@ Networking * kafka.net: SSL transport support (#2813) * kafka.net: SASL authentication support (#2814) * kafka.net: SOCKS5 proxy support (#2815) +* kafka.net: HTTP CONNECT proxy support (RFC 7231 s4.3.6) (#2990) * kafka.net: Metrics tracking (#2834) * KafkaNetClient: drop-in replacement for KafkaClient using kafka.net (#2816) * Future.__await__ support (#2811) diff --git a/kafka/net/__init__.py b/kafka/net/__init__.py index 1fa999b16..f7cc11384 100644 --- a/kafka/net/__init__.py +++ b/kafka/net/__init__.py @@ -3,6 +3,7 @@ from .manager import KafkaConnectionManager from .metrics import KafkaConnectionMetrics, KafkaManagerMetrics from .selector import NetworkSelector +from .http_connect import HttpConnectProxy from .socks5 import Socks5Proxy from .transport import KafkaTCPTransport, KafkaSSLTransport from .wakeup_notifier import WakeupNotifier @@ -13,6 +14,6 @@ __all__ = [ 'KafkaConnection', 'create_connection', 'KafkaNetSocket', 'KafkaConnectionManager', 'KafkaConnectionMetrics', 'KafkaManagerMetrics', - 'NetworkSelector', 'Socks5Proxy', 'KafkaTCPTransport', 'KafkaSSLTransport', + 'NetworkSelector', 'HttpConnectProxy', 'Socks5Proxy', 'KafkaTCPTransport', 'KafkaSSLTransport', 'WakeupNotifier', 'KafkaNetClient', ] diff --git a/kafka/net/http_connect.py b/kafka/net/http_connect.py new file mode 100644 index 000000000..ec646730a --- /dev/null +++ b/kafka/net/http_connect.py @@ -0,0 +1,144 @@ +import base64 +import errno +import logging +import random +import socket +from urllib.parse import urlparse + +from kafka.errors import KafkaConnectionError +from kafka.net.inet import KafkaNetSocket + + +log = logging.getLogger(__name__) + +_WOULD_BLOCK = {errno.EWOULDBLOCK, errno.EAGAIN} +_MAX_RESPONSE_SIZE = 65536 + + +class _States: + DISCONNECTED = '' + CONNECTING = '' + SENDING = '' + READING = '' + COMPLETE = '' + + +class HttpConnectProxy(KafkaNetSocket): + """Tunnels broker connections through an HTTP CONNECT proxy (RFC 7231 s4.3.6). + + Registered for the ``http`` scheme -- pass ``proxy_url='http://host:port'`` + to KafkaConsumer/KafkaProducer/KafkaAdminClient. + + Basic proxy auth is supported via URL credentials: ``http://user:pass@host:8080``. + Broker hostnames are always forwarded unresolved so the proxy handles DNS. + """ + + SCHEMES = ('http',) + + def __init__(self, proxy_url): + self._proxy_url = urlparse(proxy_url) + self._sock = None + self._state = _States.DISCONNECTED + self._send_buf = b'' + self._recv_buf = b'' + self._proxy_addr = self._get_proxy_addr() + + def _get_proxy_addr(self): + addrs = self.dns_lookup(self._proxy_url.hostname, self._proxy_url.port, proxy=True) + if not addrs: + raise KafkaConnectionError('Unable to resolve proxy_url via dns') + return random.choice(addrs) + + def dns_lookup(self, host, port, proxy=False): + if proxy: + return super().dns_lookup(host, port, raise_error=True) + # Always forward broker hostname unresolved; the proxy handles DNS + return [(socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP, '', (host, port))] + + def socket(self, family=socket.AF_UNSPEC, sock_type=socket.SOCK_STREAM, proto=socket.IPPROTO_TCP): + self._target_afi = family + proxy_family, _, _, _, _ = self._proxy_addr + self._sock = socket.socket(proxy_family, sock_type, proto) + return self._sock + + def connect_ex(self, sock, addr): + assert sock is self._sock + + if self._state == _States.DISCONNECTED: + self._state = _States.CONNECTING + + if self._state == _States.CONNECTING: + ret = self._do_connecting(addr) + if ret is not None: + return ret + + if self._state == _States.SENDING: + ret = self._do_sending() + if ret is not None: + return ret + + if self._state == _States.READING: + ret = self._do_reading() + if ret is not None: + return ret + + if self._state == _States.COMPLETE: + return 0 + + return errno.ECONNREFUSED + + def _do_connecting(self, addr): + _, _, _, _, proxy_sockaddr = self._proxy_addr + ret = self._sock.connect_ex(proxy_sockaddr) + if ret and ret != errno.EISCONN: + return ret + host, port = addr[0], addr[1] + headers = 'CONNECT {0}:{1} HTTP/1.1\r\nHost: {0}:{1}\r\n'.format(host, port) + if self._proxy_url.username and self._proxy_url.password: + credentials = base64.b64encode( + '{0}:{1}'.format(self._proxy_url.username, self._proxy_url.password).encode() + ).decode() + headers += 'Proxy-Authorization: Basic {}\r\n'.format(credentials) + self._send_buf = (headers + '\r\n').encode() + self._state = _States.SENDING + return None + + def _do_sending(self): + while self._send_buf: + try: + sent = self._sock.send(self._send_buf) + if sent == 0: + log.error('Proxy closed connection while sending CONNECT request') + return errno.ECONNREFUSED + self._send_buf = self._send_buf[sent:] + except OSError as exc: + if exc.errno in _WOULD_BLOCK: + return errno.EWOULDBLOCK + raise + self._state = _States.READING + return None + + def _do_reading(self): + while b'\r\n\r\n' not in self._recv_buf: + try: + chunk = self._sock.recv(4096) + if not chunk: + log.error('Proxy closed connection during CONNECT handshake') + self._sock.close() + return errno.ECONNREFUSED + self._recv_buf += chunk + if len(self._recv_buf) > _MAX_RESPONSE_SIZE: + log.error('Proxy response exceeded %d bytes without end-of-headers', _MAX_RESPONSE_SIZE) + self._sock.close() + return errno.ECONNREFUSED + except OSError as exc: + if exc.errno in _WOULD_BLOCK: + return errno.EWOULDBLOCK + raise + first_line = self._recv_buf.split(b'\r\n')[0] + if b' 200 ' in first_line or first_line.endswith(b' 200'): + self._state = _States.COMPLETE + return None + log.error('HTTP CONNECT to proxy failed: %r', first_line) + self._sock.close() + return errno.ECONNREFUSED diff --git a/test/net/test_http_connect.py b/test/net/test_http_connect.py new file mode 100644 index 000000000..a1df08595 --- /dev/null +++ b/test/net/test_http_connect.py @@ -0,0 +1,116 @@ +import errno +import socket +from unittest.mock import MagicMock, patch + +import pytest + +from kafka.net.http_connect import HttpConnectProxy +from kafka.net.inet import KafkaNetSocket + + +_FAKE_PROXY_ADDR = (socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, '', ('1.2.3.4', 8080)) + + +def _make_proxy(url='http://proxy:8080'): + with patch.object(HttpConnectProxy, 'dns_lookup', return_value=[_FAKE_PROXY_ADDR]): + proxy = HttpConnectProxy(url) + proxy._sock = MagicMock() + return proxy + + +class TestHttpConnectProxyRegistry: + def test_registered_for_http_scheme(self): + with patch.object(HttpConnectProxy, 'dns_lookup', return_value=[_FAKE_PROXY_ADDR]): + obj = KafkaNetSocket('http://proxy:8080') + assert isinstance(obj, HttpConnectProxy) + + def test_unregistered_scheme_raises(self): + with pytest.raises(ValueError, match='Unsupported proxy url scheme'): + KafkaNetSocket('socks4://proxy:8080') + + +class TestHttpConnectProxyDnsLookup: + def test_broker_lookup_returns_unresolved(self): + proxy = _make_proxy() + result = proxy.dns_lookup('broker.kafka.internal', 9092) + assert result == [(socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP, '', ('broker.kafka.internal', 9092))] + + def test_proxy_lookup_delegates_to_super(self): + with patch('socket.getaddrinfo', return_value=[_FAKE_PROXY_ADDR]) as mock_gai: + proxy = _make_proxy() + proxy.dns_lookup('proxy', 8080, proxy=True) + mock_gai.assert_called() + + +class TestHttpConnectProxySocket: + def test_uses_proxy_family_not_broker_family(self): + proxy = _make_proxy() + with patch('socket.socket') as mock_ctor: + proxy.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP) + mock_ctor.assert_called_once_with(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) + + +class TestHttpConnectProxyConnectEx: + def test_success(self): + proxy = _make_proxy() + proxy._sock.connect_ex.return_value = 0 + proxy._sock.send.side_effect = lambda b: len(b) + proxy._sock.recv.return_value = b'HTTP/1.1 200 Connection Established\r\n\r\n' + assert proxy.connect_ex(proxy._sock, ('broker', 9092)) == 0 + + def test_success_no_reason_phrase(self): + proxy = _make_proxy() + proxy._sock.connect_ex.return_value = 0 + proxy._sock.send.side_effect = lambda b: len(b) + proxy._sock.recv.return_value = b'HTTP/1.1 200\r\n\r\n' + assert proxy.connect_ex(proxy._sock, ('broker', 9092)) == 0 + + def test_basic_auth_header_sent_when_credentials_in_url(self): + import base64 + proxy = _make_proxy('http://user:pass@proxy:8080') + proxy._sock.connect_ex.return_value = 0 + sent = [] + proxy._sock.send.side_effect = lambda b: sent.append(b) or len(b) + proxy._sock.recv.return_value = b'HTTP/1.1 200 Connection Established\r\n\r\n' + proxy.connect_ex(proxy._sock, ('broker', 9092)) + request = b''.join(sent).decode() + expected = base64.b64encode(b'user:pass').decode() + assert 'Proxy-Authorization: Basic {}'.format(expected) in request + + def test_non_200_response_returns_econnrefused(self): + proxy = _make_proxy() + proxy._sock.connect_ex.return_value = 0 + proxy._sock.send.side_effect = lambda b: len(b) + proxy._sock.recv.return_value = b'HTTP/1.1 407 Proxy Authentication Required\r\n\r\n' + assert proxy.connect_ex(proxy._sock, ('broker', 9092)) == errno.ECONNREFUSED + + def test_ewouldblock_while_sending(self): + proxy = _make_proxy() + proxy._sock.connect_ex.return_value = 0 + proxy._sock.send.side_effect = OSError(errno.EWOULDBLOCK, 'would block') + assert proxy.connect_ex(proxy._sock, ('broker', 9092)) == errno.EWOULDBLOCK + + def test_ewouldblock_while_reading(self): + proxy = _make_proxy() + proxy._sock.connect_ex.return_value = 0 + proxy._sock.send.side_effect = lambda b: len(b) + proxy._sock.recv.side_effect = OSError(errno.EWOULDBLOCK, 'would block') + assert proxy.connect_ex(proxy._sock, ('broker', 9092)) == errno.EWOULDBLOCK + + def test_resumes_after_ewouldblock(self): + proxy = _make_proxy() + proxy._sock.connect_ex.return_value = 0 + proxy._sock.send.side_effect = lambda b: len(b) + proxy._sock.recv.side_effect = [ + OSError(errno.EWOULDBLOCK, 'would block'), + b'HTTP/1.1 200 Connection Established\r\n\r\n', + ] + assert proxy.connect_ex(proxy._sock, ('broker', 9092)) == errno.EWOULDBLOCK + assert proxy.connect_ex(proxy._sock, ('broker', 9092)) == 0 + + def test_eof_during_handshake_returns_econnrefused(self): + proxy = _make_proxy() + proxy._sock.connect_ex.return_value = 0 + proxy._sock.send.side_effect = lambda b: len(b) + proxy._sock.recv.return_value = b'' + assert proxy.connect_ex(proxy._sock, ('broker', 9092)) == errno.ECONNREFUSED diff --git a/test/net/test_inet.py b/test/net/test_inet.py index d320beba0..d3ef76be2 100644 --- a/test/net/test_inet.py +++ b/test/net/test_inet.py @@ -7,6 +7,7 @@ from kafka.net.selector import NetworkSelector from kafka.net.inet import create_connection, KafkaNetSocket from kafka.net.socks5 import Socks5Proxy +from kafka.net.http_connect import HttpConnectProxy import kafka.errors as Errors @@ -239,13 +240,19 @@ def test_socks5h(self): factory = KafkaNetSocket('socks5h://foo.bar') assert isinstance(factory, Socks5Proxy) + def test_http(self): + assert 'http' in KafkaNetSocket._registry + with patch('kafka.net.http_connect.HttpConnectProxy._get_proxy_addr'): + factory = KafkaNetSocket('http://proxy:8080') + assert isinstance(factory, HttpConnectProxy) + def test_default(self): factory = KafkaNetSocket() assert type(factory) is KafkaNetSocket def test_unknown_scheme_raises(self): with pytest.raises(ValueError, match='Unsupported proxy url scheme'): - KafkaNetSocket('http://proxy:8080') + KafkaNetSocket('ftp://proxy:8080') def test_no_scheme_raises(self): with pytest.raises(ValueError, match='scheme'):