Skip to content

Commit fb9bebc

Browse files
committed
use_remote_lookup(proxy_url) classmethod; add debug statement for proxy init; consolidate buffer_out; fix port H encoding
1 parent b3b9a86 commit fb9bebc

File tree

2 files changed

+28
-21
lines changed

2 files changed

+28
-21
lines changed

kafka/conn.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ def _dns_lookup(self):
326326
return True
327327

328328
def _next_afi_sockaddr(self):
329-
if self._socks5_proxy and self._socks5_proxy.use_remote_lookup():
329+
if self.config["socks5_proxy"] and Socks5Wrapper.use_remote_lookup(self.config["socks5_proxy"]):
330330
return (socket.AF_UNSPEC, (self.host, self.port))
331331

332332
if not self._gai:
@@ -382,6 +382,7 @@ def connect(self):
382382
self._sock_afi, self._sock_addr = next_lookup
383383
try:
384384
if self.config["socks5_proxy"] is not None:
385+
log.debug('%s: initializing Socks5 proxy at %s', self, self.config["socks5_proxy"])
385386
self._socks5_proxy = Socks5Wrapper(self.config["socks5_proxy"], self.afi)
386387
self._sock = self._socks5_proxy.socket(self._sock_afi, socket.SOCK_STREAM)
387388
else:
@@ -867,7 +868,7 @@ def connection_delay(self):
867868
if self.disconnected() or self.connecting():
868869
if len(self._gai) > 0:
869870
return 0
870-
elif self._socks5_proxy and self._socks5_proxy.use_remote_lookup():
871+
elif Socks5Wrapper.use_remote_lookup(self.config["socks5_proxy"]):
871872
return 0
872873
else:
873874
time_waited = time.time() - self.last_attempt

kafka/socks5_wrapper.py

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,11 @@ def dns_lookup(cls, host, port, afi=socket.AF_UNSPEC):
6464
log.warning("DNS lookup failed for proxy %s:%d, %r", host, port, ex)
6565
return []
6666

67-
def use_remote_lookup(self):
67+
@classmethod
68+
def use_remote_lookup(cls, proxy_url):
69+
return urlparse(proxy_url).scheme == 'socks5h'
70+
71+
def _use_remote_lookup(self):
6872
return self._proxy_url.scheme == 'socks5h'
6973

7074
def socket(self, family, sock_type):
@@ -190,7 +194,7 @@ def connect_ex(self, addr):
190194
return errno.ECONNREFUSED
191195

192196
if self._state == ProxyConnectionStates.REQUEST_SUBMIT:
193-
if self.use_remote_lookup():
197+
if self._use_remote_lookup():
194198
addr_type = 3
195199
addr_len = len(addr[0])
196200
elif self._target_afi == socket.AF_INET:
@@ -205,27 +209,29 @@ def connect_ex(self, addr):
205209
self._sock.close()
206210
return errno.ECONNREFUSED
207211

208-
if self.use_remote_lookup():
209-
self._buffer_out = struct.pack(
210-
"!bbbbb{}sh".format(addr_len),
211-
5, # version
212-
1, # command: connect
213-
0, # reserved
214-
addr_type, # 1 for ipv4, 4 for ipv6 address, 3 for domain name
212+
self._buffer_out = struct.pack(
213+
"!bbbb",
214+
5, # version
215+
1, # command: connect
216+
0, # reserved
217+
addr_type, # 1 for ipv4, 4 for ipv6 address, 3 for domain name
218+
)
219+
# Addr format depends on type
220+
if addr_type == 3:
221+
# len + domain name (no null terminator)
222+
self._buffer_out += struct.pack(
223+
"!b{}s".format(addr_len),
215224
addr_len,
216-
addr[0].encode('ascii'), # host
217-
addr[1], # port
225+
addr[0].encode('ascii'),
218226
)
219227
else:
220-
self._buffer_out = struct.pack(
221-
"!bbbb{}sh".format(addr_len),
222-
5, # version
223-
1, # command: connect
224-
0, # reserved
225-
addr_type, # 1 for ipv4, 4 for ipv6 address, 3 for domain name
226-
socket.inet_pton(self._target_afi, addr[0]), # either 4 or 16 bytes of actual address
227-
addr[1], # port
228+
# either 4 (type 1) or 16 (type 4) bytes of actual address
229+
self._buffer_out += struct.pack(
230+
"!{}s".format(addr_len),
231+
socket.inet_pton(self._target_afi, addr[0]),
228232
)
233+
self._buffer_out += struct.pack("!H", addr[1]) # port
234+
229235
self._state = ProxyConnectionStates.REQUESTING
230236

231237
if self._state == ProxyConnectionStates.REQUESTING:

0 commit comments

Comments
 (0)