Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exit when allreduce/broadcast error cause timeout #112

Merged
merged 9 commits into from
Oct 11, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,6 @@ mpich-3.2/
cmake-build-debug/
.vscode/

# cmake
build/

15 changes: 14 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,29 @@ osx_image: xcode10.2

dist: xenial

language: cpp

# Use Build Matrix to do lint and build seperately
env:
matrix:
- TASK=lint LINT_LANG=cpp
- TASK=lint LINT_LANG=python
- TASK=doc
- TASK=build
# - TASK=build
- TASK=mpi-build
- TASK=cmake-test

matrix:
exclude:
- os: osx
env: TASK=lint LINT_LANG=cpp
- os: osx
env: TASK=lint LINT_LANG=python
- os: osx
env: TASK=doc
- os: osx
env: TASK=build

chenqin marked this conversation as resolved.
Show resolved Hide resolved
# dependent apt packages
addons:
apt:
Expand Down
9 changes: 7 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@ if(R_LIB OR MINGW OR WIN32)
CXX_STANDARD_REQUIRED ON
POSITION_INDEPENDENT_CODE ON)
else()
add_library(rabit src/allreduce_base.cc src/allreduce_robust.cc src/engine.cc src/c_api.cc)
add_library(rabit_base src/allreduce_base.cc src/engine_base.cc src/c_api.cc)
find_package(Threads REQUIRED)
chenqin marked this conversation as resolved.
Show resolved Hide resolved
add_library(rabit_empty src/engine_empty.cc src/c_api.cc)
add_library(rabit_base src/allreduce_base.cc src/engine_base.cc src/c_api.cc)

add_library(rabit src/allreduce_base.cc src/allreduce_robust.cc src/engine.cc src/c_api.cc)
add_library(rabit_mock_static src/allreduce_base.cc src/allreduce_robust.cc src/engine_mock.cc src/c_api.cc)
add_library(rabit_mock SHARED src/allreduce_base.cc src/allreduce_robust.cc src/engine_mock.cc src/c_api.cc)
target_link_libraries(rabit Threads::Threads)
target_link_libraries(rabit_mock_static Threads::Threads)
target_link_libraries(rabit_mock Threads::Threads)

set(rabit_libs rabit rabit_base rabit_empty rabit_mock rabit_mock_static)
set_target_properties(rabit rabit_base rabit_empty rabit_mock rabit_mock_static
Expand Down
2 changes: 1 addition & 1 deletion include/rabit/internal/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ inline void HandleAssertError(const char *msg) {
*/
inline void HandleCheckError(const char *msg) {
if (STOP_PROCESS_ON_ERROR) {
fprintf(stderr, "%s, shutting down process", msg);
fprintf(stderr, "%s, shutting down process\n", msg);
chenqin marked this conversation as resolved.
Show resolved Hide resolved
exit(-1);
} else {
fprintf(stderr, "%s, rabit is configured to keep process running\n", msg);
Expand Down
5 changes: 4 additions & 1 deletion scripts/travis_script.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ if [ ${TASK} == "cmake-test" ]; then
mkdir build
cd build
cmake -DRABIT_BUILD_TESTS=ON -DRABIT_BUILD_DMLC=ON -DGTEST_ROOT=${HOME}/.local ..
# known osx gtest 1.8 issue
cp ${HOME}/.local/lib/*.dylib .
#unit tests
make
make -j4
chenqin marked this conversation as resolved.
Show resolved Hide resolved
make test
#make test VERBOSE=1 || exit -1
chenqin marked this conversation as resolved.
Show resolved Hide resolved
make install || exit -1
cd ../test
../scripts/travis_runtest.sh || exit -1
Expand Down
7 changes: 7 additions & 0 deletions src/allreduce_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,13 @@ void AllreduceBase::SetParam(const char *name, const char *val) {
if (!strcmp(name, "rabit_debug")) {
rabit_debug = atoi(val);
}
if (!strcmp(name, "rabit_timeout")) {
rabit_timeout = atoi(val);
}
if (!strcmp(name, "rabit_timeout_sec")) {
timeout_sec = atoi(val);
utils::Assert(rabit_timeout > 0, "rabit_timeout_sec should be greater than 0 second");
}
}
/*!
* \brief initialize connection to the tracker
Expand Down
4 changes: 4 additions & 0 deletions src/allreduce_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,10 @@ class AllreduceBase : public IEngine {
int rabit_bootstrap_cache = 0;
// enable detailed logging
int rabit_debug = 0;
// by default, if rabit worker not recover in half an hour exit
int timeout_sec = 1800;
// flag to enable rabit_timeout
int rabit_timeout = 0;
chenqin marked this conversation as resolved.
Show resolved Hide resolved
};
} // namespace engine
} // namespace rabit
Expand Down
2 changes: 1 addition & 1 deletion src/allreduce_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class AllreduceMock : public AllreduceRobust {
if (mock_map.count(key) != 0) {
num_trial += 1;
// data processing frameworks runs on shared process
utils::Error("[%d]@@@Hit Mock Error:%s\n", rank, name);
_error("[%d]@@@Hit Mock Error:%s ", rank, name);
}
}
};
Expand Down
38 changes: 33 additions & 5 deletions src/allreduce_robust.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#define _CRT_SECURE_NO_WARNINGS
#define _CRT_SECURE_NO_DEPRECATE
#define NOMINMAX
#include <chrono>
#include <thread>
#include <limits>
#include <utility>
#include "rabit/internal/io.h"
Expand Down Expand Up @@ -64,9 +66,9 @@ bool AllreduceRobust::Shutdown(void) {
// execute check ack step, load happens here
utils::Assert(RecoverExec(NULL, 0, ActionSummary::kCheckAck,
ActionSummary::kSpecialOp, cur_cache_seq), "Shutdown: check ack must return true");
#if defined (__APPLE__)
sleep(1);
#endif
shutdown_timeout = true;
if (rabit_timeout_task.valid()) rabit_timeout_task.wait();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
return AllreduceBase::Shutdown();
} catch (const std::exception& e) {
fprintf(stderr, "%s\n", e.what());
Expand Down Expand Up @@ -600,14 +602,40 @@ AllreduceRobust::ReturnType AllreduceRobust::TryResetLinks(void) {
* \return true if err_type is kSuccess, false otherwise
*/
bool AllreduceRobust::CheckAndRecover(ReturnType err_type) {
shutdown_timeout = err_type == kSuccess;
if (err_type == kSuccess) return true;
utils::Assert(err_link != NULL, "must know the error source");
recover_counter += 1;

utils::Assert(err_link != NULL, "must know the error link");
recover_counter += 1;
// async launch timeout task if enable_rabit_timeout is set
if (rabit_timeout != 0) {
utils::Printf("[EXPERIMENTAL] rabit timeout thread expires in %d second(s)\n", timeout_sec);
rabit_timeout_task = std::async(std::launch::async, [=]() {
if (rabit_debug) {
utils::Printf("[%d] rabit timeout thread %ld starts\n", rank,
std::this_thread::get_id());
}
int time = 0;
// check if rabit recovered every 100ms
while (time++ < 10 * timeout_sec) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (shutdown_timeout.load()) {
if (rabit_debug) {
utils::Printf("[%d] rabit timeout task thread %ld exits\n",
rank, std::this_thread::get_id());
}
return true;
}
}
_error("[%d] exit due to rabit time out %d s\n", rank, timeout_sec);
return false;
});
}
// simple way, shutdown all links
for (size_t i = 0; i < all_links.size(); ++i) {
if (!all_links[i].sock.BadSocket()) all_links[i].sock.Close();
}
std::this_thread::sleep_for(std::chrono::milliseconds(10*rank));
trivialfis marked this conversation as resolved.
Show resolved Hide resolved
ReConnectLinks("recover");
return false;
}
Expand Down
7 changes: 7 additions & 0 deletions src/allreduce_robust.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*/
#ifndef RABIT_ALLREDUCE_ROBUST_H_
#define RABIT_ALLREDUCE_ROBUST_H_
#include <future>
#include <vector>
#include <string>
#include <algorithm>
Expand Down Expand Up @@ -632,6 +633,12 @@ o * the input state must exactly one saved state(local state of current node)
int local_chkpt_version;
// if checkpoint were loaded, used to distinguish results boostrap cache from seqno cache
bool checkpoint_loaded;
// sidecar executing timeout task
std::future<bool> rabit_timeout_task;
// flag to shutdown rabit_timeout_task before timeout
std::atomic<bool> shutdown_timeout{false};
// error handler
void (* _error)(const char *fmt, ...) = utils::Error;
};
} // namespace engine
} // namespace rabit
Expand Down
7 changes: 3 additions & 4 deletions test/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ find_package(GTest REQUIRED)

add_executable(
unit_tests
allreduce_base_test.cpp
allreduce_mock_test.cpp
allreduce_base_test.cc allreduce_robust_test.cc allreduce_mock_test.cc
test_main.cpp)

target_link_libraries(
unit_tests
unit_tests PRIVATE
GTest::GTest GTest::Main
rabit_base rabit_mock)
rabit_base rabit_mock rabit)

target_include_directories(unit_tests PUBLIC
"$<BUILD_INTERFACE:${rabit_SOURCE_DIR}/include>"
Expand Down
File renamed without changes.
File renamed without changes.
146 changes: 146 additions & 0 deletions test/cpp/allreduce_robust_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
#define RABIT_CXXTESTDEFS_H
#include <gtest/gtest.h>

#include <chrono>
#include <string>
#include <iostream>
#include "../../src/allreduce_robust.h"

inline void mockerr(const char *fmt, ...) {
EXPECT_STRCASEEQ(fmt, "[%d] exit due to rabit time out %d s\n");
}

TEST(allreduce_robust, sync_error_timeout)
{
rabit::engine::AllreduceRobust m;

std::string rabit_timeout = "rabit_timeout=1";
char cmd[rabit_timeout.size()+1];
std::copy(rabit_timeout.begin(), rabit_timeout.end(), cmd);
cmd[rabit_timeout.size()] = '\0';

std::string rabit_timeout_sec = "rabit_timeout_sec=1";
char cmd1[rabit_timeout_sec.size()+1];
std::copy(rabit_timeout_sec.begin(), rabit_timeout_sec.end(), cmd1);
cmd1[rabit_timeout_sec.size()] = '\0';

char* argv[] = {cmd,cmd1};
m.Init(2, argv);
m.rank = 0;
m.rabit_bootstrap_cache = 1;
rabit::engine::AllreduceRobust::LinkRecord a;
m.err_link = &a;
m._error = mockerr;
rabit::engine::AllreduceRobust::ReturnType err_type(rabit::engine::AllreduceRobust::ReturnTypeEnum::kSockError);
rabit::utils::STOP_PROCESS_ON_ERROR = false;
EXPECT_EQ(m.CheckAndRecover(err_type), false);
std::this_thread::sleep_for(std::chrono::milliseconds(1500));
EXPECT_EQ(m.rabit_timeout_task.get(), false);
}

TEST(allreduce_robust, sync_error_reset)
{
rabit::engine::AllreduceRobust m;

std::string rabit_timeout = "rabit_timeout=1";
char cmd[rabit_timeout.size()+1];
std::copy(rabit_timeout.begin(), rabit_timeout.end(), cmd);
cmd[rabit_timeout.size()] = '\0';

std::string rabit_timeout_sec = "rabit_timeout_sec=1";
char cmd1[rabit_timeout_sec.size()+1];
std::copy(rabit_timeout_sec.begin(), rabit_timeout_sec.end(), cmd1);
cmd1[rabit_timeout_sec.size()] = '\0';

std::string rabit_debug = "rabit_debug=1";
char cmd2[rabit_debug.size()+1];
std::copy(rabit_debug.begin(), rabit_debug.end(), cmd2);
cmd2[rabit_debug.size()] = '\0';

char* argv[] = {cmd, cmd1,cmd2};
m.Init(3, argv);
m.rank = 0;
m.rabit_bootstrap_cache = 1;
rabit::engine::AllreduceRobust::LinkRecord a;
m.err_link = &a;
rabit::engine::AllreduceRobust::ReturnType err_type(rabit::engine::AllreduceRobust::ReturnTypeEnum::kSockError);
rabit::engine::AllreduceRobust::ReturnType succ_type(rabit::engine::AllreduceRobust::ReturnTypeEnum::kSuccess);
EXPECT_EQ(m.CheckAndRecover(err_type), false);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
EXPECT_EQ(m.CheckAndRecover(succ_type), true);
EXPECT_EQ(m.rabit_timeout_task.get(), true);
m.Shutdown();
}

TEST(allreduce_robust, sync_success_error_timeout)
{
rabit::engine::AllreduceRobust m;

std::string rabit_timeout = "rabit_timeout=1";
char cmd[rabit_timeout.size()+1];
std::copy(rabit_timeout.begin(), rabit_timeout.end(), cmd);
cmd[rabit_timeout.size()] = '\0';

std::string rabit_timeout_sec = "rabit_timeout_sec=1";
char cmd1[rabit_timeout_sec.size()+1];
std::copy(rabit_timeout_sec.begin(), rabit_timeout_sec.end(), cmd1);
cmd1[rabit_timeout_sec.size()] = '\0';

std::string rabit_debug = "rabit_debug=1";
char cmd2[rabit_debug.size()+1];
std::copy(rabit_debug.begin(), rabit_debug.end(), cmd2);
cmd2[rabit_debug.size()] = '\0';

char* argv[] = {cmd, cmd1,cmd2};
m.Init(3, argv);
m.rank = 0;
m.rabit_bootstrap_cache = 1;
rabit::engine::AllreduceRobust::LinkRecord a;
m.err_link = &a;
m._error = mockerr;
rabit::engine::AllreduceRobust::ReturnType err_type(rabit::engine::AllreduceRobust::ReturnTypeEnum::kSockError);
rabit::engine::AllreduceRobust::ReturnType succ_type(rabit::engine::AllreduceRobust::ReturnTypeEnum::kSuccess);
EXPECT_EQ(m.CheckAndRecover(succ_type), true);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
EXPECT_EQ(m.CheckAndRecover(err_type), false);
std::this_thread::sleep_for(std::chrono::milliseconds(1500));
EXPECT_EQ(m.rabit_timeout_task.get(), false);
}

TEST(allreduce_robust, sync_success_error_success)
{
rabit::engine::AllreduceRobust m;

std::string rabit_timeout = "rabit_timeout=1";
char cmd[rabit_timeout.size()+1];
std::copy(rabit_timeout.begin(), rabit_timeout.end(), cmd);
cmd[rabit_timeout.size()] = '\0';

std::string rabit_timeout_sec = "rabit_timeout_sec=1";
char cmd1[rabit_timeout_sec.size()+1];
std::copy(rabit_timeout_sec.begin(), rabit_timeout_sec.end(), cmd1);
cmd1[rabit_timeout_sec.size()] = '\0';

std::string rabit_debug = "rabit_debug=1";
char cmd2[rabit_debug.size()+1];
std::copy(rabit_debug.begin(), rabit_debug.end(), cmd2);
cmd2[rabit_debug.size()] = '\0';

char* argv[] = {cmd, cmd1,cmd2};
m.Init(3, argv);
m.rank = 0;
m.rabit_bootstrap_cache = 1;
rabit::engine::AllreduceRobust::LinkRecord a;
m.err_link = &a;
rabit::engine::AllreduceRobust::ReturnType err_type(rabit::engine::AllreduceRobust::ReturnTypeEnum::kSockError);
rabit::engine::AllreduceRobust::ReturnType succ_type(rabit::engine::AllreduceRobust::ReturnTypeEnum::kSuccess);
EXPECT_EQ(m.CheckAndRecover(succ_type), true);
std::this_thread::sleep_for(std::chrono::milliseconds(10));

EXPECT_EQ(m.CheckAndRecover(err_type), false);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
EXPECT_EQ(m.CheckAndRecover(succ_type), true);
std::this_thread::sleep_for(std::chrono::milliseconds(1100));
EXPECT_EQ(m.rabit_timeout_task.get(), true);
m.Shutdown();
}
1 change: 1 addition & 0 deletions test/cpp/test_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
int main(int argc, char** argv)
{
::testing::InitGoogleTest(&argc, argv);
::testing::FLAGS_gtest_death_test_style = "threadsafe";
return RUN_ALL_TESTS();
}
2 changes: 1 addition & 1 deletion test/test.mk
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ all: model_recover_10_10k model_recover_10_10k_die_same model_recover_10_10k_di

# this experiment test recovery with actually process exit, use keepalive to keep program alive
model_recover_10_10k:
$(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 rabit_bootstrap_cache=-1 rabit_debug=1 rabit_reduce_ring_mincount=1
$(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 rabit_bootstrap_cache=-1 rabit_debug=1 rabit_timeout=1 rabit_timeout_sec=5

model_recover_10_10k_die_same:
$(DMLC)/tracker/dmlc-submit --cluster local --num-workers=10 --local-num-attempt=20 model_recover 10000 mock=0,0,1,0 mock=1,1,1,0 mock=0,1,1,0 mock=4,1,1,0 mock=9,1,1,0 rabit_bootstrap_cache=1
Expand Down