Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LSP: Prevent deadlock with synchronous client #237

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ option(
FALSE
)

find_package(Threads REQUIRED)

quick_lint_js_add_executable(
quick-lint-js
main.cpp
Expand Down Expand Up @@ -129,7 +131,14 @@ quick_lint_js_add_library(
wasm-demo-location.cpp
)
target_include_directories(quick-lint-js-lib PUBLIC .)
target_link_libraries(quick-lint-js-lib PUBLIC boost_container simdjson)
target_link_libraries(
quick-lint-js-lib
PUBLIC
boost_container
simdjson
PRIVATE
Threads::Threads
)
if (${CMAKE_VERSION} VERSION_GREATER_EQUAL 3.17.3)
target_precompile_headers(
quick-lint-js-lib
Expand Down
80 changes: 80 additions & 0 deletions src/file-handle.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (C) 2020 Matthew Glazar
// See end of file for extended copyright information.

#include <array>
#include <cerrno>
#include <cstddef>
#include <cstdio>
Expand All @@ -22,6 +23,10 @@
#include <sys/stat.h>
#endif

#if QLJS_HAVE_POLL
#include <poll.h>
#endif

#if QLJS_HAVE_UNISTD_H
#include <unistd.h>
#endif
Expand Down Expand Up @@ -83,6 +88,36 @@ std::optional<int> windows_handle_file_ref::write(const void *buffer,
return narrow_cast<int>(write_size);
}

bool windows_handle_file_ref::is_pipe_non_blocking() {
DWORD state;
BOOL ok = ::GetNamedPipeHandleStateA(this->get(),
/*lpState=*/&state,
/*lpCurInstances=*/nullptr,
/*lpMaxCollectionCount=*/nullptr,
/*lpCollectDataTimeout=*/nullptr,
/*lpUserName=*/nullptr,
/*nMaxUserNameSize=*/0);
if (!ok) {
QLJS_UNIMPLEMENTED();
}
return (state & PIPE_NOWAIT) == PIPE_NOWAIT;
}

void windows_handle_file_ref::set_pipe_non_blocking() {
DWORD mode = PIPE_READMODE_BYTE | PIPE_NOWAIT;
BOOL ok = ::SetNamedPipeHandleState(this->get(), /*lpMode=*/&mode,
/*lpMaxCollectionCount=*/nullptr,
/*lpCollectDataTimeout=*/nullptr);
if (!ok) {
QLJS_UNIMPLEMENTED();
}
}

void windows_handle_file_ref::block_until_pipe_is_writeable_or_broken() {
// @@@ we should block yo. but how?
// https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipe-type-read-and-wait-modes
}

std::string windows_handle_file_ref::get_last_error_message() {
return windows_error_message(::GetLastError());
}
Expand Down Expand Up @@ -141,6 +176,51 @@ std::optional<int> posix_fd_file_ref::write(const void *buffer,
return narrow_cast<int>(written_size);
}

bool posix_fd_file_ref::is_pipe_non_blocking() {
#if QLJS_HAVE_FCNTL_H
int rc = ::fcntl(this->get(), F_GETFL, O_NONBLOCK);
if (rc == -1) {
QLJS_UNIMPLEMENTED();
}
return rc != 0;
#else
#error "Unsupported platform"
#endif
}

void posix_fd_file_ref::set_pipe_non_blocking() {
#if QLJS_HAVE_FCNTL_H
int rc = ::fcntl(this->get(), F_SETFL, O_NONBLOCK);
if (rc != 0) {
QLJS_UNIMPLEMENTED();
}
#else
#error "Unsupported platform"
#endif
}

void posix_fd_file_ref::block_until_pipe_is_writeable_or_broken() {
#if QLJS_HAVE_POLL
retry:
std::array<::pollfd, 1> fds;
fds[0].fd = this->get();
fds[0].events = POLLOUT;
int rc =
::poll(fds.data(), narrow_cast<::nfds_t>(fds.size()), /*timeout=*/-1);
if (rc == -1) {
if (errno == EAGAIN || errno == EINTR) {
goto retry;
}
QLJS_UNIMPLEMENTED();
}
QLJS_ASSERT(rc != 0); // Shouldn't time out.
QLJS_ASSERT(rc == 1);
QLJS_ASSERT((fds[0].revents & (POLLOUT | POLLERR | POLLHUP)) != 0);
#else
#error "Unsupported platform"
#endif
}

std::string posix_fd_file_ref::get_last_error_message() {
return std::strerror(errno);
}
Expand Down
82 changes: 81 additions & 1 deletion src/lsp-pipe-writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,30 @@
// See end of file for extended copyright information.

#include <array>
#include <condition_variable>
#include <mutex>
#include <quick-lint-js/assert.h>
#include <quick-lint-js/byte-buffer.h>
#include <quick-lint-js/char8.h>
#include <quick-lint-js/file-handle.h>
#include <quick-lint-js/file.h>
#include <quick-lint-js/integer.h>
#include <quick-lint-js/lsp-pipe-writer.h>
#include <quick-lint-js/narrow-cast.h>
#include <thread>

namespace quick_lint_js {
lsp_pipe_writer::lsp_pipe_writer(platform_file_ref pipe) : pipe_(pipe) {}
lsp_pipe_writer::lsp_pipe_writer(platform_file_ref pipe) : pipe_(pipe) {
QLJS_ASSERT(this->pipe_.is_pipe_non_blocking());
}

lsp_pipe_writer::~lsp_pipe_writer() {
if (this->flushing_thread_.joinable()) {
this->stop_ = true;
this->data_is_pending_.notify_one();
this->flushing_thread_.join();
}
}

void lsp_pipe_writer::send_message(const byte_buffer& message) {
this->write(u8"Content-Length: ");
Expand All @@ -25,6 +39,12 @@ void lsp_pipe_writer::send_message(const byte_buffer& message) {
this->write(message_string);
}

void lsp_pipe_writer::flush() {
std::unique_lock<std::mutex> lock(this->mutex_);
QLJS_ASSERT(!this->stop_);
this->data_is_flushed_.wait(lock, [this] { return this->pending_.empty(); });
}

template <class T>
void lsp_pipe_writer::write_integer(T value) {
std::array<char8, integer_string_length<T>> buffer;
Expand All @@ -34,14 +54,74 @@ void lsp_pipe_writer::write_integer(T value) {
}

void lsp_pipe_writer::write(string8_view message) {
std::unique_lock<std::mutex> lock(this->mutex_);
QLJS_ASSERT(!this->stop_);
if (this->pending_.empty()) {
// The current thread has control over this->pipe_.
string8_view unwritten = this->write_as_much_as_possible_now(message);
if (!unwritten.empty()) {
// TODO(strager): Avoid copying.
this->pending_.append_copy(unwritten);
lock.unlock();
this->start_flushing_thread_if_needed();
}
} else {
// The flushing thread has control over this->pipe_.
this->pending_.append_copy(message);
}
}

string8_view lsp_pipe_writer::write_as_much_as_possible_now(
string8_view message) {
while (!message.empty()) {
std::optional<int> bytes_written =
this->pipe_.write(message.data(), narrow_cast<int>(message.size()));
if (!bytes_written.has_value()) {
if (errno == EAGAIN) {
break;
}
QLJS_UNIMPLEMENTED();
}
message = message.substr(narrow_cast<std::size_t>(*bytes_written));
}
return message;
}

void lsp_pipe_writer::start_flushing_thread_if_needed() {
if (!this->flushing_thread_.joinable()) {
this->flushing_thread_ =
std::thread([this] { this->run_flushing_thread(); });
}
}

void lsp_pipe_writer::run_flushing_thread() {
std::unique_lock<std::mutex> lock(this->mutex_);
for (;;) {
this->data_is_pending_.wait(
lock, [this] { return this->stop_ || !this->pending_.empty(); });
if (this->stop_) {
break;
}
QLJS_ASSERT(!this->pending_.empty());

// TODO(strager): Don't copy. Write all the chunks with writev if possible.
string8 message_string;
message_string.resize(this->pending_.size());
this->pending_.copy_to(message_string.data());
this->pending_ = byte_buffer();
string8_view unwritten =
this->write_as_much_as_possible_now(message_string);
// TODO(strager): Avoid copying.
this->pending_.append_copy(unwritten);

if (this->pending_.empty()) {
this->data_is_flushed_.notify_one();
} else {
lock.unlock();
this->pipe_.block_until_pipe_is_writeable_or_broken();
lock.lock();
}
}
}
}

Expand Down
14 changes: 14 additions & 0 deletions src/quick-lint-js/file-handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class windows_handle_file_ref {
file_read_result read(void *buffer, int buffer_size) noexcept;
std::optional<int> write(const void *buffer, int buffer_size) noexcept;

bool is_pipe_non_blocking();
void set_pipe_non_blocking();
void block_until_pipe_is_writeable_or_broken();

static std::string get_last_error_message();

protected:
Expand All @@ -74,9 +78,12 @@ class windows_handle_file : private windows_handle_file_ref {

windows_handle_file_ref ref() noexcept;

using windows_handle_file_ref::block_until_pipe_is_writeable_or_broken;
using windows_handle_file_ref::get;
using windows_handle_file_ref::get_last_error_message;
using windows_handle_file_ref::is_pipe_non_blocking;
using windows_handle_file_ref::read;
using windows_handle_file_ref::set_pipe_non_blocking;
using windows_handle_file_ref::write;

private:
Expand All @@ -95,6 +102,10 @@ class posix_fd_file_ref {
file_read_result read(void *buffer, int buffer_size) noexcept;
std::optional<int> write(const void *buffer, int buffer_size) noexcept;

bool is_pipe_non_blocking();
void set_pipe_non_blocking();
void block_until_pipe_is_writeable_or_broken();

static std::string get_last_error_message();

protected:
Expand All @@ -115,9 +126,12 @@ class posix_fd_file : private posix_fd_file_ref {

posix_fd_file_ref ref() noexcept;

using posix_fd_file_ref::block_until_pipe_is_writeable_or_broken;
using posix_fd_file_ref::get;
using posix_fd_file_ref::get_last_error_message;
using posix_fd_file_ref::is_pipe_non_blocking;
using posix_fd_file_ref::read;
using posix_fd_file_ref::set_pipe_non_blocking;
using posix_fd_file_ref::write;

private:
Expand Down
8 changes: 8 additions & 0 deletions src/quick-lint-js/have.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@
#endif
#endif

#if !defined(QLJS_HAVE_POLL)
#if defined(_POSIX_VERSION) && _POSIX_VERSION >= 200112L
#define QLJS_HAVE_POLL 1
#else
#define QLJS_HAVE_POLL 0
#endif
#endif

#if !defined(QLJS_HAVE_SETRLIMIT)
#if (defined(_POSIX_VERSION) && _POSIX_VERSION >= 200809L) || \
(defined(__APPLE__) && defined(_POSIX_VERSION) && \
Expand Down
34 changes: 33 additions & 1 deletion src/quick-lint-js/lsp-pipe-writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
#ifndef QUICK_LINT_JS_LSP_PIPE_WRITER_H
#define QUICK_LINT_JS_LSP_PIPE_WRITER_H

#include <condition_variable>
#include <mutex>
#include <quick-lint-js/char8.h>
#include <quick-lint-js/file-handle.h>
#include <thread>

namespace quick_lint_js {
class byte_buffer;
Expand All @@ -14,19 +17,48 @@ class byte_buffer;
// a pipe or socket.
//
// lsp_pipe_writer satisfies lsp_endpoint_remote.
//
// lsp_pipe_writer is not thread-safe.
class lsp_pipe_writer {
public:
// Precondition: pipe is non-blocking
explicit lsp_pipe_writer(platform_file_ref pipe);

void send_message(const byte_buffer&);
lsp_pipe_writer(const lsp_pipe_writer &) = delete;
lsp_pipe_writer &operator=(const lsp_pipe_writer &) = delete;

~lsp_pipe_writer();

// send_message is non-blocking. It might defer the work of sending to a
// separate thread.
void send_message(const byte_buffer &);

// Block waiting for previous calls to send_message to fully complete. After
// flush returns, lsp_pipe_writer won't write data to the pipe until
// send_message is called again.
void flush();

private:
template <class T>
void write_integer(T);

void write(string8_view);

string8_view write_as_much_as_possible_now(string8_view);

void start_flushing_thread_if_needed();
void run_flushing_thread();

platform_file_ref pipe_;

std::thread flushing_thread_;
std::mutex mutex_;
std::condition_variable data_is_pending_;
std::condition_variable data_is_flushed_;

// Protected by mutex_:
byte_buffer pending_;
bool stop_ = false;
};
}

Expand Down
Loading