From aabbd58e5d941a4ddd8f9b2fd45f20973711790a Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 23 Jan 2019 23:53:38 +0100 Subject: [PATCH 1/7] Only master with voting power --- .../cluster/coordination/Coordinator.java | 45 ++++++++++++++++--- .../cluster/coordination/Publication.java | 16 +++++++ .../coordination/CoordinatorTests.java | 3 ++ .../coordination/PublicationTests.java | 3 ++ .../coordination/VotingConfigurationIT.java | 41 +++++++++++++++++ 5 files changed, 103 insertions(+), 5 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/cluster/coordination/VotingConfigurationIT.java diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 4a018c1f78f91..b2cc0d75703f3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -112,6 +112,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private final PeerFinder peerFinder; private final PreVoteCollector preVoteCollector; + private final Random random; private final ElectionSchedulerFactory electionSchedulerFactory; private final UnicastConfiguredHostsResolver configuredHostsResolver; private final TimeValue publishTimeout; @@ -153,6 +154,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe this.lastJoin = Optional.empty(); this.joinAccumulator = new InitialJoinAccumulator(); this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings); + this.random = random; this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool()); this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen); configuredHostsResolver = new UnicastConfiguredHostsResolver(nodeName, settings, transportService, unicastHostsProvider); @@ -366,11 +368,33 @@ private void startElection() { } } + private void abdicateTo(DiscoveryNode newMaster) { + assert Thread.holdsLock(mutex); + assert mode == Mode.LEADER : "expected to be leader on abdication but was " + mode; + assert newMaster.isMasterNode() : "should only abdicate to master-eligible node but was " + newMaster; + final StartJoinRequest startJoinRequest = new StartJoinRequest(newMaster, Math.max(getCurrentTerm(), maxTermSeen) + 1); + logger.info("abdicating to {} with term {}", newMaster, startJoinRequest.getTerm()); + getLastAcceptedState().nodes().mastersFirstStream().forEach(node -> { + if (isZen1Node(node) == false) { + joinHelper.sendStartJoinRequest(startJoinRequest, node); + } + }); + // handling of start join messages on the local node will be dispatched to the generic thread-pool + assert mode == Mode.LEADER : "should still be leader after sending abdication messages " + mode; + // explicitly move node to candidate state so that the next cluster state update task yields an onNoLongerMaster event + becomeCandidate("after abdicating to " + newMaster); + } + private static boolean electionQuorumContainsLocalNode(ClusterState lastAcceptedState) { - final String localNodeId = lastAcceptedState.nodes().getLocalNodeId(); - assert localNodeId != null; - return lastAcceptedState.getLastCommittedConfiguration().getNodeIds().contains(localNodeId) - || lastAcceptedState.getLastAcceptedConfiguration().getNodeIds().contains(localNodeId); + final DiscoveryNode localNode = lastAcceptedState.nodes().getLocalNode(); + assert localNode != null; + return electionQuorumContains(lastAcceptedState, localNode); + } + + private static boolean electionQuorumContains(ClusterState lastAcceptedState, DiscoveryNode node) { + final String nodeId = node.getId(); + return lastAcceptedState.getLastCommittedConfiguration().getNodeIds().contains(nodeId) + || lastAcceptedState.getLastAcceptedConfiguration().getNodeIds().contains(nodeId); } private Optional ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) { @@ -1184,7 +1208,18 @@ public void onSuccess(String source) { updateMaxTermSeen(getCurrentTerm()); if (mode == Mode.LEADER) { - scheduleReconfigurationIfNeeded(); + final ClusterState state = getLastAcceptedState(); // committed state + if (electionQuorumContainsLocalNode(state) == false) { + final List masterCandidates = completedNodes().stream() + .filter(DiscoveryNode::isMasterNode) + .filter(node -> electionQuorumContains(state, node)) + .collect(Collectors.toList()); + if (masterCandidates.isEmpty() == false) { + abdicateTo(masterCandidates.get(random.nextInt(masterCandidates.size()))); + } + } else { + scheduleReconfigurationIfNeeded(); + } } lagDetector.startLagDetector(publishRequest.getAcceptedState().version()); } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java index 4aea820d6d9e0..da7c1d02a1e0b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java @@ -36,6 +36,7 @@ import java.util.Optional; import java.util.Set; import java.util.function.LongSupplier; +import java.util.stream.Collectors; public abstract class Publication { @@ -92,6 +93,13 @@ public void onFaultyNode(DiscoveryNode faultyNode) { onPossibleCompletion(); } + public List completedNodes() { + return publicationTargets.stream() + .filter(PublicationTarget::isSuccessfullyCompleted) + .map(PublicationTarget::getDiscoveryNode) + .collect(Collectors.toList()); + } + public boolean isCommitted() { return applyCommitRequest.isPresent(); } @@ -268,6 +276,10 @@ void onFaultyNode(DiscoveryNode faultyNode) { } } + DiscoveryNode getDiscoveryNode() { + return discoveryNode; + } + private void ackOnce(Exception e) { if (ackIsPending) { ackIsPending = false; @@ -280,6 +292,10 @@ boolean isActive() { && state != PublicationTargetState.APPLIED_COMMIT; } + boolean isSuccessfullyCompleted() { + return state == PublicationTargetState.APPLIED_COMMIT; + } + boolean isWaitingForQuorum() { return state == PublicationTargetState.WAITING_FOR_QUORUM; } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 7db63ab120e91..5d3804d00dc28 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -63,6 +63,7 @@ import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus; import org.elasticsearch.transport.TransportService; import org.hamcrest.Matcher; +import org.hamcrest.core.IsCollectionContaining; import org.junit.After; import org.junit.Before; @@ -1328,6 +1329,8 @@ void stabilise(long stabilisationDurationMillis) { final VotingConfiguration lastCommittedConfiguration = lastAcceptedState.getLastCommittedConfiguration(); assertTrue(connectedNodeIds + " should be a quorum of " + lastCommittedConfiguration, lastCommittedConfiguration.hasQuorum(connectedNodeIds)); + assertThat("leader " + leader.getLocalNode() + " should be part of voting configuration " + lastCommittedConfiguration, + lastCommittedConfiguration.getNodeIds(), IsCollectionContaining.hasItem(leader.getLocalNode().getId())); assertThat("no reconfiguration is in progress", lastAcceptedState.getLastCommittedConfiguration(), equalTo(lastAcceptedState.getLastAcceptedConfiguration())); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java index 658250bc7a4da..8086682d06347 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java @@ -178,6 +178,7 @@ public void testSimpleClusterStatePublishing() throws InterruptedException { discoveryNodes, singleNodeConfig, singleNodeConfig, 42L), ackListener, Collections.emptySet()); assertThat(publication.pendingPublications.keySet(), equalTo(discoNodes)); + assertThat(publication.completedNodes().size(), equalTo(0)); assertTrue(publication.pendingCommits.isEmpty()); AtomicBoolean processedNode1PublishResponse = new AtomicBoolean(); boolean delayProcessingNode2PublishResponse = randomBoolean(); @@ -232,10 +233,12 @@ public void testSimpleClusterStatePublishing() throws InterruptedException { assertFalse(publication.completed); assertFalse(publication.committed); + assertThat(publication.completedNodes(), containsInAnyOrder(n1, n3)); publication.pendingCommits.get(n2).onResponse(TransportResponse.Empty.INSTANCE); } assertTrue(publication.completed); + assertThat(publication.completedNodes(), containsInAnyOrder(n1, n2, n3)); assertTrue(publication.committed); assertThat(ackListener.await(0L, TimeUnit.SECONDS), containsInAnyOrder(n1, n2, n3)); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/VotingConfigurationIT.java b/server/src/test/java/org/elasticsearch/cluster/coordination/VotingConfigurationIT.java new file mode 100644 index 0000000000000..8c6775cb6c91e --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/VotingConfigurationIT.java @@ -0,0 +1,41 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction; +import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest; +import org.elasticsearch.common.Priority; +import org.elasticsearch.test.ESIntegTestCase; + +import java.util.concurrent.ExecutionException; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +public class VotingConfigurationIT extends ESIntegTestCase { + + public void testAbdicateAfterVotingConfigExclusionAdded() throws ExecutionException, InterruptedException { + internalCluster().startNodes(2); + final String originalMaster = internalCluster().getMasterName(); + + logger.info("--> excluding master node {}", originalMaster); + client().execute(AddVotingConfigExclusionsAction.INSTANCE, + new AddVotingConfigExclusionsRequest(new String[]{originalMaster})).get(); + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + assertNotEquals(originalMaster, internalCluster().getMasterName()); + } +} From ca8f04719abc0a48dbe8652329b9b7a286c24213 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 24 Jan 2019 11:46:40 +0100 Subject: [PATCH 2/7] empty --- .../elasticsearch/cluster/coordination/PublicationTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java index 8086682d06347..d332888c185ac 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java @@ -56,6 +56,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; public class PublicationTests extends ESTestCase { @@ -178,7 +179,7 @@ public void testSimpleClusterStatePublishing() throws InterruptedException { discoveryNodes, singleNodeConfig, singleNodeConfig, 42L), ackListener, Collections.emptySet()); assertThat(publication.pendingPublications.keySet(), equalTo(discoNodes)); - assertThat(publication.completedNodes().size(), equalTo(0)); + assertThat(publication.completedNodes(), empty()); assertTrue(publication.pendingCommits.isEmpty()); AtomicBoolean processedNode1PublishResponse = new AtomicBoolean(); boolean delayProcessingNode2PublishResponse = randomBoolean(); From 8170a41c908a0e61250b23553f8c788e8b55eb1b Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 24 Jan 2019 11:53:46 +0100 Subject: [PATCH 3/7] add docs --- .../modules/discovery/adding-removing-nodes.asciidoc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/reference/modules/discovery/adding-removing-nodes.asciidoc b/docs/reference/modules/discovery/adding-removing-nodes.asciidoc index 3b416ea51d223..ccc0e99125371 100644 --- a/docs/reference/modules/discovery/adding-removing-nodes.asciidoc +++ b/docs/reference/modules/discovery/adding-removing-nodes.asciidoc @@ -72,7 +72,10 @@ The node that should be added to the exclusions list is specified using <> in place of `node_name` here. If a call to the voting configuration exclusions API fails, you can safely retry it. Only a successful response guarantees that the node has actually been removed from the -voting configuration and will not be reinstated. +voting configuration and will not be reinstated. If it's the active master that +was removed from the voting configuration, then it will abdicate to another +master-eligible node that's still in the voting configuration, if such a node +is available. Although the voting configuration exclusions API is most useful for down-scaling a two-node to a one-node cluster, it is also possible to use it to remove From cadf2a75f8da068da8a1d9d3de99f803ad8a5cf5 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 24 Jan 2019 17:34:11 +0100 Subject: [PATCH 4/7] treesets everywhere --- .../cluster/coordination/Coordinator.java | 2 +- .../cluster/coordination/Reconfigurator.java | 19 ++++++++-- .../coordination/ReconfiguratorTests.java | 38 ++++++++++++++----- 3 files changed, 45 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index b2cc0d75703f3..a057418b5ce41 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -796,7 +796,7 @@ ClusterState improveConfiguration(ClusterState clusterState) { .filter(this::hasJoinVoteFrom).filter(discoveryNode -> isZen1Node(discoveryNode) == false).collect(Collectors.toSet()); final VotingConfiguration newConfig = reconfigurator.reconfigure(liveNodes, clusterState.getVotingConfigExclusions().stream().map(VotingConfigExclusion::getNodeId).collect(Collectors.toSet()), - clusterState.getLastAcceptedConfiguration()); + getLocalNode(), clusterState.getLastAcceptedConfiguration()); if (newConfig.equals(clusterState.getLastAcceptedConfiguration()) == false) { assert coordinationState.get().joinVotesHaveQuorumFor(newConfig); return ClusterState.builder(clusterState).metaData(MetaData.builder(clusterState.metaData()) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java index 5c7b9562d8d8a..df73adac1d9cb 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.util.set.Sets; import java.util.Collection; +import java.util.Comparator; import java.util.Set; import java.util.TreeSet; import java.util.stream.Collectors; @@ -90,12 +91,16 @@ public String toString() { * @param retiredNodeIds Nodes that are leaving the cluster and which should not appear in the configuration if possible. Nodes that are * retired and not in the current configuration will never appear in the resulting configuration; this is useful * for shifting the vote in a 2-node cluster so one of the nodes can be restarted without harming availability. + * @param currentMaster The current master. Unless retired, we prefer to keep the current master in the config. * @param currentConfig The current configuration. As far as possible, we prefer to keep the current config as-is. * @return An optimal configuration, or leave the current configuration unchanged if the optimal configuration has no live quorum. */ - public VotingConfiguration reconfigure(Set liveNodes, Set retiredNodeIds, VotingConfiguration currentConfig) { + public VotingConfiguration reconfigure(Set liveNodes, Set retiredNodeIds, DiscoveryNode currentMaster, + VotingConfiguration currentConfig) { assert liveNodes.stream().noneMatch(Coordinator::isZen1Node) : liveNodes; - logger.trace("{} reconfiguring {} based on liveNodes={}, retiredNodeIds={}", this, currentConfig, liveNodes, retiredNodeIds); + assert liveNodes.contains(currentMaster) : "liveNodes = " + liveNodes + " master = " + currentMaster; + logger.trace("{} reconfiguring {} based on liveNodes={}, retiredNodeIds={}, currentMaster={}", + this, currentConfig, liveNodes, retiredNodeIds, currentMaster); /* * There are three true/false properties of each node in play: live/non-live, retired/non-retired and in-config/not-in-config. @@ -122,7 +127,7 @@ public VotingConfiguration reconfigure(Set liveNodes, Set final Set nonRetiredInConfigNotLiveIds = new TreeSet<>(inConfigNotLiveIds); nonRetiredInConfigNotLiveIds.removeAll(retiredNodeIds); - final Set nonRetiredInConfigLiveIds = new TreeSet<>(liveInConfigIds); + final Set nonRetiredInConfigLiveIds = masterFirstTreeSet(liveInConfigIds, currentMaster); nonRetiredInConfigLiveIds.removeAll(retiredNodeIds); final Set nonRetiredLiveNotInConfigIds = Sets.sortedDifference(liveNodeIds, currentConfig.getNodeIds()); @@ -162,4 +167,12 @@ public VotingConfiguration reconfigure(Set liveNodes, Set return currentConfig; } } + + private TreeSet masterFirstTreeSet(Collection items, DiscoveryNode masterNode) { + final String masterNodeId = masterNode.getId(); + final TreeSet set = new TreeSet<>(Comparator.comparingInt(s -> s.equals(masterNodeId) ? 0 : 1) + .thenComparing(Comparator.naturalOrder())); + set.addAll(items); + return set; + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/ReconfiguratorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/ReconfiguratorTests.java index 7e7c7adbe1af9..bbd9514222c77 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/ReconfiguratorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/ReconfiguratorTests.java @@ -31,6 +31,7 @@ import org.junit.Before; import java.util.Arrays; +import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -52,6 +53,7 @@ public void testReconfigurationExamples() { check(nodes("a"), conf("a"), true, conf("a")); check(nodes("a", "b"), conf("a"), true, conf("a")); + check(nodes("a", "b"), conf("b"), true, conf("b")); check(nodes("a", "b"), conf("a", "c"), true, conf("a")); check(nodes("a", "b"), conf("a", "b"), true, conf("a")); check(nodes("a", "b"), conf("a", "b", "e"), true, conf("a", "b", "e")); @@ -64,6 +66,7 @@ public void testReconfigurationExamples() { check(nodes("a", "b", "c", "d"), conf("a", "b", "e"), true, conf("a", "b", "c")); check(nodes("a", "b", "c", "d", "e"), conf("a", "f", "g"), true, conf("a", "b", "c", "d", "e")); check(nodes("a", "b", "c", "d"), conf("a", "b", "c", "d", "e"), true, conf("a", "b", "c")); + check(nodes("e", "a", "b", "c"), retired(), "e", conf("a", "b", "c", "d", "e"), true, conf("a", "b", "e")); check(nodes("a", "b", "c"), conf("a", "b", "c", "d", "e"), true, conf("a", "b", "c")); check(nodes("a"), conf("a"), false, conf("a")); @@ -124,7 +127,8 @@ public void testAutoShrinking() { final int quorumSize = Math.max(liveNodes.length / 2 + 1, initialVotingNodes.length < 3 ? 1 : 2); - final VotingConfiguration finalConfig = reconfigurator.reconfigure(liveNodesSet, emptySet(), initialConfig); + final VotingConfiguration finalConfig = reconfigurator.reconfigure(liveNodesSet, emptySet(), + randomFrom(liveNodesSet), initialConfig); final String description = "reconfigure " + liveNodesSet + " from " + initialConfig + " yielded " + finalConfig; @@ -152,7 +156,8 @@ public void testManualShrinking() { final int quorumSize = Math.max(liveNodes.length, initialVotingNodes.length) / 2 + 1; - final VotingConfiguration finalConfig = reconfigurator.reconfigure(liveNodesSet, emptySet(), initialConfig); + final VotingConfiguration finalConfig = reconfigurator.reconfigure(liveNodesSet, emptySet(), randomFrom(liveNodesSet), + initialConfig); final String description = "reconfigure " + liveNodesSet + " from " + initialConfig + " yielded " + finalConfig; @@ -187,13 +192,20 @@ private void check(Set liveNodes, VotingConfiguration config, boo private void check(Set liveNodes, Set retired, VotingConfiguration config, boolean autoShrinkVotingConfiguration, VotingConfiguration expectedConfig) { + final DiscoveryNode master = liveNodes.stream().sorted(Comparator.comparing(DiscoveryNode::getId)).findFirst().get(); + check(liveNodes, retired, master.getId(), config, autoShrinkVotingConfiguration, expectedConfig); + } + + private void check(Set liveNodes, Set retired, String masterId, VotingConfiguration config, + boolean autoShrinkVotingConfiguration, VotingConfiguration expectedConfig) { final Reconfigurator reconfigurator = makeReconfigurator(Settings.builder() .put(CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION.getKey(), autoShrinkVotingConfiguration) .build()); - final VotingConfiguration adaptedConfig = reconfigurator.reconfigure(liveNodes, retired, config); - assertEquals(new ParameterizedMessage("[liveNodes={}, retired={}, config={}, autoShrinkVotingConfiguration={}]", - liveNodes, retired, config, autoShrinkVotingConfiguration).getFormattedMessage(), + final DiscoveryNode master = liveNodes.stream().filter(n -> n.getId().equals(masterId)).findFirst().get(); + final VotingConfiguration adaptedConfig = reconfigurator.reconfigure(liveNodes, retired, master, config); + assertEquals(new ParameterizedMessage("[liveNodes={}, retired={}, master={}, config={}, autoShrinkVotingConfiguration={}]", + liveNodes, retired, master, config, autoShrinkVotingConfiguration).getFormattedMessage(), expectedConfig, adaptedConfig); } @@ -206,18 +218,24 @@ public void testDynamicSetting() { final Reconfigurator reconfigurator = new Reconfigurator(Settings.EMPTY, clusterSettings); final VotingConfiguration initialConfig = conf("a", "b", "c", "d", "e"); + Set twoNodes = nodes("a", "b"); + Set threeNodes = nodes("a", "b", "c"); + // default is "true" - assertThat(reconfigurator.reconfigure(nodes("a", "b"), retired(), initialConfig), equalTo(conf("a", "b", "c"))); + assertThat(reconfigurator.reconfigure(twoNodes, retired(), randomFrom(twoNodes), initialConfig), equalTo(conf("a", "b", "c"))); // update to "false" clusterSettings.applySettings(Settings.builder().put(CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION.getKey(), "false").build()); - assertThat(reconfigurator.reconfigure(nodes("a", "b"), retired(), initialConfig), sameInstance(initialConfig)); // no quorum - assertThat(reconfigurator.reconfigure(nodes("a", "b", "c"), retired(), initialConfig), equalTo(conf("a", "b", "c", "d", "e"))); - assertThat(reconfigurator.reconfigure(nodes("a", "b", "c"), retired("d"), initialConfig), equalTo(conf("a", "b", "c", "e"))); + assertThat(reconfigurator.reconfigure(twoNodes, retired(), randomFrom(twoNodes), initialConfig), + sameInstance(initialConfig)); // no quorum + assertThat(reconfigurator.reconfigure(threeNodes, retired(), randomFrom(threeNodes), initialConfig), + equalTo(conf("a", "b", "c", "d", "e"))); + assertThat(reconfigurator.reconfigure(threeNodes, retired("d"), randomFrom(threeNodes), initialConfig), + equalTo(conf("a", "b", "c", "e"))); // explicitly set to "true" clusterSettings.applySettings(Settings.builder().put(CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION.getKey(), "true").build()); - assertThat(reconfigurator.reconfigure(nodes("a", "b"), retired(), initialConfig), equalTo(conf("a", "b", "c"))); + assertThat(reconfigurator.reconfigure(twoNodes, retired(), randomFrom(twoNodes), initialConfig), equalTo(conf("a", "b", "c"))); expectThrows(IllegalArgumentException.class, () -> clusterSettings.applySettings(Settings.builder().put(CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION.getKey(), "blah").build())); From 7300c4bb62546ed08e540c407815c57c75bc2fef Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Fri, 25 Jan 2019 08:34:19 +0100 Subject: [PATCH 5/7] fix test --- .../elasticsearch/cluster/SpecificMasterNodesIT.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java b/server/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java index aaef1e58fb50e..d79fd0a9832c4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java @@ -31,6 +31,7 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.junit.annotations.TestLogging; import java.io.IOException; @@ -106,7 +107,7 @@ public void testSimpleOnlyMasterNodeElection() throws IOException { .execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligibleNodeName)); } - public void testElectOnlyBetweenMasterNodes() throws IOException, ExecutionException, InterruptedException { + public void testElectOnlyBetweenMasterNodes() throws Exception { logger.info("--> start data node / non master node"); internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), true) .put(Node.NODE_MASTER_SETTING.getKey(), false).put("discovery.initial_state_timeout", "1s")); @@ -138,7 +139,14 @@ public void testElectOnlyBetweenMasterNodes() throws IOException, ExecutionExcep logger.info("--> closing master node (1)"); client().execute(AddVotingConfigExclusionsAction.INSTANCE, new AddVotingConfigExclusionsRequest(new String[]{masterNodeName})).get(); - internalCluster().stopCurrentMasterNode(); + // removing the master from the voting configuration immediately triggers the master to step down + assertBusy(() -> { + assertThat(internalCluster().nonMasterClient().admin().cluster().prepareState() + .execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligableNodeName)); + assertThat(internalCluster().masterClient().admin().cluster().prepareState() + .execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligableNodeName)); + }); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(masterNodeName)); assertThat(internalCluster().nonMasterClient().admin().cluster().prepareState() .execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligableNodeName)); assertThat(internalCluster().masterClient().admin().cluster().prepareState() From 4e5886c4f9f098f8b539901f678df41d55ca4f11 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 28 Jan 2019 12:47:19 +0100 Subject: [PATCH 6/7] moar sets --- .../cluster/coordination/Reconfigurator.java | 32 +++++++++++-------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java index df73adac1d9cb..ebca37bdac0b1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Reconfigurator.java @@ -30,7 +30,7 @@ import org.elasticsearch.common.util.set.Sets; import java.util.Collection; -import java.util.Comparator; +import java.util.Collections; import java.util.Set; import java.util.TreeSet; import java.util.stream.Collectors; @@ -106,7 +106,8 @@ public VotingConfiguration reconfigure(Set liveNodes, Set * There are three true/false properties of each node in play: live/non-live, retired/non-retired and in-config/not-in-config. * Firstly we divide the nodes into disjoint sets based on these properties: * - * - nonRetiredInConfigNotLiveIds + * - nonRetiredMaster + * - nonRetiredNotMasterInConfigNotLiveIds * - nonRetiredInConfigLiveIds * - nonRetiredLiveNotInConfigIds * @@ -127,9 +128,20 @@ public VotingConfiguration reconfigure(Set liveNodes, Set final Set nonRetiredInConfigNotLiveIds = new TreeSet<>(inConfigNotLiveIds); nonRetiredInConfigNotLiveIds.removeAll(retiredNodeIds); - final Set nonRetiredInConfigLiveIds = masterFirstTreeSet(liveInConfigIds, currentMaster); + final Set nonRetiredInConfigLiveIds = new TreeSet<>(liveInConfigIds); nonRetiredInConfigLiveIds.removeAll(retiredNodeIds); + final Set nonRetiredInConfigLiveMasterIds; + final Set nonRetiredInConfigLiveNotMasterIds; + if (nonRetiredInConfigLiveIds.contains(currentMaster.getId())) { + nonRetiredInConfigLiveNotMasterIds = new TreeSet<>(nonRetiredInConfigLiveIds); + nonRetiredInConfigLiveNotMasterIds.remove(currentMaster.getId()); + nonRetiredInConfigLiveMasterIds = Collections.singleton(currentMaster.getId()); + } else { + nonRetiredInConfigLiveNotMasterIds = nonRetiredInConfigLiveIds; + nonRetiredInConfigLiveMasterIds = Collections.emptySet(); + } + final Set nonRetiredLiveNotInConfigIds = Sets.sortedDifference(liveNodeIds, currentConfig.getNodeIds()); nonRetiredLiveNotInConfigIds.removeAll(retiredNodeIds); @@ -156,9 +168,9 @@ public VotingConfiguration reconfigure(Set liveNodes, Set * The new configuration is formed by taking this many nodes in the following preference order: */ final VotingConfiguration newConfig = new VotingConfiguration( - // live nodes first, preferring the current config, and if we need more then use non-live nodes - Stream.of(nonRetiredInConfigLiveIds, nonRetiredLiveNotInConfigIds, nonRetiredInConfigNotLiveIds) - .flatMap(Collection::stream).limit(targetSize).collect(Collectors.toSet())); + // live master first, then other live nodes, preferring the current config, and if we need more then use non-live nodes + Stream.of(nonRetiredInConfigLiveMasterIds, nonRetiredInConfigLiveNotMasterIds, nonRetiredLiveNotInConfigIds, + nonRetiredInConfigNotLiveIds).flatMap(Collection::stream).limit(targetSize).collect(Collectors.toSet())); if (newConfig.hasQuorum(liveNodeIds)) { return newConfig; @@ -167,12 +179,4 @@ public VotingConfiguration reconfigure(Set liveNodes, Set return currentConfig; } } - - private TreeSet masterFirstTreeSet(Collection items, DiscoveryNode masterNode) { - final String masterNodeId = masterNode.getId(); - final TreeSet set = new TreeSet<>(Comparator.comparingInt(s -> s.equals(masterNodeId) ? 0 : 1) - .thenComparing(Comparator.naturalOrder())); - set.addAll(items); - return set; - } } From 45093bf8371a2ee0cfd7ba35ec9eede29afcb279 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 28 Jan 2019 15:40:15 +0100 Subject: [PATCH 7/7] unused import --- .../java/org/elasticsearch/cluster/SpecificMasterNodesIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java b/server/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java index d79fd0a9832c4..aaf01b5e6e079 100644 --- a/server/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java @@ -37,7 +37,6 @@ import java.io.IOException; import java.util.Collections; import java.util.List; -import java.util.concurrent.ExecutionException; import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;