@@ -249,7 +249,11 @@ def _refresh_controller_id(self):
249
249
version = self ._matching_api_version (MetadataRequest )
250
250
if 1 <= version <= 6 :
251
251
request = MetadataRequest [version ]()
252
- response = self ._send_request_to_node (self ._client .least_loaded_node (), request )
252
+ future = self ._send_request_to_node (self ._client .least_loaded_node (), request )
253
+
254
+ self ._wait_for_futures ([future ])
255
+
256
+ response = future .value
253
257
controller_id = response .controller_id
254
258
# verify the controller is new enough to support our requests
255
259
controller_version = self ._client .check_version (controller_id )
@@ -281,7 +285,11 @@ def _find_group_coordinator_id(self, group_id):
281
285
# When I experimented with this, GroupCoordinatorResponse_v1 didn't
282
286
# match GroupCoordinatorResponse_v0 and I couldn't figure out why.
283
287
gc_request = GroupCoordinatorRequest [0 ](group_id )
284
- gc_response = self ._send_request_to_node (self ._client .least_loaded_node (), gc_request )
288
+ future = self ._send_request_to_node (self ._client .least_loaded_node (), gc_request )
289
+
290
+ self ._wait_for_futures ([future ])
291
+
292
+ gc_response = future .value
285
293
# use the extra error checking in add_group_coordinator() rather than
286
294
# immediately returning the group coordinator.
287
295
success = self ._client .cluster .add_group_coordinator (group_id , gc_response )
@@ -304,23 +312,19 @@ def _find_group_coordinator_id(self, group_id):
304
312
def _send_request_to_node (self , node_id , request ):
305
313
"""Send a Kafka protocol message to a specific broker.
306
314
307
- Will block until the message result is received .
315
+ Returns a future that may be polled for status and results .
308
316
309
317
:param node_id: The broker id to which to send the message.
310
318
:param request: The message to send.
311
- :return: The Kafka protocol response for the message .
319
+ :return: A future object that may be polled for status and results .
312
320
:exception: The exception if the message could not be sent.
313
321
"""
314
322
while not self ._client .ready (node_id ):
315
323
# poll until the connection to broker is ready, otherwise send()
316
324
# will fail with NodeNotReadyError
317
325
self ._client .poll ()
318
- future = self ._client .send (node_id , request )
319
- self ._client .poll (future = future )
320
- if future .succeeded ():
321
- return future .value
322
- else :
323
- raise future .exception # pylint: disable-msg=raising-bad-type
326
+ return self ._client .send (node_id , request )
327
+
324
328
325
329
def _send_request_to_controller (self , request ):
326
330
"""Send a Kafka protocol message to the cluster controller.
@@ -333,7 +337,11 @@ def _send_request_to_controller(self, request):
333
337
tries = 2 # in case our cached self._controller_id is outdated
334
338
while tries :
335
339
tries -= 1
336
- response = self ._send_request_to_node (self ._controller_id , request )
340
+ future = self ._send_request_to_node (self ._controller_id , request )
341
+
342
+ self ._wait_for_futures ([future ])
343
+
344
+ response = future .value
337
345
# In Java, the error fieldname is inconsistent:
338
346
# - CreateTopicsResponse / CreatePartitionsResponse uses topic_errors
339
347
# - DeleteTopicsResponse uses topic_error_codes
@@ -490,7 +498,11 @@ def describe_configs(self, config_resources, include_synonyms=False):
490
498
raise NotImplementedError (
491
499
"Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient."
492
500
.format (version ))
493
- return self ._send_request_to_node (self ._client .least_loaded_node (), request )
501
+ future = self ._send_request_to_node (self ._client .least_loaded_node (), request )
502
+
503
+ self ._wait_for_futures ([future ])
504
+
505
+ return future .value
494
506
495
507
@staticmethod
496
508
def _convert_alter_config_resource_request (config_resource ):
@@ -529,7 +541,11 @@ def alter_configs(self, config_resources):
529
541
# // a single request that may be sent to any broker.
530
542
#
531
543
# So this is currently broken as it always sends to the least_loaded_node()
532
- return self ._send_request_to_node (self ._client .least_loaded_node (), request )
544
+ future = self ._send_request_to_node (self ._client .least_loaded_node (), request )
545
+
546
+ self ._wait_for_futures ([future ])
547
+
548
+ return future .value
533
549
534
550
# alter replica logs dir protocol not yet implemented
535
551
# Note: have to lookup the broker with the replica assignment and send the request to that broker
@@ -605,42 +621,50 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
605
621
partition assignments.
606
622
"""
607
623
group_descriptions = []
624
+ futures = []
608
625
version = self ._matching_api_version (DescribeGroupsRequest )
609
626
for group_id in group_ids :
610
627
if group_coordinator_id is not None :
611
628
this_groups_coordinator_id = group_coordinator_id
612
629
else :
613
630
this_groups_coordinator_id = self ._find_group_coordinator_id (group_id )
631
+
614
632
if version <= 1 :
615
633
# Note: KAFKA-6788 A potential optimization is to group the
616
634
# request per coordinator and send one request with a list of
617
635
# all consumer groups. Java still hasn't implemented this
618
636
# because the error checking is hard to get right when some
619
637
# groups error and others don't.
620
638
request = DescribeGroupsRequest [version ](groups = (group_id ,))
621
- response = self ._send_request_to_node (this_groups_coordinator_id , request )
622
- assert len (response .groups ) == 1
623
- # TODO need to implement converting the response tuple into
624
- # a more accessible interface like a namedtuple and then stop
625
- # hardcoding tuple indices here. Several Java examples,
626
- # including KafkaAdminClient.java
627
- group_description = response .groups [0 ]
628
- error_code = group_description [0 ]
629
- error_type = Errors .for_code (error_code )
630
- # Java has the note: KAFKA-6789, we can retry based on the error code
631
- if error_type is not Errors .NoError :
632
- raise error_type (
633
- "Request '{}' failed with response '{}'."
634
- .format (request , response ))
635
- # TODO Java checks the group protocol type, and if consumer
636
- # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
637
- # the members' partition assignments... that hasn't yet been
638
- # implemented here so just return the raw struct results
639
- group_descriptions .append (group_description )
639
+ futures .append (self ._send_request_to_node (this_groups_coordinator_id , request ))
640
640
else :
641
641
raise NotImplementedError (
642
642
"Support for DescribeGroups v{} has not yet been added to KafkaAdminClient."
643
643
.format (version ))
644
+
645
+ self ._wait_for_futures (futures )
646
+
647
+ for future in futures :
648
+ response = future .value
649
+ assert len (response .groups ) == 1
650
+ # TODO need to implement converting the response tuple into
651
+ # a more accessible interface like a namedtuple and then stop
652
+ # hardcoding tuple indices here. Several Java examples,
653
+ # including KafkaAdminClient.java
654
+ group_description = response .groups [0 ]
655
+ error_code = group_description [0 ]
656
+ error_type = Errors .for_code (error_code )
657
+ # Java has the note: KAFKA-6789, we can retry based on the error code
658
+ if error_type is not Errors .NoError :
659
+ raise error_type (
660
+ "Request '{}' failed with response '{}'."
661
+ .format (request , response ))
662
+ # TODO Java checks the group protocol type, and if consumer
663
+ # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
664
+ # the members' partition assignments... that hasn't yet been
665
+ # implemented here so just return the raw struct results
666
+ group_descriptions .append (group_description )
667
+
644
668
return group_descriptions
645
669
646
670
def list_consumer_groups (self , broker_ids = None ):
@@ -673,13 +697,19 @@ def list_consumer_groups(self, broker_ids=None):
673
697
# consumer groups move to new brokers that haven't yet been queried,
674
698
# then the same group could be returned by multiple brokers.
675
699
consumer_groups = set ()
700
+ futures = []
676
701
if broker_ids is None :
677
702
broker_ids = [broker .nodeId for broker in self ._client .cluster .brokers ()]
678
703
version = self ._matching_api_version (ListGroupsRequest )
679
704
if version <= 2 :
680
705
request = ListGroupsRequest [version ]()
681
706
for broker_id in broker_ids :
682
- response = self ._send_request_to_node (broker_id , request )
707
+ futures .append (self ._send_request_to_node (broker_id , request ))
708
+
709
+ self ._wait_for_futures (futures )
710
+
711
+ for future in futures :
712
+ response = future .value
683
713
error_type = Errors .for_code (response .error_code )
684
714
if error_type is not Errors .NoError :
685
715
raise error_type (
@@ -738,7 +768,10 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
738
768
topics_partitions_dict [topic ].add (partition )
739
769
topics_partitions = list (six .iteritems (topics_partitions_dict ))
740
770
request = OffsetFetchRequest [version ](group_id , topics_partitions )
741
- response = self ._send_request_to_node (group_coordinator_id , request )
771
+ future = self ._send_request_to_node (group_coordinator_id , request )
772
+ self ._wait_for_futures ([future ])
773
+ response = future .value
774
+
742
775
if version > 1 : # OffsetFetchResponse_v1 lacks a top-level error_code
743
776
error_type = Errors .for_code (response .error_code )
744
777
if error_type is not Errors .NoError :
@@ -764,3 +797,11 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
764
797
765
798
# delete groups protocol not yet implemented
766
799
# Note: send the request to the group's coordinator.
800
+
801
+ def _wait_for_futures (self , futures ):
802
+ while not all (future .succeeded () for future in futures ):
803
+ for future in futures :
804
+ self ._client .poll (future = future )
805
+
806
+ if future .failed ():
807
+ raise future .exception # pylint: disable-msg=raising-bad-type
0 commit comments