@@ -349,7 +349,7 @@ def _send_request_to_controller(self, request):
349
349
# one of these attributes and that they always unpack into
350
350
# (topic, error_code) tuples.
351
351
topic_error_tuples = (response .topic_errors if hasattr (response , 'topic_errors' )
352
- else response .topic_error_codes )
352
+ else response .topic_error_codes )
353
353
# Also small py2/py3 compatibility -- py3 can ignore extra values
354
354
# during unpack via: for x, y, *rest in list_of_values. py2 cannot.
355
355
# So for now we have to map across the list and explicitly drop any
@@ -501,8 +501,8 @@ def describe_configs(self, config_resources, include_synonyms=False):
501
501
future = self ._send_request_to_node (self ._client .least_loaded_node (), request )
502
502
503
503
self ._wait_for_futures ([future ])
504
-
505
- return future . value
504
+ response = future . value
505
+ return response
506
506
507
507
@staticmethod
508
508
def _convert_alter_config_resource_request (config_resource ):
@@ -544,8 +544,8 @@ def alter_configs(self, config_resources):
544
544
future = self ._send_request_to_node (self ._client .least_loaded_node (), request )
545
545
546
546
self ._wait_for_futures ([future ])
547
-
548
- return future . value
547
+ response = future . value
548
+ return response
549
549
550
550
# alter replica logs dir protocol not yet implemented
551
551
# Note: have to lookup the broker with the replica assignment and send the request to that broker
@@ -602,6 +602,54 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
602
602
# describe delegation_token protocol not yet implemented
603
603
# Note: send the request to the least_loaded_node()
604
604
605
+ def _describe_consumer_groups_send_request (self , group_id , group_coordinator_id ):
606
+ """Send a DescribeGroupsRequest to the group's coordinator.
607
+
608
+ :param group_id: The group name as a string
609
+ :param group_coordinator_id: The node_id of the groups' coordinator
610
+ broker.
611
+ :return: A message future.
612
+ """
613
+ version = self ._matching_api_version (DescribeGroupsRequest )
614
+ if version <= 1 :
615
+ # Note: KAFKA-6788 A potential optimization is to group the
616
+ # request per coordinator and send one request with a list of
617
+ # all consumer groups. Java still hasn't implemented this
618
+ # because the error checking is hard to get right when some
619
+ # groups error and others don't.
620
+ request = DescribeGroupsRequest [version ](groups = (group_id ,))
621
+ else :
622
+ raise NotImplementedError (
623
+ "Support for DescribeGroupsRequest_v{} has not yet been added to KafkaAdminClient."
624
+ .format (version ))
625
+ return self ._send_request_to_node (group_coordinator_id , request )
626
+
627
+ def _describe_consumer_groups_process_response (self , response ):
628
+ """Process a DescribeGroupsResponse into a group description."""
629
+ if response .API_VERSION <= 1 :
630
+ assert len (response .groups ) == 1
631
+ # TODO need to implement converting the response tuple into
632
+ # a more accessible interface like a namedtuple and then stop
633
+ # hardcoding tuple indices here. Several Java examples,
634
+ # including KafkaAdminClient.java
635
+ group_description = response .groups [0 ]
636
+ error_code = group_description [0 ]
637
+ error_type = Errors .for_code (error_code )
638
+ # Java has the note: KAFKA-6789, we can retry based on the error code
639
+ if error_type is not Errors .NoError :
640
+ raise error_type (
641
+ "DescribeGroupsResponse failed with response '{}'."
642
+ .format (response ))
643
+ # TODO Java checks the group protocol type, and if consumer
644
+ # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
645
+ # the members' partition assignments... that hasn't yet been
646
+ # implemented here so just return the raw struct results
647
+ else :
648
+ raise NotImplementedError (
649
+ "Support for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient."
650
+ .format (response .API_VERSION ))
651
+ return group_description
652
+
605
653
def describe_consumer_groups (self , group_ids , group_coordinator_id = None ):
606
654
"""Describe a set of consumer groups.
607
655
@@ -622,51 +670,52 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
622
670
"""
623
671
group_descriptions = []
624
672
futures = []
625
- version = self ._matching_api_version (DescribeGroupsRequest )
626
673
for group_id in group_ids :
627
674
if group_coordinator_id is not None :
628
675
this_groups_coordinator_id = group_coordinator_id
629
676
else :
630
677
this_groups_coordinator_id = self ._find_group_coordinator_id (group_id )
631
-
632
- if version <= 1 :
633
- # Note: KAFKA-6788 A potential optimization is to group the
634
- # request per coordinator and send one request with a list of
635
- # all consumer groups. Java still hasn't implemented this
636
- # because the error checking is hard to get right when some
637
- # groups error and others don't.
638
- request = DescribeGroupsRequest [version ](groups = (group_id ,))
639
- futures .append (self ._send_request_to_node (this_groups_coordinator_id , request ))
640
- else :
641
- raise NotImplementedError (
642
- "Support for DescribeGroups v{} has not yet been added to KafkaAdminClient."
643
- .format (version ))
678
+ f = self ._describe_consumer_groups_send_request (group_id , this_groups_coordinator_id )
679
+ futures .append (f )
644
680
645
681
self ._wait_for_futures (futures )
646
682
647
683
for future in futures :
648
684
response = future .value
649
- assert len (response .groups ) == 1
650
- # TODO need to implement converting the response tuple into
651
- # a more accessible interface like a namedtuple and then stop
652
- # hardcoding tuple indices here. Several Java examples,
653
- # including KafkaAdminClient.java
654
- group_description = response .groups [0 ]
655
- error_code = group_description [0 ]
656
- error_type = Errors .for_code (error_code )
657
- # Java has the note: KAFKA-6789, we can retry based on the error code
658
- if error_type is not Errors .NoError :
659
- raise error_type (
660
- "Request '{}' failed with response '{}'."
661
- .format (request , response ))
662
- # TODO Java checks the group protocol type, and if consumer
663
- # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
664
- # the members' partition assignments... that hasn't yet been
665
- # implemented here so just return the raw struct results
685
+ group_description = self ._describe_consumer_groups_process_response (response )
666
686
group_descriptions .append (group_description )
667
687
668
688
return group_descriptions
669
689
690
+ def _list_consumer_groups_send_request (self , broker_id ):
691
+ """Send a ListGroupsRequest to a broker.
692
+
693
+ :param broker_id: The broker's node_id.
694
+ :return: A message future
695
+ """
696
+ version = self ._matching_api_version (ListGroupsRequest )
697
+ if version <= 2 :
698
+ request = ListGroupsRequest [version ]()
699
+ else :
700
+ raise NotImplementedError (
701
+ "Support for ListGroupsRequest_v{} has not yet been added to KafkaAdminClient."
702
+ .format (version ))
703
+ return self ._send_request_to_node (broker_id , request )
704
+
705
+ def _list_consumer_groups_process_response (self , response ):
706
+ """Process a ListGroupsResponse into a list of groups."""
707
+ if response .API_VERSION <= 2 :
708
+ error_type = Errors .for_code (response .error_code )
709
+ if error_type is not Errors .NoError :
710
+ raise error_type (
711
+ "ListGroupsRequest failed with response '{}'."
712
+ .format (response ))
713
+ else :
714
+ raise NotImplementedError (
715
+ "Support for ListGroupsResponse_v{} has not yet been added to KafkaAdminClient."
716
+ .format (response .API_VERSION ))
717
+ return response .groups
718
+
670
719
def list_consumer_groups (self , broker_ids = None ):
671
720
"""List all consumer groups known to the cluster.
672
721
@@ -697,60 +746,26 @@ def list_consumer_groups(self, broker_ids=None):
697
746
# consumer groups move to new brokers that haven't yet been queried,
698
747
# then the same group could be returned by multiple brokers.
699
748
consumer_groups = set ()
700
- futures = []
701
749
if broker_ids is None :
702
750
broker_ids = [broker .nodeId for broker in self ._client .cluster .brokers ()]
703
- version = self ._matching_api_version (ListGroupsRequest )
704
- if version <= 2 :
705
- request = ListGroupsRequest [version ]()
706
- for broker_id in broker_ids :
707
- futures .append (self ._send_request_to_node (broker_id , request ))
751
+ futures = [self ._list_consumer_groups_send_request (b ) for b in broker_ids ]
708
752
709
- self ._wait_for_futures (futures )
753
+ self ._wait_for_futures (futures )
710
754
711
- for future in futures :
712
- response = future .value
713
- error_type = Errors .for_code (response .error_code )
714
- if error_type is not Errors .NoError :
715
- raise error_type (
716
- "Request '{}' failed with response '{}'."
717
- .format (request , response ))
718
- consumer_groups .update (response .groups )
719
- else :
720
- raise NotImplementedError (
721
- "Support for ListGroups v{} has not yet been added to KafkaAdminClient."
722
- .format (version ))
755
+ for future in futures :
756
+ response = future .value
757
+ consumer_groups .update (self ._list_consumer_groups_process_response (response ))
723
758
return list (consumer_groups )
724
759
725
- def list_consumer_group_offsets (self , group_id , group_coordinator_id = None ,
726
- partitions = None ):
727
- """Fetch Consumer Group Offsets.
728
-
729
- Note:
730
- This does not verify that the group_id or partitions actually exist
731
- in the cluster.
732
-
733
- As soon as any error is encountered, it is immediately raised.
760
+ def _list_consumer_group_offsets_send_request (self , group_id ,
761
+ group_coordinator_id , partitions = None ):
762
+ """Send an OffsetFetchRequest to a broker.
734
763
735
764
:param group_id: The consumer group id name for which to fetch offsets.
736
765
:param group_coordinator_id: The node_id of the group's coordinator
737
- broker. If set to None, will query the cluster to find the group
738
- coordinator. Explicitly specifying this can be useful to prevent
739
- that extra network round trip if you already know the group
740
- coordinator. Default: None.
741
- :param partitions: A list of TopicPartitions for which to fetch
742
- offsets. On brokers >= 0.10.2, this can be set to None to fetch all
743
- known offsets for the consumer group. Default: None.
744
- :return dictionary: A dictionary with TopicPartition keys and
745
- OffsetAndMetada values. Partitions that are not specified and for
746
- which the group_id does not have a recorded offset are omitted. An
747
- offset value of `-1` indicates the group_id has no offset for that
748
- TopicPartition. A `-1` can only happen for partitions that are
749
- explicitly specified.
766
+ broker.
767
+ :return: A message future
750
768
"""
751
- group_offsets_listing = {}
752
- if group_coordinator_id is None :
753
- group_coordinator_id = self ._find_group_coordinator_id (group_id )
754
769
version = self ._matching_api_version (OffsetFetchRequest )
755
770
if version <= 3 :
756
771
if partitions is None :
@@ -768,32 +783,80 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
768
783
topics_partitions_dict [topic ].add (partition )
769
784
topics_partitions = list (six .iteritems (topics_partitions_dict ))
770
785
request = OffsetFetchRequest [version ](group_id , topics_partitions )
771
- future = self ._send_request_to_node (group_coordinator_id , request )
772
- self ._wait_for_futures ([future ])
773
- response = future .value
786
+ else :
787
+ raise NotImplementedError (
788
+ "Support for OffsetFetchRequest_v{} has not yet been added to KafkaAdminClient."
789
+ .format (version ))
790
+ return self ._send_request_to_node (group_coordinator_id , request )
791
+
792
+ def _list_consumer_group_offsets_process_response (self , response ):
793
+ """Process an OffsetFetchResponse.
774
794
775
- if version > 1 : # OffsetFetchResponse_v1 lacks a top-level error_code
795
+ :param response: an OffsetFetchResponse.
796
+ :return: A dictionary composed of TopicPartition keys and
797
+ OffsetAndMetada values.
798
+ """
799
+ if response .API_VERSION <= 3 :
800
+
801
+ # OffsetFetchResponse_v1 lacks a top-level error_code
802
+ if response .API_VERSION > 1 :
776
803
error_type = Errors .for_code (response .error_code )
777
804
if error_type is not Errors .NoError :
778
805
# optionally we could retry if error_type.retriable
779
806
raise error_type (
780
- "Request '{}' failed with response '{}'."
781
- .format (request , response ))
807
+ "OffsetFetchResponse failed with response '{}'."
808
+ .format (response ))
809
+
782
810
# transform response into a dictionary with TopicPartition keys and
783
811
# OffsetAndMetada values--this is what the Java AdminClient returns
812
+ offsets = {}
784
813
for topic , partitions in response .topics :
785
814
for partition , offset , metadata , error_code in partitions :
786
815
error_type = Errors .for_code (error_code )
787
816
if error_type is not Errors .NoError :
788
817
raise error_type (
789
- "Unable to fetch offsets for group_id {}, topic {}, partition {}"
790
- .format (group_id , topic , partition ))
791
- group_offsets_listing [TopicPartition (topic , partition )] = OffsetAndMetadata (offset , metadata )
818
+ "Unable to fetch consumer group offsets for topic {}, partition {}"
819
+ .format (topic , partition ))
820
+ offsets [TopicPartition (topic , partition )] = OffsetAndMetadata (offset , metadata )
792
821
else :
793
822
raise NotImplementedError (
794
- "Support for OffsetFetch v{} has not yet been added to KafkaAdminClient."
795
- .format (version ))
796
- return group_offsets_listing
823
+ "Support for OffsetFetchResponse_v{} has not yet been added to KafkaAdminClient."
824
+ .format (response .API_VERSION ))
825
+ return offsets
826
+
827
+ def list_consumer_group_offsets (self , group_id , group_coordinator_id = None ,
828
+ partitions = None ):
829
+ """Fetch Consumer Offsets for a single consumer group.
830
+
831
+ Note:
832
+ This does not verify that the group_id or partitions actually exist
833
+ in the cluster.
834
+
835
+ As soon as any error is encountered, it is immediately raised.
836
+
837
+ :param group_id: The consumer group id name for which to fetch offsets.
838
+ :param group_coordinator_id: The node_id of the group's coordinator
839
+ broker. If set to None, will query the cluster to find the group
840
+ coordinator. Explicitly specifying this can be useful to prevent
841
+ that extra network round trip if you already know the group
842
+ coordinator. Default: None.
843
+ :param partitions: A list of TopicPartitions for which to fetch
844
+ offsets. On brokers >= 0.10.2, this can be set to None to fetch all
845
+ known offsets for the consumer group. Default: None.
846
+ :return dictionary: A dictionary with TopicPartition keys and
847
+ OffsetAndMetada values. Partitions that are not specified and for
848
+ which the group_id does not have a recorded offset are omitted. An
849
+ offset value of `-1` indicates the group_id has no offset for that
850
+ TopicPartition. A `-1` can only happen for partitions that are
851
+ explicitly specified.
852
+ """
853
+ if group_coordinator_id is None :
854
+ group_coordinator_id = self ._find_group_coordinator_id (group_id )
855
+ future = self ._list_consumer_group_offsets_send_request (
856
+ group_id , group_coordinator_id , partitions )
857
+ self ._wait_for_futures ([future ])
858
+ response = future .value
859
+ return self ._list_consumer_group_offsets_process_response (response )
797
860
798
861
# delete groups protocol not yet implemented
799
862
# Note: send the request to the group's coordinator.
0 commit comments