Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Turborepo): add filesystem cookies on top of filewatching #5858

Merged
merged 2 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/turborepo-filewatch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ version = "0.2.4"

[dev-dependencies]
tempfile = { workspace = true }
tokio-scoped = "0.2.0"

[features]
default = ["macos_fsevent"]
Expand Down
327 changes: 327 additions & 0 deletions crates/turborepo-filewatch/src/cookie_jar.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,327 @@
use std::{
collections::HashMap,
fs::OpenOptions,
path::PathBuf,
sync::{atomic::AtomicUsize, Arc, Mutex},
time::Duration,
};

use notify::{Event, EventKind};
use thiserror::Error;
use tokio::{
sync::{broadcast, oneshot},
time::error::Elapsed,
};
use tracing::debug;
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, PathRelation};

use crate::NotifyError;

#[derive(Clone, Debug, Error)]
pub enum WatchError {
#[error(transparent)]
RecvError(#[from] broadcast::error::RecvError),
#[error("filewatching encountered errors: {0}")]
NotifyError(#[from] NotifyError),
#[error("filewatching has closed, cannot watch cookies")]
Closed,

Check warning on line 27 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (ubuntu, ubuntu-latest)

variant `Closed` is never constructed

Check warning on line 27 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (macos, macos-latest)

variant `Closed` is never constructed

Check warning on line 27 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (windows, windows-latest)

variant `Closed` is never constructed

Check warning on line 27 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo E2E Tests (windows, windows-latest)

variant `Closed` is never constructed

Check warning on line 27 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo Examples (ubuntu, ubuntu-latest)

variant `Closed` is never constructed

Check warning on line 27 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo Examples (macos, macos-latest)

variant `Closed` is never constructed

Check warning on line 27 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo E2E Tests (macos, macos-latest)

variant `Closed` is never constructed

Check warning on line 27 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Go Integration Tests (ubuntu, ubuntu-latest)

variant `Closed` is never constructed

Check warning on line 27 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo E2E Tests (ubuntu, ubuntu-latest)

variant `Closed` is never constructed
}

#[derive(Debug, Error)]
pub enum CookieError {
#[error(transparent)]
Watch(#[from] WatchError),
#[error("cookie timeout expired")]
Timeout(#[from] Elapsed),
#[error("failed to receiver cookie notification: {0}")]
RecvError(#[from] oneshot::error::RecvError),
#[error("failed to write cookie file {0}")]
IO(#[from] std::io::Error),
}

type CookieResponse = Result<(), WatchError>;

pub struct CookieJar {

Check warning on line 44 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (ubuntu, ubuntu-latest)

struct `CookieJar` is never constructed

Check warning on line 44 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (macos, macos-latest)

struct `CookieJar` is never constructed

Check warning on line 44 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (windows, windows-latest)

struct `CookieJar` is never constructed

Check warning on line 44 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo E2E Tests (windows, windows-latest)

struct `CookieJar` is never constructed

Check warning on line 44 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo Examples (ubuntu, ubuntu-latest)

struct `CookieJar` is never constructed

Check warning on line 44 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo Examples (macos, macos-latest)

struct `CookieJar` is never constructed

Check warning on line 44 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo E2E Tests (macos, macos-latest)

struct `CookieJar` is never constructed

Check warning on line 44 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Go Integration Tests (ubuntu, ubuntu-latest)

struct `CookieJar` is never constructed

Check warning on line 44 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo E2E Tests (ubuntu, ubuntu-latest)

struct `CookieJar` is never constructed
root: AbsoluteSystemPathBuf,
serial: AtomicUsize,
timeout: Duration,
watches: Arc<Mutex<Watches>>,
// _exit_ch exists to trigger a close on the receiver when an instance
// of this struct is dropped. The task that is receiving events will exit,
// dropping the other sender for the broadcast channel, causing all receivers
// to be notified of a close.
_exit_ch: tokio::sync::oneshot::Sender<()>,
}

#[derive(Default)]
struct Watches {
closed: bool,

Check warning on line 58 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (ubuntu, ubuntu-latest)

fields `closed` and `cookies` are never read

Check warning on line 58 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (macos, macos-latest)

fields `closed` and `cookies` are never read

Check warning on line 58 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (windows, windows-latest)

fields `closed` and `cookies` are never read

Check warning on line 58 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo E2E Tests (windows, windows-latest)

fields `closed` and `cookies` are never read

Check warning on line 58 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo Examples (ubuntu, ubuntu-latest)

fields `closed` and `cookies` are never read

Check warning on line 58 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo Examples (macos, macos-latest)

fields `closed` and `cookies` are never read

Check warning on line 58 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo E2E Tests (macos, macos-latest)

fields `closed` and `cookies` are never read

Check warning on line 58 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Go Integration Tests (ubuntu, ubuntu-latest)

fields `closed` and `cookies` are never read

Check warning on line 58 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo E2E Tests (ubuntu, ubuntu-latest)

fields `closed` and `cookies` are never read
cookies: HashMap<PathBuf, oneshot::Sender<CookieResponse>>,
}

impl CookieJar {
pub fn new(

Check warning on line 63 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (ubuntu, ubuntu-latest)

associated items `new` and `wait_for_cookie` are never used

Check warning on line 63 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (macos, macos-latest)

associated items `new` and `wait_for_cookie` are never used

Check warning on line 63 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (windows, windows-latest)

associated items `new` and `wait_for_cookie` are never used

Check warning on line 63 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo E2E Tests (windows, windows-latest)

associated items `new` and `wait_for_cookie` are never used

Check warning on line 63 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo Examples (ubuntu, ubuntu-latest)

associated items `new` and `wait_for_cookie` are never used

Check warning on line 63 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo Examples (macos, macos-latest)

associated items `new` and `wait_for_cookie` are never used

Check warning on line 63 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo E2E Tests (macos, macos-latest)

associated items `new` and `wait_for_cookie` are never used

Check warning on line 63 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Go Integration Tests (ubuntu, ubuntu-latest)

associated items `new` and `wait_for_cookie` are never used

Check warning on line 63 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo E2E Tests (ubuntu, ubuntu-latest)

associated items `new` and `wait_for_cookie` are never used
root: &AbsoluteSystemPath,
timeout: Duration,
file_events: broadcast::Receiver<Result<Event, NotifyError>>,
) -> Self {
let (exit_ch, exit_signal) = tokio::sync::oneshot::channel();
let watches = Arc::new(Mutex::new(Watches::default()));
tokio::spawn(watch_cookies(
root.to_owned(),
watches.clone(),
file_events,
exit_signal,
));
Self {
root: root.to_owned(),
serial: AtomicUsize::new(0),
timeout,
watches,
_exit_ch: exit_ch,
}
}

pub async fn wait_for_cookie(&self) -> Result<(), CookieError> {
let serial = self
.serial
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let cookie_path = self.root.join_component(&format!("{}.cookie", serial));
let (tx, rx) = oneshot::channel();
{
let mut watches = self.watches.lock().expect("mutex poisoned");
if watches.closed {
return Err(CookieError::Watch(WatchError::Closed));
}
watches
.cookies
.insert(cookie_path.as_std_path().to_owned(), tx);
}
let mut opts = OpenOptions::new();
opts.truncate(true).create(true).write(true);
{
// dropping the resulting file closes the handle
_ = cookie_path.open_with_options(opts)?;
}
// ??? -> timeout, recv failure, actual cookie failure
tokio::time::timeout(self.timeout, rx).await???;
Ok(())
}
}

async fn watch_cookies(

Check warning on line 112 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (ubuntu, ubuntu-latest)

function `watch_cookies` is never used

Check warning on line 112 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (macos, macos-latest)

function `watch_cookies` is never used

Check warning on line 112 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (windows, windows-latest)

function `watch_cookies` is never used

Check warning on line 112 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo E2E Tests (windows, windows-latest)

function `watch_cookies` is never used

Check warning on line 112 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo Examples (ubuntu, ubuntu-latest)

function `watch_cookies` is never used

Check warning on line 112 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo Examples (macos, macos-latest)

function `watch_cookies` is never used

Check warning on line 112 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo E2E Tests (macos, macos-latest)

function `watch_cookies` is never used

Check warning on line 112 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Go Integration Tests (ubuntu, ubuntu-latest)

function `watch_cookies` is never used

Check warning on line 112 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo E2E Tests (ubuntu, ubuntu-latest)

function `watch_cookies` is never used
root: AbsoluteSystemPathBuf,
watches: Arc<Mutex<Watches>>,
mut file_events: broadcast::Receiver<Result<Event, NotifyError>>,
mut exit_signal: tokio::sync::oneshot::Receiver<()>,
) {
loop {
tokio::select! {
_ = &mut exit_signal => return,
event = file_events.recv() => {
match flatten_event(event) {
Ok(event) => {
if event.kind == EventKind::Create(notify::event::CreateKind::File) {
let mut watches = watches.lock().expect("mutex poisoned");
for path in event.paths {
let abs_path: &AbsoluteSystemPath = path
.as_path()
.try_into()
.expect("Non-absolute path from filewatching");
if root.relation_to_path(abs_path) == PathRelation::Parent {
if let Some(responder) = watches.cookies.remove(&path) {
if let Err(_) = responder.send(Ok(())) {
// Note that cookie waiters will time out if they don't get a
// response, so we don't necessarily
// need to panic here, although we could decide to do that in the
// future.
debug!("failed to notify cookie waiter of cookie success");
}
}
}
}
}
}
Err(e) => {
// we got an error, notify all waiters that their cookie failed
let is_closing = matches!(
e,
WatchError::RecvError(broadcast::error::RecvError::Closed)
);
let resp = if is_closing { WatchError::Closed } else { e };
let mut watches = watches.lock().expect("mutex poisoned");
for (_, sender) in watches.cookies.drain() {
if let Err(_) = sender.send(Err(resp.clone())) {
// Note that cookie waiters will time out if they don't get a response, so
// we don't necessarily need to panic here, although
// we could decide to do that in the future.
debug!("failed to notify cookie waiter of error: {}", resp);
}
}
if is_closing {
watches.closed = true;
return;
}
}
}
}
}
}
}

// result flattening is an unstable feature, so add a manual helper to do so.
// This version is for unwrapping events coming from filewatching
fn flatten_event(

Check warning on line 174 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (ubuntu, ubuntu-latest)

function `flatten_event` is never used

Check warning on line 174 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (macos, macos-latest)

function `flatten_event` is never used

Check warning on line 174 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (windows, windows-latest)

function `flatten_event` is never used

Check warning on line 174 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo E2E Tests (windows, windows-latest)

function `flatten_event` is never used

Check warning on line 174 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo Examples (ubuntu, ubuntu-latest)

function `flatten_event` is never used

Check warning on line 174 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo Examples (macos, macos-latest)

function `flatten_event` is never used

Check warning on line 174 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo E2E Tests (macos, macos-latest)

function `flatten_event` is never used

Check warning on line 174 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Go Integration Tests (ubuntu, ubuntu-latest)

function `flatten_event` is never used

Check warning on line 174 in crates/turborepo-filewatch/src/cookie_jar.rs

View workflow job for this annotation

GitHub Actions / Turborepo E2E Tests (ubuntu, ubuntu-latest)

function `flatten_event` is never used
event: Result<Result<Event, NotifyError>, broadcast::error::RecvError>,
) -> Result<Event, WatchError> {
Ok(event??)
}

#[cfg(test)]
mod test {
use std::{assert_matches::assert_matches, sync::Arc, time::Duration};

use notify::{event::CreateKind, ErrorKind, Event, EventKind};
use tokio::{sync::broadcast, time};
use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf};

use crate::{
cookie_jar::{CookieError, CookieJar, WatchError},
NotifyError,
};

async fn ensure_tracked(cookie_jar: &CookieJar, path: &AbsoluteSystemPath) {
let path = path.as_std_path();
let mut interval = time::interval(Duration::from_millis(2));
for _i in 0..50 {
interval.tick().await;
let watches = cookie_jar.watches.lock().expect("mutex poisoned");
if watches.cookies.contains_key(path) {
return;
}
}
panic!("failed to find path in cookie_jar")
}

#[tokio::test(flavor = "multi_thread")]
async fn test_wait_for_cookie() {
let tempdir = tempfile::tempdir().unwrap();
let path = AbsoluteSystemPathBuf::try_from(tempdir.path())
.unwrap()
.to_realpath()
.unwrap();

let (send_file_events, file_events) = broadcast::channel(16);

let cookie_jar = CookieJar::new(&path, Duration::from_millis(100), file_events);
let cookie_path = path.join_component("0.cookie");
tokio_scoped::scope(|scope| {
scope.spawn(async { cookie_jar.wait_for_cookie().await.unwrap() });

scope.block_on(ensure_tracked(&cookie_jar, &cookie_path));

send_file_events
.send(Ok(Event {
kind: EventKind::Create(CreateKind::File),
paths: vec![cookie_path.as_std_path().to_owned()],
..Default::default()
}))
.unwrap();
});
}

#[tokio::test(flavor = "multi_thread")]
async fn test_wait_for_cookie_after_close() {
let tempdir = tempfile::tempdir().unwrap();
let path = AbsoluteSystemPathBuf::try_from(tempdir.path())
.unwrap()
.to_realpath()
.unwrap();

let (send_file_events, file_events) = broadcast::channel(16);

let cookie_jar = CookieJar::new(&path, Duration::from_millis(1000), file_events);
tokio_scoped::scope(|scope| {
scope.spawn(async {
let result = cookie_jar.wait_for_cookie().await;
assert_matches!(result, Err(CookieError::Watch(WatchError::Closed)));
});
// We don't care whether or not we're tracking the cookie yet, either codepath
// should result in the same error

// Dropping the [last, only] sender closes the channel, which closes
// the loop watching the cookie folder
drop(send_file_events);
});
}

#[tokio::test(flavor = "multi_thread")]
async fn test_wait_for_cookie_timeout() {
let tempdir = tempfile::tempdir().unwrap();
let path = AbsoluteSystemPathBuf::try_from(tempdir.path())
.unwrap()
.to_realpath()
.unwrap();

let (_send_file_events, file_events) = broadcast::channel(16);

let cookie_jar = CookieJar::new(&path, Duration::from_millis(10), file_events);
tokio_scoped::scope(|scope| {
scope.spawn(async {
let result = cookie_jar.wait_for_cookie().await;
assert_matches!(result, Err(CookieError::Timeout(_)));
});

// Don't send any events, expect to timeout.
});
}

#[tokio::test(flavor = "multi_thread")]
async fn test_wait_for_cookie_with_error() {
let tempdir = tempfile::tempdir().unwrap();
let path = AbsoluteSystemPathBuf::try_from(tempdir.path())
.unwrap()
.to_realpath()
.unwrap();

let (send_file_events, file_events) = broadcast::channel(16);

let cookie_jar = CookieJar::new(&path, Duration::from_millis(10), file_events);
let cookie_path = path.join_component("0.cookie");
tokio_scoped::scope(|scope| {
scope.spawn(async {
let result = cookie_jar.wait_for_cookie().await;
assert_matches!(result, Err(CookieError::Watch(WatchError::NotifyError(_))));
});

scope.block_on(ensure_tracked(&cookie_jar, &cookie_path));

// send an error, assert that we fail to get our cookie
send_file_events
.send(Err(NotifyError(Arc::new(notify::Error {
kind: ErrorKind::Generic("test error".to_string()),
paths: vec![cookie_path.as_std_path().to_owned()],
}))))
.unwrap();
});

let cookie_path = path.join_component("1.cookie");
tokio_scoped::scope(|scope| {
scope.spawn(async {
cookie_jar.wait_for_cookie().await.unwrap();
});

scope.block_on(ensure_tracked(&cookie_jar, &cookie_path));

// ensure that we can still wait for new cookies even though an error occurred
// previously
send_file_events
.send(Ok(Event {
kind: EventKind::Create(CreateKind::File),
paths: vec![cookie_path.as_std_path().to_owned()],
..Default::default()
}))
.unwrap();
});
}
}
1 change: 1 addition & 0 deletions crates/turborepo-filewatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use {
walkdir::WalkDir,
};

mod cookie_jar;
#[cfg(target_os = "macos")]
mod fsevent;

Expand Down
Loading