Skip to content

Commit

Permalink
exit when allreduce/broadcast error cause timeout (#112)
Browse files Browse the repository at this point in the history
* keep async timeout task

* add missing pthread to cmake

* add tests

* Add a sleep period to avoid flushing the tracker.
  • Loading branch information
Chen Qin authored and trivialfis committed Oct 11, 2019
1 parent af7281a commit 5d1b613
Show file tree
Hide file tree
Showing 17 changed files with 403 additions and 71 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,6 @@ mpich-3.2/
cmake-build-debug/
.vscode/

# cmake
build/

15 changes: 14 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,29 @@ osx_image: xcode10.2

dist: xenial

language: cpp

# Use Build Matrix to do lint and build separately
env:
matrix:
- TASK=lint LINT_LANG=cpp
- TASK=lint LINT_LANG=python
- TASK=doc
- TASK=build
# - TASK=build
- TASK=mpi-build
- TASK=cmake-test

matrix:
exclude:
- os: osx
env: TASK=lint LINT_LANG=cpp
- os: osx
env: TASK=lint LINT_LANG=python
- os: osx
env: TASK=doc
- os: osx
env: TASK=build

# dependent apt packages
addons:
apt:
Expand Down
9 changes: 7 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@ if(R_LIB OR MINGW OR WIN32)
CXX_STANDARD_REQUIRED ON
POSITION_INDEPENDENT_CODE ON)
else()
add_library(rabit src/allreduce_base.cc src/allreduce_robust.cc src/engine.cc src/c_api.cc)
add_library(rabit_base src/allreduce_base.cc src/engine_base.cc src/c_api.cc)
find_package(Threads REQUIRED)
add_library(rabit_empty src/engine_empty.cc src/c_api.cc)
add_library(rabit_base src/allreduce_base.cc src/engine_base.cc src/c_api.cc)

add_library(rabit src/allreduce_base.cc src/allreduce_robust.cc src/engine.cc src/c_api.cc)
add_library(rabit_mock_static src/allreduce_base.cc src/allreduce_robust.cc src/engine_mock.cc src/c_api.cc)
add_library(rabit_mock SHARED src/allreduce_base.cc src/allreduce_robust.cc src/engine_mock.cc src/c_api.cc)
target_link_libraries(rabit Threads::Threads)
target_link_libraries(rabit_mock_static Threads::Threads)
target_link_libraries(rabit_mock Threads::Threads)

set(rabit_libs rabit rabit_base rabit_empty rabit_mock rabit_mock_static)
set_target_properties(rabit rabit_base rabit_empty rabit_mock rabit_mock_static
Expand Down
11 changes: 11 additions & 0 deletions doc/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ you can also refer to [wormhole](https://github.com/dmlc/wormhole/blob/master/le
int main(int argc, char *argv[]) {
...
rabit::Init(argc, argv);
// sync on expected model size before load checkpoint, if we pass rabit_bootstrap_cache=true
rabit::Allreduce<rabit::op::Max>(&model.size(), 1);
// load the latest checked model
int version = rabit::LoadCheckPoint(&model);
// initialize the model if it is the first version
Expand Down Expand Up @@ -370,3 +372,12 @@ Allreduce/Broadcast calls after the checkpoint from some alive nodes.

This is just a conceptual introduction to rabit's fault tolerance model. The actual implementation is more sophisticated,
and can deal with more complicated cases such as multiple nodes failure and node failure during recovery phase.

Rabit Timeout
---------------

In certain cases, a rabit cluster may lack the resources needed to restart failed workers.
Because the fault tolerance model assumes infinite retries, this can cause the entire cluster to hang indefinitely.
To avoid this, we introduce a sidecar thread which runs when the rabit fault tolerant runtime observes allreduce/broadcast errors.
By default, it waits for 30 minutes before all worker programs exit.
Users can opt in to this feature by passing rabit_timeout=true, and change the threshold by passing rabit_timeout_sec=x (in seconds).
8 changes: 7 additions & 1 deletion include/rabit/internal/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#ifndef RABIT_INTERNAL_UTILS_H_
#define RABIT_INTERNAL_UTILS_H_
#define _CRT_SECURE_NO_WARNINGS
#include <string.h>
#include <cstdio>
#include <string>
#include <cstdlib>
Expand Down Expand Up @@ -66,6 +67,11 @@ const int kPrintBuffer = 1 << 12;
* co-locate in the same process */
extern bool STOP_PROCESS_ON_ERROR;

/*!
 * \brief parse a config string to bool
 * \param s configuration value as a C string; may be NULL
 * \return true when s equals "true" (compared case-insensitively) or when
 *         atoi(s) is non-zero; false otherwise, including when s is NULL
 */
inline bool StringToBool(const char* s) {
  // a missing value is treated as false instead of being dereferenced
  if (s == NULL) return false;
#ifdef _MSC_VER
  // MSVC does not provide strcasecmp; _stricmp is its case-insensitive counterpart
  const bool is_true_literal = _stricmp(s, "true") == 0;
#else
  const bool is_true_literal = strcasecmp(s, "true") == 0;
#endif
  // numeric strings such as "1" also enable the option
  return is_true_literal || atoi(s) != 0;
}

#ifndef RABIT_CUSTOMIZE_MSG_
/*!
* \brief handling of Assert error, caused by inappropriate input
Expand All @@ -86,7 +92,7 @@ inline void HandleAssertError(const char *msg) {
*/
inline void HandleCheckError(const char *msg) {
if (STOP_PROCESS_ON_ERROR) {
fprintf(stderr, "%s, shutting down process", msg);
fprintf(stderr, "%s, shutting down process\n", msg);
exit(-1);
} else {
fprintf(stderr, "%s, rabit is configured to keep process running\n", msg);
Expand Down
5 changes: 3 additions & 2 deletions scripts/travis_script.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ if [ ${TASK} == "cmake-test" ]; then
mkdir build
cd build
cmake -DRABIT_BUILD_TESTS=ON -DRABIT_BUILD_DMLC=ON -DGTEST_ROOT=${HOME}/.local ..
#unit tests
make
# known osx gtest 1.8 issue
cp ${HOME}/.local/lib/*.dylib .
make -j$(nproc)
make test
make install || exit -1
cd ../test
Expand Down
17 changes: 12 additions & 5 deletions src/allreduce_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ bool AllreduceBase::Init(int argc, char* argv[]) {
if (task_id == NULL) {
task_id = getenv("mapreduce_task_id");
}
if (hadoop_mode != 0) {
if (hadoop_mode) {
utils::Check(task_id != NULL,
"hadoop_mode is set but cannot find mapred_task_id");
}
Expand All @@ -94,7 +94,7 @@ bool AllreduceBase::Init(int argc, char* argv[]) {
if (num_task == NULL) {
num_task = getenv("mapreduce_job_maps");
}
if (hadoop_mode != 0) {
if (hadoop_mode) {
utils::Check(num_task != NULL,
"hadoop_mode is set but cannot find mapred_map_tasks");
}
Expand Down Expand Up @@ -188,7 +188,7 @@ void AllreduceBase::SetParam(const char *name, const char *val) {
if (!strcmp(name, "DMLC_TASK_ID")) task_id = val;
if (!strcmp(name, "DMLC_ROLE")) dmlc_role = val;
if (!strcmp(name, "rabit_world_size")) world_size = atoi(val);
if (!strcmp(name, "rabit_hadoop_mode")) hadoop_mode = atoi(val);
if (!strcmp(name, "rabit_hadoop_mode")) hadoop_mode = utils::StringToBool(val);
if (!strcmp(name, "rabit_reduce_ring_mincount")) {
reduce_ring_mincount = atoi(val);
utils::Assert(reduce_ring_mincount > 0, "rabit_reduce_ring_mincount should be greater than 0");
Expand All @@ -209,10 +209,17 @@ void AllreduceBase::SetParam(const char *name, const char *val) {
}
}
if (!strcmp(name, "rabit_bootstrap_cache")) {
rabit_bootstrap_cache = atoi(val);
rabit_bootstrap_cache = utils::StringToBool(val);
}
if (!strcmp(name, "rabit_debug")) {
rabit_debug = atoi(val);
rabit_debug = utils::StringToBool(val);
}
if (!strcmp(name, "rabit_timeout")) {
rabit_timeout = utils::StringToBool(val);
}
// rabit_timeout_sec: number of seconds stored in timeout_sec
if (!strcmp(name, "rabit_timeout_sec")) {
  timeout_sec = atoi(val);
  // validate the value that was just parsed; rabit_timeout is only the
  // on/off flag and says nothing about the configured duration
  utils::Assert(timeout_sec > 0, "rabit_timeout_sec should be greater than 0 second");
}
}
/*!
Expand Down
10 changes: 7 additions & 3 deletions src/allreduce_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ class AllreduceBase : public IEngine {
// version number of model
int version_number;
// whether the job is running in hadoop
int hadoop_mode;
bool hadoop_mode;
//---- local data related to link ----
// index of parent link, can be -1, meaning this is root of the tree
int parent_index;
Expand Down Expand Up @@ -543,9 +543,13 @@ class AllreduceBase : public IEngine {
// backdoor port
int port = 0;
// enable bootstrap cache 0 false 1 true
int rabit_bootstrap_cache = 0;
bool rabit_bootstrap_cache = false;
// enable detailed logging
int rabit_debug = 0;
bool rabit_debug = false;
// timeout in seconds; by default, exit if a rabit worker has not recovered within half an hour
int timeout_sec = 1800;
// flag to enable rabit_timeout
bool rabit_timeout = false;
};
} // namespace engine
} // namespace rabit
Expand Down
2 changes: 1 addition & 1 deletion src/allreduce_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class AllreduceMock : public AllreduceRobust {
if (mock_map.count(key) != 0) {
num_trial += 1;
// data processing frameworks runs on shared process
utils::Error("[%d]@@@Hit Mock Error:%s\n", rank, name);
_error("[%d]@@@Hit Mock Error:%s ", rank, name);
}
}
};
Expand Down
Loading

0 comments on commit 5d1b613

Please sign in to comment.