|
32 | 32 | import java.util.Vector; |
33 | 33 | import java.util.Arrays; |
34 | 34 | import java.util.Base64; |
| 35 | +import java.util.concurrent.atomic.AtomicBoolean; |
35 | 36 |
|
36 | 37 | import org.apache.hadoop.thirdparty.com.google.common.base.Joiner; |
37 | 38 |
|
@@ -253,6 +254,10 @@ public class Client { |
253 | 254 | // Command line options |
254 | 255 | private Options opts; |
255 | 256 |
|
| 257 | + private final AtomicBoolean stopSignalReceived; |
| 258 | + private final AtomicBoolean isRunning; |
| 259 | + private final Object objectLock = new Object(); |
| 260 | + |
256 | 261 | private static final String shellCommandPath = "shellCommands"; |
257 | 262 | private static final String shellArgsPath = "shellArgs"; |
258 | 263 | private static final String appMasterJarPath = "AppMaster.jar"; |
@@ -413,6 +418,8 @@ public Client(Configuration conf) throws Exception { |
413 | 418 | opts.addOption("application_tags", true, "Application tags."); |
414 | 419 | opts.addOption("localize_files", true, "List of files, separated by comma" |
415 | 420 | + " to be localized for the command"); |
| 421 | + stopSignalReceived = new AtomicBoolean(false); |
| 422 | + isRunning = new AtomicBoolean(false); |
416 | 423 | } |
417 | 424 |
|
418 | 425 | /** |
@@ -670,8 +677,8 @@ public boolean init(String[] args) throws ParseException { |
670 | 677 | * @throws YarnException |
671 | 678 | */ |
672 | 679 | public boolean run() throws IOException, YarnException { |
673 | | - |
674 | 680 | LOG.info("Running Client"); |
| 681 | + isRunning.set(true); |
675 | 682 | yarnClient.start(); |
676 | 683 | // set the client start time. |
677 | 684 | clientStartTime = System.currentTimeMillis(); |
@@ -1116,15 +1123,22 @@ private boolean monitorApplication(ApplicationId appId) |
1116 | 1123 |
|
1117 | 1124 | boolean res = false; |
1118 | 1125 | boolean needForceKill = false; |
1119 | | - while (true) { |
| 1126 | + while (isRunning.get()) { |
1120 | 1127 | // Check app status every 1 second. |
1121 | 1128 | try { |
1122 | | - Thread.sleep(APP_MONITOR_INTERVAL); |
| 1129 | + synchronized (objectLock) { |
| 1130 | + objectLock.wait(APP_MONITOR_INTERVAL); |
| 1131 | + } |
| 1132 | + needForceKill = stopSignalReceived.get(); |
1123 | 1133 | } catch (InterruptedException e) { |
1124 | 1134 | LOG.warn("Thread sleep in monitoring loop interrupted"); |
1125 | 1135 | // if the application is to be killed when client times out; |
1126 | 1136 | // then set needForceKill to true |
1127 | 1137 | break; |
| 1138 | + } finally { |
| 1139 | + if (needForceKill) { |
| 1140 | + break; |
| 1141 | + } |
1128 | 1142 | } |
1129 | 1143 |
|
1130 | 1144 | // Get application report for the appId we are interested in |
@@ -1177,6 +1191,8 @@ private boolean monitorApplication(ApplicationId appId) |
1177 | 1191 | forceKillApplication(appId); |
1178 | 1192 | } |
1179 | 1193 |
|
| 1194 | + isRunning.set(false); |
| 1195 | + |
1180 | 1196 | return res; |
1181 | 1197 | } |
1182 | 1198 |
|
@@ -1388,4 +1404,31 @@ static Map<String, Long> parseResourcesString(String resourcesStr) { |
1388 | 1404 | } |
1389 | 1405 | return resources; |
1390 | 1406 | } |
| 1407 | + |
| 1408 | + @VisibleForTesting |
| 1409 | + protected void sendStopSignal() { |
| 1410 | + LOG.info("Sending stop Signal to Client"); |
| 1411 | + stopSignalReceived.set(true); |
| 1412 | + synchronized (objectLock) { |
| 1413 | + objectLock.notifyAll(); |
| 1414 | + } |
| 1415 | + int waitCount = 0; |
| 1416 | + LOG.info("Waiting for Client to exit loop"); |
| 1417 | + while (!isRunning.get()) { |
| 1418 | + try { |
| 1419 | + Thread.sleep(50); |
| 1420 | + } catch (InterruptedException ie) { |
| 1421 | + // do nothing |
| 1422 | + } finally { |
| 1423 | + waitCount++; |
| 1424 | + if (isRunning.get() || waitCount > 2000) { |
| 1425 | + break; |
| 1426 | + } |
| 1427 | + } |
| 1428 | + } |
| 1429 | + LOG.info("Stopping yarnClient within the Client"); |
| 1430 | + yarnClient.stop(); |
| 1431 | + yarnClient.waitForServiceToStop(clientTimeout); |
| 1432 | + LOG.info("done stopping Client"); |
| 1433 | + } |
1391 | 1434 | } |
0 commit comments