Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract a watch crate #9635

Merged
merged 1 commit into from
Apr 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 3 additions & 7 deletions src/rust/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ members = [
"testutil/local_cas",
"testutil/local_execution_server",
"ui",
"watch",
"workunit_store"
]

Expand Down Expand Up @@ -77,6 +78,7 @@ default-members = [
"testutil/local_cas",
"testutil/local_execution_server",
"ui",
"watch",
"workunit_store"
]

Expand All @@ -86,12 +88,10 @@ async-trait = "0.1"
boxfuture = { path = "boxfuture" }
bytes = "0.4.5"
concrete_time = { path = "concrete_time" }
crossbeam-channel = "0.3"
fnv = "1.0.5"
fs = { path = "fs" }
futures01 = { package = "futures", version = "0.1" }
futures = { version = "0.3", features = ["compat"] }
futures-locks = "0.3.0"
graph = { path = "graph" }
hashing = { path = "hashing" }
indexmap = "1.0.2"
Expand All @@ -101,11 +101,6 @@ log = "0.4"
logging = { path = "logging" }
num_cpus = "1"
num_enum = "0.4"
# notify is currently an experimental API, we are pinning to https://docs.rs/notify/5.0.0-pre.1/notify/
# because the latest prerelease at time of writing has removed the debounced watcher which we would like to use.
# The author suggests they will add the debounced watcher back into the stable 5.0.0 release. When that happens
# we can move to it.
notify = { git = "https://github.com/notify-rs/notify", rev = "fba00891d9105e2f581c69fbe415a58cb7966fdd" }
parking_lot = "0.6"
process_execution = { path = "process_execution" }
rand = "0.6"
Expand All @@ -121,6 +116,7 @@ tokio = { version = "0.2", features = ["rt-threaded"] }
ui = { path = "ui" }
url = "2.1"
uuid = { version = "0.7", features = ["v4"] }
watch = { path = "watch" }
workunit_store = { path = "workunit_store" }

[dev-dependencies]
Expand Down
37 changes: 0 additions & 37 deletions src/rust/engine/graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1055,43 +1055,6 @@ impl<N: Node> Graph<N> {
}
}

// This module provides a trait which contains functions that
// should only be used in tests. A user must explicitly import the trait
// to use the extra test functions, and they should only be imported into
// test modules.
pub mod test_support {
use super::{EntryId, EntryState, Graph, Node};
pub trait TestGraph<N: Node> {
fn set_fixture_entry_state_for_id(&self, id: EntryId, state: EntryState<N>);
fn add_fixture_entry(&self, node: N) -> EntryId;
fn entry_state(&self, id: EntryId) -> &str;
}
impl<N: Node> TestGraph<N> for Graph<N> {
fn set_fixture_entry_state_for_id(&self, id: EntryId, state: EntryState<N>) {
let mut inner = self.inner.lock();
let entry = inner.entry_for_id_mut(id).unwrap();
let mut entry_state = entry.state.lock();
*entry_state = state;
}

fn add_fixture_entry(&self, node: N) -> EntryId {
let mut inner = self.inner.lock();
inner.ensure_entry(node)
}

fn entry_state(&self, id: EntryId) -> &str {
let mut inner = self.inner.lock();
let entry = inner.entry_for_id_mut(id).unwrap();
let entry_state = entry.state.lock();
match *entry_state {
EntryState::Completed { .. } => "completed",
EntryState::Running { .. } => "running",
EntryState::NotStarted { .. } => "not started",
}
}
}
}

///
/// Represents the state of a particular walk through a Graph. Implements Iterator and has the same
/// lifetime as the Graph itself.
Expand Down
40 changes: 35 additions & 5 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
// Licensed under the Apache License, Version 2.0 (see LICENSE).

use std;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashSet};
use std::convert::{Into, TryInto};
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -18,11 +19,12 @@ use crate::nodes::{NodeKey, WrappedNode};
use crate::scheduler::Session;
use crate::tasks::{Rule, Tasks};
use crate::types::Types;
use crate::watch::InvalidationWatcher;

use boxfuture::{BoxFuture, Boxable};
use core::clone::Clone;
use fs::{safe_create_dir_all_ioerror, GitignoreStyleExcludes, PosixFS};
use graph::{EntryId, Graph, NodeContext};
use graph::{EntryId, Graph, InvalidationResult, NodeContext};
use log::info;
use process_execution::{
self, speculate::SpeculatingCommandRunner, BoundedCommandRunner, Platform, ProcessMetadata,
};
Expand All @@ -32,6 +34,7 @@ use rule_graph::RuleGraph;
use sharded_lmdb::ShardedLmdb;
use store::Store;
use tokio::runtime::{Builder, Runtime};
use watch::{Invalidatable, InvalidationWatcher};

const GIGABYTES: usize = 1024 * 1024 * 1024;

Expand All @@ -44,7 +47,7 @@ const GIGABYTES: usize = 1024 * 1024 * 1024;
/// https://github.com/tokio-rs/tokio/issues/369 is resolved.
///
pub struct Core {
pub graph: Arc<Graph<NodeKey>>,
pub graph: Arc<InvalidatableGraph>,
pub tasks: Tasks,
pub rule_graph: RuleGraph<Rule>,
pub types: Types,
Expand Down Expand Up @@ -238,7 +241,7 @@ impl Core {
process_execution_metadata,
));
}
let graph = Arc::new(Graph::new());
let graph = Arc::new(InvalidatableGraph(Graph::new()));

let http_client = reqwest::Client::new();
let rule_graph = RuleGraph::new(tasks.as_map(), root_subject_types);
Expand Down Expand Up @@ -295,6 +298,33 @@ impl Core {
}
}

pub struct InvalidatableGraph(Graph<NodeKey>);

impl Invalidatable for InvalidatableGraph {
fn invalidate(&self, paths: &HashSet<PathBuf>, caller: &str) -> usize {
let InvalidationResult { cleared, dirtied } = self.invalidate_from_roots(move |node| {
if let Some(fs_subject) = node.fs_subject() {
paths.contains(fs_subject)
} else {
false
}
});
info!(
"{} invalidation: cleared {} and dirtied {} nodes for: {:?}",
caller, cleared, dirtied, paths
);
cleared + dirtied
}
}

impl Deref for InvalidatableGraph {
type Target = Graph<NodeKey>;

fn deref(&self) -> &Graph<NodeKey> {
&self.0
}
}

#[derive(Clone)]
pub struct Context {
entry_id: Option<EntryId>,
Expand Down
4 changes: 0 additions & 4 deletions src/rust/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ mod scheduler;
mod selectors;
mod tasks;
mod types;
mod watch;

pub use crate::context::Core;
pub use crate::core::{Function, Key, Params, TypeId, Value};
Expand All @@ -51,6 +50,3 @@ pub use crate::scheduler::{
};
pub use crate::tasks::{Rule, Tasks};
pub use crate::types::Types;

#[cfg(test)]
mod watch_tests;
5 changes: 3 additions & 2 deletions src/rust/engine/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ use futures01::future::{self, Future};
use crate::context::{Context, Core};
use crate::core::{Failure, Params, TypeId, Value};
use crate::nodes::{NodeKey, Select, Tracer, Visualizer};
use crate::watch::InvalidationWatcher;

use graph::{Graph, InvalidationResult};
use hashing;
use indexmap::IndexMap;
use log::{debug, info, warn};
use logging::logger::LOGGER;
use parking_lot::Mutex;
use ui::{EngineDisplay, KeyboardCommand};
use watch::Invalidatable;
use workunit_store::WorkUnitStore;

pub enum ExecutionTermination {
Expand Down Expand Up @@ -228,7 +229,7 @@ impl Scheduler {
/// Invalidate the invalidation roots represented by the given Paths.
///
pub fn invalidate(&self, paths: &HashSet<PathBuf>) -> usize {
InvalidationWatcher::invalidate(&self.core.graph, paths, "watchman")
self.core.graph.invalidate(paths, "watchman")
}

///
Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/testutil/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub fn make_file(path: &Path, contents: &[u8], mode: u32) {
file.set_permissions(permissions).unwrap();
}

pub fn append_to_exisiting_file(path: &Path, contents: &[u8]) {
pub fn append_to_existing_file(path: &Path, contents: &[u8]) {
let mut file = std::fs::OpenOptions::new().write(true).open(&path).unwrap();
file.write_all(contents).unwrap();
}
29 changes: 29 additions & 0 deletions src/rust/engine/watch/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
[package]
version = "0.0.1"
edition = "2018"
name = "watch"
authors = [ "Pants Build <pantsbuild@gmail.com>" ]
publish = false

[dependencies]
crossbeam-channel = "0.3"
fs = { path = "../fs" }
futures = { version = "0.3", features = ["compat"] }
futures-locks = "0.3.0"
futures01 = { package = "futures", version = "0.1" }
graph = { path = "../graph" }
log = "0.4"
logging = { path = "../logging" }
# notify is currently an experimental API, we are pinning to https://docs.rs/notify/5.0.0-pre.1/notify/
# because the latest prerelease at time of writing has removed the debounced watcher which we would like to use.
# The author suggests they will add the debounced watcher back into the stable 5.0.0 release. When that happens
# we can move to it.
notify = { git = "https://github.com/notify-rs/notify", rev = "fba00891d9105e2f581c69fbe415a58cb7966fdd" }
task_executor = { path = "../task_executor" }

[dev-dependencies]
hashing = { path = "../hashing" }
parking_lot = "0.6"
tempfile = "3"
testutil = { path = "../testutil" }
tokio = { version = "0.2", features = ["rt-core", "macros"] }
Loading