Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exit when allreduce/broadcast error cause timeout #112

Merged
merged 9 commits into from
Oct 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,6 @@ mpich-3.2/
cmake-build-debug/
.vscode/

# cmake
build/

15 changes: 14 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,29 @@ osx_image: xcode10.2

dist: xenial

language: cpp

# Use Build Matrix to do lint and build seperately
env:
matrix:
- TASK=lint LINT_LANG=cpp
- TASK=lint LINT_LANG=python
- TASK=doc
- TASK=build
# - TASK=build
- TASK=mpi-build
- TASK=cmake-test

matrix:
exclude:
- os: osx
env: TASK=lint LINT_LANG=cpp
- os: osx
env: TASK=lint LINT_LANG=python
- os: osx
env: TASK=doc
- os: osx
env: TASK=build

chenqin marked this conversation as resolved.
Show resolved Hide resolved
# dependent apt packages
addons:
apt:
Expand Down
9 changes: 7 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@ if(R_LIB OR MINGW OR WIN32)
CXX_STANDARD_REQUIRED ON
POSITION_INDEPENDENT_CODE ON)
else()
add_library(rabit src/allreduce_base.cc src/allreduce_robust.cc src/engine.cc src/c_api.cc)
add_library(rabit_base src/allreduce_base.cc src/engine_base.cc src/c_api.cc)
find_package(Threads REQUIRED)
chenqin marked this conversation as resolved.
Show resolved Hide resolved
add_library(rabit_empty src/engine_empty.cc src/c_api.cc)
add_library(rabit_base src/allreduce_base.cc src/engine_base.cc src/c_api.cc)

add_library(rabit src/allreduce_base.cc src/allreduce_robust.cc src/engine.cc src/c_api.cc)
add_library(rabit_mock_static src/allreduce_base.cc src/allreduce_robust.cc src/engine_mock.cc src/c_api.cc)
add_library(rabit_mock SHARED src/allreduce_base.cc src/allreduce_robust.cc src/engine_mock.cc src/c_api.cc)
target_link_libraries(rabit Threads::Threads)
target_link_libraries(rabit_mock_static Threads::Threads)
target_link_libraries(rabit_mock Threads::Threads)

set(rabit_libs rabit rabit_base rabit_empty rabit_mock rabit_mock_static)
set_target_properties(rabit rabit_base rabit_empty rabit_mock rabit_mock_static
Expand Down
11 changes: 11 additions & 0 deletions doc/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ you can also refer to [wormhole](https://github.com/dmlc/wormhole/blob/master/le
int main(int argc, char *argv[]) {
...
rabit::Init(argc, argv);
// sync on expected model size before load checkpoint, if we pass rabit_bootstrap_cache=true
rabit::Allreduce<rabit::op::Max>(&model.size(), 1);
// load the latest checked model
int version = rabit::LoadCheckPoint(&model);
// initialize the model if it is the first version
Expand Down Expand Up @@ -370,3 +372,12 @@ Allreduce/Broadcast calls after the checkpoint from some alive nodes.

This is just a conceptual introduction to rabit's fault tolerance model. The actual implementation is more sophisticated,
and can deal with more complicated cases such as multiple nodes failure and node failure during recovery phase.

Rabit Timeout
---------------

In certain cases, rabit cluster may suffer lack of resources to retry failed workers.
Thanks to fault tolerant assumption with infinite retry, it might cause entire cluster hang infinitely.
We introduce sidecar thread which runs when rabit fault tolerant runtime observed allreduce/broadcast errors.
By default, it will wait for 30 mins before all workers program exit.
User can opt-in this feature and change treshold by passing rabit_timeout=true and rabit_timeout_sec=x (in seconds).
8 changes: 7 additions & 1 deletion include/rabit/internal/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#ifndef RABIT_INTERNAL_UTILS_H_
#define RABIT_INTERNAL_UTILS_H_
#define _CRT_SECURE_NO_WARNINGS
#include <string.h>
#include <cstdio>
#include <string>
#include <cstdlib>
Expand Down Expand Up @@ -66,6 +67,11 @@ const int kPrintBuffer = 1 << 12;
* co-locate in the same process */
extern bool STOP_PROCESS_ON_ERROR;

/* \brief parse config string too bool*/
inline bool StringToBool(const char* s) {
return strcasecmp(s, "true") == 0 || atoi(s) != 0;
}

#ifndef RABIT_CUSTOMIZE_MSG_
/*!
* \brief handling of Assert error, caused by inappropriate input
Expand All @@ -86,7 +92,7 @@ inline void HandleAssertError(const char *msg) {
*/
inline void HandleCheckError(const char *msg) {
if (STOP_PROCESS_ON_ERROR) {
fprintf(stderr, "%s, shutting down process", msg);
fprintf(stderr, "%s, shutting down process\n", msg);
chenqin marked this conversation as resolved.
Show resolved Hide resolved
exit(-1);
} else {
fprintf(stderr, "%s, rabit is configured to keep process running\n", msg);
Expand Down
5 changes: 3 additions & 2 deletions scripts/travis_script.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ if [ ${TASK} == "cmake-test" ]; then
mkdir build
cd build
cmake -DRABIT_BUILD_TESTS=ON -DRABIT_BUILD_DMLC=ON -DGTEST_ROOT=${HOME}/.local ..
#unit tests
make
# known osx gtest 1.8 issue
cp ${HOME}/.local/lib/*.dylib .
make -j$(nproc)
make test
make install || exit -1
cd ../test
Expand Down
17 changes: 12 additions & 5 deletions src/allreduce_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ bool AllreduceBase::Init(int argc, char* argv[]) {
if (task_id == NULL) {
task_id = getenv("mapreduce_task_id");
}
if (hadoop_mode != 0) {
if (hadoop_mode) {
utils::Check(task_id != NULL,
"hadoop_mode is set but cannot find mapred_task_id");
}
Expand All @@ -94,7 +94,7 @@ bool AllreduceBase::Init(int argc, char* argv[]) {
if (num_task == NULL) {
num_task = getenv("mapreduce_job_maps");
}
if (hadoop_mode != 0) {
if (hadoop_mode) {
utils::Check(num_task != NULL,
"hadoop_mode is set but cannot find mapred_map_tasks");
}
Expand Down Expand Up @@ -188,7 +188,7 @@ void AllreduceBase::SetParam(const char *name, const char *val) {
if (!strcmp(name, "DMLC_TASK_ID")) task_id = val;
if (!strcmp(name, "DMLC_ROLE")) dmlc_role = val;
if (!strcmp(name, "rabit_world_size")) world_size = atoi(val);
if (!strcmp(name, "rabit_hadoop_mode")) hadoop_mode = atoi(val);
if (!strcmp(name, "rabit_hadoop_mode")) hadoop_mode = utils::StringToBool(val);
if (!strcmp(name, "rabit_reduce_ring_mincount")) {
reduce_ring_mincount = atoi(val);
utils::Assert(reduce_ring_mincount > 0, "rabit_reduce_ring_mincount should be greater than 0");
Expand All @@ -209,10 +209,17 @@ void AllreduceBase::SetParam(const char *name, const char *val) {
}
}
if (!strcmp(name, "rabit_bootstrap_cache")) {
rabit_bootstrap_cache = atoi(val);
rabit_bootstrap_cache = utils::StringToBool(val);
}
if (!strcmp(name, "rabit_debug")) {
rabit_debug = atoi(val);
rabit_debug = utils::StringToBool(val);
}
if (!strcmp(name, "rabit_timeout")) {
rabit_timeout = utils::StringToBool(val);
}
if (!strcmp(name, "rabit_timeout_sec")) {
timeout_sec = atoi(val);
utils::Assert(rabit_timeout > 0, "rabit_timeout_sec should be greater than 0 second");
}
}
/*!
Expand Down
10 changes: 7 additions & 3 deletions src/allreduce_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ class AllreduceBase : public IEngine {
// version number of model
int version_number;
// whether the job is running in hadoop
int hadoop_mode;
bool hadoop_mode;
//---- local data related to link ----
// index of parent link, can be -1, meaning this is root of the tree
int parent_index;
Expand Down Expand Up @@ -543,9 +543,13 @@ class AllreduceBase : public IEngine {
// backdoor port
int port = 0;
// enable bootstrap cache 0 false 1 true
int rabit_bootstrap_cache = 0;
bool rabit_bootstrap_cache = false;
// enable detailed logging
int rabit_debug = 0;
bool rabit_debug = false;
// by default, if rabit worker not recover in half an hour exit
int timeout_sec = 1800;
// flag to enable rabit_timeout
bool rabit_timeout = false;
};
} // namespace engine
} // namespace rabit
Expand Down
2 changes: 1 addition & 1 deletion src/allreduce_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class AllreduceMock : public AllreduceRobust {
if (mock_map.count(key) != 0) {
num_trial += 1;
// data processing frameworks runs on shared process
utils::Error("[%d]@@@Hit Mock Error:%s\n", rank, name);
_error("[%d]@@@Hit Mock Error:%s ", rank, name);
}
}
};
Expand Down
Loading