Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add notify fs watcher to engine. #9318

Merged
merged 16 commits into from
Mar 26, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions src/python/pants/engine/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,7 @@ def rule_subgraph_visualization(self, root_subject_type, product_type):
yield line.rstrip()

def invalidate_files(self, direct_filenames):
# NB: Watchman no longer triggers events when children are created/deleted under a directory,
# so we always need to invalidate the direct parent as well.
filenames = set(direct_filenames)
filenames.update(os.path.dirname(f) for f in direct_filenames)
filenames_buf = self._native.context.utf8_buf_buf(filenames)
return self._native.lib.graph_invalidate(self._scheduler, filenames_buf)

Expand Down
807 changes: 440 additions & 367 deletions src/rust/engine/Cargo.lock

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions src/rust/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ default-members = [
boxfuture = { path = "boxfuture" }
bytes = "0.4.5"
concrete_time = { path = "concrete_time" }
crossbeam-channel = "0.3"
fnv = "1.0.5"
fs = { path = "fs" }
futures01 = { package = "futures", version = "0.1" }
Expand All @@ -95,6 +96,11 @@ lazy_static = "1"
log = "0.4"
logging = { path = "logging" }
num_enum = "0.4"
# notify is currently an experimental API, we are pinning to https://docs.rs/notify/5.0.0-pre.1/notify/
# because the latest prerelease at time of writing has removed the debounced watcher which we would like to use.
# The author suggests they will add the debounced watcher back into the stable 5.0.0 release. When that happens
# we can move to it.
notify = { git = "https://github.com/notify-rs/notify", rev = "fba00891d9105e2f581c69fbe415a58cb7966fdd" }
hrfuller marked this conversation as resolved.
Show resolved Hide resolved
hrfuller marked this conversation as resolved.
Show resolved Hide resolved
parking_lot = "0.6"
process_execution = { path = "process_execution" }
rand = "0.6"
Expand All @@ -111,6 +117,12 @@ uuid = { version = "0.7", features = ["v4"] }
task_executor = { path = "task_executor" }
workunit_store = { path = "workunit_store" }

[dev-dependencies]
testutil = { path = "./testutil" }
fs = { path = "./fs" }
tokio = "0.1"
hrfuller marked this conversation as resolved.
Show resolved Hide resolved
env_logger = "0.5.4"

[patch.crates-io]
# TODO: Remove patch when we can upgrade to an official released version of protobuf with a fix.
# See: https://github.com/pantsbuild/pants/issues/7760 for context.
Expand Down
14 changes: 7 additions & 7 deletions src/rust/engine/graph/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ use boxfuture::{BoxFuture, Boxable};
/// the Node was `cleared`), the work is discarded. See `Entry::complete` for more information.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct RunToken(u32);
pub struct RunToken(u32);

impl RunToken {
fn initial() -> RunToken {
pub fn initial() -> RunToken {
RunToken(0)
}

Expand All @@ -40,10 +40,10 @@ impl RunToken {
/// incremented when the output of a node has changed.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct Generation(u32);
pub struct Generation(u32);

impl Generation {
fn initial() -> Generation {
pub fn initial() -> Generation {
Generation(0)
}

Expand All @@ -65,7 +65,7 @@ impl Generation {
/// If the value is Clean, the consumer can simply use the value as-is.
///
#[derive(Clone, Debug)]
pub(crate) enum EntryResult<N: Node> {
pub enum EntryResult<N: Node> {
Clean(Result<N::Item, N::Error>),
Dirty(Result<N::Item, N::Error>),
Uncacheable(
Expand Down Expand Up @@ -118,7 +118,7 @@ impl<N: Node> AsRef<Result<N::Item, N::Error>> for EntryResult<N> {

#[allow(clippy::type_complexity)]
#[derive(Debug)]
pub(crate) enum EntryState<N: Node> {
pub enum EntryState<N: Node> {
// A node that has either been explicitly cleared, or has not yet started Running. In this state
// there is no need for a dirty bit because the RunToken is either in its initial state, or has
// been explicitly incremented when the node was cleared.
Expand Down Expand Up @@ -174,7 +174,7 @@ pub struct Entry<N: Node> {
// maps is painful.
node: N,

state: Arc<Mutex<EntryState<N>>>,
pub state: Arc<Mutex<EntryState<N>>>,
}

impl<N: Node> Entry<N> {
Expand Down
45 changes: 43 additions & 2 deletions src/rust/engine/graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ use hashing;

use petgraph;

mod entry;
// make the entry module public for testing purposes. We use it to contruct mock
// graph entries in the notify watch tests.
pub mod entry;
mod node;

pub use crate::entry::Entry;
pub use crate::entry::{Entry, EntryState};
use crate::entry::{Generation, RunToken};

use std::collections::binary_heap::BinaryHeap;
Expand Down Expand Up @@ -1007,6 +1009,45 @@ impl<N: Node> Graph<N> {
Ok(())
}
}

}

// This module provides a trait which contains functions that
// should only be used in tests. A user must explicitly import the trait
// to use the extra test functions, and they should only be imported into
// test modules.
pub mod test_support {
use super::{EntryState, EntryId, Node};
pub trait TestGraph<N: Node> {
fn set_fixture_entry_state_for_id(&self, id: EntryId, state: EntryState<N>);
fn add_fixture_entry(&self, node: N) -> EntryId;
fn entry_state(&self, id: EntryId) -> &str;
}
}

impl<N: Node> test_support::TestGraph<N> for Graph<N> {
hrfuller marked this conversation as resolved.
Show resolved Hide resolved
fn set_fixture_entry_state_for_id(&self, id: EntryId, state: EntryState<N>) {
let mut inner = self.inner.lock();
let entry = inner.entry_for_id_mut(id).unwrap();
let mut entry_state = entry.state.lock();
*entry_state = state;
}

fn add_fixture_entry(&self, node: N) -> EntryId {
let mut inner = self.inner.lock();
inner.ensure_entry(node)
}

fn entry_state(&self, id: EntryId) -> &str {
let mut inner = self.inner.lock();
let entry = inner.entry_for_id_mut(id).unwrap();
let entry_state = entry.state.lock();
match *entry_state {
EntryState::Completed { .. } => "completed",
EntryState::Running { .. } => "running",
EntryState::NotStarted { .. } => "not started",
}
}
}

///
Expand Down
17 changes: 11 additions & 6 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::nodes::{NodeKey, WrappedNode};
use crate::scheduler::Session;
use crate::tasks::{Rule, Tasks};
use crate::types::Types;
use crate::watch::InvalidationWatcher;
use boxfuture::{BoxFuture, Boxable};
use core::clone::Clone;
use fs::{safe_create_dir_all_ioerror, PosixFS};
Expand All @@ -41,7 +42,7 @@ const GIGABYTES: usize = 1024 * 1024 * 1024;
/// https://github.com/tokio-rs/tokio/issues/369 is resolved.
///
pub struct Core {
pub graph: Graph<NodeKey>,
pub graph: Arc<Graph<NodeKey>>,
pub tasks: Tasks,
pub rule_graph: RuleGraph<Rule>,
pub types: Types,
Expand All @@ -50,6 +51,7 @@ pub struct Core {
pub command_runner: Box<dyn process_execution::CommandRunner>,
pub http_client: reqwest::r#async::Client,
pub vfs: PosixFS,
pub watcher: InvalidationWatcher,
pub build_root: PathBuf,
}

Expand Down Expand Up @@ -218,24 +220,27 @@ impl Core {
process_execution_metadata,
));
}
let graph = Arc::new(Graph::new());
let watcher = InvalidationWatcher::new(Arc::downgrade(&graph), build_root.clone())?;

let http_client = reqwest::r#async::Client::new();
let rule_graph = RuleGraph::new(tasks.as_map(), root_subject_types);

Ok(Core {
graph: Graph::new(),
tasks: tasks,
rule_graph: rule_graph,
types: types,
graph,
tasks,
rule_graph,
types,
executor: executor.clone(),
store,
command_runner,
http_client,
watcher,
// TODO: Errors in initialization should definitely be exposed as python
// exceptions, rather than as panics.
vfs: PosixFS::new(&build_root, &ignore_patterns, executor)
.map_err(|e| format!("Could not initialize VFS: {:?}", e))?,
build_root: build_root,
build_root,
})
}

Expand Down
4 changes: 4 additions & 0 deletions src/rust/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ mod scheduler;
mod selectors;
mod tasks;
mod types;
mod watch;

pub use crate::context::Core;
pub use crate::core::{Function, Key, Params, TypeId, Value};
Expand All @@ -49,3 +50,6 @@ pub use crate::scheduler::{
};
pub use crate::tasks::{Rule, Tasks};
pub use crate::types::Types;

#[cfg(test)]
mod watch_tests;
56 changes: 38 additions & 18 deletions src/rust/engine/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1054,26 +1054,46 @@ impl Node for NodeKey {
};

let context2 = context.clone();
future::lazy(|| {
if let Some(span_id) = maybe_span_id {
set_parent_id(span_id);
}
match self {
NodeKey::DigestFile(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::DownloadedFile(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::MultiPlatformExecuteProcess(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::ReadLink(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::Scandir(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::Select(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::Snapshot(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::Task(n) => n.run(context).map(NodeResult::from).to_boxed(),
future::lazy({
hrfuller marked this conversation as resolved.
Show resolved Hide resolved
let context = context.clone();
let self2 = self.clone();
move || {
if let Some(path) = self2.fs_subject() {
hrfuller marked this conversation as resolved.
Show resolved Hide resolved
let abs_path = context.core.build_root.join(path);
context
.core
.executor
.spawn_on_io_pool(context.core.watcher.watch(abs_path))
.to_boxed()
} else {
future::ok(()).to_boxed()
}
}
})
.inspect(move |_: &NodeResult| {
if let Some(started_workunit) = maybe_started_workunit {
let workunit: WorkUnit = started_workunit.finish();
context2.session.workunit_store().add_workunit(workunit)
}
.then(|_| {
hrfuller marked this conversation as resolved.
Show resolved Hide resolved
future::lazy(|| {
if let Some(span_id) = maybe_span_id {
set_parent_id(span_id);
}
match self {
NodeKey::DigestFile(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::DownloadedFile(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::MultiPlatformExecuteProcess(n) => {
n.run(context).map(NodeResult::from).to_boxed()
}
NodeKey::ReadLink(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::Scandir(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::Select(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::Snapshot(n) => n.run(context).map(NodeResult::from).to_boxed(),
NodeKey::Task(n) => n.run(context).map(NodeResult::from).to_boxed(),
}
})
.inspect(move |_: &NodeResult| {
if let Some(started_workunit) = maybe_started_workunit {
let workunit: WorkUnit = started_workunit.finish();
context2.session.workunit_store().add_workunit(workunit)
}
})
})
.to_boxed()
}
Expand Down
18 changes: 2 additions & 16 deletions src/rust/engine/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use futures01::future::{self, Future};
use crate::context::{Context, Core};
use crate::core::{Failure, Params, TypeId, Value};
use crate::nodes::{NodeKey, Select, Tracer, Visualizer};
use crate::watch::InvalidationWatcher;
use graph::{Graph, InvalidationResult};
use hashing;
use indexmap::IndexMap;
Expand Down Expand Up @@ -227,22 +228,7 @@ impl Scheduler {
/// Invalidate the invalidation roots represented by the given Paths.
///
pub fn invalidate(&self, paths: &HashSet<PathBuf>) -> usize {
let InvalidationResult { cleared, dirtied } =
self.core.graph.invalidate_from_roots(move |node| {
if let Some(fs_subject) = node.fs_subject() {
paths.contains(fs_subject)
} else {
false
}
});
// TODO: The rust log level is not currently set correctly in a pantsd context. To ensure that
// we see this even at `info` level, we set it to warn. #6004 should address this by making
// rust logging re-configuration an explicit step in `src/python/pants/init/logging.py`.
warn!(
"invalidation: cleared {} and dirtied {} nodes for: {:?}",
cleared, dirtied, paths
);
cleared + dirtied
InvalidationWatcher::invalidate(&self.core.graph, paths, "watchman")
}

///
Expand Down
Loading