Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1038,10 +1038,9 @@ private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
// Two conditions need to meet when trying to allocate:
// 1) Node doesn't have reserved container
// 2) Node's available-resource + killable-resource should > 0
boolean accept = node.getReservedContainer() == null && Resources
.greaterThanOrEqual(resourceCalculator, clusterResource, Resources
.add(node.getUnallocatedResource(),
node.getTotalKillableResources()), minimumAllocation);
boolean accept = node.getReservedContainer() == null &&
Resources.fitsIn(resourceCalculator, minimumAllocation,
Resources.add(node.getUnallocatedResource(), node.getTotalKillableResources()));
if (!accept) {
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParentName(), getQueuePath(), ActivityState.REJECTED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,9 @@ public RegularContainerAllocator(FiCaSchedulerApp application,
ActivitiesManager activitiesManager) {
super(application, rc, rmContext, activitiesManager);
}

private boolean checkHeadroom(Resource clusterResource,
ResourceLimits currentResourceLimits, Resource required,
String nodePartition) {

private boolean checkHeadroom(ResourceLimits currentResourceLimits,
Resource required, String nodePartition) {
// If headroom + currentReservation < required, we cannot allocate this
// require
Resource resourceCouldBeUnReserved =
Expand All @@ -86,9 +85,8 @@ private boolean checkHeadroom(Resource clusterResource,
// we won't allow to unreserve before allocation.
resourceCouldBeUnReserved = Resources.none();
}
return Resources.greaterThanOrEqual(rc, clusterResource, Resources.add(
currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved),
required);
return Resources.fitsIn(rc, required,
Resources.add(currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved));
}

/*
Expand All @@ -97,8 +95,7 @@ private boolean checkHeadroom(Resource clusterResource,
* We will consider stuffs like exclusivity, pending resource, node partition,
* headroom, etc.
*/
private ContainerAllocation preCheckForNodeCandidateSet(
Resource clusterResource, FiCaSchedulerNode node,
private ContainerAllocation preCheckForNodeCandidateSet(FiCaSchedulerNode node,
SchedulingMode schedulingMode, ResourceLimits resourceLimits,
SchedulerRequestKey schedulerKey) {
PendingAsk offswitchPendingAsk = application.getPendingAsk(schedulerKey,
Expand Down Expand Up @@ -168,8 +165,7 @@ private ContainerAllocation preCheckForNodeCandidateSet(
}
}

if (!checkHeadroom(clusterResource, resourceLimits, required,
node.getPartition())) {
if (!checkHeadroom(resourceLimits, required, node.getPartition())) {
LOG.debug("cannot allocate required resource={} because of headroom",
required);
ActivitiesLogger.APP.recordAppActivityWithoutAllocation(
Expand Down Expand Up @@ -857,7 +853,7 @@ private ContainerAllocation allocate(Resource clusterResource,
FiCaSchedulerNode node = iter.next();

if (reservedContainer == null) {
result = preCheckForNodeCandidateSet(clusterResource, node,
result = preCheckForNodeCandidateSet(node,
schedulingMode, resourceLimits, schedulerKey);
if (null != result) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,62 @@ public void testSingleQueueWithOneUser() throws Exception {
assertEquals((int)(a.getCapacity() * node_0.getTotalResource().getMemorySize()),
a.getMetrics().getAvailableMB());
}

@Test
public void testHeadroomCheckWithDRF() throws Exception {
CSAssignment assignment;
setUpWithDominantResourceCalculator();
// Mock the queue
LeafQueue b = stubLeafQueue((LeafQueue) queues.get(B));
// Users
final String user0 = "user_0";

// Submit applications
final ApplicationAttemptId appAttemptId0 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app0 =
new FiCaSchedulerApp(appAttemptId0, user0, b,
b.getAbstractUsersManager(), spyRMContext);
b.submitApplicationAttempt(app0, user0);
// Setup some nodes
String host0 = "127.0.0.1";
FiCaSchedulerNode node0 =
TestUtils.getMockNode(host0, DEFAULT_RACK, 0, 100 * GB, 100);

int numNodes = 1;
Resource clusterResource =
Resources.createResource(numNodes * (100 * GB), numNodes * 100);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));

// Increase the user-limit-factor to make user_0 fully use max resources of the queue.
// The max resources can be used are 0.99 * [100 * GB, 100]
b.setUserLimitFactor(10.0f);

Map<ApplicationAttemptId, FiCaSchedulerApp> apps =
ImmutableMap.of(app0.getApplicationAttemptId(), app0);
Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node0.getNodeID(), node0);

Priority priority = TestUtils.createMockPriority(1);
app0.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 90 * GB, 10, 1, true,
priority, recordFactory, NO_LABEL)));
assignment = b.assignContainers(clusterResource, node0, new ResourceLimits(
clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
applyCSAssignment(clusterResource, assignment, b, nodes, apps);
verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);

app0.updateResourceRequests(Collections.singletonList(TestUtils
.createResourceRequest(ResourceRequest.ANY, 10 * GB, 10, 1, true,
priority, recordFactory, NO_LABEL)));
assignment = b.assignContainers(clusterResource, node0, new ResourceLimits(
clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
// This assignment should have no containers assigned,
// because the used memory (90 + 10)GB will exceed the max 99GB
verifyNoContainerAllocated(assignment);
}

@Test
public void testDRFUsageRatioRounding() throws Exception {
CSAssignment assign;
Expand Down