Skip to content

Commit

Permalink
Add ExecuteProcess-specific backtracking, and use StubActionCache in …
Browse files Browse the repository at this point in the history
…an integration test.

[ci skip-build-wheels]
  • Loading branch information
stuhood committed Jun 16, 2022
1 parent 5954eac commit 5efed6c
Show file tree
Hide file tree
Showing 12 changed files with 340 additions and 99 deletions.
11 changes: 11 additions & 0 deletions src/python/pants/engine/internals/native_engine.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,17 @@ class PyStubCAS:
def builder(cls) -> PyStubCASBuilder: ...
@property
def address(self) -> str: ...
def remove(self, digest: FileDigest) -> bool: ...

class PyStubActionCache:
def __init__(self, executor: PyExecutor) -> None: ...
@property
def address(self) -> str: ...
def len(self) -> int: ...

# ------------------------------------------------------------------------------
# (etc.)
# ------------------------------------------------------------------------------

class RawFdRunner(Protocol):
def __call__(
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/process_execution/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ impl CommandRunner {
};

// Ensure that all digests in the result are loadable, erroring if any are not.
// TODO: Make conditional on `eager_fetch`, since backtracking makes this unnecessary as well.
let _ = future::try_join_all(vec![
self
.file_store
Expand Down
102 changes: 35 additions & 67 deletions src/rust/engine/process_execution/src/remote_cache_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use maplit::hashset;
use tempfile::TempDir;
use tokio::time::sleep;

use fs::{DirectoryDigest, RelativePath, EMPTY_DIRECTORY_DIGEST};
use grpc_util::tls;
use hashing::{Digest, EMPTY_DIGEST};
use maplit::hashset;
use mock::{StubActionCache, StubCAS};
use protos::gen::build::bazel::remote::execution::v2 as remexec;
use remexec::ActionResult;
use store::Store;
use tempfile::TempDir;
use testutil::data::{TestData, TestDirectory, TestTree};
use tokio::time::sleep;
use workunit_store::{RunId, RunningWorkunit, WorkunitStore};

use crate::remote::{ensure_action_stored_locally, make_execute_request};
Expand Down Expand Up @@ -151,50 +151,34 @@ fn create_cached_runner(
(runner, action_cache)
}

async fn create_process(store: &Store) -> (Process, Digest) {
// TODO: Unfortunately, this code cannot be moved to the `testutil::mock` crate, because that
// introduces a cycle between this crate and that one.
async fn create_process(store_setup: &StoreSetup) -> (Process, Digest) {
let process = Process::new(vec![
"this process will not execute: see MockLocalCommandRunner".to_string(),
]);
let (action, command, _exec_request) =
make_execute_request(&process, ProcessMetadata::default()).unwrap();
let (_command_digest, action_digest) = ensure_action_stored_locally(store, &command, &action)
.await
.unwrap();
let (_command_digest, action_digest) =
ensure_action_stored_locally(&store_setup.store, &command, &action)
.await
.unwrap();
(process, action_digest)
}

fn insert_into_action_cache(
action_cache: &StubActionCache,
action_digest: &Digest,
exit_code: i32,
stdout_digest: Digest,
stderr_digest: Digest,
) {
let action_result = ActionResult {
exit_code,
stdout_digest: Some(stdout_digest.into()),
stderr_digest: Some(stderr_digest.into()),
..ActionResult::default()
};
action_cache
.action_map
.lock()
.insert(action_digest.hash, action_result);
}

#[tokio::test]
async fn cache_read_success() {
let (_, mut workunit) = WorkunitStore::setup_for_tests();
let store_setup = StoreSetup::new();
let (local_runner, local_runner_call_counter) = create_local_runner(1, 1000);
let (cache_runner, action_cache) = create_cached_runner(local_runner, &store_setup, 0, 0, false);

let (process, action_digest) = create_process(&store_setup.store).await;
insert_into_action_cache(&action_cache, &action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST);
let (process, action_digest) = create_process(&store_setup).await;
action_cache.insert(action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST);

assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0);
let remote_result = cache_runner
.run(Context::default(), &mut workunit, process.clone().into())
.run(Context::default(), &mut workunit, process.into())
.await
.unwrap();
assert_eq!(remote_result.exit_code, 0);
Expand All @@ -210,8 +194,8 @@ async fn cache_read_skipped_on_action_cache_errors() {
let (local_runner, local_runner_call_counter) = create_local_runner(1, 500);
let (cache_runner, action_cache) = create_cached_runner(local_runner, &store_setup, 0, 0, false);

let (process, action_digest) = create_process(&store_setup.store).await;
insert_into_action_cache(&action_cache, &action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST);
let (process, action_digest) = create_process(&store_setup).await;
action_cache.insert(action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST);
action_cache.always_errors.store(true, Ordering::SeqCst);

assert_eq!(
Expand All @@ -220,7 +204,7 @@ async fn cache_read_skipped_on_action_cache_errors() {
);
assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0);
let remote_result = cache_runner
.run(Context::default(), &mut workunit, process.clone().into())
.run(Context::default(), &mut workunit, process.into())
.await
.unwrap();
assert_eq!(remote_result.exit_code, 1);
Expand All @@ -241,10 +225,9 @@ async fn cache_read_skipped_on_store_errors() {
let (cache_runner, action_cache) = create_cached_runner(local_runner, &store_setup, 0, 0, true);

// Claim that the process has a non-empty and not-persisted stdout digest.
let (process, action_digest) = create_process(&store_setup.store).await;
insert_into_action_cache(
&action_cache,
&action_digest,
let (process, action_digest) = create_process(&store_setup).await;
action_cache.insert(
action_digest,
0,
Digest::of_bytes("pigs flying".as_bytes()),
EMPTY_DIGEST,
Expand All @@ -256,7 +239,7 @@ async fn cache_read_skipped_on_store_errors() {
);
assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0);
let remote_result = cache_runner
.run(Context::default(), &mut workunit, process.clone().into())
.run(Context::default(), &mut workunit, process.into())
.await
.unwrap();
assert_eq!(remote_result.exit_code, 1);
Expand All @@ -280,18 +263,17 @@ async fn cache_read_eager_fetch() {
let (cache_runner, action_cache) =
create_cached_runner(local_runner, &store_setup, 0, 0, eager_fetch);

let (process, action_digest) = create_process(&store_setup.store).await;
insert_into_action_cache(
&action_cache,
&action_digest,
let (process, action_digest) = create_process(&store_setup).await;
action_cache.insert(
action_digest,
0,
TestData::roland().digest(),
TestData::roland().digest(),
);

assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0);
let remote_result = cache_runner
.run(Context::default(), workunit, process.clone().into())
.run(Context::default(), workunit, process.into())
.await
.unwrap();

Expand Down Expand Up @@ -323,14 +305,14 @@ async fn cache_read_speculation() {
let (cache_runner, action_cache) =
create_cached_runner(local_runner, &store_setup, remote_delay_ms, 0, false);

let (process, action_digest) = create_process(&store_setup.store).await;
let (process, action_digest) = create_process(&store_setup).await;
if cache_hit {
insert_into_action_cache(&action_cache, &action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST);
action_cache.insert(action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST);
}

assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0);
let remote_result = cache_runner
.run(Context::default(), workunit, process.clone().into())
.run(Context::default(), workunit, process.into())
.await
.unwrap();

Expand Down Expand Up @@ -360,7 +342,7 @@ async fn cache_write_success() {
let store_setup = StoreSetup::new();
let (local_runner, local_runner_call_counter) = create_local_runner(0, 100);
let (cache_runner, action_cache) = create_cached_runner(local_runner, &store_setup, 0, 0, false);
let (process, action_digest) = create_process(&store_setup.store).await;
let (process, action_digest) = create_process(&store_setup).await;

assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0);
assert!(action_cache.action_map.lock().is_empty());
Expand All @@ -374,15 +356,8 @@ async fn cache_write_success() {

// Wait for the cache write block to finish.
sleep(Duration::from_secs(1)).await;
assert_eq!(action_cache.action_map.lock().len(), 1);
let action_map_mutex_guard = action_cache.action_map.lock();
assert_eq!(
action_map_mutex_guard
.get(&action_digest.hash)
.unwrap()
.exit_code,
0
);
assert_eq!(action_cache.len(), 1);
assert_eq!(action_cache.get(action_digest).unwrap().exit_code, 0);
}

#[tokio::test]
Expand All @@ -391,7 +366,7 @@ async fn cache_write_not_for_failures() {
let store_setup = StoreSetup::new();
let (local_runner, local_runner_call_counter) = create_local_runner(1, 100);
let (cache_runner, action_cache) = create_cached_runner(local_runner, &store_setup, 0, 0, false);
let (process, _action_digest) = create_process(&store_setup.store).await;
let (process, _action_digest) = create_process(&store_setup).await;

assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0);
assert!(action_cache.action_map.lock().is_empty());
Expand All @@ -416,7 +391,7 @@ async fn cache_write_does_not_block() {
let (local_runner, local_runner_call_counter) = create_local_runner(0, 100);
let (cache_runner, action_cache) =
create_cached_runner(local_runner, &store_setup, 0, 100, false);
let (process, action_digest) = create_process(&store_setup.store).await;
let (process, action_digest) = create_process(&store_setup).await;

assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0);
assert!(action_cache.action_map.lock().is_empty());
Expand All @@ -433,15 +408,8 @@ async fn cache_write_does_not_block() {
assert!(action_cache.action_map.lock().is_empty());

sleep(Duration::from_secs(1)).await;
assert_eq!(action_cache.action_map.lock().len(), 1);
let action_map_mutex_guard = action_cache.action_map.lock();
assert_eq!(
action_map_mutex_guard
.get(&action_digest.hash)
.unwrap()
.exit_code,
0
);
assert_eq!(action_cache.len(), 1);
assert_eq!(action_cache.get(action_digest).unwrap().exit_code, 0);
}

#[tokio::test]
Expand Down
Loading

0 comments on commit 5efed6c

Please sign in to comment.