Skip to content

Commit

Permalink
Use atfork handler in signal StopSource
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Nov 14, 2022
1 parent 260b4e6 commit 9ef40c2
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 66 deletions.
112 changes: 57 additions & 55 deletions cpp/src/arrow/util/cancel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,8 @@
#include <thread>
#include <utility>

#ifndef _WIN32
// For getpid()
#include <sys/types.h>
#include <unistd.h>
#endif

#include "arrow/result.h"
#include "arrow/util/atfork_internal.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"
#include "arrow/util/mutex.h"
Expand All @@ -41,6 +36,7 @@ namespace arrow {
#error Lock-free atomic int required for signal safety
#endif

using internal::AtForkHandler;
using internal::ReinstateSignalHandler;
using internal::SelfPipe;
using internal::SetSignalHandler;
Expand Down Expand Up @@ -108,12 +104,37 @@ Status StopToken::Poll() const {

namespace {

struct SignalStopState {
struct SignalStopState : public std::enable_shared_from_this<SignalStopState> {
struct SavedSignalHandler {
int signum;
SignalHandler handler;
};

// NOTE: shared_from_this() doesn't work from constructor
void Init() {
// XXX this pattern appears in several places, factor it out?
atfork_handler_ = std::make_shared<AtForkHandler>(
/*before=*/
[weak_self = std::weak_ptr<SignalStopState>(shared_from_this())] {
auto self = weak_self.lock();
if (self) {
self->BeforeFork();
}
return self;
},
/*parent_after=*/
[](std::any token) {
auto self = std::any_cast<std::shared_ptr<SignalStopState>>(std::move(token));
self->ParentAfterFork();
},
/*child_after=*/
[](std::any token) {
auto self = std::any_cast<std::shared_ptr<SignalStopState>>(std::move(token));
self->ChildAfterFork();
});
RegisterAtFork(atfork_handler_);
}

Status RegisterHandlers(const std::vector<int>& signals) {
std::lock_guard<std::mutex> lock(mutex_);
if (!saved_handlers_.empty()) {
Expand Down Expand Up @@ -152,22 +173,17 @@ struct SignalStopState {
}

~SignalStopState() {
atfork_handler_.reset();
UnregisterHandlers();
Disable();
if (signal_receiving_thread_) {
if (InForkedChild()) {
// std::thread is basically unusuable in a forked child, even the
// destructor can crash.
signal_receiving_thread_.release();
// Tell the receiving thread to stop
auto st = self_pipe_->Shutdown();
ARROW_WARN_NOT_OK(st, "Failed to shutdown self-pipe");
if (st.ok()) {
signal_receiving_thread_->join();
} else {
// Tell the receiving thread to stop
auto st = self_pipe_->Shutdown();
ARROW_WARN_NOT_OK(st, "Failed to shutdown self-pipe");
if (st.ok()) {
signal_receiving_thread_->join();
} else {
signal_receiving_thread_->detach();
}
signal_receiving_thread_->detach();
}
}
}
Expand All @@ -189,42 +205,25 @@ struct SignalStopState {

void Disable() {
std::lock_guard<std::mutex> lock(mutex_);
stop_source_ = NullSource();
stop_source_.reset();
}

static SignalStopState* instance(bool signal_safe = false) {
static auto instance = std::make_shared<SignalStopState>();
#ifndef _WIN32
if (instance->InForkedChild()) {
// In child process
if (signal_safe) {
return nullptr;
}
// Not called from a signal => can reinitialize
auto lock = arrow::util::GlobalForkSafeMutex()->Lock();
if (instance->pid_.load() != getpid()) {
bool thread_was_launched{instance->signal_receiving_thread_};
// Reinitialize internal structures after fork
instance = std::make_shared<SignalStopState>();
if (thread_was_launched) {
instance->SpawnSignalReceivingThread();
}
}
}
#endif
static SignalStopState* instance() {
static std::shared_ptr<SignalStopState> instance = []() {
auto ptr = std::make_shared<SignalStopState>();
ptr->Init();
return ptr;
}();
return instance.get();
}

private:
// For readability
std::shared_ptr<StopSource> NullSource() { return nullptr; }

void SpawnSignalReceivingThread() {
signal_receiving_thread_ = std::make_unique<std::thread>(ReceiveSignals, self_pipe_);
}

static void HandleSignal(int signum) {
auto self = instance(/*signal_safe=*/true);
auto self = instance();
if (self) {
self->DoHandleSignal(signum);
}
Expand Down Expand Up @@ -264,29 +263,32 @@ struct SignalStopState {
}
}

bool InForkedChild() {
#ifndef _WIN32
return pid_.load() != getpid();
#else
return false;
#endif
// At-fork handlers

void BeforeFork() { mutex_.lock(); }

void ParentAfterFork() { mutex_.unlock(); }

void ChildAfterFork() {
new (&mutex_) std::mutex;
if (signal_receiving_thread_) {
// Leak previous thread and re-spawn
ARROW_UNUSED(signal_receiving_thread_.release());
SpawnSignalReceivingThread();
}
}

std::mutex mutex_;
std::vector<SavedSignalHandler> saved_handlers_;
std::shared_ptr<StopSource> stop_source_;
std::unique_ptr<std::thread> signal_receiving_thread_;
std::shared_ptr<AtForkHandler> atfork_handler_;

// For signal handler interaction
std::shared_ptr<SelfPipe> self_pipe_;
// Raw atomic pointer, as atomic load/store of a shared_ptr may not be lock-free
// (it is not on libstdc++).
std::atomic<SelfPipe*> self_pipe_ptr_;

// For fork safety
#ifndef _WIN32
std::atomic<pid_t> pid_{getpid()};
#endif
};

} // namespace
Expand Down
38 changes: 27 additions & 11 deletions cpp/src/arrow/util/cancel_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <atomic>
#include <cmath>
#include <functional>
#include <optional>
#include <sstream>
#include <string>
Expand Down Expand Up @@ -203,6 +204,23 @@ class SignalCancelTest : public CancelTest {
ASSERT_EQ(internal::SignalFromStatus(st), expected_signal_);
}

#ifndef _WIN32
void RunInChild(std::function<void()> func) {
auto child_pid = fork();
if (child_pid == -1) {
ASSERT_OK(internal::IOErrorFromErrno(errno, "Error calling fork(): "));
}
if (child_pid == 0) {
// Child
ASSERT_NO_FATAL_FAILURE(func()) << "Failure in child process";
std::exit(0);
} else {
// Parent
AssertChildExit(child_pid);
}
}
#endif

protected:
#ifdef _WIN32
const int expected_signal_ = SIGINT;
Expand Down Expand Up @@ -244,21 +262,19 @@ TEST_F(SignalCancelTest, RegisterUnregister) {
TEST_F(SignalCancelTest, ForkSafety) {
RegisterHandler();

auto child_pid = fork();
if (child_pid == 0) {
RunInChild([&]() {
// Child: trigger signal
TriggerSignal();
// Stop source destruction should neither crash not affect parent
std::exit(0);
} else {
// Parent: shouldn't notice signal raised in child
AssertChildExit(child_pid);
AssertStopNotRequested();

// Stop source still usable in parent
TriggerSignal();
AssertStopRequested();
}
});

// Parent: shouldn't notice signals raised in child
AssertStopNotRequested();

// Stop source still usable in parent
TriggerSignal();
AssertStopRequested();
}
#endif

Expand Down

0 comments on commit 9ef40c2

Please sign in to comment.