Skip to content

Commit 4df9f63

Browse files
committed
Fix send to controller
The controller send error handling was completely broken. It also pinned the metadata version unnecessarily. Additionally, several of the methods were sending to the controller but either that was unnecessary, or just plain wrong. So updated following the pattern of the Java Admin client.
1 parent d67157c commit 4df9f63

File tree

1 file changed

+90
-44
lines changed

1 file changed

+90
-44
lines changed

kafka/admin/kafka.py

+90-44
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44
import logging
55
import socket
66
from kafka.client_async import KafkaClient, selectors
7+
import kafka.errors as Errors
78
from kafka.errors import (
8-
IncompatibleBrokerVersion, KafkaConfigurationError, KafkaConnectionError,
9-
NodeNotReadyError, NotControllerError)
9+
IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError,
10+
UnrecognizedBrokerVersion)
1011
from kafka.metrics import MetricConfig, Metrics
1112
from kafka.protocol.admin import (
1213
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
@@ -230,17 +231,22 @@ def _validate_timeout(self, timeout_ms):
230231
return timeout_ms or self.config['request_timeout_ms']
231232

232233
def _refresh_controller_id(self):
233-
"""Determine the kafka cluster controller
234-
"""
235-
response = self._send_request_to_node(
236-
self._client.least_loaded_node(),
237-
MetadataRequest[1]([])
238-
)
239-
self._controller_id = response.controller_id
240-
version = self._client.check_version(self._controller_id)
241-
if version < (0, 10, 0):
242-
raise IncompatibleBrokerVersion(
243-
"The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
234+
"""Determine the kafka cluster controller."""
235+
version = self._matching_api_version(MetadataRequest)
236+
if 1 <= version <= 6:
237+
request = MetadataRequest[version]()
238+
response = self._send_request_to_node(self._client.least_loaded_node(), request)
239+
controller_id = response.controller_id
240+
# verify the controller is new enough to support our requests
241+
controller_version = self._client.check_version(controller_id)
242+
if controller_version < (0, 10, 0):
243+
raise IncompatibleBrokerVersion(
244+
"The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
245+
.format(controller_version))
246+
self._controller_id = controller_id
247+
else:
248+
raise UnrecognizedBrokerVersion(
249+
"Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."
244250
.format(version))
245251

246252
def _send_request_to_node(self, node, request):
@@ -261,22 +267,34 @@ def _send_request_to_node(self, node, request):
261267
else:
262268
raise future.exception # pylint: disable-msg=raising-bad-type
263269

264-
def _send(self, request):
265-
"""Send a kafka protocol message to the cluster controller. Will block until the message result is received.
270+
def _send_request_to_controller(self, request):
271+
"""Send a kafka protocol message to the cluster controller.
272+
273+
Will block until the message result is received.
266274
267275
:param request: The message to send
268-
:return The kafka protocol response for the message
269-
:exception NodeNotReadyError: If the controller connection can't be established
276+
:return: The kafka protocol response for the message
270277
"""
271-
remaining_tries = 2
272-
while remaining_tries > 0:
273-
remaining_tries = remaining_tries - 1
274-
try:
275-
return self._send_request_to_node(self._controller_id, request)
276-
except (NotControllerError, KafkaConnectionError) as e:
277-
# controller changed? refresh it
278-
self._refresh_controller_id()
279-
raise NodeNotReadyError(self._controller_id)
278+
tries = 2 # in case our cached self._controller_id is outdated
279+
while tries:
280+
tries -= 1
281+
response = self._send_request_to_node(self._controller_id, request)
282+
# DeleteTopicsResponse returns topic_error_codes rather than topic_errors
283+
for topic, error_code in getattr(response, "topic_errors", response.topic_error_codes):
284+
error_type = Errors.for_code(error_code)
285+
if tries and isinstance(error_type, NotControllerError):
286+
# No need to inspect the rest of the errors for
287+
# non-retriable errors because NotControllerError should
288+
# either be thrown for all errors or no errors.
289+
self._refresh_controller_id()
290+
break
291+
elif error_type is not Errors.NoError:
292+
raise error_type(
293+
"Request '{}' failed with response '{}'."
294+
.format(request, response))
295+
else:
296+
return response
297+
raise RuntimeError("This should never happen, please file a bug with full stacktrace if encountered")
280298

281299
@staticmethod
282300
def _convert_new_topic_request(new_topic):
@@ -322,7 +340,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=None):
322340
raise NotImplementedError(
323341
"Support for CreateTopics v{} has not yet been added to KafkaAdmin."
324342
.format(version))
325-
return self._send(request)
343+
return self._send_request_to_controller(request)
326344

327345
def delete_topics(self, topics, timeout_ms=None):
328346
"""Delete topics from the cluster
@@ -342,19 +360,25 @@ def delete_topics(self, topics, timeout_ms=None):
342360
raise NotImplementedError(
343361
"Support for DeleteTopics v{} has not yet been added to KafkaAdmin."
344362
.format(version))
345-
return self._send(request)
363+
return self._send_request_to_controller(request)
346364

347365
# list topics functionality is in ClusterMetadata
366+
# Note: if implemented here, send the request to the least_loaded_node()
348367

349368
# describe topics functionality is in ClusterMetadata
369+
# Note: if implemented here, send the request to the controller
350370

351371
# describe cluster functionality is in ClusterMetadata
372+
# Note: if implemented here, send the request to the least_loaded_node()
352373

353-
# describe_acls protocol not implemented
374+
# describe_acls protocol not yet implemented
375+
# Note: send the request to the least_loaded_node()
354376

355-
# create_acls protocol not implemented
377+
# create_acls protocol not yet implemented
378+
# Note: send the request to the least_loaded_node()
356379

357-
# delete_acls protocol not implemented
380+
# delete_acls protocol not yet implemented
381+
# Note: send the request to the least_loaded_node()
358382

359383
@staticmethod
360384
def _convert_describe_config_resource_request(config_resource):
@@ -394,7 +418,7 @@ def describe_configs(self, config_resources, include_synonyms=None):
394418
raise NotImplementedError(
395419
"Support for DescribeConfigs v{} has not yet been added to KafkaAdmin."
396420
.format(version))
397-
return self._send(request)
421+
return self._send_request_to_node(self._client.least_loaded_node(), request)
398422

399423
@staticmethod
400424
def _convert_alter_config_resource_request(config_resource):
@@ -409,6 +433,12 @@ def _convert_alter_config_resource_request(config_resource):
409433
def alter_configs(self, config_resources):
410434
"""Alter configuration parameters of one or more kafka resources.
411435
436+
Warning:
437+
This is currently broken for BROKER resources because those must be
438+
sent to that specific broker, versus this always picks the
439+
least-loaded node. See the comment in the source code for details.
440+
We would happily accept a PR fixing this.
441+
412442
:param config_resources: An array of ConfigResource objects.
413443
:return: Appropriate version of AlterConfigsResponse class
414444
"""
@@ -421,11 +451,19 @@ def alter_configs(self, config_resources):
421451
raise NotImplementedError(
422452
"Support for AlterConfigs v{} has not yet been added to KafkaAdmin."
423453
.format(version))
424-
return self._send(request)
454+
# TODO the Java client has the note:
455+
# // We must make a separate AlterConfigs request for every BROKER resource we want to alter
456+
# // and send the request to that specific broker. Other resources are grouped together into
457+
# // a single request that may be sent to any broker.
458+
#
459+
# So this is currently broken as it always sends to the least_loaded_node()
460+
return self._send_request_to_node(self._client.least_loaded_node(), request)
425461

426-
# alter replica logs dir protocol not implemented
462+
# alter replica logs dir protocol not yet implemented
463+
# Note: have to lookup the broker with the replica assignment and send the request to that broker
427464

428-
# describe log dirs protocol not implemented
465+
# describe log dirs protocol not yet implemented
466+
# Note: have to lookup the broker with the replica assignment and send the request to that broker
429467

430468
@staticmethod
431469
def _convert_create_partitions_request(topic_name, new_partitions):
@@ -458,17 +496,22 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Non
458496
raise NotImplementedError(
459497
"Support for CreatePartitions v{} has not yet been added to KafkaAdmin."
460498
.format(version))
461-
return self._send(request)
499+
return self._send_request_to_controller(request)
462500

463-
# delete records protocol not implemented
501+
# delete records protocol not yet implemented
502+
# Note: send the request to the partition leaders
464503

465-
# create delegation token protocol not implemented
504+
# create delegation token protocol not yet implemented
505+
# Note: send the request to the least_loaded_node()
466506

467-
# renew delegation token protocol not implemented
507+
# renew delegation token protocol not yet implemented
508+
# Note: send the request to the least_loaded_node()
468509

469-
# expire delegation_token protocol not implemented
510+
# expire delegation_token protocol not yet implemented
511+
# Note: send the request to the least_loaded_node()
470512

471-
# describe delegation_token protocol not implemented
513+
# describe delegation_token protocol not yet implemented
514+
# Note: send the request to the least_loaded_node()
472515

473516
def describe_consumer_groups(self, group_ids):
474517
"""Describe a set of consumer groups.
@@ -485,7 +528,8 @@ def describe_consumer_groups(self, group_ids):
485528
raise NotImplementedError(
486529
"Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
487530
.format(version))
488-
return self._send(request)
531+
# TODO this is completely broken, as it needs to send to the group coordinator
532+
# return self._send(request)
489533

490534
def list_consumer_groups(self):
491535
"""List all consumer groups known to the cluster.
@@ -499,6 +543,8 @@ def list_consumer_groups(self):
499543
raise NotImplementedError(
500544
"Support for ListGroups v{} has not yet been added to KafkaAdmin."
501545
.format(version))
502-
return self._send(request)
546+
# TODO this is completely broken, as it needs to send to the group coordinator
547+
# return self._send(request)
503548

504-
# delete groups protocol not implemented
549+
# delete groups protocol not yet implemented
550+
# Note: send the request to the group's coordinator.

0 commit comments

Comments
 (0)