Dedupe load_bytes_with calls to a remote Store (Cherry-pick of #15901) #15915

Merged
1 commit merged on Jun 24, 2022
243 changes: 142 additions & 101 deletions src/rust/engine/fs/store/src/lib.rs
@@ -35,13 +35,14 @@ mod snapshot_ops_tests;
mod snapshot_tests;
pub use crate::snapshot_ops::{SnapshotOps, SubsetParams};

use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{BTreeMap, HashMap};
use std::fmt::{self, Debug, Display};
use std::fs::OpenOptions;
use std::future::Future;
use std::io::{self, Read, Write};
use std::os::unix::fs::{OpenOptionsExt, PermissionsExt};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};

use async_oncecell::OnceCell;
@@ -148,35 +149,78 @@ pub struct UploadSummary {
pub upload_wall_time: Duration,
}

///
/// Wraps a `remote::ByteStore` with state to help avoid uploading common blobs multiple times.
///
/// If a blob is generated that many downstream actions depend on, we would otherwise get an
/// expanded set of digests from each of those actions that includes the new blob. If those actions
/// all execute in a time window smaller than the time taken to upload the blob, the effort would be
duplicated, leading both to wasted resources locally buffering up the blob and to wasted
effort on the remote server, depending on its handling of this.
///
#[derive(Clone, Debug)]
struct RemoteStore {
store: remote::ByteStore,
in_flight_uploads: Arc<parking_lot::Mutex<HashSet<Digest>>>,
in_flight_uploads: Arc<Mutex<HashMap<Digest, Weak<OnceCell<()>>>>>,
in_flight_downloads: Arc<Mutex<HashMap<Digest, Weak<OnceCell<()>>>>>,
}

impl RemoteStore {
fn new(store: remote::ByteStore) -> Self {
Self {
store,
in_flight_uploads: Arc::new(parking_lot::Mutex::new(HashSet::new())),
in_flight_uploads: Arc::default(),
in_flight_downloads: Arc::default(),
}
}

fn reserve_uploads(&self, candidates: HashSet<Digest>) -> HashSet<Digest> {
let mut active_uploads = self.in_flight_uploads.lock();
let to_upload = candidates
.difference(&active_uploads)
.cloned()
.collect::<HashSet<_>>();
active_uploads.extend(&to_upload);
to_upload
///
/// Returns a strongly held cell from a map of weakly held cells, creating it if necessary.
///
fn cell_from(
cells: &Mutex<HashMap<Digest, Weak<OnceCell<()>>>>,
digest: Digest,
) -> Arc<OnceCell<()>> {
let mut cells = cells.lock();
if let Some(cell) = cells.get(&digest).and_then(|weak_cell| weak_cell.upgrade()) {
cell
} else {
let cell = Arc::new(OnceCell::new());
cells.insert(digest, Arc::downgrade(&cell));
cell
}
}

fn release_uploads(&self, uploads: HashSet<Digest>) {
self
.in_flight_uploads
.lock()
.retain(|d| !uploads.contains(d));
///
/// Guards an attempt to upload the given `Digest`, skipping the upload if another attempt has
been successful. Will not return until either an attempt has succeeded, or this attempt has
/// failed.
///
async fn maybe_upload<E>(
&self,
digest: Digest,
upload: impl Future<Output = Result<(), E>>,
) -> Result<(), E> {
Self::cell_from(&self.in_flight_uploads, digest)
.get_or_try_init(upload)
.await
.map(|&()| ())
}

///
/// Guards an attempt to download the given `Digest`, skipping the download if another attempt
has been successful. Will not return until either an attempt has succeeded, or this attempt has
/// failed.
///
async fn maybe_download<E>(
&self,
digest: Digest,
download: impl Future<Output = Result<(), E>>,
) -> Result<(), E> {
Self::cell_from(&self.in_flight_downloads, digest)
.get_or_try_init(download)
.await
.map(|&()| ())
}
}
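
To see the dedup mechanism in isolation, here is a minimal, self-contained sketch of the pattern (an illustration, not engine code), assuming the same `async_oncecell` and `parking_lot` crates as the diff plus `tokio` (with the `macros`, `rt-multi-thread`, and `time` features) for the demo; `Digest` is stubbed as a `u64`. Two tasks race on the same digest: only the winner's future runs, and both callers observe its result.

use std::collections::HashMap;
use std::future::Future;
use std::sync::{Arc, Weak};
use std::time::Duration;

use async_oncecell::OnceCell;
use parking_lot::Mutex;

type Digest = u64; // stand-in for the real fs::Digest

#[derive(Default)]
struct Deduper {
  in_flight: Mutex<HashMap<Digest, Weak<OnceCell<()>>>>,
}

impl Deduper {
  // The map holds only `Weak` handles: when the last in-flight caller drops
  // its `Arc`, the cell's contents are freed, and the next request for that
  // digest overwrites the stale entry.
  fn cell(&self, digest: Digest) -> Arc<OnceCell<()>> {
    let mut cells = self.in_flight.lock();
    if let Some(cell) = cells.get(&digest).and_then(Weak::upgrade) {
      cell
    } else {
      let cell = Arc::new(OnceCell::new());
      cells.insert(digest, Arc::downgrade(&cell));
      cell
    }
  }

  async fn dedup<E>(
    &self,
    digest: Digest,
    work: impl Future<Output = Result<(), E>>,
  ) -> Result<(), E> {
    self.cell(digest).get_or_try_init(work).await.map(|_| ())
  }
}

#[tokio::main]
async fn main() {
  let deduper = Deduper::default();
  let slow_upload = || async {
    tokio::time::sleep(Duration::from_millis(10)).await;
    println!("upload ran"); // printed once: the second task awaits the first's cell
    Ok::<(), String>(())
  };
  let (a, b) = tokio::join!(
    deduper.dedup(42, slow_upload()),
    deduper.dedup(42, slow_upload()),
  );
  assert!(a.is_ok() && b.is_ok());
}

Holding `Weak` rather than `Arc` in the map is what keeps it from pinning memory: each cell is dropped as soon as its last caller releases it, so completed transfers do not stay resident.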

@@ -386,23 +430,19 @@ impl Store {
///
pub async fn load_file_bytes_with<
T: Send + 'static,
F: Fn(&[u8]) -> T + Send + Sync + 'static,
F: Fn(&[u8]) -> T + Clone + Send + Sync + 'static,
>(
&self,
digest: Digest,
f: F,
) -> Result<T, StoreError> {
// No transformation or verification is needed for files, so we pass in a pair of functions
// which always succeed, whether the underlying bytes are coming from a local or remote store.
// Unfortunately, we need to be a little verbose to do this.
let f_local = Arc::new(f);
let f_remote = f_local.clone();
// No transformation or verification is needed for files.
self
.load_bytes_with(
EntryType::File,
digest,
move |v: &[u8]| Ok(f_local(v)),
move |v: Bytes| Ok(f_remote(&v)),
move |v: &[u8]| Ok(f(v)),
|_: Bytes| Ok(()),
)
.await
}
@@ -571,7 +611,7 @@ impl Store {
)
})?;
protos::verify_directory_canonical(digest, &directory)?;
Ok(directory)
Ok(())
},
)
.await
@@ -599,8 +639,8 @@ impl Store {
///
async fn load_bytes_with<
T: Send + 'static,
FLocal: Fn(&[u8]) -> Result<T, String> + Send + Sync + 'static,
FRemote: Fn(Bytes) -> Result<T, String> + Send + Sync + 'static,
FLocal: Fn(&[u8]) -> Result<T, String> + Clone + Send + Sync + 'static,
FRemote: Fn(Bytes) -> Result<(), String> + Send + Sync + 'static,
>(
&self,
entry_type: EntryType,
@@ -613,51 +653,64 @@

if let Some(bytes_res) = self
.local
.load_bytes_with(entry_type, digest, f_local)
.load_bytes_with(entry_type, digest, f_local.clone())
.await?
{
return Ok(bytes_res?);
}

let remote = maybe_remote
.ok_or_else(|| {
StoreError::MissingDigest("Was not present in the local store".to_owned(), digest)
})?
.store;

let bytes = retry_call(
remote,
|remote| async move { remote.load_bytes_with(digest, Ok).await },
|err| match err {
ByteStoreError::Grpc(status) => status_is_retryable(status),
_ => false,
},
)
.await
.map_err(|err| match err {
ByteStoreError::Grpc(status) => status_to_str(status),
ByteStoreError::Other(msg) => msg,
})?
.ok_or_else(|| {
StoreError::MissingDigest(
"Was not present in either the local or remote store".to_owned(),
digest,
)
let remote = maybe_remote.ok_or_else(|| {
StoreError::MissingDigest("Was not present in the local store".to_owned(), digest)
})?;
let remote_store = remote.store.clone();

let value = f_remote(bytes.clone())?;
let stored_digest = local.store_bytes(entry_type, None, bytes, true).await?;
if digest == stored_digest {
Ok(value)
} else {
Err(
format!(
"CAS gave wrong digest: expected {:?}, got {:?}",
digest, stored_digest
remote
.maybe_download(digest, async move {
// TODO: Now that we always copy from the remote store to the local store before executing
// the caller's logic against the local store, `remote::ByteStore::load_bytes_with` no
// longer needs to accept a function.
let bytes = retry_call(
remote_store,
|remote_store| async move { remote_store.load_bytes_with(digest, Ok).await },
|err| match err {
ByteStoreError::Grpc(status) => status_is_retryable(status),
_ => false,
},
)
.into(),
)
}
.await
.map_err(|err| match err {
ByteStoreError::Grpc(status) => status_to_str(status),
ByteStoreError::Other(msg) => msg,
})?
.ok_or_else(|| {
StoreError::MissingDigest(
"Was not present in either the local or remote store".to_owned(),
digest,
)
})?;

f_remote(bytes.clone())?;
let stored_digest = local.store_bytes(entry_type, None, bytes, true).await?;
if digest == stored_digest {
Ok(())
} else {
Err(StoreError::Unclassified(format!(
"CAS gave wrong digest: expected {:?}, got {:?}",
digest, stored_digest
)))
}
})
.await?;

Ok(
self
.local
.load_bytes_with(entry_type, digest, f_local)
.await?
.ok_or_else(|| {
format!("After downloading {digest:?}, the local store claimed that it was not present.")
})??,
)
}

///
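
Taken together, this hunk turns `load_bytes_with` into a read-through cache: bytes are always persisted locally before the caller's function runs against them, which is why `f_remote` shrinks to a pure validator returning `()` and why `f_local` picks up a `Clone` bound (it is moved into the first local attempt and needed again after the download). A deliberately synchronous, in-memory sketch of that flow; every name below is illustrative rather than the engine's real API:

use std::collections::HashMap;

type Digest = u64; // illustrative stand-in

fn load_with<T>(
  local: &mut HashMap<Digest, Vec<u8>>,
  remote: &HashMap<Digest, Vec<u8>>,
  digest: Digest,
  f: impl Fn(&[u8]) -> T + Clone,
  verify: impl Fn(&[u8]) -> Result<(), String>,
) -> Result<T, String> {
  // Fast path: the digest is already present locally.
  if let Some(bytes) = local.get(&digest) {
    return Ok(f(bytes));
  }
  // Slow path: fetch from the remote side, verify, then persist locally.
  let bytes = remote
    .get(&digest)
    .ok_or_else(|| "Was not present in either the local or remote store".to_owned())?
    .clone();
  verify(&bytes)?;
  local.insert(digest, bytes);
  // Re-read from the local store and run `f` there, as the real code now does.
  let bytes = local.get(&digest).expect("just inserted");
  Ok(f(bytes))
}

fn main() {
  let mut local: HashMap<Digest, Vec<u8>> = HashMap::new();
  let mut remote: HashMap<Digest, Vec<u8>> = HashMap::new();
  remote.insert(1, b"hello".to_vec());

  let len = load_with(&mut local, &remote, 1, |b| b.len(), |_b| Ok(())).unwrap();
  assert_eq!(len, 5);
  assert!(local.contains_key(&1)); // the miss populated the local store
}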
@@ -700,45 +753,30 @@ impl Store {
remote.list_missing_digests(request).await?
};

let uploaded_digests = {
// Here we best-effort avoid uploading common blobs multiple times. If a blob is generated
// that many downstream actions depend on, we would otherwise get an expanded set of digests
// from each of those actions that includes the new blob. If those actions all execute in a
// time window smaller than the time taken to upload the blob, the effort would be
// duplicated leading to both wasted resources locally buffering up the blob as well as
// wasted effort on the remote server depending on its handling of this.
let to_upload = remote_store.reserve_uploads(digests_to_upload);
let uploaded_digests_result = future::try_join_all(
to_upload
.clone()
.into_iter()
.map(|digest| {
let entry_type = ingested_digests[&digest];
let local = store.local.clone();
let remote = remote.clone();
async move {
// TODO(John Sirois): Consider allowing configuration of when to buffer large blobs
// to disk to be independent of the remote store wire chunk size.
if digest.size_bytes > remote.chunk_size_bytes() {
Self::store_large_blob_remote(local, remote, entry_type, digest).await
} else {
Self::store_small_blob_remote(local, remote, entry_type, digest).await
}
future::try_join_all(
digests_to_upload
.iter()
.cloned()
.map(|digest| {
let entry_type = ingested_digests[&digest];
let local = store.local.clone();
let remote = remote.clone();
remote_store.maybe_upload(digest, async move {
// TODO(John Sirois): Consider allowing configuration of when to buffer large blobs
// to disk to be independent of the remote store wire chunk size.
if digest.size_bytes > remote.chunk_size_bytes() {
Self::store_large_blob_remote(local, remote, entry_type, digest).await
} else {
Self::store_small_blob_remote(local, remote, entry_type, digest).await
}
.map_ok(move |()| digest)
})
.collect::<Vec<_>>(),
)
.await;
// We release the uploads whether or not they actually succeeded. Future checks for large
// uploads will issue `find_missing_blobs_request`s that will eventually reconcile our
// accounting. In the mean-time we error on the side of at least once semantics.
remote_store.release_uploads(to_upload);
uploaded_digests_result?
};
})
.collect::<Vec<_>>(),
)
.await?;

let ingested_file_sizes = ingested_digests.iter().map(|(digest, _)| digest.size_bytes);
let uploaded_file_sizes = uploaded_digests.iter().map(|digest| digest.size_bytes);
let uploaded_file_sizes = digests_to_upload.iter().map(|digest| digest.size_bytes);

Ok(UploadSummary {
ingested_file_count: ingested_file_sizes.len(),
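
One behavioral note on replacing the reserve/release bookkeeping (and its deleted comment about at-least-once semantics) with cells: assuming `async_oncecell` follows the usual once-cell contract, a `get_or_try_init` attempt that errors leaves the cell uninitialized, so a later caller simply re-runs the upload. A small sketch of that property (tokio runtime assumed):

use async_oncecell::OnceCell;

#[tokio::main]
async fn main() {
  let cell: OnceCell<()> = OnceCell::new();

  // A failed attempt reports its error but leaves the cell unset...
  let first: Result<(), String> = cell
    .get_or_try_init(async { Err("transient network error".to_owned()) })
    .await
    .map(|_| ());
  assert!(first.is_err());

  // ...so a later attempt re-runs the work and can still succeed.
  let second: Result<(), String> = cell
    .get_or_try_init(async { Ok(()) })
    .await
    .map(|_| ());
  assert!(second.is_ok());
}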
@@ -1361,7 +1399,10 @@ pub enum LocalMissingBehavior {
impl SnapshotOps for Store {
type Error = StoreError;

async fn load_file_bytes_with<T: Send + 'static, F: Fn(&[u8]) -> T + Send + Sync + 'static>(
async fn load_file_bytes_with<
T: Send + 'static,
F: Fn(&[u8]) -> T + Clone + Send + Sync + 'static,
>(
&self,
digest: Digest,
f: F,
5 changes: 4 additions & 1 deletion src/rust/engine/fs/store/src/snapshot_ops.rs
@@ -165,7 +165,10 @@ async fn render_merge_error<T: SnapshotOps + 'static>(
pub trait SnapshotOps: Clone + Send + Sync + 'static {
type Error: Debug + Display + From<String>;

async fn load_file_bytes_with<T: Send + 'static, F: Fn(&[u8]) -> T + Send + Sync + 'static>(
async fn load_file_bytes_with<
T: Send + 'static,
F: Fn(&[u8]) -> T + Clone + Send + Sync + 'static,
>(
&self,
digest: Digest,
f: F,
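
As in `lib.rs`, the only call-site impact of this trait change is the extra `Clone` bound on `F`. Rust closures are automatically `Clone` whenever everything they capture is `Clone`, so existing callers should keep compiling unchanged; a tiny illustration:

fn assert_clone<F: Clone>(_: F) {}

fn main() {
  let prefix = String::from("len=");
  // Captures `prefix` by value; `String: Clone`, so the closure is `Clone` too.
  let f = move |bytes: &[u8]| format!("{prefix}{}", bytes.len());
  assert_clone(f);
}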