Skip to content

Commit

Permalink
Merge the StubCAS and StubActionCache.
Browse files Browse the repository at this point in the history
[ci skip-build-wheels]
  • Loading branch information
stuhood committed Jun 17, 2022
1 parent e915c87 commit e53a231
Show file tree
Hide file tree
Showing 11 changed files with 911 additions and 941 deletions.
9 changes: 2 additions & 7 deletions src/python/pants/engine/internals/native_engine.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class PantsdClientException(Exception):
# ------------------------------------------------------------------------------

class PyStubCASBuilder:
def always_errors(self) -> PyStubCASBuilder: ...
def cas_always_errors(self) -> PyStubCASBuilder: ...
def build(self, executor: PyExecutor) -> PyStubCAS: ...

class PyStubCAS:
Expand All @@ -187,12 +187,7 @@ class PyStubCAS:
@property
def address(self) -> str: ...
def remove(self, digest: FileDigest) -> bool: ...

class PyStubActionCache:
def __init__(self, executor: PyExecutor) -> None: ...
@property
def address(self) -> str: ...
def len(self) -> int: ...
def action_cache_len(self) -> int: ...

# ------------------------------------------------------------------------------
# (etc.)
Expand Down
8 changes: 4 additions & 4 deletions src/rust/engine/fs/store/src/remote_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async fn missing_directory() {
#[tokio::test]
async fn load_file_grpc_error() {
let _ = WorkunitStore::setup_for_tests();
let cas = StubCAS::always_errors();
let cas = StubCAS::cas_always_errors();

let error = load_file_bytes(&new_byte_store(&cas), TestData::roland().digest())
.await
Expand All @@ -80,7 +80,7 @@ async fn load_file_grpc_error() {
#[tokio::test]
async fn load_directory_grpc_error() {
let _ = WorkunitStore::setup_for_tests();
let cas = StubCAS::always_errors();
let cas = StubCAS::cas_always_errors();

let error = load_directory_proto_bytes(
&new_byte_store(&cas),
Expand Down Expand Up @@ -212,7 +212,7 @@ async fn write_empty_file() {

#[tokio::test]
async fn write_file_errors() {
let cas = StubCAS::always_errors();
let cas = StubCAS::cas_always_errors();

let store = new_byte_store(&cas);
let error = store
Expand Down Expand Up @@ -290,7 +290,7 @@ async fn list_missing_digests_some_missing() {
#[tokio::test]
async fn list_missing_digests_error() {
let _ = WorkunitStore::setup_for_tests();
let cas = StubCAS::always_errors();
let cas = StubCAS::cas_always_errors();

let store = new_byte_store(&cas);

Expand Down
4 changes: 2 additions & 2 deletions src/rust/engine/fs/store/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ async fn load_file_remote_error_is_error() {
let dir = TempDir::new().unwrap();

let _ = WorkunitStore::setup_for_tests();
let cas = StubCAS::always_errors();
let cas = StubCAS::cas_always_errors();
let error = load_file_bytes(
&new_store(dir.path(), &cas.address()),
TestData::roland().digest(),
Expand All @@ -303,7 +303,7 @@ async fn load_directory_remote_error_is_error() {
let dir = TempDir::new().unwrap();

let _ = WorkunitStore::setup_for_tests();
let cas = StubCAS::always_errors();
let cas = StubCAS::cas_always_errors();
let error = new_store(dir.path(), &cas.address())
.load_directory(TestData::roland().digest())
.await
Expand Down
121 changes: 77 additions & 44 deletions src/rust/engine/process_execution/src/remote_cache_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::time::sleep;
use fs::{DirectoryDigest, RelativePath, EMPTY_DIRECTORY_DIGEST};
use grpc_util::tls;
use hashing::{Digest, EMPTY_DIGEST};
use mock::{StubActionCache, StubCAS};
use mock::StubCAS;
use protos::gen::build::bazel::remote::execution::v2 as remexec;
use store::Store;
use testutil::data::{TestData, TestDirectory, TestTree};
Expand Down Expand Up @@ -74,14 +74,17 @@ impl CommandRunnerTrait for MockLocalCommandRunner {
struct StoreSetup {
pub store: Store,
pub _store_temp_dir: TempDir,
pub _cas: StubCAS,
pub cas: StubCAS,
pub executor: task_executor::Executor,
}

impl StoreSetup {
pub fn new() -> StoreSetup {
pub fn new() -> Self {
Self::new_with_stub_cas(StubCAS::builder().build())
}

pub fn new_with_stub_cas(cas: StubCAS) -> Self {
let executor = task_executor::Executor::new();
let cas = StubCAS::builder().build();
let store_temp_dir = TempDir::new().unwrap();
let store_dir = store_temp_dir.path().join("store_dir");
let store = Store::local_only(executor.clone(), store_dir)
Expand All @@ -99,10 +102,10 @@ impl StoreSetup {
4 * 1024 * 1024,
)
.unwrap();
StoreSetup {
Self {
store,
_store_temp_dir: store_temp_dir,
_cas: cas,
cas,
executor,
}
}
Expand All @@ -124,18 +127,15 @@ fn create_local_runner(
fn create_cached_runner(
local: Box<dyn CommandRunnerTrait>,
store_setup: &StoreSetup,
read_delay_ms: u64,
write_delay_ms: u64,
eager_fetch: bool,
) -> (Box<dyn CommandRunnerTrait>, StubActionCache) {
let action_cache = StubActionCache::new_with_delays(read_delay_ms, write_delay_ms).unwrap();
let runner = Box::new(
) -> Box<dyn CommandRunnerTrait> {
Box::new(
crate::remote_cache::CommandRunner::new(
local.into(),
ProcessMetadata::default(),
store_setup.executor.clone(),
store_setup.store.clone(),
&action_cache.address(),
&store_setup.cas.address(),
None,
BTreeMap::default(),
Platform::current().unwrap(),
Expand All @@ -147,8 +147,7 @@ fn create_cached_runner(
CACHE_READ_TIMEOUT,
)
.expect("caching command runner"),
);
(runner, action_cache)
)
}

// TODO: Unfortunately, this code cannot be moved to the `testutil::mock` crate, because that
Expand All @@ -171,10 +170,13 @@ async fn cache_read_success() {
let (_, mut workunit) = WorkunitStore::setup_for_tests();
let store_setup = StoreSetup::new();
let (local_runner, local_runner_call_counter) = create_local_runner(1, 1000);
let (cache_runner, action_cache) = create_cached_runner(local_runner, &store_setup, 0, 0, false);
let cache_runner = create_cached_runner(local_runner, &store_setup, false);

let (process, action_digest) = create_process(&store_setup).await;
action_cache.insert(action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST);
store_setup
.cas
.action_cache
.insert(action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST);

assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0);
let remote_result = cache_runner
Expand All @@ -192,11 +194,18 @@ async fn cache_read_skipped_on_action_cache_errors() {
let (workunit_store, mut workunit) = WorkunitStore::setup_for_tests();
let store_setup = StoreSetup::new();
let (local_runner, local_runner_call_counter) = create_local_runner(1, 500);
let (cache_runner, action_cache) = create_cached_runner(local_runner, &store_setup, 0, 0, false);
let cache_runner = create_cached_runner(local_runner, &store_setup, false);

let (process, action_digest) = create_process(&store_setup).await;
action_cache.insert(action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST);
action_cache.always_errors.store(true, Ordering::SeqCst);
store_setup
.cas
.action_cache
.insert(action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST);
store_setup
.cas
.action_cache
.always_errors
.store(true, Ordering::SeqCst);

assert_eq!(
workunit_store.get_metrics().get("remote_cache_read_errors"),
Expand All @@ -222,11 +231,11 @@ async fn cache_read_skipped_on_store_errors() {
let (workunit_store, mut workunit) = WorkunitStore::setup_for_tests();
let store_setup = StoreSetup::new();
let (local_runner, local_runner_call_counter) = create_local_runner(1, 500);
let (cache_runner, action_cache) = create_cached_runner(local_runner, &store_setup, 0, 0, true);
let cache_runner = create_cached_runner(local_runner, &store_setup, true);

// Claim that the process has a non-empty and not-persisted stdout digest.
let (process, action_digest) = create_process(&store_setup).await;
action_cache.insert(
store_setup.cas.action_cache.insert(
action_digest,
0,
Digest::of_bytes("pigs flying".as_bytes()),
Expand Down Expand Up @@ -260,11 +269,10 @@ async fn cache_read_eager_fetch() {
async fn run_process(eager_fetch: bool, workunit: &mut RunningWorkunit) -> (i32, usize) {
let store_setup = StoreSetup::new();
let (local_runner, local_runner_call_counter) = create_local_runner(1, 1000);
let (cache_runner, action_cache) =
create_cached_runner(local_runner, &store_setup, 0, 0, eager_fetch);
let cache_runner = create_cached_runner(local_runner, &store_setup, eager_fetch);

let (process, action_digest) = create_process(&store_setup).await;
action_cache.insert(
store_setup.cas.action_cache.insert(
action_digest,
0,
TestData::roland().digest(),
Expand Down Expand Up @@ -300,14 +308,20 @@ async fn cache_read_speculation() {
cache_hit: bool,
workunit: &mut RunningWorkunit,
) -> (i32, usize) {
let store_setup = StoreSetup::new();
let store_setup = StoreSetup::new_with_stub_cas(
StubCAS::builder()
.ac_read_delay(Duration::from_millis(remote_delay_ms))
.build(),
);
let (local_runner, local_runner_call_counter) = create_local_runner(1, local_delay_ms);
let (cache_runner, action_cache) =
create_cached_runner(local_runner, &store_setup, remote_delay_ms, 0, false);
let cache_runner = create_cached_runner(local_runner, &store_setup, false);

let (process, action_digest) = create_process(&store_setup).await;
if cache_hit {
action_cache.insert(action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST);
store_setup
.cas
.action_cache
.insert(action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST);
}

assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0);
Expand Down Expand Up @@ -341,11 +355,11 @@ async fn cache_write_success() {
let (_, mut workunit) = WorkunitStore::setup_for_tests();
let store_setup = StoreSetup::new();
let (local_runner, local_runner_call_counter) = create_local_runner(0, 100);
let (cache_runner, action_cache) = create_cached_runner(local_runner, &store_setup, 0, 0, false);
let cache_runner = create_cached_runner(local_runner, &store_setup, false);
let (process, action_digest) = create_process(&store_setup).await;

assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0);
assert!(action_cache.action_map.lock().is_empty());
assert!(store_setup.cas.action_cache.action_map.lock().is_empty());

let local_result = cache_runner
.run(Context::default(), &mut workunit, process.clone().into())
Expand All @@ -356,20 +370,28 @@ async fn cache_write_success() {

// Wait for the cache write block to finish.
sleep(Duration::from_secs(1)).await;
assert_eq!(action_cache.len(), 1);
assert_eq!(action_cache.get(action_digest).unwrap().exit_code, 0);
assert_eq!(store_setup.cas.action_cache.len(), 1);
assert_eq!(
store_setup
.cas
.action_cache
.get(action_digest)
.unwrap()
.exit_code,
0
);
}

#[tokio::test]
async fn cache_write_not_for_failures() {
let (_, mut workunit) = WorkunitStore::setup_for_tests();
let store_setup = StoreSetup::new();
let (local_runner, local_runner_call_counter) = create_local_runner(1, 100);
let (cache_runner, action_cache) = create_cached_runner(local_runner, &store_setup, 0, 0, false);
let cache_runner = create_cached_runner(local_runner, &store_setup, false);
let (process, _action_digest) = create_process(&store_setup).await;

assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0);
assert!(action_cache.action_map.lock().is_empty());
assert!(store_setup.cas.action_cache.action_map.lock().is_empty());

let local_result = cache_runner
.run(Context::default(), &mut workunit, process.clone().into())
Expand All @@ -380,21 +402,24 @@ async fn cache_write_not_for_failures() {

// Wait for the cache write block to finish.
sleep(Duration::from_millis(100)).await;
assert!(action_cache.action_map.lock().is_empty());
assert!(store_setup.cas.action_cache.action_map.lock().is_empty());
}

/// Cache writes should be async and not block the CommandRunner from returning.
#[tokio::test]
async fn cache_write_does_not_block() {
let (_, mut workunit) = WorkunitStore::setup_for_tests();
let store_setup = StoreSetup::new();
let store_setup = StoreSetup::new_with_stub_cas(
StubCAS::builder()
.ac_write_delay(Duration::from_millis(100))
.build(),
);
let (local_runner, local_runner_call_counter) = create_local_runner(0, 100);
let (cache_runner, action_cache) =
create_cached_runner(local_runner, &store_setup, 0, 100, false);
let cache_runner = create_cached_runner(local_runner, &store_setup, false);
let (process, action_digest) = create_process(&store_setup).await;

assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0);
assert!(action_cache.action_map.lock().is_empty());
assert!(store_setup.cas.action_cache.action_map.lock().is_empty());

let local_result = cache_runner
.run(Context::default(), &mut workunit, process.clone().into())
Expand All @@ -405,11 +430,19 @@ async fn cache_write_does_not_block() {

// We expect the cache write to have not finished yet, even though we already finished
// CommandRunner::run().
assert!(action_cache.action_map.lock().is_empty());
assert!(store_setup.cas.action_cache.action_map.lock().is_empty());

sleep(Duration::from_secs(1)).await;
assert_eq!(action_cache.len(), 1);
assert_eq!(action_cache.get(action_digest).unwrap().exit_code, 0);
assert_eq!(store_setup.cas.action_cache.len(), 1);
assert_eq!(
store_setup
.cas
.action_cache
.get(action_digest)
.unwrap()
.exit_code,
0
);
}

#[tokio::test]
Expand Down Expand Up @@ -562,13 +595,13 @@ async fn make_action_result_basic() {
.expect("Error saving directory");

let mock_command_runner = Arc::new(MockCommandRunner);
let action_cache = StubActionCache::new().unwrap();
let cas = StubCAS::builder().build();
let runner = crate::remote_cache::CommandRunner::new(
mock_command_runner.clone(),
ProcessMetadata::default(),
executor.clone(),
store.clone(),
&action_cache.address(),
&cas.address(),
None,
BTreeMap::default(),
Platform::current().unwrap(),
Expand Down
Loading

0 comments on commit e53a231

Please sign in to comment.