@@ -222,8 +222,10 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) {
222222 ensureTermAtLeast (sourceNode , publishRequest .getAcceptedState ().term ());
223223 final PublishResponse publishResponse = coordinationState .get ().handlePublishRequest (publishRequest );
224224
225- if (sourceNode .equals (getLocalNode ()) == false ) {
226- becomeFollower ("handlePublishRequest" , sourceNode );
225+ if (sourceNode .equals (getLocalNode ())) {
226+ preVoteCollector .update (getPreVoteResponse (), getLocalNode ());
227+ } else {
228+ becomeFollower ("handlePublishRequest" , sourceNode ); // also updates preVoteCollector
227229 }
228230
229231 return new PublishWithJoinResponse (publishResponse ,
@@ -254,27 +256,31 @@ private void closePrevotingAndElectionScheduler() {
254256 }
255257
256258 private void updateMaxTermSeen (final long term ) {
257- maxTermSeen .updateAndGet (oldMaxTerm -> Math .max (oldMaxTerm , term ));
258- // TODO if we are leader here, and there is no publication in flight, then we should bump our term
259- // (if we are leader and there _is_ a publication in flight then doing so would cancel the publication, so don't do that, but
260- // do check for this after the publication completes)
259+ final long updatedMaxTermSeen = maxTermSeen .updateAndGet (oldMaxTerm -> Math .max (oldMaxTerm , term ));
260+ synchronized (mutex ) {
261+ if (mode == Mode .LEADER && publicationInProgress () == false && updatedMaxTermSeen > getCurrentTerm ()) {
262+ // Bump our term. However, if a publication is in flight then bumping now would cancel it, so we skip the bump in that case:
263+ // the successful end of a publication calls this method again to check whether a term bump is still needed.
264+ ensureTermAtLeast (getLocalNode (), updatedMaxTermSeen ); // joins the higher term via joinLeaderInTerm, which makes us a candidate
265+ startElection (); // only sends start-join requests if we are a candidate at this point
266+ }
267+ }
261268 }
262269
263- // TODO: make private again after removing term-bump workaround
264- void startElection () {
270+ private void startElection () { // if still a candidate, asks every discovered node to join us in a term above both our current term and maxTermSeen
265271 synchronized (mutex ) {
266272 // The preVoteCollector is only active while we are candidate, but it does not call this method with synchronisation, so we have
267273 // to check our mode again here.
268274 if (mode == Mode .CANDIDATE ) {
269275 final StartJoinRequest startJoinRequest
270276 = new StartJoinRequest (getLocalNode (), Math .max (getCurrentTerm (), maxTermSeen .get ()) + 1 );
277+ logger .debug ("starting election with {}" , startJoinRequest );
271278 getDiscoveredNodes ().forEach (node -> joinHelper .sendStartJoinRequest (startJoinRequest , node ));
272279 }
273280 }
274281 }
275282
276- // TODO: make private again after removing term-bump workaround
277- Optional <Join > ensureTermAtLeast (DiscoveryNode sourceNode , long targetTerm ) {
283+ private Optional <Join > ensureTermAtLeast (DiscoveryNode sourceNode , long targetTerm ) {
278284 assert Thread .holdsLock (mutex ) : "Coordinator mutex not held" ;
279285 if (getCurrentTerm () < targetTerm ) {
280286 return Optional .of (joinLeaderInTerm (new StartJoinRequest (sourceNode , targetTerm )));
@@ -289,9 +295,10 @@ private Join joinLeaderInTerm(StartJoinRequest startJoinRequest) {
289295 lastJoin = Optional .of (join );
290296 peerFinder .setCurrentTerm (getCurrentTerm ());
291297 if (mode != Mode .CANDIDATE ) {
292- becomeCandidate ("joinLeaderInTerm" ); // updates followersChecker
298+ becomeCandidate ("joinLeaderInTerm" ); // updates followersChecker and preVoteCollector
293299 } else {
294300 followersChecker .updateFastResponseState (getCurrentTerm (), mode );
301+ preVoteCollector .update (getPreVoteResponse (), null );
295302 }
296303 return join ;
297304 }
@@ -485,6 +492,8 @@ public void invariant() {
485492 assert becomingMaster || getStateForMasterService ().nodes ().getMasterNodeId () != null : getStateForMasterService ();
486493 assert leaderCheckScheduler == null : leaderCheckScheduler ;
487494 assert applierState .nodes ().getMasterNodeId () == null || getLocalNode ().equals (applierState .nodes ().getMasterNode ());
495+ assert preVoteCollector .getLeader () == getLocalNode () : preVoteCollector ;
496+ assert preVoteCollector .getPreVoteResponse ().equals (getPreVoteResponse ()) : preVoteCollector ;
488497
489498 final boolean activePublication = currentPublication .map (CoordinatorPublication ::isActiveForCurrentLeader ).orElse (false );
490499 if (becomingMaster && activePublication == false ) {
@@ -517,6 +526,8 @@ public void invariant() {
517526 assert leaderCheckScheduler != null ;
518527 assert followersChecker .getKnownFollowers ().isEmpty ();
519528 assert currentPublication .map (Publication ::isCommitted ).orElse (true );
529+ assert preVoteCollector .getLeader ().equals (lastKnownLeader .get ()) : preVoteCollector ;
530+ assert preVoteCollector .getPreVoteResponse ().equals (getPreVoteResponse ()) : preVoteCollector ;
520531 } else {
521532 assert mode == Mode .CANDIDATE ;
522533 assert joinAccumulator instanceof JoinHelper .CandidateJoinAccumulator ;
@@ -528,6 +539,8 @@ public void invariant() {
528539 assert followersChecker .getKnownFollowers ().isEmpty ();
529540 assert applierState .nodes ().getMasterNodeId () == null ;
530541 assert currentPublication .map (Publication ::isCommitted ).orElse (true );
542+ assert preVoteCollector .getLeader () == null : preVoteCollector ;
543+ assert preVoteCollector .getPreVoteResponse ().equals (getPreVoteResponse ()) : preVoteCollector ;
531544 }
532545 }
533546 }
@@ -537,7 +550,7 @@ boolean hasJoinVoteFrom(DiscoveryNode localNode) {
537550 return coordinationState .get ().containsJoinVoteFor (localNode );
538551 }
539552
540- void handleJoin (Join join ) {
553+ private void handleJoin (Join join ) {
541554 synchronized (mutex ) {
542555 ensureTermAtLeast (getLocalNode (), join .getTerm ()).ifPresent (this ::handleJoin );
543556
@@ -547,7 +560,7 @@ void handleJoin(Join join) {
547560 try {
548561 coordinationState .get ().handleJoin (join );
549562 } catch (CoordinationStateRejectedException e ) {
550- logger .debug ("failed to add join, ignoring" , e );
563+ logger .debug (new ParameterizedMessage ( "failed to add {} - ignoring" , join ) , e );
551564 }
552565 } else {
553566 coordinationState .get ().handleJoin (join ); // this might fail and bubble up the exception
@@ -753,6 +766,11 @@ class CoordinatorPublication extends Publication {
753766 private final AckListener ackListener ;
754767 private final ActionListener <Void > publishListener ;
755768
769+ // We may not have accepted our own state before receiving a join from another node, causing its join to be rejected (we cannot
770+ // safely accept a join whose last-accepted term/version is ahead of ours), so store them up and process them at the end.
771+ private final List <Join > receivedJoins = new ArrayList <>();
772+ private boolean receivedJoinsProcessed ;
773+
756774 CoordinatorPublication (PublishRequest publishRequest , ListenableFuture <Void > localNodeAckEvent , AckListener ackListener ,
757775 ActionListener <Void > publishListener ) {
758776 super (Coordinator .this .settings , publishRequest ,
@@ -790,6 +808,7 @@ private void removePublicationAndPossiblyBecomeCandidate(String reason) {
790808
791809 assert currentPublication .get () == this ;
792810 currentPublication = Optional .empty ();
811+ logger .debug ("publication ended unsuccessfully: {}" , this );
793812
794813 // check if node has not already switched modes (by bumping term)
795814 if (isActiveForCurrentLeader ()) {
@@ -812,6 +831,10 @@ public void onResponse(Void ignore) {
812831 assert Thread .holdsLock (mutex ) : "Coordinator mutex not held" ;
813832 assert committed ;
814833
834+ receivedJoins .forEach (CoordinatorPublication .this ::handleAssociatedJoin );
835+ assert receivedJoinsProcessed == false ;
836+ receivedJoinsProcessed = true ;
837+
815838 clusterApplier .onNewClusterState (CoordinatorPublication .this .toString (), () -> applierState ,
816839 new ClusterApplyListener () {
817840 @ Override
@@ -828,6 +851,7 @@ public void onSuccess(String source) {
828851 synchronized (mutex ) {
829852 assert currentPublication .get () == CoordinatorPublication .this ;
830853 currentPublication = Optional .empty ();
854+ logger .debug ("publication ended successfully: {}" , CoordinatorPublication .this );
831855 // trigger term bump if new term was found during publication
832856 updateMaxTermSeen (getCurrentTerm ());
833857 }
@@ -850,6 +874,13 @@ public void onFailure(Exception e) {
850874 }, EsExecutors .newDirectExecutorService ());
851875 }
852876
877+ private void handleAssociatedJoin (Join join ) { // applies a join that was collected by this publication
878+ if (join .getTerm () == getCurrentTerm () && hasJoinVoteFrom (join .getSourceNode ()) == false ) { // ignore joins for other terms, or from nodes whose vote we already hold
879+ logger .trace ("handling {}" , join );
880+ handleJoin (join );
881+ }
882+ }
883+
853884 @ Override
854885 protected boolean isPublishQuorum (CoordinationState .VoteCollection votes ) {
855886 assert Thread .holdsLock (mutex ) : "Coordinator mutex not held" ;
@@ -867,10 +898,26 @@ protected Optional<ApplyCommitRequest> handlePublishResponse(DiscoveryNode sourc
867898 @ Override
868899 protected void onJoin (Join join ) {
869900 assert Thread .holdsLock (mutex ) : "Coordinator mutex not held" ;
870- if (join .getTerm () == getCurrentTerm ()) {
871- handleJoin (join );
901+ if (receivedJoinsProcessed ) {
902+ // a late response may arrive after the state has been locally applied, meaning that receivedJoins has already been
903+ // processed, so we have to handle this late response here.
904+ handleAssociatedJoin (join );
905+ } else {
906+ receivedJoins .add (join ); // deferred: replayed through handleAssociatedJoin in onResponse, where the publication is asserted committed
907+ }
908+ }
909+
910+ @ Override
911+ protected void onMissingJoin (DiscoveryNode discoveryNode ) {
912+ assert Thread .holdsLock (mutex ) : "Coordinator mutex not held" ;
913+ // The remote node did not include a join vote in its publish response. We do not persist joins, so it could be that the remote
914+ // node voted for us and then rebooted, or it could be that it voted for a different node in this term. If we don't have a copy
915+ // of a join from this node then we assume the latter and bump our term to obtain a vote from this node.
916+ if (hasJoinVoteFrom (discoveryNode ) == false ) {
917+ final long term = publishRequest .getAcceptedState ().term ();
918+ logger .debug ("onMissingJoin: no join vote from {}, bumping term to exceed {}" , discoveryNode , term );
919+ updateMaxTermSeen (term + 1 ); // records the higher term; updateMaxTermSeen only performs the bump itself when no publication is in progress
872920 }
873- // TODO: what to do on missing join?
874921 }
875922
876923 @ Override
0 commit comments