Skip to content

Commit ef950b0

Browse files
YARN-9290. Invalid SchedulingRequest not rejected in Scheduler PlacementConstraintsHandler. Contributed by Prabhu Joseph
1 parent 828ab40 commit ef950b0

File tree

16 files changed

+125
-12
lines changed

16 files changed

+125
-12
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,9 @@ public void allocate(ApplicationAttemptId appAttemptId,
357357

358358
response.setContainersFromPreviousAttempts(
359359
allocation.getPreviousAttemptContainers());
360+
361+
response.setRejectedSchedulingRequests(allocation.getRejectedRequest());
362+
360363
}
361364

362365
private void handleInvalidResourceException(InvalidResourceRequestException e,

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.hadoop.yarn.api.records.NMToken;
2727
import org.apache.hadoop.yarn.api.records.Resource;
2828
import org.apache.hadoop.yarn.api.records.ResourceRequest;
29+
import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
2930

3031
public class Allocation {
3132

@@ -40,7 +41,7 @@ public class Allocation {
4041
final List<Container> demotedContainers;
4142
private final List<Container> previousAttemptContainers;
4243
private Resource resourceLimit;
43-
44+
private List<RejectedSchedulingRequest> rejectedRequest;
4445

4546
public Allocation(List<Container> containers, Resource resourceLimit,
4647
Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
@@ -52,25 +53,26 @@ public Allocation(List<Container> containers, Resource resourceLimit,
5253
public Allocation(List<Container> containers, Resource resourceLimit,
5354
Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
5455
List<ResourceRequest> fungibleResources, List<NMToken> nmTokens) {
55-
this(containers, resourceLimit,strictContainers, fungibleContainers,
56-
fungibleResources, nmTokens, null, null, null, null, null);
56+
this(containers, resourceLimit, strictContainers, fungibleContainers,
57+
fungibleResources, nmTokens, null, null, null, null, null, null);
5758
}
5859

5960
public Allocation(List<Container> containers, Resource resourceLimit,
6061
Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
6162
List<ResourceRequest> fungibleResources, List<NMToken> nmTokens,
6263
List<Container> increasedContainers, List<Container> decreasedContainer) {
63-
this(containers, resourceLimit,strictContainers, fungibleContainers,
64+
this(containers, resourceLimit, strictContainers, fungibleContainers,
6465
fungibleResources, nmTokens, increasedContainers, decreasedContainer,
65-
null, null, null);
66+
null, null, null, null);
6667
}
6768

6869
public Allocation(List<Container> containers, Resource resourceLimit,
6970
Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
7071
List<ResourceRequest> fungibleResources, List<NMToken> nmTokens,
7172
List<Container> increasedContainers, List<Container> decreasedContainer,
7273
List<Container> promotedContainers, List<Container> demotedContainer,
73-
List<Container> previousAttemptContainers) {
74+
List<Container> previousAttemptContainers, List<RejectedSchedulingRequest>
75+
rejectedRequest) {
7476
this.containers = containers;
7577
this.resourceLimit = resourceLimit;
7678
this.strictContainers = strictContainers;
@@ -82,6 +84,7 @@ public Allocation(List<Container> containers, Resource resourceLimit,
8284
this.promotedContainers = promotedContainers;
8385
this.demotedContainers = demotedContainer;
8486
this.previousAttemptContainers = previousAttemptContainers;
87+
this.rejectedRequest = rejectedRequest;
8588
}
8689

8790
public List<Container> getContainers() {
@@ -128,6 +131,10 @@ public List<Container> getPreviousAttemptContainers() {
128131
return previousAttemptContainers;
129132
}
130133

134+
public List<RejectedSchedulingRequest> getRejectedRequest() {
135+
return rejectedRequest;
136+
}
137+
131138
@VisibleForTesting
132139
public void setResourceLimit(Resource resource) {
133140
this.resourceLimit = resource;

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.atomic.AtomicBoolean;
3232
import java.util.concurrent.atomic.AtomicLong;
3333
import java.util.concurrent.locks.ReentrantReadWriteLock;
34+
import java.util.stream.Collectors;
3435

3536
import org.slf4j.Logger;
3637
import org.slf4j.LoggerFactory;
@@ -42,7 +43,10 @@
4243
import org.apache.hadoop.yarn.api.records.ExecutionType;
4344
import org.apache.hadoop.yarn.api.records.Resource;
4445
import org.apache.hadoop.yarn.api.records.ResourceRequest;
46+
import org.apache.hadoop.yarn.api.records.RejectionReason;
47+
import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
4548
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
49+
import org.apache.hadoop.yarn.conf.YarnConfiguration;
4650
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
4751
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
4852
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -98,6 +102,7 @@ public class AppSchedulingInfo {
98102
public final ContainerUpdateContext updateContext;
99103
private final Map<String, String> applicationSchedulingEnvs = new HashMap<>();
100104
private final RMContext rmContext;
105+
private final int retryAttempts;
101106

102107
public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user,
103108
Queue queue, AbstractUsersManager abstractUsersManager, long epoch,
@@ -113,6 +118,9 @@ public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user,
113118
this.appResourceUsage = appResourceUsage;
114119
this.applicationSchedulingEnvs.putAll(applicationSchedulingEnvs);
115120
this.rmContext = rmContext;
121+
this.retryAttempts = rmContext.getYarnConfiguration().getInt(
122+
YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS,
123+
YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS);
116124

117125
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
118126
updateContext = new ContainerUpdateContext(this);
@@ -496,6 +504,20 @@ public List<SchedulingRequest> getAllSchedulingRequests() {
496504
return ret;
497505
}
498506

507+
public List<RejectedSchedulingRequest> getRejectedRequest() {
508+
this.readLock.lock();
509+
try {
510+
return schedulerKeyToAppPlacementAllocator.values().stream()
511+
.filter(ap -> ap.getPlacementAttempt() >= retryAttempts)
512+
.map(ap -> RejectedSchedulingRequest.newInstance(
513+
RejectionReason.COULD_NOT_SCHEDULE_ON_NODE,
514+
ap.getSchedulingRequest()))
515+
.collect(Collectors.toList());
516+
} finally {
517+
this.readLock.unlock();
518+
}
519+
}
520+
499521
public PendingAsk getNextPendingAsk() {
500522
readLock.lock();
501523
try {
@@ -780,8 +802,8 @@ public boolean precheckNode(SchedulerRequestKey schedulerKey,
780802
try {
781803
AppPlacementAllocator ap =
782804
schedulerKeyToAppPlacementAllocator.get(schedulerKey);
783-
return (ap != null) && ap.precheckNode(schedulerNode,
784-
schedulingMode, dcOpt);
805+
return (ap != null) && (ap.getPlacementAttempt() < retryAttempts) &&
806+
ap.precheckNode(schedulerNode, schedulingMode, dcOpt);
785807
} finally {
786808
this.readLock.unlock();
787809
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -821,7 +821,7 @@ public Allocation getAllocation(ResourceCalculator resourceCalculator,
821821
currentContPreemption, Collections.singletonList(rr), updatedNMTokens,
822822
newlyIncreasedContainers, newlyDecreasedContainers,
823823
newlyPromotedContainers, newlyDemotedContainers,
824-
previousAttemptContainers);
824+
previousAttemptContainers, appSchedulingInfo.getRejectedRequest());
825825
} finally {
826826
writeLock.unlock();
827827
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -951,7 +951,7 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
951951
updatedNMTokens, null, null,
952952
application.pullNewlyPromotedContainers(),
953953
application.pullNewlyDemotedContainers(),
954-
previousAttemptContainers);
954+
previousAttemptContainers, null);
955955
}
956956

957957
private List<MaxResourceValidationResult> validateResourceRequests(

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.Iterator;
3535
import java.util.Map;
3636
import java.util.Optional;
37+
import java.util.concurrent.atomic.AtomicInteger;
3738

3839
/**
3940
* <p>
@@ -57,6 +58,7 @@ public abstract class AppPlacementAllocator<N extends SchedulerNode> {
5758
protected AppSchedulingInfo appSchedulingInfo;
5859
protected SchedulerRequestKey schedulerRequestKey;
5960
protected RMContext rmContext;
61+
private AtomicInteger placementAttempt = new AtomicInteger(0);
6062

6163
/**
6264
* Get iterator of preferred node depends on requirement and/or availability.
@@ -205,4 +207,12 @@ public void initialize(AppSchedulingInfo appSchedulingInfo,
205207
* @return SchedulingRequest
206208
*/
207209
public abstract SchedulingRequest getSchedulingRequest();
210+
211+
public int getPlacementAttempt() {
212+
return placementAttempt.get();
213+
}
214+
215+
public void incrementPlacementAttempt() {
216+
placementAttempt.getAndIncrement();
217+
}
208218
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,7 @@ private boolean checkCardinalityAndPending(SchedulerNode node,
363363
placementConstraintManager, allocationTagsManager, dcOpt);
364364
} catch (InvalidAllocationTagsQueryException e) {
365365
LOG.warn("Failed to query node cardinality:", e);
366+
this.incrementPlacementAttempt();
366367
return false;
367368
}
368369
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.hadoop.yarn.api.records.Priority;
2929
import org.apache.hadoop.yarn.api.records.Resource;
3030
import org.apache.hadoop.yarn.api.records.ResourceRequest;
31+
import org.apache.hadoop.yarn.conf.YarnConfiguration;
3132
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
3233
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
3334
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
@@ -44,10 +45,12 @@ public void testBacklistChanged() {
4445
ApplicationAttemptId.newInstance(appIdImpl, 1);
4546

4647
FSLeafQueue queue = mock(FSLeafQueue.class);
48+
RMContext rmContext = mock(RMContext.class);
4749
doReturn("test").when(queue).getQueueName();
50+
doReturn(new YarnConfiguration()).when(rmContext).getYarnConfiguration();
4851
AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(appAttemptId,
4952
"test", queue, null, 0, new ResourceUsage(),
50-
new HashMap<String, String>(), null);
53+
new HashMap<String, String>(), rmContext);
5154

5255
appSchedulingInfo.updatePlacesBlacklistedByApp(new ArrayList<String>(),
5356
new ArrayList<String>());
@@ -117,9 +120,11 @@ public void testSchedulerKeyAccounting() {
117120

118121
Queue queue = mock(Queue.class);
119122
doReturn(mock(QueueMetrics.class)).when(queue).getMetrics();
123+
RMContext rmContext = mock(RMContext.class);
124+
doReturn(new YarnConfiguration()).when(rmContext).getYarnConfiguration();
120125
AppSchedulingInfo info = new AppSchedulingInfo(
121126
appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0,
122-
new ResourceUsage(), new HashMap<>(), mock(RMContext.class));
127+
new ResourceUsage(), new HashMap<>(), rmContext);
123128
Assert.assertEquals(0, info.getSchedulerKeys().size());
124129

125130
Priority pri1 = Priority.newInstance(1);

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public void testActiveUsersWhenMove() {
7575
ApplicationAttemptId appAttId = createAppAttemptId(0, 0);
7676
RMContext rmContext = mock(RMContext.class);
7777
when(rmContext.getEpoch()).thenReturn(3L);
78+
when(rmContext.getYarnConfiguration()).thenReturn(conf);
7879
SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId,
7980
user, queue1, queue1.getAbstractUsersManager(), rmContext);
8081

@@ -121,6 +122,7 @@ public void testMove() {
121122
ApplicationAttemptId appAttId = createAppAttemptId(0, 0);
122123
RMContext rmContext = mock(RMContext.class);
123124
when(rmContext.getEpoch()).thenReturn(3L);
125+
when(rmContext.getYarnConfiguration()).thenReturn(conf);
124126
SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId,
125127
user, oldQueue, oldQueue.getAbstractUsersManager(), rmContext);
126128
oldMetrics.submitApp(user);
@@ -242,6 +244,7 @@ public void testAppPercentages() throws Exception {
242244
RMContext rmContext = mock(RMContext.class);
243245
when(rmContext.getEpoch()).thenReturn(3L);
244246
when(rmContext.getScheduler()).thenReturn(scheduler);
247+
when(rmContext.getYarnConfiguration()).thenReturn(conf);
245248

246249
final String user = "user1";
247250
Queue queue = createQueue("test", null);
@@ -300,6 +303,7 @@ public void testAppPercentagesOnswitch() throws Exception {
300303
RMContext rmContext = mock(RMContext.class);
301304
when(rmContext.getEpoch()).thenReturn(3L);
302305
when(rmContext.getScheduler()).thenReturn(scheduler);
306+
when(rmContext.getYarnConfiguration()).thenReturn(conf);
303307

304308
final String user = "user1";
305309
Queue queue = createQueue("test", null);
@@ -322,6 +326,7 @@ public void testSchedulingOpportunityOverflow() throws Exception {
322326
Queue queue = createQueue("test", null);
323327
RMContext rmContext = mock(RMContext.class);
324328
when(rmContext.getEpoch()).thenReturn(3L);
329+
when(rmContext.getYarnConfiguration()).thenReturn(conf);
325330
SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(
326331
attemptId, "user", queue, queue.getAbstractUsersManager(), rmContext);
327332
Priority priority = Priority.newInstance(1);
@@ -347,6 +352,7 @@ public void testHasPendingResourceRequest() throws Exception {
347352
Queue queue = createQueue("test", null);
348353
RMContext rmContext = mock(RMContext.class);
349354
when(rmContext.getEpoch()).thenReturn(3L);
355+
when(rmContext.getYarnConfiguration()).thenReturn(conf);
350356
SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(
351357
attemptId, "user", queue, queue.getAbstractUsersManager(), rmContext);
352358

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4130,6 +4130,7 @@ public void testApplicationQueuePercent()
41304130
RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class);
41314131
when(nlm.getResourceByLabel(any(), any())).thenReturn(res);
41324132
when(rmContext.getNodeLabelManager()).thenReturn(nlm);
4133+
when(rmContext.getYarnConfiguration()).thenReturn(csConf);
41334134

41344135
// Queue "test" consumes 100% of the cluster, so its capacity and absolute
41354136
// capacity are both 1.0f.

0 commit comments

Comments
 (0)