Skip to content

Commit 28b2160

Browse files
committed
Use correct cluster state version for node fault detection (#30810)
Since its introduction in ES 1.4, node fault detection has been using the wrong cluster state version to send as part of the ping request, by using always the constant -1 (ClusterState.UNKNOWN_VERSION). This can, in an unfortunate series of events, lead to a situation where a previous stale master can regain its authority and revert the cluster to an older state. This commit makes NodesFaultDetection use the correct current cluster state for sending ping requests, avoiding the situation where a stale master possibly forces a newer master to step down and rejoin the stale one.
1 parent 5667451 commit 28b2160

File tree

3 files changed

+18
-9
lines changed

3 files changed

+18
-9
lines changed

server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import java.util.Set;
4545
import java.util.concurrent.ConcurrentMap;
4646
import java.util.concurrent.CopyOnWriteArrayList;
47+
import java.util.function.Supplier;
4748

4849
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
4950

@@ -66,13 +67,16 @@ public void onPingReceived(PingRequest pingRequest) {}
6667

6768
private final ConcurrentMap<DiscoveryNode, NodeFD> nodesFD = newConcurrentMap();
6869

69-
private volatile long clusterStateVersion = ClusterState.UNKNOWN_VERSION;
70+
private final Supplier<ClusterState> clusterStateSupplier;
7071

7172
private volatile DiscoveryNode localNode;
7273

73-
public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
74+
public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService,
75+
Supplier<ClusterState> clusterStateSupplier, ClusterName clusterName) {
7476
super(settings, threadPool, transportService, clusterName);
7577

78+
this.clusterStateSupplier = clusterStateSupplier;
79+
7680
logger.debug("[node ] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout,
7781
pingRetryCount);
7882

@@ -208,15 +212,18 @@ private boolean running() {
208212
return NodeFD.this.equals(nodesFD.get(node));
209213
}
210214

215+
private PingRequest newPingRequest() {
216+
return new PingRequest(node, clusterName, localNode, clusterStateSupplier.get().version());
217+
}
218+
211219
@Override
212220
public void run() {
213221
if (!running()) {
214222
return;
215223
}
216-
final PingRequest pingRequest = new PingRequest(node, clusterName, localNode, clusterStateVersion);
217224
final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING)
218225
.withTimeout(pingRetryTimeout).build();
219-
transportService.sendRequest(node, PING_ACTION_NAME, pingRequest, options, new TransportResponseHandler<PingResponse>() {
226+
transportService.sendRequest(node, PING_ACTION_NAME, newPingRequest(), options, new TransportResponseHandler<PingResponse>() {
220227
@Override
221228
public PingResponse newInstance() {
222229
return new PingResponse();
@@ -254,7 +261,7 @@ public void handleException(TransportException exp) {
254261
}
255262
} else {
256263
// resend the request, not reschedule, rely on send timeout
257-
transportService.sendRequest(node, PING_ACTION_NAME, pingRequest, options, this);
264+
transportService.sendRequest(node, PING_ACTION_NAME, newPingRequest(), options, this);
258265
}
259266
}
260267

server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t
205205

206206
this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this::clusterState, masterService, clusterName);
207207
this.masterFD.addListener(new MasterNodeFailureListener());
208-
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterName);
208+
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, this::clusterState, clusterName);
209209
this.nodesFD.addListener(new NodeFaultDetectionListener());
210210
this.pendingStatesQueue = new PendingClusterStatesQueue(logger, MAX_PENDING_CLUSTER_STATES_SETTING.get(settings));
211211

server/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,17 +175,19 @@ public void testNodesFaultDetectionConnectOnDisconnect() throws InterruptedExcep
175175
final Settings pingSettings = Settings.builder()
176176
.put(FaultDetection.CONNECT_ON_NETWORK_DISCONNECT_SETTING.getKey(), shouldRetry)
177177
.put(FaultDetection.PING_INTERVAL_SETTING.getKey(), "5m").build();
178-
ClusterState clusterState = ClusterState.builder(new ClusterName("test")).nodes(buildNodesForA(true)).build();
178+
ClusterState clusterState = ClusterState.builder(new ClusterName("test")).version(randomNonNegativeLong())
179+
.nodes(buildNodesForA(true)).build();
179180
NodesFaultDetection nodesFDA = new NodesFaultDetection(Settings.builder().put(settingsA).put(pingSettings).build(),
180-
threadPool, serviceA, clusterState.getClusterName());
181+
threadPool, serviceA, () -> clusterState, clusterState.getClusterName());
181182
nodesFDA.setLocalNode(nodeA);
182183
NodesFaultDetection nodesFDB = new NodesFaultDetection(Settings.builder().put(settingsB).put(pingSettings).build(),
183-
threadPool, serviceB, clusterState.getClusterName());
184+
threadPool, serviceB, () -> clusterState, clusterState.getClusterName());
184185
nodesFDB.setLocalNode(nodeB);
185186
final CountDownLatch pingSent = new CountDownLatch(1);
186187
nodesFDB.addListener(new NodesFaultDetection.Listener() {
187188
@Override
188189
public void onPingReceived(NodesFaultDetection.PingRequest pingRequest) {
190+
assertThat(pingRequest.clusterStateVersion(), equalTo(clusterState.version()));
189191
pingSent.countDown();
190192
}
191193
});

0 commit comments

Comments
 (0)