Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(Turborepo): Make package discovery async, and apply a debouncer #8058

Merged
merged 5 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions crates/turborepo-filewatch/src/debouncer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use std::{fmt::Debug, sync::Mutex, time::Duration};

use tokio::{select, sync, time::Instant};
use tracing::trace;

pub(crate) struct Debouncer {
bump: sync::Notify,
serial: Mutex<Option<usize>>,
timeout: Duration,
}

const DEFAULT_DEBOUNCE_TIMEOUT: Duration = Duration::from_millis(10);

impl Default for Debouncer {
fn default() -> Self {
Self::new(DEFAULT_DEBOUNCE_TIMEOUT)
}
}

impl Debug for Debouncer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let serial = { self.serial.lock().expect("lock is valid") };
f.debug_struct("Debouncer")
.field("is_pending", &serial.is_some())
.field("timeout", &self.timeout)
.finish()
}
}

impl Debouncer {
pub(crate) fn new(timeout: Duration) -> Self {
let bump = sync::Notify::new();
let serial = Mutex::new(Some(0));
Self {
bump,
serial,
timeout,
}
}

pub(crate) fn bump(&self) -> bool {
let mut serial = self.serial.lock().expect("lock is valid");
match *serial {
None => false,
Some(previous) => {
*serial = Some(previous + 1);
self.bump.notify_one();
true
}
}
}

pub(crate) async fn debounce(&self) {
let mut serial = {
self.serial
.lock()
.expect("lock is valid")
.expect("only this thread sets the value to None")
};
let mut deadline = Instant::now() + self.timeout;
loop {
let timeout = tokio::time::sleep_until(deadline);
select! {
_ = self.bump.notified() => {
trace!("debouncer notified");
// reset timeout
let current_serial = self.serial.lock().expect("lock is valid").expect("only this thread sets the value to None");
if current_serial == serial {
// we timed out between the serial update and the notification.
// ignore this notification, we've already bumped the timer
continue;
} else {
serial = current_serial;
deadline = Instant::now() + self.timeout;
}
}
_ = timeout => {
// check if serial is still valid. It's possible a bump came in before the timeout,
// but we haven't been notified yet.
let mut current_serial_opt = self.serial.lock().expect("lock is valid");
let current_serial = current_serial_opt.expect("only this thread sets the value to None");
if current_serial == serial {
// if the serial is what we last observed, and the timer expired, we timed out.
// we're done. Mark that we won't accept any more bumps and return
*current_serial_opt = None;
return;
} else {
serial = current_serial;
deadline = Instant::now() + self.timeout;
}
}
}
}
}
}

#[cfg(test)]
mod tests {
use std::{
sync::Arc,
time::{Duration, Instant},
};

use crate::debouncer::Debouncer;

#[tokio::test]
async fn test_debouncer() {
let debouncer = Arc::new(Debouncer::new(Duration::from_millis(10)));
let debouncer_copy = debouncer.clone();
let handle = tokio::task::spawn(async move {
debouncer_copy.debounce().await;
});
for _ in 0..10 {
// assert that we can continue bumping it past the original timeout
tokio::time::sleep(Duration::from_millis(2)).await;
assert!(debouncer.bump());
}
let start = Instant::now();
handle.await.unwrap();
let end = Instant::now();
// give some wiggle room to account for race conditions, but assert that we
// didn't immediately complete after the last bump
assert!(end - start > Duration::from_millis(5));
// we shouldn't be able to bump it after it's run out it's timeout
assert!(!debouncer.bump());
}
}
124 changes: 11 additions & 113 deletions crates/turborepo-filewatch/src/hash_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
collections::{HashMap, HashSet},
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
Arc,
},
time::Duration,
};
Expand All @@ -12,15 +12,17 @@ use radix_trie::{Trie, TrieCommon};
use thiserror::Error;
use tokio::{
select,
sync::{self, broadcast, mpsc, oneshot, watch},
time::Instant,
sync::{broadcast, mpsc, oneshot, watch},
};
use tracing::{debug, trace};
use turbopath::{AbsoluteSystemPathBuf, AnchoredSystemPath, AnchoredSystemPathBuf};
use turborepo_repository::discovery::DiscoveryResponse;
use turborepo_scm::{package_deps::GitHashes, Error as SCMError, SCM};

use crate::{globwatcher::GlobSet, package_watcher::DiscoveryData, NotifyError, OptionalWatch};
use crate::{
debouncer::Debouncer, globwatcher::GlobSet, package_watcher::DiscoveryData, NotifyError,
OptionalWatch,
};

pub struct HashWatcher {
_exit_tx: oneshot::Sender<()>,
Expand Down Expand Up @@ -125,92 +127,11 @@ enum Query {
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct Version(usize);

struct HashDebouncer {
bump: sync::Notify,
serial: Mutex<Option<usize>>,
timeout: Duration,
}

const DEFAULT_DEBOUNCE_TIMEOUT: Duration = Duration::from_millis(10);

impl Default for HashDebouncer {
fn default() -> Self {
Self::new(DEFAULT_DEBOUNCE_TIMEOUT)
}
}

impl HashDebouncer {
fn new(timeout: Duration) -> Self {
let bump = sync::Notify::new();
let serial = Mutex::new(Some(0));
Self {
bump,
serial,
timeout,
}
}

fn bump(&self) -> bool {
let mut serial = self.serial.lock().expect("lock is valid");
match *serial {
None => false,
Some(previous) => {
*serial = Some(previous + 1);
self.bump.notify_one();
true
}
}
}

async fn debounce(&self) {
let mut serial = {
self.serial
.lock()
.expect("lock is valid")
.expect("only this thread sets the value to None")
};
let mut deadline = Instant::now() + self.timeout;
loop {
let timeout = tokio::time::sleep_until(deadline);
select! {
_ = self.bump.notified() => {
trace!("debouncer notified");
// reset timeout
let current_serial = self.serial.lock().expect("lock is valid").expect("only this thread sets the value to None");
if current_serial == serial {
// we timed out between the serial update and the notification.
// ignore this notification, we've already bumped the timer
continue;
} else {
serial = current_serial;
deadline = Instant::now() + self.timeout;
}
}
_ = timeout => {
// check if serial is still valid. It's possible a bump came in before the timeout,
// but we haven't been notified yet.
let mut current_serial_opt = self.serial.lock().expect("lock is valid");
let current_serial = current_serial_opt.expect("only this thread sets the value to None");
if current_serial == serial {
// if the serial is what we last observed, and the timer expired, we timed out.
// we're done. Mark that we won't accept any more bumps and return
*current_serial_opt = None;
return;
} else {
serial = current_serial;
deadline = Instant::now() + self.timeout;
}
}
}
}
}
}

enum HashState {
Hashes(GitHashes),
Pending(
Version,
Arc<HashDebouncer>,
Arc<Debouncer>,
Vec<oneshot::Sender<Result<GitHashes, Error>>>,
),
Unavailable(String),
Expand Down Expand Up @@ -545,16 +466,16 @@ impl Subscriber {
spec: &HashSpec,
hash_update_tx: &mpsc::Sender<HashUpdate>,
immediate: bool,
) -> (Version, Arc<HashDebouncer>) {
) -> (Version, Arc<Debouncer>) {
let version = Version(self.next_version.fetch_add(1, Ordering::SeqCst));
let tx = hash_update_tx.clone();
let spec = spec.clone();
let repo_root = self.repo_root.clone();
let scm = self.scm.clone();
let debouncer = if immediate {
HashDebouncer::new(Duration::from_millis(0))
Debouncer::new(Duration::from_millis(0))
} else {
HashDebouncer::default()
Debouncer::default()
};
let debouncer = Arc::new(debouncer);
let debouncer_copy = debouncer.clone();
Expand Down Expand Up @@ -702,7 +623,6 @@ impl Subscriber {
mod tests {
use std::{
assert_matches::assert_matches,
sync::Arc,
time::{Duration, Instant},
};

Expand All @@ -717,7 +637,7 @@ mod tests {
use crate::{
cookies::CookieWriter,
globwatcher::GlobSet,
hash_watcher::{HashDebouncer, HashSpec, HashWatcher},
hash_watcher::{HashSpec, HashWatcher},
package_watcher::PackageWatcher,
FileSystemWatcher,
};
Expand Down Expand Up @@ -1114,28 +1034,6 @@ mod tests {
assert!(result.is_empty());
}

#[tokio::test]
async fn test_debouncer() {
let debouncer = Arc::new(HashDebouncer::new(Duration::from_millis(10)));
let debouncer_copy = debouncer.clone();
let handle = tokio::task::spawn(async move {
debouncer_copy.debounce().await;
});
for _ in 0..10 {
// assert that we can continue bumping it past the original timeout
tokio::time::sleep(Duration::from_millis(2)).await;
assert!(debouncer.bump());
}
let start = Instant::now();
handle.await.unwrap();
let end = Instant::now();
// give some wiggle room to account for race conditions, but assert that we
// didn't immediately complete after the last bump
assert!(end - start > Duration::from_millis(5));
// we shouldn't be able to bump it after it's run out it's timeout
assert!(!debouncer.bump());
}

#[tokio::test]
#[tracing_test::traced_test]
async fn test_basic_file_changes_with_inputs() {
Expand Down
1 change: 1 addition & 0 deletions crates/turborepo-filewatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use {
};

pub mod cookies;
mod debouncer;
#[cfg(target_os = "macos")]
mod fsevent;
pub mod globwatcher;
Expand Down
Loading
Loading