diff --git a/kafka/admin/kafka.py b/kafka/admin/kafka.py index d4ae815a4..e78bdbfa7 100644 --- a/kafka/admin/kafka.py +++ b/kafka/admin/kafka.py @@ -189,10 +189,6 @@ def __init__(self, **configs): if self.config['api_version'] is None: self.config['api_version'] = self._client.config['api_version'] - if self.config['api_version'] < (0, 10, 0): - raise UnsupportedVersionError( - "Kafka Admin interface not supported for cluster version {} < 0.10.0.0" - .format(self.config['api_version'])) self._closed = False self._refresh_controller_id() log.debug('Kafka administration interface started') @@ -241,6 +237,11 @@ def _refresh_controller_id(self): MetadataRequest[1]([]) ) self._controller_id = response.controller_id + version = self._client.check_version(self._controller_id) + if version < (0, 10, 0): + raise UnsupportedVersionError( + "Kafka Admin interface not supported for cluster controller version {} < 0.10.0.0" + .format(version)) def _send_request_to_node(self, node, request): """Send a kafka protocol message to a specific broker. Will block until the message result is received.