@@ -271,6 +271,47 @@ def _refresh_controller_id(self):
271
271
"Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."
272
272
.format (version ))
273
273
274
+ def _find_group_coordinator_id_send_request (self , group_id ):
275
+ """Send a FindCoordinatorRequest to a broker.
276
+
277
+ :param group_id: The consumer group ID. This is typically the group
278
+ name as a string.
279
+ :return: A message future
280
+ """
281
+ # TODO add support for dynamically picking version of
282
+ # GroupCoordinatorRequest which was renamed to FindCoordinatorRequest.
283
+ # When I experimented with this, the coordinator value returned in
284
+ # GroupCoordinatorResponse_v1 didn't match the value returned by
285
+ # GroupCoordinatorResponse_v0 and I couldn't figure out why.
286
+ version = 0 # version = self._matching_api_version(GroupCoordinatorRequest)
287
+ if version <= 0 :
288
+ request = GroupCoordinatorRequest [version ](group_id )
289
+ else :
290
+ raise NotImplementedError (
291
+ "Support for GroupCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
292
+ .format (version ))
293
+ return self ._send_request_to_node (self ._client .least_loaded_node (), request )
294
+
295
+ def _find_group_coordinator_id_process_response (self , response ):
296
+ """Process a FindCoordinatorResponse.
297
+
298
+ :param response: a FindCoordinatorResponse.
299
+ :return: The node_id of the broker that is the coordinator.
300
+ """
301
+ if response .API_VERSION <= 0 :
302
+ error_type = Errors .for_code (response .error_code )
303
+ if error_type is not Errors .NoError :
304
+ # Note: When error_type.retriable, Java will retry... see
305
+ # KafkaAdminClient's handleFindCoordinatorError method
306
+ raise error_type (
307
+ "FindCoordinatorRequest failed with response '{}'."
308
+ .format (response ))
309
+ else :
310
+ raise NotImplementedError (
311
+ "Support for FindCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
312
+ .format (response .API_VERSION ))
313
+ return response .coordinator_id
314
+
274
315
def _find_group_coordinator_id (self , group_id ):
275
316
"""Find the broker node_id of the coordinator of the given group.
276
317
@@ -283,35 +324,10 @@ def _find_group_coordinator_id(self, group_id):
283
324
:return: The node_id of the broker that is the coordinator.
284
325
"""
285
326
# Note: Java may change how this is implemented in KAFKA-6791.
286
- #
287
- # TODO add support for dynamically picking version of
288
- # GroupCoordinatorRequest which was renamed to FindCoordinatorRequest.
289
- # When I experimented with this, GroupCoordinatorResponse_v1 didn't
290
- # match GroupCoordinatorResponse_v0 and I couldn't figure out why.
291
- gc_request = GroupCoordinatorRequest [0 ](group_id )
292
- future = self ._send_request_to_node (self ._client .least_loaded_node (), gc_request )
293
-
327
+ future = self ._find_group_coordinator_id_send_request (group_id )
294
328
self ._wait_for_futures ([future ])
295
-
296
- gc_response = future .value
297
- # use the extra error checking in add_group_coordinator() rather than
298
- # immediately returning the group coordinator.
299
- success = self ._client .cluster .add_group_coordinator (group_id , gc_response )
300
- if not success :
301
- error_type = Errors .for_code (gc_response .error_code )
302
- assert error_type is not Errors .NoError
303
- # Note: When error_type.retriable, Java will retry... see
304
- # KafkaAdminClient's handleFindCoordinatorError method
305
- raise error_type (
306
- "Could not identify group coordinator for group_id '{}' from response '{}'."
307
- .format (group_id , gc_response ))
308
- group_coordinator = self ._client .cluster .coordinator_for_group (group_id )
309
- # will be None if the coordinator was never populated, which should never happen here
310
- assert group_coordinator is not None
311
- # will be -1 if add_group_coordinator() failed... but by this point the
312
- # error should have been raised.
313
- assert group_coordinator != - 1
314
- return group_coordinator
329
+ response = future .value
330
+ return self ._find_group_coordinator_id_process_response (response )
315
331
316
332
def _send_request_to_node (self , node_id , request ):
317
333
"""Send a Kafka protocol message to a specific broker.
@@ -329,7 +345,6 @@ def _send_request_to_node(self, node_id, request):
329
345
self ._client .poll ()
330
346
return self ._client .send (node_id , request )
331
347
332
-
333
348
def _send_request_to_controller (self , request ):
334
349
"""Send a Kafka protocol message to the cluster controller.
335
350
0 commit comments