Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add notify fs watcher to engine. #9318

Merged
merged 16 commits into from
Mar 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pants.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ pants_ignore.add = [
"/build-support/*.venv/",
# An absolute symlink to the Pants Rust toolchain sources.
"/build-support/bin/native/src",
# We shouldn't walk or watch the rust compiler artifacts because it is slow.
"/src/rust/engine/target",
]

[cache]
Expand Down
813 changes: 458 additions & 355 deletions src/rust/engine/Cargo.lock

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions src/rust/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,17 @@ default-members = [
]

[dependencies]
async_semaphore = { path = "async_semaphore" }
async-trait = "0.1"
boxfuture = { path = "boxfuture" }
bytes = "0.4.5"
concrete_time = { path = "concrete_time" }
crossbeam-channel = "0.3"
fnv = "1.0.5"
fs = { path = "fs" }
futures01 = { package = "futures", version = "0.1" }
futures = { version = "0.3", features = ["compat"] }
futures-locks = "0.3.0"
graph = { path = "graph" }
hashing = { path = "hashing" }
indexmap = "1.0.2"
Expand All @@ -98,6 +101,11 @@ log = "0.4"
logging = { path = "logging" }
num_cpus = "1"
num_enum = "0.4"
# notify is currently an experimental API, we are pinning to https://docs.rs/notify/5.0.0-pre.1/notify/
# because the latest prerelease at time of writing has removed the debounced watcher which we would like to use.
# The author suggests they will add the debounced watcher back into the stable 5.0.0 release. When that happens
# we can move to it.
notify = { git = "https://github.com/notify-rs/notify", rev = "fba00891d9105e2f581c69fbe415a58cb7966fdd" }
hrfuller marked this conversation as resolved.
Show resolved Hide resolved
hrfuller marked this conversation as resolved.
Show resolved Hide resolved
parking_lot = "0.6"
process_execution = { path = "process_execution" }
rand = "0.6"
Expand All @@ -115,6 +123,11 @@ url = "2.1"
uuid = { version = "0.7", features = ["v4"] }
workunit_store = { path = "workunit_store" }

[dev-dependencies]
testutil = { path = "./testutil" }
fs = { path = "./fs" }
env_logger = "0.5.4"

[patch.crates-io]
# TODO: Remove patch when we can upgrade to an official released version of protobuf with a fix.
# See: https://github.com/pantsbuild/pants/issues/7760 for context.
Expand Down
14 changes: 7 additions & 7 deletions src/rust/engine/graph/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ use boxfuture::{BoxFuture, Boxable};
/// the Node was `cleared`), the work is discarded. See `Entry::complete` for more information.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct RunToken(u32);
pub struct RunToken(u32);

impl RunToken {
fn initial() -> RunToken {
pub fn initial() -> RunToken {
RunToken(0)
}

Expand All @@ -40,10 +40,10 @@ impl RunToken {
/// incremented when the output of a node has changed.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct Generation(u32);
pub struct Generation(u32);

impl Generation {
fn initial() -> Generation {
pub fn initial() -> Generation {
Generation(0)
}

Expand All @@ -65,7 +65,7 @@ impl Generation {
/// If the value is Clean, the consumer can simply use the value as-is.
///
#[derive(Clone, Debug)]
pub(crate) enum EntryResult<N: Node> {
pub enum EntryResult<N: Node> {
Clean(Result<N::Item, N::Error>),
Dirty(Result<N::Item, N::Error>),
Uncacheable(
Expand Down Expand Up @@ -118,7 +118,7 @@ impl<N: Node> AsRef<Result<N::Item, N::Error>> for EntryResult<N> {

#[allow(clippy::type_complexity)]
#[derive(Debug)]
pub(crate) enum EntryState<N: Node> {
pub enum EntryState<N: Node> {
// A node that has either been explicitly cleared, or has not yet started Running. In this state
// there is no need for a dirty bit because the RunToken is either in its initial state, or has
// been explicitly incremented when the node was cleared.
Expand Down Expand Up @@ -174,7 +174,7 @@ pub struct Entry<N: Node> {
// maps is painful.
node: N,

state: Arc<Mutex<EntryState<N>>>,
pub state: Arc<Mutex<EntryState<N>>>,
}

impl<N: Node> Entry<N> {
Expand Down
43 changes: 41 additions & 2 deletions src/rust/engine/graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ use hashing;

use petgraph;

mod entry;
// make the entry module public for testing purposes. We use it to contruct mock
// graph entries in the notify watch tests.
pub mod entry;
mod node;

pub use crate::entry::Entry;
pub use crate::entry::{Entry, EntryState};
use crate::entry::{Generation, RunToken};

use std::collections::binary_heap::BinaryHeap;
Expand Down Expand Up @@ -1009,6 +1011,43 @@ impl<N: Node> Graph<N> {
}
}

// This module provides a trait which contains functions that
// should only be used in tests. A user must explicitly import the trait
// to use the extra test functions, and they should only be imported into
// test modules.
pub mod test_support {
use super::{EntryId, EntryState, Graph, Node};
pub trait TestGraph<N: Node> {
fn set_fixture_entry_state_for_id(&self, id: EntryId, state: EntryState<N>);
fn add_fixture_entry(&self, node: N) -> EntryId;
fn entry_state(&self, id: EntryId) -> &str;
}
impl<N: Node> TestGraph<N> for Graph<N> {
fn set_fixture_entry_state_for_id(&self, id: EntryId, state: EntryState<N>) {
let mut inner = self.inner.lock();
let entry = inner.entry_for_id_mut(id).unwrap();
let mut entry_state = entry.state.lock();
*entry_state = state;
}

fn add_fixture_entry(&self, node: N) -> EntryId {
let mut inner = self.inner.lock();
inner.ensure_entry(node)
}

fn entry_state(&self, id: EntryId) -> &str {
let mut inner = self.inner.lock();
let entry = inner.entry_for_id_mut(id).unwrap();
let entry_state = entry.state.lock();
match *entry_state {
EntryState::Completed { .. } => "completed",
EntryState::Running { .. } => "running",
EntryState::NotStarted { .. } => "not started",
}
}
}
}

///
/// Represents the state of a particular walk through a Graph. Implements Iterator and has the same
/// lifetime as the Graph itself.
Expand Down
12 changes: 9 additions & 3 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::nodes::{NodeKey, WrappedNode};
use crate::scheduler::Session;
use crate::tasks::{Rule, Tasks};
use crate::types::Types;
use crate::watch::InvalidationWatcher;
use boxfuture::{BoxFuture, Boxable};
use core::clone::Clone;
use fs::{safe_create_dir_all_ioerror, PosixFS};
Expand All @@ -43,7 +44,7 @@ const GIGABYTES: usize = 1024 * 1024 * 1024;
/// https://github.com/tokio-rs/tokio/issues/369 is resolved.
///
pub struct Core {
pub graph: Graph<NodeKey>,
pub graph: Arc<Graph<NodeKey>>,
pub tasks: Tasks,
pub rule_graph: RuleGraph<Rule>,
pub types: Types,
Expand All @@ -53,6 +54,7 @@ pub struct Core {
pub command_runner: Box<dyn process_execution::CommandRunner>,
pub http_client: reqwest::Client,
pub vfs: PosixFS,
pub watcher: InvalidationWatcher,
pub build_root: PathBuf,
}

Expand Down Expand Up @@ -232,12 +234,15 @@ impl Core {
process_execution_metadata,
));
}
let graph = Arc::new(Graph::new());
let watcher =
InvalidationWatcher::new(Arc::downgrade(&graph), executor.clone(), build_root.clone())?;

let http_client = reqwest::Client::new();
let rule_graph = RuleGraph::new(tasks.as_map(), root_subject_types);

Ok(Core {
graph: Graph::new(),
graph: graph,
tasks: tasks,
rule_graph: rule_graph,
types: types,
Expand All @@ -250,7 +255,8 @@ impl Core {
// exceptions, rather than as panics.
vfs: PosixFS::new(&build_root, &ignore_patterns, executor)
.map_err(|e| format!("Could not initialize VFS: {:?}", e))?,
build_root: build_root,
build_root,
watcher,
})
}

Expand Down
2 changes: 2 additions & 0 deletions src/rust/engine/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,13 +289,15 @@ pub enum Failure {
Invalidated,
/// A rule raised an exception.
Throw(Value, String),
FileWatch(String),
}

impl fmt::Display for Failure {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Failure::Invalidated => write!(f, "Exhausted retries due to changed files."),
Failure::Throw(exc, _) => write!(f, "{}", externs::val_to_str(exc)),
Failure::FileWatch(failure) => write!(f, "{}", failure),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/src/externs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ impl From<Result<Value, Failure>> for PyResult {
let val = match f {
f @ Failure::Invalidated => create_exception(&format!("{}", f)),
Failure::Throw(exc, _) => exc,
Failure::FileWatch(failure) => create_exception(&failure),
};
PyResult {
is_throw: true,
Expand Down
4 changes: 4 additions & 0 deletions src/rust/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ mod scheduler;
mod selectors;
mod tasks;
mod types;
mod watch;

pub use crate::context::Core;
pub use crate::core::{Function, Key, Params, TypeId, Value};
Expand All @@ -49,3 +50,6 @@ pub use crate::scheduler::{
};
pub use crate::tasks::{Rule, Tasks};
pub use crate::types::Types;

#[cfg(test)]
mod watch_tests;
41 changes: 29 additions & 12 deletions src/rust/engine/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,7 @@ impl NodeVisualizer<NodeKey> for Visualizer {
let max_colors = 12;
match entry.peek(context) {
None => "white".to_string(),
Some(Err(Failure::Throw(..))) => "4".to_string(),
Some(Err(Failure::Throw(..))) | Some(Err(Failure::FileWatch(..))) => "4".to_string(),
Some(Err(Failure::Invalidated)) => "12".to_string(),
Some(Ok(_)) => {
let viz_colors_len = self.viz_colors.len();
Expand All @@ -962,6 +962,7 @@ impl NodeTracer<NodeKey> for Tracer {
match result {
Some(Err(Failure::Invalidated)) => false,
Some(Err(Failure::Throw(..))) => false,
Some(Err(Failure::FileWatch(..))) => false,
Some(Ok(_)) => true,
None => {
// A Node with no state is either still running, or effectively cancelled
Expand All @@ -986,6 +987,7 @@ impl NodeTracer<NodeKey> for Tracer {
.join("\n")
),
Some(Err(Failure::Invalidated)) => "Invalidated".to_string(),
Some(Err(Failure::FileWatch(failure))) => format!("FileWatch failed: {}", failure),
}
}
}
Expand Down Expand Up @@ -1067,17 +1069,32 @@ impl Node for NodeKey {

scope_task_parent_id(maybe_span_id, async move {
let context2 = context.clone();
let result = match self {
NodeKey::DigestFile(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::DownloadedFile(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::MultiPlatformExecuteProcess(n) => {
n.run(context).map(NodeResult::from).compat().await
}
NodeKey::ReadLink(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::Scandir(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::Select(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::Snapshot(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::Task(n) => n.run(context).map(NodeResult::from).compat().await,
let maybe_watch = if let Some(path) = self.fs_subject() {
let abs_path = context.core.build_root.join(path);
context
.core
.watcher
.watch(abs_path)
.map_err(|e| Failure::FileWatch(format!("{:?}", e)))
.await
} else {
Ok(())
};

let result = match maybe_watch {
Ok(()) => match self {
NodeKey::DigestFile(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::DownloadedFile(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::MultiPlatformExecuteProcess(n) => {
n.run(context).map(NodeResult::from).compat().await
}
NodeKey::ReadLink(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::Scandir(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::Select(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::Snapshot(n) => n.run(context).map(NodeResult::from).compat().await,
NodeKey::Task(n) => n.run(context).map(NodeResult::from).compat().await,
},
Err(e) => Err(e),
};
if let Some(started_workunit) = maybe_started_workunit {
let workunit: WorkUnit = started_workunit.finish();
Expand Down
18 changes: 2 additions & 16 deletions src/rust/engine/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use futures01::future::{self, Future};
use crate::context::{Context, Core};
use crate::core::{Failure, Params, TypeId, Value};
use crate::nodes::{NodeKey, Select, Tracer, Visualizer};
use crate::watch::InvalidationWatcher;
use graph::{Graph, InvalidationResult};
use hashing;
use indexmap::IndexMap;
Expand Down Expand Up @@ -228,22 +229,7 @@ impl Scheduler {
/// Invalidate the invalidation roots represented by the given Paths.
///
pub fn invalidate(&self, paths: &HashSet<PathBuf>) -> usize {
let InvalidationResult { cleared, dirtied } =
self.core.graph.invalidate_from_roots(move |node| {
if let Some(fs_subject) = node.fs_subject() {
paths.contains(fs_subject)
} else {
false
}
});
// TODO: The rust log level is not currently set correctly in a pantsd context. To ensure that
// we see this even at `info` level, we set it to warn. #6004 should address this by making
// rust logging re-configuration an explicit step in `src/python/pants/init/logging.py`.
warn!(
"invalidation: cleared {} and dirtied {} nodes for: {:?}",
cleared, dirtied, paths
);
cleared + dirtied
InvalidationWatcher::invalidate(&self.core.graph, paths, "watchman")
}

///
Expand Down
Loading