Skip to content

Commit 8cd99b7

Browse files
committed
Fix send to controller
The controller send error handling was completely broken. It also pinned the metadata version unnecessarily. Additionally, several of the methods were sending to the controller but either that was unnecessary, or just plain wrong. So updated following the pattern of the Java Admin client.
1 parent dd8d304 commit 8cd99b7

File tree

1 file changed

+85
-39
lines changed

1 file changed

+85
-39
lines changed

kafka/admin/kafka.py

+85-39
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44
import logging
55
import socket
66
from kafka.client_async import KafkaClient, selectors
7+
import kafka.errors as Errors
78
from kafka.errors import (
8-
IncompatibleBrokerVersion, KafkaConfigurationError, KafkaConnectionError,
9-
NodeNotReadyError, NotControllerError)
9+
IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError,
10+
UnrecognizedBrokerVersion)
1011
from kafka.metrics import MetricConfig, Metrics
1112
from kafka.protocol.admin import (
1213
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
@@ -240,17 +241,22 @@ def _validate_timeout(self, timeout_ms):
240241
return timeout_ms or self.config['request_timeout_ms']
241242

242243
def _refresh_controller_id(self):
243-
"""Determine the kafka cluster controller
244-
"""
245-
response = self._send_request_to_node(
246-
self._client.least_loaded_node(),
247-
MetadataRequest[1]([])
248-
)
249-
self._controller_id = response.controller_id
250-
version = self._client.check_version(self._controller_id)
251-
if version < (0, 10, 0):
252-
raise IncompatibleBrokerVersion(
253-
"The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
244+
"""Determine the kafka cluster controller."""
245+
version = self._matching_api_version(MetadataRequest)
246+
if 1 <= version <= 6:
247+
request = MetadataRequest[version]()
248+
response = self._send_request_to_node(self._client.least_loaded_node(), request)
249+
controller_id = response.controller_id
250+
# verify the controller is new enough to support our requests
251+
controller_version = self._client.check_version(controller_id)
252+
if controller_version < (0, 10, 0):
253+
raise IncompatibleBrokerVersion(
254+
"The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
255+
.format(controller_version))
256+
self._controller_id = controller_id
257+
else:
258+
raise UnrecognizedBrokerVersion(
259+
"Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."
254260
.format(version))
255261

256262
def _send_request_to_node(self, node, request):
@@ -271,22 +277,34 @@ def _send_request_to_node(self, node, request):
271277
else:
272278
raise future.exception # pylint: disable-msg=raising-bad-type
273279

274-
def _send(self, request):
275-
"""Send a kafka protocol message to the cluster controller. Will block until the message result is received.
280+
def _send_request_to_controller(self, request):
281+
"""Send a kafka protocol message to the cluster controller.
282+
283+
Will block until the message result is received.
276284
277285
:param request: The message to send
278-
:return The kafka protocol response for the message
279-
:exception NodeNotReadyError: If the controller connection can't be established
286+
:return: The kafka protocol response for the message
280287
"""
281-
remaining_tries = 2
282-
while remaining_tries > 0:
283-
remaining_tries = remaining_tries - 1
284-
try:
285-
return self._send_request_to_node(self._controller_id, request)
286-
except (NotControllerError, KafkaConnectionError) as e:
287-
# controller changed? refresh it
288-
self._refresh_controller_id()
289-
raise NodeNotReadyError(self._controller_id)
288+
tries = 2 # in case our cached self._controller_id is outdated
289+
while tries:
290+
tries -= 1
291+
response = self._send_request_to_node(self._controller_id, request)
292+
# DeleteTopicsResponse returns topic_error_codes rather than topic_errors
293+
for topic, error_code in getattr(response, "topic_errors", response.topic_error_codes):
294+
error_type = Errors.for_code(error_code)
295+
if tries and isinstance(error_type, NotControllerError):
296+
# No need to inspect the rest of the errors for
297+
# non-retriable errors because NotControllerError should
298+
# either be thrown for all errors or no errors.
299+
self._refresh_controller_id()
300+
break
301+
elif error_type is not Errors.NoError:
302+
raise error_type(
303+
"Request '{}' failed with response '{}'."
304+
.format(request, response))
305+
else:
306+
return response
307+
raise RuntimeError("This should never happen, please file a bug with full stacktrace if encountered")
290308

291309
@staticmethod
292310
def _convert_new_topic_request(new_topic):
@@ -332,7 +350,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=None):
332350
raise NotImplementedError(
333351
"Support for CreateTopics v{} has not yet been added to KafkaAdmin."
334352
.format(version))
335-
return self._send(request)
353+
return self._send_request_to_controller(request)
336354

337355
def delete_topics(self, topics, timeout_ms=None):
338356
"""Delete topics from the cluster
@@ -352,19 +370,25 @@ def delete_topics(self, topics, timeout_ms=None):
352370
raise NotImplementedError(
353371
"Support for DeleteTopics v{} has not yet been added to KafkaAdmin."
354372
.format(version))
355-
return self._send(request)
373+
return self._send_request_to_controller(request)
356374

357375
# list topics functionality is in ClusterMetadata
376+
# Note: if implemented here, send the request to the least_loaded_node()
358377

359378
# describe topics functionality is in ClusterMetadata
379+
# Note: if implemented here, send the request to the controller
360380

361381
# describe cluster functionality is in ClusterMetadata
382+
# Note: if implemented here, send the request to the least_loaded_node()
362383

363-
# describe_acls protocol not implemented
384+
# describe_acls protocol not yet implemented
385+
# Note: send the request to the least_loaded_node()
364386

365-
# create_acls protocol not implemented
387+
# create_acls protocol not yet implemented
388+
# Note: send the request to the least_loaded_node()
366389

367-
# delete_acls protocol not implemented
390+
# delete_acls protocol not yet implemented
391+
# Note: send the request to the least_loaded_node()
368392

369393
@staticmethod
370394
def _convert_describe_config_resource_request(config_resource):
@@ -404,7 +428,7 @@ def describe_configs(self, config_resources, include_synonyms=None):
404428
raise NotImplementedError(
405429
"Support for DescribeConfigs v{} has not yet been added to KafkaAdmin."
406430
.format(version))
407-
return self._send(request)
431+
return self._send_request_to_node(self._client.least_loaded_node(), request)
408432

409433
@staticmethod
410434
def _convert_alter_config_resource_request(config_resource):
@@ -419,6 +443,12 @@ def _convert_alter_config_resource_request(config_resource):
419443
def alter_configs(self, config_resources):
420444
"""Alter configuration parameters of one or more kafka resources.
421445
446+
Warning:
447+
This is currently broken for BROKER resources because those must be
448+
sent to that specific broker, versus this always picks the
449+
least-loaded node. See the comment in the source code for details.
450+
We would happily accept a PR fixing this.
451+
422452
:param config_resources: An array of ConfigResource objects.
423453
:return: Appropriate version of AlterConfigsResponse class
424454
"""
@@ -431,11 +461,19 @@ def alter_configs(self, config_resources):
431461
raise NotImplementedError(
432462
"Support for AlterConfigs v{} has not yet been added to KafkaAdmin."
433463
.format(version))
434-
return self._send(request)
464+
# TODO the Java client has the note:
465+
# // We must make a separate AlterConfigs request for every BROKER resource we want to alter
466+
# // and send the request to that specific broker. Other resources are grouped together into
467+
# // a single request that may be sent to any broker.
468+
#
469+
# So this is currently broken as it always sends to the least_loaded_node()
470+
return self._send_request_to_node(self._client.least_loaded_node(), request)
435471

436-
# alter replica logs dir protocol not implemented
472+
# alter replica logs dir protocol not yet implemented
473+
# Note: have to lookup the broker with the replica assignment and send the request to that broker
437474

438-
# describe log dirs protocol not implemented
475+
# describe log dirs protocol not yet implemented
476+
# Note: have to lookup the broker with the replica assignment and send the request to that broker
439477

440478
@staticmethod
441479
def _convert_create_partitions_request(topic_name, new_partitions):
@@ -468,17 +506,22 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Non
468506
raise NotImplementedError(
469507
"Support for CreatePartitions v{} has not yet been added to KafkaAdmin."
470508
.format(version))
471-
return self._send(request)
509+
return self._send_request_to_controller(request)
472510

473-
# delete records protocol not implemented
511+
# delete records protocol not yet implemented
512+
# Note: send the request to the partition leaders
474513

475514
# create delegation token protocol not implemented
515+
# Note: send the request to the least_loaded_node()
476516

477517
# renew delegation token protocol not implemented
518+
# Note: send the request to the least_loaded_node()
478519

479520
# expire delegation_token protocol not implemented
521+
# Note: send the request to the least_loaded_node()
480522

481523
# describe delegation_token protocol not implemented
524+
# Note: send the request to the least_loaded_node()
482525

483526
def describe_consumer_groups(self, group_ids):
484527
"""Describe a set of consumer groups.
@@ -495,7 +538,8 @@ def describe_consumer_groups(self, group_ids):
495538
raise NotImplementedError(
496539
"Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
497540
.format(version))
498-
return self._send(request)
541+
# TODO this is completely broken, as it needs to send to the group coordinator
542+
# return self._send(request)
499543

500544
def list_consumer_groups(self):
501545
"""List all consumer groups known to the cluster.
@@ -509,6 +553,8 @@ def list_consumer_groups(self):
509553
raise NotImplementedError(
510554
"Support for ListGroups v{} has not yet been added to KafkaAdmin."
511555
.format(version))
512-
return self._send(request)
556+
# TODO this is completely broken, as it needs to send to the group coordinator
557+
# return self._send(request)
513558

514559
# delete groups protocol not implemented
560+
# Note: send the request to the group's coordinator.

0 commit comments

Comments
 (0)