Skip to content

Commit f2ea555

Browse files
committed
YARN-2496. Enhanced Capacity Scheduler to have basic support for allocating resources based on node-labels. Contributed by Wangda Tan.
YARN-2500. Ehnaced ResourceManager to support schedulers allocating resources based on node-labels. Contributed by Wangda Tan.
1 parent 466f087 commit f2ea555

File tree

57 files changed

+2867
-796
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+2867
-796
lines changed

hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.util.ArrayList;
2222
import java.util.List;
23+
import java.util.Set;
2324

2425
import org.apache.hadoop.classification.InterfaceAudience.Private;
2526
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -159,6 +160,10 @@ public String getNodeManagerVersion() {
159160
return null;
160161
}
161162

163+
@Override
164+
public Set<String> getNodeLabels() {
165+
return null;
166+
}
162167
}
163168

164169
public static RMNode newNodeInfo(String rackName, String hostName,

hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
import java.util.Collections;
3535
import java.util.List;
36+
import java.util.Set;
3637

3738
@Private
3839
@Unstable
@@ -147,4 +148,8 @@ public String getNodeManagerVersion() {
147148
return node.getNodeManagerVersion();
148149
}
149150

151+
@Override
152+
public Set<String> getNodeLabels() {
153+
return null;
154+
}
150155
}

hadoop-yarn-project/CHANGES.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,12 @@ Release 2.6.0 - UNRELEASED
165165
YARN-2656. Made RM web services authentication filter support proxy user.
166166
(Varun Vasudev and Zhijie Shen via zjshen)
167167

168+
YARN-2496. Enhanced Capacity Scheduler to have basic support for allocating
169+
resources based on node-labels. (Wangda Tan via vinodkv)
170+
171+
YARN-2500. Ehnaced ResourceManager to support schedulers allocating resources
172+
based on node-labels. (Wangda Tan via vinodkv)
173+
168174
IMPROVEMENTS
169175

170176
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc

hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,23 @@
188188
</Or>
189189
<Bug pattern="IS2_INCONSISTENT_SYNC" />
190190
</Match>
191+
<Match>
192+
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue" />
193+
<Or>
194+
<Field name="absoluteCapacity" />
195+
<Field name="absoluteMaxCapacity" />
196+
<Field name="acls" />
197+
<Field name="capacity" />
198+
<Field name="maximumCapacity" />
199+
<Field name="state" />
200+
<Field name="labelManager" />
201+
<Field name="defaultLabelExpression" />
202+
<Field name="accessibleLabels" />
203+
<Field name="absoluteNodeLabelCapacities" />
204+
<Field name="reservationsContinueLooking" />
205+
</Or>
206+
<Bug pattern="IS2_INCONSISTENT_SYNC" />
207+
</Match>
191208
<!-- Inconsistent sync warning - scheduleAsynchronously is only initialized once and never changed -->
192209
<Match>
193210
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler" />

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.hadoop.yarn.api.records.AMCommand;
5050
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
5151
import org.apache.hadoop.yarn.api.records.ApplicationId;
52+
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
5253
import org.apache.hadoop.yarn.api.records.Container;
5354
import org.apache.hadoop.yarn.api.records.ContainerId;
5455
import org.apache.hadoop.yarn.api.records.NMToken;
@@ -254,13 +255,13 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
254255
if (hasApplicationMasterRegistered(applicationAttemptId)) {
255256
String message =
256257
"Application Master is already registered : "
257-
+ applicationAttemptId.getApplicationId();
258+
+ appID;
258259
LOG.warn(message);
259260
RMAuditLogger.logFailure(
260261
this.rmContext.getRMApps()
261-
.get(applicationAttemptId.getApplicationId()).getUser(),
262+
.get(appID).getUser(),
262263
AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message,
263-
applicationAttemptId.getApplicationId(), applicationAttemptId);
264+
appID, applicationAttemptId);
264265
throw new InvalidApplicationMasterRequestException(message);
265266
}
266267

@@ -340,6 +341,7 @@ public FinishApplicationMasterResponse finishApplicationMaster(
340341

341342
ApplicationAttemptId applicationAttemptId =
342343
authorizeRequest().getApplicationAttemptId();
344+
ApplicationId appId = applicationAttemptId.getApplicationId();
343345

344346
AllocateResponseLock lock = responseMap.get(applicationAttemptId);
345347
if (lock == null) {
@@ -351,21 +353,21 @@ public FinishApplicationMasterResponse finishApplicationMaster(
351353
if (!hasApplicationMasterRegistered(applicationAttemptId)) {
352354
String message =
353355
"Application Master is trying to unregister before registering for: "
354-
+ applicationAttemptId.getApplicationId();
356+
+ appId;
355357
LOG.error(message);
356358
RMAuditLogger.logFailure(
357359
this.rmContext.getRMApps()
358-
.get(applicationAttemptId.getApplicationId()).getUser(),
360+
.get(appId).getUser(),
359361
AuditConstants.UNREGISTER_AM, "", "ApplicationMasterService",
360-
message, applicationAttemptId.getApplicationId(),
362+
message, appId,
361363
applicationAttemptId);
362364
throw new ApplicationMasterNotRegisteredException(message);
363365
}
364366

365367
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
366368

367369
RMApp rmApp =
368-
rmContext.getRMApps().get(applicationAttemptId.getApplicationId());
370+
rmContext.getRMApps().get(appId);
369371

370372
if (rmApp.isAppFinalStateStored()) {
371373
return FinishApplicationMasterResponse.newInstance(true);
@@ -418,6 +420,7 @@ public AllocateResponse allocate(AllocateRequest request)
418420

419421
ApplicationAttemptId appAttemptId =
420422
amrmTokenIdentifier.getApplicationAttemptId();
423+
ApplicationId applicationId = appAttemptId.getApplicationId();
421424

422425
this.amLivelinessMonitor.receivedPing(appAttemptId);
423426

@@ -432,14 +435,14 @@ public AllocateResponse allocate(AllocateRequest request)
432435
if (!hasApplicationMasterRegistered(appAttemptId)) {
433436
String message =
434437
"Application Master is not registered for known application: "
435-
+ appAttemptId.getApplicationId()
438+
+ applicationId
436439
+ ". Let AM resync.";
437440
LOG.info(message);
438441
RMAuditLogger.logFailure(
439-
this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
442+
this.rmContext.getRMApps().get(applicationId)
440443
.getUser(), AuditConstants.REGISTER_AM, "",
441444
"ApplicationMasterService", message,
442-
appAttemptId.getApplicationId(),
445+
applicationId,
443446
appAttemptId);
444447
return resync;
445448
}
@@ -481,11 +484,22 @@ public AllocateResponse allocate(AllocateRequest request)
481484
List<String> blacklistRemovals =
482485
(blacklistRequest != null) ?
483486
blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST;
484-
487+
RMApp app =
488+
this.rmContext.getRMApps().get(applicationId);
489+
490+
// set label expression for Resource Requests
491+
ApplicationSubmissionContext asc = app.getApplicationSubmissionContext();
492+
for (ResourceRequest req : ask) {
493+
if (null == req.getNodeLabelExpression()) {
494+
req.setNodeLabelExpression(asc.getNodeLabelExpression());
495+
}
496+
}
497+
485498
// sanity check
486499
try {
487500
RMServerUtils.validateResourceRequests(ask,
488-
rScheduler.getMaximumResourceCapability());
501+
rScheduler.getMaximumResourceCapability(), app.getQueue(),
502+
rScheduler);
489503
} catch (InvalidResourceRequestException e) {
490504
LOG.warn("Invalid resource ask by application " + appAttemptId, e);
491505
throw e;
@@ -498,8 +512,6 @@ public AllocateResponse allocate(AllocateRequest request)
498512
throw e;
499513
}
500514

501-
RMApp app =
502-
this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
503515
// In the case of work-preserving AM restart, it's possible for the
504516
// AM to release containers from the earlier attempt.
505517
if (!app.getApplicationSubmissionContext()
@@ -582,7 +594,7 @@ public AllocateResponse allocate(AllocateRequest request)
582594
.toString(), amrmToken.getPassword(), amrmToken.getService()
583595
.toString()));
584596
LOG.info("The AMRMToken has been rolled-over. Send new AMRMToken back"
585-
+ " to application: " + appAttemptId.getApplicationId());
597+
+ " to application: " + applicationId);
586598
}
587599

588600
/*

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -343,15 +343,15 @@ private RMAppImpl createAndPopulateNewRMApp(
343343
long submitTime, String user)
344344
throws YarnException {
345345
ApplicationId applicationId = submissionContext.getApplicationId();
346-
validateResourceRequest(submissionContext);
346+
ResourceRequest amReq = validateAndCreateResourceRequest(submissionContext);
347347
// Create RMApp
348348
RMAppImpl application =
349349
new RMAppImpl(applicationId, rmContext, this.conf,
350350
submissionContext.getApplicationName(), user,
351351
submissionContext.getQueue(),
352352
submissionContext, this.scheduler, this.masterService,
353353
submitTime, submissionContext.getApplicationType(),
354-
submissionContext.getApplicationTags());
354+
submissionContext.getApplicationTags(), amReq);
355355

356356
// Concurrent app submissions with same applicationId will fail here
357357
// Concurrent app submissions with different applicationIds will not
@@ -373,7 +373,7 @@ private RMAppImpl createAndPopulateNewRMApp(
373373
return application;
374374
}
375375

376-
private void validateResourceRequest(
376+
private ResourceRequest validateAndCreateResourceRequest(
377377
ApplicationSubmissionContext submissionContext)
378378
throws InvalidResourceRequestException {
379379
// Validation of the ApplicationSubmissionContext needs to be completed
@@ -383,18 +383,36 @@ private void validateResourceRequest(
383383

384384
// Check whether AM resource requirements are within required limits
385385
if (!submissionContext.getUnmanagedAM()) {
386-
ResourceRequest amReq = BuilderUtils.newResourceRequest(
387-
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
388-
submissionContext.getResource(), 1);
386+
ResourceRequest amReq;
387+
if (submissionContext.getAMContainerResourceRequest() != null) {
388+
amReq = submissionContext.getAMContainerResourceRequest();
389+
} else {
390+
amReq =
391+
BuilderUtils.newResourceRequest(
392+
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
393+
submissionContext.getResource(), 1);
394+
}
395+
396+
// set label expression for AM container
397+
if (null == amReq.getNodeLabelExpression()) {
398+
amReq.setNodeLabelExpression(submissionContext
399+
.getNodeLabelExpression());
400+
}
401+
389402
try {
390403
SchedulerUtils.validateResourceRequest(amReq,
391-
scheduler.getMaximumResourceCapability());
404+
scheduler.getMaximumResourceCapability(),
405+
submissionContext.getQueue(), scheduler);
392406
} catch (InvalidResourceRequestException e) {
393407
LOG.warn("RM app submission failed in validating AM resource request"
394408
+ " for application " + submissionContext.getApplicationId(), e);
395409
throw e;
396410
}
411+
412+
return amReq;
397413
}
414+
415+
return null;
398416
}
399417

400418
private boolean isApplicationInFinalState(RMAppState rmAppState) {

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.hadoop.yarn.event.Dispatcher;
2828
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
2929
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
30+
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
3031
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
3132
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
3233
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -108,6 +109,10 @@ void setRMApplicationHistoryWriter(
108109

109110
boolean isWorkPreservingRecoveryEnabled();
110111

112+
RMNodeLabelsManager getNodeLabelManager();
113+
114+
public void setNodeLabelManager(RMNodeLabelsManager mgr);
115+
111116
long getEpoch();
112117

113118
ReservationSystem getReservationSystem();

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.hadoop.yarn.event.Dispatcher;
3535
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
3636
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
37+
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
3738
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
3839
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
3940
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
@@ -91,6 +92,7 @@ public class RMContextImpl implements RMContext {
9192
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
9293
private SystemMetricsPublisher systemMetricsPublisher;
9394
private ConfigurationProvider configurationProvider;
95+
private RMNodeLabelsManager nodeLabelManager;
9496
private long epoch;
9597
private Clock systemClock = new SystemClock();
9698
private long schedulerRecoveryStartTime = 0;
@@ -406,6 +408,16 @@ void setEpoch(long epoch) {
406408
this.epoch = epoch;
407409
}
408410

411+
@Override
412+
public RMNodeLabelsManager getNodeLabelManager() {
413+
return nodeLabelManager;
414+
}
415+
416+
@Override
417+
public void setNodeLabelManager(RMNodeLabelsManager mgr) {
418+
nodeLabelManager = mgr;
419+
}
420+
409421
public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
410422
this.schedulerRecoveryStartTime = systemClock.getTime();
411423
this.schedulerRecoveryWaitTime = waitTime;

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
4545
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
4646
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
47+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
4748
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
4849
import org.apache.hadoop.yarn.util.resource.Resources;
4950

@@ -84,9 +85,11 @@ public static List<RMNode> queryRMNodes(RMContext context,
8485
* requested memory/vcore is non-negative and not greater than max
8586
*/
8687
public static void validateResourceRequests(List<ResourceRequest> ask,
87-
Resource maximumResource) throws InvalidResourceRequestException {
88+
Resource maximumResource, String queueName, YarnScheduler scheduler)
89+
throws InvalidResourceRequestException {
8890
for (ResourceRequest resReq : ask) {
89-
SchedulerUtils.validateResourceRequest(resReq, maximumResource);
91+
SchedulerUtils.validateResourceRequest(resReq, maximumResource,
92+
queueName, scheduler);
9093
}
9194
}
9295

@@ -132,17 +135,25 @@ public static void validateBlacklistRequest(
132135
}
133136
}
134137

138+
public static UserGroupInformation verifyAccess(
139+
AccessControlList acl, String method, final Log LOG)
140+
throws IOException {
141+
// by default, this method will use AdminService as module name
142+
return verifyAccess(acl, method, "AdminService", LOG);
143+
}
144+
135145
/**
136146
* Utility method to verify if the current user has access based on the
137147
* passed {@link AccessControlList}
138148
* @param acl the {@link AccessControlList} to check against
139149
* @param method the method name to be logged
150+
* @param module, like AdminService or NodeLabelManager
140151
* @param LOG the logger to use
141152
* @return {@link UserGroupInformation} of the current user
142153
* @throws IOException
143154
*/
144155
public static UserGroupInformation verifyAccess(
145-
AccessControlList acl, String method, final Log LOG)
156+
AccessControlList acl, String method, String module, final Log LOG)
146157
throws IOException {
147158
UserGroupInformation user;
148159
try {
@@ -159,7 +170,7 @@ public static UserGroupInformation verifyAccess(
159170
" to call '" + method + "'");
160171

161172
RMAuditLogger.logFailure(user.getShortUserName(), method,
162-
acl.toString(), "AdminService",
173+
acl.toString(), module,
163174
RMAuditLogger.AuditConstants.UNAUTHORIZED_USER);
164175

165176
throw new AccessControlException("User " + user.getShortUserName() +

0 commit comments

Comments
 (0)