Skip to content

Commit

Permalink
Check broker protocol version on each connection to controller.
Browse files Browse the repository at this point in the history
  • Loading branch information
llamahunter authored and jeffwidman committed Oct 23, 2018
1 parent 6eeaf63 commit 6a88ec0
Showing 1 changed file with 5 additions and 4 deletions.
9 changes: 5 additions & 4 deletions kafka/admin/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,6 @@ def __init__(self, **configs):
if self.config['api_version'] is None:
self.config['api_version'] = self._client.config['api_version']

if self.config['api_version'] < (0, 10, 0):
raise UnsupportedVersionError(
"Kafka Admin interface not supported for cluster version {} < 0.10.0.0"
.format(self.config['api_version']))
self._closed = False
self._refresh_controller_id()
log.debug('Kafka administration interface started')
Expand Down Expand Up @@ -241,6 +237,11 @@ def _refresh_controller_id(self):
MetadataRequest[1]([])
)
self._controller_id = response.controller_id
version = self._client.check_version(self._controller_id)
if version < (0, 10, 0):
raise UnsupportedVersionError(
"Kafka Admin interface not supported for cluster controller version {} < 0.10.0.0"
.format(version))

def _send_request_to_node(self, node, request):
"""Send a kafka protocol message to a specific broker. Will block until the message result is received.
Expand Down

0 comments on commit 6a88ec0

Please sign in to comment.