diff --git a/build-support/githooks/pre-commit b/build-support/githooks/pre-commit index 45b6d8108cb2..27524b031cd8 100755 --- a/build-support/githooks/pre-commit +++ b/build-support/githooks/pre-commit @@ -53,7 +53,7 @@ echo "* Checking shell scripts via our custom linter" # fails in pants changed. if git rev-parse --verify "${MERGE_BASE}" &>/dev/null; then echo "* Checking imports" - ./build-support/bin/isort.sh || die "To fix import sort order, run \`\"$(pwd)/build-support/bin/isort.sh\" -f\`" + ./build-support/bin/isort.sh || die "To fix import sort order, run \`build-support/bin/isort.sh -f\`" # TODO(CMLivingston) Make lint use `-q` option again after addressing proper workunit labeling: # https://github.com/pantsbuild/pants/issues/6633 diff --git a/src/python/pants/build_graph/build_configuration.py b/src/python/pants/build_graph/build_configuration.py index 5512d3a3910f..d99070e9ddf7 100644 --- a/src/python/pants/build_graph/build_configuration.py +++ b/src/python/pants/build_graph/build_configuration.py @@ -170,7 +170,7 @@ def rules(self): return list(self._rules) def union_rules(self): - """Returns a mapping of registered union base types -> [a list of union member types]. + """Returns a mapping of registered union base types -> [OrderedSet of union member types]. :rtype: OrderedDict """ diff --git a/src/python/pants/engine/rules.py b/src/python/pants/engine/rules.py index 865ef11464ae..d5b06e34acc3 100644 --- a/src/python/pants/engine/rules.py +++ b/src/python/pants/engine/rules.py @@ -10,8 +10,9 @@ from abc import ABC, abstractmethod from collections import OrderedDict from collections.abc import Iterable +from dataclasses import dataclass from textwrap import dedent -from typing import Any, Callable, Type, cast +from typing import Any, Callable, Dict, Type, cast import asttokens from twitter.common.collections import OrderedSet @@ -428,6 +429,17 @@ def __new__(cls, union_base, union_member): return super().__new__(cls, union_base, union_member) +@dataclass(frozen=True) +class UnionMembership: + union_rules: Dict[type, typing.Iterable[type]] + + def is_member(self, union_type, putative_member): + members = self.union_rules.get(union_type) + if members is None: + raise TypeError(f'Not a registered union type: {union_type}') + return type(putative_member) in members + + class Rule(ABC): """Rules declare how to produce products for the product graph. diff --git a/src/python/pants/init/engine_initializer.py b/src/python/pants/init/engine_initializer.py index 5df054546b71..65bea4ed791e 100644 --- a/src/python/pants/init/engine_initializer.py +++ b/src/python/pants/init/engine_initializer.py @@ -42,7 +42,7 @@ from pants.engine.mapper import AddressMapper from pants.engine.parser import SymbolTable from pants.engine.platform import create_platform_rules -from pants.engine.rules import RootRule, rule +from pants.engine.rules import RootRule, UnionMembership, rule from pants.engine.scheduler import Scheduler from pants.engine.selectors import Params from pants.init.options_initializer import BuildConfigInitializer, OptionsInitializer @@ -357,6 +357,10 @@ def build_configuration_singleton() -> BuildConfiguration: def symbol_table_singleton() -> SymbolTable: return symbol_table + @rule + def union_membership_singleton() -> UnionMembership: + return UnionMembership(build_configuration.union_rules()) + # Create a Scheduler containing graph and filesystem rules, with no installed goals. The # LegacyBuildGraph will explicitly request the products it needs. rules = ( @@ -365,6 +369,7 @@ def symbol_table_singleton() -> SymbolTable: glob_match_error_behavior_singleton, build_configuration_singleton, symbol_table_singleton, + union_membership_singleton, ] + create_legacy_graph_tasks() + create_fs_rules() + diff --git a/src/python/pants/rules/core/test.py b/src/python/pants/rules/core/test.py index 0883889b0fe3..7f4c950beaef 100644 --- a/src/python/pants/rules/core/test.py +++ b/src/python/pants/rules/core/test.py @@ -2,14 +2,16 @@ # Licensed under the Apache License, Version 2.0 (see LICENSE). import logging +from dataclasses import dataclass +from typing import Optional from pants.base.exiter import PANTS_FAILED_EXIT_CODE, PANTS_SUCCEEDED_EXIT_CODE -from pants.build_graph.address import Address +from pants.build_graph.address import Address, BuildFileAddress from pants.engine.addressable import BuildFileAddresses from pants.engine.console import Console from pants.engine.goal import Goal from pants.engine.legacy.graph import HydratedTarget -from pants.engine.rules import console_rule, rule +from pants.engine.rules import UnionMembership, console_rule, rule from pants.engine.selectors import Get from pants.rules.core.core_test_model import Status, TestResult, TestTarget @@ -24,18 +26,27 @@ class Test(Goal): name = 'test' +@dataclass(frozen=True) +class AddressAndTestResult: + address: BuildFileAddress + test_result: Optional[TestResult] # If None, target was not a test target. + + @console_rule def fast_test(console: Console, addresses: BuildFileAddresses) -> Test: - test_results = yield [Get(TestResult, Address, address.to_address()) for address in addresses] + results = yield [Get(AddressAndTestResult, Address, addr.to_address()) for addr in addresses] did_any_fail = False - for address, test_result in zip(addresses, test_results): + filtered_results = [(x.address, x.test_result) for x in results if x.test_result is not None] + + for address, test_result in filtered_results: if test_result.status == Status.FAILURE: did_any_fail = True if test_result.stdout: console.write_stdout( "{} stdout:\n{}\n".format( address.reference(), - console.red(test_result.stdout) if test_result.status == Status.FAILURE else test_result.stdout + (console.red(test_result.stdout) if test_result.status == Status.FAILURE + else test_result.stdout) ) ) if test_result.stderr: @@ -44,14 +55,16 @@ def fast_test(console: Console, addresses: BuildFileAddresses) -> Test: console.write_stdout( "{} stderr:\n{}\n".format( address.reference(), - console.red(test_result.stderr) if test_result.status == Status.FAILURE else test_result.stderr + (console.red(test_result.stderr) if test_result.status == Status.FAILURE + else test_result.stderr) ) ) console.write_stdout("\n") - for address, test_result in zip(addresses, test_results): - console.print_stdout('{0:80}.....{1:>10}'.format(address.reference(), test_result.status.value)) + for address, test_result in filtered_results: + console.print_stdout('{0:80}.....{1:>10}'.format( + address.reference(), test_result.status.value)) if did_any_fail: console.print_stderr(console.red('Tests failed')) @@ -63,19 +76,24 @@ def fast_test(console: Console, addresses: BuildFileAddresses) -> Test: @rule -def coordinator_of_tests(target: HydratedTarget) -> TestResult: +def coordinator_of_tests(target: HydratedTarget, + union_membership: UnionMembership) -> AddressAndTestResult: # TODO(#6004): when streaming to live TTY, rely on V2 UI for this information. When not a # live TTY, periodically dump heavy hitters to stderr. See # https://github.com/pantsbuild/pants/issues/6004#issuecomment-492699898. - logger.info("Starting tests: {}".format(target.address.reference())) - # NB: This has the effect of "casting" a TargetAdaptor to a member of the TestTarget union. If the - # TargetAdaptor is not a member of the union, it will fail at runtime with a useful error message. - result = yield Get(TestResult, TestTarget, target.adaptor) - logger.info("Tests {}: {}".format( - "succeeded" if result.status == Status.SUCCESS else "failed", - target.address.reference(), - )) - yield result + if union_membership.is_member(TestTarget, target.adaptor): + logger.info("Starting tests: {}".format(target.address.reference())) + # NB: This has the effect of "casting" a TargetAdaptor to a member of the TestTarget union. + # The adaptor will always be a member because of the union membership check above, but if + # it were not it would fail at runtime with a useful error message. + result = yield Get(TestResult, TestTarget, target.adaptor) + logger.info("Tests {}: {}".format( + "succeeded" if result.status == Status.SUCCESS else "failed", + target.address.reference(), + )) + else: + result = None # Not a test target. + yield AddressAndTestResult(target.address, result) def rules(): diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index 7d06f7cfc426..5959c0bcd7c3 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -111,6 +111,8 @@ pub struct CommandRunner { store: Store, platform: Platform, executor: task_executor::Executor, + backoff_incremental_wait: Duration, + backoff_max_wait: Duration, } #[derive(Debug, PartialEq)] @@ -121,6 +123,8 @@ enum ExecutionError { MissingDigests(Vec), // String is the operation name which can be used to poll the GetOperation gRPC API. NotFinished(String), + // String is the error message. + Retryable(String), } #[derive(Default)] @@ -129,6 +133,12 @@ struct ExecutionHistory { current_attempt: ExecutionStats, } +impl ExecutionHistory { + fn total_attempt_count(&self) -> usize { + self.attempts.len() + 1 + } +} + impl CommandRunner { // The Execute API used to be unary, and became streaming. The contract of the streaming API is // that if the client closes the stream after one request, it should continue to function exactly @@ -316,69 +326,43 @@ impl super::CommandRunner for CommandRunner { } future::err(err).to_boxed() } + ExecutionError::Retryable(message) => { + command_runner.retry_execution( + execute_request, + future::done({ + if history.total_attempt_count() >= 5 { + Err(format!("Gave up retrying remote execution after {} retriable attempts; last failure: {}", history.total_attempt_count(), message)) + } else { + trace!("Got retryable error from server; retrying. Error: {}", message); + Ok(()) + } + }).to_boxed(), + history, + maybe_cancel_remote_exec_token, + ) + }, ExecutionError::MissingDigests(missing_digests) => { - let ExecutionHistory { - mut attempts, - current_attempt, - } = history; - trace!( "Server reported missing digests ({:?}); trying to upload: {:?}", - current_attempt, + history.current_attempt, missing_digests, ); - attempts.push(current_attempt); - let history = ExecutionHistory { - attempts, - current_attempt: ExecutionStats::default(), - }; - - store - .ensure_remote_has_recursive(missing_digests, workunit_store.clone()) - .and_then({ - let command_runner = command_runner.clone(); - move |summary| { - let mut history = history; - history.current_attempt += summary; - command_runner - .oneshot_execute(&execute_request) - .join(future::ok(history)) - } - }) - .map({ - let operations_client = operations_client.clone(); - let executor = command_runner.executor.clone(); - move |(operation, history)| { - let maybe_cancel_remote_exec_token = match operation { - OperationOrStatus::Operation(ref operation) => { - Some(CancelRemoteExecutionToken::new( - operations_client, - operation.name.clone(), - executor, - )) - } - _ => None, - }; - // Reset `iter_num` on `MissingDigests` - future::Loop::Continue(( - history, - operation, - maybe_cancel_remote_exec_token, - 0, - )) - } - }) - .to_boxed() - } + command_runner.retry_execution( + execute_request, + store.ensure_remote_has_recursive(missing_digests, workunit_store.clone()).map(|_| ()).to_boxed(), + history, + maybe_cancel_remote_exec_token, + ) + }, ExecutionError::NotFinished(operation_name) => { let mut operation_request = bazel_protos::operations::GetOperationRequest::new(); operation_request.set_name(operation_name.clone()); let backoff_period = min( - CommandRunner::BACKOFF_MAX_WAIT_MILLIS, - (1 + iter_num) * CommandRunner::BACKOFF_INCR_WAIT_MILLIS, + command_runner.backoff_max_wait, + (1 + iter_num) * command_runner.backoff_incremental_wait, ); // take the grpc result and cancel the op if too much time has passed. @@ -404,7 +388,7 @@ impl super::CommandRunner for CommandRunner { .to_boxed() } else { // maybe the delay here should be the min of remaining time and the backoff period - Delay::new(Instant::now() + Duration::from_millis(backoff_period)) + Delay::new(Instant::now() + backoff_period) .map_err(move |e| { format!( "Future-Delay errored at operation result polling for {}, {}: {}", @@ -421,7 +405,7 @@ impl super::CommandRunner for CommandRunner { .or_else(move |err| { rpcerror_recover_cancelled(operation_request.take_name(), err) }) - .map( OperationOrStatus::Operation) + .map(OperationOrStatus::Operation) .map_err(rpcerror_to_string), ) .map(move |operation| { @@ -466,9 +450,6 @@ impl super::CommandRunner for CommandRunner { } impl CommandRunner { - const BACKOFF_INCR_WAIT_MILLIS: u64 = 500; - const BACKOFF_MAX_WAIT_MILLIS: u64 = 5000; - pub fn new( address: &str, metadata: ExecuteProcessRequestMetadata, @@ -477,6 +458,8 @@ impl CommandRunner { store: Store, platform: Platform, executor: task_executor::Executor, + backoff_incremental_wait: Duration, + backoff_max_wait: Duration, ) -> CommandRunner { let env = Arc::new(grpcio::EnvBuilder::new().build()); let channel = { @@ -507,6 +490,8 @@ impl CommandRunner { store, platform, executor, + backoff_incremental_wait, + backoff_max_wait, } } @@ -644,11 +629,10 @@ impl CommandRunner { attempts.current_attempt.was_cache_hit = execute_response.cached_result; } - let mut execution_attempts = std::mem::replace(&mut attempts.attempts, vec![]); - execution_attempts.push(attempts.current_attempt); - let status = execute_response.take_status(); if grpcio::RpcStatusCode::from(status.get_code()) == grpcio::RpcStatusCode::Ok { + let mut execution_attempts = std::mem::replace(&mut attempts.attempts, vec![]); + execution_attempts.push(attempts.current_attempt); return populate_fallible_execution_result( self.store.clone(), execute_response, @@ -735,15 +719,84 @@ impl CommandRunner { } future::err(ExecutionError::MissingDigests(missing_digests)).to_boxed() } - code => future::err(ExecutionError::Fatal(format!( - "Error from remote execution: {:?}: {:?}", - code, - status.get_message() - ))) - .to_boxed(), + code => { + // Error we see from Google's RBE service if we use pre-emptable workers and one gets pre-empted. + if code == grpcio::RpcStatusCode::Aborted + && status.get_message() == "the bot running the task appears to be lost" + { + future::err(ExecutionError::Retryable(status.get_message().to_owned())).to_boxed() + } else { + future::err(ExecutionError::Fatal(format!( + "Error from remote execution: {:?}: {:?}", + code, + status.get_message() + ))) + .to_boxed() + } + } } .to_boxed() } + + #[allow(clippy::type_complexity)] + fn retry_execution( + &self, + execute_request: Arc, + prefix_future: BoxFuture<(), String>, + history: ExecutionHistory, + maybe_cancel_remote_exec_token: Option, + ) -> BoxFuture< + futures::future::Loop< + FallibleExecuteProcessResult, + ( + ExecutionHistory, + OperationOrStatus, + Option, + u32, + ), + >, + String, + > { + if let Some(mut cancel_remote_exec_token) = maybe_cancel_remote_exec_token { + cancel_remote_exec_token.do_not_send_cancellation_on_drop(); + } + + let ExecutionHistory { + mut attempts, + current_attempt, + } = history; + attempts.push(current_attempt); + let history = ExecutionHistory { + attempts, + current_attempt: ExecutionStats::default(), + }; + + let command_runner = self.clone(); + prefix_future + .and_then(move |()| command_runner.oneshot_execute(&execute_request)) + .map({ + let operations_client = self.operations_client.clone(); + let executor = self.executor.clone(); + move |operation| { + let maybe_cancel_remote_exec_token = match operation { + OperationOrStatus::Operation(ref operation) => Some(CancelRemoteExecutionToken::new( + operations_client, + operation.name.clone(), + executor, + )), + _ => None, + }; + future::Loop::Continue(( + history, + operation, + maybe_cancel_remote_exec_token, + // Reset `iter_num` for a new Execute attempt: + 0, + )) + } + }) + .to_boxed() + } } fn maybe_add_workunit( @@ -1771,6 +1824,93 @@ pub mod tests { assert_cancellation_requests(&mock_server, vec![]); } + #[test] + fn retries_retriable_errors() { + let execute_request = echo_foo_request(); + let op_name = "gimme-foo".to_string(); + + let mock_server = { + mock::execution_server::TestServer::new( + mock::execution_server::MockExecution::new( + op_name.clone(), + super::make_execute_request( + &execute_request.clone().try_into().unwrap(), + empty_request_metadata(), + ) + .unwrap() + .2, + vec![ + make_incomplete_operation(&op_name), + make_retryable_operation_failure(), + make_incomplete_operation(&op_name), + make_successful_operation( + &op_name, + StdoutType::Raw("foo".to_owned()), + StderrType::Raw("".to_owned()), + 0, + ), + ], + ), + None, + ) + }; + + let result = run_command_remote(mock_server.address(), execute_request).unwrap(); + + assert_eq!( + result.without_execution_attempts(), + FallibleExecuteProcessResult { + stdout: as_bytes("foo"), + stderr: as_bytes(""), + exit_code: 0, + output_directory: EMPTY_DIGEST, + execution_attempts: vec![], + } + ); + + assert_cancellation_requests(&mock_server, vec![]); + } + + #[test] + fn gives_up_after_many_retriable_errors() { + let execute_request = echo_foo_request(); + let op_name = "gimme-foo".to_string(); + + let mock_server = { + mock::execution_server::TestServer::new( + mock::execution_server::MockExecution::new( + op_name.clone(), + super::make_execute_request( + &execute_request.clone().try_into().unwrap(), + empty_request_metadata(), + ) + .unwrap() + .2, + vec![ + make_incomplete_operation(&op_name), + make_retryable_operation_failure(), + make_incomplete_operation(&op_name), + make_retryable_operation_failure(), + make_incomplete_operation(&op_name), + make_retryable_operation_failure(), + make_incomplete_operation(&op_name), + make_retryable_operation_failure(), + make_incomplete_operation(&op_name), + make_retryable_operation_failure(), + ], + ), + None, + ) + }; + + let err = run_command_remote(mock_server.address(), execute_request).unwrap_err(); + + assert_that!(err).contains("Gave up"); + assert_that!(err).contains("appears to be lost"); + + assert_cancellation_requests(&mock_server, vec![]); + } + #[test] fn extract_response_with_digest_stdout() { let op_name = "gimme-foo".to_string(); @@ -1887,6 +2027,8 @@ pub mod tests { store, Platform::Linux, runtime.clone(), + Duration::from_millis(0), + Duration::from_secs(0), ); let result = runtime .block_on(cmd_runner.run(echo_roland_request(), WorkUnitStore::new())) @@ -2068,7 +2210,12 @@ pub mod tests { .file(&TestData::roland()) .directory(&TestDirectory::containing_roland()) .build(); - let command_runner = create_command_runner(mock_server.address(), &cas); + let command_runner = create_command_runner( + mock_server.address(), + &cas, + Duration::from_millis(0), + Duration::from_secs(0), + ); let mut runtime = tokio::runtime::Runtime::new().unwrap(); let successful_mock_result = FallibleExecuteProcessResult { @@ -2408,6 +2555,8 @@ pub mod tests { store, Platform::Linux, runtime.clone(), + Duration::from_millis(0), + Duration::from_secs(0), ); let result = runtime @@ -2510,6 +2659,8 @@ pub mod tests { store, Platform::Linux, runtime.clone(), + Duration::from_millis(0), + Duration::from_secs(0), ) .run(cat_roland_request(), WorkUnitStore::new()) .wait(); @@ -2585,6 +2736,8 @@ pub mod tests { store, Platform::Linux, runtime.clone(), + Duration::from_millis(0), + Duration::from_secs(0), ); let error = runtime @@ -2797,7 +2950,7 @@ pub mod tests { #[test] fn wait_between_request_1_retry() { - // wait at least 500 milli for one retry + // wait at least 100 milli for one retry { let execute_request = echo_foo_request(); let mock_server = { @@ -2824,7 +2977,17 @@ pub mod tests { None, ) }; - run_command_remote(mock_server.address(), execute_request).unwrap(); + let cas = mock::StubCAS::empty(); + let command_runner = create_command_runner( + mock_server.address(), + &cas, + Duration::from_millis(100), + Duration::from_secs(1), + ); + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + runtime + .block_on(command_runner.run(execute_request, WorkUnitStore::new())) + .unwrap(); let messages = mock_server.mock_responder.received_messages.lock(); assert!(messages.len() == 2); @@ -2834,14 +2997,14 @@ pub mod tests { .unwrap() .received_at .sub(messages.get(0).unwrap().received_at) - >= Duration::from_millis(500) + >= Duration::from_millis(100) ); } } #[test] fn wait_between_request_3_retry() { - // wait at least 500 + 1000 + 1500 = 3000 milli for 3 retries. + // wait at least 50 + 100 + 150 = 300 milli for 3 retries. { let execute_request = echo_foo_request(); let mock_server = { @@ -2870,7 +3033,17 @@ pub mod tests { None, ) }; - run_command_remote(mock_server.address(), execute_request).unwrap(); + let cas = mock::StubCAS::empty(); + let command_runner = create_command_runner( + mock_server.address(), + &cas, + Duration::from_millis(50), + Duration::from_secs(5), + ); + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + runtime + .block_on(command_runner.run(execute_request, WorkUnitStore::new())) + .unwrap(); let messages = mock_server.mock_responder.received_messages.lock(); assert!(messages.len() == 4); @@ -2880,7 +3053,7 @@ pub mod tests { .unwrap() .received_at .sub(messages.get(0).unwrap().received_at) - >= Duration::from_millis(500) + >= Duration::from_millis(50) ); assert!( messages @@ -2888,7 +3061,7 @@ pub mod tests { .unwrap() .received_at .sub(messages.get(1).unwrap().received_at) - >= Duration::from_millis(1000) + >= Duration::from_millis(100) ); assert!( messages @@ -2896,7 +3069,7 @@ pub mod tests { .unwrap() .received_at .sub(messages.get(2).unwrap().received_at) - >= Duration::from_millis(1500) + >= Duration::from_millis(150) ); } } @@ -3090,7 +3263,12 @@ pub mod tests { .file(&TestData::roland()) .directory(&TestDirectory::containing_roland()) .build(); - let command_runner = create_command_runner("".to_owned(), &cas); + let command_runner = create_command_runner( + "".to_owned(), + &cas, + std::time::Duration::from_millis(0), + std::time::Duration::from_secs(0), + ); let mut runtime = tokio::runtime::Runtime::new().unwrap(); @@ -3183,6 +3361,25 @@ pub mod tests { MockOperation::new(op) } + fn make_retryable_operation_failure() -> MockOperation { + let mut status = bazel_protos::status::Status::new(); + status.set_code(grpcio::RpcStatusCode::Aborted as i32); + status.set_message(String::from("the bot running the task appears to be lost")); + + let mut operation = bazel_protos::operations::Operation::new(); + operation.set_done(true); + operation.set_response(make_any_proto(&{ + let mut response = bazel_protos::remote_execution::ExecuteResponse::new(); + response.set_status(status); + response + })); + + MockOperation { + op: Ok(Some(operation)), + duration: None, + } + } + fn make_delayed_incomplete_operation(operation_name: &str, delay: Duration) -> MockOperation { let mut op = bazel_protos::operations::Operation::new(); op.set_name(operation_name.to_string()); @@ -3327,12 +3524,22 @@ pub mod tests { .file(&TestData::roland()) .directory(&TestDirectory::containing_roland()) .build(); - let command_runner = create_command_runner(address, &cas); + let command_runner = create_command_runner( + address, + &cas, + Duration::from_millis(0), + Duration::from_secs(0), + ); let mut runtime = tokio::runtime::Runtime::new().unwrap(); runtime.block_on(command_runner.run(request, WorkUnitStore::new())) } - fn create_command_runner(address: String, cas: &mock::StubCAS) -> CommandRunner { + fn create_command_runner( + address: String, + cas: &mock::StubCAS, + backoff_incremental_wait: Duration, + backoff_max_wait: Duration, + ) -> CommandRunner { let runtime = task_executor::Executor::new(); let store_dir = TempDir::new().unwrap(); let store = Store::with_remote( @@ -3359,6 +3566,8 @@ pub mod tests { store, Platform::Linux, runtime.clone(), + backoff_incremental_wait, + backoff_max_wait, ) } @@ -3369,7 +3578,12 @@ pub mod tests { .file(&TestData::roland()) .directory(&TestDirectory::containing_roland()) .build(); - let command_runner = create_command_runner("".to_owned(), &cas); + let command_runner = create_command_runner( + "".to_owned(), + &cas, + Duration::from_millis(0), + Duration::from_secs(0), + ); let mut runtime = tokio::runtime::Runtime::new().unwrap(); @@ -3387,7 +3601,12 @@ pub mod tests { .file(&TestData::roland()) .directory(&TestDirectory::containing_roland()) .build(); - let command_runner = create_command_runner("".to_owned(), &cas); + let command_runner = create_command_runner( + "".to_owned(), + &cas, + Duration::from_millis(0), + Duration::from_secs(0), + ); let mut runtime = tokio::runtime::Runtime::new().unwrap(); runtime.block_on(super::extract_output_files( diff --git a/src/rust/engine/process_executor/src/main.rs b/src/rust/engine/process_executor/src/main.rs index 156bf3b25f01..b9acde2dc897 100644 --- a/src/rust/engine/process_executor/src/main.rs +++ b/src/rust/engine/process_executor/src/main.rs @@ -336,6 +336,8 @@ fn main() { store.clone(), Platform::Linux, executor.clone(), + std::time::Duration::from_millis(500), + std::time::Duration::from_secs(5), )) as Box } None => Box::new(process_execution::local::CommandRunner::new( diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index d1661724d290..86f8516c49b4 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -164,6 +164,8 @@ impl Core { // need to take an option all the way down here and into the remote::CommandRunner struct. Platform::Linux, executor.clone(), + std::time::Duration::from_millis(500), + std::time::Duration::from_secs(5), )), process_execution_remote_parallelism, )); diff --git a/tests/python/pants_test/rules/test_test.py b/tests/python/pants_test/rules/test_test.py index 5ae5a309b933..c51635c26b3d 100644 --- a/tests/python/pants_test/rules/test_test.py +++ b/tests/python/pants_test/rules/test_test.py @@ -7,7 +7,15 @@ from pants.build_graph.address import Address, BuildFileAddress from pants.engine.legacy.graph import HydratedTarget from pants.engine.legacy.structs import PythonTestsAdaptor -from pants.rules.core.test import Status, TestResult, coordinator_of_tests, fast_test +from pants.engine.rules import UnionMembership +from pants.rules.core.core_test_model import TestTarget +from pants.rules.core.test import ( + AddressAndTestResult, + Status, + TestResult, + coordinator_of_tests, + fast_test, +) from pants_test.engine.util import MockConsole, run_rule from pants_test.test_base import TestBase @@ -16,14 +24,16 @@ class TestTest(TestBase): def single_target_test(self, result, expected_console_output, success=True): console = MockConsole(use_colors=False) - res = run_rule(fast_test, console, (self.make_build_target_address("some/target"),), { - (TestResult, Address): lambda _: result, + addr = self.make_build_target_address("some/target") + res = run_rule(fast_test, console, (addr,), { + (AddressAndTestResult, Address): lambda _: AddressAndTestResult(addr, result), }) self.assertEquals(console.stdout.getvalue(), expected_console_output) self.assertEquals(0 if success else 1, res.exit_code) - def make_build_target_address(self, spec): + @staticmethod + def make_build_target_address(spec): address = Address.parse(spec) return BuildFileAddress( build_file=None, @@ -61,14 +71,15 @@ def test_output_mixed(self): def make_result(target): if target == target1: - return TestResult(status=Status.SUCCESS, stdout='I passed\n', stderr='') + tr = TestResult(status=Status.SUCCESS, stdout='I passed\n', stderr='') elif target == target2: - return TestResult(status=Status.FAILURE, stdout='I failed\n', stderr='') + tr = TestResult(status=Status.FAILURE, stdout='I failed\n', stderr='') else: raise Exception("Unrecognised target") + return AddressAndTestResult(target, tr) res = run_rule(fast_test, console, (target1, target2), { - (TestResult, Address): make_result, + (AddressAndTestResult, Address): make_result, }) self.assertEqual(1, res.exit_code) @@ -97,10 +108,19 @@ def test_stderr(self): ) def test_coordinator_python_test(self): + addr = Address.parse("some/target") target_adaptor = PythonTestsAdaptor(type_alias='python_tests') with self.captured_logging(logging.INFO): - result = run_rule(coordinator_of_tests, HydratedTarget(Address.parse("some/target"), target_adaptor, ()), { - (TestResult, PythonTestsAdaptor): lambda _: TestResult(status=Status.FAILURE, stdout='foo', stderr=''), - }) - - self.assertEqual(result, TestResult(status=Status.FAILURE, stdout='foo', stderr='')) + result = run_rule( + coordinator_of_tests, + HydratedTarget(addr, target_adaptor, ()), + UnionMembership(union_rules={TestTarget: [PythonTestsAdaptor]}), + { + (TestResult, PythonTestsAdaptor): + lambda _: TestResult(status=Status.FAILURE, stdout='foo', stderr=''), + }) + + self.assertEqual( + result, + AddressAndTestResult(addr, TestResult(status=Status.FAILURE, stdout='foo', stderr='')) + )