Chunk FindMissingBlobsRequest appropriately #20708

Merged: 6 commits, Feb 7, 2025
3 changes: 3 additions & 0 deletions docs/notes/2.26.x.md
@@ -12,6 +12,9 @@ Thank you to [Klayvio](https://www.klaviyo.com/) and [Normal Computing](https://

### Highlights

### Remote caching/execution

- Remote cache: `FindMissingBlobsRequest` will now make multiple requests if the number of files is large. (https://github.com/pantsbuild/pants/pull/20708)
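The splitting described in this note is done with Rust's `slice::chunks`, which the provider applies to the digest list. A minimal standalone sketch (the integer "digests" and the chunk size of 4 are illustrative; the real limit is derived from the 4 MiB gRPC message cap):

```rust
fn main() {
    // Stand-ins for blob digests; the real code chunks a Vec of protobuf Digests.
    let digests: Vec<u32> = (0..10).collect();
    let max_per_request = 4; // illustrative, not the real 4 MiB-derived limit

    // chunks() yields non-overlapping slices of at most `max_per_request`
    // elements; the final chunk may be shorter.
    let requests: Vec<Vec<u32>> = digests
        .chunks(max_per_request)
        .map(|c| c.to_vec())
        .collect();

    assert_eq!(requests.len(), 3); // 10 digests -> chunks of 4 + 4 + 2
    assert_eq!(requests[2], vec![8, 9]);
    println!("{} requests", requests.len());
}
```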

### Deprecations

1 change: 1 addition & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default.

@@ -21,6 +21,7 @@ tokio-util = { workspace = true, features = ["io"] }
tonic = { workspace = true }
uuid = { workspace = true, features = ["v4"] }
workunit_store = { path = "../../workunit_store" }
prost = { workspace = true }

[dev-dependencies]
mock = { path = "../../testutil/mock" }
@@ -31,6 +31,8 @@ use workunit_store::{Metric, ObservationMetric};

use remote_provider_traits::{ByteStoreProvider, LoadDestination, RemoteStoreOptions};

const RPC_DIGEST_SIZE: usize = 70;

pub struct Provider {
instance_name: Option<String>,
chunk_size_bytes: usize,
@@ -393,24 +395,53 @@ impl ByteStoreProvider for Provider {
&self,
digests: &mut (dyn Iterator<Item = Digest> + Send),
) -> Result<HashSet<Digest>, String> {
let request = remexec::FindMissingBlobsRequest {
instance_name: self.instance_name.as_ref().cloned().unwrap_or_default(),
blob_digests: digests.into_iter().map(|d| d.into()).collect::<Vec<_>>(),
};
let blob_digests = digests.into_iter().map(|d| d.into()).collect::<Vec<_>>();

let client = self.cas_client.as_ref().clone();
const DEFAULT_MAX_GRPC_MESSAGE_SIZE: usize = 4 * 1024 * 1024;
let max_digests_per_request: usize = (DEFAULT_MAX_GRPC_MESSAGE_SIZE
- self
.instance_name
.as_ref()
.cloned()
.unwrap_or_default()
.len()
- 10)
/ RPC_DIGEST_SIZE;

let requests = blob_digests.chunks(max_digests_per_request).map(|digests| {
let msg = remexec::FindMissingBlobsRequest {
instance_name: self.instance_name.as_ref().cloned().unwrap_or_default(),
blob_digests: digests.to_vec(),
};

workunit_store::increment_counter_if_in_workunit(Metric::RemoteStoreExistsAttempts, 1);
let result = retry_call(
client,
move |mut client, _| {
let request = request.clone();
async move { client.find_missing_blobs(request).await }
},
status_is_retryable,
)
.await
.map_err(status_to_str);
msg
});

let client = self.cas_client.as_ref();
let futures = requests
.map(|request| {
workunit_store::increment_counter_if_in_workunit(
Metric::RemoteStoreExistsAttempts,
1,
);

let client = client.clone();
retry_call(
client,
move |mut client, _| {
let request = request.clone();
async move { client.find_missing_blobs(request).await }
},
status_is_retryable,
)
})
.collect::<Vec<_>>();

let result = futures::future::join_all(futures)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
.map_err(status_to_str);

let metric = match result {
Ok(_) => Metric::RemoteStoreExistsSuccesses,
@@ -421,10 +452,45 @@
let response = result?;

response
.into_inner()
.missing_blob_digests
.iter()
.map(|digest| digest.try_into())
.into_iter()
.flat_map(|response| {
response
.into_inner()
.missing_blob_digests
.into_iter()
.map(|digest| digest.try_into())
})
.collect::<Result<HashSet<_>, _>>()
}
}

#[cfg(test)]
mod tests {
use super::RPC_DIGEST_SIZE;
use crate::remexec::FindMissingBlobsRequest;
use prost::Message;
use testutil::data::TestData;

#[test]
fn test_size_of_find_missing_blobs_request() {
let mut blobs = Vec::new();
let instance_name = "";
// NOTE[TSolberg]: This test is a bit of a hack, but it's the best way I could think of to
// ensure that the size of the FindMissingBlobsRequest is roughly what we expect. The only
// delta would be the encoding of the instance name.
for it in (0..10).into_iter().chain(1000..1010).chain(10000..10010) {
while blobs.len() < it {
blobs.push(TestData::roland().digest().into());
}

let request = FindMissingBlobsRequest {
instance_name: instance_name.to_string(),
blob_digests: blobs.clone(),
};

let size = request.encoded_len();

assert_eq!(size, RPC_DIGEST_SIZE * it);
}
}
}
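As a sanity check on the chunk-size arithmetic above, the computation can be reproduced standalone. The constants mirror the diff (4 MiB default gRPC message limit, 70 bytes per encoded digest, ~10 bytes reserved for message framing — the PR's own estimates); with an empty instance name, 100,000 digests split into exactly two requests, matching the `request_count` assertions in the tests below:

```rust
// Standalone sketch of the provider's chunk-size arithmetic.
const DEFAULT_MAX_GRPC_MESSAGE_SIZE: usize = 4 * 1024 * 1024;
const RPC_DIGEST_SIZE: usize = 70;

// Reserve room for the encoded instance name plus ~10 bytes of framing,
// then see how many ~70-byte digests fit in one request.
fn max_digests_per_request(instance_name: &str) -> usize {
    (DEFAULT_MAX_GRPC_MESSAGE_SIZE - instance_name.len() - 10) / RPC_DIGEST_SIZE
}

fn main() {
    let per_request = max_digests_per_request("");
    assert_eq!(per_request, 59918);

    // The tests send 100_000 copies of the same digest, so the list
    // splits into ceil(100_000 / 59_918) = 2 requests.
    let digests = 100_000usize;
    let requests = (digests + per_request - 1) / per_request;
    assert_eq!(requests, 2);
    println!("{per_request} digests per request => {requests} requests");
}
```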
@@ -438,6 +438,51 @@ async fn list_missing_digests_none_missing() {
)
}

#[tokio::test]
async fn list_missing_digests_more_than_4mb() {
let testdata = TestData::roland();
let _ = WorkunitStore::setup_for_tests();
let cas = StubCAS::builder().file(&testdata).build().await;

let provider = new_provider(&cas).await;

let test_data = (0..100_000).map(|_| testdata.digest()).collect::<Vec<_>>();
    assert_eq!(
        provider
            .list_missing_digests(&mut test_data.into_iter())
            .await,
        Ok(HashSet::new())
    );

    assert_eq!(cas.request_count(RequestType::CASFindMissingBlobs), 2)
}

Contributor:

Thanks for the test. I think there are a few plausible mistakes that we could introduce in future that won't be caught:

1. Only sending each unique digest once, thus invalidating this test (i.e. if we do the HashSet idea, list_missing_digests will be working with just one digest and thus only send one request).
2. Skipping the second+ request (either not sending it at all, or not including its results in the final aggregation).
3. Not sure about this one: if we break this in another way and start sending >4MiB requests, does the StubCAS reject the requests (i.e. does this test fail without the fix)?

I think we can address each of them:

1. Check assert_eq!(cas.request_count(RequestType::CASFindMissingBlobs), 2) so that the test starts failing if that core assumption changes.
2. Add two digests that are missing within each "chunk" (e.g. one at the start and one at the end).
3. Add request size enforcement to StubCAS if not already there (this might cause other tests to fail, so we might want it to be optional until we resolve them).

Contributor (Author):

> Add request size enforcement to StubCAS if not already there (this might cause other tests to fail, so we might want it to be optional until we resolve them).

If I remove my fix, the existing test fails:

assertion `left == right` failed
  left: Err("OutOfRange: \"Error, decoded message length too large: found 7000000 bytes, the limit is: 4194304 bytes\"")
 right: Ok({})


#[tokio::test]
async fn list_missing_digests_more_than_4mb_some_missing() {
let testdata = TestData::roland();
let _ = WorkunitStore::setup_for_tests();
let cas = StubCAS::builder().file(&testdata).build().await;

let provider = new_provider(&cas).await;


let henries = TestData::all_the_henries();
let robin = TestData::robin();
let mut test_data = vec![henries.digest()];
test_data.extend((0..100_000).map(|_| testdata.digest()));
test_data.push(robin.digest());

assert_eq!(
provider
.list_missing_digests(&mut test_data.into_iter())
.await,
Ok(HashSet::from([henries.digest(), robin.digest()]))
);

assert_eq!(cas.request_count(RequestType::CASFindMissingBlobs), 2)
}

#[tokio::test]
async fn list_missing_digests_some_missing() {
let cas = StubCAS::empty().await;