From ce1071ec81d069cc3659125fce3c01063f696997 Mon Sep 17 00:00:00 2001 From: David Turner Date: Sun, 7 Oct 2018 16:00:14 +0100 Subject: [PATCH 01/24] Fix bugs in fixLag() The hack to work around lag detection had some issues: - it always called runFor(), even if no lag was detected - it looked at the last-accepted state not the last-applied state, so missed some lag situations. This fixes these issues. --- .../cluster/coordination/CoordinatorTests.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index c1377c7d8419b..21bca351f9cc3 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -614,21 +614,22 @@ void stabilise(long stabilisationDurationMillis) { // TODO remove this when lag detection is implemented void fixLag() { final ClusterNode leader = getAnyLeader(); - final long leaderVersion = leader.coordinator.getLastAcceptedState().version(); + final long leaderVersion = leader.coordinator.getApplierState().version(); final long minVersion = clusterNodes.stream() .filter(n -> isConnectedPair(n, leader)) - .map(n -> n.coordinator.getLastAcceptedState().version()).min(Long::compare).orElse(Long.MIN_VALUE); - + .map(n -> n.coordinator.getApplierState().version()).min(Long::compare).orElse(Long.MIN_VALUE); assert minVersion >= 0; if (minVersion < leaderVersion) { - logger.info("--> publishing a value to fix lag, leaderVersion={}, minVersion={}", leaderVersion, minVersion); + logger.info("--> fixLag publishing a value to fix lag, leaderVersion={}, minVersion={}", leaderVersion, minVersion); onNode(leader.getLocalNode(), () -> { synchronized (leader.coordinator.mutex) { leader.submitValue(randomLong()); } }).run(); + runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "re-stabilising after lag-fixing publication"); + } else { + logger.info("--> fixLag found no lag, leader={}, leaderVersion={}, minVersion={}", leader, leaderVersion, minVersion); } - runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "re-stabilising after lag-fixing publication"); } void runFor(long runDurationMillis, String description) { From 5c20e6366b976701f349492df49fa1e25d9fb9bf Mon Sep 17 00:00:00 2001 From: David Turner Date: Sun, 7 Oct 2018 08:18:17 +0100 Subject: [PATCH 02/24] Add low-level bootstrap implementation Today we inject the initial configuration of the cluster (i.e. the set of voting nodes) at startup. In reality we must support injecting the initial configuration after startup too. This commit adds low-level support for doing so as safely as possible. --- .../coordination/CoordinationState.java | 18 ++-- .../cluster/coordination/Coordinator.java | 82 ++++++++++---- .../cluster/service/MasterService.java | 2 +- .../coordination/CoordinatorTests.java | 101 +++++++++++++++++- 4 files changed, 167 insertions(+), 36 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java index d5b9cdf6adfc3..fb7071e70806a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java @@ -132,17 +132,17 @@ public void setInitialState(ClusterState initialState) { throw new CoordinationStateRejectedException("initial state already set: last-accepted version now " + lastAcceptedVersion); } - assert getLastAcceptedTerm() == 0; - assert getLastAcceptedConfiguration().isEmpty(); - assert getLastCommittedConfiguration().isEmpty(); - assert lastPublishedVersion == 0; - assert lastPublishedConfiguration.isEmpty(); + assert getLastAcceptedTerm() == 0 : getLastAcceptedTerm(); + assert getLastAcceptedConfiguration().isEmpty() : getLastAcceptedConfiguration(); + assert getLastCommittedConfiguration().isEmpty() : getLastCommittedConfiguration(); + assert lastPublishedVersion == 0 : lastAcceptedVersion; + assert lastPublishedConfiguration.isEmpty() : lastPublishedConfiguration; assert electionWon == false; - assert joinVotes.isEmpty(); - assert publishVotes.isEmpty(); + assert joinVotes.isEmpty() : joinVotes; + assert publishVotes.isEmpty() : publishVotes; - assert initialState.term() == 0; - assert initialState.version() == 1; + assert initialState.term() == 0 : initialState; + assert initialState.version() == 1 : initialState; assert initialState.getLastAcceptedConfiguration().isEmpty() == false; assert initialState.getLastCommittedConfiguration().isEmpty() == false; diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index a8e21da705ce7..c193d6e89c070 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -24,6 +24,8 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterState.Builder; +import org.elasticsearch.cluster.ClusterState.VotingConfiguration; import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest; @@ -480,6 +482,8 @@ public void invariant() { assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState(); assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState(); assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlock(NO_MASTER_BLOCK_WRITES.id()); + assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) + : preVoteCollector + " vs " + getPreVoteResponse(); if (mode == Mode.LEADER) { final boolean becomingMaster = getStateForMasterService().term() != getCurrentTerm(); @@ -493,7 +497,6 @@ public void invariant() { assert leaderCheckScheduler == null : leaderCheckScheduler; assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode()); assert preVoteCollector.getLeader() == getLocalNode() : preVoteCollector; - assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector; final boolean activePublication = currentPublication.map(CoordinatorPublication::isActiveForCurrentLeader).orElse(false); if (becomingMaster && activePublication == false) { @@ -527,7 +530,6 @@ public void invariant() { assert followersChecker.getKnownFollowers().isEmpty(); assert currentPublication.map(Publication::isCommitted).orElse(true); assert preVoteCollector.getLeader().equals(lastKnownLeader.get()) : preVoteCollector; - assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector; } else { assert mode == Mode.CANDIDATE; assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator; @@ -540,11 +542,41 @@ public void invariant() { assert applierState.nodes().getMasterNodeId() == null; assert currentPublication.map(Publication::isCommitted).orElse(true); assert preVoteCollector.getLeader() == null : preVoteCollector; - assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector; } } } + public void setInitialConfiguration(final VotingConfiguration votingConfiguration) { + synchronized (mutex) { + final ClusterState currentState = getStateForMasterService(); + + if (currentState.getLastAcceptedConfiguration().isEmpty() == false) { + throw new CoordinationStateRejectedException("Cannot set initial configuration: configuration has already been set"); + } + assert currentState.term() == 0 : currentState; + assert currentState.version() == 0 : currentState; + + if (mode != Mode.CANDIDATE) { + throw new CoordinationStateRejectedException("Cannot set initial configuration in mode " + mode); + } + + final List foundPeerIds = new ArrayList<>(); + foundPeerIds.add(getLocalNode().getId()); + peerFinder.getFoundPeers().forEach(peer -> foundPeerIds.add(peer.getId())); + if (votingConfiguration.hasQuorum(foundPeerIds) == false) { + throw new CoordinationStateRejectedException("Cannot set initial configuration: no quorum found yet"); + } + + logger.debug("setting initial configuration to {}", votingConfiguration); + final Builder builder = masterService.incrementVersion(currentState); + builder.lastAcceptedConfiguration(votingConfiguration); + builder.lastCommittedConfiguration(votingConfiguration); + coordinationState.get().setInitialState(builder.build()); + startElectionScheduler(); + preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version + } + } + // for tests boolean hasJoinVoteFrom(DiscoveryNode localNode) { return coordinationState.get().containsJoinVoteFor(localNode); @@ -731,25 +763,7 @@ protected void onFoundPeersUpdated() { if (foundQuorum) { if (electionScheduler == null) { - final TimeValue gracePeriod = TimeValue.ZERO; // TODO variable grace period - electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() { - @Override - public void run() { - synchronized (mutex) { - if (mode == Mode.CANDIDATE) { - if (prevotingRound != null) { - prevotingRound.close(); - } - prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes()); - } - } - } - - @Override - public String toString() { - return "scheduling of new prevoting round"; - } - }); + startElectionScheduler(); } } else { closePrevotingAndElectionScheduler(); @@ -759,6 +773,30 @@ public String toString() { } } + private void startElectionScheduler() { + assert electionScheduler == null : electionScheduler; + final TimeValue gracePeriod = TimeValue.ZERO; // TODO variable grace period + electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() { + @Override + public void run() { + synchronized (mutex) { + if (mode == Mode.CANDIDATE) { + if (prevotingRound != null) { + prevotingRound.close(); + } + final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState(); + prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes()); + } + } + } + + @Override + public String toString() { + return "scheduling of new prevoting round"; + } + }); + } + class CoordinatorPublication extends Publication { private final PublishRequest publishRequest; diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index 59e4fc3852299..8719baeff9dbc 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -329,7 +329,7 @@ private ClusterState patchVersions(ClusterState previousClusterState, ClusterTas return newClusterState; } - protected Builder incrementVersion(ClusterState clusterState) { + public Builder incrementVersion(ClusterState clusterState) { return ClusterState.builder(clusterState).incrementVersion(); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index c1377c7d8419b..08de15e195d98 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -51,6 +51,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -102,6 +103,7 @@ public void resetPortCounterBeforeEachTest() { public void testCanUpdateClusterStateAfterStabilisation() { final Cluster cluster = new Cluster(randomIntBetween(1, 5)); cluster.runRandomly(); + cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); @@ -121,6 +123,7 @@ public void testCanUpdateClusterStateAfterStabilisation() { public void testNodesJoinAfterStableCluster() { final Cluster cluster = new Cluster(randomIntBetween(1, 5)); cluster.runRandomly(); + cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final long currentTerm = cluster.getAnyLeader().coordinator.getCurrentTerm(); @@ -141,6 +144,7 @@ public void testNodesJoinAfterStableCluster() { public void testLeaderDisconnectionDetectedQuickly() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); + cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode originalLeader = cluster.getAnyLeader(); @@ -177,6 +181,7 @@ public void testLeaderDisconnectionDetectedQuickly() { public void testUnresponsiveLeaderDetectedEventually() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); + cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode originalLeader = cluster.getAnyLeader(); @@ -219,6 +224,7 @@ public void testUnresponsiveLeaderDetectedEventually() { public void testFollowerDisconnectionDetectedQuickly() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); + cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); @@ -251,6 +257,7 @@ public void testFollowerDisconnectionDetectedQuickly() { public void testUnresponsiveFollowerDetectedEventually() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); + cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); @@ -275,6 +282,7 @@ public void testUnresponsiveFollowerDetectedEventually() { public void testAckListenerReceivesAcksFromAllNodes() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); + cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); AckCollector ackCollector = leader.submitValue(randomLong()); @@ -289,6 +297,7 @@ public void testAckListenerReceivesAcksFromAllNodes() { public void testAckListenerReceivesNackFromFollower() { final Cluster cluster = new Cluster(3); cluster.runRandomly(); + cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); @@ -306,6 +315,7 @@ public void testAckListenerReceivesNackFromFollower() { public void testAckListenerReceivesNackFromLeader() { final Cluster cluster = new Cluster(3); cluster.runRandomly(); + cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); @@ -327,6 +337,7 @@ public void testAckListenerReceivesNackFromLeader() { public void testAckListenerReceivesNoAckFromHangingFollower() { final Cluster cluster = new Cluster(3); cluster.runRandomly(); + cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); @@ -345,6 +356,7 @@ public void testAckListenerReceivesNoAckFromHangingFollower() { public void testAckListenerReceivesNacksIfPublicationTimesOut() { final Cluster cluster = new Cluster(3); cluster.runRandomly(); + cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); @@ -404,6 +416,57 @@ public void testAckListenerReceivesNacksFromFollowerInHigherTerm() { // assertTrue("expected ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1)); } + public void testSettingInitialConfigurationTriggersElection() { + final Cluster cluster = new Cluster(randomIntBetween(1, 5)); + cluster.runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2 + randomLongBetween(0, 60000), "initial discovery phase"); + for (final ClusterNode clusterNode : cluster.clusterNodes) { + final String nodeId = clusterNode.getId(); + assertThat(nodeId + " is CANDIDATE", clusterNode.coordinator.getMode(), is(CANDIDATE)); + assertThat(nodeId + " is in term 0", clusterNode.coordinator.getCurrentTerm(), is(0L)); + assertThat(nodeId + " last accepted in term 0", clusterNode.coordinator.getLastAcceptedState().term(), is(0L)); + assertThat(nodeId + " last accepted version 0", clusterNode.coordinator.getLastAcceptedState().version(), is(0L)); + assertTrue(nodeId + " has an empty last-accepted configuration", + clusterNode.coordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty()); + assertTrue(nodeId + " has an empty last-committed configuration", + clusterNode.coordinator.getLastAcceptedState().getLastCommittedConfiguration().isEmpty()); + } + + cluster.getAnyNode().applyInitialConfiguration(); + cluster.stabilise(defaultMillis( + // the first election should succeed + ELECTION_INITIAL_TIMEOUT_SETTING) // TODO this wait is unnecessary, we could trigger the election immediately + // Allow two round-trip for pre-voting and voting + + 4 * DEFAULT_DELAY_VARIABILITY + // Then a commit of the new leader's first cluster state + + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + } + + public void testCannotSetInitialConfigurationTwice() { + final Cluster cluster = new Cluster(randomIntBetween(1, 5)); + cluster.runRandomly(); + cluster.setInitialConfigurationIfRequired(); + cluster.stabilise(); + + final Coordinator coordinator = cluster.getAnyNode().coordinator; + final CoordinationStateRejectedException exception = expectThrows(CoordinationStateRejectedException.class, + () -> coordinator.setInitialConfiguration(coordinator.getLastAcceptedState().getLastCommittedConfiguration())); + + assertThat(exception.getMessage(), is("Cannot set initial configuration: configuration has already been set")); + } + + public void testCannotSetInitialConfigurationWithoutQuorum() { + final Cluster cluster = new Cluster(randomIntBetween(1, 5)); + final Coordinator coordinator = cluster.getAnyNode().coordinator; + final VotingConfiguration unknownNodeConfiguration = new VotingConfiguration(Collections.singleton("unknown-node")); + final CoordinationStateRejectedException exception = expectThrows(CoordinationStateRejectedException.class, + () -> coordinator.setInitialConfiguration(unknownNodeConfiguration)); + assertThat(exception.getMessage(), is("Cannot set initial configuration: no quorum found yet")); + + // This is VERY BAD: setting a _different_ initial configuration. Yet it works if the first attempt will never be a quorum. + coordinator.setInitialConfiguration(new VotingConfiguration(Collections.singleton(coordinator.getLocalNode().getId()))); + cluster.stabilise(); + } + private static long defaultMillis(Setting setting) { return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY; } @@ -495,6 +558,18 @@ void addNodes(int newNodesCount) { } } + void setInitialConfigurationIfRequired() { + if (clusterNodes.stream().allMatch(n -> n.coordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty())) { + assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty()); + assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty()); + runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration"); + final ClusterNode bootstrapNode = getAnyNode(); + bootstrapNode.applyInitialConfiguration(); + } else { + logger.info("--> setting initial configuration not required"); + } + } + void runRandomly() { // TODO supporting (preserving?) existing disruptions needs implementing if needed, for now we just forbid it @@ -555,6 +630,14 @@ void runRandomly() { } break; } + } else if (rarely()) { + final ClusterNode clusterNode = getAnyNode(); + onNode(clusterNode.getLocalNode(), + () -> { + logger.debug("----> [runRandomly {}] applying initial configuration {} to {}", + thisStep, initialConfiguration, clusterNode.getId()); + clusterNode.coordinator.setInitialConfiguration(initialConfiguration); + }).run(); } else { if (deterministicTaskQueue.hasDeferredTasks() && randomBoolean()) { deterministicTaskQueue.advanceTime(); @@ -566,7 +649,6 @@ void runRandomly() { // TODO other random steps: // - reboot a node // - abdicate leadership - // - bootstrap } catch (CoordinationStateRejectedException ignored) { // This is ok: it just means a message couldn't currently be handled. @@ -705,7 +787,7 @@ private void assertUniqueLeaderAndExpectedModes() { ClusterNode getAnyLeader() { List allLeaders = clusterNodes.stream().filter(ClusterNode::isLeader).collect(Collectors.toList()); - assertThat(allLeaders, not(empty())); + assertThat("leaders", allLeaders, not(empty())); return randomFrom(allLeaders); } @@ -758,8 +840,8 @@ class ClusterNode extends AbstractComponent { super(Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeIdFromIndex(nodeIndex)).build()); this.nodeIndex = nodeIndex; localNode = createDiscoveryNode(); - persistedState = new InMemoryPersistedState(1L, - clusterState(1L, 1L, localNode, initialConfiguration, initialConfiguration, 0L)); + persistedState = new InMemoryPersistedState(0L, + clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L)); onNode(localNode, this::setUp).run(); } @@ -916,6 +998,17 @@ ClusterState getLastAppliedClusterState() { return clusterApplier.lastAppliedClusterState; } + void applyInitialConfiguration() { + onNode(localNode, () -> { + try { + coordinator.setInitialConfiguration(initialConfiguration); + logger.info("successfully set initial configuration to {}", initialConfiguration); + } catch (CoordinationStateRejectedException e) { + logger.info(new ParameterizedMessage("failed to set initial configuration to {}", initialConfiguration), e); + } + }).run(); + } + private class FakeClusterApplier implements ClusterApplier { final ClusterName clusterName; From d6d1ee4e85bf0c1a3bf2343177bd5fe44c6aa688 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 8 Oct 2018 08:53:39 +0100 Subject: [PATCH 03/24] Add storage-layer disruptions to CoordinatorTests Today we assume the storage layer operates perfectly in CoordinatorTests, which means we are not testing that the system's invariants are preserved if the storage layer fails for some reason. This change injects (rare) storage-layer failures during the safety phase to cover these cases. --- .../coordination/CoordinationStateTests.java | 1 + .../coordination/CoordinatorTests.java | 29 ++++++++++++++++++- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java index 0ce97e3f19f42..de661d0690aa0 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java @@ -811,6 +811,7 @@ public static long value(ClusterState clusterState) { } public static class InMemoryPersistedState implements PersistedState { + // TODO add support and tests for behaviour with persistence-layer failures private long currentTerm; private ClusterState acceptedState; diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 98ea7abaa5277..78afa9c6a140f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -524,6 +524,7 @@ class Cluster { final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue( // TODO does ThreadPool need a node name any more? Settings.builder().put(NODE_NAME_SETTING.getKey(), "deterministic-task-queue").build(), random()); + private boolean disruptStorage; private final VotingConfiguration initialConfiguration; private final Set disconnectedNodes = new HashSet<>(); @@ -580,6 +581,7 @@ void runRandomly() { logger.info("--> start of safety phase of at least [{}] steps", randomSteps); deterministicTaskQueue.setExecutionDelayVariabilityMillis(EXTREME_DELAY_VARIABILITY); + disruptStorage = true; int step = 0; long finishTime = -1; @@ -659,6 +661,7 @@ void runRandomly() { disconnectedNodes.clear(); blackholedNodes.clear(); + disruptStorage = false; } private void assertConsistentStates() { @@ -826,6 +829,30 @@ ClusterNode getAnyNodePreferringLeaders() { return getAnyNode(); } + class MockPersistedState extends InMemoryPersistedState { + MockPersistedState(long term, ClusterState acceptedState) { + super(term, acceptedState); + } + + private void possiblyFail(String description) { + if (disruptStorage && rarely()) { + throw new CoordinationStateRejectedException("simulated IO exception [" + description + ']'); + } + } + + @Override + public void setCurrentTerm(long currentTerm) { + possiblyFail("writing term of " + currentTerm); + super.setCurrentTerm(currentTerm); + } + + @Override + public void setLastAcceptedState(ClusterState clusterState) { + possiblyFail("writing last-accepted state of term=" + clusterState.term() + ", version=" + clusterState.version()); + super.setLastAcceptedState(clusterState); + } + } + class ClusterNode extends AbstractComponent { private final int nodeIndex; private Coordinator coordinator; @@ -841,7 +868,7 @@ class ClusterNode extends AbstractComponent { super(Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeIdFromIndex(nodeIndex)).build()); this.nodeIndex = nodeIndex; localNode = createDiscoveryNode(); - persistedState = new InMemoryPersistedState(0L, + persistedState = new MockPersistedState(0L, clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L)); onNode(localNode, this::setUp).run(); } From 5e7a4f93e440691417de8bd60f2275319cf97145 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 8 Oct 2018 09:11:04 +0100 Subject: [PATCH 04/24] Assert that we clean up disruptStorage correctly --- .../org/elasticsearch/cluster/coordination/CoordinatorTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 78afa9c6a140f..08053cd04540e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -691,6 +691,7 @@ void stabilise() { void stabilise(long stabilisationDurationMillis) { assertThat("stabilisation requires default delay variability (and proper cleanup of raised variability)", deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY)); + assertFalse("stabilisation requires stable storage", disruptStorage); runFor(stabilisationDurationMillis, "stabilising"); fixLag(); assertUniqueLeaderAndExpectedModes(); From 5c47f550140910e216e6fe00240a0e43cb9639ca Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 8 Oct 2018 10:21:49 +0100 Subject: [PATCH 05/24] Better message in the case where a quorum has not been discovered --- .../cluster/coordination/Coordinator.java | 12 +++++++----- .../cluster/coordination/CoordinatorTests.java | 13 ++++++++++--- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index c193d6e89c070..82f8d32c8bbe0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -65,6 +65,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_WRITES; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; @@ -560,11 +561,12 @@ public void setInitialConfiguration(final VotingConfiguration votingConfiguratio throw new CoordinationStateRejectedException("Cannot set initial configuration in mode " + mode); } - final List foundPeerIds = new ArrayList<>(); - foundPeerIds.add(getLocalNode().getId()); - peerFinder.getFoundPeers().forEach(peer -> foundPeerIds.add(peer.getId())); - if (votingConfiguration.hasQuorum(foundPeerIds) == false) { - throw new CoordinationStateRejectedException("Cannot set initial configuration: no quorum found yet"); + final List knownNodes = new ArrayList<>(); + knownNodes.add(getLocalNode()); + peerFinder.getFoundPeers().forEach(knownNodes::add); + if (votingConfiguration.hasQuorum(knownNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toList())) == false) { + throw new CoordinationStateRejectedException("not enough nodes discovered to form a quorum in the initial configuration " + + "[knownNodes=" + knownNodes + ", " + votingConfiguration + "]"); } logger.debug("setting initial configuration to {}", votingConfiguration); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 08de15e195d98..a59c1d9e4085f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -85,12 +85,15 @@ import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.startsWith; @TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE") public class CoordinatorTests extends ESTestCase { @@ -458,9 +461,13 @@ public void testCannotSetInitialConfigurationWithoutQuorum() { final Cluster cluster = new Cluster(randomIntBetween(1, 5)); final Coordinator coordinator = cluster.getAnyNode().coordinator; final VotingConfiguration unknownNodeConfiguration = new VotingConfiguration(Collections.singleton("unknown-node")); - final CoordinationStateRejectedException exception = expectThrows(CoordinationStateRejectedException.class, - () -> coordinator.setInitialConfiguration(unknownNodeConfiguration)); - assertThat(exception.getMessage(), is("Cannot set initial configuration: no quorum found yet")); + final String exceptionMessage = expectThrows(CoordinationStateRejectedException.class, + () -> coordinator.setInitialConfiguration(unknownNodeConfiguration)).getMessage(); + assertThat(exceptionMessage, + startsWith("not enough nodes discovered to form a quorum in the initial configuration [knownNodes=[")); + assertThat(exceptionMessage, + endsWith("], VotingConfiguration{unknown-node}]")); + assertThat(exceptionMessage, containsString(coordinator.getLocalNode().toString())); // This is VERY BAD: setting a _different_ initial configuration. Yet it works if the first attempt will never be a quorum. coordinator.setInitialConfiguration(new VotingConfiguration(Collections.singleton(coordinator.getLocalNode().getId()))); From dbb2b1ab1739456cdfa7edcfed7adf48d290ab6f Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 8 Oct 2018 10:27:21 +0100 Subject: [PATCH 06/24] Review feedback --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 82f8d32c8bbe0..3d5e8900738ca 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -569,13 +569,13 @@ public void setInitialConfiguration(final VotingConfiguration votingConfiguratio "[knownNodes=" + knownNodes + ", " + votingConfiguration + "]"); } - logger.debug("setting initial configuration to {}", votingConfiguration); + logger.info("setting initial configuration to {}", votingConfiguration); final Builder builder = masterService.incrementVersion(currentState); builder.lastAcceptedConfiguration(votingConfiguration); builder.lastCommittedConfiguration(votingConfiguration); coordinationState.get().setInitialState(builder.build()); - startElectionScheduler(); preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version + startElectionScheduler(); } } From ef712a50a6904a7c5b4aa1132e6fbec1c54a65c1 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 8 Oct 2018 10:29:14 +0100 Subject: [PATCH 07/24] Set initial configuration at the start of stabilisation --- .../coordination/CoordinatorTests.java | 35 ++++++------------- 1 file changed, 11 insertions(+), 24 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index a59c1d9e4085f..690b393b4d196 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -106,7 +106,6 @@ public void resetPortCounterBeforeEachTest() { public void testCanUpdateClusterStateAfterStabilisation() { final Cluster cluster = new Cluster(randomIntBetween(1, 5)); cluster.runRandomly(); - cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); @@ -126,7 +125,6 @@ public void testCanUpdateClusterStateAfterStabilisation() { public void testNodesJoinAfterStableCluster() { final Cluster cluster = new Cluster(randomIntBetween(1, 5)); cluster.runRandomly(); - cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final long currentTerm = cluster.getAnyLeader().coordinator.getCurrentTerm(); @@ -147,7 +145,6 @@ public void testNodesJoinAfterStableCluster() { public void testLeaderDisconnectionDetectedQuickly() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); - cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode originalLeader = cluster.getAnyLeader(); @@ -184,7 +181,6 @@ public void testLeaderDisconnectionDetectedQuickly() { public void testUnresponsiveLeaderDetectedEventually() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); - cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode originalLeader = cluster.getAnyLeader(); @@ -227,7 +223,6 @@ public void testUnresponsiveLeaderDetectedEventually() { public void testFollowerDisconnectionDetectedQuickly() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); - cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); @@ -260,7 +255,6 @@ public void testFollowerDisconnectionDetectedQuickly() { public void testUnresponsiveFollowerDetectedEventually() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); - cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); @@ -285,7 +279,6 @@ public void testUnresponsiveFollowerDetectedEventually() { public void testAckListenerReceivesAcksFromAllNodes() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); - cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); AckCollector ackCollector = leader.submitValue(randomLong()); @@ -300,7 +293,6 @@ public void testAckListenerReceivesAcksFromAllNodes() { public void testAckListenerReceivesNackFromFollower() { final Cluster cluster = new Cluster(3); cluster.runRandomly(); - cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); @@ -318,7 +310,6 @@ public void testAckListenerReceivesNackFromFollower() { public void testAckListenerReceivesNackFromLeader() { final Cluster cluster = new Cluster(3); cluster.runRandomly(); - cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); @@ -340,7 +331,6 @@ public void testAckListenerReceivesNackFromLeader() { public void testAckListenerReceivesNoAckFromHangingFollower() { final Cluster cluster = new Cluster(3); cluster.runRandomly(); - cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); @@ -359,7 +349,6 @@ public void testAckListenerReceivesNoAckFromHangingFollower() { public void testAckListenerReceivesNacksIfPublicationTimesOut() { final Cluster cluster = new Cluster(3); cluster.runRandomly(); - cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); @@ -447,7 +436,6 @@ public void testSettingInitialConfigurationTriggersElection() { public void testCannotSetInitialConfigurationTwice() { final Cluster cluster = new Cluster(randomIntBetween(1, 5)); cluster.runRandomly(); - cluster.setInitialConfigurationIfRequired(); cluster.stabilise(); final Coordinator coordinator = cluster.getAnyNode().coordinator; @@ -565,18 +553,6 @@ void addNodes(int newNodesCount) { } } - void setInitialConfigurationIfRequired() { - if (clusterNodes.stream().allMatch(n -> n.coordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty())) { - assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty()); - assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty()); - runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration"); - final ClusterNode bootstrapNode = getAnyNode(); - bootstrapNode.applyInitialConfiguration(); - } else { - logger.info("--> setting initial configuration not required"); - } - } - void runRandomly() { // TODO supporting (preserving?) existing disruptions needs implementing if needed, for now we just forbid it @@ -695,6 +671,17 @@ void stabilise() { void stabilise(long stabilisationDurationMillis) { assertThat("stabilisation requires default delay variability (and proper cleanup of raised variability)", deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY)); + + if (clusterNodes.stream().allMatch(n -> n.coordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty())) { + assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty()); + assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty()); + runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration"); + final ClusterNode bootstrapNode = getAnyNode(); + bootstrapNode.applyInitialConfiguration(); + } else { + logger.info("setting initial configuration not required"); + } + runFor(stabilisationDurationMillis, "stabilising"); fixLag(); assertUniqueLeaderAndExpectedModes(); From dcbacd8c1ad079187bbe05c76a10f9ebbfe44a44 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 8 Oct 2018 10:31:00 +0100 Subject: [PATCH 08/24] Describe why the first election should succeed --- .../elasticsearch/cluster/coordination/CoordinatorTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 690b393b4d196..5eec9703d08d7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -425,7 +425,8 @@ public void testSettingInitialConfigurationTriggersElection() { cluster.getAnyNode().applyInitialConfiguration(); cluster.stabilise(defaultMillis( - // the first election should succeed + // the first election should succeed, because only one node knows of the initial configuration and therefore can win a + // pre-voting round and proceed to an election, so there cannot be any collisions ELECTION_INITIAL_TIMEOUT_SETTING) // TODO this wait is unnecessary, we could trigger the election immediately // Allow two round-trip for pre-voting and voting + 4 * DEFAULT_DELAY_VARIABILITY From b81ef533c43dd4c1d256dade20dd6c69f773607b Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 11 Oct 2018 10:26:57 +0100 Subject: [PATCH 09/24] Fail before or after --- .../cluster/coordination/CoordinatorTests.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 2f863d4ea3240..2f51bd741396c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -838,14 +838,16 @@ private void possiblyFail(String description) { @Override public void setCurrentTerm(long currentTerm) { - possiblyFail("writing term of " + currentTerm); + possiblyFail("before writing term of " + currentTerm); super.setCurrentTerm(currentTerm); + possiblyFail("after writing term of " + currentTerm); } @Override public void setLastAcceptedState(ClusterState clusterState) { - possiblyFail("writing last-accepted state of term=" + clusterState.term() + ", version=" + clusterState.version()); + possiblyFail("before writing last-accepted state of term=" + clusterState.term() + ", version=" + clusterState.version()); super.setLastAcceptedState(clusterState); + possiblyFail("after writing last-accepted state of term=" + clusterState.term() + ", version=" + clusterState.version()); } } From 9bd4741dd8e7d35aa917b2897a5d73afe848b149 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 11 Oct 2018 15:04:43 +0100 Subject: [PATCH 10/24] Handle exception when bumping term --- .../elasticsearch/cluster/coordination/Coordinator.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 3d5e8900738ca..aeea324e10bd0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -264,7 +264,12 @@ private void updateMaxTermSeen(final long term) { if (mode == Mode.LEADER && publicationInProgress() == false && updatedMaxTermSeen > getCurrentTerm()) { // Bump our term. However if there is a publication in flight then doing so would cancel the publication, so don't do that // since we check whether a term bump is needed at the end of the publication too. - ensureTermAtLeast(getLocalNode(), updatedMaxTermSeen); + try { + ensureTermAtLeast(getLocalNode(), updatedMaxTermSeen); + } catch (Exception e) { + becomeCandidate("updateMaxTermSeen"); + return; + } startElection(); } } From 1db841908ad9da9bf1c0dcbd732cc2dc6b6363d8 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 11 Oct 2018 15:08:16 +0100 Subject: [PATCH 11/24] Throw more kinds of exception --- .../cluster/coordination/CoordinatorTests.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 2f51bd741396c..20c7cc0595ed5 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -49,6 +49,8 @@ import org.hamcrest.Matcher; import org.junit.Before; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -636,7 +638,7 @@ void runRandomly() { // - reboot a node // - abdicate leadership - } catch (CoordinationStateRejectedException ignored) { + } catch (CoordinationStateRejectedException | UncheckedIOException ignored) { // This is ok: it just means a message couldn't currently be handled. } @@ -832,7 +834,12 @@ class MockPersistedState extends InMemoryPersistedState { private void possiblyFail(String description) { if (disruptStorage && rarely()) { - throw new CoordinationStateRejectedException("simulated IO exception [" + description + ']'); + // TODO revisit this when we've decided how PersistedState should throw exceptions + if (randomBoolean()) { + throw new UncheckedIOException(new IOException("simulated IO exception [" + description + ']')); + } else { + throw new CoordinationStateRejectedException("simulated IO exception [" + description + ']'); + } } } From c72e863c10a005a2f9dacd9669079d691ef1fbdd Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 11 Oct 2018 15:23:20 +0100 Subject: [PATCH 12/24] Log too --- .../java/org/elasticsearch/cluster/coordination/Coordinator.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index aeea324e10bd0..1292b2c4f0ad1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -267,6 +267,7 @@ private void updateMaxTermSeen(final long term) { try { ensureTermAtLeast(getLocalNode(), updatedMaxTermSeen); } catch (Exception e) { + logger.debug("failed to bump term", e); becomeCandidate("updateMaxTermSeen"); return; } From 6ce5f678896e638cf4bccbe11e8a3e91e4b98d5c Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 12 Oct 2018 10:04:07 +0100 Subject: [PATCH 13/24] More finallys --- .../cluster/coordination/Coordinator.java | 30 +++++++++++-------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 48146ab6b64ca..2930d92bde759 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -305,16 +305,19 @@ private Optional ensureTermAtLeast(DiscoveryNode sourceNode, long targetTe private Join joinLeaderInTerm(StartJoinRequest startJoinRequest) { synchronized (mutex) { logger.debug("joinLeaderInTerm: for [{}] with term {}", startJoinRequest.getSourceNode(), startJoinRequest.getTerm()); - final Join join = coordinationState.get().handleStartJoin(startJoinRequest); - lastJoin = Optional.of(join); - peerFinder.setCurrentTerm(getCurrentTerm()); - if (mode != Mode.CANDIDATE) { - becomeCandidate("joinLeaderInTerm"); // updates followersChecker and preVoteCollector - } else { - followersChecker.updateFastResponseState(getCurrentTerm(), mode); - preVoteCollector.update(getPreVoteResponse(), null); + try { + final Join join = coordinationState.get().handleStartJoin(startJoinRequest); + lastJoin = Optional.of(join); + return join; + } finally { + peerFinder.setCurrentTerm(getCurrentTerm()); + if (mode != Mode.CANDIDATE) { + becomeCandidate("joinLeaderInTerm"); // updates followersChecker and preVoteCollector + } else { + followersChecker.updateFastResponseState(getCurrentTerm(), mode); + preVoteCollector.update(getPreVoteResponse(), null); + } } - return join; } } @@ -584,9 +587,12 @@ public void setInitialConfiguration(final VotingConfiguration votingConfiguratio final Builder builder = masterService.incrementVersion(currentState); builder.lastAcceptedConfiguration(votingConfiguration); builder.lastCommittedConfiguration(votingConfiguration); - coordinationState.get().setInitialState(builder.build()); - preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version - startElectionScheduler(); + try { + coordinationState.get().setInitialState(builder.build()); + } finally { + preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version + startElectionScheduler(); + } } } From 68bdc0854ebf07a463d4804ec757fa69dd936c4b Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 12 Oct 2018 10:22:31 +0100 Subject: [PATCH 14/24] More finally --- .../cluster/coordination/Coordinator.java | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 2930d92bde759..903e2cba609f2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -221,17 +221,21 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) { throw new CoordinationStateRejectedException("no longer leading this publication's term: " + publishRequest); } - ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term()); - final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest); - - if (sourceNode.equals(getLocalNode())) { - preVoteCollector.update(getPreVoteResponse(), getLocalNode()); - } else { - becomeFollower("handlePublishRequest", sourceNode); // also updates preVoteCollector + boolean success = false; + try { + ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term()); + final PublishWithJoinResponse publishWithJoinResponse + = new PublishWithJoinResponse(coordinationState.get().handlePublishRequest(publishRequest), + joinWithDestination(lastJoin, sourceNode, publishRequest.getAcceptedState().term())); + success = true; + return publishWithJoinResponse; + } finally { + if (sourceNode.equals(getLocalNode()) || success == false) { + preVoteCollector.update(getPreVoteResponse(), getLocalNode()); + } else { + becomeFollower("handlePublishRequest", sourceNode); // also updates preVoteCollector + } } - - return new PublishWithJoinResponse(publishResponse, - joinWithDestination(lastJoin, sourceNode, publishRequest.getAcceptedState().term())); } } From e1e33faec1f781384d3bbc9d197ded21a7edc3aa Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 12 Oct 2018 10:59:46 +0100 Subject: [PATCH 15/24] Fix node to follow --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 903e2cba609f2..8c4c2a0d3f392 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -231,7 +231,7 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) { return publishWithJoinResponse; } finally { if (sourceNode.equals(getLocalNode()) || success == false) { - preVoteCollector.update(getPreVoteResponse(), getLocalNode()); + preVoteCollector.update(getPreVoteResponse(), sourceNode); } else { becomeFollower("handlePublishRequest", sourceNode); // also updates preVoteCollector } From 3dac8fa8899977003bae3547e9c7c9b33eae9adf Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 12 Oct 2018 11:05:23 +0100 Subject: [PATCH 16/24] become candidate on failure --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 8c4c2a0d3f392..3638e22e2f39e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -230,7 +230,9 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) { success = true; return publishWithJoinResponse; } finally { - if (sourceNode.equals(getLocalNode()) || success == false) { + if (success == false) { + becomeCandidate("handlePublishRequest"); + } else if (sourceNode.equals(getLocalNode())) { preVoteCollector.update(getPreVoteResponse(), sourceNode); } else { becomeFollower("handlePublishRequest", sourceNode); // also updates preVoteCollector From ac1d564bcee0ee0bade3e7e5e4b6be3dbba85865 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 12 Oct 2018 18:16:25 +0100 Subject: [PATCH 17/24] Do not always start election scheduler --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 3638e22e2f39e..44bb88fce3a01 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -597,7 +597,10 @@ public void setInitialConfiguration(final VotingConfiguration votingConfiguratio coordinationState.get().setInitialState(builder.build()); } finally { preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version - startElectionScheduler(); + if (coordinationState.get().getLastAcceptedVersion() > 0 && electionScheduler == null) { + // initial state application was successful + startElectionScheduler(); + } } } } From 27de8d7cd0032583cda5db2f881b54be2b0776d3 Mon Sep 17 00:00:00 2001 From: David Turner Date: Sat, 13 Oct 2018 09:55:21 +0100 Subject: [PATCH 18/24] Revert "Do not always start election scheduler" This reverts commit ac1d564bcee0ee0bade3e7e5e4b6be3dbba85865. --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 44bb88fce3a01..3638e22e2f39e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -597,10 +597,7 @@ public void setInitialConfiguration(final VotingConfiguration votingConfiguratio coordinationState.get().setInitialState(builder.build()); } finally { preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version - if (coordinationState.get().getLastAcceptedVersion() > 0 && electionScheduler == null) { - // initial state application was successful - startElectionScheduler(); - } + startElectionScheduler(); } } } From 7b6985b56a45a35fb317ceb5af1b05e22f3721ce Mon Sep 17 00:00:00 2001 From: David Turner Date: Sat, 13 Oct 2018 09:55:23 +0100 Subject: [PATCH 19/24] Revert "become candidate on failure" This reverts commit 3dac8fa8899977003bae3547e9c7c9b33eae9adf. --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 3638e22e2f39e..8c4c2a0d3f392 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -230,9 +230,7 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) { success = true; return publishWithJoinResponse; } finally { - if (success == false) { - becomeCandidate("handlePublishRequest"); - } else if (sourceNode.equals(getLocalNode())) { + if (sourceNode.equals(getLocalNode()) || success == false) { preVoteCollector.update(getPreVoteResponse(), sourceNode); } else { becomeFollower("handlePublishRequest", sourceNode); // also updates preVoteCollector From 02c1cf14df8c4a03ef966051c6a7a768e3027cad Mon Sep 17 00:00:00 2001 From: David Turner Date: Sat, 13 Oct 2018 09:55:25 +0100 Subject: [PATCH 20/24] Revert "Fix node to follow" This reverts commit e1e33faec1f781384d3bbc9d197ded21a7edc3aa. --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 8c4c2a0d3f392..903e2cba609f2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -231,7 +231,7 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) { return publishWithJoinResponse; } finally { if (sourceNode.equals(getLocalNode()) || success == false) { - preVoteCollector.update(getPreVoteResponse(), sourceNode); + preVoteCollector.update(getPreVoteResponse(), getLocalNode()); } else { becomeFollower("handlePublishRequest", sourceNode); // also updates preVoteCollector } From d43916eebe43eec003a346662b67e55aafd00e26 Mon Sep 17 00:00:00 2001 From: David Turner Date: Sat, 13 Oct 2018 09:55:27 +0100 Subject: [PATCH 21/24] Revert "More finally" This reverts commit 68bdc0854ebf07a463d4804ec757fa69dd936c4b. --- .../cluster/coordination/Coordinator.java | 24 ++++++++----------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 903e2cba609f2..2930d92bde759 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -221,21 +221,17 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) { throw new CoordinationStateRejectedException("no longer leading this publication's term: " + publishRequest); } - boolean success = false; - try { - ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term()); - final PublishWithJoinResponse publishWithJoinResponse - = new PublishWithJoinResponse(coordinationState.get().handlePublishRequest(publishRequest), - joinWithDestination(lastJoin, sourceNode, publishRequest.getAcceptedState().term())); - success = true; - return publishWithJoinResponse; - } finally { - if (sourceNode.equals(getLocalNode()) || success == false) { - preVoteCollector.update(getPreVoteResponse(), getLocalNode()); - } else { - becomeFollower("handlePublishRequest", sourceNode); // also updates preVoteCollector - } + ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term()); + final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest); + + if (sourceNode.equals(getLocalNode())) { + preVoteCollector.update(getPreVoteResponse(), getLocalNode()); + } else { + becomeFollower("handlePublishRequest", sourceNode); // also updates preVoteCollector } + + return new PublishWithJoinResponse(publishResponse, + joinWithDestination(lastJoin, sourceNode, publishRequest.getAcceptedState().term())); } } From 22f5af7d818bb5a1d30eb1a3287545901f208da0 Mon Sep 17 00:00:00 2001 From: David Turner Date: Sat, 13 Oct 2018 09:55:29 +0100 Subject: [PATCH 22/24] Revert "More finallys" This reverts commit 6ce5f678896e638cf4bccbe11e8a3e91e4b98d5c. --- .../cluster/coordination/Coordinator.java | 30 ++++++++----------- 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 2930d92bde759..48146ab6b64ca 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -305,19 +305,16 @@ private Optional ensureTermAtLeast(DiscoveryNode sourceNode, long targetTe private Join joinLeaderInTerm(StartJoinRequest startJoinRequest) { synchronized (mutex) { logger.debug("joinLeaderInTerm: for [{}] with term {}", startJoinRequest.getSourceNode(), startJoinRequest.getTerm()); - try { - final Join join = coordinationState.get().handleStartJoin(startJoinRequest); - lastJoin = Optional.of(join); - return join; - } finally { - peerFinder.setCurrentTerm(getCurrentTerm()); - if (mode != Mode.CANDIDATE) { - becomeCandidate("joinLeaderInTerm"); // updates followersChecker and preVoteCollector - } else { - followersChecker.updateFastResponseState(getCurrentTerm(), mode); - preVoteCollector.update(getPreVoteResponse(), null); - } + final Join join = coordinationState.get().handleStartJoin(startJoinRequest); + lastJoin = Optional.of(join); + peerFinder.setCurrentTerm(getCurrentTerm()); + if (mode != Mode.CANDIDATE) { + becomeCandidate("joinLeaderInTerm"); // updates followersChecker and preVoteCollector + } else { + followersChecker.updateFastResponseState(getCurrentTerm(), mode); + preVoteCollector.update(getPreVoteResponse(), null); } + return join; } } @@ -587,12 +584,9 @@ public void setInitialConfiguration(final VotingConfiguration votingConfiguratio final Builder builder = masterService.incrementVersion(currentState); builder.lastAcceptedConfiguration(votingConfiguration); builder.lastCommittedConfiguration(votingConfiguration); - try { - coordinationState.get().setInitialState(builder.build()); - } finally { - preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version - startElectionScheduler(); - } + coordinationState.get().setInitialState(builder.build()); + preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version + startElectionScheduler(); } } From 38fac651558a50ec9f29c63b9181a4cb69d0b6b8 Mon Sep 17 00:00:00 2001 From: David Turner Date: Sat, 13 Oct 2018 09:56:42 +0100 Subject: [PATCH 23/24] Revert "Handle exception when bumping term" This reverts commit 9bd4741dd8e7d35aa917b2897a5d73afe848b149. --- .../elasticsearch/cluster/coordination/Coordinator.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 48146ab6b64ca..438aaa7ace4d4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -268,13 +268,7 @@ private void updateMaxTermSeen(final long term) { logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, enqueueing term bump", maxTermSeen, currentTerm); } else { - try { - ensureTermAtLeast(getLocalNode(), maxTermSeen); - } catch (Exception e) { - logger.debug("failed to bump term", e); - becomeCandidate("updateMaxTermSeen"); - return; - } + ensureTermAtLeast(getLocalNode(), maxTermSeen); startElection(); } } From fce76c23f47010aecb305c1e75505718107099c4 Mon Sep 17 00:00:00 2001 From: David Turner Date: Sat, 13 Oct 2018 09:59:13 +0100 Subject: [PATCH 24/24] Remove possiblyFail() calls after updating state --- .../elasticsearch/cluster/coordination/CoordinatorTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 8d06f53158179..5230c7e52945b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -852,14 +852,14 @@ private void possiblyFail(String description) { public void setCurrentTerm(long currentTerm) { possiblyFail("before writing term of " + currentTerm); super.setCurrentTerm(currentTerm); - possiblyFail("after writing term of " + currentTerm); + // TODO possiblyFail() here if that's a failure mode of the storage layer } @Override public void setLastAcceptedState(ClusterState clusterState) { possiblyFail("before writing last-accepted state of term=" + clusterState.term() + ", version=" + clusterState.version()); super.setLastAcceptedState(clusterState); - possiblyFail("after writing last-accepted state of term=" + clusterState.term() + ", version=" + clusterState.version()); + // TODO possiblyFail() here if that's a failure mode of the storage layer } }