Skip to content

Commit

Permalink
Only bootstrap and elect node in current voting configuration (#37712)
Browse files Browse the repository at this point in the history
Adapts bootstrapping and leader election to only trigger on nodes that are actually part of the voting
configuration.
  • Loading branch information
ywelsch authored Jan 23, 2019
1 parent 4ec3a6d commit d5139e0
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public static boolean discoveryIsConfigured(Settings settings) {

void onFoundPeersUpdated() {
final Set<DiscoveryNode> nodes = getDiscoveredNodes();
if (transportService.getLocalNode().isMasterNode() && bootstrapRequirements.isEmpty() == false
if (bootstrappingPermitted.get() && transportService.getLocalNode().isMasterNode() && bootstrapRequirements.isEmpty() == false
&& isBootstrappedSupplier.getAsBoolean() == false && nodes.stream().noneMatch(Coordinator::isZen1Node)) {

final Tuple<Set<DiscoveryNode>,List<String>> requirementMatchingResult;
Expand All @@ -114,6 +114,13 @@ void onFoundPeersUpdated() {
logger.trace("nodesMatchingRequirements={}, unsatisfiedRequirements={}, bootstrapRequirements={}",
nodesMatchingRequirements, unsatisfiedRequirements, bootstrapRequirements);

if (nodesMatchingRequirements.contains(transportService.getLocalNode()) == false) {
logger.info("skipping cluster bootstrapping as local node does not match bootstrap requirements: {}",
bootstrapRequirements);
bootstrappingPermitted.set(false);
return;
}

if (nodesMatchingRequirements.size() * 2 > bootstrapRequirements.size()) {
startBootstrap(nodesMatchingRequirements, unsatisfiedRequirements);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,12 @@ private void startElection() {
// The preVoteCollector is only active while we are candidate, but it does not call this method with synchronisation, so we have
// to check our mode again here.
if (mode == Mode.CANDIDATE) {
if (electionQuorumContainsLocalNode(getLastAcceptedState()) == false) {
logger.trace("skip election as local node is not part of election quorum: {}",
getLastAcceptedState().coordinationMetaData());
return;
}

final StartJoinRequest startJoinRequest
= new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen) + 1);
logger.debug("starting election with {}", startJoinRequest);
Expand All @@ -360,6 +366,13 @@ private void startElection() {
}
}

private static boolean electionQuorumContainsLocalNode(ClusterState lastAcceptedState) {
final String localNodeId = lastAcceptedState.nodes().getLocalNodeId();
assert localNodeId != null;
return lastAcceptedState.getLastCommittedConfiguration().getNodeIds().contains(localNodeId)
|| lastAcceptedState.getLastAcceptedConfiguration().getNodeIds().contains(localNodeId);
}

private Optional<Join> ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
if (getCurrentTerm() < targetTerm) {
Expand Down Expand Up @@ -709,10 +722,24 @@ public boolean setInitialConfiguration(final VotingConfiguration votingConfigura
return false;
}

if (getLocalNode().isMasterNode() == false) {
logger.debug("skip setting initial configuration as local node is not a master-eligible node");
throw new CoordinationStateRejectedException(
"this node is not master-eligible, but cluster bootstrapping can only happen on a master-eligible node");
}

if (votingConfiguration.getNodeIds().contains(getLocalNode().getId()) == false) {
logger.debug("skip setting initial configuration as local node is not part of initial configuration");
throw new CoordinationStateRejectedException("local node is not part of initial configuration");
}

final List<DiscoveryNode> knownNodes = new ArrayList<>();
knownNodes.add(getLocalNode());
peerFinder.getFoundPeers().forEach(knownNodes::add);

if (votingConfiguration.hasQuorum(knownNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toList())) == false) {
logger.debug("skip setting initial configuration as not enough nodes discovered to form a quorum in the " +
"initial configuration [knownNodes={}, {}]", knownNodes, votingConfiguration);
throw new CoordinationStateRejectedException("not enough nodes discovered to form a quorum in the initial configuration " +
"[knownNodes=" + knownNodes + ", " + votingConfiguration + "]");
}
Expand All @@ -729,6 +756,8 @@ public boolean setInitialConfiguration(final VotingConfiguration votingConfigura
metaDataBuilder.coordinationMetaData(coordinationMetaData);

coordinationState.get().setInitialState(ClusterState.builder(currentState).metaData(metaDataBuilder).build());
assert electionQuorumContainsLocalNode(getLastAcceptedState()) :
"initial state does not have local node in its election quorum: " + getLastAcceptedState().coordinationMetaData();
preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version
startElectionScheduler();
return true;
Expand Down Expand Up @@ -1022,12 +1051,20 @@ private void startElectionScheduler() {
public void run() {
synchronized (mutex) {
if (mode == Mode.CANDIDATE) {
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();

if (electionQuorumContainsLocalNode(lastAcceptedState) == false) {
logger.trace("skip prevoting as local node is not part of election quorum: {}",
lastAcceptedState.coordinationMetaData());
return;
}

if (prevotingRound != null) {
prevotingRound.close();
}
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
final List<DiscoveryNode> discoveredNodes
= getDiscoveredNodes().stream().filter(n -> isZen1Node(n) == false).collect(Collectors.toList());

prevotingRound = preVoteCollector.start(lastAcceptedState, discoveredNodes);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,18 @@ public void testDoesNotBootstrapsOnNonMasterNode() {
deterministicTaskQueue.runAllTasks();
}

public void testDoesNotBootstrapsIfLocalNodeNotInInitialMasterNodes() {
ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(Settings.builder().putList(
INITIAL_MASTER_NODES_SETTING.getKey(), otherNode1.getName(), otherNode2.getName()).build(),
transportService, () ->
Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toList()), () -> false, vc -> {
throw new AssertionError("should not be called");
});
transportService.start();
clusterBootstrapService.onFoundPeersUpdated();
deterministicTaskQueue.runAllTasks();
}

public void testDoesNotBootstrapsIfNotConfigured() {
ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(
Settings.builder().putList(INITIAL_MASTER_NODES_SETTING.getKey()).build(), transportService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.elasticsearch.cluster.coordination;

import com.carrotsearch.randomizedtesting.RandomizedContext;

import org.apache.logging.log4j.CloseableThreadContext;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -53,6 +52,7 @@
import org.elasticsearch.common.settings.Settings.Builder;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
import org.elasticsearch.discovery.zen.UnicastHostsProvider.HostsResolver;
import org.elasticsearch.env.NodeEnvironment;
Expand Down Expand Up @@ -93,10 +93,10 @@
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.clusterState;
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.setValue;
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.value;
import static org.elasticsearch.cluster.coordination.Coordinator.PUBLISH_TIMEOUT_SETTING;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.CANDIDATE;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.FOLLOWER;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.LEADER;
import static org.elasticsearch.cluster.coordination.Coordinator.PUBLISH_TIMEOUT_SETTING;
import static org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.DEFAULT_DELAY_VARIABILITY;
import static org.elasticsearch.cluster.coordination.ElectionSchedulerFactory.ELECTION_BACK_OFF_TIME_SETTING;
import static org.elasticsearch.cluster.coordination.ElectionSchedulerFactory.ELECTION_DURATION_SETTING;
Expand All @@ -117,7 +117,6 @@
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand Down Expand Up @@ -745,7 +744,7 @@ public void testSettingInitialConfigurationTriggersElection() {
assertThat(nodeId + " should have found all peers", foundPeers, hasSize(cluster.size()));
}

final ClusterNode bootstrapNode = cluster.getAnyNode();
final ClusterNode bootstrapNode = cluster.getAnyBootstrappableNode();
bootstrapNode.applyInitialConfiguration();
assertTrue(bootstrapNode.getId() + " has been bootstrapped", bootstrapNode.coordinator.isInitialConfigurationSet());

Expand Down Expand Up @@ -775,20 +774,30 @@ public void testCannotSetInitialConfigurationTwice() {
public void testCannotSetInitialConfigurationWithoutQuorum() {
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
final Coordinator coordinator = cluster.getAnyNode().coordinator;
final VotingConfiguration unknownNodeConfiguration = new VotingConfiguration(Collections.singleton("unknown-node"));
final VotingConfiguration unknownNodeConfiguration = new VotingConfiguration(
Sets.newHashSet(coordinator.getLocalNode().getId(), "unknown-node"));
final String exceptionMessage = expectThrows(CoordinationStateRejectedException.class,
() -> coordinator.setInitialConfiguration(unknownNodeConfiguration)).getMessage();
assertThat(exceptionMessage,
startsWith("not enough nodes discovered to form a quorum in the initial configuration [knownNodes=["));
assertThat(exceptionMessage,
endsWith("], VotingConfiguration{unknown-node}]"));
assertThat(exceptionMessage, containsString("unknown-node"));
assertThat(exceptionMessage, containsString(coordinator.getLocalNode().toString()));

// This is VERY BAD: setting a _different_ initial configuration. Yet it works if the first attempt will never be a quorum.
assertTrue(coordinator.setInitialConfiguration(new VotingConfiguration(Collections.singleton(coordinator.getLocalNode().getId()))));
cluster.stabilise();
}

public void testCannotSetInitialConfigurationWithoutLocalNode() {
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
final Coordinator coordinator = cluster.getAnyNode().coordinator;
final VotingConfiguration unknownNodeConfiguration = new VotingConfiguration(Sets.newHashSet("unknown-node"));
final String exceptionMessage = expectThrows(CoordinationStateRejectedException.class,
() -> coordinator.setInitialConfiguration(unknownNodeConfiguration)).getMessage();
assertThat(exceptionMessage,
equalTo("local node is not part of initial configuration"));
}

public void testDiffBasedPublishing() {
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
cluster.runRandomly();
Expand Down Expand Up @@ -1331,7 +1340,7 @@ void bootstrapIfNecessary() {
assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty());
assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty());
runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration");
final ClusterNode bootstrapNode = getAnyMasterEligibleNode();
final ClusterNode bootstrapNode = getAnyBootstrappableNode();
bootstrapNode.applyInitialConfiguration();
} else {
logger.info("setting initial configuration not required");
Expand Down Expand Up @@ -1402,8 +1411,10 @@ boolean nodeExists(DiscoveryNode node) {
return clusterNodes.stream().anyMatch(cn -> cn.getLocalNode().equals(node));
}

ClusterNode getAnyMasterEligibleNode() {
return randomFrom(clusterNodes.stream().filter(n -> n.getLocalNode().isMasterNode()).collect(Collectors.toList()));
ClusterNode getAnyBootstrappableNode() {
return randomFrom(clusterNodes.stream().filter(n -> n.getLocalNode().isMasterNode())
.filter(n -> initialConfiguration.getNodeIds().contains(n.getLocalNode().getId()))
.collect(Collectors.toList()));
}

ClusterNode getAnyNode() {
Expand Down Expand Up @@ -1737,8 +1748,14 @@ void applyInitialConfiguration() {
Stream.generate(() -> BOOTSTRAP_PLACEHOLDER_PREFIX + UUIDs.randomBase64UUID(random()))
.limit((Math.max(initialConfiguration.getNodeIds().size(), 2) - 1) / 2)
.forEach(nodeIdsWithPlaceholders::add);
final VotingConfiguration configurationWithPlaceholders = new VotingConfiguration(new HashSet<>(
randomSubsetOf(initialConfiguration.getNodeIds().size(), nodeIdsWithPlaceholders)));
final Set<String> nodeIds = new HashSet<>(
randomSubsetOf(initialConfiguration.getNodeIds().size(), nodeIdsWithPlaceholders));
// initial configuration should not have a place holder for local node
if (initialConfiguration.getNodeIds().contains(localNode.getId()) && nodeIds.contains(localNode.getId()) == false) {
nodeIds.remove(nodeIds.iterator().next());
nodeIds.add(localNode.getId());
}
final VotingConfiguration configurationWithPlaceholders = new VotingConfiguration(nodeIds);
try {
coordinator.setInitialConfiguration(configurationWithPlaceholders);
logger.info("successfully set initial configuration to {}", configurationWithPlaceholders);
Expand Down

0 comments on commit d5139e0

Please sign in to comment.