Skip to content

Commit

Permalink
Fix BrokerConnection.connection_delay() to return milliseconds (#1414)
Browse files Browse the repository at this point in the history
  • Loading branch information
dpkp authored Mar 8, 2018
1 parent 4c383da commit b33a651
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
11 changes: 9 additions & 2 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,9 +594,16 @@ def blacked_out(self):
return False

def connection_delay(self):
time_waited_ms = time.time() - (self.last_attempt or 0)
"""
Return the number of milliseconds to wait, based on the connection
state, before attempting to send data. When disconnected, this respects
the reconnect backoff time. When connecting, returns 0 to allow
non-blocking connect to finish. When connected, returns a very large
number to handle slow/stalled connections.
"""
time_waited = time.time() - (self.last_attempt or 0)
if self.state is ConnectionStates.DISCONNECTED:
return max(self._reconnect_backoff - time_waited_ms, 0)
return max(self._reconnect_backoff - time_waited, 0) * 1000
elif self.connecting():
return 0
else:
Expand Down
9 changes: 9 additions & 0 deletions test/test_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ def test_blacked_out(conn):
assert conn.blacked_out() is True


def test_connection_delay(conn):
conn.last_attempt = time.time()
assert round(conn.connection_delay()) == round(conn.config['reconnect_backoff_ms'])
conn.state = ConnectionStates.CONNECTING
assert conn.connection_delay() == 0
conn.state = ConnectionStates.CONNECTED
assert conn.connection_delay() == float('inf')


def test_connected(conn):
assert conn.connected() is False
conn.state = ConnectionStates.CONNECTED
Expand Down

0 comments on commit b33a651

Please sign in to comment.