2727import org .elasticsearch .cluster .ClusterState .Builder ;
2828import org .elasticsearch .cluster .ClusterState .VotingConfiguration ;
2929import org .elasticsearch .cluster .ClusterStateTaskConfig ;
30+ import org .elasticsearch .cluster .ClusterStateUpdateTask ;
3031import org .elasticsearch .cluster .block .ClusterBlocks ;
3132import org .elasticsearch .cluster .coordination .FollowersChecker .FollowerCheckRequest ;
3233import org .elasticsearch .cluster .coordination .JoinHelper .InitialJoinAccumulator ;
4243import org .elasticsearch .common .Strings ;
4344import org .elasticsearch .common .component .AbstractLifecycleComponent ;
4445import org .elasticsearch .common .lease .Releasable ;
46+ import org .elasticsearch .common .settings .ClusterSettings ;
4547import org .elasticsearch .common .settings .Setting ;
4648import org .elasticsearch .common .settings .Settings ;
4749import org .elasticsearch .common .unit .TimeValue ;
6466import java .util .Optional ;
6567import java .util .Random ;
6668import java .util .Set ;
69+ import java .util .concurrent .atomic .AtomicBoolean ;
6770import java .util .function .Supplier ;
6871import java .util .stream .Collectors ;
72+ import java .util .stream .StreamSupport ;
6973
74+ import static java .util .Collections .emptySet ;
75+ import static org .elasticsearch .cluster .coordination .Reconfigurator .CLUSTER_MASTER_NODES_FAILURE_TOLERANCE ;
7076import static org .elasticsearch .discovery .DiscoverySettings .NO_MASTER_BLOCK_WRITES ;
7177import static org .elasticsearch .gateway .GatewayService .STATE_NOT_RECOVERED_BLOCK ;
7278
@@ -104,16 +110,18 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
104110 @ Nullable
105111 private Releasable leaderCheckScheduler ;
106112 private long maxTermSeen ;
113+ private final Reconfigurator reconfigurator ;
107114
108115 private Mode mode ;
109116 private Optional <DiscoveryNode > lastKnownLeader ;
110117 private Optional <Join > lastJoin ;
111118 private JoinHelper .JoinAccumulator joinAccumulator ;
112119 private Optional <CoordinatorPublication > currentPublication = Optional .empty ();
113120
114- public Coordinator (Settings settings , TransportService transportService , AllocationService allocationService ,
115- MasterService masterService , Supplier <CoordinationState .PersistedState > persistedStateSupplier ,
116- UnicastHostsProvider unicastHostsProvider , ClusterApplier clusterApplier , Random random ) {
121+ public Coordinator (Settings settings , ClusterSettings clusterSettings , TransportService transportService ,
122+ AllocationService allocationService , MasterService masterService ,
123+ Supplier <CoordinationState .PersistedState > persistedStateSupplier , UnicastHostsProvider unicastHostsProvider ,
124+ ClusterApplier clusterApplier , Random random ) {
117125 super (settings );
118126 this .transportService = transportService ;
119127 this .masterService = masterService ;
@@ -136,6 +144,7 @@ public Coordinator(Settings settings, TransportService transportService, Allocat
136144 this .nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor (allocationService , logger );
137145 this .clusterApplier = clusterApplier ;
138146 masterService .setClusterStateSupplier (this ::getStateForMasterService );
147+ this .reconfigurator = new Reconfigurator (settings , clusterSettings );
139148 }
140149
141150 private Runnable getOnLeaderFailure () {
@@ -582,13 +591,59 @@ public void setInitialConfiguration(final VotingConfiguration votingConfiguratio
582591 MetaData .Builder metaDataBuilder = MetaData .builder ();
583592 // automatically generate a UID for the metadata if we need to
584593 metaDataBuilder .generateClusterUuidIfNeeded (); // TODO generate UUID in bootstrapping tool?
594+ metaDataBuilder .persistentSettings (Settings .builder ().put (CLUSTER_MASTER_NODES_FAILURE_TOLERANCE .getKey (),
595+ (votingConfiguration .getNodeIds ().size () - 1 ) / 2 ).build ()); // TODO set this in bootstrapping tool?
585596 builder .metaData (metaDataBuilder );
586597 coordinationState .get ().setInitialState (builder .build ());
587598 preVoteCollector .update (getPreVoteResponse (), null ); // pick up the change to last-accepted version
588599 startElectionScheduler ();
589600 }
590601 }
591602
603+ // Package-private for testing
604+ ClusterState reconfigureIfPossible (ClusterState clusterState ) {
605+ synchronized (mutex ) {
606+ if (mode == Mode .LEADER ) {
607+ final Set <DiscoveryNode > liveNodes = StreamSupport .stream (clusterState .nodes ().spliterator (), false )
608+ .filter (coordinationState .get ()::containsJoinVoteFor ).collect (Collectors .toSet ());
609+ final ClusterState .VotingConfiguration newConfig = reconfigurator .reconfigure (
610+ liveNodes , emptySet (), clusterState .getLastAcceptedConfiguration ());
611+ if (newConfig .equals (clusterState .getLastAcceptedConfiguration ()) == false ) {
612+ assert coordinationState .get ().joinVotesHaveQuorumFor (newConfig );
613+ return ClusterState .builder (clusterState ).lastAcceptedConfiguration (newConfig ).build ();
614+ }
615+ }
616+ }
617+
618+ return clusterState ;
619+ }
620+
621+ private AtomicBoolean reconfigurationTaskScheduled = new AtomicBoolean ();
622+
623+ private void scheduleReconfigurationIfNeeded () {
624+ assert Thread .holdsLock (mutex ) : "Coordinator mutex not held" ;
625+ assert mode == Mode .LEADER : mode ;
626+ assert currentPublication .isPresent () == false : "Expected no publication in progress" ;
627+
628+ final ClusterState state = getLastAcceptedState ();
629+ if (reconfigureIfPossible (state ) != state && reconfigurationTaskScheduled .compareAndSet (false , true )) {
630+ logger .trace ("scheduling reconfiguration" );
631+ masterService .submitStateUpdateTask ("reconfigure" , new ClusterStateUpdateTask () {
632+ @ Override
633+ public ClusterState execute (ClusterState currentState ) {
634+ reconfigurationTaskScheduled .set (false );
635+ return reconfigureIfPossible (currentState );
636+ }
637+
638+ @ Override
639+ public void onFailure (String source , Exception e ) {
640+ reconfigurationTaskScheduled .set (false );
641+ logger .debug ("reconfiguration failed" , e );
642+ }
643+ });
644+ }
645+ }
646+
592647 // for tests
593648 boolean hasJoinVoteFrom (DiscoveryNode localNode ) {
594649 return coordinationState .get ().containsJoinVoteFor (localNode );
@@ -599,19 +654,29 @@ private void handleJoin(Join join) {
599654 ensureTermAtLeast (getLocalNode (), join .getTerm ()).ifPresent (this ::handleJoin );
600655
601656 if (coordinationState .get ().electionWon ()) {
602- // if we have already won the election then the actual join does not matter for election purposes,
603- // so swallow any exception
604- try {
605- coordinationState .get ().handleJoin (join );
606- } catch (CoordinationStateRejectedException e ) {
607- logger .debug (new ParameterizedMessage ("failed to add {} - ignoring" , join ), e );
657+ // if we have already won the election then the actual join does not matter for election purposes, so swallow any exception
658+ final boolean isNewJoin = handleJoinIgnoringExceptions (join );
659+ if (isNewJoin && mode == Mode .LEADER && publicationInProgress () == false ) {
660+ scheduleReconfigurationIfNeeded ();
608661 }
609662 } else {
610663 coordinationState .get ().handleJoin (join ); // this might fail and bubble up the exception
611664 }
612665 }
613666 }
614667
668+ /**
669+ * @return true iff the join was from a new node and was successfully added
670+ */
671+ private boolean handleJoinIgnoringExceptions (Join join ) {
672+ try {
673+ return coordinationState .get ().handleJoin (join );
674+ } catch (CoordinationStateRejectedException e ) {
675+ logger .debug (new ParameterizedMessage ("failed to add {} - ignoring" , join ), e );
676+ return false ;
677+ }
678+ }
679+
615680 public ClusterState getLastAcceptedState () {
616681 synchronized (mutex ) {
617682 return coordinationState .get ().getLastAcceptedState ();
@@ -904,6 +969,10 @@ public void onSuccess(String source) {
904969 logger .debug ("publication ended successfully: {}" , CoordinatorPublication .this );
905970 // trigger term bump if new term was found during publication
906971 updateMaxTermSeen (getCurrentTerm ());
972+
973+ if (mode == Mode .LEADER ) {
974+ scheduleReconfigurationIfNeeded ();
975+ }
907976 }
908977 ackListener .onNodeAck (getLocalNode (), null );
909978 publishListener .onResponse (null );
@@ -916,8 +985,7 @@ public void onFailure(Exception e) {
916985 assert Thread .holdsLock (mutex ) : "Coordinator mutex not held" ;
917986 removePublicationAndPossiblyBecomeCandidate ("Publication.onCompletion(false)" );
918987
919- FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException (
920- "publication failed" , e );
988+ final FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException ("publication failed" , e );
921989 ackListener .onNodeAck (getLocalNode (), exception ); // other nodes have acked, but not the master.
922990 publishListener .onFailure (exception );
923991 }
0 commit comments