Skip to content

Commit

Permalink
wip on reviving pantsbuild#10954, superceding pantsbuild#17745.
Browse files Browse the repository at this point in the history
  • Loading branch information
kaos committed Jan 2, 2023
1 parent 59d7805 commit 8306f4c
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 22 deletions.
34 changes: 34 additions & 0 deletions src/python/pants/engine/internals/engine_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Licensed under the Apache License, Version 2.0 (see LICENSE).

import itertools
import re
import time
from dataclasses import dataclass, field
from pathlib import Path
Expand All @@ -21,6 +22,7 @@
Digest,
DigestContents,
FileContent,
MergeDigests,
Snapshot,
)
from pants.engine.internals.engine_testutil import (
Expand Down Expand Up @@ -1006,3 +1008,35 @@ async def for_member() -> str:
)

assert "yep" == rule_runner.request(str, [])


@dataclass(frozen=True)
class FileInput:
filename: str


@dataclass(frozen=True)
class MergedOutput:
digest: Digest


@rule
async def catch_merge_digests_error(file_input: FileInput) -> MergedOutput:
# Create two separate digests writing different contents to the same file path.
input_1 = CreateDigest((FileContent(path=file_input.filename, content=b"yes"),))
input_2 = CreateDigest((FileContent(path=file_input.filename, content=b"no"),))
digests = await MultiGet(Get(Digest, CreateDigest, input_1), Get(Digest, CreateDigest, input_2))
try:
merged = await Get(Digest, MergeDigests(digests))
except Exception as e:
raise Exception(f"error merging digests for input {file_input}: {e}")
return MergedOutput(merged)


def test_catch_intrinsic_error() -> None:
rule_runner = RuleRunner(
rules=[catch_merge_digests_error, QueryRule(MergedOutput, (FileInput,))]
)
msg = re.escape("error merging digests for input FileInput(filename='some-file.txt')")
with pytest.raises(ExecutionError, match=msg):
rule_runner.request(MergedOutput, (FileInput("some-file.txt"),))
3 changes: 3 additions & 0 deletions src/python/pants/engine/internals/native_engine.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,9 @@ class PyGeneratorResponseGet(Generic[_Output]):
class PyGeneratorResponseGetMulti:
def __init__(self, gets: tuple[PyGeneratorResponseGet, ...]) -> None: ...

class PyGeneratorResponseThrow:
def __init__(self, err: Exception) -> None: ...

# ------------------------------------------------------------------------------
# (uncategorized)
# ------------------------------------------------------------------------------
Expand Down
40 changes: 33 additions & 7 deletions src/python/pants/engine/internals/selectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@

import ast
import itertools

# XXX from traceback import TracebackException
from dataclasses import dataclass
from typing import (
TYPE_CHECKING,
Any,
Coroutine,
Generator,
Generic,
Iterable,
Sequence,
Tuple,
TypeVar,
Union,
cast,
overload,
)
Expand All @@ -23,6 +27,7 @@
PyGeneratorResponseBreak,
PyGeneratorResponseGet,
PyGeneratorResponseGetMulti,
PyGeneratorResponseThrow,
)
from pants.util.meta import frozen_after_init
from pants.util.strutil import softwrap
Expand Down Expand Up @@ -590,11 +595,26 @@ def __init__(self, *args: Any) -> None:
self.params = tuple(args)


# A specification for how the native engine interacts with @rule coroutines:
# - coroutines may await on any of `Get`, `MultiGet`, `Effect` or other coroutines decorated with `@rule_helper`.
# - we will send back a single `Any` or a tuple of `Any` to the coroutine, depending upon the variant of `Get`.
# - a coroutine will eventually return a single `Any`.
RuleSend = Union[Any, Tuple[Any, ...]]
RuleYield = Union[Get, Tuple[Get, ...]]
RuleCoroutine = Coroutine[RuleYield, RuleSend, Any]
NativeEngineGeneratorResponse = Union[
PyGeneratorResponseGet,
PyGeneratorResponseGetMulti,
PyGeneratorResponseBreak,
PyGeneratorResponseThrow,
]


def native_engine_generator_send(
func, arg
) -> PyGeneratorResponseGet | PyGeneratorResponseGetMulti | PyGeneratorResponseBreak:
func: RuleCoroutine, arg: RuleSend | None, err: Exception | None
) -> NativeEngineGeneratorResponse:
try:
res = func.send(arg)
res = func.throw(err) if err is not None else func.send(arg)
# It isn't necessary to differentiate between `Get` and `Effect` here, as the static
# analysis of `@rule`s has already validated usage.
if isinstance(res, (Get, Effect)):
Expand All @@ -604,8 +624,14 @@ def native_engine_generator_send(
else:
raise ValueError(f"internal engine error: unrecognized coroutine result {res}")
except StopIteration as e:
if not e.args:
raise
# This was a `return` from a coroutine, as opposed to a `StopIteration` raised
# by calling `next()` on an empty iterator.
return PyGeneratorResponseBreak(e.value)
except Exception as e:
# XXX
# Read the stack, and place it into an attribute on the exception object. This is
# consumed within the rust code to produce a combined stacktrace in cases where an
# exception was raised while trying to handle another exception.
# wrapped = TracebackException.from_exception(e)
# formatted_stack = list(wrapped.format())
# e._formatted_stack = formatted_stack # type: ignore[attr-defined]

return PyGeneratorResponseThrow(e)
3 changes: 2 additions & 1 deletion src/rust/engine/src/externs/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ fn native_engine(py: Python, m: &PyModule) -> PyO3Result<()> {
m.add_class::<externs::PyGeneratorResponseBreak>()?;
m.add_class::<externs::PyGeneratorResponseGet>()?;
m.add_class::<externs::PyGeneratorResponseGetMulti>()?;
m.add_class::<externs::PyGeneratorResponseThrow>()?;

m.add_function(wrap_pyfunction!(stdio_initialize, m)?)?;
m.add_function(wrap_pyfunction!(stdio_thread_console_set, m)?)?;
Expand Down Expand Up @@ -496,7 +497,7 @@ fn py_result_from_root(py: Python, result: Result<Value, Failure>) -> PyResult {
let msg = format!("{}", f);
let python_traceback = Failure::native_traceback(&msg);
(
externs::create_exception(py, msg),
externs::create_exception(py, msg), // XXX KAOS TODO :: PR -> `externs::create_value_error(py, msg)`
python_traceback,
Vec::new(),
)
Expand Down
62 changes: 60 additions & 2 deletions src/rust/engine/src/externs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,13 @@ pub fn create_exception(py: Python, msg: String) -> Value {
Value::new(PyException::new_err(msg).into_py(py))
}

// pub fn into_value_result(py_result: PyResult<&PyAny>) -> Result<Value, Failure> {
// match py_result {
// Ok(obj) => Ok(Value::from(obj)),
// Err(err) => Err(Failure::from_py_err(err)),
// }
// }

pub fn call_function<'py>(func: &'py PyAny, args: &[Value]) -> PyResult<&'py PyAny> {
let args: Vec<PyObject> = args.iter().map(|v| v.clone().into()).collect();
let args_tuple = PyTuple::new(func.py(), &args);
Expand All @@ -238,12 +245,21 @@ pub fn call_function<'py>(func: &'py PyAny, args: &[Value]) -> PyResult<&'py PyA
pub fn generator_send(
py: Python,
generator: &Value,
arg: &Value,
arg: Option<Value>,
err: Option<Value>,
) -> Result<GeneratorResponse, Failure> {
let selectors = py.import("pants.engine.internals.selectors").unwrap();
let native_engine_generator_send = selectors.getattr("native_engine_generator_send").unwrap();
let py_arg = match arg {
Some(arg) => arg.to_object(py),
None => py.None(),
};
let py_err = match err {
Some(err) => err.to_object(py),
None => py.None(),
};
let response = native_engine_generator_send
.call1((generator.to_object(py), arg.to_object(py)))
.call1((generator.to_object(py), py_arg, py_err))
.map_err(|py_err| Failure::from_py_err_with_gil(py, py_err))?;

if let Ok(b) = response.extract::<PyRef<PyGeneratorResponseBreak>>() {
Expand All @@ -267,6 +283,36 @@ pub fn generator_send(
})
.collect::<Result<Vec<_>, _>>()?;
Ok(GeneratorResponse::GetMulti(gets))
} else if let Ok(throw) = response.extract::<PyRef<PyGeneratorResponseThrow>>() {
Err(Failure::from_py_err_with_gil(
py,
PyErr::from_value(throw.0.as_ref(py)),
)) // XXX
// let new_err_val = Value::new(throw.0.clone_ref(py));
// match err {
// Some(err) => {
// // If this is the same error that we previously sent, then just return the previous error to
// // preserve the stacktraces.
// let err_is_same_as_last_time = err
// .as_py_err()
// .map(|previous_err_val| previous_err_val == &new_err_val)
// .unwrap_or(false);
// if err_is_same_as_last_time {
// // Ok(GeneratorResponse::Throw(err))
// Err(err)
// } else {
// // Otherwise, the error was handled, but another error was raised in that handling, so we
// // create a new Failure instance, and join the tracebacks.
// let new_failure = Failure::from_py_err_with_gil(py, throw.0.clone_ref(py));
// // let joined_failure = err.join_tracebacks(new_failure);
// // Ok(GeneratorResponse::Throw(joined_failure))

// Err(new_failure) // XXX
// }
// }
// // We didn't have an error before, but we do now, so just return a new Failure instance.
// None => Err(Failure::from_py_err_with_gil(py, throw.0.clone_ref(py))), // XXX
// }
} else {
panic!(
"native_engine_generator_send returned unrecognized type: {:?}",
Expand Down Expand Up @@ -484,6 +530,17 @@ impl PyGeneratorResponseGetMulti {
}
}

#[pyclass]
pub struct PyGeneratorResponseThrow(PyObject);

#[pymethods]
impl PyGeneratorResponseThrow {
#[new]
fn __new__(val: PyObject) -> Self {
Self(val)
}
}

#[derive(Debug)]
pub struct Get {
pub output: TypeId,
Expand Down Expand Up @@ -516,4 +573,5 @@ pub enum GeneratorResponse {
Break(Value, TypeId),
Get(Get),
GetMulti(Vec<Get>),
// Throw(Failure), XXX
}
2 changes: 1 addition & 1 deletion src/rust/engine/src/intrinsics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ fn digest_subset_to_digest(
}

fn session_values(context: Context, _args: Vec<Value>) -> BoxFuture<'static, NodeResult<Value>> {
async move { context.get(SessionValues).await? }.boxed()
async move { context.get(SessionValues).await }.boxed()
}

fn run_id(context: Context, _args: Vec<Value>) -> BoxFuture<'static, NodeResult<Value>> {
Expand Down
40 changes: 29 additions & 11 deletions src/rust/engine/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1127,7 +1127,7 @@ impl Task {

///
/// Given a python generator Value, loop to request the generator's dependencies until
/// it completes with a result Value.
/// it completes with a result Value or fails with an error.
///
async fn generate(
context: &Context,
Expand All @@ -1137,26 +1137,44 @@ impl Task {
generator: Value,
) -> NodeResult<(Value, TypeId)> {
let mut input: Option<Value> = None;
let mut err: Option<Value> = None;
loop {
let context = context.clone();
let params = params.clone();
let response = Python::with_gil(|py| {
let input = input.unwrap_or_else(|| Value::from(py.None()));
externs::generator_send(py, &generator, &input)
})?;
let response = Python::with_gil(|py| externs::generator_send(py, &generator, input, err))?;
match response {
externs::GeneratorResponse::Get(get) => {
let values = Self::gen_get(&context, workunit, &params, entry, vec![get]).await?;
input = Some(values.into_iter().next().unwrap());
let result = Self::gen_get(&context, workunit, &params, entry, vec![get]).await;
match result {
Ok(values) => {
input = Some(values.into_iter().next().unwrap());
err = None;
}
Err(Failure::Throw { val, .. }) => {
input = None;
err = Some(val); // XXX TODO preserve stack traces.. etc
}
Err(Failure::Invalidated) | Err(Failure::MissingDigest(_, _)) => todo!(),
}
}
externs::GeneratorResponse::GetMulti(gets) => {
let values = Self::gen_get(&context, workunit, &params, entry, gets).await?;
let gil = Python::acquire_gil();
input = Some(externs::store_tuple(gil.python(), values));
let result = Self::gen_get(&context, workunit, &params, entry, gets).await;
match result {
Ok(values) => {
let gil = Python::acquire_gil();
input = Some(externs::store_tuple(gil.python(), values));
err = None;
}
Err(Failure::Throw { val, .. }) => {
input = None;
err = Some(val); // XXX TODO preserve stack traces.. etc
}
Err(Failure::Invalidated) | Err(Failure::MissingDigest(_, _)) => todo!(),
}
}
externs::GeneratorResponse::Break(val, type_id) => {
break Ok((val, type_id));
}
} // externs::GeneratorResponse::Throw(err) => break Err(err), // XXX
}
}
}
Expand Down

0 comments on commit 8306f4c

Please sign in to comment.