@@ -331,18 +331,13 @@ def _handle_join_success(self, member_assignment_bytes):
331331 with self ._lock :
332332 log .info ("Successfully joined group %s with generation %s" ,
333333 self .group_id , self ._generation .generation_id )
334- self .join_future = None
335334 self .state = MemberState .STABLE
336- self .rejoining = False
337- self ._heartbeat_thread .enable ()
338- self ._on_join_complete (self ._generation .generation_id ,
339- self ._generation .member_id ,
340- self ._generation .protocol ,
341- member_assignment_bytes )
335+ self .rejoin_needed = False
336+ if self ._heartbeat_thread :
337+ self ._heartbeat_thread .enable ()
342338
343339 def _handle_join_failure (self , _ ):
# Failure handler for a join attempt: while holding self._lock, put the
# member state back to MemberState.UNJOINED. The second positional argument
# is named `_` and is not used in the body.
344340 with self ._lock :
# The next line is REMOVED by this diff: this handler no longer resets
# join_future. The same diff adds `self.join_future = None` to the failure
# branch of ensure_active_group instead.
345- self .join_future = None
346341 self .state = MemberState .UNJOINED
347342
348343 def ensure_active_group (self ):
@@ -351,7 +346,7 @@ def ensure_active_group(self):
351346 if self ._heartbeat_thread is None :
352347 self ._start_heartbeat_thread ()
353348
354- while self .need_rejoin ():
349+ while self .need_rejoin () or self . _rejoin_incomplete () :
355350 self .ensure_coordinator_ready ()
356351
357352 # call on_join_prepare if needed. We set a flag
@@ -382,6 +377,12 @@ def ensure_active_group(self):
382377 # This ensures that we do not mistakenly attempt to rejoin
383378 # before the pending rebalance has completed.
384379 if self .join_future is None :
380+ # Fence off the heartbeat thread explicitly so that it cannot
381+ # interfere with the join group. Note that this must come after
382+ # the call to _on_join_prepare since we must be able to continue
383+ # sending heartbeats if that callback takes some time.
384+ self ._heartbeat_thread .disable ()
385+
385386 self .state = MemberState .REBALANCING
386387 future = self ._send_join_group_request ()
387388
@@ -402,7 +403,16 @@ def ensure_active_group(self):
402403
403404 self ._client .poll (future = future )
404405
405- if future .failed ():
406+ if future .succeeded ():
407+ self ._on_join_complete (self ._generation .generation_id ,
408+ self ._generation .member_id ,
409+ self ._generation .protocol ,
410+ future .value )
411+ self .join_future = None
412+ self .rejoining = False
413+
414+ else :
415+ self .join_future = None
406416 exception = future .exception
407417 if isinstance (exception , (Errors .UnknownMemberIdError ,
408418 Errors .RebalanceInProgressError ,
@@ -412,6 +422,9 @@ def ensure_active_group(self):
412422 raise exception # pylint: disable-msg=raising-bad-type
413423 time .sleep (self .config ['retry_backoff_ms' ] / 1000 )
414424
425+ def _rejoin_incomplete (self ):
# Added by this diff. Returns True while self.join_future is still set
# (not None). ensure_active_group ORs this with need_rejoin() in its loop
# condition, and resets join_future to None on both the success path (after
# _on_join_complete) and the failure path, so the loop keeps running until
# the join future has been consumed there.
# NOTE(review): where join_future gets assigned is not visible in this
# diff -- presumably when the JoinGroup request is sent; confirm.
426+ return self .join_future is not None
427+
415428 def _send_join_group_request (self ):
416429 """Join the group and return the assignment for the next generation.
417430
@@ -497,7 +510,6 @@ def _handle_join_group_response(self, future, send_time, response):
497510 self ._generation = Generation (response .generation_id ,
498511 response .member_id ,
499512 response .group_protocol )
500- self .rejoin_needed = False
501513
502514 if response .leader_id == response .member_id :
503515 log .info ("Elected group leader -- performing partition"
0 commit comments