
Commit d8026e3

YARN-10903. Fix the headroom check in ParentQueue and RegularContainerAllocator for DRF (#3352)
Contributed by Jie Wang <jie.wang@hulu.com>
1 parent edfde6e commit d8026e3

File tree: 3 files changed (+67, -16 lines)


hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java

Lines changed: 3 additions & 4 deletions

@@ -1038,10 +1038,9 @@ private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
     // Two conditions need to meet when trying to allocate:
     // 1) Node doesn't have reserved container
     // 2) Node's available-resource + killable-resource should > 0
-    boolean accept = node.getReservedContainer() == null && Resources
-        .greaterThanOrEqual(resourceCalculator, clusterResource, Resources
-            .add(node.getUnallocatedResource(),
-                node.getTotalKillableResources()), minimumAllocation);
+    boolean accept = node.getReservedContainer() == null &&
+        Resources.fitsIn(resourceCalculator, minimumAllocation,
+            Resources.add(node.getUnallocatedResource(), node.getTotalKillableResources()));
     if (!accept) {
       ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
           getParentName(), getQueuePath(), ActivityState.REJECTED,
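
This is the heart of the fix: under the DominantResourceCalculator (DRF), Resources.greaterThanOrEqual compares only the dominant resource share, so a request can pass the check even though one of its non-dominant dimensions does not fit; Resources.fitsIn checks every dimension. A minimal standalone sketch of the difference (the class name and the resource values are illustrative, not part of the commit):

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

public class HeadroomCheckSketch {
  public static void main(String[] args) {
    DominantResourceCalculator drc = new DominantResourceCalculator();

    // Illustrative 100 GB / 100-vcore cluster.
    Resource cluster = Resource.newInstance(100 * 1024, 100);
    // Headroom: only 9 GB of memory left, but plenty of vcores.
    Resource headroom = Resource.newInstance(9 * 1024, 89);
    // Request: 10 GB / 10 vcores -- does not fit in the memory dimension.
    Resource required = Resource.newInstance(10 * 1024, 10);

    // Old check: compares dominant shares only.
    // headroom: max(9/100, 89/100) = 0.89; required: max(10/100, 10/100) = 0.10
    // 0.89 >= 0.10, so the check passes even though memory does not fit.
    boolean oldCheck =
        Resources.greaterThanOrEqual(drc, cluster, headroom, required);

    // New check: every dimension of 'required' must fit inside 'headroom'.
    // 10 GB > 9 GB, so this correctly fails.
    boolean newCheck = Resources.fitsIn(drc, required, headroom);

    System.out.println("greaterThanOrEqual: " + oldCheck); // true  (the bug)
    System.out.println("fitsIn:             " + newCheck); // false (the fix)
  }
}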

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java

Lines changed: 8 additions & 12 deletions

@@ -73,10 +73,9 @@ public RegularContainerAllocator(FiCaSchedulerApp application,
       ActivitiesManager activitiesManager) {
     super(application, rc, rmContext, activitiesManager);
   }
-
-  private boolean checkHeadroom(Resource clusterResource,
-      ResourceLimits currentResourceLimits, Resource required,
-      String nodePartition) {
+
+  private boolean checkHeadroom(ResourceLimits currentResourceLimits,
+      Resource required, String nodePartition) {
     // If headroom + currentReservation < required, we cannot allocate this
     // require
     Resource resourceCouldBeUnReserved =
@@ -86,9 +85,8 @@ private boolean checkHeadroom(Resource clusterResource,
       // we won't allow to unreserve before allocation.
      resourceCouldBeUnReserved = Resources.none();
     }
-    return Resources.greaterThanOrEqual(rc, clusterResource, Resources.add(
-        currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved),
-        required);
+    return Resources.fitsIn(rc, required,
+        Resources.add(currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved));
   }

   /*
@@ -97,8 +95,7 @@ private boolean checkHeadroom(Resource clusterResource,
    * We will consider stuffs like exclusivity, pending resource, node partition,
    * headroom, etc.
    */
-  private ContainerAllocation preCheckForNodeCandidateSet(
-      Resource clusterResource, FiCaSchedulerNode node,
+  private ContainerAllocation preCheckForNodeCandidateSet(FiCaSchedulerNode node,
       SchedulingMode schedulingMode, ResourceLimits resourceLimits,
       SchedulerRequestKey schedulerKey) {
     PendingAsk offswitchPendingAsk = application.getPendingAsk(schedulerKey,
@@ -168,8 +165,7 @@ private ContainerAllocation preCheckForNodeCandidateSet(
       }
     }

-    if (!checkHeadroom(clusterResource, resourceLimits, required,
-        node.getPartition())) {
+    if (!checkHeadroom(resourceLimits, required, node.getPartition())) {
       LOG.debug("cannot allocate required resource={} because of headroom",
           required);
       ActivitiesLogger.APP.recordAppActivityWithoutAllocation(
@@ -857,7 +853,7 @@ private ContainerAllocation allocate(Resource clusterResource,
       FiCaSchedulerNode node = iter.next();

       if (reservedContainer == null) {
-        result = preCheckForNodeCandidateSet(clusterResource, node,
+        result = preCheckForNodeCandidateSet(node,
             schedulingMode, resourceLimits, schedulerKey);
         if (null != result) {
           continue;
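
A side effect worth noting: greaterThanOrEqual needed clusterResource to turn absolute resources into shares under DRF, while fitsIn compares dimensions directly and never consults cluster totals. That is presumably why the clusterResource parameter could be dropped from checkHeadroom and preCheckForNodeCandidateSet along the way.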

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

Lines changed: 56 additions & 0 deletions

@@ -800,6 +800,62 @@ public void testSingleQueueWithOneUser() throws Exception {
     assertEquals((int)(a.getCapacity() * node_0.getTotalResource().getMemorySize()),
         a.getMetrics().getAvailableMB());
   }
+
+  @Test
+  public void testHeadroomCheckWithDRF() throws Exception {
+    CSAssignment assignment;
+    setUpWithDominantResourceCalculator();
+    // Mock the queue
+    LeafQueue b = stubLeafQueue((LeafQueue) queues.get(B));
+    // Users
+    final String user0 = "user_0";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app0 =
+        new FiCaSchedulerApp(appAttemptId0, user0, b,
+            b.getAbstractUsersManager(), spyRMContext);
+    b.submitApplicationAttempt(app0, user0);
+    // Setup some nodes
+    String host0 = "127.0.0.1";
+    FiCaSchedulerNode node0 =
+        TestUtils.getMockNode(host0, DEFAULT_RACK, 0, 100 * GB, 100);
+
+    int numNodes = 1;
+    Resource clusterResource =
+        Resources.createResource(numNodes * (100 * GB), numNodes * 100);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+    root.updateClusterResource(clusterResource,
+        new ResourceLimits(clusterResource));
+
+    // Increase the user-limit-factor to make user_0 fully use max resources of the queue.
+    // The max resources can be used are 0.99 * [100 * GB, 100]
+    b.setUserLimitFactor(10.0f);
+
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps =
+        ImmutableMap.of(app0.getApplicationAttemptId(), app0);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node0.getNodeID(), node0);
+
+    Priority priority = TestUtils.createMockPriority(1);
+    app0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 90 * GB, 10, 1, true,
+            priority, recordFactory, NO_LABEL)));
+    assignment = b.assignContainers(clusterResource, node0, new ResourceLimits(
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, b, nodes, apps);
+    verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
+
+    app0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 10 * GB, 10, 1, true,
+            priority, recordFactory, NO_LABEL)));
+    assignment = b.assignContainers(clusterResource, node0, new ResourceLimits(
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    // This assignment should have no containers assigned,
+    // because the used memory (90 + 10)GB will exceed the max 99GB
+    verifyNoContainerAllocated(assignment);
+  }
+
   @Test
   public void testDRFUsageRatioRounding() throws Exception {
     CSAssignment assign;
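
The arithmetic behind the new test, for reference (figures taken from the test itself): queue B may use at most 0.99 of the cluster, i.e. 99 GB of memory and 99 vcores. After the first 90 GB / 10-vcore container lands, the memory headroom is 9 GB while (assuming no other usage) about 89 vcores remain free. Under the old dominant-share comparison, headroom's share max(9/100, 89/100) = 0.89 exceeded the request's share max(10/100, 10/100) = 0.10, so the second 10 GB container would wrongly have been allocated; the per-dimension fitsIn check rejects it because 10 GB does not fit in 9 GB.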
