Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The process of task executor is still alive when existing NM marked as lost node by RM #638

Merged
merged 1 commit into from
Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 53 additions & 34 deletions tony-core/src/main/java/com/linkedin/tony/TaskExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -32,8 +33,10 @@
* Content that we want to run in the containers. TaskExecutor will register itself with AM and fetch cluster spec from
* AM. After the cluster spec is collected, TaskExecutor will set up local environment and start the worker task.
*/
public class TaskExecutor {
public class TaskExecutor implements AutoCloseable {
private static final Log LOG = LogFactory.getLog(TaskExecutor.class);
private static final int GENERAL_EXIT_CODE = 1;
public static final String MARK_LOST_CONNECTION_ENV_KEY = "MARK_TASK_EXECUTOR_LOST_CONNECTION_WITH_AM";

@VisibleForTesting
protected Configuration tonyConf = new Configuration(false);
Expand Down Expand Up @@ -70,6 +73,8 @@ public class TaskExecutor {

private static Framework.TaskExecutorAdapter taskRuntimeAdapter;

private volatile boolean markedAsLostConnectionWithAM = false;

@VisibleForTesting
public TaskExecutor() { }

Expand Down Expand Up @@ -187,21 +192,10 @@ private static TaskExecutor createExecutor() throws Exception {

public static void main(String[] unused) throws Exception {
LOG.info("TaskExecutor is running..");
TaskExecutor executor = null;
try {
executor = requireNonNull(createExecutor());
} catch (Exception ex) {
if (executor != null) {
LOG.info("Failed to create TaskExecutor, releasing any reserved ports.");
executor.releasePorts();
}
throw ex;
}

// If not reusing port, then reserve them up until before the underlying TF process is
// launched. See <a href="https://github.com/linkedin/TonY/issues/365">this issue</a> for
// details.
if (executor != null) {
try (TaskExecutor executor = requireNonNull(createExecutor())) {
// If not reusing port, then reserve them up until before the underlying TF process is
// launched. See <a href="https://github.com/linkedin/TonY/issues/365">this issue</a> for
// details.
if (!executor.isTFGrpcReusingPort()) {
LOG.info("Releasing reserved RPC port before launching tensorflow process.");
executor.releasePort(executor.rpcPort);
Expand All @@ -211,29 +205,37 @@ public static void main(String[] unused) throws Exception {
LOG.info("Releasing reserved TB port before launching tensorflow process.");
executor.releasePort(executor.tbPort);
}
}

int exitCode;
try {
exitCode = taskRuntimeAdapter.run();
CompletableFuture<Integer> childProcessFuture = CompletableFuture.supplyAsync(() -> {
try {
return taskRuntimeAdapter.run();
} catch (Exception e) {
LOG.error("Errors on running child process.", e);
}
return GENERAL_EXIT_CODE;
});

int exitCode;
while (true) {
if (executor.markedAsLostConnectionWithAM) {
exitCode = GENERAL_EXIT_CODE;
break;
}

if (childProcessFuture.isDone()) {
exitCode = childProcessFuture.getNow(GENERAL_EXIT_CODE);
break;
}
}

// START - worker skew testing:
executor.skewAndHangIfTesting();
// END - worker skew testing:
executor.registerExecutionResult(exitCode, executor.jobName, String.valueOf(executor.taskIndex));
} finally {
if (executor.isTFGrpcReusingPort()) {
LOG.info("TensorFlow process exited, releasing reserved RPC port.");
executor.releasePort(executor.rpcPort);
}

if (executor.isTBServerReusingPort()) {
LOG.info("Tensorflow process exited, releasing reserved TB port.");
executor.releasePort(executor.tbPort);
}
LOG.info("Child process exited with exit code: " + exitCode);
System.exit(exitCode);
}

LOG.info("Child process exited with exit code " + exitCode);
System.exit(exitCode);
}

protected void initConfigs() {
Expand Down Expand Up @@ -278,6 +280,10 @@ protected void initConfigs() {
maxConsecutiveHBMiss = tonyConf.getInt(TonyConfigurationKeys.TASK_MAX_MISSED_HEARTBEATS,
TonyConfigurationKeys.DEFAULT_TASK_MAX_MISSED_HEARTBEATS);

// Only for test case.
markedAsLostConnectionWithAM = shellEnv.getOrDefault(MARK_LOST_CONNECTION_ENV_KEY, "false")
.equalsIgnoreCase("true");

Utils.initYarnConf(yarnConf);
Utils.initHdfsConf(hdfsConf);
}
Expand Down Expand Up @@ -321,6 +327,19 @@ private void registerExecutionResult(int exitCode, String jobName, String jobInd
}
}

@Override
public void close() throws Exception {
if (isTFGrpcReusingPort()) {
LOG.info("TensorFlow process exited, releasing reserved RPC port.");
releasePort(rpcPort);
}

if (isTBServerReusingPort()) {
LOG.info("Tensorflow process exited, releasing reserved TB port.");
releasePort(tbPort);
}
}

private class Heartbeater implements Runnable {
int hbMissCounter = 0;
int numHbToMiss;
Expand Down Expand Up @@ -353,8 +372,8 @@ public void run() {
LOG.error("[" + taskId + "] Failed to send Heart Beat.", e);
if (++numFailedHBAttempts > maxConsecutiveHBMiss) {
LOG.error("[" + taskId + "] Exceeded max number of allowed failed heart beat send attempts. "
+ "Going to stop heartbeating!");
e.printStackTrace();
+ "Going to stop heartbeating!", e);
markedAsLostConnectionWithAM = true;
throw new RuntimeException(e);
} else {
LOG.warn("Will retry heartbeat..");
Expand Down
17 changes: 17 additions & 0 deletions tony-core/src/test/java/com/linkedin/tony/TestTonyE2E.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import static com.linkedin.tony.TaskExecutor.MARK_LOST_CONNECTION_ENV_KEY;
import static com.linkedin.tony.TonyConfigurationKeys.TASK_HEARTBEAT_INTERVAL_MS;
import static com.linkedin.tony.TonyConfigurationKeys.TASK_MAX_MISSED_HEARTBEATS;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -710,6 +711,22 @@ public void testGroupDependencyTimeoutShouldPass() throws ParseException, IOExce
client.removeListener(handler);
}

@Test
public void testLostConnectionWithAMJobShouldFail() throws Exception {
client.init(new String[]{
"--src_dir", "tony-core/src/test/resources/scripts",
"--hdfs_classpath", libPath,
"--container_env", Constants.SKIP_HADOOP_PATH + "=true",
"--python_venv", "tony-core/src/test/resources/test.zip",
"--executes", "python forever_not_exit.py",
"--conf", "tony.worker.instances=1",
"--conf", "tony.application.framework=tensorflow",
"--shell_env", MARK_LOST_CONNECTION_ENV_KEY + "=true"
});
int exitCode = client.start();
Assert.assertEquals(exitCode, -1);
}

/**
* Since we are switching from passing arguments to ApplicationMaster & TaskExecutor
* to passing tony configuration file. It is critical to make sure all fields in
Expand Down