Skip to content

Commit 80f6438

Browse files
szilard-nemethHarshitGupta11
authored andcommitted
YARN-10996. Fix race condition of User object acquisitions. Contributed by Andras Gyori
1 parent 9811daf commit 80f6438

File tree

4 files changed

+27
-8
lines changed

4 files changed

+27
-8
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Set;
3131
import java.util.TreeSet;
3232

33+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
3334
import org.slf4j.Logger;
3435
import org.slf4j.LoggerFactory;
3536
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -419,17 +420,18 @@ private PriorityQueue<TempAppPerPartition> createTempAppForResCalculation(
419420
String userName = app.getUser();
420421
TempUserPerPartition tmpUser = usersPerPartition.get(userName);
421422
if (tmpUser == null) {
422-
ResourceUsage userResourceUsage = tq.leafQueue.getUser(userName)
423-
.getResourceUsage();
423+
// User might have already been removed, but preemption still accounts for this app,
424+
// therefore reinserting the user will not cause a memory leak
425+
User user = tq.leafQueue.getOrCreateUser(userName);
426+
ResourceUsage userResourceUsage = user.getResourceUsage();
424427

425428
// perUserAMUsed was populated with running apps, now we are looping
426429
// through both running and pending apps.
427430
Resource userSpecificAmUsed = perUserAMUsed.get(userName);
428431
amUsed = (userSpecificAmUsed == null)
429432
? Resources.none() : userSpecificAmUsed;
430433

431-
tmpUser = new TempUserPerPartition(
432-
tq.leafQueue.getUser(userName), tq.queueName,
434+
tmpUser = new TempUserPerPartition(user, tq.queueName,
433435
Resources.clone(userResourceUsage.getUsed(partition)),
434436
Resources.clone(amUsed),
435437
Resources.clone(userResourceUsage.getReserved(partition)),

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,11 @@ public User getUser(String userName) {
519519
return usersManager.getUser(userName);
520520
}
521521

522+
@VisibleForTesting
523+
public User getOrCreateUser(String userName) {
524+
return usersManager.getUserAndAddIfAbsent(userName);
525+
}
526+
522527
@Private
523528
public List<AppPriorityACLGroup> getPriorityACLs() {
524529
readLock.lock();
@@ -2007,15 +2012,25 @@ public void decUsedResource(String nodeLabel, Resource resourceToDec,
20072012

20082013
public void incAMUsedResource(String nodeLabel, Resource resourceToInc,
20092014
SchedulerApplicationAttempt application) {
2010-
getUser(application.getUser()).getResourceUsage().incAMUsed(nodeLabel,
2015+
User user = getUser(application.getUser());
2016+
if (user == null) {
2017+
return;
2018+
}
2019+
2020+
user.getResourceUsage().incAMUsed(nodeLabel,
20112021
resourceToInc);
20122022
// ResourceUsage has its own lock, no addition lock needs here.
20132023
usageTracker.getQueueUsage().incAMUsed(nodeLabel, resourceToInc);
20142024
}
20152025

20162026
public void decAMUsedResource(String nodeLabel, Resource resourceToDec,
20172027
SchedulerApplicationAttempt application) {
2018-
getUser(application.getUser()).getResourceUsage().decAMUsed(nodeLabel,
2028+
User user = getUser(application.getUser());
2029+
if (user == null) {
2030+
return;
2031+
}
2032+
2033+
user.getResourceUsage().decAMUsed(nodeLabel,
20192034
resourceToDec);
20202035
// ResourceUsage has its own lock, no addition lock needs here.
20212036
usageTracker.getQueueUsage().decAMUsed(nodeLabel, resourceToDec);
@@ -2103,7 +2118,7 @@ public Resource getTotalPendingResourcesConsideringUserLimit(
21032118
for (FiCaSchedulerApp app : getApplications()) {
21042119
String userName = app.getUser();
21052120
if (!userNameToHeadroom.containsKey(userName)) {
2106-
User user = getUser(userName);
2121+
User user = getUsersManager().getUserAndAddIfAbsent(userName);
21072122
Resource headroom = Resources.subtract(
21082123
getResourceLimitForActiveUsers(app.getUser(), clusterResources,
21092124
partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -817,6 +817,7 @@ partitionResource, getUsageRatio(nodePartition),
817817
lQueue.getMinimumAllocation());
818818

819819
if (LOG.isDebugEnabled()) {
820+
float weight = lQueue.getUserWeights().getByUser(userName);
820821
LOG.debug("User limit computation for " + userName
821822
+ ", in queue: " + lQueue.getQueuePath()
822823
+ ", userLimitPercent=" + lQueue.getUserLimit()
@@ -834,7 +835,7 @@ partitionResource, getUsageRatio(nodePartition),
834835
+ ", Partition=" + nodePartition
835836
+ ", resourceUsed=" + resourceUsed
836837
+ ", maxUserLimit=" + maxUserLimit
837-
+ ", userWeight=" + getUser(userName).getWeight()
838+
+ ", userWeight=" + weight
838839
);
839840
}
840841
return userLimitResource;

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/mockframework/MockApplications.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ private void setupUserToQueueSettings(String label, String queueName,
165165
user.setResourceUsage(userResourceUsage.get(userName));
166166
}
167167
when(queue.getUser(eq(userName))).thenReturn(user);
168+
when(queue.getOrCreateUser(eq(userName))).thenReturn(user);
168169
when(queue.getResourceLimitForAllUsers(eq(userName),
169170
any(Resource.class), anyString(), any(SchedulingMode.class)))
170171
.thenReturn(userLimit);

0 commit comments

Comments
 (0)