Skip to content

Commit 86ef041

Browse files
authored
[Zen2] Introduce ClusterBootstrapService (#35488)
Today, the bootstrapping of a Zen2 cluster is driven externally, requiring something else to wait for discovery to converge and then to inject the initial configuration. This is hard to use in some situations, such as REST tests. This change introduces the `ClusterBootstrapService` which brings the bootstrap retry logic within each node and allows it to be controlled via an (unsafe) node setting.
1 parent 135c3f0 commit 86ef041

File tree

12 files changed

+466
-146
lines changed

12 files changed

+466
-146
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequest.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.elasticsearch.action.ActionRequest;
2222
import org.elasticsearch.action.ActionRequestValidationException;
23+
import org.elasticsearch.common.Nullable;
2324
import org.elasticsearch.common.io.stream.StreamInput;
2425
import org.elasticsearch.common.io.stream.StreamOutput;
2526
import org.elasticsearch.common.unit.TimeValue;
@@ -33,6 +34,8 @@
3334
public class GetDiscoveredNodesRequest extends ActionRequest {
3435

3536
private int waitForNodes = 1;
37+
38+
@Nullable // if the request should wait indefinitely
3639
private TimeValue timeout = TimeValue.timeValueSeconds(30);
3740

3841
public GetDiscoveredNodesRequest() {
@@ -41,7 +44,7 @@ public GetDiscoveredNodesRequest() {
4144
public GetDiscoveredNodesRequest(StreamInput in) throws IOException {
4245
super(in);
4346
waitForNodes = in.readInt();
44-
timeout = in.readTimeValue();
47+
timeout = in.readOptionalTimeValue();
4548
}
4649

4750
/**
@@ -74,8 +77,8 @@ public int getWaitForNodes() {
7477
*
7578
* @param timeout how long to wait to discover sufficiently many nodes to respond successfully.
7679
*/
77-
public void setTimeout(TimeValue timeout) {
78-
if (timeout.compareTo(TimeValue.ZERO) < 0) {
80+
public void setTimeout(@Nullable TimeValue timeout) {
81+
if (timeout != null && timeout.compareTo(TimeValue.ZERO) < 0) {
7982
throw new IllegalArgumentException("negative timeout of [" + timeout + "] is not allowed");
8083
}
8184
this.timeout = timeout;
@@ -87,6 +90,7 @@ public void setTimeout(TimeValue timeout) {
8790
*
8891
* @return how long to wait to discover sufficiently many nodes to respond successfully.
8992
*/
93+
@Nullable
9094
public TimeValue getTimeout() {
9195
return timeout;
9296
}
@@ -105,7 +109,7 @@ public void readFrom(StreamInput in) throws IOException {
105109
public void writeTo(StreamOutput out) throws IOException {
106110
super.writeTo(out);
107111
out.writeInt(waitForNodes);
108-
out.writeTimeValue(timeout);
112+
out.writeOptionalTimeValue(timeout);
109113
}
110114

111115
@Override

server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesAction.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -108,18 +108,20 @@ public String toString() {
108108
listenableFuture.addListener(ActionListener.wrap(releasable::close), directExecutor, threadPool.getThreadContext());
109109
respondIfRequestSatisfied.accept(coordinator.getFoundPeers());
110110

111-
threadPool.schedule(request.getTimeout(), Names.SAME, new Runnable() {
112-
@Override
113-
public void run() {
114-
if (listenerNotified.compareAndSet(false, true)) {
115-
listenableFuture.onFailure(new ElasticsearchTimeoutException("timed out while waiting for " + request));
111+
if (request.getTimeout() != null) {
112+
threadPool.schedule(request.getTimeout(), Names.SAME, new Runnable() {
113+
@Override
114+
public void run() {
115+
if (listenerNotified.compareAndSet(false, true)) {
116+
listenableFuture.onFailure(new ElasticsearchTimeoutException("timed out while waiting for " + request));
117+
}
116118
}
117-
}
118119

119-
@Override
120-
public String toString() {
121-
return "timeout handler for " + request;
122-
}
123-
});
120+
@Override
121+
public String toString() {
122+
return "timeout handler for " + request;
123+
}
124+
});
125+
}
124126
}
125127
}
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.cluster.coordination;
20+
21+
import org.apache.logging.log4j.LogManager;
22+
import org.apache.logging.log4j.Logger;
23+
import org.apache.logging.log4j.message.ParameterizedMessage;
24+
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterAction;
25+
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterRequest;
26+
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterResponse;
27+
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration;
28+
import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesAction;
29+
import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesRequest;
30+
import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesResponse;
31+
import org.elasticsearch.cluster.node.DiscoveryNode;
32+
import org.elasticsearch.common.io.stream.StreamInput;
33+
import org.elasticsearch.common.settings.Setting;
34+
import org.elasticsearch.common.settings.Setting.Property;
35+
import org.elasticsearch.common.settings.Settings;
36+
import org.elasticsearch.common.unit.TimeValue;
37+
import org.elasticsearch.common.util.concurrent.ThreadContext;
38+
import org.elasticsearch.threadpool.ThreadPool.Names;
39+
import org.elasticsearch.transport.TransportException;
40+
import org.elasticsearch.transport.TransportResponseHandler;
41+
import org.elasticsearch.transport.TransportService;
42+
43+
import java.io.IOException;
44+
45+
public class ClusterBootstrapService {
46+
47+
private static final Logger logger = LogManager.getLogger(ClusterBootstrapService.class);
48+
49+
// The number of master-eligible nodes which, if discovered, can be used to bootstrap the cluster. This setting is unsafe in the event
50+
// that more master nodes are started than expected.
51+
public static final Setting<Integer> INITIAL_MASTER_NODE_COUNT_SETTING =
52+
Setting.intSetting("cluster.unsafe_initial_master_node_count", 0, 0, Property.NodeScope);
53+
54+
private final int initialMasterNodeCount;
55+
private final TransportService transportService;
56+
private volatile boolean running;
57+
58+
public ClusterBootstrapService(Settings settings, TransportService transportService) {
59+
initialMasterNodeCount = INITIAL_MASTER_NODE_COUNT_SETTING.get(settings);
60+
this.transportService = transportService;
61+
}
62+
63+
public void start() {
64+
assert running == false;
65+
running = true;
66+
67+
if (initialMasterNodeCount > 0 && transportService.getLocalNode().isMasterNode()) {
68+
logger.debug("unsafely waiting for discovery of [{}] master-eligible nodes", initialMasterNodeCount);
69+
70+
final ThreadContext threadContext = transportService.getThreadPool().getThreadContext();
71+
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
72+
threadContext.markAsSystemContext();
73+
74+
final GetDiscoveredNodesRequest request = new GetDiscoveredNodesRequest();
75+
request.setWaitForNodes(initialMasterNodeCount);
76+
request.setTimeout(null);
77+
logger.trace("sending {}", request);
78+
transportService.sendRequest(transportService.getLocalNode(), GetDiscoveredNodesAction.NAME, request,
79+
new TransportResponseHandler<GetDiscoveredNodesResponse>() {
80+
@Override
81+
public void handleResponse(GetDiscoveredNodesResponse response) {
82+
assert response.getNodes().size() >= initialMasterNodeCount;
83+
assert response.getNodes().stream().allMatch(DiscoveryNode::isMasterNode);
84+
logger.debug("discovered {}, starting to bootstrap", response.getNodes());
85+
awaitBootstrap(response.getBootstrapConfiguration());
86+
}
87+
88+
@Override
89+
public void handleException(TransportException exp) {
90+
logger.warn("discovery attempt failed", exp);
91+
}
92+
93+
@Override
94+
public String executor() {
95+
return Names.SAME;
96+
}
97+
98+
@Override
99+
public GetDiscoveredNodesResponse read(StreamInput in) throws IOException {
100+
return new GetDiscoveredNodesResponse(in);
101+
}
102+
});
103+
}
104+
}
105+
}
106+
107+
public void stop() {
108+
assert running == true;
109+
running = false;
110+
}
111+
112+
private void awaitBootstrap(final BootstrapConfiguration bootstrapConfiguration) {
113+
if (running == false) {
114+
logger.debug("awaitBootstrap: not running");
115+
return;
116+
}
117+
118+
BootstrapClusterRequest request = new BootstrapClusterRequest(bootstrapConfiguration);
119+
logger.trace("sending {}", request);
120+
transportService.sendRequest(transportService.getLocalNode(), BootstrapClusterAction.NAME, request,
121+
new TransportResponseHandler<BootstrapClusterResponse>() {
122+
@Override
123+
public void handleResponse(BootstrapClusterResponse response) {
124+
logger.debug("automatic cluster bootstrapping successful: received {}", response);
125+
}
126+
127+
@Override
128+
public void handleException(TransportException exp) {
129+
// log a warning since a failure here indicates a bad problem, such as:
130+
// - bootstrap configuration resolution failed (e.g. discovered nodes no longer match those in the bootstrap config)
131+
// - discovered nodes no longer form a quorum in the bootstrap config
132+
logger.warn(new ParameterizedMessage("automatic cluster bootstrapping failed, retrying [{}]",
133+
bootstrapConfiguration.getNodeDescriptions()), exp);
134+
135+
// There's not really much else we can do apart from retry and hope that the problem goes away. The retry is delayed
136+
// since a tight loop here is unlikely to help.
137+
transportService.getThreadPool().scheduleUnlessShuttingDown(TimeValue.timeValueSeconds(10), Names.SAME, new Runnable() {
138+
@Override
139+
public void run() {
140+
awaitBootstrap(bootstrapConfiguration);
141+
}
142+
143+
@Override
144+
public String toString() {
145+
return "retry bootstrapping with " + bootstrapConfiguration.getNodeDescriptions();
146+
}
147+
});
148+
}
149+
150+
@Override
151+
public String executor() {
152+
return Names.SAME;
153+
}
154+
155+
@Override
156+
public BootstrapClusterResponse read(StreamInput in) throws IOException {
157+
return new BootstrapClusterResponse(in);
158+
}
159+
});
160+
}
161+
}

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
114114
private Releasable prevotingRound;
115115
private long maxTermSeen;
116116
private final Reconfigurator reconfigurator;
117+
private final ClusterBootstrapService clusterBootstrapService;
117118

118119
private Mode mode;
119120
private Optional<DiscoveryNode> lastKnownLeader;
@@ -151,6 +152,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
151152
this.clusterApplier = clusterApplier;
152153
masterService.setClusterStateSupplier(this::getStateForMasterService);
153154
this.reconfigurator = new Reconfigurator(settings, clusterSettings);
155+
this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService);
154156
}
155157

156158
private Runnable getOnLeaderFailure() {
@@ -483,11 +485,14 @@ public void startInitialJoin() {
483485
synchronized (mutex) {
484486
becomeCandidate("startInitialJoin");
485487
}
488+
489+
clusterBootstrapService.start();
486490
}
487491

488492
@Override
489493
protected void doStop() {
490494
configuredHostsResolver.stop();
495+
clusterBootstrapService.stop();
491496
}
492497

493498
@Override

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.cluster.InternalClusterInfoService;
3333
import org.elasticsearch.cluster.NodeConnectionsService;
3434
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
35+
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
3536
import org.elasticsearch.cluster.coordination.Coordinator;
3637
import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory;
3738
import org.elasticsearch.cluster.coordination.JoinHelper;
@@ -459,7 +460,8 @@ public void apply(Settings value, Settings current, Settings previous) {
459460
Coordinator.PUBLISH_TIMEOUT_SETTING,
460461
JoinHelper.JOIN_TIMEOUT_SETTING,
461462
Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION,
462-
TransportAddVotingTombstonesAction.MAXIMUM_VOTING_TOMBSTONES_SETTING
463+
TransportAddVotingTombstonesAction.MAXIMUM_VOTING_TOMBSTONES_SETTING,
464+
ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING
463465
)));
464466

465467
public static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList(

server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequestTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import static org.hamcrest.Matchers.endsWith;
2727
import static org.hamcrest.Matchers.equalTo;
28+
import static org.hamcrest.Matchers.nullValue;
2829
import static org.hamcrest.Matchers.startsWith;
2930
import static org.hamcrest.core.Is.is;
3031

@@ -56,6 +57,9 @@ public void testTimeoutValidation() {
5657
() -> getDiscoveredNodesRequest.setTimeout(TimeValue.timeValueNanos(randomLongBetween(-10, -1))));
5758
assertThat(exception.getMessage(), startsWith("negative timeout of "));
5859
assertThat(exception.getMessage(), endsWith(" is not allowed"));
60+
61+
getDiscoveredNodesRequest.setTimeout(null);
62+
assertThat("value updated", getDiscoveredNodesRequest.getTimeout(), nullValue());
5963
}
6064

6165
public void testSerialization() throws IOException {
@@ -67,6 +71,8 @@ public void testSerialization() throws IOException {
6771

6872
if (randomBoolean()) {
6973
originalRequest.setTimeout(TimeValue.parseTimeValue(randomTimeValue(), "timeout"));
74+
} else if (randomBoolean()) {
75+
originalRequest.setTimeout(null);
7076
}
7177

7278
final GetDiscoveredNodesRequest deserialized = copyWriteable(originalRequest, writableRegistry(), GetDiscoveredNodesRequest::new);

server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -171,34 +171,53 @@ public void handleException(TransportException exp) {
171171
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
172172
}
173173

174-
public void testFailsQuicklyWithZeroTimeout() throws InterruptedException {
174+
public void testFailsQuicklyWithZeroTimeoutAndAcceptsNullTimeout() throws InterruptedException {
175175
new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action
176176
transportService.start();
177177
transportService.acceptIncomingRequests();
178178
coordinator.start();
179179
coordinator.startInitialJoin();
180180

181-
final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
182-
getDiscoveredNodesRequest.setWaitForNodes(2);
183-
getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO);
181+
{
182+
final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
183+
getDiscoveredNodesRequest.setWaitForNodes(2);
184+
getDiscoveredNodesRequest.setTimeout(null);
185+
transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() {
186+
@Override
187+
public void handleResponse(GetDiscoveredNodesResponse response) {
188+
throw new AssertionError("should not be called");
189+
}
184190

185-
final CountDownLatch countDownLatch = new CountDownLatch(1);
186-
transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() {
187-
@Override
188-
public void handleResponse(GetDiscoveredNodesResponse response) {
189-
throw new AssertionError("should not be called");
190-
}
191+
@Override
192+
public void handleException(TransportException exp) {
193+
throw new AssertionError("should not be called", exp);
194+
}
195+
});
196+
}
191197

192-
@Override
193-
public void handleException(TransportException exp) {
194-
final Throwable rootCause = exp.getRootCause();
195-
assertThat(rootCause, instanceOf(ElasticsearchTimeoutException.class));
196-
assertThat(rootCause.getMessage(), startsWith("timed out while waiting for GetDiscoveredNodesRequest{"));
197-
countDownLatch.countDown();
198-
}
199-
});
198+
{
199+
final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
200+
getDiscoveredNodesRequest.setWaitForNodes(2);
201+
getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO);
200202

201-
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
203+
final CountDownLatch countDownLatch = new CountDownLatch(1);
204+
transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() {
205+
@Override
206+
public void handleResponse(GetDiscoveredNodesResponse response) {
207+
throw new AssertionError("should not be called");
208+
}
209+
210+
@Override
211+
public void handleException(TransportException exp) {
212+
final Throwable rootCause = exp.getRootCause();
213+
assertThat(rootCause, instanceOf(ElasticsearchTimeoutException.class));
214+
assertThat(rootCause.getMessage(), startsWith("timed out while waiting for GetDiscoveredNodesRequest{"));
215+
countDownLatch.countDown();
216+
}
217+
});
218+
219+
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
220+
}
202221
}
203222

204223
public void testGetsDiscoveredNodes() throws InterruptedException {

0 commit comments

Comments
 (0)