Skip to content

Commit

Permalink
Create a TaskQueue that runs only after `GlobalState::process_changes…
Browse files Browse the repository at this point in the history
…` runs
  • Loading branch information
davidbarsky committed Nov 21, 2023
1 parent 437baf6 commit 448eafb
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 27 deletions.
16 changes: 14 additions & 2 deletions crates/rust-analyzer/src/global_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::{
mem_docs::MemDocs,
op_queue::OpQueue,
reload,
task_pool::TaskPool,
task_pool::{TaskPool, TaskQueue},
};

// Enforces drop order
Expand Down Expand Up @@ -125,6 +125,11 @@ pub(crate) struct GlobalState {
pub(crate) fetch_proc_macros_queue: OpQueue<Vec<ProcMacroPaths>, bool>,
pub(crate) prime_caches_queue: OpQueue,

/// a deferred task queue. this should only be used if the enqueued Task
/// can only run *after* [`GlobalState::process_changes`] has been called.
pub(crate) task_queue: TaskQueue,
}

#[derive(Clone)]
pub(crate) struct Conn {
pub(crate) sender: Sender<lsp_server::Message>,
Expand Down Expand Up @@ -170,13 +175,18 @@ impl GlobalState {
Handle { handle, receiver }
};

let task_queue = {
let (sender, receiver) = unbounded();
TaskQueue { sender, receiver }
};

let mut analysis_host = AnalysisHost::new(config.lru_parse_query_capacity());
if let Some(capacities) = config.lru_query_capacities() {
analysis_host.update_lru_capacities(capacities);
}
let (flycheck_sender, flycheck_receiver) = unbounded();
let mut this = GlobalState {
conn: server,
conn,
task_pool,
fmt_pool,
loader,
Expand Down Expand Up @@ -214,6 +224,8 @@ impl GlobalState {
fetch_proc_macros_queue: OpQueue::default(),

prime_caches_queue: OpQueue::default(),

task_queue,
};
// Apply any required database inputs from the config.
this.update_configuration(config);
Expand Down
12 changes: 5 additions & 7 deletions crates/rust-analyzer/src/handlers/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use lsp_types::{
DidChangeWatchedFilesParams, DidChangeWorkspaceFoldersParams, DidCloseTextDocumentParams,
DidOpenTextDocumentParams, DidSaveTextDocumentParams, WorkDoneProgressCancelParams,
};
use stdx::thread::ThreadIntent;
use triomphe::Arc;
use vfs::{AbsPathBuf, ChangeKind, VfsPath};

Expand All @@ -18,7 +17,6 @@ use crate::{
global_state::GlobalState,
lsp::{from_proto, utils::apply_document_changes},
lsp_ext::RunFlycheckParams,
main_loop::Task,
mem_docs::DocumentData,
reload,
};
Expand Down Expand Up @@ -68,12 +66,12 @@ pub(crate) fn handle_did_open_text_document(
}

state.vfs.write().0.set_file_contents(path, Some(params.text_document.text.into_bytes()));

if state.config.notifications().unindexed_project {
state.task_pool.handle.spawn(ThreadIntent::Worker, || {
tracing::debug!("dispatching task");
Task::FileIndexState(params.text_document.uri)
});
tracing::debug!("queuing task");
let _ = state
.task_queue
.sender
.send(crate::main_loop::QueuedTask::CheckIfIndexed(params.text_document.uri));
}
}
Ok(())
Expand Down
58 changes: 40 additions & 18 deletions crates/rust-analyzer/src/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,21 @@ pub fn main_loop(config: Config, connection: Connection) -> anyhow::Result<()> {
enum Event {
Lsp(lsp_server::Message),
Task(Task),
QueuedTask(QueuedTask),
Vfs(vfs::loader::Message),
Flycheck(flycheck::Message),
}

#[derive(Debug)]
pub(crate) enum QueuedTask {
CheckIfIndexed(lsp_types::Url),
}

#[derive(Debug)]
pub(crate) enum Task {
Response(lsp_server::Response),
Retry(lsp_server::Request),
Diagnostics(Vec<(FileId, Vec<lsp_types::Diagnostic>)>),
FileIndexState(lsp_types::Url),
PrimeCaches(PrimeCachesProgress),
FetchWorkspace(ProjectWorkspaceProgress),
FetchBuildData(BuildDataProgress),
Expand Down Expand Up @@ -106,6 +111,7 @@ impl fmt::Debug for Event {
match self {
Event::Lsp(it) => fmt::Debug::fmt(it, f),
Event::Task(it) => fmt::Debug::fmt(it, f),
Event::QueuedTask(it) => fmt::Debug::fmt(it, f),
Event::Vfs(it) => fmt::Debug::fmt(it, f),
Event::Flycheck(it) => fmt::Debug::fmt(it, f),
}
Expand Down Expand Up @@ -184,6 +190,9 @@ impl GlobalState {
recv(self.task_pool.receiver) -> task =>
Some(Event::Task(task.unwrap())),

recv(self.task_queue.receiver) -> task =>
Some(Event::QueuedTask(task.unwrap())),

recv(self.fmt_pool.receiver) -> task =>
Some(Event::Task(task.unwrap())),

Expand Down Expand Up @@ -216,6 +225,10 @@ impl GlobalState {
lsp_server::Message::Notification(not) => self.on_notification(not)?,
lsp_server::Message::Response(resp) => self.complete_request(resp),
},
Event::QueuedTask(task) => {
let _p = profile::span("GlobalState::handle_event/deferred_task");
self.handle_deferred_task(task);
}
Event::Task(task) => {
let _p = profile::span("GlobalState::handle_event/task");
let mut prime_caches_progress = Vec::new();
Expand Down Expand Up @@ -488,23 +501,6 @@ impl GlobalState {
// Only retry requests that haven't been cancelled. Otherwise we do unnecessary work.
Task::Retry(req) if !self.is_completed(&req) => self.on_request(req),
Task::Retry(_) => (),
Task::FileIndexState(uri) => {
let _p = profile::span("run_unindexed_project");
let snap = self.snapshot();
let id = from_proto::file_id(&snap, &uri).expect("unable to get FileId");
if let Ok(crates) = &snap.analysis.crates_for(id) {
if crates.is_empty() {
tracing::debug!(?uri, "rust-analyzer does not track this file");
self.send_notification::<ext::UnindexedProject>(
ext::UnindexedProjectParams {
text_documents: vec![lsp_types::TextDocumentIdentifier { uri }],
},
);
} else {
tracing::warn!("was unable to get analysis for crate")
}
}
}
Task::Diagnostics(diagnostics_per_file) => {
for (file_id, diagnostics) in diagnostics_per_file {
self.diagnostics.set_native_diagnostics(file_id, diagnostics)
Expand Down Expand Up @@ -626,6 +622,32 @@ impl GlobalState {
}
}

fn handle_deferred_task(&mut self, task: QueuedTask) {
match task {
QueuedTask::CheckIfIndexed(uri) => {
let snap = self.snapshot();
let conn = self.conn.clone();

self.task_pool.handle.spawn_with_sender(ThreadIntent::Worker, move |_| {
tracing::debug!(?uri, "handling uri");
let id = from_proto::file_id(&snap, &uri).expect("unable to get FileId");
if let Ok(crates) = &snap.analysis.crates_for(id) {
if crates.is_empty() {
conn.send_notification::<ext::UnindexedProject>(
ext::UnindexedProjectParams {
text_documents: vec![lsp_types::TextDocumentIdentifier { uri }],
},
);
tracing::debug!("sent notification");
} else {
tracing::debug!(?uri, "is indexed");
}
}
});
}
}
}

fn handle_flycheck_msg(&mut self, message: flycheck::Message) {
match message {
flycheck::Message::AddDiagnostic { id, workspace_root, diagnostic } => {
Expand Down
11 changes: 11 additions & 0 deletions crates/rust-analyzer/src/task_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
use crossbeam_channel::Sender;
use stdx::thread::{Pool, ThreadIntent};

use crate::main_loop::QueuedTask;

pub(crate) struct TaskPool<T> {
sender: Sender<T>,
pool: Pool,
Expand Down Expand Up @@ -40,3 +42,12 @@ impl<T> TaskPool<T> {
self.pool.len()
}
}

/// `TaskQueue`, like its name suggests, queues tasks.
///
/// This should only be used used if a task must run after [`GlobalState::process_changes`]
/// has been called.
pub(crate) struct TaskQueue {
pub(crate) sender: crossbeam_channel::Sender<QueuedTask>,
pub(crate) receiver: crossbeam_channel::Receiver<QueuedTask>,
}

0 comments on commit 448eafb

Please sign in to comment.