Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-17859: [C++] Use self-pipe in signal-receiving StopSource #14250

Merged
merged 8 commits into from
Nov 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 147 additions & 43 deletions cpp/src/arrow/util/cancel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
#include <atomic>
#include <mutex>
#include <sstream>
#include <thread>
#include <utility>

#include "arrow/result.h"
#include "arrow/util/atfork_internal.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"
#include "arrow/util/mutex.h"
#include "arrow/util/visibility.h"

namespace arrow {
Expand All @@ -33,7 +36,9 @@ namespace arrow {
#error Lock-free atomic int required for signal safety
#endif

using internal::AtForkHandler;
using internal::ReinstateSignalHandler;
using internal::SelfPipe;
using internal::SetSignalHandler;
using internal::SignalHandler;

Expand Down Expand Up @@ -99,16 +104,58 @@ Status StopToken::Poll() const {

namespace {

struct SignalStopState {
struct SignalStopState : public std::enable_shared_from_this<SignalStopState> {
struct SavedSignalHandler {
int signum;
SignalHandler handler;
};

// NOTE: shared_from_this() doesn't work from constructor
void Init() {
// XXX this pattern appears in several places, factor it out?
atfork_handler_ = std::make_shared<AtForkHandler>(
/*before=*/
[weak_self = std::weak_ptr<SignalStopState>(shared_from_this())] {
auto self = weak_self.lock();
if (self) {
self->BeforeFork();
}
return self;
},
/*parent_after=*/
[](std::any token) {
auto self = std::any_cast<std::shared_ptr<SignalStopState>>(std::move(token));
self->ParentAfterFork();
},
/*child_after=*/
[](std::any token) {
auto self = std::any_cast<std::shared_ptr<SignalStopState>>(std::move(token));
self->ChildAfterFork();
});
RegisterAtFork(atfork_handler_);
}

Status RegisterHandlers(const std::vector<int>& signals) {
std::lock_guard<std::mutex> lock(mutex_);
if (!saved_handlers_.empty()) {
return Status::Invalid("Signal handlers already registered");
}
if (!self_pipe_) {
// Make sure the self-pipe is initialized
// (NOTE: avoid std::atomic_is_lock_free() which may require libatomic)
#if ATOMIC_POINTER_LOCK_FREE != 2
return Status::NotImplemented(
"Cannot setup signal StopSource because atomic pointers are not "
"lock-free on this platform");
#else
ARROW_ASSIGN_OR_RAISE(self_pipe_, SelfPipe::Make(/*signal_safe=*/true));
#endif
}
if (!signal_receiving_thread_) {
// Spawn thread for receiving signals
SpawnSignalReceivingThread();
}
self_pipe_ptr_.store(self_pipe_.get());
for (int signum : signals) {
ARROW_ASSIGN_OR_RAISE(auto handler,
SetSignalHandler(signum, SignalHandler{&HandleSignal}));
Expand All @@ -118,78 +165,135 @@ struct SignalStopState {
}

void UnregisterHandlers() {
std::lock_guard<std::mutex> lock(mutex_);
self_pipe_ptr_.store(nullptr);
auto handlers = std::move(saved_handlers_);
for (const auto& h : handlers) {
ARROW_CHECK_OK(SetSignalHandler(h.signum, h.handler).status());
}
}

~SignalStopState() {
atfork_handler_.reset();
UnregisterHandlers();
Disable();
if (signal_receiving_thread_) {
// Tell the receiving thread to stop
auto st = self_pipe_->Shutdown();
ARROW_WARN_NOT_OK(st, "Failed to shutdown self-pipe");
if (st.ok()) {
signal_receiving_thread_->join();
} else {
signal_receiving_thread_->detach();
}
}
}

StopSource* stop_source() { return stop_source_.get(); }
StopSource* stop_source() {
std::lock_guard<std::mutex> lock(mutex_);
return stop_source_.get();
}

bool enabled() { return stop_source_ != nullptr; }
bool enabled() {
std::lock_guard<std::mutex> lock(mutex_);
return stop_source_ != nullptr;
}

void Enable() {
// Before creating a new StopSource, delete any lingering reference to
// the previous one in the trash can. See DoHandleSignal() for details.
EmptyTrashCan();
std::atomic_store(&stop_source_, std::make_shared<StopSource>());
std::lock_guard<std::mutex> lock(mutex_);
stop_source_ = std::make_shared<StopSource>();
}

void Disable() { std::atomic_store(&stop_source_, NullSource()); }
void Disable() {
std::lock_guard<std::mutex> lock(mutex_);
stop_source_.reset();
}

static SignalStopState* instance() { return &instance_; }
static SignalStopState* instance() {
static std::shared_ptr<SignalStopState> instance = []() {
auto ptr = std::make_shared<SignalStopState>();
ptr->Init();
return ptr;
}();
return instance.get();
}

private:
// For readability
std::shared_ptr<StopSource> NullSource() { return nullptr; }

void EmptyTrashCan() { std::atomic_store(&trash_can_, NullSource()); }
void SpawnSignalReceivingThread() {
signal_receiving_thread_ = std::make_unique<std::thread>(ReceiveSignals, self_pipe_);
}

static void HandleSignal(int signum) { instance_.DoHandleSignal(signum); }
static void HandleSignal(int signum) {
auto self = instance();
if (self) {
self->DoHandleSignal(signum);
}
}

void DoHandleSignal(int signum) {
// async-signal-safe code only
auto source = std::atomic_load(&stop_source_);
if (source) {
source->RequestStopFromSignal(signum);
// Disable() may have been called in the meantime, but we can't
// deallocate a shared_ptr here, so instead move it to a "trash can".
// This minimizes the possibility of running a deallocator here,
// however it doesn't entirely preclude it.
//
// Possible case:
// - a signal handler (A) starts running, fetches the current source
// - Disable() then Enable() are called, emptying the trash can and
// replacing the current source
// - a signal handler (B) starts running, fetches the current source
// - signal handler A resumes, moves its source (the old source) into
// the trash can (the only remaining reference)
// - signal handler B resumes, moves its source (the current source)
// into the trash can. This triggers deallocation of the old source,
// since the trash can had the only remaining reference to it.
//
// This case should be sufficiently unlikely, but we cannot entirely
// rule it out. The problem might be solved properly with a lock-free
// linked list of StopSources.
std::atomic_store(&trash_can_, std::move(source));
SelfPipe* self_pipe = self_pipe_ptr_.load();
if (self_pipe) {
self_pipe->Send(/*payload=*/signum);
}
ReinstateSignalHandler(signum, &HandleSignal);
}

std::shared_ptr<StopSource> stop_source_;
std::shared_ptr<StopSource> trash_can_;
static void ReceiveSignals(std::shared_ptr<SelfPipe> self_pipe) {
// Wait for signals on the self-pipe and propagate them to the current StopSource
DCHECK(self_pipe);
while (true) {
auto maybe_payload = self_pipe->Wait();
if (maybe_payload.status().IsInvalid()) {
// Pipe shut down
return;
}
if (!maybe_payload.ok()) {
maybe_payload.status().Warn();
return;
}
const int signum = static_cast<int>(maybe_payload.ValueUnsafe());
instance()->ReceiveSignal(signum);
}
}

std::vector<SavedSignalHandler> saved_handlers_;
void ReceiveSignal(int signum) {
std::lock_guard<std::mutex> lock(mutex_);
if (stop_source_) {
stop_source_->RequestStopFromSignal(signum);
}
}

static SignalStopState instance_;
};
// At-fork handlers

void BeforeFork() { mutex_.lock(); }

void ParentAfterFork() { mutex_.unlock(); }

SignalStopState SignalStopState::instance_{};
void ChildAfterFork() {
new (&mutex_) std::mutex;
// Leak previous thread, as it has become invalid.
// We can't spawn a new one here as it would have unfortunate side effects;
// especially in the frequent context of a fork+exec.
// (for example the Python subprocess module closes all fds before calling exec)
ARROW_UNUSED(signal_receiving_thread_.release());
// Make internal state consistent: with no listening thread, we shouldn't
// feed the self-pipe from the signal handler.
UnregisterHandlers();
}

std::mutex mutex_;
std::vector<SavedSignalHandler> saved_handlers_;
std::shared_ptr<StopSource> stop_source_;
std::unique_ptr<std::thread> signal_receiving_thread_;
std::shared_ptr<AtForkHandler> atfork_handler_;

// For signal handler interaction
std::shared_ptr<SelfPipe> self_pipe_;
// Raw atomic pointer, as atomic load/store of a shared_ptr may not be lock-free
// (it is not on libstdc++).
std::atomic<SelfPipe*> self_pipe_ptr_;
};

} // namespace

Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/util/cancel.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class ARROW_EXPORT StopSource {
// Consumer API (the side that stops)
void RequestStop();
void RequestStop(Status error);
// Async-signal-safe. TODO Deprecate this?
void RequestStopFromSignal(int signum);

StopToken token();
Expand Down Expand Up @@ -103,6 +104,10 @@ ARROW_EXPORT
void ResetSignalStopSource();

/// EXPERIMENTAL: Register signal handler triggering the signal-receiving StopSource
///
/// Note that those handlers are automatically un-registered in a fork()ed process,
/// therefore the child process will need to call RegisterCancellingSignalHandler()
/// if desired.
ARROW_EXPORT
Status RegisterCancellingSignalHandler(const std::vector<int>& signals);

Expand Down
68 changes: 68 additions & 0 deletions cpp/src/arrow/util/cancel_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <atomic>
#include <cmath>
#include <functional>
#include <optional>
#include <sstream>
#include <string>
Expand All @@ -29,6 +30,8 @@
#include <signal.h>
#ifndef _WIN32
#include <sys/time.h> // for setitimer()
#include <sys/types.h>
#include <unistd.h>
#endif

#include "arrow/testing/gtest_util.h"
Expand Down Expand Up @@ -201,6 +204,23 @@ class SignalCancelTest : public CancelTest {
ASSERT_EQ(internal::SignalFromStatus(st), expected_signal_);
}

#ifndef _WIN32
void RunInChild(std::function<void()> func) {
auto child_pid = fork();
if (child_pid == -1) {
ASSERT_OK(internal::IOErrorFromErrno(errno, "Error calling fork(): "));
}
if (child_pid == 0) {
// Child
ASSERT_NO_FATAL_FAILURE(func()) << "Failure in child process";
std::exit(0);
} else {
// Parent
AssertChildExit(child_pid);
}
}
#endif

protected:
#ifdef _WIN32
const int expected_signal_ = SIGINT;
Expand Down Expand Up @@ -238,6 +258,54 @@ TEST_F(SignalCancelTest, RegisterUnregister) {
AssertStopRequested();
}

#if !(defined(_WIN32) || defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) || \
defined(THREAD_SANITIZER))
TEST_F(SignalCancelTest, ForkSafetyUnregisteredHandlers) {
RunInChild([&]() {
// Child
TriggerSignal();
AssertStopNotRequested();

RegisterHandler();
TriggerSignal();
AssertStopRequested();
});

// Parent: shouldn't notice signals raised in child
AssertStopNotRequested();

// Stop source still usable in parent
TriggerSignal();
AssertStopNotRequested();

RegisterHandler();
TriggerSignal();
AssertStopRequested();
}

TEST_F(SignalCancelTest, ForkSafetyRegisteredHandlers) {
RegisterHandler();

RunInChild([&]() {
// Child: signal handlers are unregistered and need to be re-registered
TriggerSignal();
AssertStopNotRequested();

// Can re-register and receive signals
RegisterHandler();
TriggerSignal();
AssertStopRequested();
});

// Parent: shouldn't notice signals raised in child
AssertStopNotRequested();

// Stop source still usable in parent
TriggerSignal();
AssertStopRequested();
}
#endif

TEST_F(CancelTest, ThreadedPollSuccess) {
constexpr int kNumThreads = 10;

Expand Down
Loading