Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
9129532
Add assertions about the preVoteCollector's consistency
DaveCTurner Oct 5, 2018
33113de
Extract variable
DaveCTurner Oct 5, 2018
26be291
Log all exceptions the same
DaveCTurner Oct 5, 2018
4642e24
Update max term seen
DaveCTurner Oct 5, 2018
22130dc
Remove term-bump workaround
DaveCTurner Oct 5, 2018
3ba1a6d
Bump term on discovery of the need to do so
DaveCTurner Oct 5, 2018
4a70f62
Reinstate assertion that every connected node has voted for the leader
DaveCTurner Oct 5, 2018
9cea60f
Make fields private
DaveCTurner Oct 5, 2018
5acffc9
Generate DiscoveryNodes deterministically
DaveCTurner Oct 5, 2018
e11e929
Private
DaveCTurner Oct 5, 2018
2c98dd7
Handle publish requests without attached joins
DaveCTurner Oct 5, 2018
7569c4f
Fix missing update to preVoteCollector
DaveCTurner Oct 5, 2018
9108380
TODO is done
DaveCTurner Oct 5, 2018
af8916a
Trace join handling
DaveCTurner Oct 5, 2018
b2675ae
Include join in message
DaveCTurner Oct 5, 2018
83d3e31
Track all the joins and process them at the end
DaveCTurner Oct 5, 2018
290b960
Wait for local ack, onCompletion might not be late enough
DaveCTurner Oct 5, 2018
e86e4a2
Extend delay in the unresponsive leader test
DaveCTurner Oct 5, 2018
04b0451
Added TODO
DaveCTurner Oct 5, 2018
5d85715
Unused imports
DaveCTurner Oct 5, 2018
71a642d
Deal with late-arriving joins
DaveCTurner Oct 5, 2018
8405768
Harmonise join filtering logic
DaveCTurner Oct 5, 2018
77bd13b
Higher prio log messages
DaveCTurner Oct 5, 2018
5666232
Add lag-fixing hack
DaveCTurner Oct 5, 2018
dbdfcc1
Comment fixes from review
DaveCTurner Oct 5, 2018
8e4b8dd
Used the wrong branch when simplifying exception logging
DaveCTurner Oct 5, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,10 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) {
ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term());
final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest);

if (sourceNode.equals(getLocalNode()) == false) {
becomeFollower("handlePublishRequest", sourceNode);
if (sourceNode.equals(getLocalNode())) {
preVoteCollector.update(getPreVoteResponse(), getLocalNode());
} else {
becomeFollower("handlePublishRequest", sourceNode); // also updates preVoteCollector
}

return new PublishWithJoinResponse(publishResponse,
Expand Down Expand Up @@ -254,27 +256,31 @@ private void closePrevotingAndElectionScheduler() {
}

private void updateMaxTermSeen(final long term) {
maxTermSeen.updateAndGet(oldMaxTerm -> Math.max(oldMaxTerm, term));
// TODO if we are leader here, and there is no publication in flight, then we should bump our term
// (if we are leader and there _is_ a publication in flight then doing so would cancel the publication, so don't do that, but
// do check for this after the publication completes)
final long updatedMaxTermSeen = maxTermSeen.updateAndGet(oldMaxTerm -> Math.max(oldMaxTerm, term));
synchronized (mutex) {
if (mode == Mode.LEADER && publicationInProgress() == false && updatedMaxTermSeen > getCurrentTerm()) {
// Bump our term. However if there is a publication in flight then doing so would cancel the publication, so don't do that
// since we check whether a term bump is needed at the end of the publication too.
ensureTermAtLeast(getLocalNode(), updatedMaxTermSeen);
startElection();
}
}
}

// TODO: make private again after removing term-bump workaround
void startElection() {
private void startElection() {
synchronized (mutex) {
// The preVoteCollector is only active while we are candidate, but it does not call this method with synchronisation, so we have
// to check our mode again here.
if (mode == Mode.CANDIDATE) {
final StartJoinRequest startJoinRequest
= new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen.get()) + 1);
logger.debug("starting election with {}", startJoinRequest);
getDiscoveredNodes().forEach(node -> joinHelper.sendStartJoinRequest(startJoinRequest, node));
}
}
}

// TODO: make private again after removing term-bump workaround
Optional<Join> ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) {
private Optional<Join> ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
if (getCurrentTerm() < targetTerm) {
return Optional.of(joinLeaderInTerm(new StartJoinRequest(sourceNode, targetTerm)));
Expand All @@ -289,9 +295,10 @@ private Join joinLeaderInTerm(StartJoinRequest startJoinRequest) {
lastJoin = Optional.of(join);
peerFinder.setCurrentTerm(getCurrentTerm());
if (mode != Mode.CANDIDATE) {
becomeCandidate("joinLeaderInTerm"); // updates followersChecker
becomeCandidate("joinLeaderInTerm"); // updates followersChecker and preVoteCollector
} else {
followersChecker.updateFastResponseState(getCurrentTerm(), mode);
preVoteCollector.update(getPreVoteResponse(), null);
}
return join;
}
Expand Down Expand Up @@ -485,6 +492,8 @@ public void invariant() {
assert becomingMaster || getStateForMasterService().nodes().getMasterNodeId() != null : getStateForMasterService();
assert leaderCheckScheduler == null : leaderCheckScheduler;
assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode());
assert preVoteCollector.getLeader() == getLocalNode() : preVoteCollector;
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector;

final boolean activePublication = currentPublication.map(CoordinatorPublication::isActiveForCurrentLeader).orElse(false);
if (becomingMaster && activePublication == false) {
Expand Down Expand Up @@ -517,6 +526,8 @@ public void invariant() {
assert leaderCheckScheduler != null;
assert followersChecker.getKnownFollowers().isEmpty();
assert currentPublication.map(Publication::isCommitted).orElse(true);
assert preVoteCollector.getLeader().equals(lastKnownLeader.get()) : preVoteCollector;
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector;
} else {
assert mode == Mode.CANDIDATE;
assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator;
Expand All @@ -528,6 +539,8 @@ public void invariant() {
assert followersChecker.getKnownFollowers().isEmpty();
assert applierState.nodes().getMasterNodeId() == null;
assert currentPublication.map(Publication::isCommitted).orElse(true);
assert preVoteCollector.getLeader() == null : preVoteCollector;
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse()) : preVoteCollector;
}
}
}
Expand All @@ -537,7 +550,7 @@ boolean hasJoinVoteFrom(DiscoveryNode localNode) {
return coordinationState.get().containsJoinVoteFor(localNode);
}

void handleJoin(Join join) {
private void handleJoin(Join join) {
synchronized (mutex) {
ensureTermAtLeast(getLocalNode(), join.getTerm()).ifPresent(this::handleJoin);

Expand All @@ -547,7 +560,7 @@ void handleJoin(Join join) {
try {
coordinationState.get().handleJoin(join);
} catch (CoordinationStateRejectedException e) {
logger.debug("failed to add join, ignoring", e);
logger.debug(new ParameterizedMessage("failed to add {} - ignoring", join), e);
}
} else {
coordinationState.get().handleJoin(join); // this might fail and bubble up the exception
Expand Down Expand Up @@ -753,6 +766,11 @@ class CoordinatorPublication extends Publication {
private final AckListener ackListener;
private final ActionListener<Void> publishListener;

// We may not have accepted our own state before receiving a join from another node, causing its join to be rejected (we cannot
// safely accept a join whose last-accepted term/version is ahead of ours), so store them up and process them at the end.
private final List<Join> receivedJoins = new ArrayList<>();
private boolean receivedJoinsProcessed;

CoordinatorPublication(PublishRequest publishRequest, ListenableFuture<Void> localNodeAckEvent, AckListener ackListener,
ActionListener<Void> publishListener) {
super(Coordinator.this.settings, publishRequest,
Expand Down Expand Up @@ -790,6 +808,7 @@ private void removePublicationAndPossiblyBecomeCandidate(String reason) {

assert currentPublication.get() == this;
currentPublication = Optional.empty();
logger.debug("publication ended unsuccessfully: {}", this);

// check if node has not already switched modes (by bumping term)
if (isActiveForCurrentLeader()) {
Expand All @@ -812,6 +831,10 @@ public void onResponse(Void ignore) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
assert committed;

receivedJoins.forEach(CoordinatorPublication.this::handleAssociatedJoin);
assert receivedJoinsProcessed == false;
receivedJoinsProcessed = true;

clusterApplier.onNewClusterState(CoordinatorPublication.this.toString(), () -> applierState,
new ClusterApplyListener() {
@Override
Expand All @@ -828,6 +851,7 @@ public void onSuccess(String source) {
synchronized (mutex) {
assert currentPublication.get() == CoordinatorPublication.this;
currentPublication = Optional.empty();
logger.debug("publication ended successfully: {}", CoordinatorPublication.this);
// trigger term bump if new term was found during publication
updateMaxTermSeen(getCurrentTerm());
}
Expand All @@ -850,6 +874,13 @@ public void onFailure(Exception e) {
}, EsExecutors.newDirectExecutorService());
}

/**
 * Forwards a join to {@code handleJoin}, but only when the join is for the current term and {@code hasJoinVoteFrom} reports
 * that no join vote from its source node has been recorded. Any other join is ignored.
 *
 * @param join the join to consider
 */
private void handleAssociatedJoin(Join join) {
    final boolean differentTerm = join.getTerm() != getCurrentTerm();
    // The term is checked first so that the vote lookup only happens for joins in the current term.
    if (differentTerm || hasJoinVoteFrom(join.getSourceNode())) {
        return;
    }

    logger.trace("handling {}", join);
    handleJoin(join);
}

@Override
protected boolean isPublishQuorum(CoordinationState.VoteCollection votes) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
Expand All @@ -867,10 +898,26 @@ protected Optional<ApplyCommitRequest> handlePublishResponse(DiscoveryNode sourc
@Override
protected void onJoin(Join join) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
if (join.getTerm() == getCurrentTerm()) {
handleJoin(join);
if (receivedJoinsProcessed) {
// a late response may arrive after the state has been locally applied, meaning that receivedJoins has already been
// processed, so we have to handle this late response here.
handleAssociatedJoin(join);
} else {
receivedJoins.add(join);
}
}

/**
 * Reacts to a publish response that carried no join vote. If no join vote from that node is recorded, the maximum term seen is
 * raised to one more than the term of the state being published.
 *
 * @param discoveryNode the node whose publish response contained no join
 */
@Override
protected void onMissingJoin(DiscoveryNode discoveryNode) {
    assert Thread.holdsLock(mutex) : "Coordinator mutex not held";

    // Joins are not persisted, so a missing join means either that the remote node voted for us and then rebooted, or that it
    // voted for a different node in this term. If we already hold a join from this node there is nothing to do.
    if (hasJoinVoteFrom(discoveryNode)) {
        return;
    }

    // Without a copy of a join from this node we assume it voted for someone else, and bump our term to obtain its vote.
    final long publishedTerm = publishRequest.getAcceptedState().term();
    logger.debug("onMissingJoin: no join vote from {}, bumping term to exceed {}", discoveryNode, publishedTerm);
    updateMaxTermSeen(publishedTerm + 1);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,33 @@ public Releasable start(final ClusterState clusterState, final Iterable<Discover
return preVotingRound;
}

/**
 * Returns the pre-vote response held in the current state tuple. Only for testing.
 */
PreVoteResponse getPreVoteResponse() {
    final Tuple<DiscoveryNode, PreVoteResponse> snapshot = state;
    return snapshot.v2();
}

/**
 * Returns the leader held in the current state tuple, which may be {@code null}. Only for testing.
 */
@Nullable
DiscoveryNode getLeader() {
    final Tuple<DiscoveryNode, PreVoteResponse> snapshot = state;
    return snapshot.v1();
}

/**
 * Replaces the collector's state with the given pre-vote response and leader.
 *
 * @param preVoteResponse the pre-vote response to store
 * @param leader          the leader to store, or {@code null} if there is none
 */
public void update(final PreVoteResponse preVoteResponse, @Nullable final DiscoveryNode leader) {
    logger.trace("updating with preVoteResponse={}, leader={}", preVoteResponse, leader);
    // Leader and response are stored together in one tuple and swapped in with a single assignment.
    final Tuple<DiscoveryNode, PreVoteResponse> updatedState = new Tuple<>(leader, preVoteResponse);
    state = updatedState;
}

private PreVoteResponse handlePreVoteRequest(final PreVoteRequest request) {
// TODO if we are a leader and the max term seen exceeds our term then we need to bump our term
updateMaxTermSeen.accept(request.getCurrentTerm());

Tuple<DiscoveryNode, PreVoteResponse> state = this.state;
assert state != null : "received pre-vote request before fully initialised";

final DiscoveryNode leader = state.v1();
final PreVoteResponse response = state.v2();

if (leader == null) {
return state.v2();
return response;
}

if (leader.equals(request.getSourceNode())) {
Expand All @@ -100,7 +114,7 @@ private PreVoteResponse handlePreVoteRequest(final PreVoteRequest request) {
// major drawback in offering a join to our old leader. The advantage of this is that it makes it slightly more likely that the
// leader won't change, and also that its re-election will happen more quickly than if it had to wait for a quorum of followers
// to also detect its failure.
return state.v2();
return response;
}

throw new CoordinationStateRejectedException("rejecting " + request + " as there is already a leader");
Expand Down Expand Up @@ -141,11 +155,7 @@ public void handleResponse(PreVoteResponse response) {

@Override
public void handleException(TransportException exp) {
if (exp.getRootCause() instanceof CoordinationStateRejectedException) {
logger.debug("{} failed: {}", this, exp.getRootCause().getMessage());
} else {
logger.debug(new ParameterizedMessage("{} failed", this), exp);
}
logger.debug(new ParameterizedMessage("{} failed", this), exp);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ private void onPossibleCommitFailure() {

/**
 * Called with the join that a node attached to its publish response.
 *
 * @param join the join taken from the publish response
 */
protected abstract void onJoin(Join join);

/**
 * Called when a node's publish response did not contain a join.
 *
 * @param discoveryNode the node whose publish response contained no join
 */
protected abstract void onMissingJoin(DiscoveryNode discoveryNode);

protected abstract void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
ActionListener<PublishWithJoinResponse> responseActionListener);

Expand Down Expand Up @@ -301,10 +303,16 @@ public void onResponse(PublishWithJoinResponse response) {
return;
}

response.getJoin().ifPresent(join -> {
if (response.getJoin().isPresent()) {
final Join join = response.getJoin().get();
assert discoveryNode.equals(join.getSourceNode());
assert join.getTerm() == response.getPublishResponse().getTerm() : response;
logger.trace("handling join within publish response: {}", join);
onJoin(join);
});
} else {
logger.trace("publish response from {} contained no join", discoveryNode);
onMissingJoin(discoveryNode);
}

assert state == PublicationTargetState.SENT_PUBLISH_REQUEST : state + " -> " + PublicationTargetState.WAITING_FOR_QUORUM;
state = PublicationTargetState.WAITING_FOR_QUORUM;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
*/
public class PublishResponse implements Writeable {

protected final long term;
protected final long version;
private final long term;
private final long version;

public PublishResponse(long term, long version) {
assert term >= 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import org.elasticsearch.cluster.ClusterState.VotingConfiguration;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase;
Expand All @@ -37,6 +39,7 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -82,7 +85,11 @@ public void setupNodes() {
}

public static DiscoveryNode createNode(String id) {
return new DiscoveryNode(id, buildNewFakeTransportAddress(), Version.CURRENT);
final TransportAddress address = buildNewFakeTransportAddress();
return new DiscoveryNode("", id,
UUIDs.randomBase64UUID(random()), // generated deterministically for repeatable tests
address.address().getHostString(), address.getAddress(), address, Collections.emptyMap(),
EnumSet.allOf(Role.class), Version.CURRENT);
}

public void testSetInitialState() {
Expand Down
Loading