Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an API to coarsen/partition Targets by their cycles #12251

Merged
merged 6 commits into from
Jul 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 101 additions & 18 deletions src/python/pants/engine/internals/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@
Snapshot,
SpecsSnapshot,
)
from pants.engine.internals import native_engine
from pants.engine.internals.target_adaptor import TargetAdaptor
from pants.engine.rules import Get, MultiGet, collect_rules, rule
from pants.engine.target import (
CoarsenedTarget,
CoarsenedTargets,
Dependencies,
DependenciesRequest,
ExplicitlyProvidedDependencies,
Expand Down Expand Up @@ -77,6 +80,7 @@
from pants.option.global_options import GlobalOptions, OwnersNotFoundBehavior
from pants.source.filespec import matches_filespec
from pants.util.docutil import doc_url
from pants.util.frozendict import FrozenDict
from pants.util.logging import LogLevel
from pants.util.ordered_set import FrozenOrderedSet, OrderedSet

Expand Down Expand Up @@ -238,29 +242,58 @@ def visit(address: Address):
)


@rule(desc="Resolve transitive targets")
async def transitive_targets(request: TransitiveTargetsRequest) -> TransitiveTargets:
"""Find all the targets transitively depended upon by the target roots.
@dataclass(frozen=True)
class _DependencyMappingRequest:
tt_request: TransitiveTargetsRequest
expanded_targets: bool


@dataclass(frozen=True)
class _DependencyMapping:
mapping: FrozenDict[Address, Tuple[Address, ...]]
visited: FrozenOrderedSet[Target]
roots_as_targets: Collection[Target]


@rule
async def transitive_dependency_mapping(request: _DependencyMappingRequest) -> _DependencyMapping:
"""This uses iteration, rather than recursion, so that we can tolerate dependency cycles.

This uses iteration, rather than recursion, so that we can tolerate dependency cycles. Unlike a
traditional BFS algorithm, we batch each round of traversals via `MultiGet` for improved
performance / concurrency.
Unlike a traditional BFS algorithm, we batch each round of traversals via `MultiGet` for
improved performance / concurrency.
"""
roots_as_targets = await Get(Targets, Addresses(request.roots))
roots_as_targets: Collection[Target]
if request.expanded_targets:
roots_as_targets = await Get(Targets, Addresses(request.tt_request.roots))
else:
roots_as_targets = await Get(UnexpandedTargets, Addresses(request.tt_request.roots))
visited: OrderedSet[Target] = OrderedSet()
queued = FrozenOrderedSet(roots_as_targets)
dependency_mapping: Dict[Address, Tuple[Address, ...]] = {}
while queued:
direct_dependencies = await MultiGet(
Get(
Targets,
DependenciesRequest(
tgt.get(Dependencies),
include_special_cased_deps=request.include_special_cased_deps,
),
direct_dependencies: Tuple[Collection[Target], ...]
if request.expanded_targets:
direct_dependencies = await MultiGet(
Get(
Targets,
DependenciesRequest(
tgt.get(Dependencies),
include_special_cased_deps=request.tt_request.include_special_cased_deps,
),
)
for tgt in queued
)
else:
direct_dependencies = await MultiGet(
Get(
UnexpandedTargets,
DependenciesRequest(
tgt.get(Dependencies),
include_special_cased_deps=request.tt_request.include_special_cased_deps,
),
)
for tgt in queued
)
for tgt in queued
)

dependency_mapping.update(
zip(
Expand All @@ -278,11 +311,21 @@ async def transitive_targets(request: TransitiveTargetsRequest) -> TransitiveTar
# is because expanding from the `Addresses` -> `Targets` may have resulted in generated
# subtargets being used, so we need to use `roots_as_targets` to have this expansion.
_detect_cycles(tuple(t.address for t in roots_as_targets), dependency_mapping)
return _DependencyMapping(
FrozenDict(dependency_mapping), FrozenOrderedSet(visited), roots_as_targets
)


@rule(desc="Resolve transitive targets")
async def transitive_targets(request: TransitiveTargetsRequest) -> TransitiveTargets:
"""Find all the targets transitively depended upon by the target roots."""

dependency_mapping = await Get(_DependencyMapping, _DependencyMappingRequest(request, True))

# Apply any transitive excludes (`!!` ignores).
transitive_excludes: FrozenOrderedSet[Target] = FrozenOrderedSet()
unevaluated_transitive_excludes = []
for t in (*roots_as_targets, *visited):
for t in (*dependency_mapping.roots_as_targets, *dependency_mapping.visited):
unparsed = t.get(Dependencies).unevaluated_transitive_excludes
if unparsed.values:
unevaluated_transitive_excludes.append(unparsed)
Expand All @@ -296,8 +339,48 @@ async def transitive_targets(request: TransitiveTargetsRequest) -> TransitiveTar
)

return TransitiveTargets(
tuple(roots_as_targets), FrozenOrderedSet(visited.difference(transitive_excludes))
tuple(dependency_mapping.roots_as_targets),
FrozenOrderedSet(dependency_mapping.visited.difference(transitive_excludes)),
)


# -----------------------------------------------------------------------------------------------
# CoarsenedTargets
# -----------------------------------------------------------------------------------------------


@rule
async def coarsened_targets(addresses: Addresses) -> CoarsenedTargets:
dependency_mapping = await Get(
_DependencyMapping,
_DependencyMappingRequest(
# NB: We set include_special_cased_deps=True because although computing CoarsenedTargets
# requires a transitive graph walk (to ensure that all cycles are actually detected),
# the resulting CoarsenedTargets instance is not itself transitive: everything not directly
# involved in a cycle with one of the input Addresses is discarded in the output.
TransitiveTargetsRequest(addresses, include_special_cased_deps=True),
expanded_targets=False,
),
)
components = native_engine.strongly_connected_components(
list(dependency_mapping.mapping.items())
)

addresses_set = set(addresses)
addresses_to_targets = {
t.address: t for t in [*dependency_mapping.visited, *dependency_mapping.roots_as_targets]
}
targets = []
for component in components:
if not any(component_address in addresses_set for component_address in component):
continue
component_set = set(component)
members = tuple(addresses_to_targets[a] for a in component)
dependencies = FrozenOrderedSet(
[d for a in component for d in dependency_mapping.mapping[a] if d not in component_set]
)
targets.append(CoarsenedTarget(members, dependencies))
return CoarsenedTargets(targets)


# -----------------------------------------------------------------------------------------------
Expand Down
77 changes: 77 additions & 0 deletions src/python/pants/engine/internals/graph_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from pants.engine.rules import Get, MultiGet, rule
from pants.engine.target import (
AsyncFieldMixin,
CoarsenedTargets,
Dependencies,
DependenciesRequest,
ExplicitlyProvidedDependencies,
Expand Down Expand Up @@ -97,6 +98,7 @@ class MockTarget(Target):
def transitive_targets_rule_runner() -> RuleRunner:
return RuleRunner(
rules=[
QueryRule(CoarsenedTargets, (Addresses,)),
QueryRule(Targets, (DependenciesRequest,)),
QueryRule(TransitiveTargets, (TransitiveTargetsRequest,)),
],
Expand Down Expand Up @@ -306,6 +308,81 @@ def test_transitive_targets_tolerates_subtarget_cycles(
]


def test_coarsened_targets(transitive_targets_rule_runner: RuleRunner) -> None:
"""CoarsenedTargets should "coarsen" a cycle into a single CoarsenedTarget instance.

Cycles are only allowed for file targets, so we use explicit file dependencies in this test.
"""
transitive_targets_rule_runner.create_files("", ["dep.txt", "t1.txt", "t2.txt"])
transitive_targets_rule_runner.add_to_build_file(
"",
dedent(
"""\
target(name='dep', sources=['dep.txt'])
target(name='t1', sources=['t1.txt'], dependencies=['dep.txt:dep', 't2.txt:t2'])
target(name='t2', sources=['t2.txt'], dependencies=['t1.txt:t1'])
"""
),
)

def assert_coarsened(
a: Address, expected_members: List[Address], expected_dependencies: List[Address]
) -> None:
coarsened_targets = transitive_targets_rule_runner.request(
CoarsenedTargets,
[Addresses([a])],
)
assert list(sorted(t.address for t in coarsened_targets[0].members)) == expected_members
assert list(sorted(d for d in coarsened_targets[0].dependencies)) == expected_dependencies

# BUILD targets are never involved in cycles, and so always coarsen to themselves.
assert_coarsened(
Address("", target_name="dep"),
[Address("", target_name="dep")],
[Address("", relative_file_path="dep.txt", target_name="dep")],
)
assert_coarsened(
Address("", target_name="t1"),
[Address("", target_name="t1")],
[
Address("", relative_file_path="dep.txt", target_name="dep"),
Address("", relative_file_path="t1.txt", target_name="t1"),
Address("", relative_file_path="t2.txt", target_name="t2"),
],
)
assert_coarsened(
Address("", target_name="t2"),
[Address("", target_name="t2")],
[
Address("", relative_file_path="t1.txt", target_name="t1"),
Address("", relative_file_path="t2.txt", target_name="t2"),
],
)

# As will file targets not involved in cycles.
assert_coarsened(
Address("", relative_file_path="dep.txt", target_name="dep"),
[Address("", relative_file_path="dep.txt", target_name="dep")],
[],
)

# But file targets involved in cycles will coarsen to the cycle, and have only dependencies outside of the cycle.
cycle_files = [
Address("", relative_file_path="t1.txt", target_name="t1"),
Address("", relative_file_path="t2.txt", target_name="t2"),
]
assert_coarsened(
Address("", relative_file_path="t1.txt", target_name="t1"),
cycle_files,
[Address("", relative_file_path="dep.txt", target_name="dep")],
)
assert_coarsened(
Address("", relative_file_path="t2.txt", target_name="t2"),
cycle_files,
[Address("", relative_file_path="dep.txt", target_name="dep")],
)


def assert_failed_cycle(
rule_runner: RuleRunner,
*,
Expand Down
5 changes: 4 additions & 1 deletion src/python/pants/engine/internals/native_engine.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from __future__ import annotations

from io import RawIOBase
from typing import Any, Sequence, TextIO
from typing import Any, Sequence, TextIO, Tuple

from typing_extensions import Protocol

Expand Down Expand Up @@ -139,6 +139,9 @@ def rule_subgraph_visualize(
) -> None: ...
def garbage_collect_store(scheduler: PyScheduler, target_size_bytes: int) -> None: ...
def lease_files_in_graph(scheduler: PyScheduler, session: PySession) -> None: ...
def strongly_connected_components(
adjacency_lists: Sequence[Tuple[Any, Sequence[Any]]]
) -> Sequence[Sequence[Any]]: ...

class PyDigest:
def __init__(self, fingerprint: str, serialized_bytes_length: int) -> None: ...
Expand Down
17 changes: 17 additions & 0 deletions src/python/pants/engine/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,23 @@ def expect_single(self) -> Target:
return self[0]


@dataclass(frozen=True)
class CoarsenedTarget:
"""A set of Targets which cyclicly reach one another, and are thus indivisable."""

# The members of the cycle.
members: Tuple[Target, ...]
# The deduped direct (not transitive) dependencies of all Targets in the cycle. Dependencies
# between members of the cycle are excluded.
#
# To expand these dependencies, request `CoarsenedTargets` for them.
dependencies: FrozenOrderedSet[Address]


class CoarsenedTargets(Collection[CoarsenedTarget]):
"""A set of direct (not transitive) disjoint CoarsenedTarget instances."""


@dataclass(frozen=True)
class TransitiveTargets:
"""A set of Target roots, and their transitive, flattened, de-duped dependencies.
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/rust/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ logging = { path = "logging" }
nailgun = { path = "nailgun" }
num_enum = "0.4"
parking_lot = "0.11"
petgraph = "0.5"
process_execution = { path = "process_execution" }
rand = "0.8"
regex = "1"
Expand Down
46 changes: 45 additions & 1 deletion src/rust/engine/src/externs/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use hashing::Digest;
use log::{self, debug, error, warn, Log};
use logging::logger::PANTS_LOGGER;
use logging::{Logger, PythonLogLevel};
use petgraph::graph::{DiGraph, Graph};
use process_execution::RemoteCacheWarningsBehavior;
use regex::Regex;
use rule_graph::{self, RuleGraph};
Expand All @@ -77,7 +78,7 @@ use workunit_store::{

use crate::{
externs, nodes, Core, ExecutionRequest, ExecutionStrategyOptions, ExecutionTermination, Failure,
Function, Intrinsics, LocalStoreOptions, Params, RemotingOptions, Rule, Scheduler, Session,
Function, Intrinsics, Key, LocalStoreOptions, Params, RemotingOptions, Rule, Scheduler, Session,
Tasks, Types, Value,
};

Expand Down Expand Up @@ -396,6 +397,15 @@ py_module_initializer!(native_engine, |py, m| {
py_fn!(py, ensure_remote_has_recursive(a: PyScheduler, b: PyList)),
)?;

m.add(
py,
"strongly_connected_components",
py_fn!(
py,
strongly_connected_components(a: Vec<(PyObject, Vec<PyObject>)>)
),
)?;

m.add_class::<PyExecutionRequest>(py)?;
m.add_class::<PyExecutionStrategyOptions>(py)?;
m.add_class::<PyExecutor>(py)?;
Expand Down Expand Up @@ -830,6 +840,40 @@ fn nailgun_server_await_shutdown(py: Python, nailgun_server_ptr: PyNailgunServer
})
}

fn strongly_connected_components(
py: Python,
adjacency_lists: Vec<(PyObject, Vec<PyObject>)>,
) -> CPyResult<Vec<Vec<PyObject>>> {
Comment on lines +843 to +846
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be great to do this w/ PyO3, but I see it depends on key_for which hasn't been ported yet. I can port this in a followup - it's more important we land this to unblock Patrick.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea. You can see in an earlier commit where I was attempting to do that too: the issue was that without hash/eq, I'd need to use PyDict rather than the rust native hashmap (not much of a hardship...maybe even more performant, but awkward).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean without hash/eq?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PyObject itself does not have Eq/Hash implementations, so you need wrapper types around it to add those (we use Value and Key to do that).

let mut graph: DiGraph<Key, (), u32> = Graph::new();
let mut node_ids: HashMap<Key, _> = HashMap::new();

for (node, adjacency_list) in adjacency_lists {
let node_key = externs::key_for(node.clone_ref(py).into())?;
let node_id = *node_ids
.entry(node_key)
.or_insert_with(|| graph.add_node(node_key));
for dependency in adjacency_list {
let dependency_key = externs::key_for(dependency.clone_ref(py).into())?;
let dependency_id = node_ids
.entry(dependency_key)
.or_insert_with(|| graph.add_node(dependency_key));
graph.add_edge(node_id, *dependency_id, ());
}
}

Ok(
petgraph::algo::tarjan_scc(&graph)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I love great libraries like this :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.into_iter()
.map(|component| {
component
.into_iter()
.map(|node_id| externs::val_for(&graph[node_id]).consume_into_py_object(py))
.collect::<Vec<_>>()
})
.collect(),
)
}

///
/// Given a set of Tasks and type information, creates a Scheduler.
///
Expand Down