Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Simplify refresh and publication of diagnostics #360

Merged
merged 7 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 20 additions & 36 deletions crates/ark/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ use regex::Regex;
use serde_json::json;
use stdext::result::ResultOrLog;
use stdext::*;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::UnboundedSender as TokioUnboundedSender;

use crate::dap::dap::DapBackendEvent;
use crate::dap::dap_r_main::RMainDap;
Expand All @@ -101,9 +99,9 @@ use crate::errors;
use crate::help::message::HelpReply;
use crate::help::message::HelpRequest;
use crate::kernel::Kernel;
use crate::lsp::backend::Backend;
use crate::lsp::backend::ConsoleInputs;
use crate::lsp::backend::LspEvent;
use crate::lsp::backend::TokioUnboundedSender;
use crate::lsp::events::EVENTS;
use crate::modules;
use crate::plots::graphics_device;
Expand Down Expand Up @@ -143,7 +141,6 @@ pub fn start_r(
stdin_reply_rx: Receiver<amalthea::Result<InputReply>>,
iopub_tx: Sender<IOPubMessage>,
kernel_init_tx: Bus<KernelInfo>,
lsp_runtime: Arc<Runtime>,
dap: Arc<Mutex<Dap>>,
) {
// Initialize global state (ensure we only do this once!)
Expand All @@ -164,7 +161,6 @@ pub fn start_r(
stdin_reply_rx,
iopub_tx,
kernel_init_tx,
lsp_runtime,
dap,
));
});
Expand Down Expand Up @@ -269,13 +265,6 @@ pub struct RMain {
pub help_tx: Option<Sender<HelpRequest>>,
pub help_rx: Option<Receiver<HelpReply>>,

// LSP tokio runtime used to spawn LSP tasks on the executor and the
// corresponding backend used to send LSP requests to the frontend.
// The backend is initialized on LSP start up, and is refreshed after a
// frontend reconnect.
lsp_runtime: Arc<Runtime>,
lsp_backend: Option<Backend>,

/// Event channel for notifying the LSP. In principle, could be a Jupyter comm.
lsp_events_tx: Option<TokioUnboundedSender<LspEvent>>,

Expand Down Expand Up @@ -360,7 +349,6 @@ impl RMain {
stdin_reply_rx: Receiver<amalthea::Result<InputReply>>,
iopub_tx: Sender<IOPubMessage>,
kernel_init_tx: Bus<KernelInfo>,
lsp_runtime: Arc<Runtime>,
dap: Arc<Mutex<Dap>>,
) -> Self {
Self {
Expand All @@ -382,8 +370,6 @@ impl RMain {
error_traceback: Vec::new(),
help_tx: None,
help_rx: None,
lsp_runtime,
lsp_backend: None,
lsp_events_tx: None,
dap: RMainDap::new(dap),
is_busy: false,
Expand Down Expand Up @@ -591,12 +577,7 @@ impl RMain {
// here, but only containing high-level information such as `search()`
// contents and `ls(rho)`.
if !info.browser && !info.incomplete && !info.input_request {
match console_inputs() {
Ok(inputs) => {
self.send_lsp(LspEvent::DidChangeConsoleInputs(inputs));
},
Err(err) => log::error!("Can't retrieve console inputs: {err:?}"),
}
self.refresh_lsp();
}

// Signal prompt
Expand Down Expand Up @@ -1073,27 +1054,30 @@ impl RMain {
&self.kernel
}

pub fn get_lsp_runtime(&self) -> &Arc<Runtime> {
&self.lsp_runtime
}

pub fn get_lsp_backend(&self) -> Option<&Backend> {
self.lsp_backend.as_ref()
}

fn send_lsp(&self, event: LspEvent) {
if let Some(ref tx) = self.lsp_events_tx {
tx.send(event).unwrap();
}
}

pub fn set_lsp_backend(
&mut self,
backend: Backend,
lsp_events_tx: TokioUnboundedSender<LspEvent>,
) {
self.lsp_backend = Some(backend);
self.lsp_events_tx = Some(lsp_events_tx);
/// Install the channel used to notify the LSP of kernel-side events.
///
/// Set on LSP start up; a reconnecting frontend installs a fresh channel.
pub fn set_lsp_channel(&mut self, lsp_events_tx: TokioUnboundedSender<LspEvent>) {
    // The sender is owned by this method and not used again below, so it
    // can be stored directly; the previous `.clone()` was redundant.
    self.lsp_events_tx = Some(lsp_events_tx);

    // Refresh LSP state now since we probably have missed some updates
    // while the channel was offline. This is currently not an ideal timing
    // as the channel is set up from a preemptive `r_task()` after the LSP
    // is set up. We'll want to do this in an idle task.
    self.refresh_lsp();
}

/// Push the current console state to the LSP and request a refresh of the
/// diagnostics for all documents it knows about.
pub fn refresh_lsp(&self) {
    let inputs = console_inputs();
    match inputs {
        Ok(inputs) => self.send_lsp(LspEvent::DidChangeConsoleInputs(inputs)),
        Err(err) => log::error!("Can't retrieve console inputs: {err:?}"),
    }

    self.send_lsp(LspEvent::RefreshAllDiagnostics());
}

pub fn call_frontend_method(&self, request: UiFrontendRequest) -> anyhow::Result<RObject> {
Expand Down
120 changes: 98 additions & 22 deletions crates/ark/src/lsp/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use tower_lsp::LspService;
use tower_lsp::Server;
use tree_sitter::Point;

/// Crate-local shorthand for tokio's unbounded MPSC sender.
pub(crate) type TokioUnboundedSender<T> = tokio::sync::mpsc::UnboundedSender<T>;

use crate::interface::RMain;
use crate::lsp::completions::provide_completions;
use crate::lsp::completions::resolve_completion;
Expand All @@ -42,7 +44,6 @@ use crate::lsp::encoding::get_position_encoding_kind;
use crate::lsp::help_topic;
use crate::lsp::hover::hover;
use crate::lsp::indexer;
use crate::lsp::indexer::IndexerStateManager;
use crate::lsp::selection_range::convert_selection_range_from_tree_sitter_to_lsp;
use crate::lsp::selection_range::selection_range;
use crate::lsp::signature_help::signature_help;
Expand All @@ -65,6 +66,9 @@ macro_rules! backend_trace {
/// Events sent to the LSP event loop (see the watcher task in `start_lsp()`).
#[derive(Debug)]
pub enum LspEvent {
/// New console inputs (scopes, installed packages) from the kernel after a
/// top-level evaluation.
DidChangeConsoleInputs(ConsoleInputs),
/// Recompute and publish diagnostics for a single document.
RefreshDiagnostics(Url, Document, WorldState),
/// Recompute and publish diagnostics for every known document.
RefreshAllDiagnostics(),
/// Send a computed set of diagnostics to the client, with an optional
/// document version.
PublishDiagnostics(Url, Vec<Diagnostic>, Option<i32>),
}

#[derive(Clone, Debug)]
Expand All @@ -75,8 +79,8 @@ pub struct Backend {
/// Global world state containing all inputs for LSP analysis.
pub state: WorldState,

/// Synchronisation util for indexer.
pub indexer_state_manager: IndexerStateManager,
/// Channel for communication with the LSP.
events_tx: TokioUnboundedSender<LspEvent>,
}

/// Information sent from the kernel to the LSP after each top-level evaluation.
Expand Down Expand Up @@ -120,6 +124,54 @@ impl Backend {

return callback(document.value());
}

fn did_change_console_inputs(&self, inputs: ConsoleInputs) {
*self.state.console_scopes.lock() = inputs.console_scopes;
*self.state.installed_packages.lock() = inputs.installed_packages;
}

/// Regenerate diagnostics for one document on a blocking worker thread and
/// queue them for publication to the client.
fn refresh_diagnostics(&self, url: Url, document: Document, state: WorldState) {
    tokio::task::spawn_blocking({
        let events_tx = self.events_tx.clone();

        move || {
            let version = document.version.clone();
            let diagnostics = diagnostics::generate_diagnostics(document, state);

            // `unwrap()` instead of returning the `Result` from the closure:
            // the `JoinHandle` is detached, so a returned error would be
            // silently dropped. This matches `refresh_all_diagnostics()`.
            events_tx
                .send(LspEvent::PublishDiagnostics(url, diagnostics, version))
                .unwrap();
        }
    });
}

/// Regenerate diagnostics for every known document. Each document is
/// processed on its own blocking worker thread and the results are queued
/// for publication to the client.
fn refresh_all_diagnostics(&self) {
    for doc_ref in self.state.documents.iter() {
        // Snapshot everything the worker needs before moving it into the task
        let url = doc_ref.key().clone();
        let document = doc_ref.value().clone();
        let version = document.version.clone();
        let state = self.state.clone();
        let events_tx = self.events_tx.clone();

        tokio::task::spawn_blocking(move || {
            let diagnostics = diagnostics::generate_diagnostics(document, state);
            events_tx
                .send(LspEvent::PublishDiagnostics(url, diagnostics, version))
                .unwrap();
        });
    }
}

/// Forward a computed set of diagnostics for `uri` to the client.
async fn publish_diagnostics(
    &self,
    uri: Url,
    diagnostics: Vec<Diagnostic>,
    version: Option<i32>,
) {
    let client = &self.client;
    client.publish_diagnostics(uri, diagnostics, version).await
}
}

#[tower_lsp::async_trait]
Expand All @@ -143,8 +195,10 @@ impl LanguageServer for Backend {
}
}

// start indexing
indexer::start(folders, self.indexer_state_manager.clone());
// Start indexing
tokio::task::spawn_blocking(|| {
indexer::start(folders);
});
Comment on lines +198 to +201
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am still a little unsure how you can now guarantee that the initial round of indexing has finished before the first set of diagnostics is generated

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea I have convinced myself that we still need something here. Maybe when the indexer finishes it should kick off an "update all diagnostics" event?

I opened dplyr to bind-rows.R and immediately saw

Screenshot 2024-05-28 at 4 37 39 PM

But note that dataframe_ish is a function that is defined in that file, it is at the bottom. Previously the diagnostics would wait until the indexer finished the initial round before generating diagnostics.

I ran 1 in the Console to retrigger diagnostics and then poof the squiggle under dataframe_ish() disappears because by that time the indexer was finished.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're right!

On a related note, it seems the did_open() handler should trigger an index update? It doesn't on main either, but I'm pretty sure it should? We're indexing files in the workspace already but the newly opened file might be outside the workspace.

And did_close() should clear the index for that file? But only if not in the workspace?

Also deleting a file from the FS doesn't remove it from our index. And updating it from outside positron (e.g. by changing branch) does not update it either (we don't receive did_change notifications from unmanaged files in the workspace - i.e. we only receive these notifs for files for which we got a did_open).

And of course there's the issue that diagnostics of a file are influenced by completely unrelated files. And in an inconsistent way as only defined functions are in scope, not other symbols.

ok it's a giant mess, but we don't need to solve all that right now. For now how about firing diagnostics for a file after it has finished indexing? This won't take into account functions in other files, but we don't handle that particularly well anyway.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now how about firing diagnostics for a file after it has finished indexing

Seems good to me

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll fix that in the next PR to avoid merge conflicts


Ok(InitializeResult {
server_info: Some(ServerInfo {
Expand Down Expand Up @@ -274,11 +328,17 @@ impl LanguageServer for Backend {
let uri = params.text_document.uri;
let version = params.text_document.version;

self.state
.documents
.insert(uri.clone(), Document::new(contents, Some(version)));
let document = Document::new(contents, Some(version));

self.state.documents.insert(uri.clone(), document.clone());

diagnostics::refresh_diagnostics(self.clone(), uri.clone(), Some(version));
self.events_tx
.send(LspEvent::RefreshDiagnostics(
uri,
document,
self.state.clone(),
Comment on lines +338 to +339
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once state is a pure value, I am imagining that it might be better to have RefreshDiagnostics just take url and state, since the snapshot of the state at that time will hold the document we care about already.

Might let us avoid an extra document.clone() and seems simpler

))
.unwrap();
}

async fn did_change(&self, params: DidChangeTextDocumentParams) {
Expand Down Expand Up @@ -309,11 +369,17 @@ impl LanguageServer for Backend {
}
}

// publish diagnostics - but only publish them if the version of
// Publish diagnostics - but only publish them if the version of
// the document now matches the version of the change after applying
// it in `on_did_change()` (i.e. no changes left in the out of order queue)
if params.text_document.version == version {
diagnostics::refresh_diagnostics(self.clone(), uri.clone(), Some(version));
self.events_tx
.send(LspEvent::RefreshDiagnostics(
uri.clone(),
doc.clone(),
self.state.clone(),
))
.unwrap();
}
}

Expand All @@ -326,7 +392,10 @@ impl LanguageServer for Backend {

let uri = params.text_document.uri;

diagnostics::clear_diagnostics(self.clone(), uri.clone(), None);
// Publish empty set of diagnostics to clear them
self.client
.publish_diagnostics(uri.clone(), Vec::new(), None)
.await;

match self.state.documents.remove(&uri) {
Some(_) => {
Expand Down Expand Up @@ -584,33 +653,40 @@ pub fn start_lsp(runtime: Arc<Runtime>, address: String, conn_init_tx: Sender<bo
let (read, write) = (read.compat(), write.compat_write());

let init = |client: Client| {
let (events_tx, mut events_rx) = tokio_unbounded_channel::<LspEvent>();

// Create backend.
// Note that DashMap uses synchronization primitives internally, so we
// don't guard access to the map via a mutex.
let backend = Backend {
client,
events_tx: events_tx.clone(),
state: WorldState {
documents: Arc::new(DashMap::new()),
workspace: Arc::new(Mutex::new(Workspace::default())),
console_scopes: Arc::new(Mutex::new(vec![])),
installed_packages: Arc::new(Mutex::new(vec![])),
},
indexer_state_manager: IndexerStateManager::new(),
};

let (events_tx, mut events_rx) = tokio_unbounded_channel::<LspEvent>();

// LSP event loop. To be integrated in our synchronising dispatcher
// once implemented.
// Watcher task for LSP events. To be integrated in our
// synchronising dispatcher once implemented.
tokio::spawn({
let backend = backend.clone();
async move {
loop {
match events_rx.recv().await.unwrap() {
LspEvent::DidChangeConsoleInputs(inputs) => {
*backend.state.console_scopes.lock() = inputs.console_scopes;
*backend.state.installed_packages.lock() =
inputs.installed_packages;
backend.did_change_console_inputs(inputs);
},
LspEvent::RefreshDiagnostics(url, document, state) => {
backend.refresh_diagnostics(url, document, state);
},
LspEvent::RefreshAllDiagnostics() => {
backend.refresh_all_diagnostics();
},
LspEvent::PublishDiagnostics(uri, diagnostics, version) => {
backend.publish_diagnostics(uri, diagnostics, version).await;
},
}
}
Expand All @@ -624,10 +700,10 @@ pub fn start_lsp(runtime: Arc<Runtime>, address: String, conn_init_tx: Sender<bo
// `kernel_init_rx`. Even if it isn't, this should be okay because
// `r_task()` defensively blocks until its sender is initialized.
r_task({
let backend = backend.clone();
let events_tx = events_tx.clone();
move || {
let main = RMain::get_mut();
main.set_lsp_backend(backend, events_tx);
main.set_lsp_channel(events_tx);
}
});

Expand Down
Loading
Loading