Skip to content

Commit

Permalink
Platform for remote execution and caching comes from Python (#16898)
Browse files Browse the repository at this point in the history
It's possible to have n remote execution environments, and those may each have their own platform. So it's not safe for this value to be hardcoded in Rust.

I believe this fixes an actual bug for remote caching. We were setting the platform as the localhost, but if remote caching is used in conjunction with remote execution, that is often wrong. Specifically, using RE from macOS would store in the remote cache a macOS platform, when it should have been storing Linux.
  • Loading branch information
Eric-Arellano authored Sep 16, 2022
1 parent 468a4e5 commit 8923ea2
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 43 deletions.
15 changes: 11 additions & 4 deletions src/python/pants/engine/internals/platform_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,22 @@
from pants.engine.platform import Platform
from pants.engine.process import Process, ProcessResult
from pants.engine.rules import Get, collect_rules, rule
from pants.option.global_options import GlobalOptions
from pants.util.logging import LogLevel
from pants.util.strutil import softwrap


@rule
def current_platform(env_tgt: EnvironmentTarget) -> Platform:
if env_tgt.val is None or not env_tgt.val.has_field(DockerPlatformField):
return Platform.create_for_localhost()
return Platform(env_tgt.val[DockerPlatformField].normalized_value)
def current_platform(env_tgt: EnvironmentTarget, global_options: GlobalOptions) -> Platform:
if env_tgt.val and env_tgt.val.has_field(DockerPlatformField):
return Platform(env_tgt.val[DockerPlatformField].normalized_value)
return (
# For now, we assume remote execution always uses x86_64. Instead, this should be
# configurable via remote execution environment targets.
Platform.linux_x86_64
if global_options.remote_execution
else Platform.create_for_localhost()
)


@rule
Expand Down
14 changes: 4 additions & 10 deletions src/rust/engine/process_execution/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ pub enum ExecutionError {
pub struct CommandRunner {
instance_name: Option<String>,
process_cache_namespace: Option<String>,
platform: Platform,
store: Store,
execution_client: Arc<ExecutionClient<LayeredService>>,
overall_deadline: Duration,
Expand All @@ -122,7 +121,6 @@ impl CommandRunner {
root_ca_certs: Option<Vec<u8>>,
headers: BTreeMap<String, String>,
store: Store,
platform: Platform,
overall_deadline: Duration,
retry_interval_duration: Duration,
execution_concurrency_limit: usize,
Expand Down Expand Up @@ -157,7 +155,6 @@ impl CommandRunner {
process_cache_namespace,
execution_client,
store,
platform,
overall_deadline,
retry_interval_duration,
capabilities_cell: capabilities_cell_opt.unwrap_or_else(|| Arc::new(OnceCell::new())),
Expand All @@ -167,10 +164,6 @@ impl CommandRunner {
Ok(command_runner)
}

pub fn platform(&self) -> Platform {
self.platform
}

async fn get_capabilities(&self) -> Result<&remexec::ServerCapabilities, String> {
let capabilities_fut = async {
let mut request = remexec::GetCapabilitiesRequest::default();
Expand Down Expand Up @@ -425,6 +418,7 @@ impl CommandRunner {
pub(crate) async fn extract_execute_response(
&self,
run_id: RunId,
platform: Platform,
operation_or_status: OperationOrStatus,
) -> Result<FallibleProcessResultWithPlatform, ExecutionError> {
trace!("Got operation response: {:?}", operation_or_status);
Expand Down Expand Up @@ -480,7 +474,7 @@ impl CommandRunner {
self.store.clone(),
run_id,
action_result,
self.platform,
platform,
false,
if execute_response.cached_result {
ProcessResultSource::HitRemotely
Expand Down Expand Up @@ -671,7 +665,7 @@ impl CommandRunner {
};

match self
.extract_execute_response(context.run_id, actionable_result)
.extract_execute_response(context.run_id, process.platform, actionable_result)
.await
{
Ok(result) => return Ok(result),
Expand Down Expand Up @@ -714,7 +708,7 @@ impl CommandRunner {
&process.description,
process.timeout,
start_time.elapsed(),
self.platform,
process.platform,
)
.await?;
return Ok(result);
Expand Down
5 changes: 1 addition & 4 deletions src/rust/engine/process_execution/src/remote_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ pub struct CommandRunner {
executor: task_executor::Executor,
store: Store,
action_cache_client: Arc<ActionCacheClient<LayeredService>>,
platform: Platform,
cache_read: bool,
cache_write: bool,
cache_content_behavior: CacheContentBehavior,
Expand All @@ -72,7 +71,6 @@ impl CommandRunner {
action_cache_address: &str,
root_ca_certs: Option<Vec<u8>>,
mut headers: BTreeMap<String, String>,
platform: Platform,
cache_read: bool,
cache_write: bool,
warnings_behavior: RemoteCacheWarningsBehavior,
Expand Down Expand Up @@ -106,7 +104,6 @@ impl CommandRunner {
executor,
store,
action_cache_client,
platform,
cache_read,
cache_write,
cache_content_behavior,
Expand Down Expand Up @@ -263,7 +260,7 @@ impl CommandRunner {
action_digest,
&request.description,
self.instance_name.clone(),
self.platform,
request.platform,
&context,
self.action_cache_client.clone(),
self.store.clone(),
Expand Down
2 changes: 0 additions & 2 deletions src/rust/engine/process_execution/src/remote_cache_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ fn create_cached_runner(
&store_setup.cas.address(),
None,
BTreeMap::default(),
Platform::current().unwrap(),
true,
true,
RemoteCacheWarningsBehavior::FirstOnly,
Expand Down Expand Up @@ -615,7 +614,6 @@ async fn make_action_result_basic() {
&cas.address(),
None,
BTreeMap::default(),
Platform::current().unwrap(),
true,
true,
RemoteCacheWarningsBehavior::FirstOnly,
Expand Down
32 changes: 16 additions & 16 deletions src/rust/engine/process_execution/src/remote_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -886,7 +886,6 @@ async fn sends_headers() {
String::from("authorization") => String::from("Bearer catnip-will-get-you-anywhere"),
},
store,
Platform::Linux_x86_64,
OVERALL_DEADLINE_SECS,
RETRY_INTERVAL,
EXEC_CONCURRENCY_LIMIT,
Expand Down Expand Up @@ -1076,7 +1075,6 @@ async fn ensure_inline_stdio_is_stored() {
None,
BTreeMap::new(),
store.clone(),
Platform::Linux_x86_64,
OVERALL_DEADLINE_SECS,
RETRY_INTERVAL,
EXEC_CONCURRENCY_LIMIT,
Expand Down Expand Up @@ -1417,7 +1415,6 @@ async fn execute_missing_file_uploads_if_known() {
None,
BTreeMap::new(),
store.clone(),
Platform::Linux_x86_64,
OVERALL_DEADLINE_SECS,
RETRY_INTERVAL,
EXEC_CONCURRENCY_LIMIT,
Expand Down Expand Up @@ -1478,7 +1475,6 @@ async fn execute_missing_file_errors_if_unknown() {
None,
BTreeMap::new(),
store,
Platform::Linux_x86_64,
OVERALL_DEADLINE_SECS,
RETRY_INTERVAL,
EXEC_CONCURRENCY_LIMIT,
Expand Down Expand Up @@ -1689,10 +1685,14 @@ async fn remote_workunits_are_stored() {
.build();
// TODO: This CommandRunner is only used for parsing, add so intentionally passes a CAS/AC
// address rather than an Execution address.
let (command_runner, _store) = create_command_runner(cas.address(), &cas, Platform::Linux_x86_64);
let (command_runner, _store) = create_command_runner(cas.address(), &cas);

command_runner
.extract_execute_response(RunId(0), OperationOrStatus::Operation(operation))
.extract_execute_response(
RunId(0),
Platform::Linux_x86_64,
OperationOrStatus::Operation(operation),
)
.await
.unwrap();

Expand Down Expand Up @@ -2136,11 +2136,7 @@ pub(crate) async fn run_cmd_runner<R: crate::CommandRunner>(
})
}

fn create_command_runner(
execution_address: String,
cas: &mock::StubCAS,
platform: Platform,
) -> (CommandRunner, Store) {
fn create_command_runner(execution_address: String, cas: &mock::StubCAS) -> (CommandRunner, Store) {
let runtime = task_executor::Executor::new();
let store_dir = TempDir::new().unwrap();
let store = make_store(store_dir.path(), cas, runtime.clone());
Expand All @@ -2151,7 +2147,6 @@ fn create_command_runner(
None,
BTreeMap::new(),
store.clone(),
platform,
OVERALL_DEADLINE_SECS,
RETRY_INTERVAL,
EXEC_CONCURRENCY_LIMIT,
Expand All @@ -2171,8 +2166,7 @@ async fn run_command_remote(
.directory(&TestDirectory::containing_roland())
.tree(&TestTree::roland_at_root())
.build();
let (command_runner, store) =
create_command_runner(execution_address, &cas, Platform::Linux_x86_64);
let (command_runner, store) = create_command_runner(execution_address, &cas);
let original = command_runner
.run(Context::default(), &mut workunit, request)
.await?;
Expand Down Expand Up @@ -2222,10 +2216,14 @@ async fn extract_execute_response(
.build();
// TODO: This CommandRunner is only used for parsing, add so intentionally passes a CAS/AC
// address rather than an Execution address.
let (command_runner, store) = create_command_runner(cas.address(), &cas, remote_platform);
let (command_runner, store) = create_command_runner(cas.address(), &cas);

let original = command_runner
.extract_execute_response(RunId(0), OperationOrStatus::Operation(operation))
.extract_execute_response(
RunId(0),
remote_platform,
OperationOrStatus::Operation(operation),
)
.await?;

let stdout_bytes: Vec<u8> = store
Expand Down Expand Up @@ -2303,6 +2301,7 @@ pub(crate) fn assert_contains(haystack: &str, needle: &str) {
pub(crate) fn cat_roland_request() -> Process {
let argv = owned_string_vec(&["/bin/cat", "roland.ext"]);
let mut process = Process::new(argv);
process.platform = Platform::Linux_x86_64;
process.input_digests =
InputDigests::with_input_files(TestDirectory::containing_roland().directory_digest());
process.timeout = one_second();
Expand All @@ -2312,6 +2311,7 @@ pub(crate) fn cat_roland_request() -> Process {

pub(crate) fn echo_roland_request() -> Process {
let mut req = Process::new(owned_string_vec(&["/bin/echo", "meoooow"]));
req.platform = Platform::Linux_x86_64;
req.timeout = one_second();
req.description = "unleash a roaring meow".to_string();
req
Expand Down
2 changes: 0 additions & 2 deletions src/rust/engine/process_executor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,6 @@ async fn main() {
root_ca_certs.clone(),
headers.clone(),
store.clone(),
Platform::Linux_x86_64,
Duration::from_secs(args.overall_deadline_secs),
Duration::from_millis(100),
args.execution_rpc_concurrency,
Expand All @@ -323,7 +322,6 @@ async fn main() {
&address,
root_ca_certs,
headers,
Platform::Linux_x86_64,
true,
true,
process_execution::remote_cache::RemoteCacheWarningsBehavior::Backoff,
Expand Down
6 changes: 1 addition & 5 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use parking_lot::Mutex;
use process_execution::switched::SwitchedCommandRunner;
use process_execution::{
self, bounded, docker, local, nailgun, remote, remote_cache, CacheContentBehavior, CommandRunner,
ImmutableInputs, NamedCaches, Platform, ProcessExecutionStrategy, RemoteCacheWarningsBehavior,
ImmutableInputs, NamedCaches, ProcessExecutionStrategy, RemoteCacheWarningsBehavior,
};
use protos::gen::build::bazel::remote::execution::v2::ServerCapabilities;
use regex::Regex;
Expand Down Expand Up @@ -266,9 +266,6 @@ impl Core {
root_ca_certs.clone(),
remoting_opts.execution_headers.clone(),
full_store.clone(),
// TODO if we ever want to configure the remote platform to be something else we
// need to take an option all the way down here and into the remote::CommandRunner struct.
Platform::Linux_x86_64,
remoting_opts.execution_overall_deadline,
Duration::from_millis(100),
remoting_opts.execution_rpc_concurrency,
Expand Down Expand Up @@ -331,7 +328,6 @@ impl Core {
remoting_opts.store_address.as_ref().unwrap(),
root_ca_certs.clone(),
remoting_opts.store_headers.clone(),
Platform::current()?,
remote_cache_read,
remote_cache_write,
remoting_opts.cache_warnings_behavior,
Expand Down

0 comments on commit 8923ea2

Please sign in to comment.