From 98119e9b9f58e3d5de68dca18e16e9ad8bf77375 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 8 Mar 2018 13:13:00 -0500 Subject: [PATCH] Defer version check until after bootstrap succeeds --- kafka/client_async.py | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 58155b880..b4366c890 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -196,6 +196,9 @@ def __init__(self, **configs): assert self.config['api_version'] in self.API_VERSIONS, ( 'api_version [{0}] must be one of: {1}'.format( self.config['api_version'], str(self.API_VERSIONS))) + else: + # This should get updated after a successful bootstrap + self.config['api_version'] = (0, 0) self.cluster = ClusterMetadata(**self.config) self._topics = set() # empty set will fetch all topic metadata @@ -228,11 +231,6 @@ def __init__(self, **configs): self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) - # Check Broker Version if not set explicitly - if self.config['api_version'] is None: - check_timeout = self.config['api_version_auto_timeout_ms'] / 1000 - self.config['api_version'] = self.check_version(timeout=check_timeout) - def _bootstrap(self, hosts): log.info('Bootstrapping cluster metadata from %s', hosts) # Exponential backoff if bootstrap fails @@ -245,7 +243,7 @@ def _bootstrap(self, hosts): time.sleep(next_at - now) self._last_bootstrap = time.time() - if self.config['api_version'] is None or self.config['api_version'] < (0, 10): + if self.config['api_version'] < (0, 10): metadata_request = MetadataRequest[0]([]) else: metadata_request = MetadataRequest[1](None) @@ -283,7 +281,13 @@ def _bootstrap(self, hosts): else: bootstrap.close() self._bootstrap_fails = 0 + + # Check Broker Version if not set explicitly + if self.config['api_version'] == (0, 0): + check_timeout = self.config['api_version_auto_timeout_ms'] / 1000 + self.config['api_version'] = self.check_version(timeout=check_timeout) break + # No bootstrap found... else: log.error('Unable to bootstrap from %s', hosts) @@ -821,10 +825,10 @@ def check_version(self, node_id=None, timeout=2, strict=False): This is only possible if node_id is None. Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ... + (0, 0) returned if the version cannot be determined, + typically due to networking. Raises: - NodeNotReadyError (if node_id is provided) - NoBrokersAvailable (if node_id is None) UnrecognizedBrokerVersion: please file bug if seen! AssertionError (if strict=True): please file bug if seen! """ @@ -835,7 +839,7 @@ def check_version(self, node_id=None, timeout=2, strict=False): # which can block for an increasing backoff period try_node = node_id or self.least_loaded_node() if try_node is None: - raise Errors.NoBrokersAvailable() + return (0, 0) self._maybe_connect(try_node) conn = self._conns[try_node] @@ -847,15 +851,14 @@ def check_version(self, node_id=None, timeout=2, strict=False): version = conn.check_version(timeout=remaining, strict=strict) return version except Errors.NodeNotReadyError: - # Only raise to user if this is a node-specific request if node_id is not None: - raise + return (0, 0) finally: self._refresh_on_disconnects = True # Timeout else: - raise Errors.NoBrokersAvailable() + return (0, 0) def wakeup(self): with self._wake_lock: