diff --git a/kafka/admin/kafka.py b/kafka/admin/kafka.py index 5ce863037..3dc2e441b 100644 --- a/kafka/admin/kafka.py +++ b/kafka/admin/kafka.py @@ -4,6 +4,7 @@ import logging import socket from kafka.client_async import KafkaClient, selectors +import kafka.errors as Errors from kafka.errors import ( IncompatibleBrokerVersion, KafkaConfigurationError, KafkaConnectionError, NodeNotReadyError, NotControllerError) @@ -11,6 +12,7 @@ from kafka.protocol.admin import ( CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest, ListGroupsRequest, DescribeGroupsRequest) +from kafka.protocol.commit import GroupCoordinatorRequest from kafka.protocol.metadata import MetadataRequest from kafka.version import __version__ @@ -243,6 +245,44 @@ def _refresh_controller_id(self): "The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0." .format(version)) + def _find_group_coordinator_id(self, group_id): + """Find the broker node_id of the coordinator of the given group. + + Sends a FindCoordinatorRequest message to the cluster. Will block until + the FindCoordinatorResponse is received. Any errors are immediately + raised. + + :param group_id: The consumer group ID. This is typically the group + name as a string. + :return: The node_id of the broker that is the coordinator. + """ + # Note: Java may change how this is implemented in KAFKA-6791. + # + # TODO add support for dynamically picking version of + # GroupCoordinatorRequest which was renamed to FindCoordinatorRequest. + # When I experimented with this, GroupCoordinatorResponse_v1 didn't + # match GroupCoordinatorResponse_v0 and I couldn't figure out why. + gc_request = GroupCoordinatorRequest[0](group_id) + gc_response = self._send_request_to_node(self._client.least_loaded_node(), gc_request) + # use the extra error checking in add_group_coordinator() rather than + # immediately returning the group coordinator. + success = self._client.cluster.add_group_coordinator(group_id, gc_response) + if not success: + error_type = Errors.for_code(gc_response.error_code) + assert error_type is not Errors.NoError + # Note: When error_type.retriable, Java will retry... see + # KafkaAdminClient's handleFindCoordinatorError method + raise error_type( + "Could not identify group coordinator for group_id '{}' from response '{}'." + .format(group_id, gc_response)) + group_coordinator = self._client.cluster.coordinator_for_group(group_id) + # will be None if the coordinator was never populated, which should never happen here + assert group_coordinator is not None + # will be -1 if add_group_coordinator() failed... but by this point the + # error should have been raised. + assert group_coordinator != -1 + return group_coordinator + def _send_request_to_node(self, node, request): """Send a kafka protocol message to a specific broker. Will block until the message result is received.