Skip to content
This repository has been archived by the owner on Dec 29, 2022. It is now read-only.

Commit

Permalink
Implement asynchronous message reading
Browse files Browse the repository at this point in the history
  • Loading branch information
Xanewok committed Jan 4, 2019
1 parent 4f0bed3 commit 96085ac
Showing 1 changed file with 112 additions and 84 deletions.
196 changes: 112 additions & 84 deletions tests/support/client.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,41 @@
//! Tokio-based LSP client. The tokio `current_thread::Runtime` allows for a
//! cheap, single-threaded blocking until a certain message is received from the
//! server. It also allows enforcing timeouts, which are necessary for testing.
//!
//! More concretely, we couple spawned RLS handle with the Tokio runtime on
//! current thread. A message reader `Stream<Item = Value, ...>` future is
//! spawned on the runtime, which allows us to queue channel senders which can
//! be notified when resolving the reader future. On each message reception we
//! check if a channel sender was registered with an associated predicate being
//! true for the message received, and if so we send the message, notifying the
//! receiver (thus, implementing the Future<Item = Value> model).
use std::cell::{Ref, RefCell};
use std::process::{Command, Stdio};
use std::time::Duration;
use std::rc::Rc;

use futures::sink::Sink;
use futures::stream::Stream;
use futures::unsync::oneshot;
use futures::Future;
use lsp_codec::{LspDecoder, LspEncoder};
use serde::Deserialize;
use serde_json::{json, Value};
use tokio::codec::{FramedRead, FramedWrite};
use tokio::runtime::current_thread::Runtime;
use tokio_process::{Child, ChildStdin, ChildStdout, CommandExt};
use tokio_timer::Timeout;

use serde::Deserialize;
use tokio::util::FutureExt;
use tokio_process::{Child, ChildStdin, CommandExt};

use super::project_builder::Project;
use super::rls_exe;
use super::{rls_exe, rls_timeout};

// `Rc` because we share those in message reader stream and the RlsHandle.
// `RefCell` because borrows don't overlap. This is safe, because `process_msg`
// is only called (synchronously) when we execute some work on the runtime,
// however we only call `Runtime::block_on` and whenever we do it, there are no
// active borrows in scope.
type Messages = Rc<RefCell<Vec<Value>>>;
type Channels = Rc<RefCell<Vec<(Box<Fn(&Value) -> bool>, oneshot::Sender<Value>)>>>;

impl Project {
pub fn spawn_rls_async(&self) -> RlsHandle {
Expand All @@ -31,52 +49,93 @@ impl Project {
let stdin = child.stdin().take().unwrap();
let stdout = child.stdout().take().unwrap();

let reader = Some(FramedRead::new(
std::io::BufReader::new(stdout),
LspDecoder::default(),
));
let msgs = Messages::default();
let chans = Channels::default();

let reader = FramedRead::new(std::io::BufReader::new(stdout), LspDecoder::default())
.map_err(|_| ())
.for_each({
let msgs = Rc::clone(&msgs);
let chans = Rc::clone(&chans);
move |msg| process_msg(msg, msgs.clone(), chans.clone())
})
.timeout(rls_timeout());

let writer = Some(FramedWrite::new(stdin, LspEncoder));

let mut rt = Runtime::new().unwrap();
rt.spawn(reader.map_err(|_| ()));

RlsHandle {
reader,
writer,
child,
runtime: Runtime::new().unwrap(),
messages: Vec::new(),
runtime: rt,
messages: msgs,
channels: chans,
}
}
}

fn process_msg(msg: Value, msgs: Messages, chans: Channels) -> Result<(), ()> {
eprintln!("Processing message: {:?}", msg);

let mut chans = chans.borrow_mut();

if chans.len() > 0 {
let mut idx = (chans.len() - 1) as isize;

// Poor man's drain_filter. Iterates over entire collection starting
// from the end, takes ownership over the element and the predicate is
// true, then we consume the value; otherwise, we push it to the back,
// effectively undoing swap_remove (post-swap). This is correct, because
// on every iteration we decrease idx by 1, so we won't loop and we will
// check every element.
while idx >= 0 {
let (pred, tx) = chans.swap_remove(idx as usize);
if pred(&msg) {
tx.send(msg.clone()).map_err(|_| ())?;
} else {
chans.push((pred, tx));
}

idx -= 1;
}

debug_assert!(chans.iter().all(|(pred, _)| !pred(&msg)));
}

msgs.borrow_mut().push(msg);

Ok(())
}

/// Holds the handle to a spawned RLS child process and allows to send and
/// receive messages to and from the process.
pub struct RlsHandle {
/// Asynchronous LSP reader for the spawned process
reader: Option<FramedRead<std::io::BufReader<ChildStdout>, LspDecoder>>,
/// Asynchronous LSP writer for the spawned process
/// Asynchronous LSP writer for the spawned process.
writer: Option<FramedWrite<ChildStdin, LspEncoder>>,
/// Handle to the spawned child
/// Handle to the spawned child.
child: Child,
/// Tokio single-thread runtime required for interaction with async-based
/// `reader` and `writer`
/// Tokio single-thread runtime onto which LSP message reading stream has
/// been spawned. Allows to synchronously write messages via `writer` and
/// block on received messages matching an enqueued predicate in `channels`.
runtime: Runtime,
/// LSP Messages received from the stream and processed
messages: Vec<Value>,
/// Handle to all of the received LSP messages.
messages: Messages,
/// Handle to enqueued channel senders, used to notify when a given message
/// has been received.
channels: Channels,
}

impl RlsHandle {
/// Returns messages received until the moment of the call.
pub fn messages(&self) -> &[Value] {
&self.messages
}

// TODO: Notify on every message received?
fn receive_message(&mut self, msg: Value) {
eprintln!("Received: {:?}", msg);

self.messages.push(msg);
pub fn messages(&self) -> Ref<Vec<Value>> {
self.messages.borrow()
}

// TODO:
/// Send a request to the RLS and block until we receive the message.
/// Note that between sending and receiving the response *another* messages
/// can be received.
pub fn request<R>(&mut self, id: u64, params: R::Params) -> R::Result
where
R: rls::lsp_data::LSPRequest,
Expand All @@ -90,12 +149,13 @@ impl RlsHandle {
"params": params,
}));

let msg = self.wait_for_message(|val| val["id"] == id && val.get("result").is_some());
let msg = &msg["result"];
let msg = self.wait_for_message(move |val| val["id"] == id && val.get("result").is_some());

R::Result::deserialize(msg).unwrap_or_else(|_| panic!("Can't deserialize results: {:?}", msg))
R::Result::deserialize(&msg["result"])
.unwrap_or_else(|_| panic!("Can't deserialize results: {:?}", msg))
}

/// Synchronously sends a notification to the RLS.
pub fn notify<R>(&mut self, params: R::Params)
where
R: rls::lsp_data::LSPNotification,
Expand All @@ -110,56 +170,32 @@ impl RlsHandle {

/// Synchronously sends a message to the RLS.
pub fn send(&mut self, msg: Value) {
eprintln!("Sending: {:?}", msg);

let writer = self.writer.take().unwrap();

eprintln!("Sending: {:?}", msg);
let fut = writer.send(msg);

self.writer = Some(self.runtime.block_on(fut).unwrap());
eprintln!("Finished Sending");
}

/// Consumes messages in blocking manner until `f` predicate returns true
/// for a received message from the stream, additionally including the first
/// message for which the predicate returned false.
pub fn take_messages_until_inclusive(&mut self, f: impl Fn(&Value) -> bool) -> &[Value] {
// let stream = self.reader.by_ref();
let old_msg_len = self.messages.len();

// Fugly workaround to synchronously take items from stream *including*
// the one for which `f` returns false.
// Straightforward implementation of using `by_ref` and then doing
// `take_while(|x| Ok(!f(x))).collect()` doesn't work since it seems
// that the last element for which `f` is false is consumed from the
// inner stream and there's no way to retrieve it afterwards.
loop {
let reader = self.reader.take().unwrap();

match self.runtime.block_on(reader.into_future()) {
Ok((item, stream)) => {
if let Some(item) = item {
self.receive_message(item);
}

self.reader = Some(stream);
},
Err(..) => panic!("Can't read LSP message from stream"),
}
/// Enqueues a channel that is notified and consumed when a given predicate
/// `f` is true for a received message.
fn future_msg(
&mut self,
f: impl Fn(&Value) -> bool + 'static,
) -> impl Future<Item = Value, Error = oneshot::Canceled> {
let (tx, rx) = oneshot::channel();

let last = self.messages.last().unwrap();
// *Do* include the last message for which `f` was false.
if f(last) {
break;
}
}
self.channels.borrow_mut().push((Box::new(f), tx));

&self.messages[old_msg_len..]
rx
}

pub fn wait_for_message(&mut self, f: impl Fn(&Value) -> bool) -> &Value {
self.take_messages_until_inclusive(f);

self.messages.last().unwrap()
/// Blocks until a message, for which predicate `f` returns true, is received.
pub fn wait_for_message(&mut self, f: impl Fn(&Value) -> bool + 'static) -> Value {
let fut = self.future_msg(f);
self.runtime.block_on(fut).unwrap()
}

/// Blocks until the processing (building + indexing) is done by the RLS.
Expand All @@ -169,22 +205,14 @@ impl RlsHandle {
});
}

/// Blocks until RLS responds with a message with a given `id`.
pub fn wait_for_id(&mut self, id: u64) {
self.wait_for_message(|msg| msg["id"] == id);
}

/// Requests the RLS to shut down and waits (with a timeout) until the child
/// process is terminated.
pub fn shutdown(mut self) {
self.request::<languageserver_types::request::Shutdown>(99999, ());
self.notify::<languageserver_types::notification::Exit>(());

let rt = &mut self.runtime;

let fut = self.child.wait_with_output();
let fut = Timeout::new(fut, Duration::from_secs(15));
let fut = self.child.wait_with_output().timeout(rls_timeout());

rt.block_on(fut).unwrap();
self.runtime.block_on(fut).unwrap();
}
}

0 comments on commit 96085ac

Please sign in to comment.