Skip to content

Commit 4060978

Browse files
afchung authored and HarshitGupta11 committed
YARN-11003. Make RMNode aware of all (OContainer inclusive) allocated resources (apache#3646)
1 parent 3e5002f commit 4060978

File tree

4 files changed, with 265 additions and 19 deletions.
  • hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler
  • hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src

4 files changed, with 265 additions and 19 deletions.

hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -99,6 +99,11 @@ public Resource getTotalCapability() {
9999
return node.getTotalCapability();
100100
}
101101

102+
@Override
103+
public Resource getAllocatedContainerResource() {
104+
return node.getAllocatedContainerResource();
105+
}
106+
102107
@Override
103108
public String getRackName() {
104109
return node.getRackName();

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -35,6 +35,7 @@
3535
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
3636
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
3737
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
38+
import org.apache.hadoop.yarn.util.resource.Resources;
3839

3940
/**
4041
* Node managers information on available resources
@@ -104,6 +105,17 @@ public interface RMNode {
104105
*/
105106
public Resource getTotalCapability();
106107

108+
/**
109+
* The total allocated resources to containers.
110+
* This will include the sum of Guaranteed and Opportunistic
111+
* containers queued + running + paused on the node.
112+
* @return the total allocated resources, including all Guaranteed and
113+
* Opportunistic containers in queued, running and paused states.
114+
*/
115+
default Resource getAllocatedContainerResource() {
116+
return Resources.none();
117+
}
118+
107119
/**
108120
* If the total available resources has been updated.
109121
* @return If the capability has been updated.

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

Lines changed: 28 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -128,6 +128,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
128128
/* Snapshot of total resources before receiving decommissioning command */
129129
private volatile Resource originalTotalCapability;
130130
private volatile Resource totalCapability;
131+
private volatile Resource allocatedContainerResource =
132+
Resource.newInstance(Resources.none());
131133
private volatile boolean updatedCapability = false;
132134
private final Node node;
133135

@@ -464,6 +466,11 @@ public Resource getTotalCapability() {
464466
return this.totalCapability;
465467
}
466468

469+
@Override
470+
public Resource getAllocatedContainerResource() {
471+
return this.allocatedContainerResource;
472+
}
473+
467474
@Override
468475
public boolean isUpdatedCapability() {
469476
return this.updatedCapability;
@@ -952,13 +959,22 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
952959
ClusterMetrics.getMetrics().decrDecommisionedNMs();
953960
}
954961
containers = startEvent.getNMContainerStatuses();
962+
final Resource allocatedResource = Resource.newInstance(
963+
Resources.none());
955964
if (containers != null && !containers.isEmpty()) {
956965
for (NMContainerStatus container : containers) {
957-
if (container.getContainerState() == ContainerState.RUNNING) {
958-
rmNode.launchedContainers.add(container.getContainerId());
966+
if (container.getContainerState() == ContainerState.NEW ||
967+
container.getContainerState() == ContainerState.RUNNING) {
968+
Resources.addTo(allocatedResource,
969+
container.getAllocatedResource());
970+
if (container.getContainerState() == ContainerState.RUNNING) {
971+
rmNode.launchedContainers.add(container.getContainerId());
972+
}
959973
}
960974
}
961975
}
976+
977+
rmNode.allocatedContainerResource = allocatedResource;
962978
}
963979

964980
if (null != startEvent.getRunningApplications()) {
@@ -1554,6 +1570,8 @@ private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
15541570
List<Map.Entry<ApplicationId, ContainerStatus>> needUpdateContainers =
15551571
new ArrayList<Map.Entry<ApplicationId, ContainerStatus>>();
15561572
int numRemoteRunningContainers = 0;
1573+
final Resource allocatedResource = Resource.newInstance(Resources.none());
1574+
15571575
for (ContainerStatus remoteContainer : containerStatuses) {
15581576
ContainerId containerId = remoteContainer.getContainerId();
15591577

@@ -1622,8 +1640,16 @@ private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
16221640
containerAllocationExpirer
16231641
.unregister(new AllocationExpirationInfo(containerId));
16241642
}
1643+
1644+
if ((remoteContainer.getState() == ContainerState.RUNNING ||
1645+
remoteContainer.getState() == ContainerState.NEW) &&
1646+
remoteContainer.getCapability() != null) {
1647+
Resources.addTo(allocatedResource, remoteContainer.getCapability());
1648+
}
16251649
}
16261650

1651+
allocatedContainerResource = allocatedResource;
1652+
16271653
List<ContainerStatus> lostContainers =
16281654
findLostContainers(numRemoteRunningContainers, containerStatuses);
16291655
for (ContainerStatus remoteContainer : lostContainers) {

0 commit comments

Comments
 (0)