Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Double protect output stream of verify store #1180

Merged
merged 1 commit into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 60 additions & 11 deletions nativelink-store/src/verify_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl VerifyStore {
&self,
mut tx: DropCloserWriteHalf,
mut rx: DropCloserReadHalf,
size_info: UploadSizeInfo,
maybe_expected_digest_size: Option<u64>,
original_hash: [u8; 32],
mut maybe_hasher: Option<&mut D>,
) -> Result<(), Error> {
Expand All @@ -64,13 +64,42 @@ impl VerifyStore {
let chunk = rx
.recv()
.await
.err_tip(|| "Failed to reach chunk in check_update in verify store")?;
.err_tip(|| "Failed to read chunk in check_update in verify store")?;
sum_size += chunk.len() as u64;

if chunk.is_empty() {
// Is EOF.
if let UploadSizeInfo::ExactSize(expected_size) = size_info {
if sum_size != expected_size as u64 {
let mut done = chunk.is_empty(); // Is EOF.

if let Some(expected_size) = maybe_expected_digest_size {
match sum_size.cmp(&expected_size) {
std::cmp::Ordering::Greater => {
self.size_verification_failures.inc();
return Err(make_input_err!(
"Expected size {} but already received {} on insert",
expected_size,
sum_size
));
}
std::cmp::Ordering::Equal => {
let eof_chunk = rx
.recv()
.await
.err_tip(|| "Failed to read eof_chunk in verify store")?;
if !eof_chunk.is_empty() {
self.size_verification_failures.inc();
return Err(make_input_err!(
"Expected EOF chunk when exact size was hit on insert in verify store - {}",
expected_size,
));
}
done = true;
}
std::cmp::Ordering::Less => {}
}
}

if done {
if let Some(expected_size) = maybe_expected_digest_size {
if sum_size != expected_size {
self.size_verification_failures.inc();
return Err(make_input_err!(
"Expected size {} but got size {} on insert",
Expand All @@ -79,7 +108,7 @@ impl VerifyStore {
));
}
}
if let Some(hasher) = maybe_hasher {
if let Some(hasher) = maybe_hasher.as_mut() {
let hash_result: [u8; 32] = hasher.finalize_digest().packed_hash;
if original_hash != hash_result {
self.hash_verification_failures.inc();
Expand All @@ -90,6 +119,9 @@ impl VerifyStore {
));
}
}
}

if chunk.is_empty() {
tx.send_eof().err_tip(|| "In verify_store::check_update")?;
break;
}
Expand All @@ -104,6 +136,13 @@ impl VerifyStore {
write_future
.await
.err_tip(|| "Failed to write chunk to inner store in verify store")?;

// If we are done but `chunk` was not empty, it means we hit the exact
// expected size and already received the EOF chunk above.
if done {
tx.send_eof().err_tip(|| "In verify_store::check_update")?;
break;
}
}
Ok(())
}
Expand Down Expand Up @@ -133,10 +172,10 @@ impl StoreDriver for VerifyStore {
));
}
};
let digest_size = usize::try_from(digest.size_bytes)
let digest_size = u64::try_from(digest.size_bytes)
.err_tip(|| "Digest size_bytes was not convertible to usize")?;
if let UploadSizeInfo::ExactSize(expected_size) = size_info {
if self.verify_size && expected_size != digest_size {
if self.verify_size && expected_size as u64 != digest_size {
self.size_verification_failures.inc();
return Err(make_input_err!(
"Expected size to match. Got {} but digest says {} on update",
Expand All @@ -157,11 +196,21 @@ impl StoreDriver for VerifyStore {
None
};

let maybe_digest_size = if self.verify_size {
Some(digest_size)
} else {
None
};
let (tx, rx) = make_buf_channel_pair();

let update_fut = self.inner_store.update(digest, rx, size_info);
let check_fut =
self.inner_check_update(tx, reader, size_info, digest.packed_hash, hasher.as_mut());
let check_fut = self.inner_check_update(
tx,
reader,
maybe_digest_size,
digest.packed_hash,
hasher.as_mut(),
);

let (update_res, check_res) = tokio::join!(update_fut, check_fut);

Expand Down
51 changes: 49 additions & 2 deletions nativelink-store/tests/verify_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::pin::Pin;

use futures::future::pending;
use futures::try_join;
use nativelink_error::{Error, ResultExt};
use nativelink_macro::nativelink_test;
Expand Down Expand Up @@ -143,12 +144,11 @@ async fn verify_size_true_suceeds_on_multi_chunk_stream_update() -> Result<(), E
let (mut tx, rx) = make_buf_channel_pair();

let digest = DigestInfo::try_new(VALID_HASH1, 6).unwrap();
let digest_clone = digest;
let future = spawn!(
"verify_size_true_suceeds_on_multi_chunk_stream_update",
async move {
Pin::new(&store)
.update(digest_clone, rx, UploadSizeInfo::ExactSize(6))
.update(digest, rx, UploadSizeInfo::ExactSize(6))
.await
},
);
Expand Down Expand Up @@ -304,3 +304,50 @@ async fn verify_blake3_hash_true_fails_on_update() -> Result<(), Error> {
);
Ok(())
}

// Regression guard: a downstream component might ignore EOF and simply stop
// receiving once the expected size has been reached. The verify store must
// therefore reject an over-long stream by itself, as soon as the running byte
// count exceeds the digest size, without waiting for the sender to finish.
#[nativelink_test]
async fn verify_fails_immediately_on_too_much_data_sent_update() -> Result<(), Error> {
    const VALUE: &str = "123";
    const EXPECTED_ERR: &str = "Expected size 4 but already received 6 on insert";

    let inner_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default());
    // Only size verification is enabled; hash verification is switched off.
    let verify_config = nativelink_config::stores::VerifyStore {
        backend: nativelink_config::stores::StoreConfig::memory(
            nativelink_config::stores::MemoryStore::default(),
        ),
        verify_size: true,
        verify_hash: false,
    };
    let store = VerifyStore::new(&verify_config, Store::new(inner_store.clone()));

    // The digest advertises 4 bytes, but the sender pushes two 3-byte chunks
    // (6 bytes total) and then never sends EOF.
    let digest = DigestInfo::try_new(VALID_HASH1, 4).unwrap();
    let (mut tx, rx) = make_buf_channel_pair();
    let send_fut = async move {
        for _ in 0..2 {
            tx.send(VALUE.into()).await?;
        }
        // Park forever: the update side has to fail on its own for
        // `try_join!` to return.
        pending::<()>().await;
        panic!("Should not reach here");
        #[allow(unreachable_code)]
        Ok(())
    };

    let result = try_join!(
        send_fut,
        store.update(digest, rx, UploadSizeInfo::ExactSize(4))
    );
    assert!(result.is_err(), "Expected error, got: {:?}", &result);

    let err = result.unwrap_err().to_string();
    assert!(
        err.contains(EXPECTED_ERR),
        "Error should contain '{EXPECTED_ERR}', got: {err:?}"
    );

    // Nothing may have been committed to the backing store.
    assert_eq!(
        inner_store.has(digest).await,
        Ok(None),
        "Expected data to not exist in store after update"
    );
    Ok(())
}