Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Killing all applications when test case stopped in testTonyAMStartupT… #639

Merged
merged 1 commit into from
Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion tony-core/src/main/java/com/linkedin/tony/TonyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1332,14 +1332,23 @@ public int start() {
boolean result;
try {
result = run();
} catch (IOException | InterruptedException | URISyntaxException | YarnException | ParseException e) {
} catch (InterruptedException | YarnException interruptedException) {
try {
LOG.info("Force killing application due to accepting interruption semaphore.");
forceKillApplication();
} catch (Exception e) {
LOG.error("Errors on killing application when tony client throws exception", e);
}
result = false;
} catch (IOException | URISyntaxException | ParseException e) {
LOG.fatal("Failed to run TonyClient", e);
result = false;
} finally {
if (!this.debug) {
cleanupLocalTmpFiles();
}
}

if (result) {
LOG.info("Application completed successfully");
return 0;
Expand Down
2 changes: 1 addition & 1 deletion tony-core/src/main/java/com/linkedin/tony/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ public static Set<String> getAllJobTypes(Configuration conf) {
}
}

public static int getNumTotalTasks(Configuration conf) {
public synchronized static int getNumTotalTasks(Configuration conf) {
return getAllJobTypes(conf).stream().mapToInt(type -> conf.getInt(TonyConfigurationKeys.getInstancesKey(type), 0))
.sum();
}
Expand Down
51 changes: 35 additions & 16 deletions tony-core/src/test/java/com/linkedin/tony/TestTonyE2E.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
import com.linkedin.tony.rpc.TaskInfo;
import com.linkedin.tony.rpc.impl.TaskStatus;

import java.util.ArrayList;
import java.util.HashSet;

import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -31,9 +32,12 @@
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static com.linkedin.tony.TaskExecutor.MARK_LOST_CONNECTION_ENV_KEY;
import static com.linkedin.tony.TonyConfigurationKeys.TASK_HEARTBEAT_INTERVAL_MS;
Expand All @@ -59,6 +63,7 @@
* The YARN logs for the test should be in {@code <TonY>/target/MiniTonY}.
*/
public class TestTonyE2E {
private static final Log LOG = LogFactory.getLog(TestTonyE2E.class);

private static class TestTonyE2EHandler implements CallbackHandler, TaskUpdateListener {

Expand Down Expand Up @@ -611,10 +616,11 @@ public void testTonyAllocationTimeoutShouldFail() throws ParseException, IOExcep

@Test(timeOut = 60000)
public void testTonyAMStartupTimeoutShouldFail() throws ParseException, IOException {
List<CompletableFuture<Integer>> tasks = new ArrayList<>();
for (int i = 0; i < 10; i++) {
tasks.add(mockedTask());
}
/* Creating 10 jobs submitting to Yarn to make cluster fulfill. */
ExecutorService executorService = Executors.newFixedThreadPool(10);
Stream.iterate(0, n -> n + 1)
.limit(10)
.forEach(x -> mockedTask(executorService));

TonyClient client = new TonyClient(conf);
client.init(new String[]{
Expand All @@ -631,17 +637,30 @@ public void testTonyAMStartupTimeoutShouldFail() throws ParseException, IOExcept
int exitCode = client.start();
Assert.assertEquals(exitCode, -1);

for (CompletableFuture<Integer> task : tasks) {
try {
task.cancel(true);
} catch (Exception e) {
// ignore
shutdownExecutor(executorService);
}

private void shutdownExecutor(ExecutorService executor) {
executor.shutdown();
try {
// Wait a while for existing tasks to terminate
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
executor.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
LOG.error("Thread pool did not terminate");
}
}
} catch (InterruptedException e) {
// (Re-)Cancel if current thread also interrupted
executor.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}

private CompletableFuture<Integer> mockedTask() {
return CompletableFuture.supplyAsync(() -> {
private Future<?> mockedTask(ExecutorService service) {
return service.submit((Runnable) () -> {
TonyClient client = new TonyClient(conf);
try {
client.init(new String[]{
Expand All @@ -655,11 +674,11 @@ private CompletableFuture<Integer> mockedTask() {
"--conf", "tony.ps.command=python sleep_30.py",
"--conf", "tony.worker.command=python check_env_and_venv.py"
});
client.start();
} catch (Exception e) {
e.printStackTrace();
}
int exitCode = client.start();
return exitCode;
return;
});
}

Expand Down