Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap SSL sockets after connecting #1754

Merged
merged 1 commit into from
Mar 22, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 11 additions & 19 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,14 +356,9 @@ def connect(self):

self._sock.setblocking(False)
self.state = ConnectionStates.CONNECTING
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
self._wrap_ssl()
# _wrap_ssl can alter the connection state -- disconnects on failure
# so we need to double check that we are still connecting before
if self.connecting():
self.config['state_change_callback'](self)
log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
self.port, self._sock_addr, AFI_NAMES[self._sock_afi])
self.config['state_change_callback'](self)
log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
self.port, self._sock_addr, AFI_NAMES[self._sock_afi])

if self.state is ConnectionStates.CONNECTING:
# in non-blocking mode, use repeated calls to socket.connect_ex
Expand All @@ -373,29 +368,29 @@ def connect(self):
ret = self._sock.connect_ex(self._sock_addr)
except socket.error as err:
ret = err.errno
except ValueError as err:
# Python 3.7 and higher raises ValueError if a socket
# is already connected
if sys.version_info >= (3, 7):
ret = None
else:
raise

# Connection succeeded
if not ret or ret == errno.EISCONN:
log.debug('%s: established TCP connection', self)

if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
log.debug('%s: initiating SSL handshake', self)
self.state = ConnectionStates.HANDSHAKE
self.config['state_change_callback'](self)
# _wrap_ssl can alter the connection state -- disconnects on failure
self._wrap_ssl()

elif self.config['security_protocol'] == 'SASL_PLAINTEXT':
log.debug('%s: initiating SASL authentication', self)
self.state = ConnectionStates.AUTHENTICATING
self.config['state_change_callback'](self)

else:
# security_protocol PLAINTEXT
log.info('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
self.config['state_change_callback'](self)
self.config['state_change_callback'](self)

# Connection failed
# WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
Expand Down Expand Up @@ -486,9 +481,6 @@ def _try_handshake(self):
# old ssl in python2.6 will swallow all SSLErrors here...
except (SSLWantReadError, SSLWantWriteError):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this also check whether python < 2.7 before pass'ing? Just in the off-chance these exceptions get-used later?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, see the import hackery above. this is an old problem. we could try to check exception error codes etc, but unless there's a real user asking for help w/ old python installs I'm not that interested in adding more compatibility layers here.

Plus python 2.7 is EOL very soon...

pass
# python 3.7 throws OSError
except OSError:
pass
except (SSLZeroReturnError, ConnectionError, SSLEOFError):
log.warning('SSL connection closed by server during handshake.')
self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake'))
Expand Down