Allow intrinsics to have multiple input arguments #9527

Merged
2 changes: 2 additions & 0 deletions src/python/pants/engine/native.py
@@ -579,6 +579,7 @@ class EngineTypes(NamedTuple):
dir: TypeId
file: TypeId
link: TypeId
platform: TypeId
multi_platform_process_request: TypeId
process_result: TypeId
coroutine: TypeId
@@ -962,6 +963,7 @@ def ti(type_obj):
dir=ti(Dir),
file=ti(File),
link=ti(Link),
platform=ti(Platform),
multi_platform_process_request=ti(MultiPlatformExecuteProcessRequest),
process_result=ti(FallibleExecuteProcessResultWithPlatform),
coroutine=ti(CoroutineType),
8 changes: 5 additions & 3 deletions src/rust/engine/engine_cffi/src/lib.rs
@@ -40,8 +40,8 @@ mod cffi_externs;

use engine::externs::*;
use engine::{
externs, nodes, Core, ExecutionRequest, ExecutionTermination, Function, Handle, Key, Params,
RootResult, Rule, Scheduler, Session, Tasks, TypeId, Types, Value,
externs, nodes, Core, ExecutionRequest, ExecutionTermination, Function, Handle, Intrinsics, Key,
Params, RootResult, Rule, Scheduler, Session, Tasks, TypeId, Types, Value,
};
use futures::future::{self as future03, TryFutureExt};
use futures01::{future, Future};
@@ -289,9 +289,10 @@ fn make_core(
let ignore_patterns = ignore_patterns_buf
.to_strings()
.map_err(|err| format!("Failed to decode ignore patterns as UTF8: {:?}", err))?;
let intrinsics = Intrinsics::new(&types);
#[allow(clippy::redundant_closure)] // I couldn't find an easy way to remove this closure.
let mut tasks = with_tasks(tasks_ptr, |tasks| tasks.clone());
tasks.intrinsics_set(&types);
tasks.intrinsics_set(&intrinsics);
// Allocate on the heap via `Box` and return a raw pointer to the boxed value.
let remote_store_servers_vec = remote_store_servers_buf
.to_strings()
@@ -354,6 +355,7 @@ fn make_core(
root_type_ids,
tasks,
types,
intrinsics,
PathBuf::from(build_root_buf.to_os_string()),
&ignore_patterns,
use_gitignore,
6 changes: 5 additions & 1 deletion src/rust/engine/src/context.rs
@@ -13,6 +13,7 @@ use futures01::Future;

use crate::core::{Failure, TypeId};
use crate::handles::maybe_drop_handles;
use crate::intrinsics::Intrinsics;
use crate::nodes::{NodeKey, WrappedNode};
use crate::scheduler::Session;
use crate::tasks::{Rule, Tasks};
@@ -48,6 +49,7 @@ pub struct Core {
pub tasks: Tasks,
pub rule_graph: RuleGraph<Rule>,
pub types: Types,
pub intrinsics: Intrinsics,
pub runtime: Runtime,
pub executor: task_executor::Executor,
store: Store,
@@ -63,6 +65,7 @@ impl Core {
root_subject_types: Vec<TypeId>,
tasks: Tasks,
types: Types,
intrinsics: Intrinsics,
build_root: PathBuf,
ignore_patterns: &[String],
use_gitignore: bool,
@@ -272,7 +275,8 @@ impl Core {
graph,
tasks,
rule_graph,
types: types,
types,
intrinsics,
runtime,
executor: executor.clone(),
store,
198 changes: 137 additions & 61 deletions src/rust/engine/src/intrinsics.rs
@@ -1,58 +1,131 @@
use crate::context::Context;
use crate::core::{throw, TypeId, Value};
use crate::core::{throw, Value};
use crate::externs;
use crate::nodes::MultiPlatformExecuteProcess;
use crate::nodes::{lift_digest, DownloadedFile, NodeFuture, Snapshot};
use crate::tasks::Intrinsic;
use crate::types::Types;

use boxfuture::Boxable;
use bytes;
use futures::future::{self as future03, TryFutureExt};
use futures01::{future, Future};
use hashing;
use indexmap::IndexMap;

use std::path::PathBuf;

pub fn run_intrinsic(
input: TypeId,
product: TypeId,
context: Context,
value: Value,
) -> NodeFuture<Value> {
let types = &context.core.types;
if product == types.process_result && input == types.multi_platform_process_request {
multi_platform_process_request_to_process_result(context, value)
} else if product == types.files_content && input == types.directory_digest {
directory_digest_to_files_content(context, value)
} else if product == types.directory_digest && input == types.directory_with_prefix_to_strip {
directory_with_prefix_to_strip_to_digest(context, value)
} else if product == types.directory_digest && input == types.directory_with_prefix_to_add {
directory_with_prefix_to_add_to_digest(context, value)
} else if product == types.snapshot && input == types.directory_digest {
digest_to_snapshot(context, value)
} else if product == types.directory_digest && input == types.directories_to_merge {
directories_to_merge_to_digest(context, value)
} else if product == types.snapshot && input == types.url_to_fetch {
url_to_fetch_to_snapshot(context, value)
} else if product == types.snapshot && input == types.path_globs {
path_globs_to_snapshot(context, value)
} else if product == types.directory_digest && input == types.input_files_content {
input_files_content_to_digest(context, value)
} else if product == types.snapshot && input == types.snapshot_subset {
snapshot_subset_to_snapshot(context, value)
} else {
panic!("Unrecognized intrinsic: {:?} -> {:?}", input, product)
type IntrinsicFn = Box<dyn Fn(Context, Vec<Value>) -> NodeFuture<Value> + Send + Sync + 'static>;

pub struct Intrinsics {
intrinsics: IndexMap<Intrinsic, IntrinsicFn>,
}

impl Intrinsics {
pub fn new(types: &Types) -> Intrinsics {
let mut intrinsics: IndexMap<Intrinsic, IntrinsicFn> = IndexMap::new();
intrinsics.insert(
Intrinsic {
product: types.directory_digest,
inputs: vec![types.input_files_content],
Contributor commented:

It would be nice to be able to tie the inputs length to the length of the values vec expected by the intrinsic function. The &args[0], &args[1] look scary in isolation (what if there are 3 input types and the function evolves to only using 2, or needing 4?). There is and will be a small enough set of intrinsics that just making sure these align by hand is probably good enough.

@stuhood (Member, Author) replied on Apr 13, 2020:

> There is and will be a small enough set of intrinsics that just making sure these align by hand is probably good enough.

Yea. For better or worse, these have evolved away from a large switch statement and toward dynamic dispatch like this, because it makes the callsite clearer. It's possible that moving back toward the switch statement is feasible, but for now I think that the fact these are now isolated to one file makes this lack of type safety manageable.
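
A minimal sketch of the arity check being discussed (hypothetical, not part of this change): since each `Intrinsic` already declares its `inputs`, the `Intrinsics::run` method shown later in this diff could assert that the supplied `args` match that declared length before dispatching, so a mismatch fails with a clear message instead of an out-of-bounds index inside an intrinsic function.

```rust
// Hypothetical variant of Intrinsics::run; it adds a defensive check that
// args.len() matches the number of inputs the Intrinsic declares.
pub fn run(&self, intrinsic: Intrinsic, context: Context, args: Vec<Value>) -> NodeFuture<Value> {
  assert_eq!(
    intrinsic.inputs.len(),
    args.len(),
    "Intrinsic {:?} declares {} input(s) but was called with {} argument(s)",
    intrinsic,
    intrinsic.inputs.len(),
    args.len(),
  );
  let function = self
    .intrinsics
    .get(&intrinsic)
    .unwrap_or_else(|| panic!("Unrecognized intrinsic: {:?}", intrinsic));
  function(context, args)
}
```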

},
Box::new(input_files_content_to_digest),
);
intrinsics.insert(
Intrinsic {
product: types.snapshot,
inputs: vec![types.path_globs],
},
Box::new(path_globs_to_snapshot),
);
intrinsics.insert(
Intrinsic {
product: types.snapshot,
inputs: vec![types.url_to_fetch],
},
Box::new(url_to_fetch_to_snapshot),
);
intrinsics.insert(
Intrinsic {
product: types.snapshot,
inputs: vec![types.directory_digest],
},
Box::new(digest_to_snapshot),
);
intrinsics.insert(
Intrinsic {
product: types.files_content,
inputs: vec![types.directory_digest],
},
Box::new(directory_digest_to_files_content),
);
intrinsics.insert(
Intrinsic {
product: types.directory_digest,
inputs: vec![types.directories_to_merge],
},
Box::new(directories_to_merge_to_digest),
);
intrinsics.insert(
Intrinsic {
product: types.directory_digest,
inputs: vec![types.directory_with_prefix_to_strip],
},
Box::new(directory_with_prefix_to_strip_to_digest),
);
intrinsics.insert(
Intrinsic {
product: types.directory_digest,
inputs: vec![types.directory_with_prefix_to_add],
},
Box::new(directory_with_prefix_to_add_to_digest),
);
intrinsics.insert(
Intrinsic {
product: types.process_result,
inputs: vec![types.multi_platform_process_request, types.platform],
},
Box::new(multi_platform_process_request_to_process_result),
);
intrinsics.insert(
Intrinsic {
product: types.snapshot,
inputs: vec![types.snapshot_subset],
},
Box::new(snapshot_subset_to_snapshot),
);
Intrinsics { intrinsics }
}

pub fn keys(&self) -> impl Iterator<Item = &Intrinsic> {
self.intrinsics.keys()
}

pub fn run(&self, intrinsic: Intrinsic, context: Context, args: Vec<Value>) -> NodeFuture<Value> {
let function = self
.intrinsics
.get(&intrinsic)
.unwrap_or_else(|| panic!("Unrecognized intrinsic: {:?}", intrinsic));
function(context, args)
}
}

fn multi_platform_process_request_to_process_result(
context: Context,
value: Value,
args: Vec<Value>,
) -> NodeFuture<Value> {
let process_val = &args[0];
// TODO: The platform will be used in a followup.
let _platform_val = &args[1];
let core = context.core.clone();
future::result(MultiPlatformExecuteProcess::lift(&value).map_err(|str| {
throw(&format!(
"Error lifting MultiPlatformExecuteProcess: {}",
str
))
}))
future::result(
MultiPlatformExecuteProcess::lift(process_val).map_err(|str| {
throw(&format!(
"Error lifting MultiPlatformExecuteProcess: {}",
str
))
}),
)
.and_then(move |process_request| context.get(process_request))
.map(move |result| {
let platform_name: String = result.0.platform.into();
@@ -73,11 +146,8 @@ fn multi_platform_process_request_to_process_result(
.to_boxed()
}

fn directory_digest_to_files_content(
context: Context,
directory_digest_val: Value,
) -> NodeFuture<Value> {
future::result(lift_digest(&directory_digest_val).map_err(|str| throw(&str)))
fn directory_digest_to_files_content(context: Context, args: Vec<Value>) -> NodeFuture<Value> {
future::result(lift_digest(&args[0]).map_err(|str| throw(&str)))
.and_then(move |digest| {
context
.core
@@ -89,15 +159,18 @@ fn directory_digest_to_files_content(
.to_boxed()
}

fn directory_with_prefix_to_strip_to_digest(context: Context, request: Value) -> NodeFuture<Value> {
fn directory_with_prefix_to_strip_to_digest(
context: Context,
args: Vec<Value>,
) -> NodeFuture<Value> {
let core = context.core;

Box::pin(async move {
let input_digest = lift_digest(&externs::project_ignoring_type(
&request,
&args[0],
"directory_digest",
))?;
let prefix = externs::project_str(&request, "prefix");
let prefix = externs::project_str(&args[0], "prefix");
let digest =
store::Snapshot::strip_prefix(core.store(), input_digest, PathBuf::from(prefix)).await?;
let res: Result<_, String> = Ok(Snapshot::store_directory(&core, &digest));
@@ -108,15 +181,15 @@ fn directory_with_prefix_to_strip_to_digest(context: Context, request: Value) ->
.to_boxed()
}

fn directory_with_prefix_to_add_to_digest(context: Context, request: Value) -> NodeFuture<Value> {
fn directory_with_prefix_to_add_to_digest(context: Context, args: Vec<Value>) -> NodeFuture<Value> {
let core = context.core;
Box::pin(async move {
let input_digest = lift_digest(&externs::project_ignoring_type(
&request,
&args[0],
"directory_digest",
))?;

let prefix = externs::project_str(&request, "prefix");
let prefix = externs::project_str(&args[0], "prefix");
let digest =
store::Snapshot::add_prefix(core.store(), input_digest, PathBuf::from(prefix)).await?;
let res: Result<_, String> = Ok(Snapshot::store_directory(&core, &digest));
@@ -127,11 +200,11 @@ fn directory_with_prefix_to_add_to_digest(context: Context, request: Value) -> N
.to_boxed()
}

fn digest_to_snapshot(context: Context, directory_digest_val: Value) -> NodeFuture<Value> {
fn digest_to_snapshot(context: Context, args: Vec<Value>) -> NodeFuture<Value> {
let core = context.core.clone();
let store = context.core.store();
Box::pin(async move {
let digest = lift_digest(&directory_digest_val)?;
let digest = lift_digest(&args[0])?;
let snapshot = store::Snapshot::from_digest(store, digest).await?;
let res: Result<_, String> = Ok(Snapshot::store_snapshot(&core, &snapshot));
res
@@ -141,10 +214,10 @@ fn digest_to_snapshot(context: Context, directory_digest_val: Value) -> NodeFutu
.to_boxed()
}

fn directories_to_merge_to_digest(context: Context, request: Value) -> NodeFuture<Value> {
fn directories_to_merge_to_digest(context: Context, args: Vec<Value>) -> NodeFuture<Value> {
let core = context.core;
let digests: Result<Vec<hashing::Digest>, String> =
externs::project_multi(&request, "directories")
externs::project_multi(&args[0], "directories")
.into_iter()
.map(|val| lift_digest(&val))
.collect();
@@ -158,24 +231,24 @@ fn directories_to_merge_to_digest(context: Context, request: Value) -> NodeFutur
.to_boxed()
}

fn url_to_fetch_to_snapshot(context: Context, val: Value) -> NodeFuture<Value> {
fn url_to_fetch_to_snapshot(context: Context, mut args: Vec<Value>) -> NodeFuture<Value> {
let core = context.core.clone();
context
.get(DownloadedFile(externs::key_for(val)))
.get(DownloadedFile(externs::key_for(args.pop().unwrap())))
.map(move |snapshot| Snapshot::store_snapshot(&core, &snapshot))
.to_boxed()
}

fn path_globs_to_snapshot(context: Context, val: Value) -> NodeFuture<Value> {
fn path_globs_to_snapshot(context: Context, mut args: Vec<Value>) -> NodeFuture<Value> {
let core = context.core.clone();
context
.get(Snapshot(externs::key_for(val)))
.get(Snapshot(externs::key_for(args.pop().unwrap())))
.map(move |snapshot| Snapshot::store_snapshot(&core, &snapshot))
.to_boxed()
}

fn input_files_content_to_digest(context: Context, files_content: Value) -> NodeFuture<Value> {
let file_values = externs::project_multi(&files_content, "dependencies");
fn input_files_content_to_digest(context: Context, args: Vec<Value>) -> NodeFuture<Value> {
let file_values = externs::project_multi(&args[0], "dependencies");
let digests: Vec<_> = file_values
.iter()
.map(|file| {
@@ -207,13 +280,16 @@ fn input_files_content_to_digest(context: Context, files_content: Value) -> Node
.to_boxed()
}

fn snapshot_subset_to_snapshot(context: Context, value: Value) -> NodeFuture<Value> {
let globs = externs::project_ignoring_type(&value, "globs");
fn snapshot_subset_to_snapshot(context: Context, args: Vec<Value>) -> NodeFuture<Value> {
let globs = externs::project_ignoring_type(&args[0], "globs");
let store = context.core.store();

Box::pin(async move {
let path_globs = Snapshot::lift_path_globs(&globs)?;
let original_digest = lift_digest(&externs::project_ignoring_type(&value, "directory_digest"))?;
let original_digest = lift_digest(&externs::project_ignoring_type(
&args[0],
"directory_digest",
))?;

let snapshot = store::Snapshot::get_snapshot_subset(store, original_digest, path_globs).await?;

1 change: 1 addition & 0 deletions src/rust/engine/src/lib.rs
@@ -45,6 +45,7 @@ mod watch;
pub use crate::context::Core;
pub use crate::core::{Function, Key, Params, TypeId, Value};
pub use crate::handles::Handle;
pub use crate::intrinsics::Intrinsics;
pub use crate::scheduler::{
ExecutionRequest, ExecutionTermination, RootResult, Scheduler, Session,
};