From 240da069c65b508fc1c754a11df7f615babdcee1 Mon Sep 17 00:00:00 2001
From: Jeff Widman
Date: Sat, 17 Nov 2018 02:04:28 -0800
Subject: [PATCH] Fix send to controller

The controller send error handling was completely broken. It also pinned
the metadata version unnecessarily.

Additionally, several of the methods were sending to the controller when
that was either unnecessary or just plain wrong.

So this updates them to follow the pattern of the Java Admin client.
---
 kafka/admin/kafka.py | 124 +++++++++++++++++++++++++++++--------------
 1 file changed, 85 insertions(+), 39 deletions(-)

diff --git a/kafka/admin/kafka.py b/kafka/admin/kafka.py
index 01db6a98c..6cbc6f4b6 100644
--- a/kafka/admin/kafka.py
+++ b/kafka/admin/kafka.py
@@ -4,9 +4,10 @@
 import logging
 import socket
 from kafka.client_async import KafkaClient, selectors
+from kafka import errors as Errors
 from kafka.errors import (
-    IncompatibleBrokerVersion, KafkaConfigurationError, KafkaConnectionError,
-    NodeNotReadyError, NotControllerError)
+    IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError,
+    UnrecognizedBrokerVersion)
 from kafka.metrics import MetricConfig, Metrics
 from kafka.protocol.admin import (
     CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
@@ -240,17 +241,22 @@ def _validate_timeout(self, timeout_ms):
         return timeout_ms or self.config['request_timeout_ms']

     def _refresh_controller_id(self):
-        """Determine the kafka cluster controller
-        """
-        response = self._send_request_to_node(
-            self._client.least_loaded_node(),
-            MetadataRequest[1]([])
-        )
-        self._controller_id = response.controller_id
-        version = self._client.check_version(self._controller_id)
-        if version < (0, 10, 0):
-            raise IncompatibleBrokerVersion(
-                "The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
+        """Determine the kafka cluster controller."""
+        version = self._matching_api_version(MetadataRequest)
+        if 1 <= version <= 6:
+            request = MetadataRequest[version]()
+            response = self._send_request_to_node(self._client.least_loaded_node(), request)
+            controller_id = response.controller_id
+            # verify the controller is new enough to support our requests
+            controller_version = self._client.check_version(controller_id)
+            if controller_version < (0, 10, 0):
+                raise IncompatibleBrokerVersion(
+                    "The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
+                    .format(controller_version))
+            self._controller_id = controller_id
+        else:
+            raise UnrecognizedBrokerVersion(
+                "Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."
                 .format(version))

     def _send_request_to_node(self, node, request):
@@ -271,22 +277,34 @@ def _send_request_to_node(self, node, request):
         else:
             raise future.exception # pylint: disable-msg=raising-bad-type

-    def _send(self, request):
-        """Send a kafka protocol message to the cluster controller. Will block until the message result is received.
+    def _send_request_to_controller(self, request):
+        """Send a kafka protocol message to the cluster controller.
+
+        Will block until the message result is received.

         :param request: The message to send
-        :return The kafka protocol response for the message
-        :exception NodeNotReadyError: If the controller connection can't be established
+        :return: The kafka protocol response for the message
         """
-        remaining_tries = 2
-        while remaining_tries > 0:
-            remaining_tries = remaining_tries - 1
-            try:
-                return self._send_request_to_node(self._controller_id, request)
-            except (NotControllerError, KafkaConnectionError) as e:
-                # controller changed? refresh it
-                self._refresh_controller_id()
-        raise NodeNotReadyError(self._controller_id)
+        tries = 2  # in case our cached self._controller_id is outdated
+        while tries:
+            tries -= 1
+            response = self._send_request_to_node(self._controller_id, request)
+            # DeleteTopicsResponse returns topic_error_codes rather than topic_errors,
+            # and newer response versions append error_message, so keep only (topic, error_code)
+            for topic, error_code in map(lambda e: e[:2], getattr(response, "topic_errors", response.topic_error_codes)):
+                error_type = Errors.for_code(error_code)
+                if tries and error_type is NotControllerError:
+                    # No need to inspect the rest of the errors for
+                    # non-retriable errors because NotControllerError should
+                    # either be thrown for all errors or no errors.
+                    self._refresh_controller_id()
+                    break
+                elif error_type is not Errors.NoError:
+                    raise error_type(
+                        "Request '{}' failed with response '{}'."
+                        .format(request, response))
+            else:
+                return response
+        raise RuntimeError("This should never happen, please file a bug with full stacktrace if encountered")

     @staticmethod
     def _convert_new_topic_request(new_topic):
@@ -332,7 +350,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=None):
             raise NotImplementedError(
                 "Support for CreateTopics v{} has not yet been added to KafkaAdmin."
                 .format(version))
-        return self._send(request)
+        return self._send_request_to_controller(request)

     def delete_topics(self, topics, timeout_ms=None):
         """Delete topics from the cluster
@@ -352,19 +370,25 @@ def delete_topics(self, topics, timeout_ms=None):
             raise NotImplementedError(
                 "Support for DeleteTopics v{} has not yet been added to KafkaAdmin."
                 .format(version))
-        return self._send(request)
+        return self._send_request_to_controller(request)

     # list topics functionality is in ClusterMetadata
+    # Note: if implemented here, send the request to the least_loaded_node()

     # describe topics functionality is in ClusterMetadata
+    # Note: if implemented here, send the request to the controller

     # describe cluster functionality is in ClusterMetadata
+    # Note: if implemented here, send the request to the least_loaded_node()

-    # describe_acls protocol not implemented
+    # describe_acls protocol not yet implemented
+    # Note: send the request to the least_loaded_node()

-    # create_acls protocol not implemented
+    # create_acls protocol not yet implemented
+    # Note: send the request to the least_loaded_node()

-    # delete_acls protocol not implemented
+    # delete_acls protocol not yet implemented
+    # Note: send the request to the least_loaded_node()

     @staticmethod
     def _convert_describe_config_resource_request(config_resource):
@@ -404,7 +428,7 @@ def describe_configs(self, config_resources, include_synonyms=None):
             raise NotImplementedError(
                 "Support for DescribeConfigs v{} has not yet been added to KafkaAdmin."
                 .format(version))
-        return self._send(request)
+        return self._send_request_to_node(self._client.least_loaded_node(), request)

     @staticmethod
     def _convert_alter_config_resource_request(config_resource):
@@ -419,6 +443,12 @@ def _convert_alter_config_resource_request(config_resource):
     def alter_configs(self, config_resources):
         """Alter configuration parameters of one or more kafka resources.

+        Warning:
+            This is currently broken for BROKER resources because those must be
+            sent to that specific broker, versus this always picks the
+            least-loaded node. See the comment in the source code for details.
+            We would happily accept a PR fixing this.
+
         :param config_resources: An array of ConfigResource objects.
         :return: Appropriate version of AlterConfigsResponse class
         """
@@ -431,11 +461,19 @@ def alter_configs(self, config_resources):
             raise NotImplementedError(
                 "Support for AlterConfigs v{} has not yet been added to KafkaAdmin."
                 .format(version))
-        return self._send(request)
+        # TODO the Java client has the note:
+        # // We must make a separate AlterConfigs request for every BROKER resource we want to alter
+        # // and send the request to that specific broker. Other resources are grouped together into
+        # // a single request that may be sent to any broker.
+        #
+        # So this is currently broken as it always sends to the least_loaded_node()
+        return self._send_request_to_node(self._client.least_loaded_node(), request)

-    # alter replica logs dir protocol not implemented
+    # alter replica logs dir protocol not yet implemented
+    # Note: have to lookup the broker with the replica assignment and send the request to that broker

-    # describe log dirs protocol not implemented
+    # describe log dirs protocol not yet implemented
+    # Note: have to lookup the broker with the replica assignment and send the request to that broker

     @staticmethod
     def _convert_create_partitions_request(topic_name, new_partitions):
@@ -468,17 +506,22 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Non
             raise NotImplementedError(
                 "Support for CreatePartitions v{} has not yet been added to KafkaAdmin."
                 .format(version))
-        return self._send(request)
+        return self._send_request_to_controller(request)

-    # delete records protocol not implemented
+    # delete records protocol not yet implemented
+    # Note: send the request to the partition leaders

     # create delegation token protocol not implemented
+    # Note: send the request to the least_loaded_node()

     # renew delegation token protocol not implemented
+    # Note: send the request to the least_loaded_node()

     # expire delegation_token protocol not implemented
+    # Note: send the request to the least_loaded_node()

     # describe delegation_token protocol not implemented
+    # Note: send the request to the least_loaded_node()

     def describe_consumer_groups(self, group_ids):
         """Describe a set of consumer groups.
@@ -495,7 +538,8 @@ def describe_consumer_groups(self, group_ids):
             raise NotImplementedError(
                 "Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
                 .format(version))
-        return self._send(request)
+        # TODO this is completely broken, as it needs to send to the group coordinator
+        # return self._send(request)

     def list_consumer_groups(self):
         """List all consumer groups known to the cluster.
@@ -509,6 +553,8 @@ def list_consumer_groups(self):
             raise NotImplementedError(
                 "Support for ListGroups v{} has not yet been added to KafkaAdmin."
                 .format(version))
-        return self._send(request)
+        # TODO this is completely broken, as it needs to send to the group coordinator
+        # return self._send(request)

     # delete groups protocol not implemented
+    # Note: send the request to the group's coordinator.
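
Reviewer note, illustration only and not part of the patch: with _send() replaced by
_send_request_to_controller(), per-topic error codes in the controller's response are now
raised as exceptions (after one retry if the cached controller id turns out to be stale)
instead of being returned uninspected. A minimal sketch of the caller-visible behavior,
assuming a broker reachable at localhost:9092 and a made-up topic name:

    from kafka.admin import KafkaAdmin, NewTopic
    from kafka.errors import TopicAlreadyExistsError

    admin = KafkaAdmin(bootstrap_servers='localhost:9092')
    try:
        # Routed to the controller; a stale cached controller id is refreshed
        # and the request retried once before giving up.
        admin.create_topics([NewTopic(name='example-topic',
                                      num_partitions=3,
                                      replication_factor=1)])
    except TopicAlreadyExistsError:
        # Previously _send() returned the response without checking error_code,
        # so this condition went unnoticed by the caller.
        pass
    admin.close()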
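Reviewer note, illustration only and not part of the patch: the TODO in alter_configs()
mirrors the Java client's constraint that each BROKER resource must be sent to the broker
it names. A hedged sketch of what per-broker routing might look like, written against the
private helpers used in this file (_matching_api_version, _convert_alter_config_resource_request,
_send_request_to_node); the method name and error handling are illustrative only:

    from kafka.admin import ConfigResourceType
    from kafka.protocol.admin import AlterConfigsRequest

    def _alter_configs_sketch(self, config_resources):
        """Possible replacement body for alter_configs()."""
        version = self._matching_api_version(AlterConfigsRequest)
        broker_resources = [r for r in config_resources
                            if r.resource_type == ConfigResourceType.BROKER]
        other_resources = [r for r in config_resources
                           if r.resource_type != ConfigResourceType.BROKER]
        responses = []
        # A BROKER resource is named by its broker id and must go to that exact
        # broker; everything else can be batched and sent to any node.
        for resource in broker_resources:
            request = AlterConfigsRequest[version](
                resources=[self._convert_alter_config_resource_request(resource)])
            responses.append(self._send_request_to_node(int(resource.name), request))
        if other_resources:
            request = AlterConfigsRequest[version](
                resources=[self._convert_alter_config_resource_request(r)
                           for r in other_resources])
            responses.append(self._send_request_to_node(
                self._client.least_loaded_node(), request))
        return responses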
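Reviewer note, illustration only and not part of the patch: the describe_consumer_groups()
and list_consumer_groups() TODOs need a coordinator lookup before they can be wired back up.
A sketch of such a helper using the existing GroupCoordinator protocol; the name
_find_group_coordinator_id_sketch() is hypothetical:

    from kafka import errors as Errors
    from kafka.protocol.commit import GroupCoordinatorRequest

    def _find_group_coordinator_id_sketch(self, group_id):
        """Look up which broker coordinates the given consumer group."""
        # Any broker can answer this, so ask the least-loaded node; the returned
        # id is the node that DescribeGroups requests should be sent to.
        request = GroupCoordinatorRequest[0](group_id)
        response = self._send_request_to_node(self._client.least_loaded_node(), request)
        error_type = Errors.for_code(response.error_code)
        if error_type is not Errors.NoError:
            raise error_type(
                "Could not find coordinator for group '{}'.".format(group_id))
        return response.coordinator_id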