From 8ee9f90d97ad33d24701f7803c7970d9228f12f7 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 14 Jan 2019 18:03:13 +0100 Subject: [PATCH 1/2] Only bootstrap and elect node in current configuration --- .../coordination/ClusterBootstrapService.java | 9 ++++- .../cluster/coordination/Coordinator.java | 39 ++++++++++++++++++- .../ClusterBootstrapServiceTests.java | 12 ++++++ .../coordination/CoordinatorTests.java | 37 +++++++++++++----- 4 files changed, 86 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java index d21c54c03e4e5..cdbf6b6691077 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java @@ -97,7 +97,7 @@ public static boolean discoveryIsConfigured(Settings settings) { void onFoundPeersUpdated() { final Set nodes = getDiscoveredNodes(); - if (transportService.getLocalNode().isMasterNode() && bootstrapRequirements.isEmpty() == false + if (bootstrappingPermitted.get() && transportService.getLocalNode().isMasterNode() && bootstrapRequirements.isEmpty() == false && isBootstrappedSupplier.getAsBoolean() == false && nodes.stream().noneMatch(Coordinator::isZen1Node)) { final Tuple,List> requirementMatchingResult; @@ -114,6 +114,13 @@ void onFoundPeersUpdated() { logger.trace("nodesMatchingRequirements={}, unsatisfiedRequirements={}, bootstrapRequirements={}", nodesMatchingRequirements, unsatisfiedRequirements, bootstrapRequirements); + if (nodesMatchingRequirements.contains(transportService.getLocalNode()) == false) { + logger.info("skipping cluster bootstrapping as local node does not match bootstrap requirements: {}", + bootstrapRequirements); + bootstrappingPermitted.set(false); + return; + } + if (nodesMatchingRequirements.size() * 2 > bootstrapRequirements.size()) { startBootstrap(nodesMatchingRequirements, unsatisfiedRequirements); } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 084d5cf38f2db..4a018c1f78f91 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -348,6 +348,12 @@ private void startElection() { // The preVoteCollector is only active while we are candidate, but it does not call this method with synchronisation, so we have // to check our mode again here. if (mode == Mode.CANDIDATE) { + if (electionQuorumContainsLocalNode(getLastAcceptedState()) == false) { + logger.trace("skip election as local node is not part of election quorum: {}", + getLastAcceptedState().coordinationMetaData()); + return; + } + final StartJoinRequest startJoinRequest = new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen) + 1); logger.debug("starting election with {}", startJoinRequest); @@ -360,6 +366,13 @@ private void startElection() { } } + private static boolean electionQuorumContainsLocalNode(ClusterState lastAcceptedState) { + final String localNodeId = lastAcceptedState.nodes().getLocalNodeId(); + assert localNodeId != null; + return lastAcceptedState.getLastCommittedConfiguration().getNodeIds().contains(localNodeId) + || lastAcceptedState.getLastAcceptedConfiguration().getNodeIds().contains(localNodeId); + } + private Optional ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; if (getCurrentTerm() < targetTerm) { @@ -709,10 +722,24 @@ public boolean setInitialConfiguration(final VotingConfiguration votingConfigura return false; } + if (getLocalNode().isMasterNode() == false) { + logger.debug("skip setting initial configuration as local node is not a master-eligible node"); + throw new CoordinationStateRejectedException( + "this node is not master-eligible, but cluster bootstrapping can only happen on a master-eligible node"); + } + + if (votingConfiguration.getNodeIds().contains(getLocalNode().getId()) == false) { + logger.debug("skip setting initial configuration as local node is not part of initial configuration"); + throw new CoordinationStateRejectedException("local node is not part of initial configuration"); + } + final List knownNodes = new ArrayList<>(); knownNodes.add(getLocalNode()); peerFinder.getFoundPeers().forEach(knownNodes::add); + if (votingConfiguration.hasQuorum(knownNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toList())) == false) { + logger.debug("skip setting initial configuration as not enough nodes discovered to form a quorum in the " + + "initial configuration [knownNodes={}, {}]", knownNodes, votingConfiguration); throw new CoordinationStateRejectedException("not enough nodes discovered to form a quorum in the initial configuration " + "[knownNodes=" + knownNodes + ", " + votingConfiguration + "]"); } @@ -729,6 +756,8 @@ public boolean setInitialConfiguration(final VotingConfiguration votingConfigura metaDataBuilder.coordinationMetaData(coordinationMetaData); coordinationState.get().setInitialState(ClusterState.builder(currentState).metaData(metaDataBuilder).build()); + assert electionQuorumContainsLocalNode(getLastAcceptedState()) : + "initial state does not have local node in its election quorum: " + getLastAcceptedState().coordinationMetaData(); preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version startElectionScheduler(); return true; @@ -1022,12 +1051,20 @@ private void startElectionScheduler() { public void run() { synchronized (mutex) { if (mode == Mode.CANDIDATE) { + final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState(); + + if (electionQuorumContainsLocalNode(lastAcceptedState) == false) { + logger.trace("skip prevoting as local node is not part of election quorum: {}", + lastAcceptedState.coordinationMetaData()); + return; + } + if (prevotingRound != null) { prevotingRound.close(); } - final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState(); final List discoveredNodes = getDiscoveredNodes().stream().filter(n -> isZen1Node(n) == false).collect(Collectors.toList()); + prevotingRound = preVoteCollector.start(lastAcceptedState, discoveredNodes); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java index 46a43afa53897..c9ebdf278c71d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java @@ -328,6 +328,18 @@ public void testDoesNotBootstrapsOnNonMasterNode() { deterministicTaskQueue.runAllTasks(); } + public void testDoesNotBootstrapsIfLocalNodeNotInInitialMasterNodes() { + ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(Settings.builder().putList( + INITIAL_MASTER_NODES_SETTING.getKey(), otherNode1.getName(), otherNode2.getName()).build(), + transportService, () -> + Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toList()), () -> false, vc -> { + throw new AssertionError("should not be called"); + }); + transportService.start(); + clusterBootstrapService.onFoundPeersUpdated(); + deterministicTaskQueue.runAllTasks(); + } + public void testDoesNotBootstrapsIfNotConfigured() { ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService( Settings.builder().putList(INITIAL_MASTER_NODES_SETTING.getKey()).build(), transportService, diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index a9ca7d917b9d8..a7b67080fd79e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -53,6 +53,7 @@ import org.elasticsearch.common.settings.Settings.Builder; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.discovery.zen.PublishClusterStateStats; import org.elasticsearch.discovery.zen.UnicastHostsProvider.HostsResolver; import org.elasticsearch.env.NodeEnvironment; @@ -745,7 +746,7 @@ public void testSettingInitialConfigurationTriggersElection() { assertThat(nodeId + " should have found all peers", foundPeers, hasSize(cluster.size())); } - final ClusterNode bootstrapNode = cluster.getAnyNode(); + final ClusterNode bootstrapNode = cluster.getAnyBootstrappableNode(); bootstrapNode.applyInitialConfiguration(); assertTrue(bootstrapNode.getId() + " has been bootstrapped", bootstrapNode.coordinator.isInitialConfigurationSet()); @@ -775,13 +776,13 @@ public void testCannotSetInitialConfigurationTwice() { public void testCannotSetInitialConfigurationWithoutQuorum() { final Cluster cluster = new Cluster(randomIntBetween(1, 5)); final Coordinator coordinator = cluster.getAnyNode().coordinator; - final VotingConfiguration unknownNodeConfiguration = new VotingConfiguration(Collections.singleton("unknown-node")); + final VotingConfiguration unknownNodeConfiguration = new VotingConfiguration( + Sets.newHashSet(coordinator.getLocalNode().getId(), "unknown-node")); final String exceptionMessage = expectThrows(CoordinationStateRejectedException.class, () -> coordinator.setInitialConfiguration(unknownNodeConfiguration)).getMessage(); assertThat(exceptionMessage, startsWith("not enough nodes discovered to form a quorum in the initial configuration [knownNodes=[")); - assertThat(exceptionMessage, - endsWith("], VotingConfiguration{unknown-node}]")); + assertThat(exceptionMessage, containsString("unknown-node")); assertThat(exceptionMessage, containsString(coordinator.getLocalNode().toString())); // This is VERY BAD: setting a _different_ initial configuration. Yet it works if the first attempt will never be a quorum. @@ -789,6 +790,16 @@ public void testCannotSetInitialConfigurationWithoutQuorum() { cluster.stabilise(); } + public void testCannotSetInitialConfigurationWithoutLocalNode() { + final Cluster cluster = new Cluster(randomIntBetween(1, 5)); + final Coordinator coordinator = cluster.getAnyNode().coordinator; + final VotingConfiguration unknownNodeConfiguration = new VotingConfiguration(Sets.newHashSet("unknown-node")); + final String exceptionMessage = expectThrows(CoordinationStateRejectedException.class, + () -> coordinator.setInitialConfiguration(unknownNodeConfiguration)).getMessage(); + assertThat(exceptionMessage, + equalTo("local node is not part of initial configuration")); + } + public void testDiffBasedPublishing() { final Cluster cluster = new Cluster(randomIntBetween(1, 5)); cluster.runRandomly(); @@ -1331,7 +1342,7 @@ void bootstrapIfNecessary() { assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty()); assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty()); runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration"); - final ClusterNode bootstrapNode = getAnyMasterEligibleNode(); + final ClusterNode bootstrapNode = getAnyBootstrappableNode(); bootstrapNode.applyInitialConfiguration(); } else { logger.info("setting initial configuration not required"); @@ -1402,8 +1413,10 @@ boolean nodeExists(DiscoveryNode node) { return clusterNodes.stream().anyMatch(cn -> cn.getLocalNode().equals(node)); } - ClusterNode getAnyMasterEligibleNode() { - return randomFrom(clusterNodes.stream().filter(n -> n.getLocalNode().isMasterNode()).collect(Collectors.toList())); + ClusterNode getAnyBootstrappableNode() { + return randomFrom(clusterNodes.stream().filter(n -> n.getLocalNode().isMasterNode()) + .filter(n -> initialConfiguration.getNodeIds().contains(n.getLocalNode().getId())) + .collect(Collectors.toList())); } ClusterNode getAnyNode() { @@ -1737,8 +1750,14 @@ void applyInitialConfiguration() { Stream.generate(() -> BOOTSTRAP_PLACEHOLDER_PREFIX + UUIDs.randomBase64UUID(random())) .limit((Math.max(initialConfiguration.getNodeIds().size(), 2) - 1) / 2) .forEach(nodeIdsWithPlaceholders::add); - final VotingConfiguration configurationWithPlaceholders = new VotingConfiguration(new HashSet<>( - randomSubsetOf(initialConfiguration.getNodeIds().size(), nodeIdsWithPlaceholders))); + final Set nodeIds = new HashSet<>( + randomSubsetOf(initialConfiguration.getNodeIds().size(), nodeIdsWithPlaceholders)); + // initial configuration should not have a place holder for local node + if (initialConfiguration.getNodeIds().contains(localNode.getId()) && nodeIds.contains(localNode.getId()) == false) { + nodeIds.remove(nodeIds.iterator().next()); + nodeIds.add(localNode.getId()); + } + final VotingConfiguration configurationWithPlaceholders = new VotingConfiguration(nodeIds); try { coordinator.setInitialConfiguration(configurationWithPlaceholders); logger.info("successfully set initial configuration to {}", configurationWithPlaceholders); From 3a6eac2ece8ef5e7e905bbc57e6343a4b00c1cb2 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 22 Jan 2019 19:41:38 +0100 Subject: [PATCH 2/2] checkstyle --- .../elasticsearch/cluster/coordination/CoordinatorTests.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index a7b67080fd79e..7db63ab120e91 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.cluster.coordination; import com.carrotsearch.randomizedtesting.RandomizedContext; - import org.apache.logging.log4j.CloseableThreadContext; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -94,10 +93,10 @@ import static org.elasticsearch.cluster.coordination.CoordinationStateTests.clusterState; import static org.elasticsearch.cluster.coordination.CoordinationStateTests.setValue; import static org.elasticsearch.cluster.coordination.CoordinationStateTests.value; -import static org.elasticsearch.cluster.coordination.Coordinator.PUBLISH_TIMEOUT_SETTING; import static org.elasticsearch.cluster.coordination.Coordinator.Mode.CANDIDATE; import static org.elasticsearch.cluster.coordination.Coordinator.Mode.FOLLOWER; import static org.elasticsearch.cluster.coordination.Coordinator.Mode.LEADER; +import static org.elasticsearch.cluster.coordination.Coordinator.PUBLISH_TIMEOUT_SETTING; import static org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.DEFAULT_DELAY_VARIABILITY; import static org.elasticsearch.cluster.coordination.ElectionSchedulerFactory.ELECTION_BACK_OFF_TIME_SETTING; import static org.elasticsearch.cluster.coordination.ElectionSchedulerFactory.ELECTION_DURATION_SETTING; @@ -118,7 +117,6 @@ import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo;