Skip to content

Commit f9680d9

Browse files
committed
YARN-2308. Changed CapacityScheduler to explicitly throw exception if the queue
to which the apps were submitted is changed across RM restart. Contributed by Craig Welch & Chang Li
1 parent a56ea01 commit f9680d9

File tree

3 files changed

+100
-0
lines changed

3 files changed

+100
-0
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,19 @@ private synchronized void addApplication(ApplicationId applicationId,
648648
// sanity checks.
649649
CSQueue queue = getQueue(queueName);
650650
if (queue == null) {
651+
//During a restart, this indicates a queue was removed, which is
652+
//not presently supported
653+
if (isAppRecovering) {
654+
//throwing RuntimeException because some other exceptions are caught
655+
//(including YarnRuntimeException) and we want this to force an exit
656+
String queueErrorMsg = "Queue named " + queueName
657+
+ " missing during application recovery."
658+
+ " Queue removal during recovery is not presently supported by the"
659+
+ " capacity scheduler, please restart with all queues configured"
660+
+ " which were present before shutdown/restart.";
661+
LOG.fatal(queueErrorMsg);
662+
throw new RuntimeException(queueErrorMsg);
663+
}
651664
String message = "Application " + applicationId +
652665
" submitted by user " + user + " to unknown queue: " + queueName;
653666
this.rmContext.getDispatcher().getEventHandler()

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,15 @@ public RMApp submitApp(int masterMemory, String name, String user,
249249
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
250250
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
251251
}
252+
253+
public RMApp submitApp(int masterMemory, String name, String user,
254+
Map<ApplicationAccessType, String> acls, String queue,
255+
boolean waitForAccepted) throws Exception {
256+
return submitApp(masterMemory, name, user, acls, false, queue,
257+
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
258+
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
259+
waitForAccepted);
260+
}
252261

253262
public RMApp submitApp(int masterMemory, String name, String user,
254263
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,8 @@ private void checkFifoQueue(SchedulerApplication schedulerApp,
336336
private static final String R = "Default";
337337
private static final String A = "QueueA";
338338
private static final String B = "QueueB";
339+
//don't ever create the below queue ;-)
340+
private static final String QUEUE_DOESNT_EXIST = "NoSuchQueue";
339341
private static final String USER_1 = "user1";
340342
private static final String USER_2 = "user2";
341343

@@ -351,6 +353,18 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
351353
conf.setDouble(CapacitySchedulerConfiguration
352354
.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.5f);
353355
}
356+
357+
private void setupQueueConfigurationOnlyA(
358+
CapacitySchedulerConfiguration conf) {
359+
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { R });
360+
final String Q_R = CapacitySchedulerConfiguration.ROOT + "." + R;
361+
conf.setCapacity(Q_R, 100);
362+
final String Q_A = Q_R + "." + A;
363+
conf.setQueues(Q_R, new String[] {A});
364+
conf.setCapacity(Q_A, 100);
365+
conf.setDouble(CapacitySchedulerConfiguration
366+
.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 1.0f);
367+
}
354368

355369
// Test CS recovery with multi-level queues and multi-users:
356370
// 1. setup 2 NMs each with 8GB memory;
@@ -470,6 +484,70 @@ public void testCapacitySchedulerRecovery() throws Exception {
470484
totalAvailableResource.getVirtualCores(), totalUsedResource.getMemory(),
471485
totalUsedResource.getVirtualCores());
472486
}
487+
488+
//Test that we receive a meaningful exit-causing exception if a queue
489+
//is removed during recovery
490+
//1. Add some apps to two queues, attempt to add an app to a non-existant
491+
// queue to verify that the new logic is not in effect during normal app
492+
// submission
493+
//2. Remove one of the queues, restart the RM
494+
//3. Verify that the expected exception was thrown
495+
@Test (timeout = 30000)
496+
public void testCapacitySchedulerQueueRemovedRecovery() throws Exception {
497+
if (!schedulerClass.equals(CapacityScheduler.class)) {
498+
return;
499+
}
500+
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
501+
conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
502+
DominantResourceCalculator.class.getName());
503+
CapacitySchedulerConfiguration csConf =
504+
new CapacitySchedulerConfiguration(conf);
505+
setupQueueConfiguration(csConf);
506+
MemoryRMStateStore memStore = new MemoryRMStateStore();
507+
memStore.init(csConf);
508+
rm1 = new MockRM(csConf, memStore);
509+
rm1.start();
510+
MockNM nm1 =
511+
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
512+
MockNM nm2 =
513+
new MockNM("127.1.1.1:4321", 8192, rm1.getResourceTrackerService());
514+
nm1.registerNode();
515+
nm2.registerNode();
516+
RMApp app1_1 = rm1.submitApp(1024, "app1_1", USER_1, null, A);
517+
MockAM am1_1 = MockRM.launchAndRegisterAM(app1_1, rm1, nm1);
518+
RMApp app1_2 = rm1.submitApp(1024, "app1_2", USER_1, null, A);
519+
MockAM am1_2 = MockRM.launchAndRegisterAM(app1_2, rm1, nm2);
520+
521+
RMApp app2 = rm1.submitApp(1024, "app2", USER_2, null, B);
522+
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
523+
524+
//Submit an app with a non existant queue to make sure it does not
525+
//cause a fatal failure in the non-recovery case
526+
RMApp appNA = rm1.submitApp(1024, "app1_2", USER_1, null,
527+
QUEUE_DOESNT_EXIST, false);
528+
529+
// clear queue metrics
530+
rm1.clearQueueMetrics(app1_1);
531+
rm1.clearQueueMetrics(app1_2);
532+
rm1.clearQueueMetrics(app2);
533+
534+
// Re-start RM
535+
csConf =
536+
new CapacitySchedulerConfiguration(conf);
537+
setupQueueConfigurationOnlyA(csConf);
538+
rm2 = new MockRM(csConf, memStore);
539+
boolean runtimeThrown = false;
540+
try {
541+
rm2.start();
542+
} catch (RuntimeException e) {
543+
//we're catching it because we want to verify the message
544+
//and we don't want to set it as an expected exception for the
545+
//test because we only want it to happen here
546+
assertTrue(e.getMessage().contains(B + " missing"));
547+
runtimeThrown = true;
548+
}
549+
assertTrue(runtimeThrown);
550+
}
473551

474552
private void checkParentQueue(ParentQueue parentQueue, int numContainers,
475553
Resource usedResource, float UsedCapacity, float absoluteUsedCapacity) {

0 commit comments

Comments
 (0)