-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Commit
- Loading branch information
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,341 @@ | ||
use std::{ | ||
collections::HashMap, | ||
fs::OpenOptions, | ||
path::PathBuf, | ||
sync::{atomic::AtomicUsize, Arc}, | ||
time::Duration, | ||
}; | ||
|
||
use notify::{Event, EventKind}; | ||
use thiserror::Error; | ||
use tokio::{ | ||
sync::{broadcast, oneshot, Mutex}, | ||
time::error::Elapsed, | ||
}; | ||
use tracing::debug; | ||
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, PathRelation}; | ||
|
||
use crate::NotifyError; | ||
|
||
#[derive(Clone, Debug, Error)] | ||
pub enum WatchError { | ||
#[error(transparent)] | ||
RecvError(#[from] broadcast::error::RecvError), | ||
#[error("filewatching encountered errors: {0}")] | ||
NotifyError(#[from] NotifyError), | ||
#[error("filewatching has closed, cannot watch cookies")] | ||
Closed, | ||
Check warning on line 27 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Build Turborepo (ubuntu, ubuntu-latest)
Check warning on line 27 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Build Turborepo (macos, macos-latest)
Check warning on line 27 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Build Turborepo (windows, windows-latest)
Check warning on line 27 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo E2E Tests (macos, macos-latest)
Check warning on line 27 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo E2E Tests (ubuntu, ubuntu-latest)
Check warning on line 27 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Go Integration Tests (ubuntu, ubuntu-latest)
Check warning on line 27 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo Examples (ubuntu, ubuntu-latest)
Check warning on line 27 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo Examples (macos, macos-latest)
|
||
} | ||
|
||
#[derive(Debug, Error)] | ||
pub enum CookieError { | ||
#[error(transparent)] | ||
Watch(#[from] WatchError), | ||
#[error("cookie timeout expired")] | ||
Timeout(#[from] Elapsed), | ||
#[error("failed to receiver cookie notification: {0}")] | ||
RecvError(#[from] oneshot::error::RecvError), | ||
#[error("failed to write cookie file {0}")] | ||
IO(#[from] std::io::Error), | ||
} | ||
|
||
type CookieResponse = Result<(), WatchError>; | ||
|
||
pub struct CookieJar { | ||
Check warning on line 44 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Build Turborepo (ubuntu, ubuntu-latest)
Check warning on line 44 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Build Turborepo (macos, macos-latest)
Check warning on line 44 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Build Turborepo (windows, windows-latest)
Check warning on line 44 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo E2E Tests (macos, macos-latest)
Check warning on line 44 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo E2E Tests (ubuntu, ubuntu-latest)
Check warning on line 44 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Go Integration Tests (ubuntu, ubuntu-latest)
Check warning on line 44 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo Examples (ubuntu, ubuntu-latest)
Check warning on line 44 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo Examples (macos, macos-latest)
|
||
root: AbsoluteSystemPathBuf, | ||
serial: AtomicUsize, | ||
timeout: Duration, | ||
watches: Arc<Mutex<Watches>>, | ||
// _exit_ch exists to trigger a close on the receiver when an instance | ||
// of this struct is dropped. The task that is receiving events will exit, | ||
// dropping the other sender for the broadcast channel, causing all receivers | ||
// to be notified of a close. | ||
_exit_ch: tokio::sync::oneshot::Sender<()>, | ||
} | ||
|
||
#[derive(Default)] | ||
struct Watches { | ||
closed: bool, | ||
Check warning on line 58 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Build Turborepo (ubuntu, ubuntu-latest)
Check warning on line 58 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Build Turborepo (macos, macos-latest)
Check warning on line 58 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Build Turborepo (windows, windows-latest)
Check warning on line 58 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo E2E Tests (macos, macos-latest)
Check warning on line 58 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo E2E Tests (ubuntu, ubuntu-latest)
Check warning on line 58 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Go Integration Tests (ubuntu, ubuntu-latest)
Check warning on line 58 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo Examples (ubuntu, ubuntu-latest)
Check warning on line 58 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo Examples (macos, macos-latest)
|
||
cookies: HashMap<PathBuf, oneshot::Sender<CookieResponse>>, | ||
} | ||
|
||
impl CookieJar { | ||
pub fn new( | ||
Check warning on line 63 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Build Turborepo (ubuntu, ubuntu-latest)
Check warning on line 63 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Build Turborepo (macos, macos-latest)
Check warning on line 63 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Build Turborepo (windows, windows-latest)
Check warning on line 63 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo E2E Tests (macos, macos-latest)
Check warning on line 63 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo E2E Tests (ubuntu, ubuntu-latest)
Check warning on line 63 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Go Integration Tests (ubuntu, ubuntu-latest)
Check warning on line 63 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo Examples (ubuntu, ubuntu-latest)
Check warning on line 63 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo Examples (macos, macos-latest)
|
||
root: &AbsoluteSystemPath, | ||
timeout: Duration, | ||
file_events: broadcast::Receiver<Result<Event, NotifyError>>, | ||
) -> Self { | ||
let (exit_ch, exit_signal) = tokio::sync::oneshot::channel(); | ||
let watches = Arc::new(Mutex::new(Watches::default())); | ||
tokio::spawn(watch_cookies( | ||
root.to_owned(), | ||
watches.clone(), | ||
file_events, | ||
exit_signal, | ||
)); | ||
Self { | ||
root: root.to_owned(), | ||
serial: AtomicUsize::new(0), | ||
timeout, | ||
watches, | ||
_exit_ch: exit_ch, | ||
} | ||
} | ||
|
||
pub async fn wait_for_cookie(&self) -> Result<(), CookieError> { | ||
let serial = self | ||
.serial | ||
.fetch_add(1, std::sync::atomic::Ordering::SeqCst); | ||
let cookie_path = self.root.join_component(&format!("{}.cookie", serial)); | ||
let (tx, rx) = oneshot::channel(); | ||
{ | ||
let mut watches = self.watches.lock().await; | ||
if watches.closed { | ||
return Err(CookieError::Watch(WatchError::Closed)); | ||
} | ||
watches | ||
.cookies | ||
.insert(cookie_path.as_std_path().to_owned(), tx); | ||
} | ||
let mut opts = OpenOptions::new(); | ||
opts.truncate(true).create(true).write(true); | ||
{ | ||
// dropping the resulting file closes the handle | ||
_ = cookie_path.open_with_options(opts)?; | ||
} | ||
let resp = tokio::time::timeout(self.timeout, rx).await?; | ||
unwrap_resp(resp) | ||
} | ||
} | ||
|
||
async fn watch_cookies( | ||
Check warning on line 111 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Build Turborepo (ubuntu, ubuntu-latest)
Check warning on line 111 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Build Turborepo (macos, macos-latest)
Check warning on line 111 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Build Turborepo (windows, windows-latest)
Check warning on line 111 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo E2E Tests (macos, macos-latest)
Check warning on line 111 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo E2E Tests (ubuntu, ubuntu-latest)
Check warning on line 111 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Go Integration Tests (ubuntu, ubuntu-latest)
Check warning on line 111 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo Examples (ubuntu, ubuntu-latest)
Check warning on line 111 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo Examples (macos, macos-latest)
|
||
root: AbsoluteSystemPathBuf, | ||
watches: Arc<Mutex<Watches>>, | ||
mut file_events: broadcast::Receiver<Result<Event, NotifyError>>, | ||
mut exit_signal: tokio::sync::oneshot::Receiver<()>, | ||
) { | ||
loop { | ||
tokio::select! { | ||
_ = &mut exit_signal => return, | ||
event = file_events.recv() => { | ||
match unwrap_event(event) { | ||
Ok(event) => { | ||
if event.kind == EventKind::Create(notify::event::CreateKind::File) { | ||
let mut watches = watches.lock().await; | ||
for path in event.paths { | ||
let abs_path: &AbsoluteSystemPath = path | ||
.as_path() | ||
.try_into() | ||
.expect("Non-absolute path from filewatching"); | ||
if root.relation_to_path(abs_path) == PathRelation::Parent { | ||
if let Some(responder) = watches.cookies.remove(&path) { | ||
if let Err(_) = responder.send(Ok(())) { | ||
// Note that cookie waiters will time out if they don't get a | ||
// response, so we don't necessarily | ||
// need to panic here, although we could decide to do that in the | ||
// future. | ||
debug!("failed to notify cookie waiter of cookie success"); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
Err(e) => { | ||
// we got an error, notify all waiters that their cookie failed | ||
let is_closing = matches!( | ||
e, | ||
WatchError::RecvError(broadcast::error::RecvError::Closed) | ||
); | ||
let resp = if is_closing { WatchError::Closed } else { e }; | ||
let mut watches = watches.lock().await; | ||
for (_, sender) in watches.cookies.drain() { | ||
if let Err(_) = sender.send(Err(resp.clone())) { | ||
// Note that cookie waiters will time out if they don't get a response, so | ||
// we don't necessarily need to panic here, although | ||
// we could decide to do that in the future. | ||
debug!("failed to notify cookie waiter of error: {}", resp); | ||
} | ||
} | ||
if is_closing { | ||
watches.closed = true; | ||
return; | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
// result flattening is an unstable feature, so add a manual helper to do so. | ||
// This version is for unwrapping events coming from filewatching | ||
fn unwrap_event( | ||
Check warning on line 173 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Build Turborepo (ubuntu, ubuntu-latest)
Check warning on line 173 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Build Turborepo (macos, macos-latest)
Check warning on line 173 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Build Turborepo (windows, windows-latest)
Check warning on line 173 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo E2E Tests (macos, macos-latest)
Check warning on line 173 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo E2E Tests (ubuntu, ubuntu-latest)
Check warning on line 173 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Go Integration Tests (ubuntu, ubuntu-latest)
Check warning on line 173 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo Examples (ubuntu, ubuntu-latest)
Check warning on line 173 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo Examples (macos, macos-latest)
|
||
event: Result<Result<Event, NotifyError>, broadcast::error::RecvError>, | ||
) -> Result<Event, WatchError> { | ||
match event { | ||
Ok(event) => event.map_err(|e| e.into()), | ||
Err(e) => Err(e.into()), | ||
} | ||
} | ||
|
||
// result flattening is an unstable feature, so add a manual helper to do so. | ||
// This version is for flattening notifications coming back from the cookie | ||
// watching task | ||
fn unwrap_resp( | ||
Check warning on line 185 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Build Turborepo (ubuntu, ubuntu-latest)
Check warning on line 185 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Build Turborepo (macos, macos-latest)
Check warning on line 185 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Build Turborepo (windows, windows-latest)
Check warning on line 185 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo E2E Tests (macos, macos-latest)
Check warning on line 185 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo E2E Tests (ubuntu, ubuntu-latest)
Check warning on line 185 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Go Integration Tests (ubuntu, ubuntu-latest)
Check warning on line 185 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo Examples (ubuntu, ubuntu-latest)
Check warning on line 185 in crates/turborepo-filewatch/src/cookie_jar.rs GitHub Actions / Turborepo Examples (macos, macos-latest)
|
||
resp: Result<Result<(), WatchError>, oneshot::error::RecvError>, | ||
) -> Result<(), CookieError> { | ||
match resp { | ||
Ok(resp) => resp.map_err(|e| e.into()), | ||
Err(e) => Err(e.into()), | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod test { | ||
use std::{assert_matches::assert_matches, sync::Arc, time::Duration}; | ||
|
||
use notify::{event::CreateKind, ErrorKind, Event, EventKind}; | ||
use tokio::{sync::broadcast, time}; | ||
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf}; | ||
|
||
use crate::{ | ||
cookie_jar::{CookieError, CookieJar, WatchError}, | ||
NotifyError, | ||
}; | ||
|
||
async fn ensure_tracked(cookie_jar: &CookieJar, path: &AbsoluteSystemPath) { | ||
let path = path.as_std_path(); | ||
let mut interval = time::interval(Duration::from_millis(2)); | ||
for _i in 0..50 { | ||
interval.tick().await; | ||
let watches = cookie_jar.watches.lock().await; | ||
if watches.cookies.contains_key(path) { | ||
return; | ||
} | ||
} | ||
panic!("failed to find path in cookie_jar") | ||
} | ||
|
||
#[tokio::test(flavor = "multi_thread")] | ||
async fn test_wait_for_cookie() { | ||
let tempdir = tempfile::tempdir().unwrap(); | ||
let path = AbsoluteSystemPathBuf::try_from(tempdir.path()) | ||
.unwrap() | ||
.to_realpath() | ||
.unwrap(); | ||
|
||
let (send_file_events, file_events) = broadcast::channel(16); | ||
|
||
let cookie_jar = CookieJar::new(&path, Duration::from_millis(100), file_events); | ||
let cookie_path = path.join_component("0.cookie"); | ||
tokio_scoped::scope(|scope| { | ||
scope.spawn(async { cookie_jar.wait_for_cookie().await.unwrap() }); | ||
|
||
scope.block_on(ensure_tracked(&cookie_jar, &cookie_path)); | ||
|
||
send_file_events | ||
.send(Ok(Event { | ||
kind: EventKind::Create(CreateKind::File), | ||
paths: vec![cookie_path.as_std_path().to_owned()], | ||
..Default::default() | ||
})) | ||
.unwrap(); | ||
}); | ||
} | ||
|
||
#[tokio::test(flavor = "multi_thread")] | ||
async fn test_wait_for_cookie_after_close() { | ||
let tempdir = tempfile::tempdir().unwrap(); | ||
let path = AbsoluteSystemPathBuf::try_from(tempdir.path()) | ||
.unwrap() | ||
.to_realpath() | ||
.unwrap(); | ||
|
||
let (send_file_events, file_events) = broadcast::channel(16); | ||
|
||
let cookie_jar = CookieJar::new(&path, Duration::from_millis(1000), file_events); | ||
tokio_scoped::scope(|scope| { | ||
scope.spawn(async { | ||
let result = cookie_jar.wait_for_cookie().await; | ||
assert_matches!(result, Err(CookieError::Watch(WatchError::Closed))); | ||
}); | ||
// We don't care whether or not we're tracking the cookie yet, either codepath | ||
// should result in the same error | ||
|
||
// Dropping the [last, only] sender closes the channel, which closes | ||
// the loop watching the cookie folder | ||
drop(send_file_events); | ||
}); | ||
} | ||
|
||
#[tokio::test(flavor = "multi_thread")] | ||
async fn test_wait_for_cookie_timeout() { | ||
let tempdir = tempfile::tempdir().unwrap(); | ||
let path = AbsoluteSystemPathBuf::try_from(tempdir.path()) | ||
.unwrap() | ||
.to_realpath() | ||
.unwrap(); | ||
|
||
let (_send_file_events, file_events) = broadcast::channel(16); | ||
|
||
let cookie_jar = CookieJar::new(&path, Duration::from_millis(10), file_events); | ||
tokio_scoped::scope(|scope| { | ||
scope.spawn(async { | ||
let result = cookie_jar.wait_for_cookie().await; | ||
assert_matches!(result, Err(CookieError::Timeout(_))); | ||
}); | ||
|
||
// Don't send any events, expect to timeout. | ||
}); | ||
} | ||
|
||
#[tokio::test(flavor = "multi_thread")] | ||
async fn test_wait_for_cookie_with_error() { | ||
let tempdir = tempfile::tempdir().unwrap(); | ||
let path = AbsoluteSystemPathBuf::try_from(tempdir.path()) | ||
.unwrap() | ||
.to_realpath() | ||
.unwrap(); | ||
|
||
let (send_file_events, file_events) = broadcast::channel(16); | ||
|
||
let cookie_jar = CookieJar::new(&path, Duration::from_millis(10), file_events); | ||
let cookie_path = path.join_component("0.cookie"); | ||
tokio_scoped::scope(|scope| { | ||
scope.spawn(async { | ||
let result = cookie_jar.wait_for_cookie().await; | ||
assert_matches!(result, Err(CookieError::Watch(WatchError::NotifyError(_)))); | ||
}); | ||
|
||
scope.block_on(ensure_tracked(&cookie_jar, &cookie_path)); | ||
|
||
// send an error, assert that we fail to get our cookie | ||
send_file_events | ||
.send(Err(NotifyError(Arc::new(notify::Error { | ||
kind: ErrorKind::Generic("test error".to_string()), | ||
paths: vec![cookie_path.as_std_path().to_owned()], | ||
})))) | ||
.unwrap(); | ||
}); | ||
|
||
let cookie_path = path.join_component("1.cookie"); | ||
tokio_scoped::scope(|scope| { | ||
scope.spawn(async { | ||
cookie_jar.wait_for_cookie().await.unwrap(); | ||
}); | ||
|
||
scope.block_on(ensure_tracked(&cookie_jar, &cookie_path)); | ||
|
||
// ensure that we can still wait for new cookies even though an error occurred | ||
// previously | ||
send_file_events | ||
.send(Ok(Event { | ||
kind: EventKind::Create(CreateKind::File), | ||
paths: vec![cookie_path.as_std_path().to_owned()], | ||
..Default::default() | ||
})) | ||
.unwrap(); | ||
}); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ use { | |
walkdir::WalkDir, | ||
}; | ||
|
||
mod cookie_jar; | ||
#[cfg(target_os = "macos")] | ||
mod fsevent; | ||
|
||
|