Skip to content

Commit

Permalink
Platform specific watching behavior. On Darwin recursively watch the
Browse files Browse the repository at this point in the history
build root at startup. On Linux watch individual directory roots.

[ci skip-jvm-tests]  # No JVM changes made.
  • Loading branch information
Henry Fuller committed Mar 25, 2020
1 parent 04b8746 commit 3f52ee5
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 49 deletions.
1 change: 0 additions & 1 deletion pants.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ pants_ignore.add = [
"/build-support/bin/native/src",
# We shouldn't walk or watch the rust compiler artifacts because it is slow.
"/src/rust/engine/target",
"/**/*/__pycache__",
]

[cache]
Expand Down
52 changes: 23 additions & 29 deletions src/rust/engine/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1069,38 +1069,32 @@ impl Node for NodeKey {

scope_task_parent_id(maybe_span_id, async move {
let context2 = context.clone();
let self2 = self.clone();
let maybe_watch = if let Some(path) = self.fs_subject() {
let abs_path = context.core.build_root.join(path);
context
.core
.watcher
.watch(abs_path)
.map_err(|e| Failure::FileWatch(format!("{:?}", e)))
.await
} else {
Ok(())
};

let result = match self {
NodeKey::DigestFile(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::DownloadedFile(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::MultiPlatformExecuteProcess(n) => {
n.run(context).map(NodeResult::from).compat().await
}
NodeKey::ReadLink(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::Scandir(n) => {
// We only need to add watches to scandir nodes. Notify will watch
// all direct descendents of the scandir fs subject. We will only ever digest
// a file or read a link which is part of a directory we have scanned.
// The main reason for doing this is performance. Adding watches to files
// adds significant overhead to cold builds or builds not running pantsd.
let path = self2.fs_subject().unwrap();
let abs_path = context.core.build_root.join(path);
if let Err(e) = context
.core
.watcher
.watch(abs_path)
.map_err(|e| Failure::FileWatch(format!("{:?}", e)))
.await
{
Err(e)
} else {
let result = match maybe_watch {
Ok(()) => match self {
NodeKey::DigestFile(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::DownloadedFile(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::MultiPlatformExecuteProcess(n) => {
n.run(context).map(NodeResult::from).compat().await
}
}
NodeKey::Select(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::Snapshot(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::Task(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::ReadLink(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::Scandir(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::Select(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::Snapshot(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::Task(n) => n.run(context).map(NodeResult::from).compat().await,
},
Err(e) => Err(e),
};
if let Some(started_workunit) = maybe_started_workunit {
let workunit: WorkUnit = started_workunit.finish();
Expand Down
61 changes: 42 additions & 19 deletions src/rust/engine/src/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use notify::{RecommendedWatcher, RecursiveMode, Watcher};
//use parking_lot::Mutex;
use futures::compat::Future01CompatExt;
use futures_locks::Mutex;
use process_execution::Platform;
use task_executor::Executor;

use graph::{Graph, InvalidationResult};
Expand Down Expand Up @@ -41,6 +42,7 @@ pub struct InvalidationWatcher {
watcher: Arc<Mutex<RecommendedWatcher>>,
executor: Executor,
liveness: Receiver<()>,
current_platform: Platform,
}

impl InvalidationWatcher {
Expand All @@ -55,11 +57,25 @@ impl InvalidationWatcher {
// We canonicalize the build_root once so this isn't a problem.
let canonical_build_root =
std::fs::canonicalize(build_root.as_path()).map_err(|e| format!("{:?}", e))?;
let current_platform = Platform::current()?;
let (watch_sender, watch_receiver) = crossbeam_channel::unbounded();
let watcher = Arc::new(Mutex::new(
Watcher::new(watch_sender, Duration::from_millis(50))
.map_err(|e| format!("Failed to begin watching the filesystem: {}", e))?,
));
let mut watcher: RecommendedWatcher = Watcher::new(watch_sender, Duration::from_millis(50))
.map_err(|e| format!("Failed to begin watching the filesystem: {}", e))?;
// On darwin the notify API is much more efficient if you watch the build root
// recursively, so we set up that watch here and then return early when watch() is
// called by nodes that are running. On Linux the notify crate handles adding paths to watch
// much more efficiently so we do that instead on Linux.
if current_platform == Platform::Darwin {
watcher
.watch(canonical_build_root.clone(), RecursiveMode::Recursive)
.map_err(|e| {
format!(
"Failed to begin recursively watching files in the build root: {}",
e
)
})?
}
let wrapped_watcher = Arc::new(Mutex::new(watcher));

let (thread_liveness_sender, thread_liveness_receiver) = crossbeam_channel::unbounded();
thread::spawn(move || {
Expand Down Expand Up @@ -95,7 +111,7 @@ impl InvalidationWatcher {
})
.flatten()
.collect();
info!("notify invalidating {:?} because of {:?}", paths, ev.kind);
debug!("notify invalidating {:?} because of {:?}", paths, ev.kind);
InvalidationWatcher::invalidate(&graph, &paths, "notify");
}
Ok(Err(err)) => {
Expand All @@ -120,30 +136,37 @@ impl InvalidationWatcher {
});

Ok(InvalidationWatcher {
watcher,
watcher: wrapped_watcher,
executor,
liveness: thread_liveness_receiver,
current_platform,
})
}

///
/// Watch the given path non-recursively.
///
pub async fn watch(&self, path: PathBuf) -> Result<(), notify::Error> {
// Using a futurized mutex here because for some reason using a regular mutex
// to block the io pool causes the v2 ui to not update which nodes its working
// on properly.
let watcher_lock = self.watcher.lock().compat().await;
match watcher_lock {
Ok(mut watcher_lock) => {
self
.executor
.spawn_blocking(move || watcher_lock.watch(path, RecursiveMode::NonRecursive))
.await
// Short circuit here if we are on a Darwin platform because we should be watching
// the entire build root recursively already.
if self.current_platform == Platform::Darwin {
Ok(())
} else {
// Using a futurized mutex here because for some reason using a regular mutex
// to block the io pool causes the v2 ui to not update which nodes its working
// on properly.
let watcher_lock = self.watcher.lock().compat().await;
match watcher_lock {
Ok(mut watcher_lock) => {
self
.executor
.spawn_blocking(move || watcher_lock.watch(path, RecursiveMode::NonRecursive))
.await
}
Err(()) => Err(notify::Error::new(notify::ErrorKind::Generic(
"Couldn't lock mutex for invalidation watcher".to_string(),
))),
}
Err(()) => Err(notify::Error::new(notify::ErrorKind::Generic(
"Couldn't lock mutex for invalidation watcher".to_string(),
))),
}
}

Expand Down

0 comments on commit 3f52ee5

Please sign in to comment.