Skip to content

Commit

Permalink
Unify validate_output logic in CompletenessCheckingStore and CacheLookupScheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
blizzardc0der authored and steedmicro committed Apr 2, 2024
1 parent b78d971 commit e22f2b9
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 10 deletions.
25 changes: 25 additions & 0 deletions nativelink-scheduler/src/cache_lookup_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ use nativelink_error::Error;
use nativelink_proto::build::bazel::remote::execution::v2::{
digest_function, ActionResult as ProtoActionResult, GetActionResultRequest,
};
<<<<<<< Updated upstream
use nativelink_store::ac_utils::{get_and_decode_digest, get_digests_info};
=======
use nativelink_store::ac_utils::{get_and_decode_digest, get_digests_info, DigestInputType};
>>>>>>> Stashed changes
use nativelink_store::grpc_store::GrpcStore;
use nativelink_util::action_messages::{
ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState,
Expand Down Expand Up @@ -90,6 +94,7 @@ async fn validate_outputs_exist(
action_result: &ProtoActionResult,
) -> bool {
// Verify that output_files and output_directories are available in the cas.
<<<<<<< Updated upstream
let Ok(mut digest_infos) =
get_digests_info(action_result.output_files.clone(), &move |digests| {
Box::new(
Expand All @@ -99,6 +104,26 @@ async fn validate_outputs_exist(
)
})
else {
=======

let Ok(mut digest_infos) = get_digests_info(
action_result,
DigestInputType::OutputFiles(&action_result.output_files),
false,
) else {
return false;
};
let Ok(mut digest_output_infos) = get_digests_info(
action_result,
DigestInputType::OutputDirectories(&action_result.output_directories),
false,
) else {
return false;
};
digest_infos.append(&mut digest_output_infos);

let Ok(sizes) = Pin::new(cas_store.as_ref()).has_many(&digest_infos).await else {
>>>>>>> Stashed changes
return false;
};

Expand Down
60 changes: 60 additions & 0 deletions nativelink-store/src/ac_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ use std::pin::Pin;
use bytes::BytesMut;
use futures::TryFutureExt;
use nativelink_error::{Code, Error, ResultExt};
use nativelink_proto::build::bazel::remote::execution::v2::{
ActionResult, Directory, OutputDirectory, OutputFile,
};
use nativelink_util::common::DigestInfo;
use nativelink_util::digest_hasher::DigestHasher;
use nativelink_util::store_trait::Store;
Expand Down Expand Up @@ -82,12 +85,69 @@ pub async fn get_size_and_decode_digest<T: Message + Default>(

/// Given a proto action result, return all relevant digests and
/// output directories that need to be checked.
<<<<<<< Updated upstream
pub fn get_digests_info<T>(
digests: Vec<T>,
handle_digest: &impl Fn(Vec<T>) -> Box<dyn Iterator<Item = Result<DigestInfo, Error>>>,
) -> Result<Vec<DigestInfo>, Error> {
// TODO(allada) When `try_collect()` is stable we can use it instead.
let mut digest_iter = handle_digest(digests);
=======
/// Selects which collection of digests `get_digests_info` should walk
/// when gathering the `DigestInfo`s that must be checked in the CAS.
///
/// Variants borrow slices rather than `&Vec<_>` so callers may pass a
/// `&Vec<_>` (via unsized coercion) or any other slice; this is the
/// idiomatic, more general parameter form (clippy `ptr_arg`).
pub enum DigestInputType<'a> {
    /// File digests taken from `ActionResult::output_files`
    /// (each entry's `digest` field).
    OutputFiles(&'a [OutputFile]),
    /// Tree digests taken from `ActionResult::output_directories`
    /// (each entry's `tree_digest` field).
    OutputDirectories(&'a [OutputDirectory]),
    /// File digests gathered from the `files` of a set of `Directory`
    /// protos (e.g. a `Tree`'s root chained with its children).
    Directories(&'a [Directory]),
}

pub fn get_digests_info(
action_result: &ActionResult,
digest_input: DigestInputType,
is_contain_std: bool,
) -> Result<Vec<DigestInfo>, Error> {
// TODO(allada) When `try_collect()` is stable we can use it instead.
let mut digest_iter = match digest_input {
DigestInputType::OutputFiles(output_files) => {
let output_files = output_files
.iter()
.filter_map(|output_file| output_file.digest.as_ref().map(DigestInfo::try_from));

if is_contain_std {
output_files
.chain(
action_result
.stdout_digest
.as_ref()
.map(DigestInfo::try_from),
)
.chain(
action_result
.stderr_digest
.as_ref()
.map(DigestInfo::try_from),
)
.collect::<Vec<_>>()
.into_iter()
} else {
output_files.collect::<Vec<_>>().into_iter()
}
}
DigestInputType::OutputDirectories(output_files) => output_files
.iter()
.filter_map(|output_file| output_file.tree_digest.as_ref().map(DigestInfo::try_from))
.collect::<Vec<_>>()
.into_iter(),
DigestInputType::Directories(dir) => dir
.iter()
.flat_map(|dir| {
dir.files
.iter()
.filter_map(|f| f.digest.as_ref().map(DigestInfo::try_from))
})
.collect::<Vec<_>>()
.into_iter(),
};

>>>>>>> Stashed changes
let mut digest_infos = Vec::with_capacity(digest_iter.size_hint().1.unwrap_or(0));
digest_iter
.try_for_each(|maybe_digest| {
Expand Down
57 changes: 47 additions & 10 deletions nativelink-store/src/completeness_checking_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@ use parking_lot::Mutex;
use tokio::sync::Notify;
use tracing::warn;

<<<<<<< Updated upstream
use crate::ac_utils::{get_and_decode_digest, get_digests_info, get_size_and_decode_digest};
=======
use crate::ac_utils::{
get_and_decode_digest, get_digests_info, get_size_and_decode_digest, DigestInputType,
};
>>>>>>> Stashed changes

pub struct CompletenessCheckingStore {
cas_store: Arc<dyn Store>,
Expand All @@ -51,6 +57,7 @@ impl CompletenessCheckingStore {
/// that need to be checked and pass them into `handle_digest_infos_fn`
/// as they are found.
async fn check_output_directories(
action_result: &ProtoActionResult,
cas_store: Pin<&dyn Store>,
tree_digests: Vec<DigestInfo>,
handle_digest_infos_fn: &impl Fn(Vec<DigestInfo>),
Expand All @@ -63,6 +70,7 @@ async fn check_output_directories(
// TODO(allada) When `try_collect()` is stable we can use it instead.
// https://github.com/rust-lang/rust/issues/94047
let digest_infos = get_digests_info(
<<<<<<< Updated upstream
tree.children.into_iter().chain(tree.root).collect(),
&move |tree| {
Box::new(tree.into_iter().flat_map(|dir| {
Expand All @@ -71,6 +79,17 @@ async fn check_output_directories(
.filter_map(|f| f.digest.map(DigestInfo::try_from))
}))
},
=======
action_result,
DigestInputType::Directories(
&tree
.children
.into_iter()
.chain(tree.root)
.collect::<Vec<_>>(),
),
false,
>>>>>>> Stashed changes
)?;
handle_digest_infos_fn(digest_infos);
Ok(())
Expand Down Expand Up @@ -128,6 +147,7 @@ async fn inner_has_with_results(
let (action_result, size) =
get_size_and_decode_digest::<ProtoActionResult>(ac_store, digest).await?;

<<<<<<< Updated upstream
let mut digest_infos =
get_digests_info(action_result.output_files, &move |digests| {
Box::new(
Expand Down Expand Up @@ -155,6 +175,18 @@ async fn inner_has_with_results(
output_dir.tree_digest.map(DigestInfo::try_from)
}))
})?;
=======
let mut digest_infos = get_digests_info(
&action_result,
DigestInputType::OutputFiles(&action_result.output_files),
true,
)?;
let digest_output_infos = get_digests_info(
&action_result,
DigestInputType::OutputDirectories(&action_result.output_directories),
true,
)?;
>>>>>>> Stashed changes

{
let mut state = state_mux.lock();
Expand All @@ -180,19 +212,24 @@ async fn inner_has_with_results(

// Hot path: It is very common for no output directories to be defined.
// So we can avoid any needless work by early returning.
if output_directories.is_empty() {
if digest_output_infos.is_empty() {
return Ok(());
}

check_output_directories(cas_store, output_directories, &move |digest_infos| {
let mut state = state_mux.lock();
let rep_len = digest_infos.len();
state.digests_to_check.extend(digest_infos);
state
.digests_to_check_idxs
.extend(iter::repeat(i).take(rep_len));
state.notify.notify_one();
})
check_output_directories(
&action_result,
cas_store,
digest_output_infos,
&move |digest_infos| {
let mut state = state_mux.lock();
let rep_len = digest_infos.len();
state.digests_to_check.extend(digest_infos);
state
.digests_to_check_idxs
.extend(iter::repeat(i).take(rep_len));
state.notify.notify_one();
},
)
.await?;

Result::<(), Error>::Ok(())
Expand Down

0 comments on commit e22f2b9

Please sign in to comment.