From fe2759f0153d78309251ff09bd63caa67f4e57b1 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Tue, 27 Sep 2022 18:12:14 +0200 Subject: [PATCH 1/8] ARROW-17859: [C++] Use self-pipe in signal-receiving StopSource --- cpp/src/arrow/util/cancel.cc | 163 ++++++++++++++++++++++++++-------- cpp/src/arrow/util/cancel.h | 1 + cpp/src/arrow/util/io_util.cc | 24 +++++ 3 files changed, 150 insertions(+), 38 deletions(-) diff --git a/cpp/src/arrow/util/cancel.cc b/cpp/src/arrow/util/cancel.cc index 5fe4ae3e30488..3143dd6681b7a 100644 --- a/cpp/src/arrow/util/cancel.cc +++ b/cpp/src/arrow/util/cancel.cc @@ -20,11 +20,19 @@ #include #include #include +#include #include +#ifndef _WIN32 +// For getpid() +#include +#include +#endif + #include "arrow/result.h" #include "arrow/util/io_util.h" #include "arrow/util/logging.h" +#include "arrow/util/mutex.h" #include "arrow/util/visibility.h" namespace arrow { @@ -34,6 +42,7 @@ namespace arrow { #endif using internal::ReinstateSignalHandler; +using internal::SelfPipe; using internal::SetSignalHandler; using internal::SignalHandler; @@ -106,9 +115,23 @@ struct SignalStopState { }; Status RegisterHandlers(const std::vector& signals) { + std::lock_guard lock(mutex_); if (!saved_handlers_.empty()) { return Status::Invalid("Signal handlers already registered"); } + if (!self_pipe_) { + // Make sure the self-pipe is initialized + if (!std::atomic_is_lock_free(&self_pipe_ptr_)) { + return Status::NotImplemented( + "Cannot setup signal StopSource because atomic pointers are not " + "lock-free on this platform"); + } + ARROW_ASSIGN_OR_RAISE(self_pipe_, SelfPipe::Make(/*signal_safe=*/true)); + // Spawn thread for receiving signals + DCHECK(!signal_receiving_thread_); + SpawnSignalReceivingThread(); + } + self_pipe_ptr_.store(self_pipe_.get()); for (int signum : signals) { ARROW_ASSIGN_OR_RAISE(auto handler, SetSignalHandler(signum, SignalHandler{&HandleSignal})); @@ -118,6 +141,8 @@ struct SignalStopState { } void UnregisterHandlers() { + std::lock_guard lock(mutex_); + self_pipe_ptr_.store(nullptr); auto handlers = std::move(saved_handlers_); for (const auto& h : handlers) { ARROW_CHECK_OK(SetSignalHandler(h.signum, h.handler).status()); @@ -127,69 +152,131 @@ struct SignalStopState { ~SignalStopState() { UnregisterHandlers(); Disable(); + if (signal_receiving_thread_) { + if (InForkedChild()) { + // std::thread is basically unusuable in a forked child, even the + // destructor can crash. + signal_receiving_thread_.release(); + } else { + // Tell the receiving thread to stop + auto st = self_pipe_->Shutdown(); + ARROW_WARN_NOT_OK(st, "Failed to shutdown self-pipe"); + if (st.ok()) { + signal_receiving_thread_->join(); + } else { + signal_receiving_thread_->detach(); + } + } + } } - StopSource* stop_source() { return stop_source_.get(); } + StopSource* stop_source() { + std::lock_guard lock(mutex_); + return stop_source_.get(); + } - bool enabled() { return stop_source_ != nullptr; } + bool enabled() { + std::lock_guard lock(mutex_); + return stop_source_ != nullptr; + } void Enable() { - // Before creating a new StopSource, delete any lingering reference to - // the previous one in the trash can. See DoHandleSignal() for details. - EmptyTrashCan(); - std::atomic_store(&stop_source_, std::make_shared()); + std::lock_guard lock(mutex_); + stop_source_ = std::make_shared(); } - void Disable() { std::atomic_store(&stop_source_, NullSource()); } + void Disable() { + std::lock_guard lock(mutex_); + stop_source_ = NullSource(); + } - static SignalStopState* instance() { return &instance_; } + static const std::shared_ptr& instance() { + static auto instance = std::make_shared(); +#ifndef _WIN32 + if (instance->InForkedChild()) { + // In child process + auto lock = arrow::util::GlobalForkSafeMutex()->Lock(); + if (instance->pid_.load() != getpid()) { + bool thread_was_launched{instance->signal_receiving_thread_}; + // Reinitialize internal structures after fork + instance = std::make_shared(); + if (thread_was_launched) { + instance->SpawnSignalReceivingThread(); + } + } + } +#endif + return instance; + } private: // For readability std::shared_ptr NullSource() { return nullptr; } - void EmptyTrashCan() { std::atomic_store(&trash_can_, NullSource()); } + void SpawnSignalReceivingThread() { + signal_receiving_thread_ = std::make_unique(ReceiveSignals, self_pipe_); + } - static void HandleSignal(int signum) { instance_.DoHandleSignal(signum); } + static void HandleSignal(int signum) { instance()->DoHandleSignal(signum); } void DoHandleSignal(int signum) { // async-signal-safe code only - auto source = std::atomic_load(&stop_source_); - if (source) { - source->RequestStopFromSignal(signum); - // Disable() may have been called in the meantime, but we can't - // deallocate a shared_ptr here, so instead move it to a "trash can". - // This minimizes the possibility of running a deallocator here, - // however it doesn't entirely preclude it. - // - // Possible case: - // - a signal handler (A) starts running, fetches the current source - // - Disable() then Enable() are called, emptying the trash can and - // replacing the current source - // - a signal handler (B) starts running, fetches the current source - // - signal handler A resumes, moves its source (the old source) into - // the trash can (the only remaining reference) - // - signal handler B resumes, moves its source (the current source) - // into the trash can. This triggers deallocation of the old source, - // since the trash can had the only remaining reference to it. - // - // This case should be sufficiently unlikely, but we cannot entirely - // rule it out. The problem might be solved properly with a lock-free - // linked list of StopSources. - std::atomic_store(&trash_can_, std::move(source)); + SelfPipe* self_pipe = self_pipe_ptr_.load(); + if (self_pipe) { + self_pipe->Send(/*payload=*/signum); } ReinstateSignalHandler(signum, &HandleSignal); } - std::shared_ptr stop_source_; - std::shared_ptr trash_can_; + static void ReceiveSignals(std::shared_ptr self_pipe) { + // Wait for signals on the self-pipe and propagate them to the current StopSource + DCHECK(self_pipe); + while (true) { + auto maybe_payload = self_pipe->Wait(); + if (maybe_payload.status().IsInvalid()) { + // Pipe shut down + return; + } + if (!maybe_payload.ok()) { + maybe_payload.status().Warn(); + return; + } + const int signum = static_cast(maybe_payload.ValueUnsafe()); + instance()->ReceiveSignal(signum); + } + } + + void ReceiveSignal(int signum) { + std::lock_guard lock(mutex_); + if (stop_source_) { + stop_source_->RequestStopFromSignal(signum); + } + } + bool InForkedChild() { +#ifndef _WIN32 + return pid_.load() != getpid(); +#else + return false; +#endif + } + + std::mutex mutex_; std::vector saved_handlers_; + std::shared_ptr stop_source_; + std::unique_ptr signal_receiving_thread_; - static SignalStopState instance_; -}; + // For signal handler interaction + std::shared_ptr self_pipe_; + // Raw atomic pointer, as atomic load/store of a shared_ptr may not be lock-free + // (it is not on libstdc++). + std::atomic self_pipe_ptr_; -SignalStopState SignalStopState::instance_{}; + // For fork safety +#ifndef _WIN32 + std::atomic pid_{getpid()}; +#endif +}; } // namespace diff --git a/cpp/src/arrow/util/cancel.h b/cpp/src/arrow/util/cancel.h index 7fc62271959ed..9ba167674cb48 100644 --- a/cpp/src/arrow/util/cancel.h +++ b/cpp/src/arrow/util/cancel.h @@ -42,6 +42,7 @@ class ARROW_EXPORT StopSource { // Consumer API (the side that stops) void RequestStop(); void RequestStop(Status error); + // Async-signal-safe. TODO Deprecate this? void RequestStopFromSignal(int signum); StopToken token(); diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index 571b49c1d7f30..0de29572b8cda 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -95,6 +95,7 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/io_util.h" #include "arrow/util/logging.h" +#include "arrow/util/mutex.h" // For filename conversion #if defined(_WIN32) @@ -1212,6 +1213,10 @@ class SelfPipeImpl : public SelfPipe { // Already closed return ClosedPipe(); } + if (InForkedChild()) { + return Status::Invalid( + "Self-pipe was created in parent process, cannot wait in child"); + } uint64_t payload = 0; char* buf = reinterpret_cast(&payload); auto buf_size = static_cast(sizeof(payload)); @@ -1248,6 +1253,12 @@ class SelfPipeImpl : public SelfPipe { } Status Shutdown() override { + if (InForkedChild()) { + // We're in a forked child process, avoid sending an EOF payload + // that may be received by the parent; just close our file descriptor + // (shared with the parent). + return pipe_.wfd.Close(); + } please_shutdown_.store(true); errno = 0; if (!DoSend(kEofPayload)) { @@ -1265,12 +1276,24 @@ class SelfPipeImpl : public SelfPipe { protected: Status ClosedPipe() const { return Status::Invalid("Self-pipe closed"); } + bool InForkedChild() { +#ifndef _WIN32 + return pid_.load() != getpid(); +#else + return false; +#endif + } + bool DoSend(uint64_t payload) { // This needs to be async-signal safe as it's called from Send() if (pipe_.wfd.closed()) { // Already closed return false; } + if (InForkedChild()) { + // We're in a forked child process, avoid sending payload to parent. + return false; + } const char* buf = reinterpret_cast(&payload); auto buf_size = static_cast(sizeof(payload)); while (buf_size > 0) { @@ -1294,6 +1317,7 @@ class SelfPipeImpl : public SelfPipe { const bool signal_safe_; Pipe pipe_; std::atomic please_shutdown_{false}; + std::atomic pid_{getpid()}; }; #undef PIPE_WRITE From 085c7ad4291c818051f5ce706a25e030bb82c518 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Tue, 27 Sep 2022 18:26:34 +0200 Subject: [PATCH 2/8] Fix for Windows --- cpp/src/arrow/util/io_util.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index 0de29572b8cda..e7543467ef2ca 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -1317,7 +1317,9 @@ class SelfPipeImpl : public SelfPipe { const bool signal_safe_; Pipe pipe_; std::atomic please_shutdown_{false}; +#ifndef _WIN32 std::atomic pid_{getpid()}; +#endif }; #undef PIPE_WRITE From 449a1e2ab488c58ce0a80e7d82162e8423292703 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Tue, 27 Sep 2022 19:15:48 +0200 Subject: [PATCH 3/8] Avoid requiring libatomic --- cpp/src/arrow/util/cancel.cc | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/util/cancel.cc b/cpp/src/arrow/util/cancel.cc index 3143dd6681b7a..a00128f79dab2 100644 --- a/cpp/src/arrow/util/cancel.cc +++ b/cpp/src/arrow/util/cancel.cc @@ -121,15 +121,17 @@ struct SignalStopState { } if (!self_pipe_) { // Make sure the self-pipe is initialized - if (!std::atomic_is_lock_free(&self_pipe_ptr_)) { - return Status::NotImplemented( - "Cannot setup signal StopSource because atomic pointers are not " - "lock-free on this platform"); - } + // (NOTE: avoid std::atomic_is_lock_free() which may require libatomic) +#if ATOMIC_POINTER_LOCK_FREE != 2 + return Status::NotImplemented( + "Cannot setup signal StopSource because atomic pointers are not " + "lock-free on this platform"); +#else ARROW_ASSIGN_OR_RAISE(self_pipe_, SelfPipe::Make(/*signal_safe=*/true)); // Spawn thread for receiving signals DCHECK(!signal_receiving_thread_); SpawnSignalReceivingThread(); +#endif } self_pipe_ptr_.store(self_pipe_.get()); for (int signum : signals) { From a204b86a53c43892bd13ac9ff3689ca423a5a10e Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 10 Oct 2022 17:47:43 +0200 Subject: [PATCH 4/8] Add fork safety tests --- cpp/src/arrow/util/cancel.cc | 15 ++++++++++++--- cpp/src/arrow/util/cancel_test.cc | 24 ++++++++++++++++++++++++ cpp/src/arrow/util/io_util_test.cc | 28 ++++++++++++++++++++++++++++ 3 files changed, 64 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/util/cancel.cc b/cpp/src/arrow/util/cancel.cc index a00128f79dab2..9e288c77b67ab 100644 --- a/cpp/src/arrow/util/cancel.cc +++ b/cpp/src/arrow/util/cancel.cc @@ -192,11 +192,15 @@ struct SignalStopState { stop_source_ = NullSource(); } - static const std::shared_ptr& instance() { + static SignalStopState* instance(bool signal_safe = false) { static auto instance = std::make_shared(); #ifndef _WIN32 if (instance->InForkedChild()) { // In child process + if (signal_safe) { + return nullptr; + } + // Not called from a signal => can reinitialize auto lock = arrow::util::GlobalForkSafeMutex()->Lock(); if (instance->pid_.load() != getpid()) { bool thread_was_launched{instance->signal_receiving_thread_}; @@ -208,7 +212,7 @@ struct SignalStopState { } } #endif - return instance; + return instance.get(); } private: @@ -219,7 +223,12 @@ struct SignalStopState { signal_receiving_thread_ = std::make_unique(ReceiveSignals, self_pipe_); } - static void HandleSignal(int signum) { instance()->DoHandleSignal(signum); } + static void HandleSignal(int signum) { + auto self = instance(/*signal_safe=*/true); + if (self) { + self->DoHandleSignal(signum); + } + } void DoHandleSignal(int signum) { // async-signal-safe code only diff --git a/cpp/src/arrow/util/cancel_test.cc b/cpp/src/arrow/util/cancel_test.cc index bca78034c04df..d505f6b5af887 100644 --- a/cpp/src/arrow/util/cancel_test.cc +++ b/cpp/src/arrow/util/cancel_test.cc @@ -29,6 +29,8 @@ #include #ifndef _WIN32 #include // for setitimer() +#include +#include #endif #include "arrow/testing/gtest_util.h" @@ -238,6 +240,28 @@ TEST_F(SignalCancelTest, RegisterUnregister) { AssertStopRequested(); } +#if !(defined(_WIN32) || defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER)) +TEST_F(SignalCancelTest, ForkSafety) { + RegisterHandler(); + + auto child_pid = fork(); + if (child_pid == 0) { + // Child: trigger signal + TriggerSignal(); + // Stop source destruction should neither crash not affect parent + std::exit(0); + } else { + // Parent: shouldn't notice signal raised in child + AssertChildExit(child_pid); + AssertStopNotRequested(); + + // Stop source still usable in parent + TriggerSignal(); + AssertStopRequested(); + } +} +#endif + TEST_F(CancelTest, ThreadedPollSuccess) { constexpr int kNumThreads = 10; diff --git a/cpp/src/arrow/util/io_util_test.cc b/cpp/src/arrow/util/io_util_test.cc index f4fcc26d07201..652d1530ae17b 100644 --- a/cpp/src/arrow/util/io_util_test.cc +++ b/cpp/src/arrow/util/io_util_test.cc @@ -30,6 +30,7 @@ #ifndef _WIN32 #include +#include #include #endif @@ -446,6 +447,33 @@ TEST_F(TestSelfPipe, SendFromSignalAndWait) { ASSERT_OK(ReadStatus()); } +#if !(defined(_WIN32) || defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER)) +TEST_F(TestSelfPipe, ForkSafety) { + // Self-pipe isn't usable from child, but should neither crash at exit + // nor disrupt parent + self_pipe_->Send(123456789123456789ULL); + + auto child_pid = fork(); + if (child_pid == 0) { + // Child: pipe is unusable + self_pipe_->Send(41ULL); + self_pipe_->Send(42ULL); + ASSERT_RAISES(Invalid, self_pipe_->Wait()); + self_pipe_.reset(); + std::exit(0); + } else { + // Parent: pipe is still usable, data is read correctly + AssertChildExit(child_pid); + StartReading(); + SleepABit(); + self_pipe_->Send(987654321987654321ULL); + + AssertPayloadsEventually({123456789123456789ULL, 987654321987654321ULL}); + ASSERT_OK(ReadStatus()); + } +} +#endif + TEST(PlatformFilename, RoundtripAscii) { PlatformFilename fn; ASSERT_OK_AND_ASSIGN(fn, PlatformFilename::FromString("a/b")); From 43f18851c43e88e7845de09f4da2aa4bbdfae8c8 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 14 Nov 2022 15:15:17 +0100 Subject: [PATCH 5/8] Use atfork handler in SelfPipe --- cpp/src/arrow/util/io_util.cc | 63 ++++++++++++++++++------------ cpp/src/arrow/util/io_util_test.cc | 14 ++++--- 2 files changed, 46 insertions(+), 31 deletions(-) diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index e7543467ef2ca..358e0a88f444b 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -92,6 +92,7 @@ #include "arrow/buffer.h" #include "arrow/result.h" +#include "arrow/util/atfork_internal.h" #include "arrow/util/checked_cast.h" #include "arrow/util/io_util.h" #include "arrow/util/logging.h" @@ -1190,7 +1191,7 @@ namespace { #define PIPE_READ read #endif -class SelfPipeImpl : public SelfPipe { +class SelfPipeImpl : public SelfPipe, public std::enable_shared_from_this { static constexpr uint64_t kEofPayload = 5804561806345822987ULL; public: @@ -1205,6 +1206,28 @@ class SelfPipeImpl : public SelfPipe { // We cannot afford blocking writes in a signal handler RETURN_NOT_OK(SetPipeFileDescriptorNonBlocking(pipe_.wfd.fd())); } + + atfork_handler_ = std::make_shared( + /*before=*/ + [weak_self = std::weak_ptr(shared_from_this())] { + auto self = weak_self.lock(); + if (self) { + self->BeforeFork(); + } + return self; + }, + /*parent_after=*/ + [](std::any token) { + auto self = std::any_cast>(std::move(token)); + self->ParentAfterFork(); + }, + /*child_after=*/ + [](std::any token) { + auto self = std::any_cast>(std::move(token)); + self->ChildAfterFork(); + }); + RegisterAtFork(atfork_handler_); + return Status::OK(); } @@ -1213,10 +1236,6 @@ class SelfPipeImpl : public SelfPipe { // Already closed return ClosedPipe(); } - if (InForkedChild()) { - return Status::Invalid( - "Self-pipe was created in parent process, cannot wait in child"); - } uint64_t payload = 0; char* buf = reinterpret_cast(&payload); auto buf_size = static_cast(sizeof(payload)); @@ -1253,12 +1272,6 @@ class SelfPipeImpl : public SelfPipe { } Status Shutdown() override { - if (InForkedChild()) { - // We're in a forked child process, avoid sending an EOF payload - // that may be received by the parent; just close our file descriptor - // (shared with the parent). - return pipe_.wfd.Close(); - } please_shutdown_.store(true); errno = 0; if (!DoSend(kEofPayload)) { @@ -1274,26 +1287,27 @@ class SelfPipeImpl : public SelfPipe { ~SelfPipeImpl() { ARROW_WARN_NOT_OK(Shutdown(), "On self-pipe destruction"); } protected: - Status ClosedPipe() const { return Status::Invalid("Self-pipe closed"); } + void BeforeFork() {} - bool InForkedChild() { -#ifndef _WIN32 - return pid_.load() != getpid(); -#else - return false; -#endif + void ParentAfterFork() {} + + void ChildAfterFork() { + // Close and recreate pipe, to avoid interfering with parent. + const bool was_closed = pipe_.rfd.closed() || pipe_.wfd.closed(); + ARROW_CHECK_OK(pipe_.Close()); + if (!was_closed) { + ARROW_CHECK_OK(CreatePipe().Value(&pipe_)); + } } + Status ClosedPipe() const { return Status::Invalid("Self-pipe closed"); } + bool DoSend(uint64_t payload) { // This needs to be async-signal safe as it's called from Send() if (pipe_.wfd.closed()) { // Already closed return false; } - if (InForkedChild()) { - // We're in a forked child process, avoid sending payload to parent. - return false; - } const char* buf = reinterpret_cast(&payload); auto buf_size = static_cast(sizeof(payload)); while (buf_size > 0) { @@ -1317,9 +1331,8 @@ class SelfPipeImpl : public SelfPipe { const bool signal_safe_; Pipe pipe_; std::atomic please_shutdown_{false}; -#ifndef _WIN32 - std::atomic pid_{getpid()}; -#endif + + std::shared_ptr atfork_handler_; }; #undef PIPE_WRITE diff --git a/cpp/src/arrow/util/io_util_test.cc b/cpp/src/arrow/util/io_util_test.cc index 652d1530ae17b..2c15e6f3fae77 100644 --- a/cpp/src/arrow/util/io_util_test.cc +++ b/cpp/src/arrow/util/io_util_test.cc @@ -449,27 +449,29 @@ TEST_F(TestSelfPipe, SendFromSignalAndWait) { #if !(defined(_WIN32) || defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER)) TEST_F(TestSelfPipe, ForkSafety) { - // Self-pipe isn't usable from child, but should neither crash at exit - // nor disrupt parent self_pipe_->Send(123456789123456789ULL); auto child_pid = fork(); if (child_pid == 0) { - // Child: pipe is unusable + // Child: pipe is reinitialized and usable without interfering with parent self_pipe_->Send(41ULL); + StartReading(); + SleepABit(); self_pipe_->Send(42ULL); - ASSERT_RAISES(Invalid, self_pipe_->Wait()); + AssertPayloadsEventually({41ULL, 42ULL}); + self_pipe_.reset(); std::exit(0); } else { - // Parent: pipe is still usable, data is read correctly - AssertChildExit(child_pid); + // Parent: pipe is usable concurrently with child, data is read correctly StartReading(); SleepABit(); self_pipe_->Send(987654321987654321ULL); AssertPayloadsEventually({123456789123456789ULL, 987654321987654321ULL}); ASSERT_OK(ReadStatus()); + + AssertChildExit(child_pid); } } #endif From 4e727fbd268ad1510fc7bd02bfae5c10aa4ef053 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 14 Nov 2022 15:59:33 +0100 Subject: [PATCH 6/8] Use atfork handler in signal StopSource --- cpp/src/arrow/util/cancel.cc | 112 +++++++++++++++--------------- cpp/src/arrow/util/cancel_test.cc | 38 +++++++--- 2 files changed, 84 insertions(+), 66 deletions(-) diff --git a/cpp/src/arrow/util/cancel.cc b/cpp/src/arrow/util/cancel.cc index 9e288c77b67ab..3bcabe238e6bc 100644 --- a/cpp/src/arrow/util/cancel.cc +++ b/cpp/src/arrow/util/cancel.cc @@ -23,13 +23,8 @@ #include #include -#ifndef _WIN32 -// For getpid() -#include -#include -#endif - #include "arrow/result.h" +#include "arrow/util/atfork_internal.h" #include "arrow/util/io_util.h" #include "arrow/util/logging.h" #include "arrow/util/mutex.h" @@ -41,6 +36,7 @@ namespace arrow { #error Lock-free atomic int required for signal safety #endif +using internal::AtForkHandler; using internal::ReinstateSignalHandler; using internal::SelfPipe; using internal::SetSignalHandler; @@ -108,12 +104,37 @@ Status StopToken::Poll() const { namespace { -struct SignalStopState { +struct SignalStopState : public std::enable_shared_from_this { struct SavedSignalHandler { int signum; SignalHandler handler; }; + // NOTE: shared_from_this() doesn't work from constructor + void Init() { + // XXX this pattern appears in several places, factor it out? + atfork_handler_ = std::make_shared( + /*before=*/ + [weak_self = std::weak_ptr(shared_from_this())] { + auto self = weak_self.lock(); + if (self) { + self->BeforeFork(); + } + return self; + }, + /*parent_after=*/ + [](std::any token) { + auto self = std::any_cast>(std::move(token)); + self->ParentAfterFork(); + }, + /*child_after=*/ + [](std::any token) { + auto self = std::any_cast>(std::move(token)); + self->ChildAfterFork(); + }); + RegisterAtFork(atfork_handler_); + } + Status RegisterHandlers(const std::vector& signals) { std::lock_guard lock(mutex_); if (!saved_handlers_.empty()) { @@ -152,22 +173,17 @@ struct SignalStopState { } ~SignalStopState() { + atfork_handler_.reset(); UnregisterHandlers(); Disable(); if (signal_receiving_thread_) { - if (InForkedChild()) { - // std::thread is basically unusuable in a forked child, even the - // destructor can crash. - signal_receiving_thread_.release(); + // Tell the receiving thread to stop + auto st = self_pipe_->Shutdown(); + ARROW_WARN_NOT_OK(st, "Failed to shutdown self-pipe"); + if (st.ok()) { + signal_receiving_thread_->join(); } else { - // Tell the receiving thread to stop - auto st = self_pipe_->Shutdown(); - ARROW_WARN_NOT_OK(st, "Failed to shutdown self-pipe"); - if (st.ok()) { - signal_receiving_thread_->join(); - } else { - signal_receiving_thread_->detach(); - } + signal_receiving_thread_->detach(); } } } @@ -189,42 +205,25 @@ struct SignalStopState { void Disable() { std::lock_guard lock(mutex_); - stop_source_ = NullSource(); + stop_source_.reset(); } - static SignalStopState* instance(bool signal_safe = false) { - static auto instance = std::make_shared(); -#ifndef _WIN32 - if (instance->InForkedChild()) { - // In child process - if (signal_safe) { - return nullptr; - } - // Not called from a signal => can reinitialize - auto lock = arrow::util::GlobalForkSafeMutex()->Lock(); - if (instance->pid_.load() != getpid()) { - bool thread_was_launched{instance->signal_receiving_thread_}; - // Reinitialize internal structures after fork - instance = std::make_shared(); - if (thread_was_launched) { - instance->SpawnSignalReceivingThread(); - } - } - } -#endif + static SignalStopState* instance() { + static std::shared_ptr instance = []() { + auto ptr = std::make_shared(); + ptr->Init(); + return ptr; + }(); return instance.get(); } private: - // For readability - std::shared_ptr NullSource() { return nullptr; } - void SpawnSignalReceivingThread() { signal_receiving_thread_ = std::make_unique(ReceiveSignals, self_pipe_); } static void HandleSignal(int signum) { - auto self = instance(/*signal_safe=*/true); + auto self = instance(); if (self) { self->DoHandleSignal(signum); } @@ -264,29 +263,32 @@ struct SignalStopState { } } - bool InForkedChild() { -#ifndef _WIN32 - return pid_.load() != getpid(); -#else - return false; -#endif + // At-fork handlers + + void BeforeFork() { mutex_.lock(); } + + void ParentAfterFork() { mutex_.unlock(); } + + void ChildAfterFork() { + new (&mutex_) std::mutex; + if (signal_receiving_thread_) { + // Leak previous thread and re-spawn + ARROW_UNUSED(signal_receiving_thread_.release()); + SpawnSignalReceivingThread(); + } } std::mutex mutex_; std::vector saved_handlers_; std::shared_ptr stop_source_; std::unique_ptr signal_receiving_thread_; + std::shared_ptr atfork_handler_; // For signal handler interaction std::shared_ptr self_pipe_; // Raw atomic pointer, as atomic load/store of a shared_ptr may not be lock-free // (it is not on libstdc++). std::atomic self_pipe_ptr_; - - // For fork safety -#ifndef _WIN32 - std::atomic pid_{getpid()}; -#endif }; } // namespace diff --git a/cpp/src/arrow/util/cancel_test.cc b/cpp/src/arrow/util/cancel_test.cc index d505f6b5af887..76fb8d0b09123 100644 --- a/cpp/src/arrow/util/cancel_test.cc +++ b/cpp/src/arrow/util/cancel_test.cc @@ -17,6 +17,7 @@ #include #include +#include #include #include #include @@ -203,6 +204,23 @@ class SignalCancelTest : public CancelTest { ASSERT_EQ(internal::SignalFromStatus(st), expected_signal_); } +#ifndef _WIN32 + void RunInChild(std::function func) { + auto child_pid = fork(); + if (child_pid == -1) { + ASSERT_OK(internal::IOErrorFromErrno(errno, "Error calling fork(): ")); + } + if (child_pid == 0) { + // Child + ASSERT_NO_FATAL_FAILURE(func()) << "Failure in child process"; + std::exit(0); + } else { + // Parent + AssertChildExit(child_pid); + } + } +#endif + protected: #ifdef _WIN32 const int expected_signal_ = SIGINT; @@ -244,21 +262,19 @@ TEST_F(SignalCancelTest, RegisterUnregister) { TEST_F(SignalCancelTest, ForkSafety) { RegisterHandler(); - auto child_pid = fork(); - if (child_pid == 0) { + RunInChild([&]() { // Child: trigger signal - TriggerSignal(); - // Stop source destruction should neither crash not affect parent - std::exit(0); - } else { - // Parent: shouldn't notice signal raised in child - AssertChildExit(child_pid); AssertStopNotRequested(); - - // Stop source still usable in parent TriggerSignal(); AssertStopRequested(); - } + }); + + // Parent: shouldn't notice signals raised in child + AssertStopNotRequested(); + + // Stop source still usable in parent + TriggerSignal(); + AssertStopRequested(); } #endif From ea5ecfa20f88b3145dd04cc58f4bc252c3be6000 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Tue, 15 Nov 2022 13:06:43 +0100 Subject: [PATCH 7/8] Fix fork+exec with Python subprocess --- cpp/src/arrow/util/cancel.cc | 18 ++++++++++------ cpp/src/arrow/util/cancel.h | 4 ++++ cpp/src/arrow/util/cancel_test.cc | 34 +++++++++++++++++++++++++++--- cpp/src/arrow/util/io_util.cc | 34 ++++++++++++++++++++++++++---- cpp/src/arrow/util/io_util_test.cc | 3 ++- 5 files changed, 78 insertions(+), 15 deletions(-) diff --git a/cpp/src/arrow/util/cancel.cc b/cpp/src/arrow/util/cancel.cc index 3bcabe238e6bc..2648059af81ee 100644 --- a/cpp/src/arrow/util/cancel.cc +++ b/cpp/src/arrow/util/cancel.cc @@ -149,10 +149,11 @@ struct SignalStopState : public std::enable_shared_from_this { "lock-free on this platform"); #else ARROW_ASSIGN_OR_RAISE(self_pipe_, SelfPipe::Make(/*signal_safe=*/true)); +#endif + } + if (!signal_receiving_thread_) { // Spawn thread for receiving signals - DCHECK(!signal_receiving_thread_); SpawnSignalReceivingThread(); -#endif } self_pipe_ptr_.store(self_pipe_.get()); for (int signum : signals) { @@ -271,11 +272,14 @@ struct SignalStopState : public std::enable_shared_from_this { void ChildAfterFork() { new (&mutex_) std::mutex; - if (signal_receiving_thread_) { - // Leak previous thread and re-spawn - ARROW_UNUSED(signal_receiving_thread_.release()); - SpawnSignalReceivingThread(); - } + // Leak previous thread, as it has become invalid. + // We can't spawn a new one here as it would have unfortunate side effects; + // especially in the frequent context of a fork+exec. + // (for example the Python subprocess module closes all fds before calling exec) + ARROW_UNUSED(signal_receiving_thread_.release()); + // Make internal state consistent: with no listening thread, we shouldn't + // feed the self-pipe from the signal handler. + UnregisterHandlers(); } std::mutex mutex_; diff --git a/cpp/src/arrow/util/cancel.h b/cpp/src/arrow/util/cancel.h index 9ba167674cb48..f0d704b2ce086 100644 --- a/cpp/src/arrow/util/cancel.h +++ b/cpp/src/arrow/util/cancel.h @@ -104,6 +104,10 @@ ARROW_EXPORT void ResetSignalStopSource(); /// EXPERIMENTAL: Register signal handler triggering the signal-receiving StopSource +/// +/// Note that those handlers are automatically un-registered in a fork()ed process, +/// therefore the child process will need to call RegisterCancellingSignalHandler() +/// if desired. ARROW_EXPORT Status RegisterCancellingSignalHandler(const std::vector& signals); diff --git a/cpp/src/arrow/util/cancel_test.cc b/cpp/src/arrow/util/cancel_test.cc index 76fb8d0b09123..45f6cde4f5579 100644 --- a/cpp/src/arrow/util/cancel_test.cc +++ b/cpp/src/arrow/util/cancel_test.cc @@ -258,13 +258,41 @@ TEST_F(SignalCancelTest, RegisterUnregister) { AssertStopRequested(); } -#if !(defined(_WIN32) || defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER)) -TEST_F(SignalCancelTest, ForkSafety) { +#if !(defined(_WIN32) || defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) || \ + defined(THREAD_SANITIZER)) +TEST_F(SignalCancelTest, ForkSafetyUnregisteredHandlers) { + RunInChild([&]() { + // Child + TriggerSignal(); + AssertStopNotRequested(); + + RegisterHandler(); + TriggerSignal(); + AssertStopRequested(); + }); + + // Parent: shouldn't notice signals raised in child + AssertStopNotRequested(); + + // Stop source still usable in parent + TriggerSignal(); + AssertStopNotRequested(); + + RegisterHandler(); + TriggerSignal(); + AssertStopRequested(); +} + +TEST_F(SignalCancelTest, ForkSafetyRegisteredHandlers) { RegisterHandler(); RunInChild([&]() { - // Child: trigger signal + // Child: signal handlers are unregistered and need to be re-registered + TriggerSignal(); AssertStopNotRequested(); + + // Can re-register and receive signals + RegisterHandler(); TriggerSignal(); AssertStopRequested(); }); diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index 358e0a88f444b..379343cd5b316 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -1150,19 +1150,45 @@ Result FileTell(int fd) { } Result CreatePipe() { - int ret; + bool ok; int fds[2]; + Pipe pipe; #if defined(_WIN32) ret = _pipe(fds, 4096, _O_BINARY); + ok = _pipe(fds, 4096, _O_BINARY) >= 0; + if (ok) { + pipe = {FileDescriptor(fds[0]), FileDescriptor(fds[1])}; + } +#elif defined(__linux__) && defined(__GLIBC__) + // On Unix, we don't want the file descriptors to survive after an exec() call + ok = pipe2(fds, O_CLOEXEC) >= 0; + if (ok) { + pipe = {FileDescriptor(fds[0]), FileDescriptor(fds[1])}; + } #else - ret = ::pipe(fds); + auto set_cloexec = [](int fd) -> bool { + int ret = fcntl(fd, F_GETFD); + if (ret >= 0) { + ret = fcntl(fd, F_SETFD, ret | FD_CLOEXEC); + } + return ret >= 0; + }; + + ok = ::pipe(fds) >= 0; + if (ok) { + pipe = {FileDescriptor(fds[0]), FileDescriptor(fds[1])}; + ok &= set_cloexec(fds[0]); + if (ok) { + ok &= set_cloexec(fds[1]); + } + } #endif - if (ret == -1) { + if (!ok) { return IOErrorFromErrno(errno, "Error creating pipe"); } - return Pipe{FileDescriptor(fds[0]), FileDescriptor(fds[1])}; + return pipe; } Status SetPipeFileDescriptorNonBlocking(int fd) { diff --git a/cpp/src/arrow/util/io_util_test.cc b/cpp/src/arrow/util/io_util_test.cc index 2c15e6f3fae77..bb5113440bb21 100644 --- a/cpp/src/arrow/util/io_util_test.cc +++ b/cpp/src/arrow/util/io_util_test.cc @@ -447,7 +447,8 @@ TEST_F(TestSelfPipe, SendFromSignalAndWait) { ASSERT_OK(ReadStatus()); } -#if !(defined(_WIN32) || defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER)) +#if !(defined(_WIN32) || defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) || \ + defined(THREAD_SANITIZER)) TEST_F(TestSelfPipe, ForkSafety) { self_pipe_->Send(123456789123456789ULL); From 17b576d67ef46a65c138b8b06f248c9c903c2621 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Tue, 15 Nov 2022 13:25:22 +0100 Subject: [PATCH 8/8] Fix compilation failure on Windows --- cpp/src/arrow/util/io_util.cc | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index 379343cd5b316..1f117de7b2cf5 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -1155,7 +1155,6 @@ Result CreatePipe() { Pipe pipe; #if defined(_WIN32) - ret = _pipe(fds, 4096, _O_BINARY); ok = _pipe(fds, 4096, _O_BINARY) >= 0; if (ok) { pipe = {FileDescriptor(fds[0]), FileDescriptor(fds[1])}; @@ -1168,11 +1167,11 @@ Result CreatePipe() { } #else auto set_cloexec = [](int fd) -> bool { - int ret = fcntl(fd, F_GETFD); - if (ret >= 0) { - ret = fcntl(fd, F_SETFD, ret | FD_CLOEXEC); + int flags = fcntl(fd, F_GETFD); + if (flags >= 0) { + flags = fcntl(fd, F_SETFD, flags | FD_CLOEXEC); } - return ret >= 0; + return flags >= 0; }; ok = ::pipe(fds) >= 0;