Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only bootstrap and elect node in current voting configuration #37712

Merged
merged 3 commits into from
Jan 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public static boolean discoveryIsConfigured(Settings settings) {

void onFoundPeersUpdated() {
final Set<DiscoveryNode> nodes = getDiscoveredNodes();
if (transportService.getLocalNode().isMasterNode() && bootstrapRequirements.isEmpty() == false
if (bootstrappingPermitted.get() && transportService.getLocalNode().isMasterNode() && bootstrapRequirements.isEmpty() == false
&& isBootstrappedSupplier.getAsBoolean() == false && nodes.stream().noneMatch(Coordinator::isZen1Node)) {

final Tuple<Set<DiscoveryNode>,List<String>> requirementMatchingResult;
Expand All @@ -114,6 +114,13 @@ void onFoundPeersUpdated() {
logger.trace("nodesMatchingRequirements={}, unsatisfiedRequirements={}, bootstrapRequirements={}",
nodesMatchingRequirements, unsatisfiedRequirements, bootstrapRequirements);

if (nodesMatchingRequirements.contains(transportService.getLocalNode()) == false) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can check this in the constructor and just assert it here. Also, we use the word "cancelling" rather than "skipping" to indicate that we won't retry.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The local DiscoveryNode object is not yet available in the constructor (i.e. the transport has not been bound at that point).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blasted laziness. Ok.

logger.info("skipping cluster bootstrapping as local node does not match bootstrap requirements: {}",
bootstrapRequirements);
bootstrappingPermitted.set(false);
return;
}

if (nodesMatchingRequirements.size() * 2 > bootstrapRequirements.size()) {
startBootstrap(nodesMatchingRequirements, unsatisfiedRequirements);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,12 @@ private void startElection() {
// The preVoteCollector is only active while we are candidate, but it does not call this method with synchronisation, so we have
// to check our mode again here.
if (mode == Mode.CANDIDATE) {
if (electionQuorumContainsLocalNode(getLastAcceptedState()) == false) {
logger.trace("skip election as local node is not part of election quorum: {}",
getLastAcceptedState().coordinationMetaData());
return;
}

final StartJoinRequest startJoinRequest
= new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen) + 1);
logger.debug("starting election with {}", startJoinRequest);
Expand All @@ -360,6 +366,13 @@ private void startElection() {
}
}

/**
 * Checks whether the local node's id appears in the voting configurations of the given cluster state.
 *
 * @param lastAcceptedState the cluster state whose last-committed and last-accepted configurations are inspected
 * @return {@code true} if the local node id is contained in the last-committed configuration or, failing that, in the
 *         last-accepted configuration; {@code false} if it is in neither
 */
private static boolean electionQuorumContainsLocalNode(ClusterState lastAcceptedState) {
    final String localId = lastAcceptedState.nodes().getLocalNodeId();
    assert localId != null;

    // The committed configuration is consulted first; the accepted one is only looked at when that lookup misses.
    if (lastAcceptedState.getLastCommittedConfiguration().getNodeIds().contains(localId)) {
        return true;
    }
    return lastAcceptedState.getLastAcceptedConfiguration().getNodeIds().contains(localId);
}

private Optional<Join> ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
if (getCurrentTerm() < targetTerm) {
Expand Down Expand Up @@ -709,10 +722,24 @@ public boolean setInitialConfiguration(final VotingConfiguration votingConfigura
return false;
}

if (getLocalNode().isMasterNode() == false) {
logger.debug("skip setting initial configuration as local node is not a master-eligible node");
throw new CoordinationStateRejectedException(
"this node is not master-eligible, but cluster bootstrapping can only happen on a master-eligible node");
}

if (votingConfiguration.getNodeIds().contains(getLocalNode().getId()) == false) {
logger.debug("skip setting initial configuration as local node is not part of initial configuration");
throw new CoordinationStateRejectedException("local node is not part of initial configuration");
}

final List<DiscoveryNode> knownNodes = new ArrayList<>();
knownNodes.add(getLocalNode());
peerFinder.getFoundPeers().forEach(knownNodes::add);

if (votingConfiguration.hasQuorum(knownNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toList())) == false) {
logger.debug("skip setting initial configuration as not enough nodes discovered to form a quorum in the " +
"initial configuration [knownNodes={}, {}]", knownNodes, votingConfiguration);
throw new CoordinationStateRejectedException("not enough nodes discovered to form a quorum in the initial configuration " +
"[knownNodes=" + knownNodes + ", " + votingConfiguration + "]");
}
Expand All @@ -729,6 +756,8 @@ public boolean setInitialConfiguration(final VotingConfiguration votingConfigura
metaDataBuilder.coordinationMetaData(coordinationMetaData);

coordinationState.get().setInitialState(ClusterState.builder(currentState).metaData(metaDataBuilder).build());
assert electionQuorumContainsLocalNode(getLastAcceptedState()) :
"initial state does not have local node in its election quorum: " + getLastAcceptedState().coordinationMetaData();
preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version
startElectionScheduler();
return true;
Expand Down Expand Up @@ -1022,12 +1051,20 @@ private void startElectionScheduler() {
public void run() {
synchronized (mutex) {
if (mode == Mode.CANDIDATE) {
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();

if (electionQuorumContainsLocalNode(lastAcceptedState) == false) {
logger.trace("skip prevoting as local node is not part of election quorum: {}",
lastAcceptedState.coordinationMetaData());
return;
}

if (prevotingRound != null) {
prevotingRound.close();
}
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
final List<DiscoveryNode> discoveredNodes
= getDiscoveredNodes().stream().filter(n -> isZen1Node(n) == false).collect(Collectors.toList());

prevotingRound = preVoteCollector.start(lastAcceptedState, discoveredNodes);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,18 @@ public void testDoesNotBootstrapsOnNonMasterNode() {
deterministicTaskQueue.runAllTasks();
}

/**
 * Builds a bootstrap service whose initial-master-nodes setting names only the two other nodes (not the local node) and
 * checks that notifying it of found peers never invokes the bootstrap callback, which throws if it is called.
 */
public void testDoesNotBootstrapsIfLocalNodeNotInInitialMasterNodes() {
    final Settings settingsWithoutLocalNode = Settings.builder()
        .putList(INITIAL_MASTER_NODES_SETTING.getKey(), otherNode1.getName(), otherNode2.getName())
        .build();

    final ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(
        settingsWithoutLocalNode,
        transportService,
        () -> Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toList()),
        () -> false,
        vc -> {
            // Reaching this callback would mean the service attempted to bootstrap.
            throw new AssertionError("should not be called");
        });

    transportService.start();
    clusterBootstrapService.onFoundPeersUpdated();
    deterministicTaskQueue.runAllTasks();
}

public void testDoesNotBootstrapsIfNotConfigured() {
ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(
Settings.builder().putList(INITIAL_MASTER_NODES_SETTING.getKey()).build(), transportService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.elasticsearch.cluster.coordination;

import com.carrotsearch.randomizedtesting.RandomizedContext;

import org.apache.logging.log4j.CloseableThreadContext;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -53,6 +52,7 @@
import org.elasticsearch.common.settings.Settings.Builder;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
import org.elasticsearch.discovery.zen.UnicastHostsProvider.HostsResolver;
import org.elasticsearch.env.NodeEnvironment;
Expand Down Expand Up @@ -93,10 +93,10 @@
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.clusterState;
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.setValue;
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.value;
import static org.elasticsearch.cluster.coordination.Coordinator.PUBLISH_TIMEOUT_SETTING;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.CANDIDATE;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.FOLLOWER;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.LEADER;
import static org.elasticsearch.cluster.coordination.Coordinator.PUBLISH_TIMEOUT_SETTING;
import static org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.DEFAULT_DELAY_VARIABILITY;
import static org.elasticsearch.cluster.coordination.ElectionSchedulerFactory.ELECTION_BACK_OFF_TIME_SETTING;
import static org.elasticsearch.cluster.coordination.ElectionSchedulerFactory.ELECTION_DURATION_SETTING;
Expand All @@ -117,7 +117,6 @@
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand Down Expand Up @@ -745,7 +744,7 @@ public void testSettingInitialConfigurationTriggersElection() {
assertThat(nodeId + " should have found all peers", foundPeers, hasSize(cluster.size()));
}

final ClusterNode bootstrapNode = cluster.getAnyNode();
final ClusterNode bootstrapNode = cluster.getAnyBootstrappableNode();
bootstrapNode.applyInitialConfiguration();
assertTrue(bootstrapNode.getId() + " has been bootstrapped", bootstrapNode.coordinator.isInitialConfigurationSet());

Expand Down Expand Up @@ -775,20 +774,30 @@ public void testCannotSetInitialConfigurationTwice() {
public void testCannotSetInitialConfigurationWithoutQuorum() {
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
final Coordinator coordinator = cluster.getAnyNode().coordinator;
final VotingConfiguration unknownNodeConfiguration = new VotingConfiguration(Collections.singleton("unknown-node"));
final VotingConfiguration unknownNodeConfiguration = new VotingConfiguration(
Sets.newHashSet(coordinator.getLocalNode().getId(), "unknown-node"));
final String exceptionMessage = expectThrows(CoordinationStateRejectedException.class,
() -> coordinator.setInitialConfiguration(unknownNodeConfiguration)).getMessage();
assertThat(exceptionMessage,
startsWith("not enough nodes discovered to form a quorum in the initial configuration [knownNodes=["));
assertThat(exceptionMessage,
endsWith("], VotingConfiguration{unknown-node}]"));
assertThat(exceptionMessage, containsString("unknown-node"));
assertThat(exceptionMessage, containsString(coordinator.getLocalNode().toString()));

// This is VERY BAD: setting a _different_ initial configuration. Yet it works if the first attempt will never be a quorum.
assertTrue(coordinator.setInitialConfiguration(new VotingConfiguration(Collections.singleton(coordinator.getLocalNode().getId()))));
cluster.stabilise();
}

/**
 * Checks that setting an initial configuration which does not contain the local node's id is rejected with a
 * {@link CoordinationStateRejectedException} carrying the expected message.
 */
public void testCannotSetInitialConfigurationWithoutLocalNode() {
    final Cluster cluster = new Cluster(randomIntBetween(1, 5));
    final Coordinator coordinator = cluster.getAnyNode().coordinator;

    // The configuration names only a node id that is not the local node's.
    final VotingConfiguration configurationWithoutLocalNode = new VotingConfiguration(Sets.newHashSet("unknown-node"));

    final CoordinationStateRejectedException rejection = expectThrows(CoordinationStateRejectedException.class,
        () -> coordinator.setInitialConfiguration(configurationWithoutLocalNode));
    assertThat(rejection.getMessage(), equalTo("local node is not part of initial configuration"));
}

public void testDiffBasedPublishing() {
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
cluster.runRandomly();
Expand Down Expand Up @@ -1331,7 +1340,7 @@ void bootstrapIfNecessary() {
assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty());
assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty());
runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration");
final ClusterNode bootstrapNode = getAnyMasterEligibleNode();
final ClusterNode bootstrapNode = getAnyBootstrappableNode();
bootstrapNode.applyInitialConfiguration();
} else {
logger.info("setting initial configuration not required");
Expand Down Expand Up @@ -1402,8 +1411,10 @@ boolean nodeExists(DiscoveryNode node) {
return clusterNodes.stream().anyMatch(cn -> cn.getLocalNode().equals(node));
}

ClusterNode getAnyMasterEligibleNode() {
return randomFrom(clusterNodes.stream().filter(n -> n.getLocalNode().isMasterNode()).collect(Collectors.toList()));
/**
 * Picks, via {@code randomFrom}, one of the cluster nodes that is both master-eligible and listed by id in
 * {@code initialConfiguration}.
 */
ClusterNode getAnyBootstrappableNode() {
    return randomFrom(clusterNodes.stream()
        .filter(candidate -> {
            final DiscoveryNode discoveryNode = candidate.getLocalNode();
            // Master-eligibility is tested first; configuration membership only matters for master-eligible nodes.
            return discoveryNode.isMasterNode() && initialConfiguration.getNodeIds().contains(discoveryNode.getId());
        })
        .collect(Collectors.toList()));
}

ClusterNode getAnyNode() {
Expand Down Expand Up @@ -1737,8 +1748,14 @@ void applyInitialConfiguration() {
Stream.generate(() -> BOOTSTRAP_PLACEHOLDER_PREFIX + UUIDs.randomBase64UUID(random()))
.limit((Math.max(initialConfiguration.getNodeIds().size(), 2) - 1) / 2)
.forEach(nodeIdsWithPlaceholders::add);
final VotingConfiguration configurationWithPlaceholders = new VotingConfiguration(new HashSet<>(
randomSubsetOf(initialConfiguration.getNodeIds().size(), nodeIdsWithPlaceholders)));
final Set<String> nodeIds = new HashSet<>(
randomSubsetOf(initialConfiguration.getNodeIds().size(), nodeIdsWithPlaceholders));
// initial configuration should not have a place holder for local node
if (initialConfiguration.getNodeIds().contains(localNode.getId()) && nodeIds.contains(localNode.getId()) == false) {
nodeIds.remove(nodeIds.iterator().next());
nodeIds.add(localNode.getId());
}
final VotingConfiguration configurationWithPlaceholders = new VotingConfiguration(nodeIds);
try {
coordinator.setInitialConfiguration(configurationWithPlaceholders);
logger.info("successfully set initial configuration to {}", configurationWithPlaceholders);
Expand Down