Skip to content

Commit

Permalink
@@@ LSP: Prevent deadlock with synchronous client
Browse files Browse the repository at this point in the history
@@@ winders
@@@ perf. lots of copying. sadge. =[
@@@ test LSP server manually

If an LSP client sends a bunch of requests, but does not read from
quick-lint-js' stdout, quick-lint-js can hang in pipe_reader::write.
(This came up when writing some benchmarks. My LSP client naively made a
bunch of requests without reading responses during the requests.)

Prevent this (unlikely) deadlock by switching stdout to non-blocking
mode, buffering data in memory, and resuming reading from stdin as soon
as possible.

The current implementation is relatively inefficient. Data is copied
many times. Maybe we'll clean this up in the future.
  • Loading branch information
strager committed Apr 18, 2021
1 parent d6d3afb commit 9c74e51
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 74 deletions.
11 changes: 10 additions & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ option(
FALSE
)

find_package(Threads REQUIRED)

quick_lint_js_add_executable(
quick-lint-js
main.cpp
Expand Down Expand Up @@ -129,7 +131,14 @@ quick_lint_js_add_library(
wasm-demo-location.cpp
)
target_include_directories(quick-lint-js-lib PUBLIC .)
target_link_libraries(quick-lint-js-lib PUBLIC boost_container simdjson)
target_link_libraries(
quick-lint-js-lib
PUBLIC
boost_container
simdjson
PRIVATE
Threads::Threads
)
if (${CMAKE_VERSION} VERSION_GREATER_EQUAL 3.17.3)
target_precompile_headers(
quick-lint-js-lib
Expand Down
80 changes: 80 additions & 0 deletions src/file-handle.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (C) 2020 Matthew Glazar
// See end of file for extended copyright information.

#include <array>
#include <cerrno>
#include <cstddef>
#include <cstdio>
Expand All @@ -22,6 +23,10 @@
#include <sys/stat.h>
#endif

#if QLJS_HAVE_POLL
#include <poll.h>
#endif

#if QLJS_HAVE_UNISTD_H
#include <unistd.h>
#endif
Expand Down Expand Up @@ -83,6 +88,36 @@ std::optional<int> windows_handle_file_ref::write(const void *buffer,
return narrow_cast<int>(write_size);
}

bool windows_handle_file_ref::is_pipe_non_blocking() {
DWORD state;
BOOL ok = ::GetNamedPipeHandleStateA(this->get(),
/*lpState=*/&state,
/*lpCurInstances=*/nullptr,
/*lpMaxCollectionCount=*/nullptr,
/*lpCollectDataTimeout=*/nullptr,
/*lpUserName=*/nullptr,
/*nMaxUserNameSize=*/0);
if (!ok) {
QLJS_UNIMPLEMENTED();
}
return (state & PIPE_NOWAIT) == PIPE_NOWAIT;
}

void windows_handle_file_ref::set_pipe_non_blocking() {
DWORD mode = PIPE_READMODE_BYTE | PIPE_NOWAIT;
BOOL ok = ::SetNamedPipeHandleState(this->get(), /*lpMode=*/&mode,
/*lpMaxCollectionCount=*/nullptr,
/*lpCollectDataTimeout=*/nullptr);
if (!ok) {
QLJS_UNIMPLEMENTED();
}
}

void windows_handle_file_ref::block_until_pipe_is_writeable_or_broken() {
// @@@ we should block yo. but how?
// https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipe-type-read-and-wait-modes
}

std::string windows_handle_file_ref::get_last_error_message() {
return windows_error_message(::GetLastError());
}
Expand Down Expand Up @@ -141,6 +176,51 @@ std::optional<int> posix_fd_file_ref::write(const void *buffer,
return narrow_cast<int>(written_size);
}

bool posix_fd_file_ref::is_pipe_non_blocking() {
#if QLJS_HAVE_FCNTL_H
int rc = ::fcntl(this->get(), F_GETFL, O_NONBLOCK);
if (rc == -1) {
QLJS_UNIMPLEMENTED();
}
return rc != 0;
#else
#error "Unsupported platform"
#endif
}

void posix_fd_file_ref::set_pipe_non_blocking() {
#if QLJS_HAVE_FCNTL_H
int rc = ::fcntl(this->get(), F_SETFL, O_NONBLOCK);
if (rc != 0) {
QLJS_UNIMPLEMENTED();
}
#else
#error "Unsupported platform"
#endif
}

void posix_fd_file_ref::block_until_pipe_is_writeable_or_broken() {
#if QLJS_HAVE_POLL
retry:
std::array<::pollfd, 1> fds;
fds[0].fd = this->get();
fds[0].events = POLLOUT;
int rc =
::poll(fds.data(), narrow_cast<::nfds_t>(fds.size()), /*timeout=*/-1);
if (rc == -1) {
if (errno == EAGAIN || errno == EINTR) {
goto retry;
}
QLJS_UNIMPLEMENTED();
}
QLJS_ASSERT(rc != 0); // Shouldn't time out.
QLJS_ASSERT(rc == 1);
QLJS_ASSERT((fds[0].revents & (POLLOUT | POLLERR | POLLHUP)) != 0);
#else
#error "Unsupported platform"
#endif
}

std::string posix_fd_file_ref::get_last_error_message() {
return std::strerror(errno);
}
Expand Down
82 changes: 81 additions & 1 deletion src/lsp-pipe-writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,30 @@
// See end of file for extended copyright information.

#include <array>
#include <condition_variable>
#include <mutex>
#include <quick-lint-js/assert.h>
#include <quick-lint-js/byte-buffer.h>
#include <quick-lint-js/char8.h>
#include <quick-lint-js/file-handle.h>
#include <quick-lint-js/file.h>
#include <quick-lint-js/integer.h>
#include <quick-lint-js/lsp-pipe-writer.h>
#include <quick-lint-js/narrow-cast.h>
#include <thread>

namespace quick_lint_js {
lsp_pipe_writer::lsp_pipe_writer(platform_file_ref pipe) : pipe_(pipe) {}
lsp_pipe_writer::lsp_pipe_writer(platform_file_ref pipe) : pipe_(pipe) {
QLJS_ASSERT(this->pipe_.is_pipe_non_blocking());
}

lsp_pipe_writer::~lsp_pipe_writer() {
if (this->flushing_thread_.joinable()) {
this->stop_ = true;
this->data_is_pending_.notify_one();
this->flushing_thread_.join();
}
}

void lsp_pipe_writer::send_message(const byte_buffer& message) {
this->write(u8"Content-Length: ");
Expand All @@ -25,6 +39,12 @@ void lsp_pipe_writer::send_message(const byte_buffer& message) {
this->write(message_string);
}

void lsp_pipe_writer::flush() {
std::unique_lock<std::mutex> lock(this->mutex_);
QLJS_ASSERT(!this->stop_);
this->data_is_flushed_.wait(lock, [this] { return this->pending_.empty(); });
}

template <class T>
void lsp_pipe_writer::write_integer(T value) {
std::array<char8, integer_string_length<T>> buffer;
Expand All @@ -34,14 +54,74 @@ void lsp_pipe_writer::write_integer(T value) {
}

void lsp_pipe_writer::write(string8_view message) {
std::unique_lock<std::mutex> lock(this->mutex_);
QLJS_ASSERT(!this->stop_);
if (this->pending_.empty()) {
// The current thread has control over this->pipe_.
string8_view unwritten = this->write_as_much_as_possible_now(message);
if (!unwritten.empty()) {
// TODO(strager): Avoid copying.
this->pending_.append_copy(unwritten);
lock.unlock();
this->start_flushing_thread_if_needed();
}
} else {
// The flushing thread has control over this->pipe_.
this->pending_.append_copy(message);
}
}

string8_view lsp_pipe_writer::write_as_much_as_possible_now(
string8_view message) {
while (!message.empty()) {
std::optional<int> bytes_written =
this->pipe_.write(message.data(), narrow_cast<int>(message.size()));
if (!bytes_written.has_value()) {
if (errno == EAGAIN) {
break;
}
QLJS_UNIMPLEMENTED();
}
message = message.substr(narrow_cast<std::size_t>(*bytes_written));
}
return message;
}

void lsp_pipe_writer::start_flushing_thread_if_needed() {
if (!this->flushing_thread_.joinable()) {
this->flushing_thread_ =
std::thread([this] { this->run_flushing_thread(); });
}
}

void lsp_pipe_writer::run_flushing_thread() {
std::unique_lock<std::mutex> lock(this->mutex_);
for (;;) {
this->data_is_pending_.wait(
lock, [this] { return this->stop_ || !this->pending_.empty(); });
if (this->stop_) {
break;
}
QLJS_ASSERT(!this->pending_.empty());

// TODO(strager): Don't copy. Write all the chunks with writev if possible.
string8 message_string;
message_string.resize(this->pending_.size());
this->pending_.copy_to(message_string.data());
this->pending_ = byte_buffer();
string8_view unwritten =
this->write_as_much_as_possible_now(message_string);
// TODO(strager): Avoid copying.
this->pending_.append_copy(unwritten);

if (this->pending_.empty()) {
this->data_is_flushed_.notify_one();
} else {
lock.unlock();
this->pipe_.block_until_pipe_is_writeable_or_broken();
lock.lock();
}
}
}
}

Expand Down
14 changes: 14 additions & 0 deletions src/quick-lint-js/file-handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class windows_handle_file_ref {
file_read_result read(void *buffer, int buffer_size) noexcept;
std::optional<int> write(const void *buffer, int buffer_size) noexcept;

bool is_pipe_non_blocking();
void set_pipe_non_blocking();
void block_until_pipe_is_writeable_or_broken();

static std::string get_last_error_message();

protected:
Expand All @@ -74,9 +78,12 @@ class windows_handle_file : private windows_handle_file_ref {

windows_handle_file_ref ref() noexcept;

using windows_handle_file_ref::block_until_pipe_is_writeable_or_broken;
using windows_handle_file_ref::get;
using windows_handle_file_ref::get_last_error_message;
using windows_handle_file_ref::is_pipe_non_blocking;
using windows_handle_file_ref::read;
using windows_handle_file_ref::set_pipe_non_blocking;
using windows_handle_file_ref::write;

private:
Expand All @@ -95,6 +102,10 @@ class posix_fd_file_ref {
file_read_result read(void *buffer, int buffer_size) noexcept;
std::optional<int> write(const void *buffer, int buffer_size) noexcept;

bool is_pipe_non_blocking();
void set_pipe_non_blocking();
void block_until_pipe_is_writeable_or_broken();

static std::string get_last_error_message();

protected:
Expand All @@ -115,9 +126,12 @@ class posix_fd_file : private posix_fd_file_ref {

posix_fd_file_ref ref() noexcept;

using posix_fd_file_ref::block_until_pipe_is_writeable_or_broken;
using posix_fd_file_ref::get;
using posix_fd_file_ref::get_last_error_message;
using posix_fd_file_ref::is_pipe_non_blocking;
using posix_fd_file_ref::read;
using posix_fd_file_ref::set_pipe_non_blocking;
using posix_fd_file_ref::write;

private:
Expand Down
8 changes: 8 additions & 0 deletions src/quick-lint-js/have.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@
#endif
#endif

#if !defined(QLJS_HAVE_POLL)
#if defined(_POSIX_VERSION) && _POSIX_VERSION >= 200112L
#define QLJS_HAVE_POLL 1
#else
#define QLJS_HAVE_POLL 0
#endif
#endif

#if !defined(QLJS_HAVE_SETRLIMIT)
#if (defined(_POSIX_VERSION) && _POSIX_VERSION >= 200809L) || \
(defined(__APPLE__) && defined(_POSIX_VERSION) && \
Expand Down
34 changes: 33 additions & 1 deletion src/quick-lint-js/lsp-pipe-writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
#ifndef QUICK_LINT_JS_LSP_PIPE_WRITER_H
#define QUICK_LINT_JS_LSP_PIPE_WRITER_H

#include <condition_variable>
#include <mutex>
#include <quick-lint-js/char8.h>
#include <quick-lint-js/file-handle.h>
#include <thread>

namespace quick_lint_js {
class byte_buffer;
Expand All @@ -14,19 +17,48 @@ class byte_buffer;
// a pipe or socket.
//
// lsp_pipe_writer satisfies lsp_endpoint_remote.
//
// lsp_pipe_writer is not thread-safe.
class lsp_pipe_writer {
public:
// Precondition: pipe is non-blocking
explicit lsp_pipe_writer(platform_file_ref pipe);

void send_message(const byte_buffer&);
lsp_pipe_writer(const lsp_pipe_writer &) = delete;
lsp_pipe_writer &operator=(const lsp_pipe_writer &) = delete;

~lsp_pipe_writer();

// send_message is non-blocking. It might defer the work of sending to a
// separate thread.
void send_message(const byte_buffer &);

// Block waiting for previous calls to send_message to fully complete. After
// flush returns, lsp_pipe_writer won't write data to the pipe until
// send_message is called again.
void flush();

private:
template <class T>
void write_integer(T);

void write(string8_view);

string8_view write_as_much_as_possible_now(string8_view);

void start_flushing_thread_if_needed();
void run_flushing_thread();

platform_file_ref pipe_;

std::thread flushing_thread_;
std::mutex mutex_;
std::condition_variable data_is_pending_;
std::condition_variable data_is_flushed_;

// Protected by mutex_:
byte_buffer pending_;
bool stop_ = false;
};
}

Expand Down
Loading

0 comments on commit 9c74e51

Please sign in to comment.