From 9c74e51c70bc9271f9193908e6536f53849e6c8c Mon Sep 17 00:00:00 2001 From: "Matthew \"strager\" Glazar" Date: Sat, 17 Apr 2021 22:05:34 -0700 Subject: [PATCH] @@@ LSP: Prevent deadlock with synchronous client @@@ winders @@@ perf. lots of copying. sadge. =[ @@@ test LSP server manually If an LSP client sends a bunch of requests, but does not read from quick-lint-js' stdout, quick-lint-js can hang in pipe_reader::write. (This came up when writing some benchmarks. My LSP client naively made a bunch of requests without reading responses during the requests.) Prevent this (unlikely) deadlock by switching stdout to non-blocking mode, buffering data in memory, and resuming reading from stdin as soon as possible. The current implementation is relatively inefficient. Data is copied many times. Maybe we'll clean this up in the future. --- src/CMakeLists.txt | 11 +++- src/file-handle.cpp | 80 +++++++++++++++++++++++ src/lsp-pipe-writer.cpp | 82 +++++++++++++++++++++++- src/quick-lint-js/file-handle.h | 14 +++++ src/quick-lint-js/have.h | 8 +++ src/quick-lint-js/lsp-pipe-writer.h | 34 +++++++++- test/test-lsp-pipe-writer.cpp | 98 ++++++++--------------------- 7 files changed, 253 insertions(+), 74 deletions(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 3bac66a9dd..722aa0a794 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -13,6 +13,8 @@ option( FALSE ) +find_package(Threads REQUIRED) + quick_lint_js_add_executable( quick-lint-js main.cpp @@ -129,7 +131,14 @@ quick_lint_js_add_library( wasm-demo-location.cpp ) target_include_directories(quick-lint-js-lib PUBLIC .) -target_link_libraries(quick-lint-js-lib PUBLIC boost_container simdjson) +target_link_libraries( + quick-lint-js-lib + PUBLIC + boost_container + simdjson + PRIVATE + Threads::Threads +) if (${CMAKE_VERSION} VERSION_GREATER_EQUAL 3.17.3) target_precompile_headers( quick-lint-js-lib diff --git a/src/file-handle.cpp b/src/file-handle.cpp index b59cca4c1b..c48a56ce61 100644 --- a/src/file-handle.cpp +++ b/src/file-handle.cpp @@ -1,6 +1,7 @@ // Copyright (C) 2020 Matthew Glazar // See end of file for extended copyright information. +#include #include #include #include @@ -22,6 +23,10 @@ #include #endif +#if QLJS_HAVE_POLL +#include +#endif + #if QLJS_HAVE_UNISTD_H #include #endif @@ -83,6 +88,36 @@ std::optional windows_handle_file_ref::write(const void *buffer, return narrow_cast(write_size); } +bool windows_handle_file_ref::is_pipe_non_blocking() { + DWORD state; + BOOL ok = ::GetNamedPipeHandleStateA(this->get(), + /*lpState=*/&state, + /*lpCurInstances=*/nullptr, + /*lpMaxCollectionCount=*/nullptr, + /*lpCollectDataTimeout=*/nullptr, + /*lpUserName=*/nullptr, + /*nMaxUserNameSize=*/0); + if (!ok) { + QLJS_UNIMPLEMENTED(); + } + return (state & PIPE_NOWAIT) == PIPE_NOWAIT; +} + +void windows_handle_file_ref::set_pipe_non_blocking() { + DWORD mode = PIPE_READMODE_BYTE | PIPE_NOWAIT; + BOOL ok = ::SetNamedPipeHandleState(this->get(), /*lpMode=*/&mode, + /*lpMaxCollectionCount=*/nullptr, + /*lpCollectDataTimeout=*/nullptr); + if (!ok) { + QLJS_UNIMPLEMENTED(); + } +} + +void windows_handle_file_ref::block_until_pipe_is_writeable_or_broken() { + // @@@ we should block yo. but how? + // https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipe-type-read-and-wait-modes +} + std::string windows_handle_file_ref::get_last_error_message() { return windows_error_message(::GetLastError()); } @@ -141,6 +176,51 @@ std::optional posix_fd_file_ref::write(const void *buffer, return narrow_cast(written_size); } +bool posix_fd_file_ref::is_pipe_non_blocking() { +#if QLJS_HAVE_FCNTL_H + int rc = ::fcntl(this->get(), F_GETFL, O_NONBLOCK); + if (rc == -1) { + QLJS_UNIMPLEMENTED(); + } + return rc != 0; +#else +#error "Unsupported platform" +#endif +} + +void posix_fd_file_ref::set_pipe_non_blocking() { +#if QLJS_HAVE_FCNTL_H + int rc = ::fcntl(this->get(), F_SETFL, O_NONBLOCK); + if (rc != 0) { + QLJS_UNIMPLEMENTED(); + } +#else +#error "Unsupported platform" +#endif +} + +void posix_fd_file_ref::block_until_pipe_is_writeable_or_broken() { +#if QLJS_HAVE_POLL +retry: + std::array<::pollfd, 1> fds; + fds[0].fd = this->get(); + fds[0].events = POLLOUT; + int rc = + ::poll(fds.data(), narrow_cast<::nfds_t>(fds.size()), /*timeout=*/-1); + if (rc == -1) { + if (errno == EAGAIN || errno == EINTR) { + goto retry; + } + QLJS_UNIMPLEMENTED(); + } + QLJS_ASSERT(rc != 0); // Shouldn't time out. + QLJS_ASSERT(rc == 1); + QLJS_ASSERT((fds[0].revents & (POLLOUT | POLLERR | POLLHUP)) != 0); +#else +#error "Unsupported platform" +#endif +} + std::string posix_fd_file_ref::get_last_error_message() { return std::strerror(errno); } diff --git a/src/lsp-pipe-writer.cpp b/src/lsp-pipe-writer.cpp index 61298032c1..adf1a5fcf5 100644 --- a/src/lsp-pipe-writer.cpp +++ b/src/lsp-pipe-writer.cpp @@ -2,16 +2,30 @@ // See end of file for extended copyright information. #include +#include +#include #include #include +#include #include #include #include #include #include +#include namespace quick_lint_js { -lsp_pipe_writer::lsp_pipe_writer(platform_file_ref pipe) : pipe_(pipe) {} +lsp_pipe_writer::lsp_pipe_writer(platform_file_ref pipe) : pipe_(pipe) { + QLJS_ASSERT(this->pipe_.is_pipe_non_blocking()); +} + +lsp_pipe_writer::~lsp_pipe_writer() { + if (this->flushing_thread_.joinable()) { + this->stop_ = true; + this->data_is_pending_.notify_one(); + this->flushing_thread_.join(); + } +} void lsp_pipe_writer::send_message(const byte_buffer& message) { this->write(u8"Content-Length: "); @@ -25,6 +39,12 @@ void lsp_pipe_writer::send_message(const byte_buffer& message) { this->write(message_string); } +void lsp_pipe_writer::flush() { + std::unique_lock lock(this->mutex_); + QLJS_ASSERT(!this->stop_); + this->data_is_flushed_.wait(lock, [this] { return this->pending_.empty(); }); +} + template void lsp_pipe_writer::write_integer(T value) { std::array> buffer; @@ -34,14 +54,74 @@ void lsp_pipe_writer::write_integer(T value) { } void lsp_pipe_writer::write(string8_view message) { + std::unique_lock lock(this->mutex_); + QLJS_ASSERT(!this->stop_); + if (this->pending_.empty()) { + // The current thread has control over this->pipe_. + string8_view unwritten = this->write_as_much_as_possible_now(message); + if (!unwritten.empty()) { + // TODO(strager): Avoid copying. + this->pending_.append_copy(unwritten); + lock.unlock(); + this->start_flushing_thread_if_needed(); + } + } else { + // The flushing thread has control over this->pipe_. + this->pending_.append_copy(message); + } +} + +string8_view lsp_pipe_writer::write_as_much_as_possible_now( + string8_view message) { while (!message.empty()) { std::optional bytes_written = this->pipe_.write(message.data(), narrow_cast(message.size())); if (!bytes_written.has_value()) { + if (errno == EAGAIN) { + break; + } QLJS_UNIMPLEMENTED(); } message = message.substr(narrow_cast(*bytes_written)); } + return message; +} + +void lsp_pipe_writer::start_flushing_thread_if_needed() { + if (!this->flushing_thread_.joinable()) { + this->flushing_thread_ = + std::thread([this] { this->run_flushing_thread(); }); + } +} + +void lsp_pipe_writer::run_flushing_thread() { + std::unique_lock lock(this->mutex_); + for (;;) { + this->data_is_pending_.wait( + lock, [this] { return this->stop_ || !this->pending_.empty(); }); + if (this->stop_) { + break; + } + QLJS_ASSERT(!this->pending_.empty()); + + // TODO(strager): Don't copy. Write all the chunks with writev if possible. + string8 message_string; + message_string.resize(this->pending_.size()); + this->pending_.copy_to(message_string.data()); + this->pending_ = byte_buffer(); + string8_view unwritten = + this->write_as_much_as_possible_now(message_string); + // TODO(strager): Avoid copying. + this->pending_.append_copy(unwritten); + + if (this->pending_.empty()) { + this->data_is_flushed_.notify_one(); + } else { + lock.unlock(); + this->pipe_.block_until_pipe_is_writeable_or_broken(); + lock.lock(); + } + } } } diff --git a/src/quick-lint-js/file-handle.h b/src/quick-lint-js/file-handle.h index 6f04aa6182..39e9ce8b4f 100644 --- a/src/quick-lint-js/file-handle.h +++ b/src/quick-lint-js/file-handle.h @@ -54,6 +54,10 @@ class windows_handle_file_ref { file_read_result read(void *buffer, int buffer_size) noexcept; std::optional write(const void *buffer, int buffer_size) noexcept; + bool is_pipe_non_blocking(); + void set_pipe_non_blocking(); + void block_until_pipe_is_writeable_or_broken(); + static std::string get_last_error_message(); protected: @@ -74,9 +78,12 @@ class windows_handle_file : private windows_handle_file_ref { windows_handle_file_ref ref() noexcept; + using windows_handle_file_ref::block_until_pipe_is_writeable_or_broken; using windows_handle_file_ref::get; using windows_handle_file_ref::get_last_error_message; + using windows_handle_file_ref::is_pipe_non_blocking; using windows_handle_file_ref::read; + using windows_handle_file_ref::set_pipe_non_blocking; using windows_handle_file_ref::write; private: @@ -95,6 +102,10 @@ class posix_fd_file_ref { file_read_result read(void *buffer, int buffer_size) noexcept; std::optional write(const void *buffer, int buffer_size) noexcept; + bool is_pipe_non_blocking(); + void set_pipe_non_blocking(); + void block_until_pipe_is_writeable_or_broken(); + static std::string get_last_error_message(); protected: @@ -115,9 +126,12 @@ class posix_fd_file : private posix_fd_file_ref { posix_fd_file_ref ref() noexcept; + using posix_fd_file_ref::block_until_pipe_is_writeable_or_broken; using posix_fd_file_ref::get; using posix_fd_file_ref::get_last_error_message; + using posix_fd_file_ref::is_pipe_non_blocking; using posix_fd_file_ref::read; + using posix_fd_file_ref::set_pipe_non_blocking; using posix_fd_file_ref::write; private: diff --git a/src/quick-lint-js/have.h b/src/quick-lint-js/have.h index 7be10eddb9..9b46c3b3aa 100644 --- a/src/quick-lint-js/have.h +++ b/src/quick-lint-js/have.h @@ -128,6 +128,14 @@ #endif #endif +#if !defined(QLJS_HAVE_POLL) +#if defined(_POSIX_VERSION) && _POSIX_VERSION >= 200112L +#define QLJS_HAVE_POLL 1 +#else +#define QLJS_HAVE_POLL 0 +#endif +#endif + #if !defined(QLJS_HAVE_SETRLIMIT) #if (defined(_POSIX_VERSION) && _POSIX_VERSION >= 200809L) || \ (defined(__APPLE__) && defined(_POSIX_VERSION) && \ diff --git a/src/quick-lint-js/lsp-pipe-writer.h b/src/quick-lint-js/lsp-pipe-writer.h index fb945e7cc2..9a2361ed40 100644 --- a/src/quick-lint-js/lsp-pipe-writer.h +++ b/src/quick-lint-js/lsp-pipe-writer.h @@ -4,8 +4,11 @@ #ifndef QUICK_LINT_JS_LSP_PIPE_WRITER_H #define QUICK_LINT_JS_LSP_PIPE_WRITER_H +#include +#include #include #include +#include namespace quick_lint_js { class byte_buffer; @@ -14,11 +17,26 @@ class byte_buffer; // a pipe or socket. // // lsp_pipe_writer satisfies lsp_endpoint_remote. +// +// lsp_pipe_writer is not thread-safe. class lsp_pipe_writer { public: + // Precondition: pipe is non-blocking explicit lsp_pipe_writer(platform_file_ref pipe); - void send_message(const byte_buffer&); + lsp_pipe_writer(const lsp_pipe_writer &) = delete; + lsp_pipe_writer &operator=(const lsp_pipe_writer &) = delete; + + ~lsp_pipe_writer(); + + // send_message is non-blocking. It might defer the work of sending to a + // separate thread. + void send_message(const byte_buffer &); + + // Block waiting for previous calls to send_message to fully complete. After + // flush returns, lsp_pipe_writer won't write data to the pipe until + // send_message is called again. + void flush(); private: template @@ -26,7 +44,21 @@ class lsp_pipe_writer { void write(string8_view); + string8_view write_as_much_as_possible_now(string8_view); + + void start_flushing_thread_if_needed(); + void run_flushing_thread(); + platform_file_ref pipe_; + + std::thread flushing_thread_; + std::mutex mutex_; + std::condition_variable data_is_pending_; + std::condition_variable data_is_flushed_; + + // Protected by mutex_: + byte_buffer pending_; + bool stop_ = false; }; } diff --git a/test/test-lsp-pipe-writer.cpp b/test/test-lsp-pipe-writer.cpp index 7b1022cbd9..8e8e83eaf2 100644 --- a/test/test-lsp-pipe-writer.cpp +++ b/test/test-lsp-pipe-writer.cpp @@ -11,16 +11,13 @@ #include #include #include +#include #include #include +#include #include #include -#if QLJS_HAVE_PTHREAD_KILL -#include -#include -#endif - #if QLJS_HAVE_FCNTL_H #include #endif @@ -33,32 +30,10 @@ namespace quick_lint_js { namespace { std::size_t pipe_buffer_size(platform_file_ref); -#if QLJS_HAVE_PTHREAD_KILL -class sigaction_guard { - public: - explicit sigaction_guard(int signal_number) : signal_number_(signal_number) { - int rc = - ::sigaction(this->signal_number_, nullptr, &this->saved_sigaction_); - QLJS_ALWAYS_ASSERT(rc == 0); - } - - ~sigaction_guard() { - int rc = - ::sigaction(this->signal_number_, &this->saved_sigaction_, nullptr); - QLJS_ALWAYS_ASSERT(rc == 0); - } - - sigaction_guard(const sigaction_guard &) = delete; - sigaction_guard &operator=(const sigaction_guard &) = delete; - - private: - int signal_number_; - struct sigaction saved_sigaction_; -}; -#endif - class test_lsp_pipe_writer : public ::testing::Test { public: + explicit test_lsp_pipe_writer() { this->pipe.writer.set_pipe_non_blocking(); } + pipe_fds pipe = make_pipe(); lsp_pipe_writer writer{this->pipe.writer.ref()}; }; @@ -71,6 +46,7 @@ byte_buffer byte_buffer_of(string8_view data) { TEST_F(test_lsp_pipe_writer, small_message_includes_content_length) { this->writer.send_message(byte_buffer_of(u8"hi")); + this->writer.flush(); this->pipe.writer.close(); read_file_result data = read_file("", this->pipe.reader.ref()); @@ -87,6 +63,7 @@ TEST_F(test_lsp_pipe_writer, large_message_sends_fully) { u8"[" + string8(pipe_buffer_size(this->pipe.writer.ref()) * 3, u8'x') + u8"]"; this->writer.send_message(byte_buffer_of(message)); + this->writer.flush(); this->pipe.writer.close(); read_file_result data = data_future.get(); @@ -96,52 +73,31 @@ TEST_F(test_lsp_pipe_writer, large_message_sends_fully) { EXPECT_NE(data_content.find(message), data_content.npos); } -#if QLJS_HAVE_PTHREAD_KILL -TEST_F(test_lsp_pipe_writer, large_message_sends_fully_with_interrupt) { - sigaction_guard signal_guard(SIGALRM); - - ::pthread_t writer_thread_id = ::pthread_self(); - - ASSERT_NE(::signal(SIGALRM, - [](int) { - // Do nothing. Just interrupt syscalls. - }), - SIG_ERR) - << std::strerror(errno); - - std::future data_future = - std::async(std::launch::async, [this, writer_thread_id]() { - int rc; - - // Interrupt the write() syscall, causing it to return early. - std::this_thread::sleep_for(10ms); // Wait for write() to execute. - rc = ::pthread_kill(writer_thread_id, SIGALRM); - EXPECT_EQ(rc, 0) << std::strerror(rc); - // The pipe's buffer should now be full. - - // Interrupt the write() syscall again, causing it to restart. This - // write() call shouldn't have written anything, because the pipe's - // buffer is already full. - std::this_thread::sleep_for(1ms); // Wait for write() to execute. - rc = ::pthread_kill(writer_thread_id, SIGALRM); - EXPECT_EQ(rc, 0) << std::strerror(rc); - - return read_file("", this->pipe.reader.ref()); - }); - +TEST_F(test_lsp_pipe_writer, large_message_with_no_reader_does_not_block) { string8 message = u8"[" + string8(pipe_buffer_size(this->pipe.writer.ref()) * 3, u8'x') + u8"]"; - this->writer.send_message(byte_buffer_of(message)); - this->pipe.writer.close(); - - read_file_result data = data_future.get(); - ASSERT_TRUE(data.ok()) << data.error; - - string8_view data_content = data.content.string_view(); - EXPECT_NE(data_content.find(message), data_content.npos); + this->writer.send_message(byte_buffer_of(message)); // Shouldn't block. + + static std::promise received_message_promise; + received_message_promise = std::promise(); + + // Read exactly as many bytes as needed to parse the message, then set + // received_message_promise. + std::future receiving_thread = std::async(std::launch::async, [this] { + struct message_handler : lsp_message_parser { + void message_parsed(string8_view message_content) { + received_message_promise.set_value(string8(message_content)); + } + }; + pipe_reader reader(this->pipe.reader.ref()); + reader.run(); + }); + + string8 data = received_message_promise.get_future().get(); + this->pipe.writer.close(); // Stop receiving_thread ASAP. + EXPECT_EQ(data, message); } -#endif std::size_t pipe_buffer_size([[maybe_unused]] platform_file_ref pipe) { #if QLJS_HAVE_F_GETPIPE_SZ