Skip to content

Commit

Permalink
Dont do client wakeup when sending from sender thread (#1761)
Browse files Browse the repository at this point in the history
  • Loading branch information
dpkp authored Mar 24, 2019
1 parent ce9c1d2 commit d388b48
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
11 changes: 7 additions & 4 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,14 +321,15 @@ def _conn_state_change(self, node_id, conn):
log.warning("Node %s connection failed -- refreshing metadata", node_id)
self.cluster.request_update()

def maybe_connect(self, node_id):
def maybe_connect(self, node_id, wakeup=True):
"""Queues a node for asynchronous connection during the next .poll()"""
if self._can_connect(node_id):
self._connecting.add(node_id)
# Wakeup signal is useful in case another thread is
# blocked waiting for incoming network traffic while holding
# the client lock in poll().
self.wakeup()
if wakeup:
self.wakeup()
return True
return False

Expand Down Expand Up @@ -499,14 +500,15 @@ def _can_send_request(self, node_id):
return False
return conn.connected() and conn.can_send_more()

def send(self, node_id, request):
def send(self, node_id, request, wakeup=True):
"""Send a request to a specific node. Bytes are placed on an
internal per-connection send-queue. Actual network I/O will be
triggered in a subsequent call to .poll()
Arguments:
node_id (int): destination node
request (Struct): request object (not-encoded)
wakeup (bool): optional flag to disable thread-wakeup
Raises:
AssertionError: if node_id is not in current cluster metadata
Expand All @@ -526,7 +528,8 @@ def send(self, node_id, request):
# Wakeup signal is useful in case another thread is
# blocked waiting for incoming network traffic while holding
# the client lock in poll().
self.wakeup()
if wakeup:
self.wakeup()

return future

Expand Down
5 changes: 3 additions & 2 deletions kafka/producer/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,9 @@ def run_once(self):
# remove any nodes we aren't ready to send to
not_ready_timeout = float('inf')
for node in list(ready_nodes):
if not self._client.ready(node):
if not self._client.is_ready(node):
log.debug('Node %s not ready; delaying produce of accumulated batch', node)
self._client.maybe_connect(node, wakeup=False)
ready_nodes.remove(node)
not_ready_timeout = min(not_ready_timeout,
self._client.connection_delay(node))
Expand Down Expand Up @@ -144,7 +145,7 @@ def run_once(self):
for node_id, request in six.iteritems(requests):
batches = batches_by_node[node_id]
log.debug('Sending Produce Request: %r', request)
(self._client.send(node_id, request)
(self._client.send(node_id, request, wakeup=False)
.add_callback(
self._handle_produce_response, node_id, time.time(), batches)
.add_errback(
Expand Down

0 comments on commit d388b48

Please sign in to comment.