Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ public Resource getTotalCapability() {
return node.getTotalCapability();
}

/**
 * Returns the resources allocated to containers as reported by the
 * wrapped node. This is a pure delegation to {@code node}.
 *
 * @return the value of {@code node.getAllocatedContainerResource()}
 */
@Override
public Resource getAllocatedContainerResource() {
  final Resource allocated = node.getAllocatedContainerResource();
  return allocated;
}

@Override
public String getRackName() {
return node.getRackName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.util.resource.Resources;

/**
* Node manager's information on available resources
Expand Down Expand Up @@ -104,6 +105,17 @@ public interface RMNode {
*/
public Resource getTotalCapability();

/**
 * Reports the aggregate resources allocated to containers on this node.
 * The figure is the sum over Guaranteed and Opportunistic containers that
 * are queued, running or paused on the node.
 *
 * <p>Implementations that do not override this method report
 * {@link Resources#none()}.
 *
 * @return the total allocated resources, covering all Guaranteed and
 *         Opportunistic containers in queued, running and paused states.
 */
default Resource getAllocatedContainerResource() {
  return Resources.none();
}

/**
* If the total available resources has been updated.
* @return If the capability has been updated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
/* Snapshot of total resources before receiving decommissioning command */
private volatile Resource originalTotalCapability;
/* Value returned by getTotalCapability(). */
private volatile Resource totalCapability;
/*
 * Value returned by getAllocatedContainerResource(). Starts as a new
 * Resource created from Resources.none(). In the visible code it is never
 * modified in place: a fresh total is accumulated from the reported
 * containers in the NEW or RUNNING state and the reference is then
 * reassigned, both when a node start event is processed and when container
 * statuses are handled.
 */
private volatile Resource allocatedContainerResource =
Resource.newInstance(Resources.none());
/* Value returned by isUpdatedCapability(). */
private volatile boolean updatedCapability = false;
/* NOTE(review): presumably the network-topology node for this NM; confirm against the constructor. */
private final Node node;

Expand Down Expand Up @@ -464,6 +466,11 @@ public Resource getTotalCapability() {
return this.totalCapability;
}

/**
 * Returns the total of container-allocated resources last stored on this
 * node.
 *
 * @return the current value of the {@code allocatedContainerResource}
 *         field; the same instance is returned, not a copy.
 */
@Override
public Resource getAllocatedContainerResource() {
  // Read the volatile field once and hand back that reference.
  final Resource current = this.allocatedContainerResource;
  return current;
}

@Override
public boolean isUpdatedCapability() {
return this.updatedCapability;
Expand Down Expand Up @@ -952,13 +959,22 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
ClusterMetrics.getMetrics().decrDecommisionedNMs();
}
containers = startEvent.getNMContainerStatuses();
final Resource allocatedResource = Resource.newInstance(
Resources.none());
if (containers != null && !containers.isEmpty()) {
for (NMContainerStatus container : containers) {
if (container.getContainerState() == ContainerState.RUNNING) {
rmNode.launchedContainers.add(container.getContainerId());
if (container.getContainerState() == ContainerState.NEW ||
container.getContainerState() == ContainerState.RUNNING) {
Resources.addTo(allocatedResource,
container.getAllocatedResource());
if (container.getContainerState() == ContainerState.RUNNING) {
rmNode.launchedContainers.add(container.getContainerId());
}
}
}
}

rmNode.allocatedContainerResource = allocatedResource;
}

if (null != startEvent.getRunningApplications()) {
Expand Down Expand Up @@ -1554,6 +1570,8 @@ private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
List<Map.Entry<ApplicationId, ContainerStatus>> needUpdateContainers =
new ArrayList<Map.Entry<ApplicationId, ContainerStatus>>();
int numRemoteRunningContainers = 0;
final Resource allocatedResource = Resource.newInstance(Resources.none());

for (ContainerStatus remoteContainer : containerStatuses) {
ContainerId containerId = remoteContainer.getContainerId();

Expand Down Expand Up @@ -1622,8 +1640,16 @@ private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
containerAllocationExpirer
.unregister(new AllocationExpirationInfo(containerId));
}

if ((remoteContainer.getState() == ContainerState.RUNNING ||
remoteContainer.getState() == ContainerState.NEW) &&
remoteContainer.getCapability() != null) {
Resources.addTo(allocatedResource, remoteContainer.getCapability());
}
}

allocatedContainerResource = allocatedResource;

List<ContainerStatus> lostContainers =
findLostContainers(numRemoteRunningContainers, containerStatuses);
for (ContainerStatus remoteContainer : lostContainers) {
Expand Down
Loading