Skip to content

Commit da709a2

Browse files
committed
YARN-2641. Decommission nodes on -refreshNodes instead of next NM-RM heartbeat. (Zhihai Xu via kasha)
1 parent 178bc50 commit da709a2

File tree

4 files changed: 35 additions (+35) and 22 deletions (-22).

hadoop-yarn-project/CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -47,6 +47,9 @@ Release 2.7.0 - UNRELEASED
4747
YARN-2635. TestRM, TestRMRestart, TestClientToAMTokens should run
4848
with both CS and FS. (Wei Yan and kasha via kasha)
4949

50+
YARN-2641. Decommission nodes on -refreshNodes instead of next
51+
NM-RM heartbeat. (Zhihai Xu via kasha)
52+
5053
OPTIMIZATIONS
5154

5255
BUG FIXES

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.hadoop.net.NetUtils;
3131
import org.apache.hadoop.service.AbstractService;
3232
import org.apache.hadoop.util.HostsFileReader;
33+
import org.apache.hadoop.yarn.api.records.NodeId;
3334
import org.apache.hadoop.yarn.conf.YarnConfiguration;
3435
import org.apache.hadoop.yarn.event.EventHandler;
3536
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -38,6 +39,8 @@
3839
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent;
3940
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
4041
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
42+
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
43+
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
4144

4245
import com.google.common.annotations.VisibleForTesting;
4346

@@ -123,6 +126,13 @@ public void refreshNodes(Configuration yarnConf) throws IOException,
123126
.getConfigurationInputStream(this.conf, excludesFile));
124127
printConfiguredHosts();
125128
}
129+
130+
for (NodeId nodeId: rmContext.getRMNodes().keySet()) {
131+
if (!isValidNode(nodeId.getHost())) {
132+
this.rmContext.getDispatcher().getEventHandler().handle(
133+
new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
134+
}
135+
}
126136
}
127137

128138
private void setDecomissionedNMsMetrics() {

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

Lines changed: 14 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -349,15 +349,25 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
349349
NodeStatus remoteNodeStatus = request.getNodeStatus();
350350
/**
351351
* Here is the node heartbeat sequence...
352-
* 1. Check if it's a registered node
353-
* 2. Check if it's a valid (i.e. not excluded) node
354-
* 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
352+
* 1. Check if it's a valid (i.e. not excluded) node
353+
* 2. Check if it's a registered node
354+
* 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
355355
* 4. Send healthStatus to RMNode
356356
*/
357357

358358
NodeId nodeId = remoteNodeStatus.getNodeId();
359359

360-
// 1. Check if it's a registered node
360+
// 1. Check if it's a valid (i.e. not excluded) node
361+
if (!this.nodesListManager.isValidNode(nodeId.getHost())) {
362+
String message =
363+
"Disallowed NodeManager nodeId: " + nodeId + " hostname: "
364+
+ nodeId.getHost();
365+
LOG.info(message);
366+
shutDown.setDiagnosticsMessage(message);
367+
return shutDown;
368+
}
369+
370+
// 2. Check if it's a registered node
361371
RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
362372
if (rmNode == null) {
363373
/* node does not exist */
@@ -370,18 +380,6 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
370380
// Send ping
371381
this.nmLivelinessMonitor.receivedPing(nodeId);
372382

373-
// 2. Check if it's a valid (i.e. not excluded) node
374-
if (!this.nodesListManager.isValidNode(rmNode.getHostName())) {
375-
String message =
376-
"Disallowed NodeManager nodeId: " + nodeId + " hostname: "
377-
+ rmNode.getNodeAddress();
378-
LOG.info(message);
379-
shutDown.setDiagnosticsMessage(message);
380-
this.rmContext.getDispatcher().getEventHandler().handle(
381-
new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
382-
return shutDown;
383-
}
384-
385383
// 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
386384
NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse();
387385
if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -130,17 +130,17 @@ public void testDecommissionWithIncludeHosts() throws Exception {
130130

131131
rm.getNodesListManager().refreshNodes(conf);
132132

133+
checkDecommissionedNMCount(rm, ++metricCount);
134+
133135
nodeHeartbeat = nm1.nodeHeartbeat(true);
134136
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
135137
Assert
136-
.assertEquals(0, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
138+
.assertEquals(1, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
137139

138140
nodeHeartbeat = nm2.nodeHeartbeat(true);
139141
Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN
140142
.equals(nodeHeartbeat.getNodeAction()));
141143

142-
checkDecommissionedNMCount(rm, ++metricCount);
143-
144144
nodeHeartbeat = nm3.nodeHeartbeat(true);
145145
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
146146
Assert.assertEquals(metricCount, ClusterMetrics.getMetrics()
@@ -185,6 +185,8 @@ protected Dispatcher createDispatcher() {
185185

186186
rm.getNodesListManager().refreshNodes(conf);
187187

188+
checkDecommissionedNMCount(rm, metricCount + 2);
189+
188190
nodeHeartbeat = nm1.nodeHeartbeat(true);
189191
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
190192
nodeHeartbeat = nm2.nodeHeartbeat(true);
@@ -195,7 +197,7 @@ protected Dispatcher createDispatcher() {
195197
Assert.assertTrue("The decommisioned metrics are not updated",
196198
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
197199
dispatcher.await();
198-
checkDecommissionedNMCount(rm, metricCount + 2);
200+
199201
writeToHostsFile("");
200202
rm.getNodesListManager().refreshNodes(conf);
201203

@@ -234,6 +236,7 @@ public void testAddNewIncludePathToConfiguration() throws Exception {
234236
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
235237
.getAbsolutePath());
236238
rm.getNodesListManager().refreshNodes(conf);
239+
checkDecommissionedNMCount(rm, ++initialMetricCount);
237240
nodeHeartbeat = nm1.nodeHeartbeat(true);
238241
Assert.assertEquals(
239242
"Node should not have been decomissioned.",
@@ -243,7 +246,6 @@ public void testAddNewIncludePathToConfiguration() throws Exception {
243246
Assert.assertEquals("Node should have been decomissioned but is in state" +
244247
nodeHeartbeat.getNodeAction(),
245248
NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction());
246-
checkDecommissionedNMCount(rm, ++initialMetricCount);
247249
}
248250

249251
/**
@@ -271,6 +273,7 @@ public void testAddNewExcludePathToConfiguration() throws Exception {
271273
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile
272274
.getAbsolutePath());
273275
rm.getNodesListManager().refreshNodes(conf);
276+
checkDecommissionedNMCount(rm, ++initialMetricCount);
274277
nodeHeartbeat = nm1.nodeHeartbeat(true);
275278
Assert.assertEquals(
276279
"Node should not have been decomissioned.",
@@ -280,7 +283,6 @@ public void testAddNewExcludePathToConfiguration() throws Exception {
280283
Assert.assertEquals("Node should have been decomissioned but is in state" +
281284
nodeHeartbeat.getNodeAction(),
282285
NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction());
283-
checkDecommissionedNMCount(rm, ++initialMetricCount);
284286
}
285287

286288
@Test

0 commit comments

Comments
 (0)