Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exit when allreduce/broadcast error cause timeout #112

Merged
merged 9 commits into from
Oct 11, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,29 @@ osx_image: xcode10.2

dist: xenial

language: cpp

# Use Build Matrix to do lint and build seperately
env:
matrix:
- TASK=lint LINT_LANG=cpp
- TASK=lint LINT_LANG=python
- TASK=doc
- TASK=build
# - TASK=build
- TASK=mpi-build
- TASK=cmake-test

matrix:
exclude:
- os: osx
env: TASK=lint LINT_LANG=cpp
- os: osx
env: TASK=lint LINT_LANG=python
- os: osx
env: TASK=doc
- os: osx
env: TASK=build

chenqin marked this conversation as resolved.
Show resolved Hide resolved
# dependent apt packages
addons:
apt:
Expand Down
10 changes: 7 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@ if(R_LIB OR MINGW OR WIN32)
CXX_STANDARD_REQUIRED ON
POSITION_INDEPENDENT_CODE ON)
else()
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread")
add_library(rabit src/allreduce_base.cc src/allreduce_robust.cc src/engine.cc src/c_api.cc)
add_library(rabit_base src/allreduce_base.cc src/engine_base.cc src/c_api.cc)
find_package(Threads REQUIRED)
chenqin marked this conversation as resolved.
Show resolved Hide resolved
add_library(rabit_empty src/engine_empty.cc src/c_api.cc)
add_library(rabit_base src/allreduce_base.cc src/engine_base.cc src/c_api.cc)

add_library(rabit src/allreduce_base.cc src/allreduce_robust.cc src/engine.cc src/c_api.cc)
add_library(rabit_mock_static src/allreduce_base.cc src/allreduce_robust.cc src/engine_mock.cc src/c_api.cc)
add_library(rabit_mock SHARED src/allreduce_base.cc src/allreduce_robust.cc src/engine_mock.cc src/c_api.cc)
target_link_libraries(rabit Threads::Threads)
target_link_libraries(rabit_mock_static Threads::Threads)
target_link_libraries(rabit_mock Threads::Threads)

set(rabit_libs rabit rabit_base rabit_empty rabit_mock rabit_mock_static)
set_target_properties(rabit rabit_base rabit_empty rabit_mock rabit_mock_static
Expand Down
2 changes: 1 addition & 1 deletion include/rabit/internal/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ inline void HandleAssertError(const char *msg) {
*/
inline void HandleCheckError(const char *msg) {
if (STOP_PROCESS_ON_ERROR) {
fprintf(stderr, "%s, shutting down process", msg);
fprintf(stderr, "%s, shutting down process\n", msg);
chenqin marked this conversation as resolved.
Show resolved Hide resolved
exit(-1);
} else {
fprintf(stderr, "%s, rabit is configured to keep process running\n", msg);
Expand Down
3 changes: 2 additions & 1 deletion scripts/travis_script.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ if [ ${TASK} == "cmake-test" ]; then
# known osx gtest 1.8 issue
cp ${HOME}/.local/lib/*.dylib .
#unit tests
make
make -j4
chenqin marked this conversation as resolved.
Show resolved Hide resolved
make test
#make test VERBOSE=1 || exit -1
chenqin marked this conversation as resolved.
Show resolved Hide resolved
make install || exit -1
cd ../test
../scripts/travis_runtest.sh || exit -1
Expand Down
5 changes: 4 additions & 1 deletion src/allreduce_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,10 @@ void AllreduceBase::SetParam(const char *name, const char *val) {
}
if (!strcmp(name, "rabit_timeout")) {
rabit_timeout = atoi(val);
utils::Assert(rabit_timeout > 1, "rabit_timeout should be greater than 1 second");
}
if (!strcmp(name, "rabit_timeout_sec")) {
timeout_sec = atoi(val);
utils::Assert(rabit_timeout > 0, "rabit_timeout_sec should be greater than 0 second");
}
}
/*!
Expand Down
6 changes: 4 additions & 2 deletions src/allreduce_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -546,8 +546,10 @@ class AllreduceBase : public IEngine {
int rabit_bootstrap_cache = 0;
// enable detailed logging
int rabit_debug = 0;
// if rabit worker not recover in half an hour exit
int rabit_timeout = 1800;
// by default, if rabit worker not recover in half an hour exit
int timeout_sec = 1800;
// flag to enable rabit_timeout
int rabit_timeout = 0;
chenqin marked this conversation as resolved.
Show resolved Hide resolved
};
} // namespace engine
} // namespace rabit
Expand Down
2 changes: 1 addition & 1 deletion src/allreduce_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class AllreduceMock : public AllreduceRobust {
if (mock_map.count(key) != 0) {
num_trial += 1;
// data processing frameworks runs on shared process
utils::Error("[%d]@@@Hit Mock Error:%s\n", rank, name);
_error("[%d]@@@Hit Mock Error:%s ", rank, name);
}
}
};
Expand Down
43 changes: 25 additions & 18 deletions src/allreduce_robust.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ bool AllreduceRobust::Shutdown(void) {
// execute check ack step, load happens here
utils::Assert(RecoverExec(NULL, 0, ActionSummary::kCheckAck,
ActionSummary::kSpecialOp, cur_cache_seq), "Shutdown: check ack must return true");
_mutex.lock(); _exit = true; _mutex.unlock();
shutdown_timeout = true;
if (rabit_timeout_task.valid()) rabit_timeout_task.wait();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
return AllreduceBase::Shutdown();
} catch (const std::exception& e) {
fprintf(stderr, "%s\n", e.what());
Expand Down Expand Up @@ -600,35 +602,40 @@ AllreduceRobust::ReturnType AllreduceRobust::TryResetLinks(void) {
* \return true if err_type is kSuccess, false otherwise
*/
bool AllreduceRobust::CheckAndRecover(ReturnType err_type) {
_mutex.lock(); _exit = err_type == kSuccess; _mutex.unlock();
shutdown_timeout = err_type == kSuccess;
if (err_type == kSuccess) return true;

utils::Assert(err_link != NULL, "must know the error source");
utils::Assert(err_link != NULL, "must know the error link");
recover_counter += 1;
// async launch timeout task
rabit_timeout_task = std::async(std::launch::async, [=](){
_mutex.lock();
// async launch timeout task if enable_rabit_timeout is set
if (rabit_timeout != 0) {
utils::Printf("[EXPERIMENTAL] rabit timeout thread expires in %d second(s)\n", timeout_sec);
rabit_timeout_task = std::async(std::launch::async, [=]() {
if (rabit_debug) {
utils::Printf("[%d] timeout task thread %d is running,"
"with _exit set to %d\n", rank,
std::this_thread::get_id(), _exit);
utils::Printf("[%d] rabit timeout thread %ld starts\n", rank,
std::this_thread::get_id());
}
_mutex.unlock();
int time = 0;
bool exit = true;
// check if rabit recovered every 100ms
while (time++ < 10 * rabit_timeout) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
_mutex.lock(); exit = _exit; _mutex.unlock();
if (exit) return;
while (time++ < 10 * timeout_sec) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (shutdown_timeout.load()) {
if (rabit_debug) {
utils::Printf("[%d] rabit timeout task thread %ld exits\n",
rank, std::this_thread::get_id());
}
return true;
}
}
utils::Error("[%d] exit due to rabit time out %d s\n", rank, rabit_timeout);
});

_error("[%d] exit due to rabit time out %d s\n", rank, timeout_sec);
return false;
});
}
// simple way, shutdown all links
for (size_t i = 0; i < all_links.size(); ++i) {
if (!all_links[i].sock.BadSocket()) all_links[i].sock.Close();
}
std::this_thread::sleep_for(std::chrono::milliseconds(10*rank));
trivialfis marked this conversation as resolved.
Show resolved Hide resolved
ReConnectLinks("recover");
return false;
}
Expand Down
11 changes: 6 additions & 5 deletions src/allreduce_robust.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#ifndef RABIT_ALLREDUCE_ROBUST_H_
#define RABIT_ALLREDUCE_ROBUST_H_
#include <future>
#include <mutex>
#include <vector>
#include <string>
#include <algorithm>
Expand Down Expand Up @@ -634,10 +633,12 @@ o * the input state must exactly one saved state(local state of current node)
int local_chkpt_version;
// if checkpoint were loaded, used to distinguish results boostrap cache from seqno cache
bool checkpoint_loaded;
// sidecar executing timeout
std::future<void> rabit_timeout_task;
std::mutex _mutex;
bool _exit;
// sidecar executing timeout task
std::future<bool> rabit_timeout_task;
// flag to shutdown rabit_timeout_task before timeout
std::atomic<bool> shutdown_timeout{false};
// error handler
void (* _error)(const char *fmt, ...) = utils::Error;
};
} // namespace engine
} // namespace rabit
Expand Down
128 changes: 110 additions & 18 deletions test/cpp/allreduce_robust_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,49 +6,141 @@
#include <iostream>
#include "../../src/allreduce_robust.h"

inline void mockerr(const char *fmt, ...) {
EXPECT_STRCASEEQ(fmt, "[%d] exit due to rabit time out %d s\n");
}

TEST(allreduce_robust, sync_error_timeout)
{
rabit::engine::AllreduceRobust m;

std::string mock_str = "rabit_timeout=2";
char cmd[mock_str.size()+1];
std::copy(mock_str.begin(), mock_str.end(), cmd);
cmd[mock_str.size()] = '\0';
std::string rabit_timeout = "rabit_timeout=1";
char cmd[rabit_timeout.size()+1];
std::copy(rabit_timeout.begin(), rabit_timeout.end(), cmd);
cmd[rabit_timeout.size()] = '\0';

std::string rabit_timeout_sec = "rabit_timeout_sec=1";
char cmd1[rabit_timeout_sec.size()+1];
std::copy(rabit_timeout_sec.begin(), rabit_timeout_sec.end(), cmd1);
cmd1[rabit_timeout_sec.size()] = '\0';

char* argv[] = {cmd};
m.Init(1, argv);
char* argv[] = {cmd,cmd1};
m.Init(2, argv);
m.rank = 0;
m.rabit_bootstrap_cache = 1;
rabit::engine::AllreduceRobust::LinkRecord a;
m.err_link = &a;
m._error = mockerr;
rabit::engine::AllreduceRobust::ReturnType err_type(rabit::engine::AllreduceRobust::ReturnTypeEnum::kSockError);
rabit::utils::STOP_PROCESS_ON_ERROR = false;
EXPECT_EQ(m.CheckAndRecover(err_type), false);

EXPECT_EXIT({
std::this_thread::sleep_for(std::chrono::seconds(3));
}, ::testing::ExitedWithCode(255), "");
std::this_thread::sleep_for(std::chrono::milliseconds(1500));
EXPECT_EQ(m.rabit_timeout_task.get(), false);
}

TEST(allreduce_robust, sync_error_reset)
{
rabit::engine::AllreduceRobust m;

std::string rabit_timeout = "rabit_timeout=3";
std::string rabit_timeout = "rabit_timeout=1";
char cmd[rabit_timeout.size()+1];
std::copy(rabit_timeout.begin(), rabit_timeout.end(), cmd);
cmd[rabit_timeout.size()] = '\0';

std::string rabit_timeout_sec = "rabit_timeout_sec=1";
char cmd1[rabit_timeout_sec.size()+1];
std::copy(rabit_timeout_sec.begin(), rabit_timeout_sec.end(), cmd1);
cmd1[rabit_timeout_sec.size()] = '\0';

std::string rabit_debug = "rabit_debug=1";
char cmd2[rabit_debug.size()+1];
std::copy(rabit_debug.begin(), rabit_debug.end(), cmd2);
cmd2[rabit_debug.size()] = '\0';

char* argv[] = {cmd, cmd2};
m.Init(2, argv);
char* argv[] = {cmd, cmd1,cmd2};
m.Init(3, argv);
m.rank = 0;
m.rabit_bootstrap_cache = 1;
rabit::engine::AllreduceRobust::LinkRecord a;
m.err_link = &a;
rabit::engine::AllreduceRobust::ReturnType err_type(rabit::engine::AllreduceRobust::ReturnTypeEnum::kSockError);
rabit::engine::AllreduceRobust::ReturnType succ_type(rabit::engine::AllreduceRobust::ReturnTypeEnum::kSuccess);
EXPECT_EQ(m.CheckAndRecover(err_type), false);
std::this_thread::sleep_for(std::chrono::seconds(1));
EXPECT_EQ(m.CheckAndRecover(succ_type), false);
EXPECT_EXIT({
std::this_thread::sleep_for(std::chrono::seconds(3));
}, ::testing::ExitedWithCode(0), "");
std::this_thread::sleep_for(std::chrono::milliseconds(100));
EXPECT_EQ(m.CheckAndRecover(succ_type), true);
EXPECT_EQ(m.rabit_timeout_task.get(), true);
m.Shutdown();
}

TEST(allreduce_robust, sync_success_error_timeout)
{
rabit::engine::AllreduceRobust m;

std::string rabit_timeout = "rabit_timeout=1";
char cmd[rabit_timeout.size()+1];
std::copy(rabit_timeout.begin(), rabit_timeout.end(), cmd);
cmd[rabit_timeout.size()] = '\0';

std::string rabit_timeout_sec = "rabit_timeout_sec=1";
char cmd1[rabit_timeout_sec.size()+1];
std::copy(rabit_timeout_sec.begin(), rabit_timeout_sec.end(), cmd1);
cmd1[rabit_timeout_sec.size()] = '\0';

std::string rabit_debug = "rabit_debug=1";
char cmd2[rabit_debug.size()+1];
std::copy(rabit_debug.begin(), rabit_debug.end(), cmd2);
cmd2[rabit_debug.size()] = '\0';

char* argv[] = {cmd, cmd1,cmd2};
m.Init(3, argv);
m.rank = 0;
m.rabit_bootstrap_cache = 1;
rabit::engine::AllreduceRobust::LinkRecord a;
m.err_link = &a;
m._error = mockerr;
rabit::engine::AllreduceRobust::ReturnType err_type(rabit::engine::AllreduceRobust::ReturnTypeEnum::kSockError);
rabit::engine::AllreduceRobust::ReturnType succ_type(rabit::engine::AllreduceRobust::ReturnTypeEnum::kSuccess);
EXPECT_EQ(m.CheckAndRecover(succ_type), true);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
EXPECT_EQ(m.CheckAndRecover(err_type), false);
std::this_thread::sleep_for(std::chrono::milliseconds(1500));
EXPECT_EQ(m.rabit_timeout_task.get(), false);
}

TEST(allreduce_robust, sync_success_error_success)
{
rabit::engine::AllreduceRobust m;

std::string rabit_timeout = "rabit_timeout=1";
char cmd[rabit_timeout.size()+1];
std::copy(rabit_timeout.begin(), rabit_timeout.end(), cmd);
cmd[rabit_timeout.size()] = '\0';

std::string rabit_timeout_sec = "rabit_timeout_sec=1";
char cmd1[rabit_timeout_sec.size()+1];
std::copy(rabit_timeout_sec.begin(), rabit_timeout_sec.end(), cmd1);
cmd1[rabit_timeout_sec.size()] = '\0';

std::string rabit_debug = "rabit_debug=1";
char cmd2[rabit_debug.size()+1];
std::copy(rabit_debug.begin(), rabit_debug.end(), cmd2);
cmd2[rabit_debug.size()] = '\0';

char* argv[] = {cmd, cmd1,cmd2};
m.Init(3, argv);
m.rank = 0;
m.rabit_bootstrap_cache = 1;
rabit::engine::AllreduceRobust::LinkRecord a;
m.err_link = &a;
rabit::engine::AllreduceRobust::ReturnType err_type(rabit::engine::AllreduceRobust::ReturnTypeEnum::kSockError);
rabit::engine::AllreduceRobust::ReturnType succ_type(rabit::engine::AllreduceRobust::ReturnTypeEnum::kSuccess);
EXPECT_EQ(m.CheckAndRecover(succ_type), true);
std::this_thread::sleep_for(std::chrono::milliseconds(10));

EXPECT_EQ(m.CheckAndRecover(err_type), false);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
EXPECT_EQ(m.CheckAndRecover(succ_type), true);
std::this_thread::sleep_for(std::chrono::milliseconds(1100));
EXPECT_EQ(m.rabit_timeout_task.get(), true);
m.Shutdown();
}
1 change: 1 addition & 0 deletions test/cpp/test_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
int main(int argc, char** argv)
{
::testing::InitGoogleTest(&argc, argv);
::testing::FLAGS_gtest_death_test_style = "threadsafe";
return RUN_ALL_TESTS();
}
14 changes: 7 additions & 7 deletions test/test.mk
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,25 @@ all: model_recover_10_10k model_recover_10_10k_die_same model_recover_10_10k_di

# this experiment test recovery with actually process exit, use keepalive to keep program alive
model_recover_10_10k:
$(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 rabit_bootstrap_cache=-1 rabit_debug=1 rabit_reduce_ring_mincount=1 rabit_timeout=5
$(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 rabit_bootstrap_cache=-1 rabit_debug=1 rabit_timeout=1 rabit_timeout_sec=5

model_recover_10_10k_die_same:
$(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 rabit_bootstrap_cache=1 rabit_timeout=5
$(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 rabit_bootstrap_cache=1

model_recover_10_10k_die_hard:
$(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=1,1,1,1 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=8,1,2,0 mock=4,1,3,0 rabit_bootstrap_cache=1 rabit_timeout=5
$(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=1,1,1,1 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=8,1,2,0 mock=4,1,3,0 rabit_bootstrap_cache=1

local_recover_10_10k:
$(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 local_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=1,1,1,1 rabit_timeout=5
$(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 local_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=1,1,1,1

pylocal_recover_10_10k:
$(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 local_recover.py 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=1,1,1,1 rabit_timeout=5
$(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 local_recover.py 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=1,1,1,1

lazy_recover_10_10k_die_hard:
$(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 lazy_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=1,1,1,1 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=8,1,2,0 mock=4,1,3,0 rabit_timeout=5
$(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 lazy_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=1,1,1,1 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 mock=8,1,2,0 mock=4,1,3,0

lazy_recover_10_10k_die_same:
$(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 lazy_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 rabit_timeout=5
$(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 lazy_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0

ringallreduce_10_10k:
$(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 model_recover 10000 rabit_reduce_ring_mincount=10