Skip to content

Commit

Permalink
Remove MultiPlatform Process abstractions
Browse files Browse the repository at this point in the history
We don't actually use these anywhere - instead, always operate on a
single platform-specific or platform-independent Process.
  • Loading branch information
illicitonion committed Sep 1, 2021
1 parent 3eb1712 commit acc90ac
Show file tree
Hide file tree
Showing 17 changed files with 173 additions and 380 deletions.
12 changes: 4 additions & 8 deletions src/python/pants/backend/python/util_rules/pex.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,7 @@
PathGlobs,
)
from pants.engine.platform import Platform
from pants.engine.process import (
BashBinary,
MultiPlatformProcess,
Process,
ProcessCacheScope,
ProcessResult,
)
from pants.engine.process import BashBinary, Process, ProcessCacheScope, ProcessResult
from pants.engine.rules import Get, MultiGet, collect_rules, rule
from pants.python.python_repos import PythonRepos
from pants.python.python_setup import InvalidLockfileBehavior, PythonSetup
Expand Down Expand Up @@ -559,10 +553,12 @@ async def build_pex_component(
),
)

process = dataclasses.replace(process, platform=platform)

# NB: Building a Pex is platform dependent, so in order to get a PEX that we can use locally
# without cross-building, we specify that our PEX command should be run on the current local
# platform.
result = await Get(ProcessResult, MultiPlatformProcess({platform: process}))
result = await Get(ProcessResult, Process, process)

if pex_runtime_env.verbosity > 0:
log_output = result.stderr.decode()
Expand Down
8 changes: 4 additions & 4 deletions src/python/pants/engine/internals/engine_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from pants.engine.internals.scheduler import ExecutionError, SchedulerSession
from pants.engine.internals.scheduler_test_base import SchedulerTestBase
from pants.engine.platform import rules as platform_rules
from pants.engine.process import MultiPlatformProcess, Process, ProcessCacheScope, ProcessResult
from pants.engine.process import Process, ProcessCacheScope, ProcessResult
from pants.engine.process import rules as process_rules
from pants.engine.rules import Get, MultiGet, rule
from pants.engine.streaming_workunit_handler import (
Expand Down Expand Up @@ -611,7 +611,7 @@ async def a_rule() -> TrueResult:
description="always true",
cache_scope=ProcessCacheScope.PER_SESSION,
)
_ = await Get(ProcessResult, MultiPlatformProcess({None: proc}))
_ = await Get(ProcessResult, Process, proc)
return TrueResult()

scheduler, tracker, handler = self._fixture_for_rules(
Expand Down Expand Up @@ -753,7 +753,7 @@ def test_process_digests_on_streaming_workunits(
assert tracker.finished
finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))

process_workunit = next(item for item in finished if item["name"] == "multi_platform_process")
process_workunit = next(item for item in finished if item["name"] == "process")
assert process_workunit is not None
stdout_digest = process_workunit["artifacts"]["stdout_digest"]
stderr_digest = process_workunit["artifacts"]["stderr_digest"]
Expand Down Expand Up @@ -781,7 +781,7 @@ def test_process_digests_on_streaming_workunits(

assert tracker.finished
finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
process_workunit = next(item for item in finished if item["name"] == "multi_platform_process")
process_workunit = next(item for item in finished if item["name"] == "process")

assert process_workunit is not None
stdout_digest = process_workunit["artifacts"]["stdout_digest"]
Expand Down
4 changes: 2 additions & 2 deletions src/python/pants/engine/internals/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
FallibleProcessResultWithPlatform,
InteractiveProcess,
InteractiveProcessResult,
MultiPlatformProcess,
Process,
)
from pants.engine.rules import Rule, RuleIndex, TaskRule
from pants.engine.unions import UnionMembership, is_union
Expand Down Expand Up @@ -157,7 +157,7 @@ def __init__(
digest_subset=DigestSubset,
download_file=DownloadFile,
platform=Platform,
multi_platform_process=MultiPlatformProcess,
process=Process,
process_result=FallibleProcessResultWithPlatform,
coroutine=CoroutineType,
session_values=SessionValues,
Expand Down
39 changes: 5 additions & 34 deletions src/python/pants/engine/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class Process:
is_nailgunnable: bool
execution_slot_variable: str | None
cache_scope: ProcessCacheScope
platform: str | None

def __init__(
self,
Expand All @@ -87,6 +88,7 @@ def __init__(
is_nailgunnable: bool = False,
execution_slot_variable: str | None = None,
cache_scope: ProcessCacheScope = ProcessCacheScope.SUCCESSFUL,
platform: Platform | None = None,
) -> None:
"""Request to run a subprocess, similar to subprocess.Popen.
Expand Down Expand Up @@ -131,32 +133,7 @@ def __init__(
self.is_nailgunnable = is_nailgunnable
self.execution_slot_variable = execution_slot_variable
self.cache_scope = cache_scope


@frozen_after_init
@dataclass(unsafe_hash=True)
class MultiPlatformProcess:
platform_constraints: tuple[str | None, ...]
processes: Tuple[Process, ...]

def __init__(self, request_dict: dict[Platform | None, Process]) -> None:
if len(request_dict) == 0:
raise ValueError("At least one platform-constrained Process must be passed.")
serialized_constraints = tuple(
constraint.value if constraint else None for constraint in request_dict
)
if len([req.description for req in request_dict.values()]) != 1:
raise ValueError(
f"The `description` of all processes in a {MultiPlatformProcess.__name__} must "
f"be identical, but got: {list(request_dict.values())}."
)

self.platform_constraints = serialized_constraints
self.processes = tuple(request_dict.values())

@property
def product_description(self) -> ProductDescription:
return ProductDescription(self.processes[0].description)
self.platform = platform.value if platform is not None else None


@dataclass(frozen=True)
Expand Down Expand Up @@ -231,14 +208,8 @@ def __init__(


@rule
def get_multi_platform_request_description(req: MultiPlatformProcess) -> ProductDescription:
return req.product_description


@rule
def upcast_process(req: Process) -> MultiPlatformProcess:
"""This rule allows an Process to be run as a platform compatible MultiPlatformProcess."""
return MultiPlatformProcess({None: req})
def get_multi_platform_request_description(req: Process) -> ProductDescription:
return ProductDescription(req.description)


@rule
Expand Down
19 changes: 6 additions & 13 deletions src/rust/engine/process_execution/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use workunit_store::{
};

use crate::{
Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, Platform, Process,
ProcessCacheScope, ProcessMetadata, ProcessResultSource,
Context, FallibleProcessResultWithPlatform, Platform, Process, ProcessCacheScope,
ProcessMetadata, ProcessResultSource,
};

#[allow(dead_code)]
Expand Down Expand Up @@ -53,22 +53,15 @@ impl CommandRunner {

#[async_trait]
impl crate::CommandRunner for CommandRunner {
fn extract_compatible_request(&self, req: &MultiPlatformProcess) -> Option<Process> {
self.underlying.extract_compatible_request(req)
}

async fn run(
&self,
context: Context,
workunit: &mut RunningWorkunit,
req: MultiPlatformProcess,
req: Process,
) -> Result<FallibleProcessResultWithPlatform, String> {
let cache_lookup_start = Instant::now();
let write_failures_to_cache = req
.0
.values()
.any(|process| process.cache_scope == ProcessCacheScope::Always);
let digest = crate::digest(req.clone(), &self.metadata);
let write_failures_to_cache = req.cache_scope == ProcessCacheScope::Always;
let digest = crate::digest(&req, &self.metadata);
let key = digest.hash;

let context2 = context.clone();
Expand All @@ -77,7 +70,7 @@ impl crate::CommandRunner for CommandRunner {
"local_cache_read".to_owned(),
WorkunitMetadata {
level: Level::Trace,
desc: Some(format!("Local cache lookup: {}", req.user_facing_name())),
desc: Some(format!("Local cache lookup: {}", req.description)),
..WorkunitMetadata::default()
},
|workunit| async move {
Expand Down
106 changes: 15 additions & 91 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
extern crate derivative;

use std::collections::{BTreeMap, BTreeSet};
use std::convert::TryFrom;
use std::convert::{TryFrom, TryInto};
use std::path::PathBuf;
use std::sync::Arc;

Expand All @@ -40,7 +40,7 @@ pub use log::Level;
use async_semaphore::AsyncSemaphore;
use async_trait::async_trait;
use bazel_protos::gen::build::bazel::remote::execution::v2 as remexec;
use hashing::{Digest, EMPTY_FINGERPRINT};
use hashing::Digest;
use remexec::ExecutedActionMetadata;
use serde::{Deserialize, Serialize};
use workunit_store::{RunningWorkunit, WorkunitStore};
Expand Down Expand Up @@ -323,51 +323,6 @@ impl Process {
}
}

impl TryFrom<MultiPlatformProcess> for Process {
type Error = String;

fn try_from(req: MultiPlatformProcess) -> Result<Self, Self::Error> {
match req.0.get(&None) {
Some(crossplatform_req) => Ok(crossplatform_req.clone()),
None => Err(String::from(
"Cannot coerce to a simple Process, no cross platform request exists.",
)),
}
}
}

///
/// A container of platform constrained processes.
///
#[derive(Derivative, Clone, Debug, Eq, PartialEq, Hash)]
pub struct MultiPlatformProcess(pub BTreeMap<Option<Platform>, Process>);

impl MultiPlatformProcess {
pub fn user_facing_name(&self) -> String {
self
.0
.iter()
.next()
.map(|(_platforms, process)| process.description.clone())
.unwrap_or_else(|| "<Unnamed process>".to_string())
}

pub fn workunit_level(&self) -> log::Level {
self
.0
.iter()
.next()
.map(|(_platforms, process)| process.level)
.unwrap_or(Level::Info)
}
}

impl From<Process> for MultiPlatformProcess {
fn from(proc: Process) -> Self {
MultiPlatformProcess(vec![(None, proc)].into_iter().collect())
}
}

///
/// Metadata surrounding an Process which factors into its cache key when cached
/// externally from the engine graph (e.g. when using remote execution or an external process
Expand Down Expand Up @@ -526,40 +481,15 @@ pub trait CommandRunner: Send + Sync {
&self,
context: Context,
workunit: &mut RunningWorkunit,
req: MultiPlatformProcess,
req: Process,
) -> Result<FallibleProcessResultWithPlatform, String>;

///
/// Given a multi platform request which may have some platform
/// constraints determine if any of the requests contained within are compatible
/// with the current command runners platform configuration. If so return the
/// first candidate that will be run if the multi platform request is submitted to
/// `fn run(..)`
fn extract_compatible_request(&self, req: &MultiPlatformProcess) -> Option<Process>;
}

// TODO(#8513) possibly move to the MEPR struct, or to the hashing crate?
pub fn digest(req: MultiPlatformProcess, metadata: &ProcessMetadata) -> Digest {
let mut hashes: Vec<String> = req
.0
.values()
.map(|process| crate::remote::make_execute_request(process, metadata.clone()).unwrap())
.map(|(_a, _b, er)| {
er.action_digest
.map(|d| d.hash)
.unwrap_or_else(|| EMPTY_FINGERPRINT.to_hex())
})
.collect();
hashes.sort();
Digest::of_bytes(
hashes
.iter()
.fold(String::new(), |mut acc, hash| {
acc.push_str(hash);
acc
})
.as_bytes(),
)
pub fn digest(process: &Process, metadata: &ProcessMetadata) -> Digest {
let (_, _, execute_request) =
crate::remote::make_execute_request(process, metadata.clone()).unwrap();
execute_request.action_digest.unwrap().try_into().unwrap()
}

///
Expand All @@ -584,7 +514,7 @@ impl CommandRunner for BoundedCommandRunner {
&self,
context: Context,
workunit: &mut RunningWorkunit,
mut req: MultiPlatformProcess,
mut process: Process,
) -> Result<FallibleProcessResultWithPlatform, String> {
let semaphore = self.inner.1.clone();
let inner = self.inner.clone();
Expand All @@ -593,28 +523,22 @@ impl CommandRunner for BoundedCommandRunner {
.with_acquired(|concurrency_id| {
log::debug!(
"Running {} under semaphore with concurrency id: {}",
req.user_facing_name(),
process.description,
concurrency_id
);
std::mem::drop(blocking_token);

for (_, process) in req.0.iter_mut() {
if let Some(ref execution_slot_env_var) = process.execution_slot_variable {
let execution_slot = format!("{}", concurrency_id);
process
.env
.insert(execution_slot_env_var.clone(), execution_slot);
}
if let Some(ref execution_slot_env_var) = process.execution_slot_variable {
let execution_slot = format!("{}", concurrency_id);
process
.env
.insert(execution_slot_env_var.clone(), execution_slot);
}

inner.0.run(context, workunit, req)
inner.0.run(context, workunit, process)
})
.await
}

fn extract_compatible_request(&self, req: &MultiPlatformProcess) -> Option<Process> {
self.inner.0.extract_compatible_request(req)
}
}

impl From<Box<BoundedCommandRunner>> for Arc<dyn CommandRunner> {
Expand Down
14 changes: 2 additions & 12 deletions src/rust/engine/process_execution/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use tryfuture::try_future;
use workunit_store::{in_workunit, Level, Metric, RunningWorkunit, WorkunitMetadata};

use crate::{
Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, NamedCaches, Platform, Process,
Context, FallibleProcessResultWithPlatform, NamedCaches, Platform, Process,
ProcessResultMetadata, ProcessResultSource,
};

Expand Down Expand Up @@ -233,25 +233,15 @@ impl ChildResults {

#[async_trait]
impl super::CommandRunner for CommandRunner {
fn extract_compatible_request(&self, req: &MultiPlatformProcess) -> Option<Process> {
for compatible_constraint in vec![None, self.platform.into()].iter() {
if let Some(compatible_req) = req.0.get(compatible_constraint) {
return Some(compatible_req.clone());
}
}
None
}

///
/// Runs a command on this machine in the passed working directory.
///
async fn run(
&self,
context: Context,
_workunit: &mut RunningWorkunit,
req: MultiPlatformProcess,
req: Process,
) -> Result<FallibleProcessResultWithPlatform, String> {
let req = self.extract_compatible_request(&req).unwrap();
let req_debug_repr = format!("{:#?}", req);
in_workunit!(
context.workunit_store.clone(),
Expand Down
Loading

0 comments on commit acc90ac

Please sign in to comment.