From 260b4e690db64f1613bb850883e5836e2df99617 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 14 Nov 2022 15:15:17 +0100 Subject: [PATCH] Use atfork handler in SelfPipe --- cpp/src/arrow/util/io_util.cc | 63 ++++++++++++++++++------------ cpp/src/arrow/util/io_util_test.cc | 14 ++++--- 2 files changed, 46 insertions(+), 31 deletions(-) diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index e7543467ef2ca..358e0a88f444b 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -92,6 +92,7 @@ #include "arrow/buffer.h" #include "arrow/result.h" +#include "arrow/util/atfork_internal.h" #include "arrow/util/checked_cast.h" #include "arrow/util/io_util.h" #include "arrow/util/logging.h" @@ -1190,7 +1191,7 @@ namespace { #define PIPE_READ read #endif -class SelfPipeImpl : public SelfPipe { +class SelfPipeImpl : public SelfPipe, public std::enable_shared_from_this { static constexpr uint64_t kEofPayload = 5804561806345822987ULL; public: @@ -1205,6 +1206,28 @@ class SelfPipeImpl : public SelfPipe { // We cannot afford blocking writes in a signal handler RETURN_NOT_OK(SetPipeFileDescriptorNonBlocking(pipe_.wfd.fd())); } + + atfork_handler_ = std::make_shared( + /*before=*/ + [weak_self = std::weak_ptr(shared_from_this())] { + auto self = weak_self.lock(); + if (self) { + self->BeforeFork(); + } + return self; + }, + /*parent_after=*/ + [](std::any token) { + auto self = std::any_cast>(std::move(token)); + self->ParentAfterFork(); + }, + /*child_after=*/ + [](std::any token) { + auto self = std::any_cast>(std::move(token)); + self->ChildAfterFork(); + }); + RegisterAtFork(atfork_handler_); + return Status::OK(); } @@ -1213,10 +1236,6 @@ class SelfPipeImpl : public SelfPipe { // Already closed return ClosedPipe(); } - if (InForkedChild()) { - return Status::Invalid( - "Self-pipe was created in parent process, cannot wait in child"); - } uint64_t payload = 0; char* buf = reinterpret_cast(&payload); auto buf_size = static_cast(sizeof(payload)); @@ -1253,12 +1272,6 @@ class SelfPipeImpl : public SelfPipe { } Status Shutdown() override { - if (InForkedChild()) { - // We're in a forked child process, avoid sending an EOF payload - // that may be received by the parent; just close our file descriptor - // (shared with the parent). - return pipe_.wfd.Close(); - } please_shutdown_.store(true); errno = 0; if (!DoSend(kEofPayload)) { @@ -1274,26 +1287,27 @@ class SelfPipeImpl : public SelfPipe { ~SelfPipeImpl() { ARROW_WARN_NOT_OK(Shutdown(), "On self-pipe destruction"); } protected: - Status ClosedPipe() const { return Status::Invalid("Self-pipe closed"); } + void BeforeFork() {} - bool InForkedChild() { -#ifndef _WIN32 - return pid_.load() != getpid(); -#else - return false; -#endif + void ParentAfterFork() {} + + void ChildAfterFork() { + // Close and recreate pipe, to avoid interfering with parent. + const bool was_closed = pipe_.rfd.closed() || pipe_.wfd.closed(); + ARROW_CHECK_OK(pipe_.Close()); + if (!was_closed) { + ARROW_CHECK_OK(CreatePipe().Value(&pipe_)); + } } + Status ClosedPipe() const { return Status::Invalid("Self-pipe closed"); } + bool DoSend(uint64_t payload) { // This needs to be async-signal safe as it's called from Send() if (pipe_.wfd.closed()) { // Already closed return false; } - if (InForkedChild()) { - // We're in a forked child process, avoid sending payload to parent. - return false; - } const char* buf = reinterpret_cast(&payload); auto buf_size = static_cast(sizeof(payload)); while (buf_size > 0) { @@ -1317,9 +1331,8 @@ class SelfPipeImpl : public SelfPipe { const bool signal_safe_; Pipe pipe_; std::atomic please_shutdown_{false}; -#ifndef _WIN32 - std::atomic pid_{getpid()}; -#endif + + std::shared_ptr atfork_handler_; }; #undef PIPE_WRITE diff --git a/cpp/src/arrow/util/io_util_test.cc b/cpp/src/arrow/util/io_util_test.cc index 652d1530ae17b..2c15e6f3fae77 100644 --- a/cpp/src/arrow/util/io_util_test.cc +++ b/cpp/src/arrow/util/io_util_test.cc @@ -449,27 +449,29 @@ TEST_F(TestSelfPipe, SendFromSignalAndWait) { #if !(defined(_WIN32) || defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER)) TEST_F(TestSelfPipe, ForkSafety) { - // Self-pipe isn't usable from child, but should neither crash at exit - // nor disrupt parent self_pipe_->Send(123456789123456789ULL); auto child_pid = fork(); if (child_pid == 0) { - // Child: pipe is unusable + // Child: pipe is reinitialized and usable without interfering with parent self_pipe_->Send(41ULL); + StartReading(); + SleepABit(); self_pipe_->Send(42ULL); - ASSERT_RAISES(Invalid, self_pipe_->Wait()); + AssertPayloadsEventually({41ULL, 42ULL}); + self_pipe_.reset(); std::exit(0); } else { - // Parent: pipe is still usable, data is read correctly - AssertChildExit(child_pid); + // Parent: pipe is usable concurrently with child, data is read correctly StartReading(); SleepABit(); self_pipe_->Send(987654321987654321ULL); AssertPayloadsEventually({123456789123456789ULL, 987654321987654321ULL}); ASSERT_OK(ReadStatus()); + + AssertChildExit(child_pid); } } #endif