Skip to content

Commit

Permalink
Add BrokerConnection.connect_blocking() (#1411)
Browse files Browse the repository at this point in the history
  • Loading branch information
dpkp authored Mar 9, 2018
1 parent 4cbeb2e commit 1ffdd5c
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 36 deletions.
12 changes: 1 addition & 11 deletions kafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,7 @@ def _get_conn(self, host, port, afi):
)

conn = self._conns[host_key]
conn.connect()
if conn.connected():
return conn

timeout = time.time() + self.timeout
while time.time() < timeout and conn.connecting():
if conn.connect() is ConnectionStates.CONNECTED:
break
else:
time.sleep(0.05)
else:
if not conn.connect_blocking(self.timeout):
conn.close()
raise ConnectionError("%s:%s (%s)" % (host, port, afi))
return conn
Expand Down
6 changes: 1 addition & 5 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,11 +257,7 @@ def _bootstrap(self, hosts):
state_change_callback=cb,
node_id='bootstrap',
**self.config)
bootstrap.connect()
while bootstrap.connecting():
self._selector.select(1)
bootstrap.connect()
if not bootstrap.connected():
if not bootstrap.connect_blocking():
bootstrap.close()
continue
future = bootstrap.send(metadata_request)
Expand Down
64 changes: 47 additions & 17 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,18 +271,58 @@ def __init__(self, host, port, afi, **configs):
self.config['metric_group_prefix'],
self.node_id)

def _dns_lookup(self):
self._gai = dns_lookup(self.host, self.port, self.afi)
if not self._gai:
log.error('DNS lookup failed for %s:%i (%s)',
self.host, self.port, self.afi)
return False
return True

def _next_afi_host_port(self):
if not self._gai:
self._gai = dns_lookup(self.host, self.port, self.afi)
if not self._gai:
log.error('DNS lookup failed for %s:%i (%s)',
self.host, self.port, self.afi)
if not self._dns_lookup():
return

afi, _, __, ___, sockaddr = self._gai.pop(0)
host, port = sockaddr[:2]
return (afi, host, port)

def connect_blocking(self, timeout=float('inf')):
if self.connected():
return True
timeout += time.time()
# First attempt to perform dns lookup
# note that the underlying interface, socket.getaddrinfo,
# has no explicit timeout so we may exceed the user-specified timeout
while time.time() < timeout:
if self._dns_lookup():
break
else:
return False

# Loop once over all returned dns entries
selector = None
while self._gai:
while time.time() < timeout:
self.connect()
if self.connected():
if selector is not None:
selector.close()
return True
elif self.connecting():
if selector is None:
selector = self.config['selector']()
selector.register(self._sock, selectors.EVENT_WRITE)
selector.select(1)
elif self.disconnected():
if selector is not None:
selector.close()
selector = None
break
else:
break
return False

def connect(self):
"""Attempt to connect and return ConnectionState"""
if self.state is ConnectionStates.DISCONNECTED and not self.blacked_out():
Expand Down Expand Up @@ -903,19 +943,9 @@ def filter(self, record):
((0, 8, 0), MetadataRequest[0]([])),
]

def connect():
self.connect()
if self.connected():
return
timeout_at = time.time() + timeout
while time.time() < timeout_at and self.connecting():
if self.connect() is ConnectionStates.CONNECTED:
return
time.sleep(0.05)
raise Errors.NodeNotReadyError()

for version, request in test_cases:
connect()
if not self.connect_blocking(timeout):
raise Errors.NodeNotReadyError()
f = self.send(request)
# HACK: sleeping to wait for socket to send bytes
time.sleep(0.1)
Expand Down
1 change: 1 addition & 0 deletions test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def _set_conn_state(state):
return state
conn._set_conn_state = _set_conn_state
conn.connect.side_effect = lambda: conn.state
conn.connect_blocking.return_value = True
conn.connecting = lambda: conn.state in (ConnectionStates.CONNECTING,
ConnectionStates.HANDSHAKE)
conn.connected = lambda: conn.state is ConnectionStates.CONNECTED
Expand Down
8 changes: 5 additions & 3 deletions test/test_client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,22 @@ def test_bootstrap_success(conn):
kwargs.pop('state_change_callback')
kwargs.pop('node_id')
assert kwargs == cli.config
conn.connect.assert_called_with()
conn.connect_blocking.assert_called_with()
conn.send.assert_called_once_with(MetadataRequest[0]([]))
assert cli._bootstrap_fails == 0
assert cli.cluster.brokers() == set([BrokerMetadata(0, 'foo', 12, None),
BrokerMetadata(1, 'bar', 34, None)])


def test_bootstrap_failure(conn):
conn.state = ConnectionStates.DISCONNECTED
conn.connect_blocking.return_value = False
cli = KafkaClient(api_version=(0, 9))
args, kwargs = conn.call_args
assert args == ('localhost', 9092, socket.AF_UNSPEC)
kwargs.pop('state_change_callback')
kwargs.pop('node_id')
assert kwargs == cli.config
conn.connect.assert_called_with()
conn.connect_blocking.assert_called_with()
conn.close.assert_called_with()
assert cli._bootstrap_fails == 1
assert cli.cluster.brokers() == set()
Expand All @@ -95,6 +96,7 @@ def test_can_connect(cli, conn):
conn.blacked_out.return_value = True
assert not cli._can_connect(0)


def test_maybe_connect(cli, conn):
try:
# Node not in metadata, raises AssertionError
Expand Down

0 comments on commit 1ffdd5c

Please sign in to comment.