Skip to content

Commit a0d32f5

Browse files
authored
Zen2: Add leader-side join handling logic (#33013)
Adds the logic for handling joins by a prospective leader. Introduces the Coordinator class with the basic lifecycle modes (candidate, leader, follower) as well as a JoinHelper class that contains most of the plumbing for handling joins.
1 parent e4ef127 commit a0d32f5

File tree

8 files changed

+1035
-3
lines changed

8 files changed

+1035
-3
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ public boolean handleJoin(Join join) {
227227
boolean added = joinVotes.addVote(join.getSourceNode());
228228
boolean prevElectionWon = electionWon;
229229
electionWon = isElectionQuorum(joinVotes);
230+
assert !prevElectionWon || electionWon; // we cannot go from won to not won
230231
logger.debug("handleJoin: added join {} from [{}] for election, electionWon={} lastAcceptedTerm={} lastAcceptedVersion={}", join,
231232
join.getSourceNode(), electionWon, lastAcceptedTerm, getLastAcceptedVersion());
232233

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.cluster.coordination;
20+
21+
import org.apache.lucene.util.SetOnce;
22+
import org.elasticsearch.cluster.node.DiscoveryNode;
23+
import org.elasticsearch.cluster.routing.allocation.AllocationService;
24+
import org.elasticsearch.cluster.service.MasterService;
25+
import org.elasticsearch.common.component.AbstractLifecycleComponent;
26+
import org.elasticsearch.common.settings.Settings;
27+
import org.elasticsearch.transport.TransportService;
28+
29+
import java.util.Optional;
30+
import java.util.function.Supplier;
31+
32+
public class Coordinator extends AbstractLifecycleComponent {
33+
34+
private final TransportService transportService;
35+
private final JoinHelper joinHelper;
36+
private final Supplier<CoordinationState.PersistedState> persistedStateSupplier;
37+
// TODO: the following two fields are package-private as some tests require access to them
38+
// These tests can be rewritten to use public methods once Coordinator is more feature-complete
39+
final Object mutex = new Object();
40+
final SetOnce<CoordinationState> coordinationState = new SetOnce<>(); // initialized on start-up (see doStart)
41+
42+
private Mode mode;
43+
private Optional<DiscoveryNode> lastKnownLeader;
44+
private Optional<Join> lastJoin;
45+
private JoinHelper.JoinAccumulator joinAccumulator;
46+
47+
public Coordinator(Settings settings, TransportService transportService, AllocationService allocationService,
48+
MasterService masterService, Supplier<CoordinationState.PersistedState> persistedStateSupplier) {
49+
super(settings);
50+
this.transportService = transportService;
51+
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
52+
this::getCurrentTerm, this::handleJoinRequest);
53+
this.persistedStateSupplier = persistedStateSupplier;
54+
this.lastKnownLeader = Optional.empty();
55+
this.lastJoin = Optional.empty();
56+
this.joinAccumulator = joinHelper.new CandidateJoinAccumulator();
57+
}
58+
59+
private Optional<Join> ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) {
60+
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
61+
if (getCurrentTerm() < targetTerm) {
62+
return Optional.of(joinLeaderInTerm(new StartJoinRequest(sourceNode, targetTerm)));
63+
}
64+
return Optional.empty();
65+
}
66+
67+
private Join joinLeaderInTerm(StartJoinRequest startJoinRequest) {
68+
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
69+
logger.debug("joinLeaderInTerm: from [{}] with term {}", startJoinRequest.getSourceNode(), startJoinRequest.getTerm());
70+
Join join = coordinationState.get().handleStartJoin(startJoinRequest);
71+
lastJoin = Optional.of(join);
72+
if (mode != Mode.CANDIDATE) {
73+
becomeCandidate("joinLeaderInTerm");
74+
}
75+
return join;
76+
}
77+
78+
private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
79+
assert Thread.holdsLock(mutex) == false;
80+
logger.trace("handleJoin: as {}, handling {}", mode, joinRequest);
81+
transportService.connectToNode(joinRequest.getSourceNode());
82+
83+
final Optional<Join> optionalJoin = joinRequest.getOptionalJoin();
84+
synchronized (mutex) {
85+
final CoordinationState coordState = coordinationState.get();
86+
final boolean prevElectionWon = coordState.electionWon();
87+
88+
if (optionalJoin.isPresent()) {
89+
Join join = optionalJoin.get();
90+
// if someone thinks we should be master, let's add our vote and try to become one
91+
// note that the following line should never throw an exception
92+
ensureTermAtLeast(getLocalNode(), join.getTerm()).ifPresent(coordState::handleJoin);
93+
94+
if (coordState.electionWon()) {
95+
// if we have already won the election then the actual join does not matter for election purposes,
96+
// so swallow any exception
97+
try {
98+
coordState.handleJoin(join);
99+
} catch (CoordinationStateRejectedException e) {
100+
logger.trace("failed to add join, ignoring", e);
101+
}
102+
} else {
103+
coordState.handleJoin(join); // this might fail and bubble up the exception
104+
}
105+
}
106+
107+
joinAccumulator.handleJoinRequest(joinRequest.getSourceNode(), joinCallback);
108+
109+
if (prevElectionWon == false && coordState.electionWon()) {
110+
becomeLeader("handleJoin");
111+
}
112+
}
113+
}
114+
115+
void becomeCandidate(String method) {
116+
assert Thread.holdsLock(mutex) : "Legislator mutex not held";
117+
logger.debug("{}: becoming CANDIDATE (was {}, lastKnownLeader was [{}])", method, mode, lastKnownLeader);
118+
119+
if (mode != Mode.CANDIDATE) {
120+
mode = Mode.CANDIDATE;
121+
joinAccumulator.close(mode);
122+
joinAccumulator = joinHelper.new CandidateJoinAccumulator();
123+
}
124+
}
125+
126+
void becomeLeader(String method) {
127+
assert Thread.holdsLock(mutex) : "Legislator mutex not held";
128+
assert mode == Mode.CANDIDATE : "expected candidate but was " + mode;
129+
logger.debug("{}: becoming LEADER (was {}, lastKnownLeader was [{}])", method, mode, lastKnownLeader);
130+
131+
mode = Mode.LEADER;
132+
lastKnownLeader = Optional.of(getLocalNode());
133+
joinAccumulator.close(mode);
134+
joinAccumulator = joinHelper.new LeaderJoinAccumulator();
135+
}
136+
137+
void becomeFollower(String method, DiscoveryNode leaderNode) {
138+
assert Thread.holdsLock(mutex) : "Legislator mutex not held";
139+
logger.debug("{}: becoming FOLLOWER of [{}] (was {}, lastKnownLeader was [{}])", method, leaderNode, mode, lastKnownLeader);
140+
141+
if (mode != Mode.FOLLOWER) {
142+
mode = Mode.FOLLOWER;
143+
joinAccumulator.close(mode);
144+
joinAccumulator = new JoinHelper.FollowerJoinAccumulator();
145+
}
146+
147+
lastKnownLeader = Optional.of(leaderNode);
148+
}
149+
150+
// package-visible for testing
151+
long getCurrentTerm() {
152+
synchronized (mutex) {
153+
return coordinationState.get().getCurrentTerm();
154+
}
155+
}
156+
157+
// package-visible for testing
158+
Mode getMode() {
159+
synchronized (mutex) {
160+
return mode;
161+
}
162+
}
163+
164+
// package-visible for testing
165+
DiscoveryNode getLocalNode() {
166+
return transportService.getLocalNode();
167+
}
168+
169+
@Override
170+
protected void doStart() {
171+
CoordinationState.PersistedState persistedState = persistedStateSupplier.get();
172+
coordinationState.set(new CoordinationState(settings, getLocalNode(), persistedState));
173+
}
174+
175+
public void startInitialJoin() {
176+
synchronized (mutex) {
177+
becomeCandidate("startInitialJoin");
178+
}
179+
}
180+
181+
@Override
182+
protected void doStop() {
183+
184+
}
185+
186+
@Override
187+
protected void doClose() {
188+
189+
}
190+
191+
public void invariant() {
192+
synchronized (mutex) {
193+
if (mode == Mode.LEADER) {
194+
assert coordinationState.get().electionWon();
195+
assert lastKnownLeader.isPresent() && lastKnownLeader.get().equals(getLocalNode());
196+
assert joinAccumulator instanceof JoinHelper.LeaderJoinAccumulator;
197+
} else if (mode == Mode.FOLLOWER) {
198+
assert coordinationState.get().electionWon() == false : getLocalNode() + " is FOLLOWER so electionWon() should be false";
199+
assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false);
200+
assert joinAccumulator instanceof JoinHelper.FollowerJoinAccumulator;
201+
} else {
202+
assert mode == Mode.CANDIDATE;
203+
assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator;
204+
}
205+
}
206+
}
207+
208+
public enum Mode {
209+
CANDIDATE, LEADER, FOLLOWER
210+
}
211+
}

0 commit comments

Comments
 (0)