Skip to content

Commit

Permalink
ruff server no longer hangs after shutdown (#11222)
Browse files Browse the repository at this point in the history
## Summary

Fixes #11207.

The server would hang after handling a shutdown request on
`IoThreads::join()` because a global sender (`MESSENGER`, used to send
`window/showMessage` notifications) would remain allocated even after
the event loop finished, which kept the writer I/O thread channel open.

To fix this, I've made a few structural changes to `ruff server`. I've
wrapped the send/receive channels and thread join handle behind a new
struct, `Connection`, which facilitates message sending and receiving,
and also runs `IoThreads::join()` after the event loop finishes. To
control the number of sender channels, the `Connection` wraps the sender
channel in an `Arc` and only allows the creation of a wrapper type,
`ClientSender`, which hold a weak reference to this `Arc` instead of
direct channel access. The wrapper type implements the channel methods
directly to prevent access to the inner channel (which would allow the
channel to be cloned). ClientSender's function is analogous to
[`WeakSender` in
`tokio`](https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.WeakSender.html).
Additionally, the receiver channel cannot be accessed directly - the
`Connection` only exposes an iterator over it.

These changes will guarantee that all channels are closed before the I/O
threads are joined.

## Test Plan

Repeatedly open and close an editor utilizing `ruff server` while
observing the task monitor. The net total amount of open `ruff`
instances should be zero once all editor windows have closed.

The following logs should also appear after the server is shut down:

<img width="835" alt="Screenshot 2024-04-30 at 3 56 22 PM"
src="https://github.com/astral-sh/ruff/assets/19577865/404b74f5-ef08-4bb4-9fa2-72e72b946695">

This can be tested on VS Code by changing the settings and then checking
`Output`.
  • Loading branch information
snowsignal authored May 3, 2024
1 parent 9e69cd6 commit dfbeca5
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 53 deletions.
4 changes: 2 additions & 2 deletions crates/ruff_server/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use crate::server::ClientSender;

static MESSENGER: OnceLock<ClientSender> = OnceLock::new();

pub(crate) fn init_messenger(client_sender: &ClientSender) {
pub(crate) fn init_messenger(client_sender: ClientSender) {
MESSENGER
.set(client_sender.clone())
.set(client_sender)
.expect("messenger should only be initialized once");

// unregister any previously registered panic hook
Expand Down
69 changes: 33 additions & 36 deletions crates/ruff_server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
use std::num::NonZeroUsize;

use lsp::Connection;
use lsp_server as lsp;
use lsp_types as types;
use types::ClientCapabilities;
Expand All @@ -18,6 +17,8 @@ use types::TextDocumentSyncOptions;
use types::WorkDoneProgressOptions;
use types::WorkspaceFoldersServerCapabilities;

use self::connection::Connection;
use self::connection::ConnectionInitializer;
use self::schedule::event_loop_thread;
use self::schedule::Scheduler;
use self::schedule::Task;
Expand All @@ -28,34 +29,39 @@ use crate::PositionEncoding;

mod api;
mod client;
mod connection;
mod schedule;

pub(crate) use client::ClientSender;
pub(crate) use connection::ClientSender;

pub(crate) type Result<T> = std::result::Result<T, api::Error>;

pub struct Server {
conn: lsp::Connection,
connection: Connection,
client_capabilities: ClientCapabilities,
threads: lsp::IoThreads,
worker_threads: NonZeroUsize,
session: Session,
}

impl Server {
pub fn new(worker_threads: NonZeroUsize) -> crate::Result<Self> {
let (conn, threads) = lsp::Connection::stdio();
let connection = ConnectionInitializer::stdio();

crate::message::init_messenger(&conn.sender);

let (id, params) = conn.initialize_start()?;

let init_params: types::InitializeParams = serde_json::from_value(params)?;
let (id, init_params) = connection.initialize_start()?;

let client_capabilities = init_params.capabilities;
let position_encoding = Self::find_best_position_encoding(&client_capabilities);
let server_capabilities = Self::server_capabilities(position_encoding);

let connection = connection.initialize_finish(
id,
&server_capabilities,
crate::SERVER_NAME,
crate::version(),
)?;

crate::message::init_messenger(connection.make_sender());

let AllSettings {
global_settings,
mut workspace_settings,
Expand Down Expand Up @@ -86,19 +92,8 @@ impl Server {
anyhow::anyhow!("Failed to get the current working directory while creating a default workspace.")
})?;

let initialize_data = serde_json::json!({
"capabilities": server_capabilities,
"serverInfo": {
"name": crate::SERVER_NAME,
"version": crate::version()
}
});

conn.initialize_finish(id, initialize_data)?;

Ok(Self {
conn,
threads,
connection,
worker_threads,
session: Session::new(
&client_capabilities,
Expand All @@ -111,17 +106,20 @@ impl Server {
}

pub fn run(self) -> crate::Result<()> {
let result = event_loop_thread(move || {
event_loop_thread(move || {
Self::event_loop(
&self.conn,
&self.connection,
&self.client_capabilities,
self.session,
self.worker_threads,
)
)?;
self.connection.close()?;
// Note: when we start routing tracing through the LSP,
// this should be replaced with a log directly to `stderr`.
tracing::info!("Server has shut down successfully");
Ok(())
})?
.join();
self.threads.join()?;
result
.join()
}

#[allow(clippy::needless_pass_by_value)] // this is because we aren't using `next_request_id` yet.
Expand All @@ -132,22 +130,21 @@ impl Server {
worker_threads: NonZeroUsize,
) -> crate::Result<()> {
let mut scheduler =
schedule::Scheduler::new(&mut session, worker_threads, &connection.sender);
schedule::Scheduler::new(&mut session, worker_threads, connection.make_sender());

Self::try_register_capabilities(client_capabilities, &mut scheduler);
for msg in &connection.receiver {
for msg in connection.incoming() {
if connection.handle_shutdown(&msg)? {
break;
}
let task = match msg {
lsp::Message::Request(req) => {
if connection.handle_shutdown(&req)? {
return Ok(());
}
api::request(req)
}
lsp::Message::Request(req) => api::request(req),
lsp::Message::Notification(notification) => api::notification(notification),
lsp::Message::Response(response) => scheduler.response(response),
};
scheduler.dispatch(task);
}

Ok(())
}

Expand Down
19 changes: 8 additions & 11 deletions crates/ruff_server/src/server/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ use lsp_server::{Notification, RequestId};
use rustc_hash::FxHashMap;
use serde_json::Value;

use super::schedule::Task;

pub(crate) type ClientSender = crossbeam::channel::Sender<lsp_server::Message>;
use super::{schedule::Task, ClientSender};

type ResponseBuilder<'s> = Box<dyn FnOnce(lsp_server::Response) -> Task<'s>>;

Expand All @@ -29,12 +27,12 @@ pub(crate) struct Requester<'s> {
}

impl<'s> Client<'s> {
pub(super) fn new(sender: &ClientSender) -> Self {
pub(super) fn new(sender: ClientSender) -> Self {
Self {
notifier: Notifier(sender.clone()),
responder: Responder(sender.clone()),
requester: Requester {
sender: sender.clone(),
sender,
next_request_id: 1,
response_handlers: FxHashMap::default(),
},
Expand All @@ -60,16 +58,15 @@ impl Notifier {

let message = lsp_server::Message::Notification(Notification::new(method, params));

Ok(self.0.send(message)?)
self.0.send(message)
}

pub(crate) fn notify_method(&self, method: String) -> crate::Result<()> {
Ok(self
.0
self.0
.send(lsp_server::Message::Notification(Notification::new(
method,
Value::Null,
)))?)
)))
}
}

Expand All @@ -82,15 +79,15 @@ impl Responder {
where
R: serde::Serialize,
{
Ok(self.0.send(
self.0.send(
match result {
Ok(res) => lsp_server::Response::new_ok(id, res),
Err(crate::server::api::Error { code, error }) => {
lsp_server::Response::new_err(id, code as i32, format!("{error}"))
}
}
.into(),
)?)
)
}
}

Expand Down
144 changes: 144 additions & 0 deletions crates/ruff_server/src/server/connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
use lsp_server as lsp;
use lsp_types::{notification::Notification, request::Request};
use std::sync::{Arc, Weak};

type ConnectionSender = crossbeam::channel::Sender<lsp::Message>;
type ConnectionReceiver = crossbeam::channel::Receiver<lsp::Message>;

/// A builder for `Connection` that handles LSP initialization.
pub(crate) struct ConnectionInitializer {
connection: lsp::Connection,
threads: lsp::IoThreads,
}

/// Handles inbound and outbound messages with the client.
pub(crate) struct Connection {
sender: Arc<ConnectionSender>,
receiver: ConnectionReceiver,
threads: lsp::IoThreads,
}

impl ConnectionInitializer {
/// Create a new LSP server connection over stdin/stdout.
pub(super) fn stdio() -> Self {
let (connection, threads) = lsp::Connection::stdio();
Self {
connection,
threads,
}
}

/// Starts the initialization process with the client by listening for an initialization request.
/// Returns a request ID that should be passed into `initialize_finish` later,
/// along with the initialization parameters that were provided.
pub(super) fn initialize_start(
&self,
) -> crate::Result<(lsp::RequestId, lsp_types::InitializeParams)> {
let (id, params) = self.connection.initialize_start()?;
Ok((id, serde_json::from_value(params)?))
}

/// Finishes the initialization process with the client,
/// returning an initialized `Connection`.
pub(super) fn initialize_finish(
self,
id: lsp::RequestId,
server_capabilities: &lsp_types::ServerCapabilities,
name: &str,
version: &str,
) -> crate::Result<Connection> {
self.connection.initialize_finish(
id,
serde_json::json!({
"capabilities": server_capabilities,
"serverInfo": {
"name": name,
"version": version
}
}),
)?;
let Self {
connection: lsp::Connection { sender, receiver },
threads,
} = self;
Ok(Connection {
sender: Arc::new(sender),
receiver,
threads,
})
}
}

impl Connection {
/// Make a new `ClientSender` for sending messages to the client.
pub(super) fn make_sender(&self) -> ClientSender {
ClientSender {
weak_sender: Arc::downgrade(&self.sender),
}
}

/// An iterator over incoming messages from the client.
pub(super) fn incoming(&self) -> crossbeam::channel::Iter<lsp::Message> {
self.receiver.iter()
}

/// Check and respond to any incoming shutdown requests; returns`true` if the server should be shutdown.
pub(super) fn handle_shutdown(&self, message: &lsp::Message) -> crate::Result<bool> {
match message {
lsp::Message::Request(lsp::Request { id, method, .. })
if method == lsp_types::request::Shutdown::METHOD =>
{
self.sender
.send(lsp::Response::new_ok(id.clone(), ()).into())?;
tracing::info!("Shutdown request received. Waiting for an exit notification...");
match self.receiver.recv_timeout(std::time::Duration::from_secs(30))? {
lsp::Message::Notification(lsp::Notification { method, .. }) if method == lsp_types::notification::Exit::METHOD => {
tracing::info!("Exit notification received. Server shutting down...");
Ok(true)
},
message => anyhow::bail!("Server received unexpected message {message:?} while waiting for exit notification")
}
}
lsp::Message::Notification(lsp::Notification { method, .. })
if method == lsp_types::notification::Exit::METHOD =>
{
tracing::error!("Server received an exit notification before a shutdown request was sent. Exiting...");
Ok(true)
}
_ => Ok(false),
}
}

/// Join the I/O threads that underpin this connection.
/// This is guaranteed to be nearly immediate since
/// we close the only active channels to these threads prior
/// to joining them.
pub(super) fn close(self) -> crate::Result<()> {
std::mem::drop(
Arc::into_inner(self.sender)
.expect("the client sender shouldn't have more than one strong reference"),
);
std::mem::drop(self.receiver);
self.threads.join()?;
Ok(())
}
}

/// A weak reference to an underlying sender channel, used for communication with the client.
/// If the `Connection` that created this `ClientSender` is dropped, any `send` calls will throw
/// an error.
#[derive(Clone, Debug)]
pub(crate) struct ClientSender {
weak_sender: Weak<ConnectionSender>,
}

// note: additional wrapper functions for senders may be implemented as needed.
impl ClientSender {
pub(crate) fn send(&self, msg: lsp::Message) -> crate::Result<()> {
let Some(sender) = self.weak_sender.upgrade() else {
anyhow::bail!("The connection with the client has been closed");
};

Ok(sender.send(msg)?)
}
}
6 changes: 2 additions & 4 deletions crates/ruff_server/src/server/schedule.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use std::num::NonZeroUsize;

use crossbeam::channel::Sender;

use crate::session::Session;

mod task;
Expand All @@ -14,7 +12,7 @@ use self::{
thread::ThreadPriority,
};

use super::client::Client;
use super::{client::Client, ClientSender};

/// The event loop thread is actually a secondary thread that we spawn from the
/// _actual_ main thread. This secondary thread has a larger stack size
Expand Down Expand Up @@ -45,7 +43,7 @@ impl<'s> Scheduler<'s> {
pub(super) fn new(
session: &'s mut Session,
worker_threads: NonZeroUsize,
sender: &Sender<lsp_server::Message>,
sender: ClientSender,
) -> Self {
const FMT_THREADS: usize = 1;
Self {
Expand Down

0 comments on commit dfbeca5

Please sign in to comment.