Skip to content

Commit

Permalink
Killing all applications when test case stopped in testTonyAMStartupT…
Browse files Browse the repository at this point in the history
…imeoutShouldFail
  • Loading branch information
zuston committed Jan 20, 2022
1 parent 7226585 commit 97bd8be
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 18 deletions.
11 changes: 10 additions & 1 deletion tony-core/src/main/java/com/linkedin/tony/TonyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1332,14 +1332,23 @@ public int start() {
boolean result;
try {
result = run();
} catch (IOException | InterruptedException | URISyntaxException | YarnException | ParseException e) {
} catch (InterruptedException | YarnException interruptedException) {
try {
LOG.info("Force killing application due to accepting interruption semaphore.");
forceKillApplication();
} catch (Exception e) {
LOG.error("Errors on killing application when tony client throws exception", e);
}
result = false;
} catch (IOException | URISyntaxException | ParseException e) {
LOG.fatal("Failed to run TonyClient", e);
result = false;
} finally {
if (!this.debug) {
cleanupLocalTmpFiles();
}
}

if (result) {
LOG.info("Application completed successfully");
return 0;
Expand Down
2 changes: 1 addition & 1 deletion tony-core/src/main/java/com/linkedin/tony/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ public static Set<String> getAllJobTypes(Configuration conf) {
}
}

public static int getNumTotalTasks(Configuration conf) {
public synchronized static int getNumTotalTasks(Configuration conf) {
return getAllJobTypes(conf).stream().mapToInt(type -> conf.getInt(TonyConfigurationKeys.getInstancesKey(type), 0))
.sum();
}
Expand Down
51 changes: 35 additions & 16 deletions tony-core/src/test/java/com/linkedin/tony/TestTonyE2E.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
import com.linkedin.tony.rpc.TaskInfo;
import com.linkedin.tony.rpc.impl.TaskStatus;

import java.util.ArrayList;
import java.util.HashSet;

import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -31,9 +32,12 @@
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static com.linkedin.tony.TaskExecutor.MARK_LOST_CONNECTION_ENV_KEY;
import static com.linkedin.tony.TonyConfigurationKeys.TASK_HEARTBEAT_INTERVAL_MS;
Expand All @@ -59,6 +63,7 @@
* The YARN logs for the test should be in {@code <TonY>/target/MiniTonY}.
*/
public class TestTonyE2E {
private static final Log LOG = LogFactory.getLog(TestTonyE2E.class);

private static class TestTonyE2EHandler implements CallbackHandler, TaskUpdateListener {

Expand Down Expand Up @@ -611,10 +616,11 @@ public void testTonyAllocationTimeoutShouldFail() throws ParseException, IOExcep

@Test(timeOut = 60000)
public void testTonyAMStartupTimeoutShouldFail() throws ParseException, IOException {
List<CompletableFuture<Integer>> tasks = new ArrayList<>();
for (int i = 0; i < 10; i++) {
tasks.add(mockedTask());
}
/* Creating 10 jobs submitting to Yarn to make cluster fulfill. */
ExecutorService executorService = Executors.newFixedThreadPool(10);
Stream.iterate(0, n -> n + 1)
.limit(10)
.forEach(x -> mockedTask(executorService));

TonyClient client = new TonyClient(conf);
client.init(new String[]{
Expand All @@ -631,17 +637,30 @@ public void testTonyAMStartupTimeoutShouldFail() throws ParseException, IOExcept
int exitCode = client.start();
Assert.assertEquals(exitCode, -1);

for (CompletableFuture<Integer> task : tasks) {
try {
task.cancel(true);
} catch (Exception e) {
// ignore
shutdownExecutor(executorService);
}

private void shutdownExecutor(ExecutorService executor) {
executor.shutdown();
try {
// Wait a while for existing tasks to terminate
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
executor.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
LOG.error("Thread pool did not terminate");
}
}
} catch (InterruptedException e) {
// (Re-)Cancel if current thread also interrupted
executor.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}

private CompletableFuture<Integer> mockedTask() {
return CompletableFuture.supplyAsync(() -> {
private Future<?> mockedTask(ExecutorService service) {
return service.submit((Runnable) () -> {
TonyClient client = new TonyClient(conf);
try {
client.init(new String[]{
Expand All @@ -655,11 +674,11 @@ private CompletableFuture<Integer> mockedTask() {
"--conf", "tony.ps.command=python sleep_30.py",
"--conf", "tony.worker.command=python check_env_and_venv.py"
});
client.start();
} catch (Exception e) {
e.printStackTrace();
}
int exitCode = client.start();
return exitCode;
return;
});
}

Expand Down

0 comments on commit 97bd8be

Please sign in to comment.