From 9129532b7a28e6e5226cdc07a59e2ee8f640b953 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 09:30:36 +0100 Subject: [PATCH 01/26] Add assertions about the preVoteCollector's consistency --- .../cluster/coordination/Coordinator.java | 9 ++++++++- .../cluster/coordination/PreVoteCollector.java | 13 +++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 3ef43cead664a..152ff1e5a10fc 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -289,9 +289,10 @@ private Join joinLeaderInTerm(StartJoinRequest startJoinRequest) { lastJoin = Optional.of(join); peerFinder.setCurrentTerm(getCurrentTerm()); if (mode != Mode.CANDIDATE) { - becomeCandidate("joinLeaderInTerm"); // updates followersChecker + becomeCandidate("joinLeaderInTerm"); // updates followersChecker and preVoteCollector } else { followersChecker.updateFastResponseState(getCurrentTerm(), mode); + preVoteCollector.update(getPreVoteResponse(), null); } return join; } @@ -485,6 +486,8 @@ public void invariant() { assert becomingMaster || getStateForMasterService().nodes().getMasterNodeId() != null : getStateForMasterService(); assert leaderCheckScheduler == null : leaderCheckScheduler; assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode()); + assert preVoteCollector.getLeader() == getLocalNode() : preVoteCollector; + assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector; final boolean activePublication = currentPublication.map(CoordinatorPublication::isActiveForCurrentLeader).orElse(false); if (becomingMaster && activePublication == false) { @@ -517,6 +520,8 @@ public void invariant() { assert leaderCheckScheduler != null; assert followersChecker.getKnownFollowers().isEmpty(); assert currentPublication.map(Publication::isCommitted).orElse(true); + assert preVoteCollector.getLeader().equals(lastKnownLeader.get()) : preVoteCollector; + assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector; } else { assert mode == Mode.CANDIDATE; assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator; @@ -528,6 +533,8 @@ public void invariant() { assert followersChecker.getKnownFollowers().isEmpty(); assert applierState.nodes().getMasterNodeId() == null; assert currentPublication.map(Publication::isCommitted).orElse(true); + assert preVoteCollector.getLeader() == null : preVoteCollector; + assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector; } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java index c3417c17f8f53..14e69cb76109d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java @@ -79,6 +79,17 @@ public Releasable start(final ClusterState clusterState, final Iterable(leader, preVoteResponse); @@ -88,6 +99,8 @@ private PreVoteResponse handlePreVoteRequest(final PreVoteRequest request) { // TODO if we are a leader and the max term seen exceeds our term then we need to bump our term Tuple state = this.state; + assert state != null : "received pre-vote request before fully initialised"; + final DiscoveryNode leader = state.v1(); if (leader == null) { From 33113de34e4b2ef6d02310ae1ebf93dc01ca6d2a Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 09:40:45 +0100 Subject: [PATCH 02/26] Extract variable --- .../elasticsearch/cluster/coordination/PreVoteCollector.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java index 14e69cb76109d..035c21c7d2680 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java @@ -102,9 +102,10 @@ private PreVoteResponse handlePreVoteRequest(final PreVoteRequest request) { assert state != null : "received pre-vote request before fully initialised"; final DiscoveryNode leader = state.v1(); + final PreVoteResponse response = state.v2(); if (leader == null) { - return state.v2(); + return response; } if (leader.equals(request.getSourceNode())) { @@ -113,7 +114,7 @@ private PreVoteResponse handlePreVoteRequest(final PreVoteRequest request) { // major drawback in offering a join to our old leader. The advantage of this is that it makes it slightly more likely that the // leader won't change, and also that its re-election will happen more quickly than if it had to wait for a quorum of followers // to also detect its failure. - return state.v2(); + return response; } throw new CoordinationStateRejectedException("rejecting " + request + " as there is already a leader"); From 26be2918328a6c73c7954d8ff3297d4522955391 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 09:41:23 +0100 Subject: [PATCH 03/26] Log all exceptions the same --- .../cluster/coordination/PreVoteCollector.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java index 035c21c7d2680..6e0946f2c0911 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java @@ -155,11 +155,7 @@ public void handleResponse(PreVoteResponse response) { @Override public void handleException(TransportException exp) { - if (exp.getRootCause() instanceof CoordinationStateRejectedException) { - logger.debug("{} failed: {}", this, exp.getRootCause().getMessage()); - } else { - logger.debug(new ParameterizedMessage("{} failed", this), exp); - } + logger.debug("{} failed: {}", this, exp.getRootCause().getMessage()); } @Override From 4642e24c2a1b55aa098901ebcf3d1b194bfdf054 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 09:42:32 +0100 Subject: [PATCH 04/26] Update max term seen --- .../elasticsearch/cluster/coordination/PreVoteCollector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java index 6e0946f2c0911..a0c6832cf85da 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java @@ -96,7 +96,7 @@ public void update(final PreVoteResponse preVoteResponse, @Nullable final Discov } private PreVoteResponse handlePreVoteRequest(final PreVoteRequest request) { - // TODO if we are a leader and the max term seen exceeds our term then we need to bump our term + updateMaxTermSeen.accept(request.getCurrentTerm()); Tuple state = this.state; assert state != null : "received pre-vote request before fully initialised"; From 22130dc1dc98e5b12347512ed6ce7433b94685d3 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 09:44:26 +0100 Subject: [PATCH 05/26] Remove term-bump workaround --- .../cluster/coordination/Coordinator.java | 6 ++---- .../coordination/CoordinatorTests.java | 19 ------------------- 2 files changed, 2 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 152ff1e5a10fc..f692d7d66c8d0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -260,8 +260,7 @@ private void updateMaxTermSeen(final long term) { // do check for this after the publication completes) } - // TODO: make private again after removing term-bump workaround - void startElection() { + private void startElection() { synchronized (mutex) { // The preVoteCollector is only active while we are candidate, but it does not call this method with synchronisation, so we have // to check our mode again here. @@ -273,8 +272,7 @@ void startElection() { } } - // TODO: make private again after removing term-bump workaround - Optional ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) { + private Optional ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; if (getCurrentTerm() < targetTerm) { return Optional.of(joinLeaderInTerm(new StartJoinRequest(sourceNode, targetTerm))); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index c7e6fc91b9440..e2fee062fb529 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -599,25 +599,6 @@ void stabilise(long stabilisationDurationMillis) { assertThat("stabilisation requires default delay variability (and proper cleanup of raised variability)", deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY)); runFor(stabilisationDurationMillis, "stabilising"); - - // TODO remove when term-bumping is enabled - final long maxTerm = clusterNodes.stream().map(n -> n.coordinator.getCurrentTerm()).max(Long::compare).orElse(0L); - final long maxLeaderTerm = clusterNodes.stream().filter(n -> n.coordinator.getMode() == Coordinator.Mode.LEADER) - .map(n -> n.coordinator.getCurrentTerm()).max(Long::compare).orElse(0L); - - if (maxLeaderTerm < maxTerm) { - logger.info("--> forcing a term bump, maxTerm={}, maxLeaderTerm={}", maxTerm, maxLeaderTerm); - final ClusterNode leader = getAnyLeader(); - onNode(leader.getLocalNode(), () -> { - synchronized (leader.coordinator.mutex) { - leader.coordinator.ensureTermAtLeast(leader.localNode, maxTerm + 1); - } - leader.coordinator.startElection(); - }).run(); - runFor(DEFAULT_ELECTION_DELAY, "re-stabilising after term bump"); - } - logger.info("--> end of stabilisation"); - assertUniqueLeaderAndExpectedModes(); } From 3ba1a6d98b89efd25898dfda32f94634521e95ed Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 09:52:03 +0100 Subject: [PATCH 06/26] Bump term on discovery of the need to do so --- .../cluster/coordination/Coordinator.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index f692d7d66c8d0..22a1b1b36065b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -254,10 +254,15 @@ private void closePrevotingAndElectionScheduler() { } private void updateMaxTermSeen(final long term) { - maxTermSeen.updateAndGet(oldMaxTerm -> Math.max(oldMaxTerm, term)); - // TODO if we are leader here, and there is no publication in flight, then we should bump our term - // (if we are leader and there _is_ a publication in flight then doing so would cancel the publication, so don't do that, but - // do check for this after the publication completes) + final long updatedMaxTermSeen = maxTermSeen.updateAndGet(oldMaxTerm -> Math.max(oldMaxTerm, term)); + synchronized (mutex) { + if (mode == Mode.LEADER && publicationInProgress() == false && updatedMaxTermSeen > getCurrentTerm()) { + // Bump our term. However if there is a publication in flight then doing so would cancel the publication, so don't do that + // since we check whether a term bump is needed at the end of the publication too. + ensureTermAtLeast(getLocalNode(), updatedMaxTermSeen); + startElection(); + } + } } private void startElection() { From 4a70f628becc5bf67f61b9456e7409d421a0e6c6 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 09:53:41 +0100 Subject: [PATCH 07/26] Reinstate assertion that every connected node has voted for the leader --- .../elasticsearch/cluster/coordination/CoordinatorTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index e2fee062fb529..23ce55208df0b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -658,7 +658,7 @@ private void assertUniqueLeaderAndExpectedModes() { if (isConnectedPair(leader, clusterNode)) { assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER)); assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm)); - // TODO assert that this node has actually voted for the leader in this term + assertTrue(nodeId + " has voted for the leader", leader.coordinator.hasJoinVoteFrom(clusterNode.getLocalNode())); // TODO assert that this node's accepted and committed states are the same as the leader's assertTrue(nodeId + " is in the leader's applied state", From 9cea60fe29486a41ce9ef00607c59f396b60fef2 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 09:58:11 +0100 Subject: [PATCH 08/26] Make fields private --- .../elasticsearch/cluster/coordination/Coordinator.java | 7 +++++++ .../cluster/coordination/PublishResponse.java | 4 ++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 22a1b1b36065b..aca8f456ef9cc 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -871,6 +871,13 @@ protected Optional handlePublishResponse(DiscoveryNode sourc PublishResponse publishResponse) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; assert getCurrentTerm() >= publishResponse.getTerm(); + + if (hasJoinVoteFrom(sourceNode) == false) { + // process the join in the publish response, if present; if not then the node voted for someone else and we need to bump + // term by 1 + throw new AssertionError("TODO"); + } + return coordinationState.get().handlePublishResponse(sourceNode, publishResponse); } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PublishResponse.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PublishResponse.java index be7c11857021a..0890e8cb525ac 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PublishResponse.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PublishResponse.java @@ -30,8 +30,8 @@ */ public class PublishResponse implements Writeable { - protected final long term; - protected final long version; + private final long term; + private final long version; public PublishResponse(long term, long version) { assert term >= 0; From 5acffc902fbab1b893ad4da464840d935fec6c76 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 11:02:18 +0100 Subject: [PATCH 09/26] Generate DiscoveryNodes deterministically --- .../cluster/coordination/CoordinationStateTests.java | 9 ++++++++- .../cluster/coordination/CoordinatorTests.java | 8 +------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java index 47b34c9e3b3b5..0ce97e3f19f42 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java @@ -25,10 +25,12 @@ import org.elasticsearch.cluster.ClusterState.VotingConfiguration; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNode.Role; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.node.Node; import org.elasticsearch.test.ESTestCase; @@ -37,6 +39,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.EnumSet; import java.util.List; import java.util.Optional; import java.util.Set; @@ -82,7 +85,11 @@ public void setupNodes() { } public static DiscoveryNode createNode(String id) { - return new DiscoveryNode(id, buildNewFakeTransportAddress(), Version.CURRENT); + final TransportAddress address = buildNewFakeTransportAddress(); + return new DiscoveryNode("", id, + UUIDs.randomBase64UUID(random()), // generated deterministically for repeatable tests + address.address().getHostString(), address.getAddress(), address, Collections.emptyMap(), + EnumSet.allOf(Role.class), Version.CURRENT); } public void testSetInitialState() { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 23ce55208df0b..1bc5a3f8ab378 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -735,13 +735,7 @@ class ClusterNode extends AbstractComponent { } private DiscoveryNode createDiscoveryNode() { - final TransportAddress transportAddress = buildNewFakeTransportAddress(); - // Generate the ephemeral ID deterministically, for repeatable tests. This means we have to pass everything else into the - // constructor explicitly too. - return new DiscoveryNode("", nodeIdFromIndex(nodeIndex), UUIDs.randomBase64UUID(random()), - transportAddress.address().getHostString(), - transportAddress.getAddress(), transportAddress, Collections.emptyMap(), - EnumSet.allOf(Role.class), Version.CURRENT); + return CoordinationStateTests.createNode(nodeIdFromIndex(nodeIndex)); } private void setUp() { From e11e92960381fd34b46b5fdddd0ee9629f603508 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 11:03:09 +0100 Subject: [PATCH 10/26] Private --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index aca8f456ef9cc..5eda0308f6124 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -547,7 +547,7 @@ boolean hasJoinVoteFrom(DiscoveryNode localNode) { return coordinationState.get().containsJoinVoteFor(localNode); } - void handleJoin(Join join) { + private void handleJoin(Join join) { synchronized (mutex) { ensureTermAtLeast(getLocalNode(), join.getTerm()).ifPresent(this::handleJoin); From 2c98dd7ebfc372cf6d3c899d99cea9b70a2ea367 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 11:17:47 +0100 Subject: [PATCH 11/26] Handle publish requests without attached joins --- .../cluster/coordination/Coordinator.java | 12 +++++++++++- .../cluster/coordination/Publication.java | 10 ++++++++-- .../cluster/coordination/PublicationTests.java | 10 +++++++++- 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 5eda0308f6124..061dac6d7b0b5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -887,7 +887,17 @@ protected void onJoin(Join join) { if (join.getTerm() == getCurrentTerm()) { handleJoin(join); } - // TODO: what to do on missing join? + } + + @Override + protected void onMissingJoin(DiscoveryNode discoveryNode) { + assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; + // The remote node did not include a join vote in its publish response. We do not persist joins, so it could be that the remote + // node voted for us and then rebooted, or it could be that it voted for a different node in this term. If we don't have a copy + // of a join from this node then we assume the latter and bump our term to obtain a vote from this node. + if (hasJoinVoteFrom(discoveryNode) == false) { + updateMaxTermSeen(publishRequest.getAcceptedState().term() + 1); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java index 20104755996b4..3e947238198ed 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java @@ -172,6 +172,8 @@ private void onPossibleCommitFailure() { protected abstract void onJoin(Join join); + protected abstract void onMissingJoin(DiscoveryNode discoveryNode); + protected abstract void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest, ActionListener responseActionListener); @@ -301,10 +303,14 @@ public void onResponse(PublishWithJoinResponse response) { return; } - response.getJoin().ifPresent(join -> { + if (response.getJoin().isPresent()) { + final Join join = response.getJoin().get(); assert discoveryNode.equals(join.getSourceNode()); + assert join.getTerm() == response.getPublishResponse().getTerm() : response; onJoin(join); - }); + } else { + onMissingJoin(discoveryNode); + } assert state == PublicationTargetState.SENT_PUBLISH_REQUEST : state + " -> " + PublicationTargetState.WAITING_FOR_QUORUM; state = PublicationTargetState.WAITING_FOR_QUORUM; diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java index e08b59bc4b8de..d418f1b63c702 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java @@ -101,6 +101,7 @@ abstract class MockPublication extends Publication { Map> pendingPublications = new HashMap<>(); Map> pendingCommits = new HashMap<>(); Map joins = new HashMap<>(); + Set missingJoins = new HashSet<>(); MockPublication(Settings settings, PublishRequest publishRequest, Discovery.AckListener ackListener, LongSupplier currentTimeSupplier) { @@ -120,6 +121,11 @@ protected void onJoin(Join join) { assertNull(joins.put(join.getSourceNode(), join)); } + @Override + protected void onMissingJoin(DiscoveryNode discoveryNode) { + assertTrue(missingJoins.add(discoveryNode)); + } + @Override protected void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest, ActionListener responseActionListener) { @@ -182,14 +188,16 @@ public void testSimpleClusterStatePublishing() throws InterruptedException { assertNotEquals(processedNode1PublishResponse.get(), publication.pendingCommits.isEmpty()); assertFalse(publication.joins.containsKey(e.getKey())); PublishWithJoinResponse publishWithJoinResponse = new PublishWithJoinResponse(publishResponse, - randomBoolean() ? Optional.empty() : Optional.of(new Join(e.getKey(), randomFrom(n1, n2, n3), randomNonNegativeLong(), + randomBoolean() ? Optional.empty() : Optional.of(new Join(e.getKey(), randomFrom(n1, n2, n3), publishResponse.getTerm(), randomNonNegativeLong(), randomNonNegativeLong()))); e.getValue().onResponse(publishWithJoinResponse); if (publishWithJoinResponse.getJoin().isPresent()) { assertTrue(publication.joins.containsKey(e.getKey())); + assertFalse(publication.missingJoins.contains(e.getKey())); assertEquals(publishWithJoinResponse.getJoin().get(), publication.joins.get(e.getKey())); } else { assertFalse(publication.joins.containsKey(e.getKey())); + assertTrue(publication.missingJoins.contains(e.getKey())); } if (e.getKey().equals(n1)) { processedNode1PublishResponse.set(true); From 7569c4fcb97ae00a30a565ff388cbc96ad20167d Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 11:33:55 +0100 Subject: [PATCH 12/26] Fix missing update to preVoteCollector --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 061dac6d7b0b5..7fb1fd8936828 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -222,8 +222,10 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) { ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term()); final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest); - if (sourceNode.equals(getLocalNode()) == false) { - becomeFollower("handlePublishRequest", sourceNode); + if (sourceNode.equals(getLocalNode())) { + preVoteCollector.update(getPreVoteResponse(), getLocalNode()); + } else { + becomeFollower("handlePublishRequest", sourceNode); // updates preVoteCollector } return new PublishWithJoinResponse(publishResponse, From 91083804a1c251a9cedc6d041d981f82bd2be89c Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 12:15:51 +0100 Subject: [PATCH 13/26] TODO is done --- .../elasticsearch/cluster/coordination/Coordinator.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 7fb1fd8936828..bd0863e391591 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -873,13 +873,6 @@ protected Optional handlePublishResponse(DiscoveryNode sourc PublishResponse publishResponse) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; assert getCurrentTerm() >= publishResponse.getTerm(); - - if (hasJoinVoteFrom(sourceNode) == false) { - // process the join in the publish response, if present; if not then the node voted for someone else and we need to bump - // term by 1 - throw new AssertionError("TODO"); - } - return coordinationState.get().handlePublishResponse(sourceNode, publishResponse); } From af8916ad386b5f28bb91b10f730b2094ec479de8 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 12:38:12 +0100 Subject: [PATCH 14/26] Trace join handling --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 4 +++- .../org/elasticsearch/cluster/coordination/Publication.java | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index bd0863e391591..81d18f43b8dda 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -891,7 +891,9 @@ protected void onMissingJoin(DiscoveryNode discoveryNode) { // node voted for us and then rebooted, or it could be that it voted for a different node in this term. If we don't have a copy // of a join from this node then we assume the latter and bump our term to obtain a vote from this node. if (hasJoinVoteFrom(discoveryNode) == false) { - updateMaxTermSeen(publishRequest.getAcceptedState().term() + 1); + final long term = publishRequest.getAcceptedState().term(); + logger.debug("onMissingJoin: no join vote from {}, bumping term to exceed {}", discoveryNode, term); + updateMaxTermSeen(term + 1); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java index 3e947238198ed..af8cc00d64e10 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java @@ -307,8 +307,10 @@ public void onResponse(PublishWithJoinResponse response) { final Join join = response.getJoin().get(); assert discoveryNode.equals(join.getSourceNode()); assert join.getTerm() == response.getPublishResponse().getTerm() : response; + logger.trace("handling join within publish response: {}", join); onJoin(join); } else { + logger.trace("publish response from {} contained no join", discoveryNode); onMissingJoin(discoveryNode); } From b2675ae5608013f61326133493bfe4808c00e4b2 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 12:44:11 +0100 Subject: [PATCH 15/26] Include join in message --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 81d18f43b8dda..ce95dbbbc717d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -559,7 +559,7 @@ private void handleJoin(Join join) { try { coordinationState.get().handleJoin(join); } catch (CoordinationStateRejectedException e) { - logger.debug("failed to add join, ignoring", e); + logger.debug(new ParameterizedMessage("failed to add {} - ignoring", join), e); } } else { coordinationState.get().handleJoin(join); // this might fail and bubble up the exception From 83d3e31631aa0c4cfeb5db2e5b972bdff37f594e Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 14:08:01 +0100 Subject: [PATCH 16/26] Track all the joins and process them at the end --- .../cluster/coordination/Coordinator.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index ce95dbbbc717d..a7431f67ec6bf 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -765,6 +765,11 @@ class CoordinatorPublication extends Publication { private final AckListener ackListener; private final ActionListener publishListener; + // We may not have accepted our own state before receiving a join from another node, causing its join to be rejected (we cannot + // safely accept a join whose last-accepted term/version is ahead of ours), so store them up and process them at the end. + // TODO this is unpleasant, is there a better way? + private final List receivedJoins = new ArrayList<>(); + CoordinatorPublication(PublishRequest publishRequest, ListenableFuture localNodeAckEvent, AckListener ackListener, ActionListener publishListener) { super(Coordinator.this.settings, publishRequest, @@ -818,6 +823,13 @@ boolean isActiveForCurrentLeader() { protected void onCompletion(boolean committed) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; + receivedJoins.forEach(join -> { + if (join.getTerm() == getCurrentTerm() && hasJoinVoteFrom(join.getSourceNode()) == false) { + logger.trace("handling {}", join); + handleJoin(join); + } + }); + localNodeAckEvent.addListener(new ActionListener() { @Override public void onResponse(Void ignore) { @@ -879,9 +891,7 @@ protected Optional handlePublishResponse(DiscoveryNode sourc @Override protected void onJoin(Join join) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; - if (join.getTerm() == getCurrentTerm()) { - handleJoin(join); - } + receivedJoins.add(join); } @Override From 290b9604fdc63d2d0be027c5a6be85a27ebc6e36 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 14:15:57 +0100 Subject: [PATCH 17/26] Wait for local ack, onCompletion might not be late enough --- .../cluster/coordination/Coordinator.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index a7431f67ec6bf..df7ac32e31c65 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -823,19 +823,19 @@ boolean isActiveForCurrentLeader() { protected void onCompletion(boolean committed) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; - receivedJoins.forEach(join -> { - if (join.getTerm() == getCurrentTerm() && hasJoinVoteFrom(join.getSourceNode()) == false) { - logger.trace("handling {}", join); - handleJoin(join); - } - }); - localNodeAckEvent.addListener(new ActionListener() { @Override public void onResponse(Void ignore) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; assert committed; + receivedJoins.forEach(join -> { + if (join.getTerm() == getCurrentTerm() && hasJoinVoteFrom(join.getSourceNode()) == false) { + logger.trace("handling {}", join); + handleJoin(join); + } + }); + clusterApplier.onNewClusterState(CoordinatorPublication.this.toString(), () -> applierState, new ClusterApplyListener() { @Override From e86e4a285372b0f28e7f24a88157ee12afdd6c52 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 16:42:02 +0100 Subject: [PATCH 18/26] Extend delay in the unresponsive leader test --- .../cluster/coordination/Coordinator.java | 3 +++ .../cluster/coordination/CoordinatorTests.java | 17 ++++++++++++++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index df7ac32e31c65..900b7d6a96ee5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -274,6 +274,7 @@ private void startElection() { if (mode == Mode.CANDIDATE) { final StartJoinRequest startJoinRequest = new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen.get()) + 1); + logger.debug("starting election with {}", startJoinRequest); getDiscoveredNodes().forEach(node -> joinHelper.sendStartJoinRequest(startJoinRequest, node)); } } @@ -807,6 +808,7 @@ private void removePublicationAndPossiblyBecomeCandidate(String reason) { assert currentPublication.get() == this; currentPublication = Optional.empty(); + logger.debug("publication ended unsuccessfully: {}", this); // check if node has not already switched modes (by bumping term) if (isActiveForCurrentLeader()) { @@ -852,6 +854,7 @@ public void onSuccess(String source) { synchronized (mutex) { assert currentPublication.get() == CoordinatorPublication.this; currentPublication = Optional.empty(); + logger.debug("publication ended successfully: {}", CoordinatorPublication.this); // trigger term bump if new term was found during publication updateMaxTermSeen(getCurrentTerm()); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 1bc5a3f8ab378..c3d3d9378e5de 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -75,6 +75,7 @@ import static org.elasticsearch.cluster.coordination.Coordinator.Mode.CANDIDATE; import static org.elasticsearch.cluster.coordination.Coordinator.Mode.FOLLOWER; import static org.elasticsearch.cluster.coordination.Coordinator.Mode.LEADER; +import static org.elasticsearch.cluster.coordination.Coordinator.PUBLISH_TIMEOUT_SETTING; import static org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.DEFAULT_DELAY_VARIABILITY; import static org.elasticsearch.cluster.coordination.ElectionSchedulerFactory.ELECTION_BACK_OFF_TIME_SETTING; import static org.elasticsearch.cluster.coordination.ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING; @@ -193,9 +194,19 @@ public void testUnresponsiveLeaderDetectedEventually() { * defaultInt(LEADER_CHECK_RETRY_COUNT_SETTING) // then wait for a follower to be promoted to leader + DEFAULT_ELECTION_DELAY - // then wait for the new leader to notice that the old leader is unresponsive - + (defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING)) - * defaultInt(FOLLOWER_CHECK_RETRY_COUNT_SETTING) + // and the first publication times out because of the unresponsive node + + defaultMillis(PUBLISH_TIMEOUT_SETTING) + // there might be a term bump causing another election + + DEFAULT_ELECTION_DELAY + + // then wait for both of: + + Math.max( + // 1. the term bumping publication to time out + defaultMillis(PUBLISH_TIMEOUT_SETTING), + // 2. the new leader to notice that the old leader is unresponsive + (defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING)) + * defaultInt(FOLLOWER_CHECK_RETRY_COUNT_SETTING)) + // then wait for the new leader to commit a state without the old leader + DEFAULT_CLUSTER_STATE_UPDATE_DELAY, From 04b0451e10a89f7ca79ffbadb164612c670adf9e Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 16:46:50 +0100 Subject: [PATCH 19/26] Added TODO --- .../elasticsearch/cluster/coordination/CoordinatorTests.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index c3d3d9378e5de..2936d7443b36c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -188,6 +188,7 @@ public void testUnresponsiveLeaderDetectedEventually() { logger.info("--> blackholing leader {}", originalLeader); originalLeader.blackhole(); + // This stabilisation time bound is undesirably long. TODO try and reduce it. cluster.stabilise(Math.max( // first wait for all the followers to notice the leader has gone (defaultMillis(LEADER_CHECK_INTERVAL_SETTING) + defaultMillis(LEADER_CHECK_TIMEOUT_SETTING)) @@ -216,6 +217,7 @@ public void testUnresponsiveLeaderDetectedEventually() { // then wait for the leader to try and commit a state removing them, causing it to stand down + DEFAULT_CLUSTER_STATE_UPDATE_DELAY )); + assertThat(cluster.getAnyLeader().getId(), not(equalTo(originalLeader.getId()))); } From 5d85715bc14087fd3df30371104a5ff3b33ac9b0 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 17:23:27 +0100 Subject: [PATCH 20/26] Unused imports --- .../elasticsearch/cluster/coordination/PreVoteCollector.java | 1 - .../elasticsearch/cluster/coordination/CoordinatorTests.java | 5 ----- 2 files changed, 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java index a0c6832cf85da..ec4ed534ae068 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java @@ -19,7 +19,6 @@ package org.elasticsearch.cluster.coordination; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.coordination.CoordinationState.VoteCollection; import org.elasticsearch.cluster.node.DiscoveryNode; diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 2936d7443b36c..41d8fdb207290 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -21,7 +21,6 @@ import org.apache.logging.log4j.CloseableThreadContext; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState.VotingConfiguration; @@ -32,10 +31,8 @@ import org.elasticsearch.cluster.coordination.CoordinationStateTests.InMemoryPersistedState; import org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.ClusterNode; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNode.Role; import org.elasticsearch.cluster.service.ClusterApplier; import org.elasticsearch.common.Randomness; -import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; @@ -54,8 +51,6 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; From 71a642d409288618a3991f745452714af76561c7 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 19:10:07 +0100 Subject: [PATCH 21/26] Deal with late-arriving joins --- .../cluster/coordination/Coordinator.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 900b7d6a96ee5..6833aca66455e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -770,6 +770,7 @@ class CoordinatorPublication extends Publication { // safely accept a join whose last-accepted term/version is ahead of ours), so store them up and process them at the end. // TODO this is unpleasant, is there a better way? private final List receivedJoins = new ArrayList<>(); + private boolean receivedJoinsProcessed; CoordinatorPublication(PublishRequest publishRequest, ListenableFuture localNodeAckEvent, AckListener ackListener, ActionListener publishListener) { @@ -837,6 +838,8 @@ public void onResponse(Void ignore) { handleJoin(join); } }); + assert receivedJoinsProcessed == false; + receivedJoinsProcessed = true; clusterApplier.onNewClusterState(CoordinatorPublication.this.toString(), () -> applierState, new ClusterApplyListener() { @@ -894,7 +897,13 @@ protected Optional handlePublishResponse(DiscoveryNode sourc @Override protected void onJoin(Join join) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; - receivedJoins.add(join); + if (receivedJoinsProcessed) { + // a late response may arrive after the state has been locally applied, meaning that receivedJoins has already been + // processed, so we have to handle this late response here. + handleJoin(join); + } else { + receivedJoins.add(join); + } } @Override From 8405768b971fac2107100e1311879a732c9e8363 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 19:33:12 +0100 Subject: [PATCH 22/26] Harmonise join filtering logic --- .../cluster/coordination/Coordinator.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 6833aca66455e..8c463d03c8953 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -832,12 +832,7 @@ public void onResponse(Void ignore) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; assert committed; - receivedJoins.forEach(join -> { - if (join.getTerm() == getCurrentTerm() && hasJoinVoteFrom(join.getSourceNode()) == false) { - logger.trace("handling {}", join); - handleJoin(join); - } - }); + receivedJoins.forEach(CoordinatorPublication.this::handleAssociatedJoin); assert receivedJoinsProcessed == false; receivedJoinsProcessed = true; @@ -880,6 +875,13 @@ public void onFailure(Exception e) { }, EsExecutors.newDirectExecutorService()); } + private void handleAssociatedJoin(Join join) { + if (join.getTerm() == getCurrentTerm() && hasJoinVoteFrom(join.getSourceNode()) == false) { + logger.trace("handling {}", join); + handleJoin(join); + } + } + @Override protected boolean isPublishQuorum(CoordinationState.VoteCollection votes) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; @@ -900,7 +902,7 @@ protected void onJoin(Join join) { if (receivedJoinsProcessed) { // a late response may arrive after the state has been locally applied, meaning that receivedJoins has already been // processed, so we have to handle this late response here. - handleJoin(join); + handleAssociatedJoin(join); } else { receivedJoins.add(join); } From 77bd13bc94ebd28885a5b4dbfda0ada446144a9c Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 20:09:24 +0100 Subject: [PATCH 23/26] Higher prio log messages --- .../elasticsearch/cluster/coordination/CoordinatorTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 41d8fdb207290..cad61453ca300 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -612,7 +612,7 @@ void stabilise(long stabilisationDurationMillis) { void runFor(long runDurationMillis, String description) { final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + runDurationMillis; - logger.info("----> runFor({}ms) running until [{}ms]: {}", runDurationMillis, endTime, description); + logger.info("--> runFor({}ms) running until [{}ms]: {}", runDurationMillis, endTime, description); while (deterministicTaskQueue.getCurrentTimeMillis() < endTime) { @@ -637,7 +637,7 @@ void runFor(long runDurationMillis, String description) { deterministicTaskQueue.advanceTime(); } - logger.info("----> runFor({}ms) completed run until [{}ms]: {}", runDurationMillis, endTime, description); + logger.info("--> runFor({}ms) completed run until [{}ms]: {}", runDurationMillis, endTime, description); } private boolean isConnectedPair(ClusterNode n1, ClusterNode n2) { From 566623282c39e235412f952aec0cb26b1772d430 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 20:09:47 +0100 Subject: [PATCH 24/26] Add lag-fixing hack --- .../coordination/CoordinatorTests.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index cad61453ca300..c1377c7d8419b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -607,9 +607,30 @@ void stabilise(long stabilisationDurationMillis) { assertThat("stabilisation requires default delay variability (and proper cleanup of raised variability)", deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY)); runFor(stabilisationDurationMillis, "stabilising"); + fixLag(); assertUniqueLeaderAndExpectedModes(); } + // TODO remove this when lag detection is implemented + void fixLag() { + final ClusterNode leader = getAnyLeader(); + final long leaderVersion = leader.coordinator.getLastAcceptedState().version(); + final long minVersion = clusterNodes.stream() + .filter(n -> isConnectedPair(n, leader)) + .map(n -> n.coordinator.getLastAcceptedState().version()).min(Long::compare).orElse(Long.MIN_VALUE); + + assert minVersion >= 0; + if (minVersion < leaderVersion) { + logger.info("--> publishing a value to fix lag, leaderVersion={}, minVersion={}", leaderVersion, minVersion); + onNode(leader.getLocalNode(), () -> { + synchronized (leader.coordinator.mutex) { + leader.submitValue(randomLong()); + } + }).run(); + } + runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "re-stabilising after lag-fixing publication"); + } + void runFor(long runDurationMillis, String description) { final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + runDurationMillis; logger.info("--> runFor({}ms) running until [{}ms]: {}", runDurationMillis, endTime, description); From dbdfcc1b955aaab7d4f879ff758dab13285250c9 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 23:12:08 +0100 Subject: [PATCH 25/26] Comment fixes from review --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 8c463d03c8953..a8e21da705ce7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -225,7 +225,7 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) { if (sourceNode.equals(getLocalNode())) { preVoteCollector.update(getPreVoteResponse(), getLocalNode()); } else { - becomeFollower("handlePublishRequest", sourceNode); // updates preVoteCollector + becomeFollower("handlePublishRequest", sourceNode); // also updates preVoteCollector } return new PublishWithJoinResponse(publishResponse, @@ -768,7 +768,6 @@ class CoordinatorPublication extends Publication { // We may not have accepted our own state before receiving a join from another node, causing its join to be rejected (we cannot // safely accept a join whose last-accepted term/version is ahead of ours), so store them up and process them at the end. - // TODO this is unpleasant, is there a better way? private final List receivedJoins = new ArrayList<>(); private boolean receivedJoinsProcessed; From 8e4b8ddc7a70472f853d745fe52f794fef6ac1a3 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 5 Oct 2018 23:19:28 +0100 Subject: [PATCH 26/26] Used the wrong branch when simplifying exception logging --- .../elasticsearch/cluster/coordination/PreVoteCollector.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java index ec4ed534ae068..496c023acb6ea 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.coordination; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.coordination.CoordinationState.VoteCollection; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -154,7 +155,7 @@ public void handleResponse(PreVoteResponse response) { @Override public void handleException(TransportException exp) { - logger.debug("{} failed: {}", this, exp.getRootCause().getMessage()); + logger.debug(new ParameterizedMessage("{} failed", this), exp); } @Override