Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Integrate scoped dup2 #51179

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/ray/util/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ ray_cc_library(
hdrs = ["stream_redirection_utils.h"],
deps = [
":pipe_logger",
":scoped_dup2_wrapper",
":stream_redirection_options",
":util",
],
Expand Down
55 changes: 12 additions & 43 deletions src/ray/util/stream_redirection_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

#include <cstring>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

#include "ray/util/compat.h"
#include "ray/util/pipe_logger.h"
#include "ray/util/scoped_dup2_wrapper.h"
#include "ray/util/util.h"

#if defined(_WIN32)
Expand All @@ -36,72 +38,39 @@ namespace {
struct RedirectionHandleWrapper {
RedirectionFileHandle redirection_file_handle;
// Used for restoration.
MEMFD_TYPE_NON_UNIQUE saved_stream_handle;
std::unique_ptr<ScopedDup2Wrapper> scoped_dup2_wrapper;
};

// TODO(hjiang): Revisit later, should be able to save some heap allocation with
// absl::InlinedVector.
//
// Maps from original stream file handle (i.e. stdout/stderr) to its stream redirector.
absl::flat_hash_map<int, RedirectionHandleWrapper> redirection_file_handles;
absl::flat_hash_map<MEMFD_TYPE_NON_UNIQUE, RedirectionHandleWrapper>
redirection_file_handles;

// Block synchronize on stream redirection related completion, should be call **EXACTLY
// ONCE** at program termination.
std::once_flag stream_exit_once_flag;
void SyncOnStreamRedirection() {
for (auto &[stream_fd, handle] : redirection_file_handles) {
// Restore old stream fd.
#if defined(__APPLE__) || defined(__linux__)
RAY_CHECK_NE(dup2(handle.saved_stream_handle, stream_fd), -1)
<< "Fails to restore file descriptor " << strerror(errno);
#elif defined(_WIN32)
int duped_fd = _open_osfhandle(reinterpret_cast<intptr_t>(handle.saved_stream_handle),
_O_WRONLY);
RAY_CHECK_NE(_dup2(duped_fd, stream_fd), -1) << "Fails to duplicate file descriptor.";
#endif

for (auto &[_, handle] : redirection_file_handles) {
handle.scoped_dup2_wrapper = nullptr;
handle.redirection_file_handle.Close();
}
}

// Redirect the given [stream_fd] based on the specified option.
void RedirectStream(int stream_fd, const StreamRedirectionOption &opt) {
void RedirectStream(MEMFD_TYPE_NON_UNIQUE stream_fd, const StreamRedirectionOption &opt) {
std::call_once(stream_exit_once_flag, []() {
RAY_CHECK_EQ(std::atexit(SyncOnStreamRedirection), 0)
<< "Fails to register stream redirection termination hook.";
});

RedirectionFileHandle handle = CreateRedirectionFileHandle(opt);

#if defined(__APPLE__) || defined(__linux__)
// Duplicate stream fd for later restoration.
MEMFD_TYPE_NON_UNIQUE duped_stream_fd = dup(stream_fd);
RAY_CHECK_NE(duped_stream_fd, -1)
<< "Fails to duplicate stream fd " << stream_fd << " because " << strerror(errno);

RAY_CHECK_NE(dup2(handle.GetWriteHandle(), stream_fd), -1)
<< "Fails to duplicate file descriptor " << strerror(errno);
#elif defined(_WIN32)
// Duplicate stream fd for later restoration.
MEMFD_TYPE_NON_UNIQUE duped_stream_fd;
BOOL result = DuplicateHandle(GetCurrentProcess(),
(HANDLE)_get_osfhandle(stream_fd),
GetCurrentProcess(),
&duped_stream_fd,
0,
FALSE,
DUPLICATE_SAME_ACCESS);
RAY_CHECK(result);

int pipe_write_fd =
_open_osfhandle(reinterpret_cast<intptr_t>(handle.GetWriteHandle()), _O_WRONLY);
RAY_CHECK_NE(_dup2(pipe_write_fd, stream_fd), -1)
<< "Fails to duplicate file descriptor.";
#endif
auto scoped_dup2_wrapper = ScopedDup2Wrapper::New(handle.GetWriteHandle(), stream_fd);

RedirectionHandleWrapper handle_wrapper;
handle_wrapper.redirection_file_handle = std::move(handle);
handle_wrapper.saved_stream_handle = duped_stream_fd;
handle_wrapper.scoped_dup2_wrapper = std::move(scoped_dup2_wrapper);

const bool is_new =
redirection_file_handles.emplace(stream_fd, std::move(handle_wrapper)).second;
Expand All @@ -118,10 +87,10 @@ void FlushOnRedirectedStream(int stream_fd) {
} // namespace

void RedirectStdout(const StreamRedirectionOption &opt) {
RedirectStream(GetStdoutFd(), opt);
RedirectStream(GetStdoutHandle(), opt);
}
void RedirectStderr(const StreamRedirectionOption &opt) {
RedirectStream(GetStderrFd(), opt);
RedirectStream(GetStderrHandle(), opt);
}
void FlushOnRedirectedStdout() { FlushOnRedirectedStream(GetStdoutFd()); }
void FlushOnRedirectedStderr() { FlushOnRedirectedStream(GetStderrFd()); }
Expand Down