diff --git a/nativelink-store/src/verify_store.rs b/nativelink-store/src/verify_store.rs index 5d14a731e..100b08d33 100644 --- a/nativelink-store/src/verify_store.rs +++ b/nativelink-store/src/verify_store.rs @@ -55,7 +55,7 @@ impl VerifyStore { &self, mut tx: DropCloserWriteHalf, mut rx: DropCloserReadHalf, - size_info: UploadSizeInfo, + maybe_expected_digest_size: Option, original_hash: [u8; 32], mut maybe_hasher: Option<&mut D>, ) -> Result<(), Error> { @@ -64,13 +64,42 @@ impl VerifyStore { let chunk = rx .recv() .await - .err_tip(|| "Failed to reach chunk in check_update in verify store")?; + .err_tip(|| "Failed to read chunk in check_update in verify store")?; sum_size += chunk.len() as u64; - if chunk.is_empty() { - // Is EOF. - if let UploadSizeInfo::ExactSize(expected_size) = size_info { - if sum_size != expected_size as u64 { + let mut done = chunk.is_empty(); // Is EOF. + + if let Some(expected_size) = maybe_expected_digest_size { + match sum_size.cmp(&expected_size) { + std::cmp::Ordering::Greater => { + self.size_verification_failures.inc(); + return Err(make_input_err!( + "Expected size {} but already received {} on insert", + expected_size, + sum_size + )); + } + std::cmp::Ordering::Equal => { + let eof_chunk = rx + .recv() + .await + .err_tip(|| "Failed to read eof_chunk in verify store")?; + if !eof_chunk.is_empty() { + self.size_verification_failures.inc(); + return Err(make_input_err!( + "Expected EOF chunk when exact size was hit on insert in verify store - {}", + expected_size, + )); + } + done = true; + } + std::cmp::Ordering::Less => {} + } + } + + if done { + if let Some(expected_size) = maybe_expected_digest_size { + if sum_size != expected_size { self.size_verification_failures.inc(); return Err(make_input_err!( "Expected size {} but got size {} on insert", @@ -79,7 +108,7 @@ impl VerifyStore { )); } } - if let Some(hasher) = maybe_hasher { + if let Some(hasher) = maybe_hasher.as_mut() { let hash_result: [u8; 32] = hasher.finalize_digest().packed_hash; if original_hash != hash_result { self.hash_verification_failures.inc(); @@ -90,6 +119,9 @@ impl VerifyStore { )); } } + } + + if chunk.is_empty() { tx.send_eof().err_tip(|| "In verify_store::check_update")?; break; } @@ -104,6 +136,13 @@ impl VerifyStore { write_future .await .err_tip(|| "Failed to write chunk to inner store in verify store")?; + + // If are done, but not an empty `chunk`, it means we are at the exact + // size match and already received the EOF chunk above. + if done { + tx.send_eof().err_tip(|| "In verify_store::check_update")?; + break; + } } Ok(()) } @@ -133,10 +172,10 @@ impl StoreDriver for VerifyStore { )); } }; - let digest_size = usize::try_from(digest.size_bytes) + let digest_size = u64::try_from(digest.size_bytes) .err_tip(|| "Digest size_bytes was not convertible to usize")?; if let UploadSizeInfo::ExactSize(expected_size) = size_info { - if self.verify_size && expected_size != digest_size { + if self.verify_size && expected_size as u64 != digest_size { self.size_verification_failures.inc(); return Err(make_input_err!( "Expected size to match. Got {} but digest says {} on update", @@ -157,11 +196,21 @@ impl StoreDriver for VerifyStore { None }; + let maybe_digest_size = if self.verify_size { + Some(digest_size) + } else { + None + }; let (tx, rx) = make_buf_channel_pair(); let update_fut = self.inner_store.update(digest, rx, size_info); - let check_fut = - self.inner_check_update(tx, reader, size_info, digest.packed_hash, hasher.as_mut()); + let check_fut = self.inner_check_update( + tx, + reader, + maybe_digest_size, + digest.packed_hash, + hasher.as_mut(), + ); let (update_res, check_res) = tokio::join!(update_fut, check_fut); diff --git a/nativelink-store/tests/verify_store_test.rs b/nativelink-store/tests/verify_store_test.rs index 843bf9f23..21bc79313 100644 --- a/nativelink-store/tests/verify_store_test.rs +++ b/nativelink-store/tests/verify_store_test.rs @@ -14,6 +14,7 @@ use std::pin::Pin; +use futures::future::pending; use futures::try_join; use nativelink_error::{Error, ResultExt}; use nativelink_macro::nativelink_test; @@ -143,12 +144,11 @@ async fn verify_size_true_suceeds_on_multi_chunk_stream_update() -> Result<(), E let (mut tx, rx) = make_buf_channel_pair(); let digest = DigestInfo::try_new(VALID_HASH1, 6).unwrap(); - let digest_clone = digest; let future = spawn!( "verify_size_true_suceeds_on_multi_chunk_stream_update", async move { Pin::new(&store) - .update(digest_clone, rx, UploadSizeInfo::ExactSize(6)) + .update(digest, rx, UploadSizeInfo::ExactSize(6)) .await }, ); @@ -304,3 +304,50 @@ async fn verify_blake3_hash_true_fails_on_update() -> Result<(), Error> { ); Ok(()) } + +// A potential bug could happen if the down stream component ignores the EOF but will +// stop receiving data when the expected size is reached. We should ensure this edge +// case is double protected. +#[nativelink_test] +async fn verify_fails_immediately_on_too_much_data_sent_update() -> Result<(), Error> { + let inner_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default()); + let store = VerifyStore::new( + &nativelink_config::stores::VerifyStore { + backend: nativelink_config::stores::StoreConfig::memory( + nativelink_config::stores::MemoryStore::default(), + ), + verify_size: true, + verify_hash: false, + }, + Store::new(inner_store.clone()), + ); + + const VALUE: &str = "123"; + let digest = DigestInfo::try_new(VALID_HASH1, 4).unwrap(); + let (mut tx, rx) = make_buf_channel_pair(); + let send_fut = async move { + tx.send(VALUE.into()).await?; + tx.send(VALUE.into()).await?; + pending::<()>().await; + panic!("Should not reach here"); + #[allow(unreachable_code)] + Ok(()) + }; + let result = try_join!( + send_fut, + store.update(digest, rx, UploadSizeInfo::ExactSize(4)) + ); + assert!(result.is_err(), "Expected error, got: {:?}", &result); + const EXPECTED_ERR: &str = "Expected size 4 but already received 6 on insert"; + let err = result.unwrap_err().to_string(); + assert!( + err.contains(EXPECTED_ERR), + "Error should contain '{EXPECTED_ERR}', got: {err:?}" + ); + assert_eq!( + inner_store.has(digest).await, + Ok(None), + "Expected data to not exist in store after update" + ); + Ok(()) +}