Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Networking
* kafka.net: SSL transport support (#2813)
* kafka.net: SASL authentication support (#2814)
* kafka.net: SOCKS5 proxy support (#2815)
* kafka.net: HTTP CONNECT proxy support (RFC 7231 s4.3.6) (#2990)
* kafka.net: Metrics tracking (#2834)
* KafkaNetClient: drop-in replacement for KafkaClient using kafka.net (#2816)
* Future.__await__ support (#2811)
Expand Down
3 changes: 2 additions & 1 deletion kafka/net/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from .manager import KafkaConnectionManager
from .metrics import KafkaConnectionMetrics, KafkaManagerMetrics
from .selector import NetworkSelector
from .http_connect import HttpConnectProxy
from .socks5 import Socks5Proxy
from .transport import KafkaTCPTransport, KafkaSSLTransport
from .wakeup_notifier import WakeupNotifier
Expand All @@ -13,6 +14,6 @@
__all__ = [
'KafkaConnection', 'create_connection', 'KafkaNetSocket',
'KafkaConnectionManager', 'KafkaConnectionMetrics', 'KafkaManagerMetrics',
'NetworkSelector', 'Socks5Proxy', 'KafkaTCPTransport', 'KafkaSSLTransport',
'NetworkSelector', 'HttpConnectProxy', 'Socks5Proxy', 'KafkaTCPTransport', 'KafkaSSLTransport',
'WakeupNotifier', 'KafkaNetClient',
]
144 changes: 144 additions & 0 deletions kafka/net/http_connect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import base64
import errno
import logging
import random
import socket
from urllib.parse import unquote, urlparse

from kafka.errors import KafkaConnectionError
from kafka.net.inet import KafkaNetSocket


log = logging.getLogger(__name__)

_WOULD_BLOCK = {errno.EWOULDBLOCK, errno.EAGAIN}
_MAX_RESPONSE_SIZE = 65536


class _States:
DISCONNECTED = '<disconnected>'
CONNECTING = '<connecting>'
SENDING = '<sending>'
READING = '<reading>'
COMPLETE = '<complete>'


class HttpConnectProxy(KafkaNetSocket):
    """Tunnels broker connections through an HTTP CONNECT proxy (RFC 7231 s4.3.6).

    Registered for the ``http`` scheme -- pass ``proxy_url='http://host:port'``
    to KafkaConsumer/KafkaProducer/KafkaAdminClient.

    Basic proxy auth is supported via URL credentials: ``http://user:pass@host:8080``.
    Credentials are percent-decoded before use, so reserved characters can be
    written as e.g. ``p%40ss``. A username without a password is sent with an
    empty password.

    Broker hostnames are always forwarded unresolved so the proxy handles DNS.
    """

    SCHEMES = ('http',)

    def __init__(self, proxy_url):
        """Parse ``proxy_url`` and resolve the proxy's own address.

        Raises:
            KafkaConnectionError: if the proxy host resolves to no addresses.
        """
        self._proxy_url = urlparse(proxy_url)
        self._sock = None
        self._target_afi = None
        self._state = _States.DISCONNECTED
        self._send_buf = b''
        self._recv_buf = b''
        self._proxy_addr = self._get_proxy_addr()

    def _get_proxy_addr(self):
        """Resolve the proxy host and return one randomly chosen address entry."""
        addrs = self.dns_lookup(self._proxy_url.hostname, self._proxy_url.port, proxy=True)
        if not addrs:
            raise KafkaConnectionError('Unable to resolve proxy_url via dns')
        return random.choice(addrs)

    def dns_lookup(self, host, port, proxy=False):
        """Return address entries for ``host``/``port``.

        With ``proxy=True`` the lookup is delegated to the parent class.
        Otherwise a single unresolved entry is returned.
        """
        if proxy:
            return super().dns_lookup(host, port, raise_error=True)
        # Always forward broker hostname unresolved; the proxy handles DNS
        return [(socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP, '', (host, port))]

    def socket(self, family=socket.AF_UNSPEC, sock_type=socket.SOCK_STREAM, proto=socket.IPPROTO_TCP):
        """Create, record and return a socket in the proxy's address family.

        ``family`` describes the broker target and is only remembered; the
        socket itself must use the family of the resolved proxy address.
        """
        self._target_afi = family
        proxy_family, _, _, _, _ = self._proxy_addr
        self._sock = socket.socket(proxy_family, sock_type, proto)
        # A new socket always needs a new handshake; drop any leftovers from
        # a previous attempt so a stale COMPLETE state cannot skip it.
        self._state = _States.DISCONNECTED
        self._send_buf = b''
        self._recv_buf = b''
        return self._sock

    def connect_ex(self, sock, addr):
        """Advance the CONNECT handshake as far as possible without blocking.

        Intended to be called repeatedly with the same arguments until it
        returns 0.

        Args:
            sock: the socket previously returned by :meth:`socket`.
            addr: broker address; ``addr[0]`` is the host, ``addr[1]`` the port.

        Returns:
            0 once the tunnel is established, ``errno.EWOULDBLOCK`` when the
            socket is not ready yet, the errno from the TCP connect to the
            proxy, or ``errno.ECONNREFUSED`` when the handshake failed.
        """
        assert sock is self._sock

        if self._state == _States.DISCONNECTED:
            self._state = _States.CONNECTING

        # Each step returns None when it completed and moved to the next
        # state, so a single call can run through several phases.
        if self._state == _States.CONNECTING:
            ret = self._do_connecting(addr)
            if ret is not None:
                return ret

        if self._state == _States.SENDING:
            ret = self._do_sending()
            if ret is not None:
                return ret

        if self._state == _States.READING:
            ret = self._do_reading()
            if ret is not None:
                return ret

        if self._state == _States.COMPLETE:
            return 0

        return errno.ECONNREFUSED

    def _abort(self, message, *args):
        """Log ``message``, close the proxy socket and return ECONNREFUSED."""
        log.error(message, *args)
        self._sock.close()
        return errno.ECONNREFUSED

    def _build_request(self, host, port):
        """Return the encoded CONNECT request for ``host``/``port``."""
        host = str(host)
        # authority-form needs brackets around IPv6 literals, otherwise the
        # address colons are indistinguishable from the port separator.
        if ':' in host and not host.startswith('['):
            host = '[{}]'.format(host)
        authority = '{}:{}'.format(host, port)
        lines = [
            'CONNECT {} HTTP/1.1'.format(authority),
            'Host: {}'.format(authority),
        ]
        username = self._proxy_url.username
        if username:
            # urlparse leaves userinfo percent-encoded; decode before use.
            password = self._proxy_url.password or ''
            userinfo = '{0}:{1}'.format(unquote(username), unquote(password))
            credentials = base64.b64encode(userinfo.encode()).decode()
            lines.append('Proxy-Authorization: Basic {}'.format(credentials))
        return ('\r\n'.join(lines) + '\r\n\r\n').encode()

    @staticmethod
    def _tunnel_established(status_line):
        """Return True if ``status_line`` (bytes) carries a 2xx status code.

        Only the status-code field is inspected, so a reason phrase that
        happens to contain " 200 " cannot be mistaken for success.
        """
        parts = status_line.split(None, 2)
        if len(parts) < 2 or not parts[0].startswith(b'HTTP/'):
            return False
        code = parts[1]
        return len(code) == 3 and code.isdigit() and code.startswith(b'2')

    def _do_connecting(self, addr):
        """Connect to the proxy; on success queue the CONNECT request."""
        _, _, _, _, proxy_sockaddr = self._proxy_addr
        ret = self._sock.connect_ex(proxy_sockaddr)
        # EISCONN means an earlier non-blocking connect has since completed.
        if ret and ret != errno.EISCONN:
            return ret
        self._send_buf = self._build_request(addr[0], addr[1])
        self._state = _States.SENDING
        return None

    def _do_sending(self):
        """Write the pending request bytes; move to READING when done."""
        while self._send_buf:
            try:
                sent = self._sock.send(self._send_buf)
            except OSError as exc:
                if exc.errno in _WOULD_BLOCK:
                    return errno.EWOULDBLOCK
                raise
            if sent == 0:
                return self._abort('Proxy closed connection while sending CONNECT request')
            self._send_buf = self._send_buf[sent:]
        self._state = _States.READING
        return None

    def _do_reading(self):
        """Read the proxy's response headers and check the status line."""
        while b'\r\n\r\n' not in self._recv_buf:
            # Only enforced while the end-of-headers marker is still missing.
            if len(self._recv_buf) > _MAX_RESPONSE_SIZE:
                return self._abort(
                    'Proxy response exceeded %d bytes without end-of-headers', _MAX_RESPONSE_SIZE)
            try:
                chunk = self._sock.recv(4096)
            except OSError as exc:
                if exc.errno in _WOULD_BLOCK:
                    return errno.EWOULDBLOCK
                raise
            if not chunk:
                return self._abort('Proxy closed connection during CONNECT handshake')
            self._recv_buf += chunk
        # NOTE: bytes received after the header terminator stay in _recv_buf
        # and are not handed on to the caller.
        first_line = self._recv_buf.split(b'\r\n', 1)[0]
        if self._tunnel_established(first_line):
            self._state = _States.COMPLETE
            return None
        return self._abort('HTTP CONNECT to proxy failed: %r', first_line)
116 changes: 116 additions & 0 deletions test/net/test_http_connect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import errno
import socket
from unittest.mock import MagicMock, patch

import pytest

from kafka.net.http_connect import HttpConnectProxy
from kafka.net.inet import KafkaNetSocket


_FAKE_PROXY_ADDR = (socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, '', ('1.2.3.4', 8080))


def _make_proxy(url='http://proxy:8080'):
    """Build an HttpConnectProxy for ``url`` with DNS stubbed and a mock socket.

    ``dns_lookup`` is patched only while the instance is constructed, so the
    returned object uses the real method afterwards.
    """
    stub_lookup = patch.object(HttpConnectProxy, 'dns_lookup', return_value=[_FAKE_PROXY_ADDR])
    with stub_lookup:
        instance = HttpConnectProxy(url)
    instance._sock = MagicMock()
    return instance


class TestHttpConnectProxyRegistry:
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test class seems redundant with test_inet.py -- I think we can remove it.

def test_registered_for_http_scheme(self):
    """An ``http://`` proxy URL is dispatched to HttpConnectProxy."""
    stub_lookup = patch.object(HttpConnectProxy, 'dns_lookup', return_value=[_FAKE_PROXY_ADDR])
    with stub_lookup:
        created = KafkaNetSocket('http://proxy:8080')
        assert isinstance(created, HttpConnectProxy)

def test_unregistered_scheme_raises(self):
    """A proxy URL with an unregistered scheme is rejected with ValueError."""
    expect_rejection = pytest.raises(ValueError, match='Unsupported proxy url scheme')
    with expect_rejection:
        KafkaNetSocket('socks4://proxy:8080')


class TestHttpConnectProxyDnsLookup:
    """dns_lookup for broker targets versus the proxy host itself."""

    def test_broker_lookup_returns_unresolved(self):
        """Broker lookups return one entry carrying the hostname untouched."""
        unresolved_entry = (
            socket.AF_UNSPEC,
            socket.SOCK_STREAM,
            socket.IPPROTO_TCP,
            '',
            ('broker.kafka.internal', 9092),
        )
        looked_up = _make_proxy().dns_lookup('broker.kafka.internal', 9092)
        assert looked_up == [unresolved_entry]

    def test_proxy_lookup_delegates_to_super(self):
        """With proxy=True the lookup reaches socket.getaddrinfo."""
        with patch('socket.getaddrinfo', return_value=[_FAKE_PROXY_ADDR]) as getaddrinfo_mock:
            instance = _make_proxy()
            instance.dns_lookup('proxy', 8080, proxy=True)
            getaddrinfo_mock.assert_called()


class TestHttpConnectProxySocket:
    """socket() must open the socket in the proxy's address family."""

    def test_uses_proxy_family_not_broker_family(self):
        """An AF_INET6 request still yields an AF_INET socket for an IPv4 proxy."""
        instance = _make_proxy()
        requested = (socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
        with patch('socket.socket') as socket_ctor:
            instance.socket(*requested)
            socket_ctor.assert_called_once_with(
                socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)


class TestHttpConnectProxyConnectEx:
    """connect_ex handshake outcomes against a mocked proxy socket."""

    BROKER = ('broker', 9092)
    TUNNEL_OK = b'HTTP/1.1 200 Connection Established\r\n\r\n'

    @staticmethod
    def _proxy_with_open_socket(url='http://proxy:8080'):
        """Return a proxy whose mock socket connects at once and sends everything."""
        instance = _make_proxy(url)
        instance._sock.connect_ex.return_value = 0
        instance._sock.send.side_effect = lambda payload: len(payload)
        return instance

    def _handshake(self, instance):
        """Run one connect_ex step towards the test broker address."""
        return instance.connect_ex(instance._sock, self.BROKER)

    def test_success(self):
        instance = self._proxy_with_open_socket()
        instance._sock.recv.return_value = self.TUNNEL_OK
        assert self._handshake(instance) == 0

    def test_success_no_reason_phrase(self):
        instance = self._proxy_with_open_socket()
        instance._sock.recv.return_value = b'HTTP/1.1 200\r\n\r\n'
        assert self._handshake(instance) == 0

    def test_basic_auth_header_sent_when_credentials_in_url(self):
        import base64
        instance = self._proxy_with_open_socket('http://user:pass@proxy:8080')
        written = []

        def record_send(payload):
            written.append(payload)
            return len(payload)

        instance._sock.send.side_effect = record_send
        instance._sock.recv.return_value = self.TUNNEL_OK
        self._handshake(instance)
        request_text = b''.join(written).decode()
        token = base64.b64encode(b'user:pass').decode()
        assert 'Proxy-Authorization: Basic {}'.format(token) in request_text

    def test_non_200_response_returns_econnrefused(self):
        instance = self._proxy_with_open_socket()
        instance._sock.recv.return_value = b'HTTP/1.1 407 Proxy Authentication Required\r\n\r\n'
        assert self._handshake(instance) == errno.ECONNREFUSED

    def test_ewouldblock_while_sending(self):
        instance = self._proxy_with_open_socket()
        instance._sock.send.side_effect = OSError(errno.EWOULDBLOCK, 'would block')
        assert self._handshake(instance) == errno.EWOULDBLOCK

    def test_ewouldblock_while_reading(self):
        instance = self._proxy_with_open_socket()
        instance._sock.recv.side_effect = OSError(errno.EWOULDBLOCK, 'would block')
        assert self._handshake(instance) == errno.EWOULDBLOCK

    def test_resumes_after_ewouldblock(self):
        instance = self._proxy_with_open_socket()
        # First recv is not ready; the second delivers the full response.
        instance._sock.recv.side_effect = [
            OSError(errno.EWOULDBLOCK, 'would block'),
            self.TUNNEL_OK,
        ]
        assert self._handshake(instance) == errno.EWOULDBLOCK
        assert self._handshake(instance) == 0

    def test_eof_during_handshake_returns_econnrefused(self):
        instance = self._proxy_with_open_socket()
        instance._sock.recv.return_value = b''
        assert self._handshake(instance) == errno.ECONNREFUSED
9 changes: 8 additions & 1 deletion test/net/test_inet.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from kafka.net.selector import NetworkSelector
from kafka.net.inet import create_connection, KafkaNetSocket
from kafka.net.socks5 import Socks5Proxy
from kafka.net.http_connect import HttpConnectProxy
import kafka.errors as Errors


Expand Down Expand Up @@ -239,13 +240,19 @@ def test_socks5h(self):
factory = KafkaNetSocket('socks5h://foo.bar')
assert isinstance(factory, Socks5Proxy)

def test_http(self):
    """The ``http`` scheme is registered and yields an HttpConnectProxy."""
    registered_schemes = KafkaNetSocket._registry
    assert 'http' in registered_schemes
    # Stub out proxy address resolution so construction needs no DNS.
    with patch('kafka.net.http_connect.HttpConnectProxy._get_proxy_addr'):
        created = KafkaNetSocket('http://proxy:8080')
        assert isinstance(created, HttpConnectProxy)

def test_default(self):
    """Without a proxy URL the factory returns a plain KafkaNetSocket."""
    created = KafkaNetSocket()
    # Exact type match: a subclass instance would not count as the default.
    assert type(created) is KafkaNetSocket

def test_unknown_scheme_raises(self):
    """A proxy URL whose scheme has no registered handler raises ValueError."""
    # ``http`` is a registered scheme now, so an ``ftp`` URL is used as the
    # unsupported example.
    with pytest.raises(ValueError, match='Unsupported proxy url scheme'):
        KafkaNetSocket('ftp://proxy:8080')

def test_no_scheme_raises(self):
with pytest.raises(ValueError, match='scheme'):
Expand Down
Loading