2727import org .elasticsearch .cluster .ClusterState .Builder ;
2828import org .elasticsearch .cluster .ClusterState .VotingConfiguration ;
2929import org .elasticsearch .cluster .ClusterStateTaskConfig ;
30+ import org .elasticsearch .cluster .ClusterStateUpdateTask ;
3031import org .elasticsearch .cluster .block .ClusterBlocks ;
3132import org .elasticsearch .cluster .coordination .FollowersChecker .FollowerCheckRequest ;
3233import org .elasticsearch .cluster .coordination .JoinHelper .InitialJoinAccumulator ;
4243import org .elasticsearch .common .Strings ;
4344import org .elasticsearch .common .component .AbstractLifecycleComponent ;
4445import org .elasticsearch .common .lease .Releasable ;
46+ import org .elasticsearch .common .settings .ClusterSettings ;
4547import org .elasticsearch .common .settings .Setting ;
4648import org .elasticsearch .common .settings .Settings ;
4749import org .elasticsearch .common .unit .TimeValue ;
6466import java .util .Optional ;
6567import java .util .Random ;
6668import java .util .Set ;
69+ import java .util .concurrent .atomic .AtomicBoolean ;
6770import java .util .function .Supplier ;
6871import java .util .stream .Collectors ;
72+ import java .util .stream .StreamSupport ;
6973
74+ import static java .util .Collections .emptySet ;
75+ import static org .elasticsearch .cluster .coordination .Reconfigurator .CLUSTER_MASTER_NODES_FAILURE_TOLERANCE ;
7076import static org .elasticsearch .discovery .DiscoverySettings .NO_MASTER_BLOCK_WRITES ;
7177import static org .elasticsearch .gateway .GatewayService .STATE_NOT_RECOVERED_BLOCK ;
7278
@@ -104,16 +110,18 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
104110 @ Nullable
105111 private Releasable leaderCheckScheduler ;
106112 private long maxTermSeen ;
113+ private final Reconfigurator reconfigurator ;
107114
108115 private Mode mode ;
109116 private Optional <DiscoveryNode > lastKnownLeader ;
110117 private Optional <Join > lastJoin ;
111118 private JoinHelper .JoinAccumulator joinAccumulator ;
112119 private Optional <CoordinatorPublication > currentPublication = Optional .empty ();
113120
114- public Coordinator (Settings settings , TransportService transportService , AllocationService allocationService ,
115- MasterService masterService , Supplier <CoordinationState .PersistedState > persistedStateSupplier ,
116- UnicastHostsProvider unicastHostsProvider , ClusterApplier clusterApplier , Random random ) {
121+ public Coordinator (Settings settings , ClusterSettings clusterSettings , TransportService transportService ,
122+ AllocationService allocationService , MasterService masterService ,
123+ Supplier <CoordinationState .PersistedState > persistedStateSupplier , UnicastHostsProvider unicastHostsProvider ,
124+ ClusterApplier clusterApplier , Random random ) {
117125 super (settings );
118126 this .transportService = transportService ;
119127 this .masterService = masterService ;
@@ -136,6 +144,7 @@ public Coordinator(Settings settings, TransportService transportService, Allocat
136144 this .nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor (allocationService , logger );
137145 this .clusterApplier = clusterApplier ;
138146 masterService .setClusterStateSupplier (this ::getStateForMasterService );
147+ this .reconfigurator = new Reconfigurator (settings , clusterSettings );
139148 }
140149
141150 private Runnable getOnLeaderFailure () {
@@ -269,8 +278,13 @@ private void updateMaxTermSeen(final long term) {
269278 logger .debug ("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, enqueueing term bump" ,
270279 maxTermSeen , currentTerm );
271280 } else {
272- ensureTermAtLeast (getLocalNode (), maxTermSeen );
273- startElection ();
281+ try {
282+ ensureTermAtLeast (getLocalNode (), maxTermSeen );
283+ startElection ();
284+ } catch (Exception e ) {
285+ logger .warn (new ParameterizedMessage ("failed to bump term to {}" , maxTermSeen ), e );
286+ becomeCandidate ("updateMaxTermSeen" );
287+ }
274288 }
275289 }
276290 }
@@ -524,6 +538,12 @@ public void invariant() {
524538 assert lastPublishedNodes .equals (followersChecker .getKnownFollowers ()) :
525539 lastPublishedNodes + " != " + followersChecker .getKnownFollowers ();
526540 }
541+
542+ assert becomingMaster || activePublication ||
543+ coordinationState .get ().getLastAcceptedConfiguration ().equals (coordinationState .get ().getLastCommittedConfiguration ())
544+ : coordinationState .get ().getLastAcceptedConfiguration () + " != "
545+ + coordinationState .get ().getLastCommittedConfiguration ();
546+
527547 } else if (mode == Mode .FOLLOWER ) {
528548 assert coordinationState .get ().electionWon () == false : getLocalNode () + " is FOLLOWER so electionWon() should be false" ;
529549 assert lastKnownLeader .isPresent () && (lastKnownLeader .get ().equals (getLocalNode ()) == false );
@@ -582,13 +602,59 @@ public void setInitialConfiguration(final VotingConfiguration votingConfiguratio
582602 MetaData .Builder metaDataBuilder = MetaData .builder ();
583603 // automatically generate a UID for the metadata if we need to
584604 metaDataBuilder .generateClusterUuidIfNeeded (); // TODO generate UUID in bootstrapping tool?
605+ metaDataBuilder .persistentSettings (Settings .builder ().put (CLUSTER_MASTER_NODES_FAILURE_TOLERANCE .getKey (),
606+ (votingConfiguration .getNodeIds ().size () - 1 ) / 2 ).build ()); // TODO set this in bootstrapping tool?
585607 builder .metaData (metaDataBuilder );
586608 coordinationState .get ().setInitialState (builder .build ());
587609 preVoteCollector .update (getPreVoteResponse (), null ); // pick up the change to last-accepted version
588610 startElectionScheduler ();
589611 }
590612 }
591613
614+ // Package-private for testing
615+ ClusterState improveConfiguration (ClusterState clusterState ) {
616+ assert Thread .holdsLock (mutex ) : "Coordinator mutex not held" ;
617+
618+ final Set <DiscoveryNode > liveNodes = StreamSupport .stream (clusterState .nodes ().spliterator (), false )
619+ .filter (this ::hasJoinVoteFrom ).collect (Collectors .toSet ());
620+ final ClusterState .VotingConfiguration newConfig = reconfigurator .reconfigure (
621+ liveNodes , emptySet (), clusterState .getLastAcceptedConfiguration ());
622+ if (newConfig .equals (clusterState .getLastAcceptedConfiguration ()) == false ) {
623+ assert coordinationState .get ().joinVotesHaveQuorumFor (newConfig );
624+ return ClusterState .builder (clusterState ).lastAcceptedConfiguration (newConfig ).build ();
625+ }
626+
627+ return clusterState ;
628+ }
629+
630+ private AtomicBoolean reconfigurationTaskScheduled = new AtomicBoolean ();
631+
632+ private void scheduleReconfigurationIfNeeded () {
633+ assert Thread .holdsLock (mutex ) : "Coordinator mutex not held" ;
634+ assert mode == Mode .LEADER : mode ;
635+ assert currentPublication .isPresent () == false : "Expected no publication in progress" ;
636+
637+ final ClusterState state = getLastAcceptedState ();
638+ if (improveConfiguration (state ) != state && reconfigurationTaskScheduled .compareAndSet (false , true )) {
639+ logger .trace ("scheduling reconfiguration" );
640+ masterService .submitStateUpdateTask ("reconfigure" , new ClusterStateUpdateTask (Priority .URGENT ) {
641+ @ Override
642+ public ClusterState execute (ClusterState currentState ) {
643+ reconfigurationTaskScheduled .set (false );
644+ synchronized (mutex ) {
645+ return improveConfiguration (currentState );
646+ }
647+ }
648+
649+ @ Override
650+ public void onFailure (String source , Exception e ) {
651+ reconfigurationTaskScheduled .set (false );
652+ logger .debug ("reconfiguration failed" , e );
653+ }
654+ });
655+ }
656+ }
657+
592658 // for tests
593659 boolean hasJoinVoteFrom (DiscoveryNode localNode ) {
594660 return coordinationState .get ().containsJoinVoteFor (localNode );
@@ -599,19 +665,34 @@ private void handleJoin(Join join) {
599665 ensureTermAtLeast (getLocalNode (), join .getTerm ()).ifPresent (this ::handleJoin );
600666
601667 if (coordinationState .get ().electionWon ()) {
602- // if we have already won the election then the actual join does not matter for election purposes,
603- // so swallow any exception
604- try {
605- coordinationState .get ().handleJoin (join );
606- } catch (CoordinationStateRejectedException e ) {
607- logger .debug (new ParameterizedMessage ("failed to add {} - ignoring" , join ), e );
668+ // If we have already won the election then the actual join does not matter for election purposes, so swallow any exception
669+ final boolean isNewJoin = handleJoinIgnoringExceptions (join );
670+
671+ // If we haven't completely finished becoming master then there's already a publication scheduled which will, in turn,
672+ // schedule a reconfiguration if needed. It's benign to schedule a reconfiguration anyway, but it might fail if it wins the
673+ // race against the election-winning publication and log a big error message, which we can prevent by checking this here:
674+ final boolean establishedAsMaster = mode == Mode .LEADER && getLastAcceptedState ().term () == getCurrentTerm ();
675+ if (isNewJoin && establishedAsMaster && publicationInProgress () == false ) {
676+ scheduleReconfigurationIfNeeded ();
608677 }
609678 } else {
610679 coordinationState .get ().handleJoin (join ); // this might fail and bubble up the exception
611680 }
612681 }
613682 }
614683
684+ /**
685+ * @return true iff the join was from a new node and was successfully added
686+ */
687+ private boolean handleJoinIgnoringExceptions (Join join ) {
688+ try {
689+ return coordinationState .get ().handleJoin (join );
690+ } catch (CoordinationStateRejectedException e ) {
691+ logger .debug (new ParameterizedMessage ("failed to add {} - ignoring" , join ), e );
692+ return false ;
693+ }
694+ }
695+
615696 public ClusterState getLastAcceptedState () {
616697 synchronized (mutex ) {
617698 return coordinationState .get ().getLastAcceptedState ();
@@ -904,6 +985,10 @@ public void onSuccess(String source) {
904985 logger .debug ("publication ended successfully: {}" , CoordinatorPublication .this );
905986 // trigger term bump if new term was found during publication
906987 updateMaxTermSeen (getCurrentTerm ());
988+
989+ if (mode == Mode .LEADER ) {
990+ scheduleReconfigurationIfNeeded ();
991+ }
907992 }
908993 ackListener .onNodeAck (getLocalNode (), null );
909994 publishListener .onResponse (null );
@@ -916,8 +1001,7 @@ public void onFailure(Exception e) {
9161001 assert Thread .holdsLock (mutex ) : "Coordinator mutex not held" ;
9171002 removePublicationAndPossiblyBecomeCandidate ("Publication.onCompletion(false)" );
9181003
919- FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException (
920- "publication failed" , e );
1004+ final FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException ("publication failed" , e );
9211005 ackListener .onNodeAck (getLocalNode (), exception ); // other nodes have acked, but not the master.
9221006 publishListener .onFailure (exception );
9231007 }
0 commit comments