Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable status when dependency times out #642

Merged
merged 1 commit into from
Feb 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,4 +215,4 @@ For more information about TonY, check out the following:

3. Some of my TensorFlow workers hang after the chief has finished, or the evaluator hangs after the chief and workers have finished.

Please see the [PR#521](https://github.com/tony-framework/TonY/pull/621) on Tensorflow configuration to solve it.
Please see [PR#621](https://github.com/tony-framework/TonY/pull/621) and [Issue#641](https://github.com/tony-framework/TonY/issues/641) on TensorFlow configuration to solve it.
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,8 @@ public static String getContainerDockerMountKey() {
public static final String GROUP_REGEX = TONY_APPLICATION_PREFIX + "group\\.([A-Za-z]+)$";
public static final String GROUP_DEPEND_TIMEOUT_REGEX =
TONY_APPLICATION_PREFIX + "dependency\\.([A-Za-z]+)\\.timeout\\.after\\.([A-Za-z]+)$";
/**
 * Regex for configuration keys of the form
 * {@code <prefix>dependency.<X>.timeout.after.<GROUP>.ignored}, i.e. the keys produced by
 * {@code getGroupDependentIgnoredKey}. Capture group 1 is the task type and capture group 2
 * is the dependent group name.
 *
 * The literal dot before "ignored" is escaped and the end anchor comes last; with the anchor
 * placed before ".ignored" the pattern could never match any key.
 */
public static final String GROUP_DEPEND_TIMEOUT_IGNORE_REGEX =
    TONY_APPLICATION_PREFIX + "dependency\\.([A-Za-z]+)\\.timeout\\.after\\.([A-Za-z]+)\\.ignored$";

public static String getGroupKey(String groupName) {
return String.format(TONY_APPLICATION_PREFIX + "group.%s", groupName);
Expand All @@ -357,4 +359,8 @@ public static String getGroupKey(String groupName) {
/**
 * Builds the dependency-timeout configuration key for a task type and the group it depends on.
 *
 * @param grp the task type the timeout applies to
 * @param dependentGrp the name of the group being depended on
 * @return the key {@code <prefix>dependency.<grp>.timeout.after.<dependentGrp>}
 */
public static String getGroupDependentKey(String grp, String dependentGrp) {
  final String keyTemplate = TONY_APPLICATION_PREFIX + "dependency.%s.timeout.after.%s";
  return String.format(keyTemplate, grp, dependentGrp);
}

/**
 * Builds the configuration key of the flag that tells whether a dependency timeout of
 * {@code roleType} on {@code dependentGrp} should be ignored.
 *
 * @param roleType the task type the timeout applies to
 * @param dependentGrp the name of the group being depended on
 * @return the key {@code <prefix>dependency.<roleType>.timeout.after.<dependentGrp>.ignored}
 */
public static String getGroupDependentIgnoredKey(String roleType, String dependentGrp) {
  final String keyTemplate = TONY_APPLICATION_PREFIX + "dependency.%s.timeout.after.%s.ignored";
  return String.format(keyTemplate, roleType, dependentGrp);
}
}
10 changes: 10 additions & 0 deletions tony-core/src/main/java/com/linkedin/tony/TonySession.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -37,6 +38,8 @@

import static com.linkedin.tony.Constants.CHIEF_JOB_NAME;
import static com.linkedin.tony.Constants.WORKER_JOB_NAME;
import static com.linkedin.tony.TonyConfigurationKeys.UNTRACKED_JOBTYPES;
import static com.linkedin.tony.util.Utils.getUntrackedJobTypes;


/**
Expand Down Expand Up @@ -670,4 +673,11 @@ public int getNumRegisteredTasks() {
/**
 * Exposes the registered tasks of this session.
 *
 * @return the {@code registeredTasks} set itself (not a copy)
 */
public Set<String> getRegisteredTasks() {
  return this.registeredTasks;
}

/**
 * Adds {@code taskType} to the untracked job types stored under {@code UNTRACKED_JOBTYPES}
 * in the session's configuration.
 *
 * The call is idempotent: if the type is already listed as untracked the configuration is
 * left untouched, so invoking this repeatedly for the same type does not accumulate
 * duplicate entries in the comma-separated value.
 *
 * @param taskType the task type to mark as untracked
 */
public void makeTaskTypeUntracked(String taskType) {
  String[] currentUntrackedTypes = getUntrackedJobTypes(tonyConf);
  if (Arrays.asList(currentUntrackedTypes).contains(taskType)) {
    // Already untracked; nothing to change.
    return;
  }

  // Append into a copied array rather than a collected List, whose mutability is unspecified.
  String[] updatedUntrackedTypes = Arrays.copyOf(currentUntrackedTypes, currentUntrackedTypes.length + 1);
  updatedUntrackedTypes[currentUntrackedTypes.length] = taskType;
  tonyConf.set(UNTRACKED_JOBTYPES, StringUtils.join(updatedUntrackedTypes, ","));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.linkedin.tony.util.Utils;

import static com.linkedin.tony.Constants.SIDECAR_TB_ROLE_NAME;
import static com.linkedin.tony.TonyConfigurationKeys.getGroupDependentIgnoredKey;

public abstract class MLGenericRuntime extends AbstractFrameworkRuntime {
private static final long REGISTRATION_STATUS_INTERVAL_MS = 15 * 1000;
Expand Down Expand Up @@ -141,9 +142,16 @@ public boolean isHealthy(Configuration tonyConf) {
* chief/workers are finished, the mechanism of dependency group timeout will make job failed.
*
* Dependency group timeout configuration as follows:
*
* ```
* tony.application.group.A = worker,chief
* tony.application.dependency.evaluator.timeout.after.A = 3600
* ```
*
* In some cases we don't want to fail the whole job even though a dependency times out.
* For example, if the chief succeeded and a worker has been hanging for 1 hour,
* users can configure the job to still pass. This is controlled by the config
* `tony.application.dependency.[X].timeout.after.[GROUP].ignored = true`; more details can be
* found in https://github.com/tony-framework/TonY/issues/641.
*
*/
String errorMsg = groupDependencyTimeout(tonyConf);
Expand Down Expand Up @@ -231,10 +239,20 @@ protected String groupDependencyTimeout(Configuration tonyConf) {
}

if (System.currentTimeMillis() - latestEndTimeInAllDependentTasks > timeout) {
return String.format("Jobtype: %s runs exceeded timeout because it's "
+ "dependent group: %s (task set: [%s]) has been finished.",
runningTaskType, dependentGroupName,
StringUtils.join(grpWithMembersIndex.get(dependentGroupName), ","));

String ignoredTaskTypeKey = getGroupDependentIgnoredKey(runningTaskType, dependentGroupName);
boolean ignoreTimeout = tonyConf.getBoolean(ignoredTaskTypeKey, false);
if (!ignoreTimeout) {
return String.format("Task type: %s runs exceeded timeout because it's "
+ "dependent group: %s (task set: [%s]) has been finished.",
runningTaskType, dependentGroupName,
StringUtils.join(grpWithMembersIndex.get(dependentGroupName), ","));
}

log.info(
String.format("Task type: %s is marked as untracked.", runningJobTypes)
);
session.makeTaskTypeUntracked(runningTaskType);
}
}

Expand Down
76 changes: 76 additions & 0 deletions tony-core/src/test/java/com/linkedin/tony/TestTonyE2E.java
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,82 @@ public void testGroupDependencyTimeoutShouldPass() throws ParseException, IOExce
client.removeListener(handler);
}

/**
 * With "tony.application.dependency.worker.timeout.after.A.ignored=true" set, workers that
 * never exit after the chief (group A) has finished must not fail the job: the client is
 * expected to return exit code 0.
 */
@Test
public void testTaskWithDependencyTimeoutButIgnoredShouldPass() throws Exception {
  String[] clientArgs = {
      "--src_dir", "tony-core/src/test/resources/scripts",
      "--hdfs_classpath", libPath,
      "--container_env", Constants.SKIP_HADOOP_PATH + "=true",
      "--python_venv", "tony-core/src/test/resources/test.zip",
      "--executes", "python exit_0.py",
      "--conf", "tony.chief.instances=1",
      "--conf", "tony.worker.instances=2",
      "--conf", "tony.worker.command=python forever_not_exit.py",
      "--conf", "tony.application.framework=tensorflow",
      "--conf", "tony.application.group.A=chief",
      "--conf", "tony.application.dependency.worker.timeout.after.A=10",
      "--conf", "tony.application.dependency.worker.timeout.after.A.ignored=true",
  };
  client.init(clientArgs);

  int status = client.start();
  Assert.assertEquals(status, 0);
}

/**
 * Same ignored-dependency-timeout scenario as the worker-only case, but with an additional
 * PS role (a default untracked job type) that also never exits. The job is still expected
 * to finish with exit code 0.
 */
@Test
public void testTaskWithDependencyTimeoutButIgnoredAndWithPSShouldPass() throws Exception {
  String[] clientArgs = {
      "--src_dir", "tony-core/src/test/resources/scripts",
      "--hdfs_classpath", libPath,
      "--container_env", Constants.SKIP_HADOOP_PATH + "=true",
      "--python_venv", "tony-core/src/test/resources/test.zip",
      "--executes", "python exit_0.py",
      "--conf", "tony.chief.instances=1",
      "--conf", "tony.worker.instances=2",
      "--conf", "tony.worker.command=python forever_not_exit.py",
      "--conf", "tony.ps.instances=1",
      "--conf", "tony.ps.command=python forever_not_exit.py",
      "--conf", "tony.application.framework=tensorflow",
      "--conf", "tony.application.group.A=chief",
      "--conf", "tony.application.dependency.worker.timeout.after.A=10",
      "--conf", "tony.application.dependency.worker.timeout.after.A.ignored=true",
  };
  client.init(clientArgs);

  int status = client.start();
  Assert.assertEquals(status, 0);
}

/**
 * The worker dependency timeout is configured as ignored, but the workers run
 * sleep_10_and_exit_1.py and "worker" is listed in tony.application.stop-on-failure-jobtypes.
 * Despite the "ShouldPass" suffix in the method name, the job is expected to FAIL here:
 * the client must return exit code -1.
 */
@Test
public void testTaskWithDependencyTimeAndIgnoredButFailedShouldPass() throws Exception {
  String[] clientArgs = {
      "--src_dir", "tony-core/src/test/resources/scripts",
      "--hdfs_classpath", libPath,
      "--container_env", Constants.SKIP_HADOOP_PATH + "=true",
      "--python_venv", "tony-core/src/test/resources/test.zip",
      "--executes", "python exit_0.py",
      "--conf", "tony.chief.instances=1",
      "--conf", "tony.worker.instances=2",
      "--conf", "tony.worker.command=python sleep_10_and_exit_1.py",
      "--conf", "tony.evaluator.instances=1",
      "--conf", "tony.evaluator.command=python sleep_30.py",
      "--conf", "tony.application.framework=tensorflow",
      "--conf", "tony.application.group.A=chief",
      "--conf", "tony.application.dependency.worker.timeout.after.A=5",
      "--conf", "tony.application.dependency.worker.timeout.after.A.ignored=true",
      "--conf", "tony.application.stop-on-failure-jobtypes=worker"
  };
  client.init(clientArgs);

  int status = client.start();
  Assert.assertEquals(status, -1);
}

@Test
public void testLostConnectionWithAMJobShouldFail() throws Exception {
client.init(new String[]{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ public void testGroupDependencyShouldPass() {
am.setTonySession(session);
Assert.assertEquals(
am.groupDependencyTimeout(conf),
"Jobtype: evaluator runs exceeded timeout because it's dependent group: A "
+ "(task set: [worker,chief]) has been finished."
"Task type: evaluator runs exceeded timeout because it's dependent group: "
+ "A (task set: [worker,chief]) has been finished."
);
}

Expand All @@ -142,7 +142,8 @@ public void testGroupDependencyWorkerWhenChiefFinished() {
am.setTonySession(session);
Assert.assertEquals(
am.groupDependencyTimeout(conf),
"Jobtype: otherWorker runs exceeded timeout because it's dependent group: A (task set: [chief]) has been finished."
"Task type: otherWorker runs exceeded timeout because it's dependent group: "
+ "A (task set: [chief]) has been finished."
);
}

Expand All @@ -164,7 +165,8 @@ public void testGroupDependencyWithMultipleGroup() {
am.setTonySession(session);
Assert.assertEquals(
am.groupDependencyTimeout(conf),
"Jobtype: evaluator runs exceeded timeout because it's dependent group: B (task set: [chief,worker]) has been finished."
"Task type: evaluator runs exceeded timeout because it's dependent group: "
+ "B (task set: [chief,worker]) has been finished."
);
}

Expand Down Expand Up @@ -235,6 +237,32 @@ public void testPartialTaskScheduledShouldPass() {
);
}

/**
 * Dependency timeout with the "ignored" flag enabled: the chief (group A) is given an end
 * time 120 minutes in the past, and "otherWorker" has its timeout on group A marked as
 * ignored. The timeout check must then report no error, and the difference between total
 * and tracked tasks in the session is expected to be 3.
 */
@Test
public void testTaskTimeoutWithIgnoredShouldPass() {
  Configuration tonyConf = new Configuration();
  tonyConf.addResource("tony-default.xml");
  tonyConf.set("tony.application.group.A", "chief");
  tonyConf.set("tony.application.dependency.otherWorker.timeout.after.A", "3600");
  tonyConf.set("tony.application.dependency.otherWorker.timeout.after.A.ignored", "true");

  TonySession mockSession = buildMockSession(tonyConf);

  // Pretend the chief finished 120 minutes ago.
  long chiefEndTime = System.currentTimeMillis() - 1000 * 60 * 120;
  mockSession.getTask("chief", "0").setEndTime(chiefEndTime);

  MLGenericRuntime.AM amAdapter = (MLGenericRuntime.AM) runtime.getAMAdapter();
  amAdapter.setTonySession(mockSession);

  String timeoutError = amAdapter.groupDependencyTimeout(tonyConf);
  Assert.assertNull(timeoutError);

  Assert.assertEquals(mockSession.getTotalTasks() - mockSession.getTotalTrackedTasks(), 3);
}

private TonySession buildPartialTaskScheduledSession(Configuration conf) {
TonySession session = new TonySession.Builder().setTonyConf(conf).build();

Expand Down
8 changes: 8 additions & 0 deletions tony-core/src/test/resources/scripts/sleep_10_and_exit_1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#
# Copyright 2022 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license.
# See LICENSE in the project root for license information.
#
import sys
import time

# Test fixture: stay alive for 10 seconds, then terminate with exit status 1.
time.sleep(10)
# Use sys.exit rather than the bare exit() helper: exit() is injected by the site module for
# interactive use and is not guaranteed to exist in every interpreter invocation.
sys.exit(1)