Skip to content

Commit 4cfdb06

Browse files
authored
Adapt InternalCluster#fullRestart to call onNodeStopped when all nodes are stopped (#35494)
Refactors and simplifies the logic around stopping nodes, making sure that for a full cluster restart onNodeStopped is only called after the nodes are actually all stopped (and in particular not while starting up some nodes again). This change also ensures that a closed node client is not being used anymore (which required a small change to a test). Relates to #35049
1 parent 212c202 commit 4cfdb06

File tree

2 files changed

+41
-45
lines changed

2 files changed

+41
-45
lines changed

test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

Lines changed: 25 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -908,28 +908,12 @@ void startNode() {
908908
}
909909
}
910910

911-
void closeNode() throws IOException {
912-
markNodeDataDirsAsPendingForWipe(node);
913-
node.close();
914-
}
915-
916-
/**
917-
* closes the current node if not already closed, builds a new node object using the current node settings and starts it
918-
*/
919-
void restart(RestartCallback callback, boolean clearDataIfNeeded, int minMasterNodes) throws Exception {
920-
if (!node.isClosed()) {
921-
closeNode();
922-
}
923-
recreateNodeOnRestart(callback, clearDataIfNeeded, minMasterNodes, () -> rebuildUnicastHostFiles(emptyList()));
924-
startNode();
925-
}
926-
927911
/**
928-
* rebuilds a new node object using the current node settings and starts it
912+
* closes the node and prepares it to be restarted
929913
*/
930-
void recreateNodeOnRestart(RestartCallback callback, boolean clearDataIfNeeded, int minMasterNodes,
931-
Runnable onTransportServiceStarted) throws Exception {
914+
Settings closeForRestart(RestartCallback callback, int minMasterNodes) throws Exception {
932915
assert callback != null;
916+
close();
933917
Settings callbackSettings = callback.onNodeStopped(name);
934918
Settings.Builder newSettings = Settings.builder();
935919
if (callbackSettings != null) {
@@ -939,12 +923,9 @@ void recreateNodeOnRestart(RestartCallback callback, boolean clearDataIfNeeded,
939923
assert DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.exists(newSettings.build()) == false : "min master nodes is auto managed";
940924
newSettings.put(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minMasterNodes).build();
941925
}
942-
if (clearDataIfNeeded) {
943-
clearDataIfNeeded(callback);
944-
}
945-
createNewNode(newSettings.build(), onTransportServiceStarted);
946-
// make sure cached client points to new node
947-
resetClient();
926+
// delete data folders now, before we start other nodes that may claim it
927+
clearDataIfNeeded(callback);
928+
return newSettings.build();
948929
}
949930

950931
private void clearDataIfNeeded(RestartCallback callback) throws IOException {
@@ -958,7 +939,10 @@ private void clearDataIfNeeded(RestartCallback callback) throws IOException {
958939
}
959940
}
960941

961-
private void createNewNode(final Settings newSettings, final Runnable onTransportServiceStarted) {
942+
private void recreateNode(final Settings newSettings, final Runnable onTransportServiceStarted) {
943+
if (closed.get() == false) {
944+
throw new IllegalStateException("node " + name + " should be closed before recreating it");
945+
}
962946
// use a new seed to make sure we have new node id
963947
final long newIdSeed = NodeEnvironment.NODE_ID_SEED_SETTING.get(node.settings()) + 1;
964948
Settings finalSettings = Settings.builder()
@@ -978,6 +962,7 @@ public void afterStart() {
978962
onTransportServiceStarted.run();
979963
}
980964
});
965+
closed.set(false);
981966
markNodeDataDirsAsNotEligableForWipe(node);
982967
}
983968

@@ -987,7 +972,8 @@ public void close() throws IOException {
987972
resetClient();
988973
} finally {
989974
closed.set(true);
990-
closeNode();
975+
markNodeDataDirsAsPendingForWipe(node);
976+
node.close();
991977
}
992978
}
993979
}
@@ -1700,7 +1686,10 @@ private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback)
17001686
if (updateMinMaster) {
17011687
updateMinMasterNodes(masterNodesCount - 1);
17021688
}
1703-
nodeAndClient.restart(callback, true, autoManageMinMasterNodes ? getMinMasterNodes(masterNodesCount) : -1);
1689+
final Settings newSettings = nodeAndClient.closeForRestart(callback,
1690+
autoManageMinMasterNodes ? getMinMasterNodes(masterNodesCount) : -1);
1691+
nodeAndClient.recreateNode(newSettings, () -> rebuildUnicastHostFiles(emptyList()));
1692+
nodeAndClient.startNode();
17041693
if (activeDisruptionScheme != null) {
17051694
activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
17061695
}
@@ -1721,19 +1710,20 @@ private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback)
17211710
*/
17221711
public synchronized void fullRestart(RestartCallback callback) throws Exception {
17231712
int numNodesRestarted = 0;
1713+
final Settings[] newNodeSettings = new Settings[nextNodeId.get()];
17241714
Map<Set<Role>, List<NodeAndClient>> nodesByRoles = new HashMap<>();
17251715
Set[] rolesOrderedByOriginalStartupOrder = new Set[nextNodeId.get()];
1716+
final int minMasterNodes = autoManageMinMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1;
17261717
for (NodeAndClient nodeAndClient : nodes.values()) {
17271718
callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
1728-
logger.info("Stopping node [{}] ", nodeAndClient.name);
1719+
logger.info("Stopping and resetting node [{}] ", nodeAndClient.name);
17291720
if (activeDisruptionScheme != null) {
17301721
activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
17311722
}
1732-
nodeAndClient.closeNode();
1733-
// delete data folders now, before we start other nodes that may claim it
1734-
nodeAndClient.clearDataIfNeeded(callback);
17351723
DiscoveryNode discoveryNode = getInstanceFromNode(ClusterService.class, nodeAndClient.node()).localNode();
1736-
rolesOrderedByOriginalStartupOrder[nodeAndClient.nodeAndClientId] = discoveryNode.getRoles();
1724+
final Settings newSettings = nodeAndClient.closeForRestart(callback, minMasterNodes);
1725+
newNodeSettings[nodeAndClient.nodeAndClientId()] = newSettings;
1726+
rolesOrderedByOriginalStartupOrder[nodeAndClient.nodeAndClientId()] = discoveryNode.getRoles();
17371727
nodesByRoles.computeIfAbsent(discoveryNode.getRoles(), k -> new ArrayList<>()).add(nodeAndClient);
17381728
}
17391729

@@ -1758,10 +1748,8 @@ public synchronized void fullRestart(RestartCallback callback) throws Exception
17581748
assert nodesByRoles.values().stream().collect(Collectors.summingInt(List::size)) == 0;
17591749

17601750
for (NodeAndClient nodeAndClient : startUpOrder) {
1761-
logger.info("resetting node [{}] ", nodeAndClient.name);
1762-
// we already cleared data folders, before starting nodes up
1763-
nodeAndClient.recreateNodeOnRestart(callback, false, autoManageMinMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1,
1764-
() -> rebuildUnicastHostFiles(startUpOrder));
1751+
logger.info("creating node [{}] ", nodeAndClient.name);
1752+
nodeAndClient.recreateNode(newNodeSettings[nodeAndClient.nodeAndClientId()], () -> rebuildUnicastHostFiles(startUpOrder));
17651753
}
17661754

17671755
startAndPublishNodesAndClients(startUpOrder);

test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -148,17 +148,21 @@ private void assertMMNinNodeSetting(String node, InternalTestCluster cluster, in
148148
}
149149

150150
private void assertMMNinClusterSetting(InternalTestCluster cluster, int masterNodes) {
151-
final int minMasterNodes = masterNodes / 2 + 1;
152151
for (final String node : cluster.getNodeNames()) {
153-
Settings stateSettings = cluster.client(node).admin().cluster().prepareState().setLocal(true)
154-
.get().getState().getMetaData().settings();
155-
156-
assertEquals("dynamic setting for node [" + node + "] has the wrong min_master_node setting : ["
157-
+ stateSettings.get(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()) + "]",
158-
DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(stateSettings).intValue(), minMasterNodes);
152+
assertMMNinClusterSetting(node, cluster, masterNodes);
159153
}
160154
}
161155

156+
private void assertMMNinClusterSetting(String node, InternalTestCluster cluster, int masterNodes) {
157+
final int minMasterNodes = masterNodes / 2 + 1;
158+
Settings stateSettings = cluster.client(node).admin().cluster().prepareState().setLocal(true)
159+
.get().getState().getMetaData().settings();
160+
161+
assertEquals("dynamic setting for node [" + node + "] has the wrong min_master_node setting : ["
162+
+ stateSettings.get(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey()) + "]",
163+
DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(stateSettings).intValue(), minMasterNodes);
164+
}
165+
162166
public void testBeforeTest() throws Exception {
163167
final boolean autoManageMinMasterNodes = randomBoolean();
164168
long clusterSeed = randomLong();
@@ -489,7 +493,11 @@ public Settings transportClientSettings() {
489493
cluster.rollingRestart(new InternalTestCluster.RestartCallback() {
490494
@Override
491495
public Settings onNodeStopped(String nodeName) throws Exception {
492-
assertMMNinClusterSetting(cluster, 1);
496+
for (String name : cluster.getNodeNames()) {
497+
if (name.equals(nodeName) == false) {
498+
assertMMNinClusterSetting(name, cluster, 1);
499+
}
500+
}
493501
return super.onNodeStopped(nodeName);
494502
}
495503
});

0 commit comments

Comments
 (0)