Skip to content

Commit

Permalink
Wrap SSL sockets after connecting (#1754)
Browse files Browse the repository at this point in the history
  • Loading branch information
dpkp authored Mar 22, 2019
1 parent 64f70b5 commit f2f2bfe
Showing 1 changed file with 11 additions and 19 deletions.
30 changes: 11 additions & 19 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,14 +356,9 @@ def connect(self):

self._sock.setblocking(False)
self.state = ConnectionStates.CONNECTING
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
self._wrap_ssl()
# _wrap_ssl can alter the connection state -- disconnects on failure
# so we need to double check that we are still connecting before
if self.connecting():
self.config['state_change_callback'](self)
log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
self.port, self._sock_addr, AFI_NAMES[self._sock_afi])
self.config['state_change_callback'](self)
log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
self.port, self._sock_addr, AFI_NAMES[self._sock_afi])

if self.state is ConnectionStates.CONNECTING:
# in non-blocking mode, use repeated calls to socket.connect_ex
Expand All @@ -373,29 +368,29 @@ def connect(self):
ret = self._sock.connect_ex(self._sock_addr)
except socket.error as err:
ret = err.errno
except ValueError as err:
# Python 3.7 and higher raises ValueError if a socket
# is already connected
if sys.version_info >= (3, 7):
ret = None
else:
raise

# Connection succeeded
if not ret or ret == errno.EISCONN:
log.debug('%s: established TCP connection', self)

if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
log.debug('%s: initiating SSL handshake', self)
self.state = ConnectionStates.HANDSHAKE
self.config['state_change_callback'](self)
# _wrap_ssl can alter the connection state -- disconnects on failure
self._wrap_ssl()

elif self.config['security_protocol'] == 'SASL_PLAINTEXT':
log.debug('%s: initiating SASL authentication', self)
self.state = ConnectionStates.AUTHENTICATING
self.config['state_change_callback'](self)

else:
# security_protocol PLAINTEXT
log.info('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
self.config['state_change_callback'](self)
self.config['state_change_callback'](self)

# Connection failed
# WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
Expand Down Expand Up @@ -486,9 +481,6 @@ def _try_handshake(self):
# old ssl in python2.6 will swallow all SSLErrors here...
except (SSLWantReadError, SSLWantWriteError):
pass
# python 3.7 throws OSError
except OSError:
pass
except (SSLZeroReturnError, ConnectionError, SSLEOFError):
log.warning('SSL connection closed by server during handshake.')
self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake'))
Expand Down

0 comments on commit f2f2bfe

Please sign in to comment.