Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(watch): simplify watch data synchronization #9154

Merged
merged 2 commits into from
Sep 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 19 additions & 15 deletions crates/turborepo-lib/src/run/watch.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::{cell::RefCell, collections::HashSet, sync::Arc};
use std::{
collections::HashSet,
ops::DerefMut as _,
sync::{Arc, Mutex},
};

use futures::StreamExt;
use miette::{Diagnostic, SourceSpan};
use thiserror::Error;
use tokio::{
select,
sync::{Mutex, Notify},
task::JoinHandle,
};
use tokio::{select, sync::Notify, task::JoinHandle};
use tracing::{instrument, trace};
use turborepo_repository::package_graph::PackageName;
use turborepo_telemetry::events::command::CommandEventBuilder;
Expand Down Expand Up @@ -172,14 +172,14 @@ impl WatchClient {
// We explicitly use a tokio::sync::Mutex here to avoid deadlocks.
// If we used a std::sync::Mutex, we could deadlock by spinning the lock
// and not yielding back to the tokio runtime.
let changed_packages = Mutex::new(RefCell::new(ChangedPackages::default()));
let changed_packages = Mutex::new(ChangedPackages::default());
let notify_run = Arc::new(Notify::new());
let notify_event = notify_run.clone();

let event_fut = async {
while let Some(event) = events.next().await {
let event = event?;
Self::handle_change_event(&changed_packages, event.event.unwrap()).await?;
Self::handle_change_event(&changed_packages, event.event.unwrap())?;
notify_event.notify_one();
}

Expand All @@ -189,9 +189,13 @@ impl WatchClient {
let run_fut = async {
loop {
notify_run.notified().await;
let changed_packages_guard = changed_packages.lock().await;
if !changed_packages_guard.borrow().is_empty() {
let changed_packages = changed_packages_guard.take();
let some_changed_packages = {
let mut changed_packages_guard =
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only acquire the guard in the block so we ensure we release the lock once we take the changed packages. This lets the event_fut make progress while self.execute_run is pending.

changed_packages.lock().expect("poisoned lock");
(!changed_packages_guard.is_empty())
.then(|| std::mem::take(changed_packages_guard.deref_mut()))
};
if let Some(changed_packages) = some_changed_packages {
self.execute_run(changed_packages).await?;
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed_packages_guard is still held while run is being executed, this prevents any events from being handled.

}
Expand All @@ -214,8 +218,8 @@ impl WatchClient {
}

#[instrument(skip(changed_packages))]
async fn handle_change_event(
changed_packages: &Mutex<RefCell<ChangedPackages>>,
fn handle_change_event(
changed_packages: &Mutex<ChangedPackages>,
event: proto::package_change_event::Event,
) -> Result<(), Error> {
// Should we recover here?
Expand All @@ -225,7 +229,7 @@ impl WatchClient {
}) => {
let package_name = PackageName::from(package_name);

match changed_packages.lock().await.get_mut() {
match changed_packages.lock().expect("poisoned lock").deref_mut() {
ChangedPackages::All => {
// If we've already changed all packages, ignore
}
Expand All @@ -235,7 +239,7 @@ impl WatchClient {
}
}
proto::package_change_event::Event::RediscoverPackages(_) => {
*changed_packages.lock().await.get_mut() = ChangedPackages::All;
*changed_packages.lock().expect("poisoned lock") = ChangedPackages::All;
}
proto::package_change_event::Event::Error(proto::PackageChangeError { message }) => {
return Err(DaemonError::Unavailable(message).into());
Expand Down
Loading