Skip to content

Commit

Permalink
Maintain shadow cluster metadata for bootstrapping (#1753)
Browse files Browse the repository at this point in the history
  • Loading branch information
dpkp authored Mar 22, 2019
1 parent 0bc7518 commit af2dd48
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 35 deletions.
35 changes: 10 additions & 25 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,7 @@ def _can_bootstrap(self):

def _can_connect(self, node_id):
if node_id not in self._conns:
# cluster.broker_metadata() is stateful when called w/ 'bootstrap'
# (it cycles through all of the bootstrap servers)
# so we short-circuit here and assume that we should always have
# some bootstrap_servers config to power bootstrap broker_metadata
if node_id == 'bootstrap' or self.cluster.broker_metadata(node_id):
if self.cluster.broker_metadata(node_id):
return True
return False
conn = self._conns[node_id]
Expand All @@ -272,7 +268,7 @@ def _conn_state_change(self, node_id, conn):
except KeyError:
self._selector.modify(conn._sock, selectors.EVENT_WRITE)

if node_id == 'bootstrap':
if self.cluster.is_bootstrap(node_id):
self._last_bootstrap = time.time()

elif conn.connected():
Expand All @@ -290,12 +286,13 @@ def _conn_state_change(self, node_id, conn):

self._idle_expiry_manager.update(node_id)

if node_id == 'bootstrap':
if self.cluster.is_bootstrap(node_id):
self._bootstrap_fails = 0

elif 'bootstrap' in self._conns:
bootstrap = self._conns.pop('bootstrap')
bootstrap.close()
else:
for node_id in list(self._conns.keys()):
if self.cluster.is_bootstrap(node_id):
self._conns.pop(node_id).close()

# Connection failures imply that our metadata is stale, so let's refresh
elif conn.state is ConnectionStates.DISCONNECTING:
Expand All @@ -314,7 +311,7 @@ def _conn_state_change(self, node_id, conn):
idle_disconnect = True
self._idle_expiry_manager.remove(node_id)

if node_id == 'bootstrap':
if self.cluster.is_bootstrap(node_id):
self._bootstrap_fails += 1

elif self._refresh_on_disconnects and not self._closed and not idle_disconnect:
Expand All @@ -337,10 +334,6 @@ def _should_recycle_connection(self, conn):
if not conn.disconnected():
return False

# Always recycled disconnected bootstraps
elif conn.node_id == 'bootstrap':
return True

# Otherwise, only recycle when broker metadata has changed
broker = self.cluster.broker_metadata(conn.node_id)
if broker is None:
Expand All @@ -361,10 +354,6 @@ def _maybe_connect(self, node_id):
conn = self._conns.get(node_id)

if conn is None:
# Note that when bootstrapping, each call to broker_metadata may
# return a different host/port. So we need to be careful to only
# call when necessary to avoid skipping some possible bootstrap
# source.
broker = self.cluster.broker_metadata(node_id)
assert broker, 'Broker id %s not in current metadata' % (node_id,)

Expand Down Expand Up @@ -703,7 +692,7 @@ def least_loaded_node(self):
in-flight-requests. If no such node is found, a node will be chosen
randomly from disconnected nodes that are not "blacked out" (i.e.,
are not subject to a reconnect backoff). If no node metadata has been
obtained, will return 'bootstrap' (subject to exponential backoff).
obtained, will return a bootstrap node (subject to exponential backoff).
Returns:
node_id or None if no suitable node was found
Expand All @@ -730,10 +719,6 @@ def least_loaded_node(self):
if found is not None:
return found

elif not nodes and self._can_bootstrap():
self._last_bootstrap = time.time()
return 'bootstrap'

return None

def set_topics(self, topics):
Expand Down Expand Up @@ -791,7 +776,7 @@ def _maybe_refresh_metadata(self):

if self._can_send_request(node_id):
topics = list(self._topics)
if not topics and node_id == 'bootstrap':
if not topics and self.cluster.is_bootstrap(node_id):
topics = list(self.config['bootstrap_topics_filter'])

if self.cluster.need_all_topic_metadata or not topics:
Expand Down
21 changes: 11 additions & 10 deletions kafka/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class ClusterMetadata(object):
DEFAULT_CONFIG = {
'retry_backoff_ms': 100,
'metadata_max_age_ms': 300000,
'bootstrap_servers': 'localhost',
'bootstrap_servers': [],
}

def __init__(self, **configs):
Expand Down Expand Up @@ -70,18 +70,22 @@ def _generate_bootstrap_brokers(self):
# collect_hosts does not perform DNS, so we should be fine to re-use
bootstrap_hosts = collect_hosts(self.config['bootstrap_servers'])

while True:
for host, port, afi in bootstrap_hosts:
for _, __, ___, ____, sockaddr in dns_lookup(host, port, afi):
yield BrokerMetadata('bootstrap', sockaddr[0], sockaddr[1], None)
brokers = {}
for i, (host, port, _) in enumerate(bootstrap_hosts):
node_id = 'bootstrap-%s' % i
brokers[node_id] = BrokerMetadata(node_id, host, port, None)
return brokers

def is_bootstrap(self, node_id):
return node_id in self._bootstrap_brokers

def brokers(self):
"""Get all BrokerMetadata
Returns:
set: {BrokerMetadata, ...}
"""
return set(self._brokers.values())
return set(self._brokers.values()) or set(self._bootstrap_brokers.values())

def broker_metadata(self, broker_id):
"""Get BrokerMetadata
Expand All @@ -92,10 +96,7 @@ def broker_metadata(self, broker_id):
Returns:
BrokerMetadata or None if not found
"""
if broker_id == 'bootstrap':
return next(self._bootstrap_brokers)

return self._brokers.get(broker_id)
return self._brokers.get(broker_id) or self._bootstrap_brokers.get(broker_id)

def partitions_for_topic(self, topic):
"""Return set of all partitions for topic (whether available or not)
Expand Down

0 comments on commit af2dd48

Please sign in to comment.