From f1449b81d6644379b1053afc9e03d39101d6ba03 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 21 Mar 2019 08:26:59 -0700 Subject: [PATCH] Wrap SSL sockets after connecting --- kafka/conn.py | 30 +++++++++++------------------- 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 28f9f3c38..cdc0a86dd 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -356,14 +356,9 @@ def connect(self): self._sock.setblocking(False) self.state = ConnectionStates.CONNECTING - if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): - self._wrap_ssl() - # _wrap_ssl can alter the connection state -- disconnects on failure - # so we need to double check that we are still connecting before - if self.connecting(): - self.config['state_change_callback'](self) - log.info('%s: connecting to %s:%d [%s %s]', self, self.host, - self.port, self._sock_addr, AFI_NAMES[self._sock_afi]) + self.config['state_change_callback'](self) + log.info('%s: connecting to %s:%d [%s %s]', self, self.host, + self.port, self._sock_addr, AFI_NAMES[self._sock_afi]) if self.state is ConnectionStates.CONNECTING: # in non-blocking mode, use repeated calls to socket.connect_ex @@ -373,29 +368,29 @@ def connect(self): ret = self._sock.connect_ex(self._sock_addr) except socket.error as err: ret = err.errno - except ValueError as err: - # Python 3.7 and higher raises ValueError if a socket - # is already connected - if sys.version_info >= (3, 7): - ret = None - else: - raise # Connection succeeded if not ret or ret == errno.EISCONN: log.debug('%s: established TCP connection', self) + if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): log.debug('%s: initiating SSL handshake', self) self.state = ConnectionStates.HANDSHAKE + self.config['state_change_callback'](self) + # _wrap_ssl can alter the connection state -- disconnects on failure + self._wrap_ssl() + elif self.config['security_protocol'] == 'SASL_PLAINTEXT': log.debug('%s: initiating SASL authentication', self) self.state = ConnectionStates.AUTHENTICATING + self.config['state_change_callback'](self) + else: # security_protocol PLAINTEXT log.info('%s: Connection complete.', self) self.state = ConnectionStates.CONNECTED self._reset_reconnect_backoff() - self.config['state_change_callback'](self) + self.config['state_change_callback'](self) # Connection failed # WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems @@ -486,9 +481,6 @@ def _try_handshake(self): # old ssl in python2.6 will swallow all SSLErrors here... except (SSLWantReadError, SSLWantWriteError): pass - # python 3.7 throws OSError - except OSError: - pass except (SSLZeroReturnError, ConnectionError, SSLEOFError): log.warning('SSL connection closed by server during handshake.') self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake'))