Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Step down as master when configured out of voting configuration #37802

Merged
merged 13 commits into from
Jan 29, 2019
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ The node that should be added to the exclusions list is specified using
<<cluster-nodes,node filters>> in place of `node_name` here. If a call to the
voting configuration exclusions API fails, you can safely retry it. Only a
successful response guarantees that the node has actually been removed from the
voting configuration and will not be reinstated.
voting configuration and will not be reinstated. If it's the active master that
was removed from the voting configuration, then it will abdicate to another
master-eligible node that's still in the voting configuration, if such a node
is available.

Although the voting configuration exclusions API is most useful for down-scaling
a two-node to a one-node cluster, it is also possible to use it to remove
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery

private final PeerFinder peerFinder;
private final PreVoteCollector preVoteCollector;
private final Random random;
private final ElectionSchedulerFactory electionSchedulerFactory;
private final UnicastConfiguredHostsResolver configuredHostsResolver;
private final TimeValue publishTimeout;
Expand Down Expand Up @@ -153,6 +154,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
this.lastJoin = Optional.empty();
this.joinAccumulator = new InitialJoinAccumulator();
this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings);
this.random = random;
this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen);
configuredHostsResolver = new UnicastConfiguredHostsResolver(nodeName, settings, transportService, unicastHostsProvider);
Expand Down Expand Up @@ -366,11 +368,33 @@ private void startElection() {
}
}

private void abdicateTo(DiscoveryNode newMaster) {
assert Thread.holdsLock(mutex);
assert mode == Mode.LEADER : "expected to be leader on abdication but was " + mode;
assert newMaster.isMasterNode() : "should only abdicate to master-eligible node but was " + newMaster;
final StartJoinRequest startJoinRequest = new StartJoinRequest(newMaster, Math.max(getCurrentTerm(), maxTermSeen) + 1);
logger.info("abdicating to {} with term {}", newMaster, startJoinRequest.getTerm());
getLastAcceptedState().nodes().mastersFirstStream().forEach(node -> {
if (isZen1Node(node) == false) {
joinHelper.sendStartJoinRequest(startJoinRequest, node);
}
});
// handling of start join messages on the local node will be dispatched to the generic thread-pool
assert mode == Mode.LEADER : "should still be leader after sending abdication messages " + mode;
// explicitly move node to candidate state so that the next cluster state update task yields an onNoLongerMaster event
becomeCandidate("after abdicating to " + newMaster);
}

private static boolean electionQuorumContainsLocalNode(ClusterState lastAcceptedState) {
final String localNodeId = lastAcceptedState.nodes().getLocalNodeId();
assert localNodeId != null;
return lastAcceptedState.getLastCommittedConfiguration().getNodeIds().contains(localNodeId)
|| lastAcceptedState.getLastAcceptedConfiguration().getNodeIds().contains(localNodeId);
final DiscoveryNode localNode = lastAcceptedState.nodes().getLocalNode();
assert localNode != null;
return electionQuorumContains(lastAcceptedState, localNode);
}

private static boolean electionQuorumContains(ClusterState lastAcceptedState, DiscoveryNode node) {
final String nodeId = node.getId();
return lastAcceptedState.getLastCommittedConfiguration().getNodeIds().contains(nodeId)
|| lastAcceptedState.getLastAcceptedConfiguration().getNodeIds().contains(nodeId);
}

private Optional<Join> ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) {
Expand Down Expand Up @@ -780,7 +804,7 @@ ClusterState improveConfiguration(ClusterState clusterState) {
.filter(this::hasJoinVoteFrom).filter(discoveryNode -> isZen1Node(discoveryNode) == false).collect(Collectors.toSet());
final VotingConfiguration newConfig = reconfigurator.reconfigure(liveNodes,
clusterState.getVotingConfigExclusions().stream().map(VotingConfigExclusion::getNodeId).collect(Collectors.toSet()),
clusterState.getLastAcceptedConfiguration());
getLocalNode(), clusterState.getLastAcceptedConfiguration());
if (newConfig.equals(clusterState.getLastAcceptedConfiguration()) == false) {
assert coordinationState.get().joinVotesHaveQuorumFor(newConfig);
return ClusterState.builder(clusterState).metaData(MetaData.builder(clusterState.metaData())
Expand Down Expand Up @@ -1192,7 +1216,18 @@ public void onSuccess(String source) {
updateMaxTermSeen(getCurrentTerm());

if (mode == Mode.LEADER) {
scheduleReconfigurationIfNeeded();
final ClusterState state = getLastAcceptedState(); // committed state
if (electionQuorumContainsLocalNode(state) == false) {
final List<DiscoveryNode> masterCandidates = completedNodes().stream()
.filter(DiscoveryNode::isMasterNode)
.filter(node -> electionQuorumContains(state, node))
.collect(Collectors.toList());
if (masterCandidates.isEmpty() == false) {
abdicateTo(masterCandidates.get(random.nextInt(masterCandidates.size())));
}
} else {
scheduleReconfigurationIfNeeded();
}
}
lagDetector.startLagDetector(publishRequest.getAcceptedState().version());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

public abstract class Publication {

Expand Down Expand Up @@ -92,6 +93,13 @@ public void onFaultyNode(DiscoveryNode faultyNode) {
onPossibleCompletion();
}

public List<DiscoveryNode> completedNodes() {
return publicationTargets.stream()
.filter(PublicationTarget::isSuccessfullyCompleted)
.map(PublicationTarget::getDiscoveryNode)
.collect(Collectors.toList());
}

public boolean isCommitted() {
return applyCommitRequest.isPresent();
}
Expand Down Expand Up @@ -268,6 +276,10 @@ void onFaultyNode(DiscoveryNode faultyNode) {
}
}

DiscoveryNode getDiscoveryNode() {
return discoveryNode;
}

private void ackOnce(Exception e) {
if (ackIsPending) {
ackIsPending = false;
Expand All @@ -280,6 +292,10 @@ boolean isActive() {
&& state != PublicationTargetState.APPLIED_COMMIT;
}

boolean isSuccessfullyCompleted() {
return state == PublicationTargetState.APPLIED_COMMIT;
}

boolean isWaitingForQuorum() {
return state == PublicationTargetState.WAITING_FOR_QUORUM;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.util.set.Sets;

import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -90,18 +91,23 @@ public String toString() {
* @param retiredNodeIds Nodes that are leaving the cluster and which should not appear in the configuration if possible. Nodes that are
* retired and not in the current configuration will never appear in the resulting configuration; this is useful
* for shifting the vote in a 2-node cluster so one of the nodes can be restarted without harming availability.
* @param currentMaster The current master. Unless retired, we prefer to keep the current master in the config.
* @param currentConfig The current configuration. As far as possible, we prefer to keep the current config as-is.
* @return An optimal configuration, or leave the current configuration unchanged if the optimal configuration has no live quorum.
*/
public VotingConfiguration reconfigure(Set<DiscoveryNode> liveNodes, Set<String> retiredNodeIds, VotingConfiguration currentConfig) {
public VotingConfiguration reconfigure(Set<DiscoveryNode> liveNodes, Set<String> retiredNodeIds, DiscoveryNode currentMaster,
VotingConfiguration currentConfig) {
assert liveNodes.stream().noneMatch(Coordinator::isZen1Node) : liveNodes;
logger.trace("{} reconfiguring {} based on liveNodes={}, retiredNodeIds={}", this, currentConfig, liveNodes, retiredNodeIds);
assert liveNodes.contains(currentMaster) : "liveNodes = " + liveNodes + " master = " + currentMaster;
logger.trace("{} reconfiguring {} based on liveNodes={}, retiredNodeIds={}, currentMaster={}",
this, currentConfig, liveNodes, retiredNodeIds, currentMaster);

/*
* There are three true/false properties of each node in play: live/non-live, retired/non-retired and in-config/not-in-config.
* Firstly we divide the nodes into disjoint sets based on these properties:
*
* - nonRetiredInConfigNotLiveIds
* - nonRetiredMaster
* - nonRetiredNotMasterInConfigNotLiveIds
* - nonRetiredInConfigLiveIds
* - nonRetiredLiveNotInConfigIds
*
Expand All @@ -125,6 +131,17 @@ public VotingConfiguration reconfigure(Set<DiscoveryNode> liveNodes, Set<String>
final Set<String> nonRetiredInConfigLiveIds = new TreeSet<>(liveInConfigIds);
nonRetiredInConfigLiveIds.removeAll(retiredNodeIds);

final Set<String> nonRetiredInConfigLiveMasterIds;
final Set<String> nonRetiredInConfigLiveNotMasterIds;
if (nonRetiredInConfigLiveIds.contains(currentMaster.getId())) {
nonRetiredInConfigLiveNotMasterIds = new TreeSet<>(nonRetiredInConfigLiveIds);
nonRetiredInConfigLiveNotMasterIds.remove(currentMaster.getId());
nonRetiredInConfigLiveMasterIds = Collections.singleton(currentMaster.getId());
} else {
nonRetiredInConfigLiveNotMasterIds = nonRetiredInConfigLiveIds;
nonRetiredInConfigLiveMasterIds = Collections.emptySet();
}

final Set<String> nonRetiredLiveNotInConfigIds = Sets.sortedDifference(liveNodeIds, currentConfig.getNodeIds());
nonRetiredLiveNotInConfigIds.removeAll(retiredNodeIds);

Expand All @@ -151,9 +168,9 @@ public VotingConfiguration reconfigure(Set<DiscoveryNode> liveNodes, Set<String>
* The new configuration is formed by taking this many nodes in the following preference order:
*/
final VotingConfiguration newConfig = new VotingConfiguration(
// live nodes first, preferring the current config, and if we need more then use non-live nodes
Stream.of(nonRetiredInConfigLiveIds, nonRetiredLiveNotInConfigIds, nonRetiredInConfigNotLiveIds)
.flatMap(Collection::stream).limit(targetSize).collect(Collectors.toSet()));
// live master first, then other live nodes, preferring the current config, and if we need more then use non-live nodes
Stream.of(nonRetiredInConfigLiveMasterIds, nonRetiredInConfigLiveNotMasterIds, nonRetiredLiveNotInConfigIds,
nonRetiredInConfigNotLiveIds).flatMap(Collection::stream).limit(targetSize).collect(Collectors.toSet()));

if (newConfig.hasQuorum(liveNodeIds)) {
return newConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.junit.annotations.TestLogging;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
Expand Down Expand Up @@ -106,7 +106,7 @@ public void testSimpleOnlyMasterNodeElection() throws IOException {
.execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligibleNodeName));
}

public void testElectOnlyBetweenMasterNodes() throws IOException, ExecutionException, InterruptedException {
public void testElectOnlyBetweenMasterNodes() throws Exception {
logger.info("--> start data node / non master node");
internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), true)
.put(Node.NODE_MASTER_SETTING.getKey(), false).put("discovery.initial_state_timeout", "1s"));
Expand Down Expand Up @@ -138,7 +138,14 @@ public void testElectOnlyBetweenMasterNodes() throws IOException, ExecutionExcep
logger.info("--> closing master node (1)");
client().execute(AddVotingConfigExclusionsAction.INSTANCE,
new AddVotingConfigExclusionsRequest(new String[]{masterNodeName})).get();
internalCluster().stopCurrentMasterNode();
// removing the master from the voting configuration immediately triggers the master to step down
assertBusy(() -> {
assertThat(internalCluster().nonMasterClient().admin().cluster().prepareState()
.execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligableNodeName));
assertThat(internalCluster().masterClient().admin().cluster().prepareState()
.execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligableNodeName));
});
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(masterNodeName));
assertThat(internalCluster().nonMasterClient().admin().cluster().prepareState()
.execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligableNodeName));
assertThat(internalCluster().masterClient().admin().cluster().prepareState()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matcher;
import org.hamcrest.core.IsCollectionContaining;
import org.junit.After;
import org.junit.Before;

Expand Down Expand Up @@ -1331,6 +1332,8 @@ void stabilise(long stabilisationDurationMillis) {
final VotingConfiguration lastCommittedConfiguration = lastAcceptedState.getLastCommittedConfiguration();
assertTrue(connectedNodeIds + " should be a quorum of " + lastCommittedConfiguration,
lastCommittedConfiguration.hasQuorum(connectedNodeIds));
assertThat("leader " + leader.getLocalNode() + " should be part of voting configuration " + lastCommittedConfiguration,
lastCommittedConfiguration.getNodeIds(), IsCollectionContaining.hasItem(leader.getLocalNode().getId()));

assertThat("no reconfiguration is in progress",
lastAcceptedState.getLastCommittedConfiguration(), equalTo(lastAcceptedState.getLastAcceptedConfiguration()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;

public class PublicationTests extends ESTestCase {
Expand Down Expand Up @@ -178,6 +179,7 @@ public void testSimpleClusterStatePublishing() throws InterruptedException {
discoveryNodes, singleNodeConfig, singleNodeConfig, 42L), ackListener, Collections.emptySet());

assertThat(publication.pendingPublications.keySet(), equalTo(discoNodes));
assertThat(publication.completedNodes(), empty());
assertTrue(publication.pendingCommits.isEmpty());
AtomicBoolean processedNode1PublishResponse = new AtomicBoolean();
boolean delayProcessingNode2PublishResponse = randomBoolean();
Expand Down Expand Up @@ -232,10 +234,12 @@ public void testSimpleClusterStatePublishing() throws InterruptedException {

assertFalse(publication.completed);
assertFalse(publication.committed);
assertThat(publication.completedNodes(), containsInAnyOrder(n1, n3));
publication.pendingCommits.get(n2).onResponse(TransportResponse.Empty.INSTANCE);
}

assertTrue(publication.completed);
assertThat(publication.completedNodes(), containsInAnyOrder(n1, n2, n3));
assertTrue(publication.committed);

assertThat(ackListener.await(0L, TimeUnit.SECONDS), containsInAnyOrder(n1, n2, n3));
Expand Down
Loading