From 9e60671f3683ef9e3842bbabd59f37d4aa36c33d Mon Sep 17 00:00:00 2001 From: Andrew Chung Date: Thu, 11 Nov 2021 14:11:31 -0500 Subject: [PATCH 1/4] YARN-11003. Make RMNode aware of all (OContainer inclusive) allocated resources * Adds a new method in RMNode: getAllocatedContainerResource, which records how much resource is allocated to containers on a node * Adds up RUNNING and NEW container resources in HB loop to keep track of resources allocated for containers --- .../yarn/sls/scheduler/RMNodeWrapper.java | 5 ++ .../server/resourcemanager/rmnode/RMNode.java | 9 ++ .../resourcemanager/rmnode/RMNodeImpl.java | 17 ++++ .../TestRMNodeTransitions.java | 90 +++++++++++++++++++ 4 files changed, 121 insertions(+) diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index b5ae4f5b3c0ae..26d35ac897235 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -99,6 +99,11 @@ public Resource getTotalCapability() { return node.getTotalCapability(); } + @Override + public Resource getAllocatedContainerResource() { + return node.getAllocatedContainerResource(); + } + @Override public String getRackName() { return node.getRackName(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index e6205d2dac6a3..f6b143fdb374a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.util.resource.Resources; /** * Node managers information on available resources @@ -104,6 +105,14 @@ public interface RMNode { */ public Resource getTotalCapability(); + /** + * the total allocated resources to containers. + * @return the total allocated resources. + */ + default Resource getAllocatedContainerResource() { + return Resources.none(); + } + /** * If the total available resources has been updated. * @return If the capability has been updated. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index e5f6f2c85bbc5..025b5b75355dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -128,6 +128,8 @@ public class RMNodeImpl implements RMNode, EventHandler { /* Snapshot of total resources before receiving decommissioning command */ private volatile Resource originalTotalCapability; private volatile Resource totalCapability; + private volatile Resource allocatedContainerResource = + Resource.newInstance(Resources.none()); private volatile boolean updatedCapability = false; private final Node node; @@ -464,6 +466,11 @@ public Resource getTotalCapability() { return this.totalCapability; } + @Override + public Resource getAllocatedContainerResource() { + return this.allocatedContainerResource; + } + @Override public boolean isUpdatedCapability() { return this.updatedCapability; @@ -1554,6 +1561,8 @@ private void handleContainerStatus(List containerStatuses) { List> needUpdateContainers = new ArrayList>(); int numRemoteRunningContainers = 0; + final Resource allocatedResource = Resource.newInstance(Resources.none()); + for (ContainerStatus remoteContainer : containerStatuses) { ContainerId containerId = remoteContainer.getContainerId(); @@ -1622,8 +1631,16 @@ private void handleContainerStatus(List containerStatuses) { containerAllocationExpirer .unregister(new AllocationExpirationInfo(containerId)); } + + if ((remoteContainer.getState() == ContainerState.RUNNING || + remoteContainer.getState() == ContainerState.NEW) && + remoteContainer.getCapability() != null) { + Resources.addTo(allocatedResource, remoteContainer.getCapability()); + } } + allocatedContainerResource = allocatedResource; + List lostContainers = findLostContainers(numRemoteRunningContainers, containerStatuses); for (ContainerStatus remoteContainer : lostContainers) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 3346b57d98b75..2a5d90ec00d5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; @@ -79,6 +80,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -358,6 +360,94 @@ public void testContainerUpdate() throws InterruptedException{ .getContainerId()); } + /** + * Tests that allocated container resources are counted correctly in + * {@link org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode} + * upon a node update. Resources should be counted for both GUARANTEED + * and OPPORTUNISTIC containers. + */ + @Test (timeout = 5000) + public void testAllocatedContainerUpdate() { + NodeStatus mockNodeStatus = createMockNodeStatus(); + //Start the node + node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus)); + + NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1); + + ApplicationId app0 = BuilderUtils.newApplicationId(0, 0); + ContainerId newContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId(app0, 0), 0); + ContainerId runningContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId(app0, 0), 1); + ContainerId newOppContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId(app0, 0), 2); + ContainerId runningOppContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId(app0, 0), 3); + + rmContext.getRMApps().put(app0, Mockito.mock(RMApp.class)); + + RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent(null); + ContainerStatus newContainerStatusFromNode = mock(ContainerStatus.class); + ContainerStatus runningContainerStatusFromNode = + mock(ContainerStatus.class); + + final Resource newContainerCapability = + Resource.newInstance(100, 1); + final Resource runningContainerCapability = + Resource.newInstance(200, 2); + doReturn(newContainerId).when(newContainerStatusFromNode) + .getContainerId(); + doReturn(ContainerState.NEW).when(newContainerStatusFromNode) + .getState(); + doReturn(newContainerCapability).when(newContainerStatusFromNode) + .getCapability(); + doReturn(runningContainerId).when(runningContainerStatusFromNode) + .getContainerId(); + doReturn(ContainerState.RUNNING).when(runningContainerStatusFromNode) + .getState(); + doReturn(runningContainerCapability).when(runningContainerStatusFromNode) + .getCapability(); + doReturn(Arrays.asList( + newContainerStatusFromNode, runningContainerStatusFromNode)) + .when(statusEventFromNode1).getContainers(); + node.handle(statusEventFromNode1); + Assert.assertTrue(Resources.equals( + node.getAllocatedContainerResource(), + Resource.newInstance(300, 3))); + + RMNodeStatusEvent statusEventFromNode2 = getMockRMNodeStatusEvent(null); + ContainerStatus newOppContainerStatusFromNode = mock(ContainerStatus.class); + ContainerStatus runningOppContainerStatusFromNode = + mock(ContainerStatus.class); + doReturn(newOppContainerId).when(newOppContainerStatusFromNode) + .getContainerId(); + doReturn(ContainerState.NEW).when(newOppContainerStatusFromNode) + .getState(); + doReturn(newContainerCapability).when(newOppContainerStatusFromNode) + .getCapability(); + doReturn(ExecutionType.OPPORTUNISTIC) + .when(newOppContainerStatusFromNode) + .getExecutionType(); + doReturn(runningOppContainerId).when(runningOppContainerStatusFromNode) + .getContainerId(); + doReturn(ContainerState.RUNNING).when(runningOppContainerStatusFromNode) + .getState(); + doReturn(runningContainerCapability).when(runningOppContainerStatusFromNode) + .getCapability(); + doReturn(ExecutionType.OPPORTUNISTIC) + .when(runningOppContainerStatusFromNode) + .getExecutionType(); + doReturn(Arrays.asList( + newContainerStatusFromNode, runningContainerStatusFromNode, + newOppContainerStatusFromNode, runningOppContainerStatusFromNode)) + .when(statusEventFromNode2).getContainers(); + + node.handle(statusEventFromNode2); + Assert.assertTrue(Resources.equals( + node.getAllocatedContainerResource(), + Resource.newInstance(600, 6))); + } + @Test (timeout = 5000) public void testStatusChange(){ NodeStatus mockNodeStatus = createMockNodeStatus(); From b6ae804744b3ee540b8d289ece04527556f9586c Mon Sep 17 00:00:00 2001 From: Andrew Chung Date: Thu, 11 Nov 2021 17:03:11 -0500 Subject: [PATCH 2/4] Add more tests and address review comments --- .../TestRMNodeTransitions.java | 186 +++++++++++------- 1 file changed, 113 insertions(+), 73 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 2a5d90ec00d5a..3a8bf034dcae6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -233,6 +233,24 @@ private RMNodeStatusEvent getMockRMNodeStatusEventWithoutRunningApps() { return event; } + private static ContainerStatus getMockContainerStatus( + final ContainerId containerId, final Resource capability, + final ContainerState containerState) { + return getMockContainerStatus(containerId, capability, containerState, + ExecutionType.GUARANTEED); + } + + private static ContainerStatus getMockContainerStatus( + final ContainerId containerId, final Resource capability, + final ContainerState containerState, final ExecutionType executionType) { + final ContainerStatus containerStatus = mock(ContainerStatus.class); + doReturn(containerId).when(containerStatus).getContainerId(); + doReturn(containerState).when(containerStatus).getState(); + doReturn(capability).when(containerStatus).getCapability(); + doReturn(executionType).when(containerStatus).getExecutionType(); + return containerStatus; + } + @Test (timeout = 5000) public void testExpiredContainer() { NodeStatus mockNodeStatus = createMockNodeStatus(); @@ -250,8 +268,8 @@ public void testExpiredContainer() { // Now verify that scheduler isn't notified of an expired container // by checking number of 'completedContainers' it got in the previous event RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(null); - ContainerStatus containerStatus = mock(ContainerStatus.class); - doReturn(completedContainerId).when(containerStatus).getContainerId(); + ContainerStatus containerStatus = getMockContainerStatus( + completedContainerId, null, ContainerState.COMPLETE); doReturn(Collections.singletonList(containerStatus)). when(statusEvent).getContainers(); node.handle(statusEvent); @@ -323,12 +341,13 @@ public void testContainerUpdate() throws InterruptedException{ RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent(null); RMNodeStatusEvent statusEventFromNode2_2 = getMockRMNodeStatusEvent(null); - ContainerStatus containerStatusFromNode1 = mock(ContainerStatus.class); - ContainerStatus containerStatusFromNode2_1 = mock(ContainerStatus.class); - ContainerStatus containerStatusFromNode2_2 = mock(ContainerStatus.class); + ContainerStatus containerStatusFromNode1 = getMockContainerStatus( + completedContainerIdFromNode1, null, ContainerState.COMPLETE); + ContainerStatus containerStatusFromNode2_1 = getMockContainerStatus( + completedContainerIdFromNode2_1, null, ContainerState.COMPLETE); + ContainerStatus containerStatusFromNode2_2 = getMockContainerStatus( + completedContainerIdFromNode2_2, null, ContainerState.COMPLETE); - doReturn(completedContainerIdFromNode1).when(containerStatusFromNode1) - .getContainerId(); doReturn(Collections.singletonList(containerStatusFromNode1)) .when(statusEventFromNode1).getContainers(); node.handle(statusEventFromNode1); @@ -338,13 +357,9 @@ public void testContainerUpdate() throws InterruptedException{ completedContainers.clear(); - doReturn(completedContainerIdFromNode2_1).when(containerStatusFromNode2_1) - .getContainerId(); doReturn(Collections.singletonList(containerStatusFromNode2_1)) .when(statusEventFromNode2_1).getContainers(); - doReturn(completedContainerIdFromNode2_2).when(containerStatusFromNode2_2) - .getContainerId(); doReturn(Collections.singletonList(containerStatusFromNode2_2)) .when(statusEventFromNode2_2).getContainers(); @@ -372,80 +387,105 @@ public void testAllocatedContainerUpdate() { //Start the node node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus)); - NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1); + // Make sure that the node starts with no allocated resources + Assert.assertEquals(node.getAllocatedContainerResource(), Resources.none()); ApplicationId app0 = BuilderUtils.newApplicationId(0, 0); - ContainerId newContainerId = BuilderUtils.newContainerId( + final ContainerId newContainerId = BuilderUtils.newContainerId( BuilderUtils.newApplicationAttemptId(app0, 0), 0); - ContainerId runningContainerId = BuilderUtils.newContainerId( + final ContainerId runningContainerId = BuilderUtils.newContainerId( BuilderUtils.newApplicationAttemptId(app0, 0), 1); - ContainerId newOppContainerId = BuilderUtils.newContainerId( - BuilderUtils.newApplicationAttemptId(app0, 0), 2); - ContainerId runningOppContainerId = BuilderUtils.newContainerId( - BuilderUtils.newApplicationAttemptId(app0, 0), 3); rmContext.getRMApps().put(app0, Mockito.mock(RMApp.class)); RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent(null); - ContainerStatus newContainerStatusFromNode = mock(ContainerStatus.class); - ContainerStatus runningContainerStatusFromNode = - mock(ContainerStatus.class); + final List containerStatuses = new ArrayList<>(); + + // Use different memory and VCores for new and running state containers + // to test that they add up correctly final Resource newContainerCapability = Resource.newInstance(100, 1); final Resource runningContainerCapability = Resource.newInstance(200, 2); - doReturn(newContainerId).when(newContainerStatusFromNode) - .getContainerId(); - doReturn(ContainerState.NEW).when(newContainerStatusFromNode) - .getState(); - doReturn(newContainerCapability).when(newContainerStatusFromNode) - .getCapability(); - doReturn(runningContainerId).when(runningContainerStatusFromNode) - .getContainerId(); - doReturn(ContainerState.RUNNING).when(runningContainerStatusFromNode) - .getState(); - doReturn(runningContainerCapability).when(runningContainerStatusFromNode) - .getCapability(); - doReturn(Arrays.asList( - newContainerStatusFromNode, runningContainerStatusFromNode)) - .when(statusEventFromNode1).getContainers(); + final Resource completedContainerCapability = + Resource.newInstance(50, 3); + final ContainerStatus newContainerStatusFromNode = getMockContainerStatus( + newContainerId, newContainerCapability, ContainerState.NEW); + final ContainerStatus runningContainerStatusFromNode = + getMockContainerStatus(runningContainerId, runningContainerCapability, + ContainerState.RUNNING); + + containerStatuses.addAll(Arrays.asList( + newContainerStatusFromNode, runningContainerStatusFromNode)); + doReturn(containerStatuses).when(statusEventFromNode1).getContainers(); node.handle(statusEventFromNode1); - Assert.assertTrue(Resources.equals( - node.getAllocatedContainerResource(), - Resource.newInstance(300, 3))); + Assert.assertEquals(node.getAllocatedContainerResource(), + Resource.newInstance(300, 3)); + + final ContainerId newOppContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId(app0, 0), 2); + final ContainerId runningOppContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId(app0, 0), 3); + // Use the same resource capability as in previous for opportunistic case RMNodeStatusEvent statusEventFromNode2 = getMockRMNodeStatusEvent(null); - ContainerStatus newOppContainerStatusFromNode = mock(ContainerStatus.class); - ContainerStatus runningOppContainerStatusFromNode = - mock(ContainerStatus.class); - doReturn(newOppContainerId).when(newOppContainerStatusFromNode) - .getContainerId(); - doReturn(ContainerState.NEW).when(newOppContainerStatusFromNode) - .getState(); - doReturn(newContainerCapability).when(newOppContainerStatusFromNode) - .getCapability(); - doReturn(ExecutionType.OPPORTUNISTIC) - .when(newOppContainerStatusFromNode) - .getExecutionType(); - doReturn(runningOppContainerId).when(runningOppContainerStatusFromNode) - .getContainerId(); - doReturn(ContainerState.RUNNING).when(runningOppContainerStatusFromNode) - .getState(); - doReturn(runningContainerCapability).when(runningOppContainerStatusFromNode) - .getCapability(); - doReturn(ExecutionType.OPPORTUNISTIC) - .when(runningOppContainerStatusFromNode) - .getExecutionType(); - doReturn(Arrays.asList( - newContainerStatusFromNode, runningContainerStatusFromNode, - newOppContainerStatusFromNode, runningOppContainerStatusFromNode)) - .when(statusEventFromNode2).getContainers(); + final ContainerStatus newOppContainerStatusFromNode = + getMockContainerStatus(newOppContainerId, newContainerCapability, + ContainerState.NEW, ExecutionType.OPPORTUNISTIC); + final ContainerStatus runningOppContainerStatusFromNode = + getMockContainerStatus(runningOppContainerId, + runningContainerCapability, ContainerState.RUNNING, + ExecutionType.OPPORTUNISTIC); + + containerStatuses.addAll(Arrays.asList( + newOppContainerStatusFromNode, runningOppContainerStatusFromNode)); + + // Pass in both guaranteed and opportunistic container statuses + doReturn(containerStatuses).when(statusEventFromNode2).getContainers(); node.handle(statusEventFromNode2); - Assert.assertTrue(Resources.equals( - node.getAllocatedContainerResource(), - Resource.newInstance(600, 6))); + + // The result here should be double the first check, + // since allocated resources are doubled, just + // with different execution types + Assert.assertEquals(node.getAllocatedContainerResource(), + Resource.newInstance(600, 6)); + + RMNodeStatusEvent statusEventFromNode3 = getMockRMNodeStatusEvent(null); + final ContainerId completedContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId(app0, 0), 4); + final ContainerId completedOppContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId(app0, 0), 5); + final ContainerStatus completedContainerStatusFromNode = + getMockContainerStatus(completedContainerId, completedContainerCapability, + ContainerState.COMPLETE, ExecutionType.OPPORTUNISTIC); + final ContainerStatus completedOppContainerStatusFromNode = + getMockContainerStatus(completedOppContainerId, + completedContainerCapability, ContainerState.COMPLETE, + ExecutionType.OPPORTUNISTIC); + + containerStatuses.addAll(Arrays.asList( + completedContainerStatusFromNode, completedOppContainerStatusFromNode)); + + doReturn(containerStatuses).when(statusEventFromNode3).getContainers(); + node.handle(statusEventFromNode3); + + // Adding completed containers should not have changed + // the resources allocated + Assert.assertEquals(node.getAllocatedContainerResource(), + Resource.newInstance(600, 6)); + + RMNodeStatusEvent emptyStatusEventFromNode = + getMockRMNodeStatusEvent(null); + + doReturn(Collections.emptyList()) + .when(emptyStatusEventFromNode).getContainers(); + node.handle(emptyStatusEventFromNode); + + // Passing an empty containers list should yield no resources allocated + Assert.assertEquals(node.getAllocatedContainerResource(), + Resources.none()); } @Test (timeout = 5000) @@ -466,14 +506,14 @@ public void testStatusChange(){ RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent(null); RMNodeStatusEvent statusEvent2 = getMockRMNodeStatusEvent(null); - ContainerStatus containerStatus1 = mock(ContainerStatus.class); - ContainerStatus containerStatus2 = mock(ContainerStatus.class); + ContainerStatus containerStatus1 = getMockContainerStatus( + completedContainerId1, null, null); + ContainerStatus containerStatus2 = getMockContainerStatus( + completedContainerId2, null, null); - doReturn(completedContainerId1).when(containerStatus1).getContainerId(); doReturn(Collections.singletonList(containerStatus1)) .when(statusEvent1).getContainers(); - doReturn(completedContainerId2).when(containerStatus2).getContainerId(); doReturn(Collections.singletonList(containerStatus2)) .when(statusEvent2).getContainers(); @@ -1243,9 +1283,9 @@ public void testForHandlingDuplicatedCompltedContainers() { RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent(null); - ContainerStatus containerStatus1 = mock(ContainerStatus.class); + ContainerStatus containerStatus1 = getMockContainerStatus( + completedContainerId1, null, ContainerState.COMPLETE); - doReturn(completedContainerId1).when(containerStatus1).getContainerId(); doReturn(Collections.singletonList(containerStatus1)).when(statusEvent1) .getContainers(); From 2ee6bf8f93f7c4b1825ec367d4fbf76ae66d2fc4 Mon Sep 17 00:00:00 2001 From: Andrew Chung Date: Mon, 15 Nov 2021 14:23:08 -0500 Subject: [PATCH 3/4] Correct assertEquals parameter order --- .../resourcemanager/TestRMNodeTransitions.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 3a8bf034dcae6..c6a266d47773f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -388,7 +388,7 @@ public void testAllocatedContainerUpdate() { node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus)); // Make sure that the node starts with no allocated resources - Assert.assertEquals(node.getAllocatedContainerResource(), Resources.none()); + Assert.assertEquals(Resources.none(), node.getAllocatedContainerResource()); ApplicationId app0 = BuilderUtils.newApplicationId(0, 0); final ContainerId newContainerId = BuilderUtils.newContainerId( @@ -420,8 +420,8 @@ public void testAllocatedContainerUpdate() { newContainerStatusFromNode, runningContainerStatusFromNode)); doReturn(containerStatuses).when(statusEventFromNode1).getContainers(); node.handle(statusEventFromNode1); - Assert.assertEquals(node.getAllocatedContainerResource(), - Resource.newInstance(300, 3)); + Assert.assertEquals(Resource.newInstance(300, 3), + node.getAllocatedContainerResource()); final ContainerId newOppContainerId = BuilderUtils.newContainerId( BuilderUtils.newApplicationAttemptId(app0, 0), 2); @@ -449,8 +449,8 @@ public void testAllocatedContainerUpdate() { // The result here should be double the first check, // since allocated resources are doubled, just // with different execution types - Assert.assertEquals(node.getAllocatedContainerResource(), - Resource.newInstance(600, 6)); + Assert.assertEquals(Resource.newInstance(600, 6), + node.getAllocatedContainerResource()); RMNodeStatusEvent statusEventFromNode3 = getMockRMNodeStatusEvent(null); final ContainerId completedContainerId = BuilderUtils.newContainerId( @@ -473,8 +473,8 @@ public void testAllocatedContainerUpdate() { // Adding completed containers should not have changed // the resources allocated - Assert.assertEquals(node.getAllocatedContainerResource(), - Resource.newInstance(600, 6)); + Assert.assertEquals(Resource.newInstance(600, 6), + node.getAllocatedContainerResource()); RMNodeStatusEvent emptyStatusEventFromNode = getMockRMNodeStatusEvent(null); @@ -484,8 +484,8 @@ public void testAllocatedContainerUpdate() { node.handle(emptyStatusEventFromNode); // Passing an empty containers list should yield no resources allocated - Assert.assertEquals(node.getAllocatedContainerResource(), - Resources.none()); + Assert.assertEquals(Resources.none(), + node.getAllocatedContainerResource()); } @Test (timeout = 5000) From 7457e09b25fece8192063a05305d8152d5f9b9ae Mon Sep 17 00:00:00 2001 From: Andrew Chung Date: Tue, 16 Nov 2021 10:30:34 -0500 Subject: [PATCH 4/4] Address review comments, add node transition resource accounting --- .../server/resourcemanager/rmnode/RMNode.java | 7 +- .../resourcemanager/rmnode/RMNodeImpl.java | 13 +++- .../TestRMNodeTransitions.java | 73 +++++++++++++++++++ 3 files changed, 89 insertions(+), 4 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index f6b143fdb374a..5d60b4fbe0678 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -106,8 +106,11 @@ public interface RMNode { public Resource getTotalCapability(); /** - * the total allocated resources to containers. - * @return the total allocated resources. + * The total allocated resources to containers. + * This will include the sum of Guaranteed and Opportunistic + * containers queued + running + paused on the node. + * @return the total allocated resources, including all Guaranteed and + * Opportunistic containers in queued, running and paused states. */ default Resource getAllocatedContainerResource() { return Resources.none(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 025b5b75355dc..b8aaea5de330c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -959,13 +959,22 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { ClusterMetrics.getMetrics().decrDecommisionedNMs(); } containers = startEvent.getNMContainerStatuses(); + final Resource allocatedResource = Resource.newInstance( + Resources.none()); if (containers != null && !containers.isEmpty()) { for (NMContainerStatus container : containers) { - if (container.getContainerState() == ContainerState.RUNNING) { - rmNode.launchedContainers.add(container.getContainerId()); + if (container.getContainerState() == ContainerState.NEW || + container.getContainerState() == ContainerState.RUNNING) { + Resources.addTo(allocatedResource, + container.getAllocatedResource()); + if (container.getContainerState() == ContainerState.RUNNING) { + rmNode.launchedContainers.add(container.getContainerId()); + } } } } + + rmNode.allocatedContainerResource = allocatedResource; } if (null != startEvent.getRunningApplications()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index c6a266d47773f..db14d422f6eb5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -46,11 +46,14 @@ import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; @@ -251,6 +254,14 @@ private static ContainerStatus getMockContainerStatus( return containerStatus; } + private static NMContainerStatus createNMContainerStatus( + final ContainerId containerId, final ExecutionType executionType, + final ContainerState containerState, final Resource capability) { + return NMContainerStatus.newInstance(containerId, 0, containerState, + capability, "", 0, Priority.newInstance(0), 0, + CommonNodeLabelsManager.NO_LABEL, executionType, -1); + } + @Test (timeout = 5000) public void testExpiredContainer() { NodeStatus mockNodeStatus = createMockNodeStatus(); @@ -375,6 +386,68 @@ public void testContainerUpdate() throws InterruptedException{ .getContainerId()); } + /** + * Tests that allocated resources are counted correctly on new nodes + * that are added to the cluster. + */ + @Test + public void testAddWithAllocatedContainers() { + NodeStatus mockNodeStatus = createMockNodeStatus(); + RMNodeImpl node = getNewNode(); + ApplicationId app0 = BuilderUtils.newApplicationId(0, 0); + + // Independently computed expected allocated resource to verify against + final Resource expectedResource = Resource.newInstance(Resources.none()); + + // Guaranteed containers + final ContainerId newContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId(app0, 0), 0); + final Resource newContainerCapability = + Resource.newInstance(100, 1); + Resources.addTo(expectedResource, newContainerCapability); + final NMContainerStatus newContainerStatus = createNMContainerStatus( + newContainerId, ExecutionType.GUARANTEED, + ContainerState.NEW, newContainerCapability); + + final ContainerId runningContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId(app0, 0), 1); + final Resource runningContainerCapability = + Resource.newInstance(200, 2); + Resources.addTo(expectedResource, runningContainerCapability); + final NMContainerStatus runningContainerStatus = createNMContainerStatus( + runningContainerId, ExecutionType.GUARANTEED, + ContainerState.RUNNING, runningContainerCapability); + + // Opportunistic containers + final ContainerId newOppContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId(app0, 0), 2); + final Resource newOppContainerCapability = + Resource.newInstance(300, 3); + Resources.addTo(expectedResource, newOppContainerCapability); + final NMContainerStatus newOppContainerStatus = createNMContainerStatus( + newOppContainerId, ExecutionType.OPPORTUNISTIC, + ContainerState.NEW, newOppContainerCapability); + + final ContainerId runningOppContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId(app0, 0), 3); + final Resource runningOppContainerCapability = + Resource.newInstance(400, 4); + Resources.addTo(expectedResource, runningOppContainerCapability); + final NMContainerStatus runningOppContainerStatus = createNMContainerStatus( + runningOppContainerId, ExecutionType.OPPORTUNISTIC, + ContainerState.RUNNING, runningOppContainerCapability); + + node.handle(new RMNodeStartedEvent(node.getNodeID(), + Arrays.asList(newContainerStatus, runningContainerStatus, + newOppContainerStatus, runningOppContainerStatus), + null, mockNodeStatus)); + Assert.assertEquals(NodeState.RUNNING, node.getState()); + Assert.assertNotNull(nodesListManagerEvent); + Assert.assertEquals(NodesListManagerEventType.NODE_USABLE, + nodesListManagerEvent.getType()); + Assert.assertEquals(expectedResource, node.getAllocatedContainerResource()); + } + /** * Tests that allocated container resources are counted correctly in * {@link org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode}