Skip to content

Commit

Permalink
Avoid call to wakeup from sender thread for maybe_connect
Browse files Browse the repository at this point in the history
  • Loading branch information
dpkp committed Mar 23, 2019
1 parent 3c5dfef commit 545cdb1
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
5 changes: 3 additions & 2 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,14 +321,15 @@ def _conn_state_change(self, node_id, conn):
log.warning("Node %s connection failed -- refreshing metadata", node_id)
self.cluster.request_update()

def maybe_connect(self, node_id):
def maybe_connect(self, node_id, wakeup=True):
"""Queues a node for asynchronous connection during the next .poll()"""
if self._can_connect(node_id):
self._connecting.add(node_id)
# Wakeup signal is useful in case another thread is
# blocked waiting for incoming network traffic while holding
# the client lock in poll().
self.wakeup()
if wakeup:
self.wakeup()
return True
return False

Expand Down
3 changes: 2 additions & 1 deletion kafka/producer/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,9 @@ def run_once(self):
# remove any nodes we aren't ready to send to
not_ready_timeout = float('inf')
for node in list(ready_nodes):
if not self._client.ready(node):
if not self._client.is_ready(node):
log.debug('Node %s not ready; delaying produce of accumulated batch', node)
self._client.maybe_connect(node, wakeup=False)
ready_nodes.remove(node)
not_ready_timeout = min(not_ready_timeout,
self._client.connection_delay(node))
Expand Down

0 comments on commit 545cdb1

Please sign in to comment.