Skip to content

Commit

Permalink
Backport: The process of task executor is still alive when existing N…
Browse files Browse the repository at this point in the history
…M marked as lost node by RM tony-framework#638

PR: tony-framework#638
  • Loading branch information
zuston committed Jan 20, 2022
1 parent 6880b82 commit 9869f59
Showing 1 changed file with 49 additions and 33 deletions.
82 changes: 49 additions & 33 deletions tony-core/src/main/java/com/linkedin/tony/TaskExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -35,8 +36,10 @@
* Content that we want to run in the containers. TaskExecutor will register itself with AM and fetch cluster spec from
* AM. After the cluster spec is collected, TaskExecutor will set up local environment and start the worker task.
*/
public class TaskExecutor {
public class TaskExecutor implements AutoCloseable {
private static final Log LOG = LogFactory.getLog(TaskExecutor.class);
private static final int GENERAL_EXIT_CODE = 1;
public static final String MARK_LOST_CONNECTION_ENV_KEY = "MARK_TASK_EXECUTOR_LOST_CONNECTION_WITH_AM";

@VisibleForTesting
protected Configuration tonyConf = new Configuration(false);
Expand Down Expand Up @@ -74,6 +77,8 @@ public class TaskExecutor {

private static FrameworkRuntime frameworkRuntime;

private volatile boolean markedAsLostConnectionWithAM = false;

@VisibleForTesting
public TaskExecutor() { }

Expand Down Expand Up @@ -184,21 +189,10 @@ private static TaskExecutor createExecutor() throws Exception {

public static void main(String[] unused) throws Exception {
LOG.info("TaskExecutor is running..");
TaskExecutor executor = null;
try {
executor = requireNonNull(createExecutor());
} catch (Exception ex) {
if (executor != null) {
LOG.info("Failed to create TaskExecutor, releasing any reserved ports.");
executor.releasePorts();
}
throw ex;
}

// If not reusing port, then reserve them up until before the underlying TF process is
// launched. See <a href="https://github.com/linkedin/TonY/issues/365">this issue</a> for
// details.
if (executor != null) {
try (TaskExecutor executor = requireNonNull(createExecutor())) {
// If not reusing port, then reserve them up until before the underlying TF process is
// launched. See <a href="https://github.com/linkedin/TonY/issues/365">this issue</a> for
// details.
if (!executor.isTFGrpcReusingPort()) {
LOG.info("Releasing reserved RPC port before launching tensorflow process.");
executor.releasePort(executor.rpcPort);
Expand All @@ -208,27 +202,36 @@ public static void main(String[] unused) throws Exception {
LOG.info("Releasing reserved TB port before launching tensorflow process.");
executor.releasePort(executor.tbPort);
}
}

int exitCode;
try {
exitCode = frameworkRuntime.run();
CompletableFuture<Integer> childProcessFuture = CompletableFuture.supplyAsync(() -> {
try {
return frameworkRuntime.run();
} catch (Exception e) {
LOG.error("Errors on running child process.", e);
}
return GENERAL_EXIT_CODE;
});

int exitCode;
while (true) {
if (executor.markedAsLostConnectionWithAM) {
exitCode = GENERAL_EXIT_CODE;
break;
}

if (childProcessFuture.isDone()) {
exitCode = childProcessFuture.getNow(GENERAL_EXIT_CODE);
break;
}
}

// START - worker skew testing:
executor.skewAndHangIfTesting();
// END - worker skew testing:
executor.registerExecutionResult(exitCode, executor.jobName, String.valueOf(executor.taskIndex));
LOG.info("Child process exited with exit code " + exitCode);
System.exit(exitCode);
} finally {
if (executor.isTFGrpcReusingPort()) {
LOG.info("Tensorflow process exited, releasing reserved RPC port.");
executor.releasePort(executor.rpcPort);
}

if (executor.isTBServerReusingPort()) {
LOG.info("Tensorflow process exited, releasing reserved TB port.");
executor.releasePort(executor.tbPort);
}
LOG.info("Child process exited with exit code: " + exitCode);
System.exit(exitCode);
}
}

Expand Down Expand Up @@ -336,6 +339,19 @@ private void registerExecutionResult(int exitCode, String jobName, String jobInd
}
}

@Override
public void close() throws Exception {
if (isTFGrpcReusingPort()) {
LOG.info("TensorFlow process exited, releasing reserved RPC port.");
releasePort(rpcPort);
}

if (isTBServerReusingPort()) {
LOG.info("Tensorflow process exited, releasing reserved TB port.");
releasePort(tbPort);
}
}

private class Heartbeater implements Runnable {
int hbMissCounter = 0;
int numHbToMiss;
Expand Down Expand Up @@ -368,8 +384,8 @@ public void run() {
LOG.error("[" + taskId + "] Failed to send Heart Beat.", e);
if (++numFailedHBAttempts > maxConsecutiveHBMiss) {
LOG.error("[" + taskId + "] Exceeded max number of allowed failed heart beat send attempts. "
+ "Going to stop heartbeating!");
e.printStackTrace();
+ "Going to stop heartbeating!", e);
markedAsLostConnectionWithAM = true;
throw new RuntimeException(e);
} else {
LOG.warn("Will retry heartbeat..");
Expand Down

0 comments on commit 9869f59

Please sign in to comment.