Skip to content

Commit 3de66f5

Browse files
authored
YARN-11547. [Federation] Router Supports Remove individual application records from FederationStateStore. (#6055)
1 parent f51162d commit 3de66f5

File tree

2 files changed

+159
-14
lines changed
  • hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src

2 files changed

+159
-14
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java

Lines changed: 82 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@
2828
import java.util.concurrent.TimeUnit;
2929
import java.util.concurrent.atomic.AtomicBoolean;
3030

31+
import org.apache.commons.cli.CommandLine;
32+
import org.apache.commons.cli.DefaultParser;
33+
import org.apache.commons.cli.Option;
34+
import org.apache.commons.cli.Options;
35+
import org.apache.commons.cli.ParseException;
36+
import org.apache.commons.cli.MissingArgumentException;
3137
import org.apache.commons.lang.time.DurationFormatUtils;
3238
import org.apache.hadoop.classification.InterfaceAudience.Private;
3339
import org.apache.hadoop.conf.Configuration;
@@ -37,14 +43,16 @@
3743
import org.apache.hadoop.security.SecurityUtil;
3844
import org.apache.hadoop.security.UserGroupInformation;
3945
import org.apache.hadoop.service.CompositeService;
40-
import org.apache.hadoop.util.GenericOptionsParser;
4146
import org.apache.hadoop.util.JvmPauseMonitor;
4247
import org.apache.hadoop.util.ShutdownHookManager;
4348
import org.apache.hadoop.util.StringUtils;
4449
import org.apache.hadoop.util.VersionInfo;
50+
import org.apache.hadoop.util.GenericOptionsParser;
4551
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
52+
import org.apache.hadoop.yarn.api.records.ApplicationId;
4653
import org.apache.hadoop.yarn.conf.YarnConfiguration;
4754
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
55+
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
4856
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
4957
import org.apache.hadoop.yarn.server.router.cleaner.SubClusterCleaner;
5058
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
@@ -103,6 +111,9 @@ public class Router extends CompositeService {
103111
protected String webAppAddress;
104112
private static long clusterTimeStamp = System.currentTimeMillis();
105113
private FedAppReportFetcher fetcher = null;
114+
private static final String CMD_FORMAT_STATE_STORE = "-format-state-store";
115+
private static final String CMD_REMOVE_APPLICATION_FROM_STATE_STORE =
116+
"-remove-application-from-state-store";
106117

107118
/**
108119
* Priority of the Router shutdown hook.
@@ -191,7 +202,7 @@ protected void serviceStop() throws Exception {
191202
}
192203

193204
protected void shutDown() {
194-
new Thread(() -> Router.this.stop()).start();
205+
new Thread(Router.this::stop).start();
195206
}
196207

197208
protected RouterClientRMService createClientRMProxyService() {
@@ -292,24 +303,14 @@ public static String getProxyHostAndPort(Configuration conf) {
292303

293304
public static void main(String[] argv) {
294305
Configuration conf = new YarnConfiguration();
295-
Thread
296-
.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
306+
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
297307
StringUtils.startupShutdownMessage(Router.class, argv, LOG);
298308
Router router = new Router();
299309
try {
300310
GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
301311
argv = hParser.getRemainingArgs();
302312
if (argv.length > 1) {
303-
if (argv[0].equals("-format-state-store")) {
304-
// TODO: YARN-11548. [Federation] Router Supports Format FederationStateStore.
305-
System.err.println("format-state-store is not yet supported.");
306-
} else if (argv[0].equals("-remove-application-from-state-store") && argv.length == 2) {
307-
// TODO: YARN-11547. [Federation]
308-
// Router Supports Remove individual application records from FederationStateStore.
309-
System.err.println("remove-application-from-state-store is not yet supported.");
310-
} else {
311-
printUsage(System.err);
312-
}
313+
executeRouterCommand(conf, argv);
313314
} else {
314315
// Remove the old hook if we are rebooting.
315316
if (null != routerShutdownHook) {
@@ -362,6 +363,73 @@ public FedAppReportFetcher getFetcher() {
362363
return fetcher;
363364
}
364365

366+
@VisibleForTesting
367+
public static void removeApplication(Configuration conf, String applicationId)
368+
throws Exception {
369+
FederationStateStoreFacade facade = FederationStateStoreFacade.getInstance(conf);
370+
ApplicationId removeAppId = ApplicationId.fromString(applicationId);
371+
LOG.info("Deleting application {} from state store.", removeAppId);
372+
facade.deleteApplicationHomeSubCluster(removeAppId);
373+
LOG.info("Application is deleted from state store");
374+
}
375+
376+
private static void handFormatStateStore() {
377+
// TODO: YARN-11548. [Federation] Router Supports Format FederationStateStore.
378+
System.err.println("format-state-store is not yet supported.");
379+
}
380+
381+
private static void handRemoveApplicationFromStateStore(Configuration conf,
382+
String applicationId) {
383+
try {
384+
removeApplication(conf, applicationId);
385+
System.out.println("Application " + applicationId + " is deleted from state store");
386+
} catch (Exception e) {
387+
System.err.println("Application " + applicationId + " error, exception = " + e);
388+
}
389+
}
390+
391+
private static void executeRouterCommand(Configuration conf, String[] args) {
392+
// Step1. Define Options.
393+
Options opts = new Options();
394+
Option formatStateStoreOpt = new Option("format-state-store", false,
395+
" Formats the FederationStateStore. " +
396+
"This will clear the FederationStateStore and " +
397+
"is useful if past applications are no longer needed. " +
398+
"This should be run only when the Router is not running.");
399+
Option removeApplicationFromStateStoreOpt = new Option("remove-application-from-state-store",
400+
false, " Remove the application from FederationStateStore. " +
401+
" This should be run only when the Router is not running. ");
402+
opts.addOption(formatStateStoreOpt);
403+
opts.addOption(removeApplicationFromStateStoreOpt);
404+
405+
// Step2. Parse Options.
406+
try {
407+
String cmd = args[0];
408+
409+
CommandLine cliParser = new DefaultParser().parse(opts, args);
410+
411+
if (CMD_FORMAT_STATE_STORE.equals(cmd)) {
412+
handFormatStateStore();
413+
} else if (CMD_REMOVE_APPLICATION_FROM_STATE_STORE.equals(cmd)) {
414+
if (cliParser.hasOption(removeApplicationFromStateStoreOpt)) {
415+
String applicationId = cliParser.getOptionValue(removeApplicationFromStateStoreOpt);
416+
handRemoveApplicationFromStateStore(conf, applicationId);
417+
} else {
418+
System.err.println("remove-application-from-state-store requires application arg.");
419+
}
420+
} else {
421+
System.out.println("No related commands found.");
422+
printUsage(System.err);
423+
}
424+
} catch (MissingArgumentException ex) {
425+
System.out.println("Missing argument for options.");
426+
printUsage(System.err);
427+
} catch (ParseException e) {
428+
System.out.println("Parsing of a command-line error.");
429+
printUsage(System.err);
430+
}
431+
}
432+
365433
private static void printUsage(PrintStream out) {
366434
out.println("Usage: yarn router [-format-state-store] | " +
367435
"[-remove-application-from-state-store <appId>]");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.yarn.server.router;
19+
20+
import org.apache.hadoop.conf.Configuration;
21+
import org.apache.hadoop.test.LambdaTestUtils;
22+
import org.apache.hadoop.util.Time;
23+
import org.apache.hadoop.yarn.api.records.ApplicationId;
24+
import org.apache.hadoop.yarn.conf.YarnConfiguration;
25+
import org.apache.hadoop.yarn.exceptions.YarnException;
26+
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
27+
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
28+
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
29+
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
30+
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
31+
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
32+
import org.junit.Before;
33+
import org.junit.Test;
34+
35+
public class TestRouterStoreCommands {
36+
37+
////////////////////////////////
38+
// Router Constants
39+
////////////////////////////////
40+
private Configuration conf;
41+
private MemoryFederationStateStore stateStore;
42+
private FederationStateStoreFacade facade;
43+
44+
@Before
45+
public void setup() throws YarnException {
46+
conf = new YarnConfiguration();
47+
stateStore = new MemoryFederationStateStore();
48+
stateStore.init(conf);
49+
facade = FederationStateStoreFacade.getInstance(conf);
50+
facade.reinitialize(stateStore, conf);
51+
}
52+
53+
@Test
54+
public void testRemoveApplicationFromRouterStateStore() throws Exception {
55+
56+
// We will design such a unit test.
57+
// We will write the applicationId and subCluster into the stateStore,
58+
// and then remove the application through Router.removeApplication.
59+
// At this time, if we continue to query through the stateStore,
60+
// We will get a prompt that application not exists.
61+
62+
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
63+
SubClusterId homeSubCluster = SubClusterId.newInstance("SC-1");
64+
ApplicationHomeSubCluster applicationHomeSubCluster =
65+
ApplicationHomeSubCluster.newInstance(appId, homeSubCluster);
66+
AddApplicationHomeSubClusterRequest request =
67+
AddApplicationHomeSubClusterRequest.newInstance(applicationHomeSubCluster);
68+
stateStore.addApplicationHomeSubCluster(request);
69+
Router.removeApplication(conf, appId.toString());
70+
71+
GetApplicationHomeSubClusterRequest request1 =
72+
GetApplicationHomeSubClusterRequest.newInstance(appId);
73+
74+
LambdaTestUtils.intercept(YarnException.class, "Application " + appId + " does not exist.",
75+
() -> stateStore.getApplicationHomeSubCluster(request1));
76+
}
77+
}

0 commit comments

Comments
 (0)