diff --git a/core/src/main/java/org/elasticsearch/discovery/zen2/ConsensusState.java b/core/src/main/java/org/elasticsearch/discovery/zen2/ConsensusState.java index 715fc9864855a..c707c8206010a 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen2/ConsensusState.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen2/ConsensusState.java @@ -22,57 +22,76 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.zen2.Messages.ApplyCommit; +import org.elasticsearch.discovery.zen2.Messages.PublishRequest; +import org.elasticsearch.discovery.zen2.Messages.PublishResponse; +import org.elasticsearch.discovery.zen2.Messages.SlotTermDiff; +import org.elasticsearch.discovery.zen2.Messages.Vote; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Optional; +import static org.elasticsearch.discovery.zen2.Messages.NO_TERM; + +/** + * The safety core of the consensus algorithm, which ensures that any two instances + * of this object anywhere in the cluster have the same committedState if they are + * at the same slot. + * + * @param The state on which we achieve consensus. + */ public class ConsensusState extends AbstractComponent { - public static final long NO_TERM = -1L; + final Persistence persistence; // persisted state - long currentTerm; - T committedState; - Optional> acceptedState; - + private long currentTerm; + private T committedState; + private Optional> acceptedState; // transient state - ElectionState electionState; - PublishState publishState; - - final Persistence persistence; + private boolean electionWon; + private boolean electionValueForced; + private NodeCollection joinVotes; + private boolean publishPermitted; + private NodeCollection publishVotes; public ConsensusState(Settings settings, long currentTerm, T committedState, Optional> acceptedState, Persistence persistence) { - super(settings); + // TODO idea: just pass in a Persistence and let it provide the persisted state. - assert currentTerm >= 0; - assert acceptedState.isPresent() == false || acceptedState.get().getTerm() <= currentTerm; - assert acceptedState.isPresent() == false || acceptedState.get().getSlot() <= firstUncommittedSlot(); + super(settings); + this.persistence = persistence; + // persisted state this.currentTerm = currentTerm; this.committedState = committedState; this.acceptedState = acceptedState; - this.electionState = new ElectionState(); - this.publishState = new PublishState(false); - this.persistence = persistence; - } - public Vote handleStartVote(long newTerm) { - if (newTerm <= currentTerm) { - logger.trace("handleStartVote: ignored as term provided [{}] lower or equal than current term [{}]", - newTerm, currentTerm); - throw new IllegalArgumentException("incoming term " + newTerm + " lower than current term " + currentTerm); - } + // transient state + this.electionWon = false; + this.electionValueForced = false; + this.joinVotes = new NodeCollection(); + this.publishPermitted = false; + this.publishVotes = new NodeCollection(); + + assert currentTerm >= 0; + assert acceptedState.isPresent() == false || acceptedState.get().getTerm() <= currentTerm; + assert acceptedState.isPresent() == false || acceptedState.get().getSlot() <= firstUncommittedSlot(); + } - logger.trace("handleStartVote: updating term from [{}] to [{}]", currentTerm, newTerm); + public T getCommittedState() { + return committedState; + } - persistence.persistCurrentTerm(newTerm); - currentTerm = newTerm; - electionState = new ElectionState(); - publishState = new PublishState(true); + public long getCurrentTerm() { + return currentTerm; + } - return new Vote(firstUncommittedSlot(), currentTerm, lastAcceptedTermInSlot()); + public boolean isQuorumInCurrentConfiguration(NodeCollection votes) { + final HashSet intersection = new HashSet<>(committedState.getVotingNodes().nodes.keySet()); + intersection.retainAll(votes.nodes.keySet()); + return intersection.size() * 2 > committedState.getVotingNodes().nodes.size(); } public long firstUncommittedSlot() { @@ -87,14 +106,50 @@ public long lastAcceptedTermInSlot() { } } + /** + * May be safely called at any time to move this instance to a new term. It is vitally important for safety that + * the resulting Vote is sent to no more than one node. + * + * @param newTerm The new term + * @return A Vote that must be sent to at most one other node. + * @throws IllegalArgumentException if the arguments were incompatible with the current state of this object. + */ + public Vote handleStartVote(long newTerm) { + if (newTerm <= currentTerm) { + logger.debug("handleStartVote: ignored as term provided [{}] lower or equal than current term [{}]", + newTerm, currentTerm); + throw new IllegalArgumentException("incoming term " + newTerm + " lower than current term " + currentTerm); + } + + logger.debug("handleStartVote: updating term from [{}] to [{}]", currentTerm, newTerm); + + persistence.persistCurrentTerm(newTerm); + currentTerm = newTerm; + joinVotes = new NodeCollection(); + electionWon = false; + electionValueForced = false; + publishPermitted = true; + publishVotes = new NodeCollection(); + + return new Vote(firstUncommittedSlot(), currentTerm, lastAcceptedTermInSlot()); + } + + /** + * May be called on receipt of a Vote from the given sourceNode. + * + * @param sourceNode The sender of the Vote received. + * @param vote The Vote received. + * @return An optional PublishRequest which, if present, can be broadcast to all peers. + * @throws IllegalArgumentException if the arguments were incompatible with the current state of this object. + */ public Optional> handleVote(DiscoveryNode sourceNode, Vote vote) { if (vote.getTerm() != currentTerm) { - logger.trace("handleVote: ignored vote due to term mismatch (expected: [{}], actual: [{}])", + logger.debug("handleVote: ignored vote due to term mismatch (expected: [{}], actual: [{}])", currentTerm, vote.getTerm()); throw new IllegalArgumentException("incoming term " + vote.getTerm() + " does not match current term " + currentTerm); } if (vote.getFirstUncommittedSlot() > firstUncommittedSlot()) { - logger.trace("handleVote: ignored vote due to slot mismatch (expected: <=[{}], actual: [{}])", + logger.debug("handleVote: ignored vote due to slot mismatch (expected: <=[{}], actual: [{}])", firstUncommittedSlot(), vote.getFirstUncommittedSlot()); throw new IllegalArgumentException("incoming slot " + vote.getFirstUncommittedSlot() + " higher than current slot " + firstUncommittedSlot()); @@ -102,47 +157,54 @@ public Optional> handleVote(DiscoveryNode sourceNode, Vote vot if (vote.getFirstUncommittedSlot() == firstUncommittedSlot() && vote.getLastAcceptedTerm() != NO_TERM) { final long lastAcceptedTermInSlot = lastAcceptedTermInSlot(); if (vote.getLastAcceptedTerm() > lastAcceptedTermInSlot) { - logger.trace("handleVote: ignored vote as voter has better last accepted term (expected: <=[{}], actual: [{}])", + logger.debug("handleVote: ignored vote as voter has better last accepted term (expected: <=[{}], actual: [{}])", lastAcceptedTermInSlot, vote.getLastAcceptedTerm()); throw new IllegalArgumentException("incoming last accepted term " + vote.getLastAcceptedTerm() + " higher than " + "current last accepted term " + lastAcceptedTermInSlot); } - if (vote.getLastAcceptedTerm() < lastAcceptedTermInSlot && electionState.valueForced() == false) { - logger.trace("handleVote: ignored vote as voter has worse last accepted term and election value not forced " + - "(expected: <=[{}], actual: [{}])", lastAcceptedTermInSlot, vote.getLastAcceptedTerm()); + if (vote.getLastAcceptedTerm() < lastAcceptedTermInSlot && electionValueForced == false) { + logger.debug("handleVote: ignored vote as voter has worse last accepted term and election value not forced " + + "(expected: <=[{}], actual: [{}])", lastAcceptedTermInSlot, vote.getLastAcceptedTerm()); throw new IllegalArgumentException("incoming last accepted term " + vote.getLastAcceptedTerm() + " lower than " + "current last accepted term " + lastAcceptedTermInSlot + " and election value not forced"); } - electionState.setValueForced(true); + electionValueForced = true; } - logger.trace("handleVote: adding vote {} from {} for election at slot {}", vote, sourceNode, firstUncommittedSlot()); - electionState.add(sourceNode); + logger.debug("handleVote: adding vote {} from [{}] for election at slot {}", vote, sourceNode.getId(), firstUncommittedSlot()); + joinVotes.add(sourceNode); - if (electionState.maybeSetElectionWon(committedState.getVotingNodes())) { - logger.trace("handleVote: election won"); + electionWon = isQuorumInCurrentConfiguration(joinVotes); - if (electionState.valueForced()) { - logger.trace("handleVote: value forced"); + logger.debug("handleVote: electionWon={} publishPermitted={} electionValueForce={}", + electionWon, publishPermitted, electionValueForced); + if (electionWon && publishPermitted && electionValueForced) { + logger.debug("handleVote: sending PublishRequest"); - publishState.disablePublishing(); - assert acceptedState.isPresent(); // must be true because electionState.valueForced(); - return Optional.of(new PublishRequest<>(firstUncommittedSlot(), currentTerm, acceptedState.get().getDiff())); - } + publishPermitted = false; + assert acceptedState.isPresent(); // must be true because electionValueForced == true + return Optional.of(new PublishRequest<>(firstUncommittedSlot(), currentTerm, acceptedState.get().getDiff())); } return Optional.empty(); } + /** + * May be called on receipt of a PublishRequest. + * + * @param publishRequest The PublishRequest received. + * @return A PublishResponse which can be sent back to the sender of the PublishRequest. + * @throws IllegalArgumentException if the arguments were incompatible with the current state of this object. + */ public PublishResponse handlePublishRequest(PublishRequest publishRequest) { if (publishRequest.getTerm() != currentTerm) { - logger.trace("handlePublishRequest: ignored publish request due to term mismatch (expected: [{}], actual: [{}])", + logger.debug("handlePublishRequest: ignored publish request due to term mismatch (expected: [{}], actual: [{}])", currentTerm, publishRequest.getTerm()); throw new IllegalArgumentException("incoming term " + publishRequest.getTerm() + " does not match current term " + currentTerm); } if (publishRequest.getSlot() != firstUncommittedSlot()) { - logger.trace("handlePublishRequest: ignored publish request due to slot mismatch (expected: [{}], actual: [{}])", + logger.debug("handlePublishRequest: ignored publish request due to slot mismatch (expected: [{}], actual: [{}])", firstUncommittedSlot(), publishRequest.getSlot()); throw new IllegalArgumentException("incoming slot " + publishRequest.getSlot() + " does not match current slot " + firstUncommittedSlot()); @@ -156,24 +218,37 @@ public PublishResponse handlePublishRequest(PublishRequest publishRequest) { return new PublishResponse(publishRequest.getSlot(), publishRequest.getTerm()); } + /** + * May be called on receipt of a PublishResponse from the given sourceNode. + * + * @param sourceNode The sender of the PublishResponse received. + * @param publishResponse The PublishResponse received. + * @return An optional ApplyCommit which, if present, may be broadcast to all peers, indicating that this publication + * has been accepted at a quorum of peers and is therefore committed. + * @throws IllegalArgumentException if the arguments were incompatible with the current state of this object. + */ public Optional handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse) { if (publishResponse.getTerm() != currentTerm) { - logger.trace("handlePublishResponse: ignored publish response due to term mismatch (expected: [{}], actual: [{}])", + logger.debug("handlePublishResponse: ignored publish response due to term mismatch (expected: [{}], actual: [{}])", currentTerm, publishResponse.getTerm()); throw new IllegalArgumentException("incoming term " + publishResponse.getTerm() + " does not match current term " + currentTerm); } if (publishResponse.getSlot() != firstUncommittedSlot()) { - logger.trace("handlePublishResponse: ignored publish response due to slot mismatch (expected: [{}], actual: [{}])", - firstUncommittedSlot(), publishResponse.getSlot()); + if (publishResponse.getSlot() == firstUncommittedSlot() - 1) { + logger.trace("handlePublishResponse: ignored publish response for just-committed slot [{}]", publishResponse.getSlot()); + } else { + logger.debug("handlePublishResponse: ignored publish response due to slot mismatch (expected: [{}], actual: [{}])", + firstUncommittedSlot(), publishResponse.getSlot()); + } throw new IllegalArgumentException("incoming slot " + publishResponse.getSlot() + " does not match current slot " + firstUncommittedSlot()); } - logger.trace("handlePublishResponse: accepted publish response for slot [{}] and term [{}]", - publishResponse.getSlot(), publishResponse.getTerm()); - publishState.add(sourceNode); - if (committedState.getVotingNodes().isQuorum(publishState)) { + logger.trace("handlePublishResponse: accepted publish response for slot [{}] and term [{}] from [{}]", + publishResponse.getSlot(), publishResponse.getTerm(), sourceNode.getId()); + publishVotes.add(sourceNode); + if (isQuorumInCurrentConfiguration(publishVotes)) { logger.trace("handlePublishResponse: value committed for slot [{}] and term [{}]", firstUncommittedSlot(), currentTerm); return Optional.of(new ApplyCommit(publishResponse.getSlot(), publishResponse.getTerm())); @@ -182,345 +257,132 @@ public Optional handlePublishResponse(DiscoveryNode sourceNode, Pub return Optional.empty(); } + /** + * May be called on receipt of an ApplyCommit. Updates the committed state accordingly. + * + * @param applyCommit The ApplyCommit received. + * @throws IllegalArgumentException if the arguments were incompatible with the current state of this object. + */ public void handleCommit(ApplyCommit applyCommit) { if (applyCommit.getTerm() != lastAcceptedTermInSlot()) { - logger.trace("handleCommitRequest: ignored commit request due to term mismatch (expected: [{}], actual: [{}])", + logger.debug("handleCommit: ignored commit request due to term mismatch (expected: [{}], actual: [{}])", lastAcceptedTermInSlot(), applyCommit.getTerm()); throw new IllegalArgumentException("incoming term " + applyCommit.getTerm() + " does not match last accepted term " + lastAcceptedTermInSlot()); } if (applyCommit.getSlot() != firstUncommittedSlot()) { - logger.trace("handleCommitRequest: ignored commit request due to slot mismatch (expected: [{}], actual: [{}])", + logger.debug("handleCommit: ignored commit request due to slot mismatch (expected: [{}], actual: [{}])", firstUncommittedSlot(), applyCommit.getSlot()); throw new IllegalArgumentException("incoming slot " + applyCommit.getSlot() + " does not match current slot " + firstUncommittedSlot()); } - logger.trace("handleCommitRequest: applying commit request for slot [{}]", - applyCommit.getSlot()); + logger.trace("handleCommit: applying commit request for slot [{}]", applyCommit.getSlot()); assert acceptedState.isPresent(); assert acceptedState.get().getSlot() == committedState.getSlot() + 1; final T newCommittedState = acceptedState.get().getDiff().apply(committedState); + logger.trace("handleCommit: newCommittedState = [{}]", newCommittedState); assert newCommittedState.getSlot() == committedState.getSlot() + 1; persistence.persistCommittedState(newCommittedState); committedState = newCommittedState; - electionState.setValueForced(false); - publishState = new PublishState(true); + publishPermitted = true; + electionValueForced = false; + publishVotes = new NodeCollection(); } + /** + * May be safely called at any time. Yields the current committed state to be sent to another, lagging, peer so it can catch up. + * + * @return The current committed state. + */ public T generateCatchup() { - logger.trace("generateCatchup: generating catch up for slot [{}]", committedState.getSlot()); + logger.debug("generateCatchup: generating catch up for slot [{}]", committedState.getSlot()); return committedState; } + /** + * May be called on receipt of a catch-up message containing the current committed state from a peer. + * + * @throws IllegalArgumentException if the arguments were incompatible with the current state of this object. + */ public void applyCatchup(T newCommittedState) { if (newCommittedState.getSlot() <= committedState.getSlot()) { - logger.trace("applyCatchup: ignored catch up request due to slot mismatch (expected: >[{}], actual: [{}])", + logger.debug("applyCatchup: ignored catch up request due to slot mismatch (expected: >[{}], actual: [{}])", committedState.getSlot(), newCommittedState.getSlot()); throw new IllegalArgumentException("incoming slot " + newCommittedState.getSlot() + " higher than current slot " + committedState.getSlot()); } - logger.trace("applyCatchup: applying catch up for slot [{}]", newCommittedState.getSlot()); + logger.debug("applyCatchup: applying catch up for slot [{}]", newCommittedState.getSlot()); persistence.persistCommittedState(newCommittedState); committedState = newCommittedState; - electionState = new ElectionState(); - publishState = new PublishState(false); + electionValueForced = false; + joinVotes = new NodeCollection(); + electionWon = false; + publishVotes = new NodeCollection(); + publishPermitted = false; } + /** + * May be safely called at any time in order to apply the given diff to the replicated state machine. + * + * @param diff The RSM transition on which to achieve consensus. + * @return A PublishRequest that may be broadcast to all peers. + * @throws IllegalArgumentException if the arguments were incompatible with the current state of this object. + */ public PublishRequest handleClientValue(Diff diff) { - if (electionState.electionWon() == false) { - logger.trace("handleClientValue: ignored request as election not won"); + if (electionWon == false) { + logger.debug("handleClientValue: ignored request as election not won"); throw new IllegalArgumentException("election not won"); } - if (publishState.publishPermitted() == false) { - logger.trace("handleClientValue: ignored request as publishing is not permitted"); + if (publishPermitted == false) { + logger.debug("handleClientValue: ignored request as publishing is not permitted"); throw new IllegalArgumentException("publishing not permitted"); } - if (electionState.valueForced()) { - logger.trace("handleClientValue: ignored request as election value is forced"); + if (electionValueForced) { + logger.debug("handleClientValue: ignored request as election value is forced"); throw new IllegalArgumentException("election value forced"); } - logger.trace("handleClientValue: processing request for slot [{}]", firstUncommittedSlot()); - publishState.disablePublishing(); + logger.trace("handleClientValue: processing request for slot [{}] and term [{}]", firstUncommittedSlot(), currentTerm); + publishPermitted = false; return new PublishRequest<>(firstUncommittedSlot(), currentTerm, diff); } - public static class NodeCollection { - - protected final Map nodes = new HashMap<>(); - - public void add(DiscoveryNode sourceNode) { - nodes.put(sourceNode.getId(), sourceNode); - } - - public boolean isQuorum(NodeCollection votes) { - final HashSet intersection = new HashSet(nodes.keySet()); - intersection.retainAll(votes.nodes.keySet()); - return intersection.size() * 2 > nodes.size(); - } - } - - public static class ElectionState extends NodeCollection { - private boolean electionWon; - private boolean electionValueForced; - - public ElectionState() { - electionWon = false; - electionValueForced = false; - } - - public boolean valueForced() { - return electionValueForced; - } - - public void setValueForced(boolean forced) { - electionValueForced = forced; - } - - public boolean electionWon() { - return electionWon; - } - - public boolean maybeSetElectionWon(NodeCollection config) { - if (electionWon() == false && config.isQuorum(this)) { - this.electionWon = true; - return true; - } - return false; - } - - @Override - public String toString() { - return "ElectionState{" + - "nodes=" + nodes + - ", electionWon=" + electionWon + - ", electionValueForced=" + electionValueForced + - '}'; - } - } - - public static class PublishState extends NodeCollection { - private boolean publishPermitted; - - public PublishState(boolean publishPermitted) { - this.publishPermitted = publishPermitted; - } - - public void disablePublishing() { - publishPermitted = true; - } - - public boolean publishPermitted() { - return publishPermitted; - } - - @Override - public String toString() { - return "PublishState{" + - "nodes=" + nodes + - ", publishPermitted=" + publishPermitted + - '}'; - } - } - - public static class Vote { - private final long firstUncommittedSlot; - private final long term; - private final long lastAcceptedTerm; - - public Vote(long firstUncommittedSlot, long term, long lastAcceptedTerm) { - assert firstUncommittedSlot >= 0; - assert term >= 0; - assert lastAcceptedTerm == NO_TERM || lastAcceptedTerm >= 0; - - this.firstUncommittedSlot = firstUncommittedSlot; - this.term = term; - this.lastAcceptedTerm = lastAcceptedTerm; - } - - public long getFirstUncommittedSlot() { - return firstUncommittedSlot; - } - - public long getTerm() { - return term; - } - - public long getLastAcceptedTerm() { - return lastAcceptedTerm; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - Vote vote = (Vote) o; - - if (firstUncommittedSlot != vote.firstUncommittedSlot) return false; - if (term != vote.term) return false; - return lastAcceptedTerm == vote.lastAcceptedTerm; - } - - @Override - public int hashCode() { - int result = (int) (firstUncommittedSlot ^ (firstUncommittedSlot >>> 32)); - result = 31 * result + (int) (term ^ (term >>> 32)); - result = 31 * result + (int) (lastAcceptedTerm ^ (lastAcceptedTerm >>> 32)); - return result; - } - - @Override - public String toString() { - return "Vote{" + - "firstUncommittedSlot=" + firstUncommittedSlot + - ", term=" + term + - ", lastAcceptedTerm=" + lastAcceptedTerm + - '}'; - } - } - - public static class SlotTerm { - protected final long slot; - protected final long term; - - public SlotTerm(long slot, long term) { - assert slot >= 0; - assert term >= 0; - - this.slot = slot; - this.term = term; - } - - public long getSlot() { - return slot; - } - - public long getTerm() { - return term; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - SlotTerm slotTerm = (SlotTerm) o; - - if (slot != slotTerm.slot) return false; - return term == slotTerm.term; - } - - @Override - public int hashCode() { - int result = (int) (slot ^ (slot >>> 32)); - result = 31 * result + (int) (term ^ (term >>> 32)); - return result; - } - - @Override - public String toString() { - return "SlotTerm{" + - "slot=" + slot + - ", term=" + term + - '}'; - } - } - - public static class PublishResponse extends SlotTerm { - - public PublishResponse(long slot, long term) { - super(slot, term); - } + /** + * An immutable object representing the current committed state. + */ + public interface CommittedState { + long getSlot(); - @Override - public String toString() { - return "PublishResponse{" + - "slot=" + slot + - ", term=" + term + - '}'; - } + NodeCollection getVotingNodes(); } - public static class ApplyCommit extends SlotTerm { + public interface Persistence { + void persistCurrentTerm(long currentTerm); - public ApplyCommit(long slot, long term) { - super(slot, term); - } + void persistCommittedState(T committedState); - @Override - public String toString() { - return "ApplyCommit{" + - "slot=" + slot + - ", term=" + term + - '}'; - } + void persistAcceptedState(SlotTermDiff slotTermDiff); } - public static class SlotTermDiff extends SlotTerm { - protected final Diff diff; - - public SlotTermDiff(long slot, long term, Diff diff) { - super(slot, term); - this.diff = diff; - } - - public Diff getDiff() { - return diff; - } - - @Override - public boolean equals(Object o) { - if (super.equals(o) == false) { return false; } - SlotTermDiff that = (SlotTermDiff) o; - return diff != null ? diff.equals(that.diff) : that.diff == null; - } - - @Override - public int hashCode() { - int result = super.hashCode(); - result = 31 * result + (diff != null ? diff.hashCode() : 0); - return result; - } - - @Override - public String toString() { - return "SlotTermDiff{" + - "slot=" + slot + - ", term=" + term + - ", diff=" + diff + - '}'; - } - } + /** + * A collection of nodes, used to calculate quorums. + */ + public static class NodeCollection { - public static class PublishRequest extends SlotTermDiff { + protected final Map nodes = new HashMap<>(); - public PublishRequest(long slot, long term, Diff diff) { - super(slot, term, diff); + public void add(DiscoveryNode sourceNode) { + // TODO is getId() unique enough or is it user-provided? If the latter, there's a risk of duplicates or of losing votes. + nodes.put(sourceNode.getId(), sourceNode); } @Override public String toString() { - return "PublishRequest{" + - "slot=" + slot + - ", term=" + term + - ", diff=" + diff + - '}'; + return "NodeCollection{" + String.join(",", nodes.keySet()) + "}"; } } - - public interface CommittedState { - long getSlot(); - NodeCollection getVotingNodes(); - } - - public interface Persistence { - void persistCurrentTerm(long currentTerm); - void persistCommittedState(T committedState); - void persistAcceptedState(SlotTermDiff slotTermDiff); - } } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen2/Messages.java b/core/src/main/java/org/elasticsearch/discovery/zen2/Messages.java new file mode 100644 index 0000000000000..773af955d3523 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/discovery/zen2/Messages.java @@ -0,0 +1,215 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery.zen2; + +import org.elasticsearch.cluster.Diff; + +public class Messages { + + public static final long NO_TERM = -1L; + + public static class Vote { + private final long firstUncommittedSlot; + private final long term; + private final long lastAcceptedTerm; + + public Vote(long firstUncommittedSlot, long term, long lastAcceptedTerm) { + assert firstUncommittedSlot >= 0; + assert term >= 0; + assert lastAcceptedTerm == NO_TERM || lastAcceptedTerm >= 0; + + this.firstUncommittedSlot = firstUncommittedSlot; + this.term = term; + this.lastAcceptedTerm = lastAcceptedTerm; + } + + public long getFirstUncommittedSlot() { + return firstUncommittedSlot; + } + + public long getTerm() { + return term; + } + + public long getLastAcceptedTerm() { + return lastAcceptedTerm; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Vote vote = (Vote) o; + + if (firstUncommittedSlot != vote.firstUncommittedSlot) return false; + if (term != vote.term) return false; + return lastAcceptedTerm == vote.lastAcceptedTerm; + } + + @Override + public int hashCode() { + int result = (int) (firstUncommittedSlot ^ (firstUncommittedSlot >>> 32)); + result = 31 * result + (int) (term ^ (term >>> 32)); + result = 31 * result + (int) (lastAcceptedTerm ^ (lastAcceptedTerm >>> 32)); + return result; + } + + @Override + public String toString() { + return "Vote{" + + "firstUncommittedSlot=" + firstUncommittedSlot + + ", term=" + term + + ", lastAcceptedTerm=" + lastAcceptedTerm + + '}'; + } + } + + public abstract static class SlotTerm { + protected final long slot; + protected final long term; + + public SlotTerm(long slot, long term) { + assert slot >= 0; + assert term >= 0; + + this.slot = slot; + this.term = term; + } + + public long getSlot() { + return slot; + } + + public long getTerm() { + return term; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + SlotTerm slotTerm = (SlotTerm) o; + + if (slot != slotTerm.slot) return false; + return term == slotTerm.term; + } + + @Override + public int hashCode() { + int result = (int) (slot ^ (slot >>> 32)); + result = 31 * result + (int) (term ^ (term >>> 32)); + return result; + } + + @Override + public String toString() { + return "SlotTerm{" + + "slot=" + slot + + ", term=" + term + + '}'; + } + } + + public static class PublishResponse extends SlotTerm { + + public PublishResponse(long slot, long term) { + super(slot, term); + } + + @Override + public String toString() { + return "PublishResponse{" + + "slot=" + slot + + ", term=" + term + + '}'; + } + } + + public static class ApplyCommit extends SlotTerm { + + public ApplyCommit(long slot, long term) { + super(slot, term); + } + + @Override + public String toString() { + return "ApplyCommit{" + + "slot=" + slot + + ", term=" + term + + '}'; + } + } + + public abstract static class SlotTermDiff extends SlotTerm { + protected final Diff diff; + + public SlotTermDiff(long slot, long term, Diff diff) { + super(slot, term); + this.diff = diff; + } + + public Diff getDiff() { + return diff; + } + + @Override + public boolean equals(Object o) { + if (super.equals(o) == false) { + return false; + } + SlotTermDiff that = (SlotTermDiff) o; + return diff != null ? diff.equals(that.diff) : that.diff == null; + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + (diff != null ? diff.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "SlotTermDiff{" + + "slot=" + slot + + ", term=" + term + + ", diff=" + diff + + '}'; + } + } + + public static class PublishRequest extends SlotTermDiff { + + public PublishRequest(long slot, long term, Diff diff) { + super(slot, term, diff); + } + + @Override + public String toString() { + return "PublishRequest{" + + "slot=" + slot + + ", term=" + term + + ", diff=" + diff + + '}'; + } + } +} diff --git a/core/src/test/java/org/elasticsearch/discovery/zen2/ConsensusStateTests.java b/core/src/test/java/org/elasticsearch/discovery/zen2/ConsensusStateTests.java index 92351c0b58210..704d8963142e3 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen2/ConsensusStateTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen2/ConsensusStateTests.java @@ -23,12 +23,12 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.discovery.zen2.ConsensusState.ApplyCommit; +import org.elasticsearch.discovery.zen2.Messages.ApplyCommit; import org.elasticsearch.discovery.zen2.ConsensusState.CommittedState; import org.elasticsearch.discovery.zen2.ConsensusState.NodeCollection; -import org.elasticsearch.discovery.zen2.ConsensusState.PublishRequest; -import org.elasticsearch.discovery.zen2.ConsensusState.PublishResponse; -import org.elasticsearch.discovery.zen2.ConsensusState.Vote; +import org.elasticsearch.discovery.zen2.Messages.PublishRequest; +import org.elasticsearch.discovery.zen2.Messages.PublishResponse; +import org.elasticsearch.discovery.zen2.Messages.Vote; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.junit.annotations.TestLogging; @@ -38,6 +38,7 @@ import static org.hamcrest.Matchers.equalTo; + public class ConsensusStateTests extends ESTestCase { public ConsensusState createInitialState(ClusterState initialClusterState) { @@ -54,7 +55,7 @@ public void persistCommittedState(ClusterState committedState) { } @Override - public void persistAcceptedState(ConsensusState.SlotTermDiff slotTermDiff) { + public void persistAcceptedState(Messages.SlotTermDiff slotTermDiff) { } }); @@ -72,13 +73,13 @@ public void testSimpleScenario() { ConsensusState n2 = createInitialState(initialClusterState); ConsensusState n3 = createInitialState(initialClusterState); - assertThat(n1.currentTerm, equalTo(0L)); + assertThat(n1.getCurrentTerm(), equalTo(0L)); Vote v1 = n1.handleStartVote(1); - assertThat(n1.currentTerm, equalTo(1L)); + assertThat(n1.getCurrentTerm(), equalTo(1L)); - assertThat(n2.currentTerm, equalTo(0L)); + assertThat(n2.getCurrentTerm(), equalTo(0L)); Vote v2 = n2.handleStartVote(1); - assertThat(n2.currentTerm, equalTo(1L)); + assertThat(n2.getCurrentTerm(), equalTo(1L)); Optional> invalidVote = n1.handleVote(node2, v2); assertFalse(invalidVote.isPresent()); @@ -107,15 +108,15 @@ public void testSimpleScenario() { assertThat(n2.firstUncommittedSlot(), equalTo(0L)); assertThat(n3.firstUncommittedSlot(), equalTo(0L)); - assertThat(n3.committedState.value, equalTo(42)); + assertThat(n3.getCommittedState().value, equalTo(42)); n3.handleCommit(n1Commit.get()); assertThat(n3.firstUncommittedSlot(), equalTo(1L)); - assertThat(n3.committedState.value, equalTo(5)); + assertThat(n3.getCommittedState().value, equalTo(5)); ClusterState n3ClusterState = n3.generateCatchup(); n2.applyCatchup(n3ClusterState); assertThat(n2.firstUncommittedSlot(), equalTo(1L)); - assertThat(n2.committedState.value, equalTo(5)); + assertThat(n2.getCommittedState().value, equalTo(5)); } static class ClusterState implements CommittedState { @@ -139,9 +140,18 @@ public long getSlot() { public NodeCollection getVotingNodes() { return config; } + + public int getValue() { + return value; + } + + @Override + public String toString() { + return "ClusterState {slot=" + slot + ", value=" + value + ", config=" + config + "}"; + } } - public Diff createUpdate(Function update) { + public static Diff createUpdate(Function update) { return new Diff() { @Override