Skip to content

Commit

Permalink
Revert "Add support for cycle-tolerant "weak" Gets (pantsbuild#10230)"
Browse files Browse the repository at this point in the history
This reverts commit 0ad3a56.

[ci skip-build-wheels]
  • Loading branch information
stuhood committed Sep 17, 2020
1 parent 3b0bff5 commit 1451ddd
Show file tree
Hide file tree
Showing 12 changed files with 388 additions and 680 deletions.
2 changes: 0 additions & 2 deletions src/python/pants/engine/internals/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ def generator_send(
res.output_type,
res.input_type,
res.input,
res.weak,
)
elif type(res) in (tuple, list):
# GetMulti.
Expand All @@ -83,7 +82,6 @@ def generator_send(
get.output_type,
get.input_type,
get.input,
get.weak,
)
for get in res
)
Expand Down
22 changes: 1 addition & 21 deletions src/python/pants/engine/internals/scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from contextlib import contextmanager
from dataclasses import dataclass
from textwrap import dedent
from typing import Any, FrozenSet
from typing import Any

from pants.engine.internals.engine_testutil import (
assert_equal_with_printing,
Expand Down Expand Up @@ -121,20 +121,6 @@ async def error_msg_test_rule(union_wrapper: UnionWrapper) -> UnionX:
raise AssertionError("The statement above this one should have failed!")


class BooleanDeps(FrozenSet[bool]):
pass


@rule
async def boolean_cycle(key: bool) -> BooleanDeps:
"""A rule with exactly two instances (bool == two keys), which depend on one another weakly."""
deps = {key}
dep = await Get(BooleanDeps, bool, not key, weak=True)
if dep is not None:
deps.update(dep)
return BooleanDeps(deps)


class TypeCheckFailWrapper:
"""This object wraps another object which will be used to demonstrate a type check failure when
the engine processes an `await Get(...)` statement."""
Expand Down Expand Up @@ -198,8 +184,6 @@ def rules(cls):
QueryRule(A, (UnionWrapper,)),
error_msg_test_rule,
QueryRule(UnionX, (UnionWrapper,)),
boolean_cycle,
QueryRule(BooleanDeps, (bool,)),
boolean_and_int,
QueryRule(A, (int, bool)),
)
Expand Down Expand Up @@ -245,10 +229,6 @@ def test_strict_equals(self):
# type of a value in equality.
assert A() == self.request(A, [1, True])

def test_weak_gets(self):
assert {True, False} == set(self.request(BooleanDeps, [True]))
assert {True, False} == set(self.request(BooleanDeps, [False]))

@contextmanager
def _assert_execution_error(self, expected_msg):
with assert_execution_error(self, expected_msg):
Expand Down
13 changes: 1 addition & 12 deletions src/python/pants/engine/internals/selectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,6 @@ class Get(GetConstraints, Generic[_Output, _Input]):
infer from the input variable [1]. Likewise, the short form must use inline construction of the
input in order to convey the input type to the engine.
The `weak` parameter is an experimental extension: a "weak" Get will return None rather than the
requested value iff the dependency caused by the Get would create a cycle in the dependency
graph.
[1] The engine needs to determine all rule and Get input and output types statically before
executing any rules. Since Gets are declared inside function bodies, the only way to extract this
information is through a parse of the rule function. The parse analysis is rudimentary and cannot
Expand All @@ -135,9 +131,7 @@ class Get(GetConstraints, Generic[_Output, _Input]):
"""

@overload
def __init__(
self, output_type: Type[_Output], input_arg0: _Input, *, weak: bool = False
) -> None:
def __init__(self, output_type: Type[_Output], input_arg0: _Input) -> None:
...

@overload
Expand All @@ -146,8 +140,6 @@ def __init__(
output_type: Type[_Output],
input_arg0: Type[_Input],
input_arg1: _Input,
*,
weak: bool = False,
) -> None:
...

Expand All @@ -156,15 +148,12 @@ def __init__(
output_type: Type[_Output],
input_arg0: Union[Type[_Input], _Input],
input_arg1: Optional[_Input] = None,
*,
weak: bool = False,
) -> None:
self.output_type = output_type
self.input_type = self._validate_input_type(
input_arg0 if input_arg1 is not None else type(input_arg0)
)
self.input = self._validate_input(input_arg1 if input_arg1 is not None else input_arg0)
self.weak = weak

self._validate_output_type()

Expand Down
117 changes: 57 additions & 60 deletions src/rust/engine/graph/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,6 @@ impl RunToken {
fn next(self) -> RunToken {
RunToken(self.0 + 1)
}

///
/// Returns true if "other" is equal to this RunToken, or this RunToken's predecessor.
///
pub fn equals_current_or_previous(&self, other: RunToken) -> bool {
self.0 == other.0 || other.next().0 == self.0
}
}

///
Expand Down Expand Up @@ -59,29 +52,33 @@ impl Generation {
///
/// A result from running a Node.
///
/// If the value is Dirty, the consumer should check whether the dependencies of the Node have the
/// same values as they did when this Node was last run; if so, the value can be re-used
/// (and should be marked "Clean").
///
/// If the value is Uncacheable it may only be consumed in the same Run that produced it, and should
/// be recomputed in a new Run.
///
/// A value of type UncacheableDependencies has Uncacheable dependencies, and is treated as
/// equivalent to Dirty in all cases except when `poll`d: since `poll` requests are waiting for
/// meaningful work to do, they need to differentiate between a truly invalidated/changed (Dirty)
/// Node and a Node that would be re-cleaned once per session.
///
/// If the value is Clean, the consumer can simply use the value as-is.
///
#[derive(Clone, Debug)]
pub enum EntryResult<N: Node> {
// The value is Clean, and the consumer can simply use it as-is.
Clean(N::Item),
// If the value is Dirty, the consumer should check whether the dependencies of the Node have the
// same values as they did when this Node was last run; if so, the value can be re-used
// (and should be marked "Clean").
Dirty(N::Item),
// Uncacheable values may only be consumed in the same Session that produced them, and should
// be recomputed in a new Session.
Uncacheable(N::Item, <<N as Node>::Context as NodeContext>::SessionId),
// A value of type UncacheableDependencies has Uncacheable dependencies, and is treated as
// equivalent to Dirty in all cases except when `poll`d: since `poll` requests are waiting for
// meaningful work to do, they need to differentiate between a truly invalidated/changed (Dirty)
// Node and a Node that would be re-cleaned once per session.
UncacheableDependencies(N::Item),
Dirty(N::Item),
Uncacheable(N::Item, <<N as Node>::Context as NodeContext>::RunId),
}

impl<N: Node> EntryResult<N> {
fn is_clean(&self, context: &N::Context) -> bool {
match self {
EntryResult::Clean(..) => true,
EntryResult::Uncacheable(_, session_id) => context.session_id() == session_id,
EntryResult::Uncacheable(_, run_id) => context.run_id() == run_id,
EntryResult::Dirty(..) => false,
EntryResult::UncacheableDependencies(..) => false,
}
Expand All @@ -98,7 +95,7 @@ impl<N: Node> EntryResult<N> {
/// currently to clean it).
fn poll_should_wait(&self, context: &N::Context) -> bool {
match self {
EntryResult::Uncacheable(_, session_id) => context.session_id() == session_id,
EntryResult::Uncacheable(_, run_id) => context.run_id() == run_id,
EntryResult::Dirty(..) => false,
EntryResult::UncacheableDependencies(_) | EntryResult::Clean(..) => true,
}
Expand Down Expand Up @@ -299,38 +296,25 @@ impl<N: Node> Entry<N> {
previous_result: Option<EntryResult<N>>,
) -> EntryState<N> {
// Increment the RunToken to uniquely identify this work.
let previous_run_token = run_token;
let run_token = run_token.next();
let context = context_factory.clone_for(entry_id, run_token);
let context = context_factory.clone_for(entry_id);
let node = node.clone();
let (abort_handle, abort_registration) = AbortHandle::new_pair();
trace!(
"Running node {:?} with {:?}. It was: previous_result={:?}",
node,
run_token,
previous_result,
);

context_factory.spawn(async move {
// If we have previous result generations, compare them to all current dependency
// generations (which, if they are dirty, will cause recursive cleaning). If they
// match, we can consider the previous result value to be clean for reuse.
let was_clean = if let Some(previous_dep_generations) = previous_dep_generations {
trace!("Getting deps to attempt to clean {}", node);
match context
.graph()
.dep_generations(entry_id, previous_run_token, &context)
.await
{
match context.graph().dep_generations(entry_id, &context).await {
Ok(ref dep_generations) if dep_generations == &previous_dep_generations => {
trace!("Deps matched: {} is clean.", node);
// Dependencies have not changed: Node is clean.
true
}
_ => {
// If dependency generations mismatched or failed to fetch, indicate that the Node
// should re-run.
trace!("Deps did not match: {} needs to re-run.", node);
// If dependency generations mismatched or failed to fetch, clear its
// dependencies and indicate that it should re-run.
context.graph().clear_deps(entry_id, run_token);
false
}
}
Expand Down Expand Up @@ -427,6 +411,11 @@ impl<N: Node> Entry<N> {
dep_generations,
..
} => {
trace!(
"Re-starting node {:?}. It was: previous_result={:?}",
self.node,
result,
);
assert!(
!result.is_clean(context),
"A clean Node should not reach this point: {:?}",
Expand Down Expand Up @@ -470,14 +459,20 @@ impl<N: Node> Entry<N> {
/// result should be used. This special case exists to avoid 1) cloning the result to call this
/// method, and 2) comparing the current/previous results unnecessarily.
///
/// Takes a &mut InnerGraph to ensure that completing nodes doesn't race with dirtying them.
/// The important relationship being guaranteed here is that if the Graph is calling
/// invalidate_from_roots, it may mark us, or our dependencies, as dirty. We don't want to
/// complete _while_ a batch of nodes are being marked as dirty, and this exclusive access ensures
/// that can't happen.
///
pub(crate) fn complete(
&self,
&mut self,
context: &N::Context,
result_run_token: RunToken,
dep_generations: Vec<Generation>,
result: Option<Result<N::Item, N::Error>>,
has_uncacheable_deps: bool,
has_weak_deps: bool,
_graph: &mut super::InnerGraph<N>,
) {
let mut state = self.state.lock();

Expand All @@ -492,6 +487,7 @@ impl<N: Node> Entry<N> {
"Not completing node {:?} because it was invalidated.",
self.node
);
return;
}
}

Expand All @@ -517,21 +513,17 @@ impl<N: Node> Entry<N> {
}
Some(Ok(result)) => {
let next_result: EntryResult<N> = if !self.cacheable_with_output(Some(&result)) {
EntryResult::Uncacheable(result, context.session_id().clone())
} else if has_weak_deps {
EntryResult::Dirty(result)
EntryResult::Uncacheable(result, context.run_id().clone())
} else if has_uncacheable_deps {
EntryResult::UncacheableDependencies(result)
} else {
EntryResult::Clean(result)
};
// If the new result does not match the previous result, the generation increments.
if Some(next_result.as_ref()) != previous_result.as_ref().map(EntryResult::as_ref) {
// Node was re-executed (ie not cleaned) and had a different result value.
generation = generation.next()
};
self.notify_waiters(waiters, Ok((next_result.as_ref().clone(), generation)));

EntryState::Completed {
result: next_result,
pollers: Vec::new(),
Expand Down Expand Up @@ -608,7 +600,10 @@ impl<N: Node> Entry<N> {
}

///
/// Get the RunToken of this entry regardless of whether it is running.
/// Get the current RunToken of this entry.
///
/// TODO: Consider moving the Generation and RunToken out of the EntryState once we decide what
/// we want the per-Entry locking strategy to be.
///
pub(crate) fn run_token(&self) -> RunToken {
match *self.state.lock() {
Expand All @@ -619,22 +614,19 @@ impl<N: Node> Entry<N> {
}

///
/// Get the current RunToken of this entry iff it is currently running.
/// Clears the state of this Node, forcing it to be recomputed.
///
pub(crate) fn running_run_token(&self) -> Option<RunToken> {
match *self.state.lock() {
EntryState::Running { run_token, .. } => Some(run_token),
_ => None,
}
}

/// # Arguments
///
/// Clears the state of this Node, forcing it to be recomputed.
/// * `graph_still_contains_edges` - If the caller has guaranteed that all edges from this Node
/// have been removed from the graph, they should pass false here, else true. We may want to
/// remove this parameter, and force this method to remove the edges, but that would require
/// acquiring the graph lock here, which we currently don't do.
///
pub(crate) fn clear(&mut self) {
pub(crate) fn clear(&mut self, graph_still_contains_edges: bool) {
let mut state = self.state.lock();

let (run_token, generation, previous_result) =
let (run_token, generation, mut previous_result) =
match mem::replace(&mut *state, EntryState::initial()) {
EntryState::NotStarted {
run_token,
Expand Down Expand Up @@ -662,8 +654,13 @@ impl<N: Node> Entry<N> {

trace!("Clearing node {:?}", self.node);

// Swap in a state with a new RunToken value, which invalidates any outstanding work and all
// edges for the previous run.
if graph_still_contains_edges {
if let Some(previous_result) = previous_result.as_mut() {
previous_result.dirty();
}
}

// Swap in a state with a new RunToken value, which invalidates any outstanding work.
*state = EntryState::NotStarted {
run_token: run_token.next(),
generation,
Expand Down
Loading

0 comments on commit 1451ddd

Please sign in to comment.