diff --git a/deployment-examples/docker-compose/local-storage-cas.json b/deployment-examples/docker-compose/local-storage-cas.json index 65dbea88c..b54b96a1a 100644 --- a/deployment-examples/docker-compose/local-storage-cas.json +++ b/deployment-examples/docker-compose/local-storage-cas.json @@ -25,7 +25,7 @@ } }, "verify_size": true, - "verify_hash": true + "hash_verification_function": "sha256" } }, "AC_MAIN_STORE": { diff --git a/deployment-examples/terraform/experimental_AWS/scripts/cas.json b/deployment-examples/terraform/experimental_AWS/scripts/cas.json index 65d2187fa..472fcf160 100644 --- a/deployment-examples/terraform/experimental_AWS/scripts/cas.json +++ b/deployment-examples/terraform/experimental_AWS/scripts/cas.json @@ -108,7 +108,7 @@ } }, "verify_size": true, - "verify_hash": true + "hash_verification_function": "sha256" } } }, diff --git a/nativelink-config/README.md b/nativelink-config/README.md index fbc41bee9..036894f23 100644 --- a/nativelink-config/README.md +++ b/nativelink-config/README.md @@ -291,10 +291,11 @@ the rest will be stored in AWS's S3: This store is special. It's only job is to verify the content as it is fetched and uploaded to ensure it meets some criteria or errors. This store should only -be added to the CAS. If `verify_hash` is set to true, it will apply a sha256 -algorithm on the data as it is sent/received and at the end if it does not match -the name of the digest it will cancel the upload/download and return an error. -If `verify_size` is set, a similar item will happen, but count the bytes sent +be added to the CAS. If `hash_verification_function` is set, it will apply the +hashing algorithm on the data as it is sent/received and at the end if it does +not match the name of the digest it will cancel the upload/download and return +an error. If it's not set, the hashing verification will be disabled. +If `verify_size` is set, a similar item will happen, but count the bytes sent and check it against the digest instead. ```js @@ -311,7 +312,8 @@ and check it against the digest instead. } }, "verify_size": true, - "verify_hash": true, + // sha256 or blake3 + "hash_verification_function": "sha256", } }, "AC_MAIN_STORE": { diff --git a/nativelink-config/examples/filesystem_cas.json b/nativelink-config/examples/filesystem_cas.json index bbc6d8d63..00dc65a38 100644 --- a/nativelink-config/examples/filesystem_cas.json +++ b/nativelink-config/examples/filesystem_cas.json @@ -76,7 +76,7 @@ } }, "verify_size": true, - "verify_hash": true + "hash_verification_function": "sha256" } }, "AC_MAIN_STORE": { diff --git a/nativelink-config/examples/s3_backend_with_local_fast_cas.json b/nativelink-config/examples/s3_backend_with_local_fast_cas.json index 35b6bd3bd..72841ff21 100644 --- a/nativelink-config/examples/s3_backend_with_local_fast_cas.json +++ b/nativelink-config/examples/s3_backend_with_local_fast_cas.json @@ -68,7 +68,7 @@ } }, "verify_size": true, - "verify_hash": true + "hash_verification_function": "sha256" } }, "AC_MAIN_STORE": { diff --git a/nativelink-config/src/cas_server.rs b/nativelink-config/src/cas_server.rs index 0aea903a9..a771f61c3 100644 --- a/nativelink-config/src/cas_server.rs +++ b/nativelink-config/src/cas_server.rs @@ -20,7 +20,7 @@ use crate::schedulers::SchedulerConfig; use crate::serde_utils::{ convert_numeric_with_shellexpand, convert_optinoal_numeric_with_shellexpand, convert_string_with_shellexpand, }; -use crate::stores::{StoreConfig, StoreRefName}; +use crate::stores::{ConfigDigestHashFunction, StoreConfig, StoreRefName}; /// Name of the scheduler. This type will be used when referencing a /// scheduler in the `CasConfig::schedulers`'s map key. @@ -543,18 +543,6 @@ pub enum WorkerConfig { local(LocalWorkerConfig), } -#[allow(non_camel_case_types)] -#[derive(Deserialize, Debug, Clone, Copy)] -pub enum ConfigDigestHashFunction { - /// Use the sha256 hash function. - /// - sha256, - - /// Use the blake3 hash function. - /// - blake3, -} - #[derive(Deserialize, Debug, Clone, Copy)] pub struct GlobalConfig { /// Maximum number of open files that can be opened at one time. diff --git a/nativelink-config/src/stores.rs b/nativelink-config/src/stores.rs index 597373d27..6b5e5a662 100644 --- a/nativelink-config/src/stores.rs +++ b/nativelink-config/src/stores.rs @@ -20,6 +20,18 @@ use crate::serde_utils::{convert_numeric_with_shellexpand, convert_string_with_s /// in the `CasConfig::stores`'s map key. pub type StoreRefName = String; +#[allow(non_camel_case_types)] +#[derive(Serialize, Deserialize, Debug, Clone, Copy)] +pub enum ConfigDigestHashFunction { + /// Use the sha256 hash function. + /// + sha256, + + /// Use the blake3 hash function. + /// + blake3, +} + #[allow(non_camel_case_types)] #[derive(Serialize, Deserialize, Debug, Clone)] pub enum StoreConfig { @@ -329,12 +341,13 @@ pub struct VerifyStore { #[serde(default)] pub verify_size: bool, - /// If set this store will hash the contents and verify it matches the - /// digest hash before writing the entry to underlying store. + /// The digest hash function to hash the contents and to verify if the digest hash is + /// matching before writing the entry to underlying store. /// - /// This should be set to false for AC, but true for CAS stores. - #[serde(default)] - pub verify_hash: bool, + /// If None, the hash verification will be disabled. + /// + /// This should be set to None for AC, but hashing function like `sha256` for CAS stores. + pub hash_verification_function: Option, } #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/nativelink-store/src/verify_store.rs b/nativelink-store/src/verify_store.rs index 6716adce2..61a60fb3d 100644 --- a/nativelink-store/src/verify_store.rs +++ b/nativelink-store/src/verify_store.rs @@ -17,17 +17,18 @@ use std::pin::Pin; use std::sync::Arc; use async_trait::async_trait; +use nativelink_config::stores::ConfigDigestHashFunction; use nativelink_error::{make_input_err, Error, ResultExt}; use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf}; use nativelink_util::common::DigestInfo; +use nativelink_util::digest_hasher::{DigestHasher, DigestHasherFunc}; use nativelink_util::metrics_utils::{Collector, CollectorState, CounterWithTime, MetricsComponent, Registry}; use nativelink_util::store_trait::{Store, UploadSizeInfo}; -use sha2::{Digest, Sha256}; pub struct VerifyStore { inner_store: Arc, verify_size: bool, - verify_hash: bool, + hash_verification_function: Option, // Metrics. size_verification_failures: CounterWithTime, @@ -39,7 +40,7 @@ impl VerifyStore { VerifyStore { inner_store, verify_size: config.verify_size, - verify_hash: config.verify_hash, + hash_verification_function: config.hash_verification_function, size_verification_failures: CounterWithTime::default(), hash_verification_failures: CounterWithTime::default(), } @@ -54,7 +55,8 @@ impl VerifyStore { mut tx: DropCloserWriteHalf, mut rx: DropCloserReadHalf, size_info: UploadSizeInfo, - mut maybe_hasher: Option<([u8; 32], Sha256)>, + original_hash: [u8; 32], + mut maybe_hasher: Option, ) -> Result<(), Error> { let mut sum_size: u64 = 0; loop { @@ -76,8 +78,9 @@ impl VerifyStore { )); } } - if let Some((original_hash, hasher)) = maybe_hasher { - let hash_result: [u8; 32] = hasher.finalize().into(); + if let Some(hasher) = maybe_hasher.as_mut() { + // We are passing -1 here because we just need to get the hashing result not the size. + let hash_result: [u8; 32] = hasher.finalize_digest(-1).packed_hash; if original_hash != hash_result { self.hash_verification_failures.inc(); return Err(make_input_err!( @@ -94,7 +97,7 @@ impl VerifyStore { // This will allows us to hash while sending data to another thread. let write_future = tx.send(chunk.clone()); - if let Some((_, hasher)) = maybe_hasher.as_mut() { + if let Some(hasher) = maybe_hasher.as_mut() { hasher.update(chunk.as_ref()); } @@ -135,15 +138,14 @@ impl Store for VerifyStore { } } - let mut hasher = None; - if self.verify_hash { - hasher = Some((digest.packed_hash, Sha256::new())); - } + let hasher = self + .hash_verification_function + .map(|v| DigestHasher::from(DigestHasherFunc::from(v))); let (tx, rx) = make_buf_channel_pair(); let update_fut = self.pin_inner().update(digest, rx, size_info); - let check_fut = self.inner_check_update(tx, reader, size_info, hasher); + let check_fut = self.inner_check_update(tx, reader, size_info, digest.packed_hash, hasher); let (update_res, check_res) = tokio::join!(update_fut, check_fut); @@ -181,9 +183,9 @@ impl MetricsComponent for VerifyStore { "If the verification store is verifying the size of the data", ); c.publish( - "verify_hash_enabled", - &self.verify_hash, - "If the verification store is verifying the hash of the data", + "hash_verification_function", + &format!("{:?}", self.hash_verification_function), + "Hash verification function to verify the contents of the data", ); c.publish( "size_verification_failures_total", diff --git a/nativelink-store/tests/verify_store_test.rs b/nativelink-store/tests/verify_store_test.rs index 72a200a83..07974f4f9 100644 --- a/nativelink-store/tests/verify_store_test.rs +++ b/nativelink-store/tests/verify_store_test.rs @@ -40,7 +40,7 @@ mod verify_store_tests { nativelink_config::stores::MemoryStore::default(), ), verify_size: false, - verify_hash: false, + hash_verification_function: None, }, inner_store.clone(), ); @@ -72,7 +72,7 @@ mod verify_store_tests { nativelink_config::stores::MemoryStore::default(), ), verify_size: true, - verify_hash: false, + hash_verification_function: None, }, inner_store.clone(), ); @@ -112,7 +112,7 @@ mod verify_store_tests { nativelink_config::stores::MemoryStore::default(), ), verify_size: true, - verify_hash: false, + hash_verification_function: None, }, inner_store.clone(), ); @@ -139,7 +139,7 @@ mod verify_store_tests { nativelink_config::stores::MemoryStore::default(), ), verify_size: true, - verify_hash: false, + hash_verification_function: None, }, inner_store.clone(), ); @@ -167,7 +167,7 @@ mod verify_store_tests { } #[tokio::test] - async fn verify_hash_true_suceeds_on_update() -> Result<(), Error> { + async fn verify_sha256_hash_true_suceeds_on_update() -> Result<(), Error> { let inner_store = Arc::new(MemoryStore::new(&nativelink_config::stores::MemoryStore::default())); let store_owned = VerifyStore::new( &nativelink_config::stores::VerifyStore { @@ -175,7 +175,7 @@ mod verify_store_tests { nativelink_config::stores::MemoryStore::default(), ), verify_size: false, - verify_hash: true, + hash_verification_function: Some(nativelink_config::stores::ConfigDigestHashFunction::sha256), }, inner_store.clone(), ); @@ -196,7 +196,7 @@ mod verify_store_tests { } #[tokio::test] - async fn verify_hash_true_fails_on_update() -> Result<(), Error> { + async fn verify_sha256_hash_true_fails_on_update() -> Result<(), Error> { let inner_store = Arc::new(MemoryStore::new(&nativelink_config::stores::MemoryStore::default())); let store_owned = VerifyStore::new( &nativelink_config::stores::VerifyStore { @@ -204,7 +204,7 @@ mod verify_store_tests { nativelink_config::stores::MemoryStore::default(), ), verify_size: false, - verify_hash: true, + hash_verification_function: Some(nativelink_config::stores::ConfigDigestHashFunction::sha256), }, inner_store.clone(), ); @@ -231,4 +231,70 @@ mod verify_store_tests { ); Ok(()) } + + #[tokio::test] + async fn verify_blake3_hash_true_suceeds_on_update() -> Result<(), Error> { + let inner_store = Arc::new(MemoryStore::new(&nativelink_config::stores::MemoryStore::default())); + let store_owned = VerifyStore::new( + &nativelink_config::stores::VerifyStore { + backend: nativelink_config::stores::StoreConfig::memory( + nativelink_config::stores::MemoryStore::default(), + ), + verify_size: false, + hash_verification_function: Some(nativelink_config::stores::ConfigDigestHashFunction::blake3), + }, + inner_store.clone(), + ); + let store = Pin::new(&store_owned); + + /// This value is blake3("123"). + const HASH: &str = "b3d4f8803f7e24b8f389b072e75477cdbcfbe074080fb5e500e53e26e054158e"; + const VALUE: &str = "123"; + let digest = DigestInfo::try_new(HASH, 3).unwrap(); + let result = store.update_oneshot(digest, VALUE.into()).await; + assert_eq!(result, Ok(()), "Expected success, got: {:?}", result); + assert_eq!( + Pin::new(inner_store.as_ref()).has(digest).await, + Ok(Some(VALUE.len())), + "Expected data to exist in store after update" + ); + Ok(()) + } + + #[tokio::test] + async fn verify_blake3_hash_true_fails_on_update() -> Result<(), Error> { + let inner_store = Arc::new(MemoryStore::new(&nativelink_config::stores::MemoryStore::default())); + let store_owned = VerifyStore::new( + &nativelink_config::stores::VerifyStore { + backend: nativelink_config::stores::StoreConfig::memory( + nativelink_config::stores::MemoryStore::default(), + ), + verify_size: false, + hash_verification_function: Some(nativelink_config::stores::ConfigDigestHashFunction::blake3), + }, + inner_store.clone(), + ); + let store = Pin::new(&store_owned); + + /// This value is blake3("12"). + const HASH: &str = "b944a0a3b20cf5927e594ff306d256d16cd5b0ba3e27b3285f40d7ef5e19695b"; + const VALUE: &str = "123"; + let digest = DigestInfo::try_new(HASH, 3).unwrap(); + let result = store.update_oneshot(digest, VALUE.into()).await; + let err = result.unwrap_err().to_string(); + const ACTUAL_HASH: &str = "b3d4f8803f7e24b8f389b072e75477cdbcfbe074080fb5e500e53e26e054158e"; + let expected_err = format!("Hashes do not match, got: {} but digest hash was {}", HASH, ACTUAL_HASH); + assert!( + err.contains(&expected_err), + "Error should contain '{}', got: {:?}", + expected_err, + err + ); + assert_eq!( + Pin::new(inner_store.as_ref()).has(digest).await, + Ok(None), + "Expected data to not exist in store after update" + ); + Ok(()) + } } diff --git a/nativelink-util/src/digest_hasher.rs b/nativelink-util/src/digest_hasher.rs index d52f6044b..b9febf8ad 100644 --- a/nativelink-util/src/digest_hasher.rs +++ b/nativelink-util/src/digest_hasher.rs @@ -15,7 +15,7 @@ use std::sync::OnceLock; use blake3::Hasher as Blake3Hasher; -use nativelink_config::cas_server::ConfigDigestHashFunction; +use nativelink_config::stores::ConfigDigestHashFunction; use nativelink_error::{make_err, make_input_err, Code, Error}; use nativelink_proto::build::bazel::remote::execution::v2::digest_function::Value as ProtoDigestFunction; use sha2::{Digest, Sha256}; diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index 2498652ec..29de99661 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -25,8 +25,9 @@ use futures::FutureExt; use hyper::server::conn::Http; use hyper::{Response, StatusCode}; use nativelink_config::cas_server::{ - CasConfig, CompressionAlgorithm, ConfigDigestHashFunction, GlobalConfig, ListenerConfig, ServerConfig, WorkerConfig, + CasConfig, CompressionAlgorithm, GlobalConfig, ListenerConfig, ServerConfig, WorkerConfig, }; +use nativelink_config::stores::ConfigDigestHashFunction; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_scheduler::default_scheduler_factory::scheduler_factory; use nativelink_scheduler::worker::WorkerId;