Skip to content

Commit

Permalink
Use atfork handler in SelfPipe
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Nov 14, 2022
1 parent 42f171d commit 260b4e6
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 31 deletions.
63 changes: 38 additions & 25 deletions cpp/src/arrow/util/io_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@

#include "arrow/buffer.h"
#include "arrow/result.h"
#include "arrow/util/atfork_internal.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"
Expand Down Expand Up @@ -1190,7 +1191,7 @@ namespace {
#define PIPE_READ read
#endif

class SelfPipeImpl : public SelfPipe {
class SelfPipeImpl : public SelfPipe, public std::enable_shared_from_this<SelfPipeImpl> {
static constexpr uint64_t kEofPayload = 5804561806345822987ULL;

public:
Expand All @@ -1205,6 +1206,28 @@ class SelfPipeImpl : public SelfPipe {
// We cannot afford blocking writes in a signal handler
RETURN_NOT_OK(SetPipeFileDescriptorNonBlocking(pipe_.wfd.fd()));
}

atfork_handler_ = std::make_shared<AtForkHandler>(
/*before=*/
[weak_self = std::weak_ptr<SelfPipeImpl>(shared_from_this())] {
auto self = weak_self.lock();
if (self) {
self->BeforeFork();
}
return self;
},
/*parent_after=*/
[](std::any token) {
auto self = std::any_cast<std::shared_ptr<SelfPipeImpl>>(std::move(token));
self->ParentAfterFork();
},
/*child_after=*/
[](std::any token) {
auto self = std::any_cast<std::shared_ptr<SelfPipeImpl>>(std::move(token));
self->ChildAfterFork();
});
RegisterAtFork(atfork_handler_);

return Status::OK();
}

Expand All @@ -1213,10 +1236,6 @@ class SelfPipeImpl : public SelfPipe {
// Already closed
return ClosedPipe();
}
if (InForkedChild()) {
return Status::Invalid(
"Self-pipe was created in parent process, cannot wait in child");
}
uint64_t payload = 0;
char* buf = reinterpret_cast<char*>(&payload);
auto buf_size = static_cast<int64_t>(sizeof(payload));
Expand Down Expand Up @@ -1253,12 +1272,6 @@ class SelfPipeImpl : public SelfPipe {
}

Status Shutdown() override {
if (InForkedChild()) {
// We're in a forked child process, avoid sending an EOF payload
// that may be received by the parent; just close our file descriptor
// (shared with the parent).
return pipe_.wfd.Close();
}
please_shutdown_.store(true);
errno = 0;
if (!DoSend(kEofPayload)) {
Expand All @@ -1274,26 +1287,27 @@ class SelfPipeImpl : public SelfPipe {
~SelfPipeImpl() { ARROW_WARN_NOT_OK(Shutdown(), "On self-pipe destruction"); }

protected:
Status ClosedPipe() const { return Status::Invalid("Self-pipe closed"); }
void BeforeFork() {}

bool InForkedChild() {
#ifndef _WIN32
return pid_.load() != getpid();
#else
return false;
#endif
void ParentAfterFork() {}

void ChildAfterFork() {
// Close and recreate pipe, to avoid interfering with parent.
const bool was_closed = pipe_.rfd.closed() || pipe_.wfd.closed();
ARROW_CHECK_OK(pipe_.Close());
if (!was_closed) {
ARROW_CHECK_OK(CreatePipe().Value(&pipe_));
}
}

Status ClosedPipe() const { return Status::Invalid("Self-pipe closed"); }

bool DoSend(uint64_t payload) {
// This needs to be async-signal safe as it's called from Send()
if (pipe_.wfd.closed()) {
// Already closed
return false;
}
if (InForkedChild()) {
// We're in a forked child process, avoid sending payload to parent.
return false;
}
const char* buf = reinterpret_cast<const char*>(&payload);
auto buf_size = static_cast<int64_t>(sizeof(payload));
while (buf_size > 0) {
Expand All @@ -1317,9 +1331,8 @@ class SelfPipeImpl : public SelfPipe {
const bool signal_safe_;
Pipe pipe_;
std::atomic<bool> please_shutdown_{false};
#ifndef _WIN32
std::atomic<pid_t> pid_{getpid()};
#endif

std::shared_ptr<AtForkHandler> atfork_handler_;
};

#undef PIPE_WRITE
Expand Down
14 changes: 8 additions & 6 deletions cpp/src/arrow/util/io_util_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -449,27 +449,29 @@ TEST_F(TestSelfPipe, SendFromSignalAndWait) {

#if !(defined(_WIN32) || defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER))
TEST_F(TestSelfPipe, ForkSafety) {
// Self-pipe isn't usable from child, but should neither crash at exit
// nor disrupt parent
self_pipe_->Send(123456789123456789ULL);

auto child_pid = fork();
if (child_pid == 0) {
// Child: pipe is unusable
// Child: pipe is reinitialized and usable without interfering with parent
self_pipe_->Send(41ULL);
StartReading();
SleepABit();
self_pipe_->Send(42ULL);
ASSERT_RAISES(Invalid, self_pipe_->Wait());
AssertPayloadsEventually({41ULL, 42ULL});

self_pipe_.reset();
std::exit(0);
} else {
// Parent: pipe is still usable, data is read correctly
AssertChildExit(child_pid);
// Parent: pipe is usable concurrently with child, data is read correctly
StartReading();
SleepABit();
self_pipe_->Send(987654321987654321ULL);

AssertPayloadsEventually({123456789123456789ULL, 987654321987654321ULL});
ASSERT_OK(ReadStatus());

AssertChildExit(child_pid);
}
}
#endif
Expand Down

0 comments on commit 260b4e6

Please sign in to comment.