diff --git a/nativelink-config/src/stores.rs b/nativelink-config/src/stores.rs index 7e6e31821..75e9c4b6a 100644 --- a/nativelink-config/src/stores.rs +++ b/nativelink-config/src/stores.rs @@ -432,6 +432,21 @@ pub enum StoreSpec { noop(NoopSpec), } +impl StoreSpec { + // To enforce no duplicate connection configs for a store, add it to the matcher and implement + // disallow_duplicates_digest() on it. Returns `None` for stores that are not being enforced unique. + pub fn disallow_duplicates_digest(&self) -> Option { + match self { + Self::memory(spec) => Some(spec.disallow_duplicates_digest()), + Self::experimental_s3_store(spec) => Some(spec.disallow_duplicates_digest()), + Self::filesystem(spec) => Some(spec.disallow_duplicates_digest()), + Self::grpc(spec) => Some(spec.disallow_duplicates_digest()), + Self::redis_store(spec) => Some(spec.disallow_duplicates_digest()), + _ => None, + } + } +} + /// Configuration for an individual shard of the store. #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] @@ -514,6 +529,12 @@ pub struct FilesystemSpec { pub block_size: u64, } +impl FilesystemSpec { + fn disallow_duplicates_digest(&self) -> String { + format!("{}{}", self.content_path, self.temp_path) + } +} + #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] pub struct FastSlowSpec { @@ -535,6 +556,12 @@ pub struct MemorySpec { pub eviction_policy: Option, } +impl MemorySpec { + pub fn disallow_duplicates_digest(&self) -> String { + "InMemoryStore".into() + } +} + #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] pub struct DedupSpec { @@ -787,6 +814,14 @@ pub struct S3Spec { pub disable_http2: bool, } +impl S3Spec { + pub fn disallow_duplicates_digest(&self) -> String { + let key_prefix = self.key_prefix.as_deref().unwrap_or_default(); + + format!("{}{}{}", self.region, self.bucket, key_prefix) + } +} + #[allow(non_camel_case_types)] #[derive(Serialize, Deserialize, Debug, Clone, Copy)] pub enum StoreType { @@ -852,6 +887,22 @@ pub struct GrpcSpec { pub connections_per_endpoint: usize, } +impl GrpcSpec { + // todo: could improve duplication detection to individual endpoints to disallow accidental re-use + fn disallow_duplicates_digest(&self) -> String { + format!( + "{}{}", + self.instance_name, + self.endpoints + .clone() + .into_iter() + .map(|endpoint| endpoint.address) + .collect::>() + .join(",") + ) + } +} + /// The possible error codes that might occur on an upstream request. #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub enum ErrorCode { @@ -993,6 +1044,12 @@ pub struct RedisSpec { pub retry: Retry, } +impl RedisSpec { + fn disallow_duplicates_digest(&self) -> String { + format!("{}{}", self.addresses.clone().join(","), self.key_prefix) + } +} + #[derive(Debug, Default, Deserialize, Serialize, Clone, PartialEq, Eq)] #[serde(rename_all = "lowercase")] pub enum RedisMode { diff --git a/nativelink-service/BUILD.bazel b/nativelink-service/BUILD.bazel index 99374b8bd..d28811d09 100644 --- a/nativelink-service/BUILD.bazel +++ b/nativelink-service/BUILD.bazel @@ -53,6 +53,7 @@ rust_test_suite( "tests/bytestream_server_test.rs", "tests/cas_server_test.rs", "tests/worker_api_server_test.rs", + "tests/store_overlap_rules_test.rs", ], proc_macro_deps = [ "//nativelink-macro", diff --git a/nativelink-service/tests/ac_server_test.rs b/nativelink-service/tests/ac_server_test.rs index ac7bb9b80..1eed0947a 100644 --- a/nativelink-service/tests/ac_server_test.rs +++ b/nativelink-service/tests/ac_server_test.rs @@ -25,7 +25,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::{ digest_function, ActionResult, Digest, GetActionResultRequest, UpdateActionResultRequest, }; use nativelink_service::ac_server::AcServer; -use nativelink_store::default_store_factory::store_factory; +use nativelink_store::default_store_factory::make_and_add_store_to_manager; use nativelink_store::store_manager::StoreManager; use nativelink_util::common::DigestInfo; use nativelink_util::store_trait::StoreLike; @@ -53,24 +53,22 @@ async fn insert_into_store( async fn make_store_manager() -> Result, Error> { let store_manager = Arc::new(StoreManager::new()); - store_manager.add_store( + make_and_add_store_to_manager( "main_cas", - store_factory( - &StoreSpec::memory(MemorySpec::default()), - &store_manager, - None, - ) - .await?, - ); - store_manager.add_store( + &StoreSpec::memory(MemorySpec::default()), + &store_manager, + None, + ) + .await?; + + make_and_add_store_to_manager( "main_ac", - store_factory( - &StoreSpec::memory(MemorySpec::default()), - &store_manager, - None, - ) - .await?, - ); + &StoreSpec::memory(MemorySpec::default()), + &store_manager, + None, + ) + .await?; + Ok(store_manager) } diff --git a/nativelink-service/tests/bep_server_test.rs b/nativelink-service/tests/bep_server_test.rs index fe120a549..befe01054 100644 --- a/nativelink-service/tests/bep_server_test.rs +++ b/nativelink-service/tests/bep_server_test.rs @@ -35,7 +35,7 @@ use nativelink_proto::google::devtools::build::v1::{ PublishBuildToolEventStreamRequest, PublishLifecycleEventRequest, StreamId, }; use nativelink_service::bep_server::BepServer; -use nativelink_store::default_store_factory::store_factory; +use nativelink_store::default_store_factory::make_and_add_store_to_manager; use nativelink_store::store_manager::StoreManager; use nativelink_util::buf_channel::make_buf_channel_pair; use nativelink_util::channel_body_for_tests::ChannelBody; @@ -52,15 +52,14 @@ const BEP_STORE_NAME: &str = "main_bep"; /// Utility function to construct a [`StoreManager`] async fn make_store_manager() -> Result, Error> { let store_manager = Arc::new(StoreManager::new()); - store_manager.add_store( + make_and_add_store_to_manager( BEP_STORE_NAME, - store_factory( - &StoreSpec::memory(MemorySpec::default()), - &store_manager, - None, - ) - .await?, - ); + &StoreSpec::memory(MemorySpec::default()), + &store_manager, + None, + ) + .await?; + Ok(store_manager) } diff --git a/nativelink-service/tests/bytestream_server_test.rs b/nativelink-service/tests/bytestream_server_test.rs index 97154e804..991bef6b5 100644 --- a/nativelink-service/tests/bytestream_server_test.rs +++ b/nativelink-service/tests/bytestream_server_test.rs @@ -34,7 +34,7 @@ use nativelink_proto::google::bytestream::{ QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, WriteRequest, WriteResponse, }; use nativelink_service::bytestream_server::ByteStreamServer; -use nativelink_store::default_store_factory::store_factory; +use nativelink_store::default_store_factory::make_and_add_store_to_manager; use nativelink_store::store_manager::StoreManager; use nativelink_util::channel_body_for_tests::ChannelBody; use nativelink_util::common::{encode_stream_proto, DigestInfo}; @@ -58,15 +58,14 @@ const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789a async fn make_store_manager() -> Result, Error> { let store_manager = Arc::new(StoreManager::new()); - store_manager.add_store( + make_and_add_store_to_manager( "main_cas", - store_factory( - &StoreSpec::memory(MemorySpec::default()), - &store_manager, - None, - ) - .await?, - ); + &StoreSpec::memory(MemorySpec::default()), + &store_manager, + None, + ) + .await?; + Ok(store_manager) } diff --git a/nativelink-service/tests/cas_server_test.rs b/nativelink-service/tests/cas_server_test.rs index a9b6ffe34..642f7054e 100644 --- a/nativelink-service/tests/cas_server_test.rs +++ b/nativelink-service/tests/cas_server_test.rs @@ -30,7 +30,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::{ use nativelink_proto::google::rpc::Status as GrpcStatus; use nativelink_service::cas_server::CasServer; use nativelink_store::ac_utils::serialize_and_upload_message; -use nativelink_store::default_store_factory::store_factory; +use nativelink_store::default_store_factory::make_and_add_store_to_manager; use nativelink_store::store_manager::StoreManager; use nativelink_util::common::DigestInfo; use nativelink_util::digest_hasher::DigestHasherFunc; @@ -47,15 +47,14 @@ const BAD_HASH: &str = "BAD_HASH"; async fn make_store_manager() -> Result, Error> { let store_manager = Arc::new(StoreManager::new()); - store_manager.add_store( + make_and_add_store_to_manager( "main_cas", - store_factory( - &StoreSpec::memory(MemorySpec::default()), - &store_manager, - None, - ) - .await?, - ); + &StoreSpec::memory(MemorySpec::default()), + &store_manager, + None, + ) + .await?; + Ok(store_manager) } diff --git a/nativelink-service/tests/store_overlap_rules_test.rs b/nativelink-service/tests/store_overlap_rules_test.rs new file mode 100644 index 000000000..1eed0947a --- /dev/null +++ b/nativelink-service/tests/store_overlap_rules_test.rs @@ -0,0 +1,219 @@ +// Copyright 2024 The NativeLink Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::pin::Pin; +use std::sync::Arc; + +use bytes::BytesMut; +use maplit::hashmap; +use nativelink_config::stores::{MemorySpec, StoreSpec}; +use nativelink_error::Error; +use nativelink_macro::nativelink_test; +use nativelink_proto::build::bazel::remote::execution::v2::action_cache_server::ActionCache; +use nativelink_proto::build::bazel::remote::execution::v2::{ + digest_function, ActionResult, Digest, GetActionResultRequest, UpdateActionResultRequest, +}; +use nativelink_service::ac_server::AcServer; +use nativelink_store::default_store_factory::make_and_add_store_to_manager; +use nativelink_store::store_manager::StoreManager; +use nativelink_util::common::DigestInfo; +use nativelink_util::store_trait::StoreLike; +use pretty_assertions::assert_eq; +use prost::Message; +use tonic::{Code, Request, Response, Status}; + +const INSTANCE_NAME: &str = "foo_instance_name"; +const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789abcdef"; +const HASH1_SIZE: i64 = 147; + +async fn insert_into_store( + store: Pin<&impl StoreLike>, + hash: &str, + action_size: i64, + action_result: &T, +) -> Result> { + let mut store_data = BytesMut::new(); + action_result.encode(&mut store_data)?; + let data_len = store_data.len(); + let digest = DigestInfo::try_new(hash, action_size)?; + store.update_oneshot(digest, store_data.freeze()).await?; + Ok(data_len.try_into().unwrap()) +} + +async fn make_store_manager() -> Result, Error> { + let store_manager = Arc::new(StoreManager::new()); + make_and_add_store_to_manager( + "main_cas", + &StoreSpec::memory(MemorySpec::default()), + &store_manager, + None, + ) + .await?; + + make_and_add_store_to_manager( + "main_ac", + &StoreSpec::memory(MemorySpec::default()), + &store_manager, + None, + ) + .await?; + + Ok(store_manager) +} + +fn make_ac_server(store_manager: &StoreManager) -> Result { + AcServer::new( + &hashmap! { + "foo_instance_name".to_string() => nativelink_config::cas_server::AcStoreConfig{ + ac_store: "main_ac".to_string(), + read_only: false, + } + }, + store_manager, + ) +} + +async fn get_action_result( + ac_server: &AcServer, + hash: &str, + size: i64, +) -> Result, Status> { + ac_server + .get_action_result(Request::new(GetActionResultRequest { + instance_name: INSTANCE_NAME.to_string(), + action_digest: Some(Digest { + hash: hash.to_string(), + size_bytes: size, + }), + inline_stdout: false, + inline_stderr: false, + inline_output_files: vec![], + digest_function: digest_function::Value::Sha256.into(), + })) + .await +} + +#[nativelink_test] +async fn empty_store() -> Result<(), Box> { + let store_manager = make_store_manager().await?; + let ac_server = make_ac_server(&store_manager)?; + + let raw_response = get_action_result(&ac_server, HASH1, 0).await; + + let err = raw_response.unwrap_err(); + assert_eq!(err.code(), Code::NotFound); + assert!(err.message().is_empty()); + Ok(()) +} + +#[nativelink_test] +async fn has_single_item() -> Result<(), Box> { + let store_manager = make_store_manager().await?; + let ac_server = make_ac_server(&store_manager)?; + let ac_store = store_manager.get_store("main_ac").unwrap(); + + let action_result = ActionResult { + exit_code: 45, + ..Default::default() + }; + + insert_into_store(ac_store.as_pin(), HASH1, HASH1_SIZE, &action_result).await?; + let raw_response = get_action_result(&ac_server, HASH1, HASH1_SIZE).await; + + assert!( + raw_response.is_ok(), + "Expected value, got error {raw_response:?}" + ); + assert_eq!(raw_response.unwrap().into_inner(), action_result); + Ok(()) +} + +#[nativelink_test] +async fn single_item_wrong_digest_size() -> Result<(), Box> { + let store_manager = make_store_manager().await?; + let ac_server = make_ac_server(&store_manager)?; + let ac_store = store_manager.get_store("main_ac").unwrap(); + + let action_result = ActionResult { + exit_code: 45, + ..Default::default() + }; + + insert_into_store(ac_store.as_pin(), HASH1, HASH1_SIZE, &action_result).await?; + let raw_response = get_action_result(&ac_server, HASH1, HASH1_SIZE - 1).await; + + let err = raw_response.unwrap_err(); + assert_eq!(err.code(), Code::NotFound); + assert!(err.message().is_empty()); + Ok(()) +} + +fn get_encoded_proto_size(proto: &T) -> Result> { + let mut store_data = Vec::new(); + proto.encode(&mut store_data)?; + Ok(store_data.len()) +} + +async fn update_action_result( + ac_server: &AcServer, + digest: Digest, + action_result: ActionResult, +) -> Result, Status> { + ac_server + .update_action_result(Request::new(UpdateActionResultRequest { + instance_name: INSTANCE_NAME.to_string(), + action_digest: Some(digest), + action_result: Some(action_result), + results_cache_policy: None, + digest_function: digest_function::Value::Sha256.into(), + })) + .await +} + +#[nativelink_test] +async fn one_item_update_test() -> Result<(), Box> { + let store_manager = make_store_manager().await?; + let ac_server = make_ac_server(&store_manager)?; + let ac_store = store_manager.get_store("main_ac").unwrap(); + + let action_result = ActionResult { + exit_code: 45, + ..Default::default() + }; + + let size_bytes = get_encoded_proto_size(&action_result)? as i64; + + let raw_response = update_action_result( + &ac_server, + Digest { + hash: HASH1.to_string(), + size_bytes, + }, + action_result.clone(), + ) + .await; + + assert!( + raw_response.is_ok(), + "Expected success, got error {raw_response:?}" + ); + assert_eq!(raw_response.unwrap().into_inner(), action_result); + + let digest = DigestInfo::try_new(HASH1, size_bytes)?; + let raw_data = ac_store.get_part_unchunked(digest, 0, None).await?; + + let decoded_action_result = ActionResult::decode(raw_data)?; + assert_eq!(decoded_action_result, action_result); + Ok(()) +} diff --git a/nativelink-store/src/default_store_factory.rs b/nativelink-store/src/default_store_factory.rs index 506ef6752..0e97f1ed2 100644 --- a/nativelink-store/src/default_store_factory.rs +++ b/nativelink-store/src/default_store_factory.rs @@ -42,12 +42,44 @@ use crate::verify_store::VerifyStore; type FutureMaybeStore<'a> = Box> + 'a>; -pub fn store_factory<'a>( +pub async fn make_and_add_store_to_manager<'a>( + name: &'a str, + backend: &'a StoreSpec, + store_manager: &'a Arc, + maybe_health_registry_builder: Option<&'a mut HealthRegistryBuilder>, +) -> Result<(), Error> { + match store_factory(backend, store_manager, maybe_health_registry_builder).await { + Ok(store) => match store_manager.add_store(name, store) { + Ok(_) => { + if let Some(digest) = backend.disallow_duplicates_digest() { + store_manager.digest_not_already_present(&digest)?; + store_manager.config_digest_add(digest); + } + + Ok(()) + } + + Err(e) => { + return Err(e); + } + }, + + Err(e) => { + return Err(e); + } + } +} + +fn store_factory<'a>( backend: &'a StoreSpec, store_manager: &'a Arc, maybe_health_registry_builder: Option<&'a mut HealthRegistryBuilder>, ) -> Pin> { Box::pin(async move { + if let Some(backend_config_digest) = backend.disallow_duplicates_digest() { + store_manager.digest_not_already_present(&backend_config_digest)?; + } + let store: Arc = match backend { StoreSpec::memory(spec) => MemoryStore::new(spec), StoreSpec::experimental_s3_store(spec) => S3Store::new(spec, SystemTime::now).await?, diff --git a/nativelink-store/src/store_manager.rs b/nativelink-store/src/store_manager.rs index ec99b7bb1..c567d1fda 100644 --- a/nativelink-store/src/store_manager.rs +++ b/nativelink-store/src/store_manager.rs @@ -23,18 +23,21 @@ use parking_lot::RwLock; pub struct StoreManager { #[metric] stores: RwLock>, + store_config_anti_collision_digests: RwLock>, } impl StoreManager { pub fn new() -> StoreManager { StoreManager { stores: RwLock::new(HashMap::new()), + store_config_anti_collision_digests: RwLock::new(vec![]), } } pub fn add_store(&self, name: &str, store: Store) -> Result<(), Error> { + let stores_rd = self.stores.read(); let mut stores = self.stores.write(); - match stores.contains_key(name) { + match stores_rd.contains_key(name) { true => Err(make_err!( Code::AlreadyExists, "A store with the name '{}' already exists", @@ -47,6 +50,22 @@ impl StoreManager { } } + pub fn digest_not_already_present(&self, digest: &str) -> Result<(), Error> { + let digests = self.store_config_anti_collision_digests.read(); + match digests.contains(&String::from(digest)) { + true => Err(make_err!( + Code::AlreadyExists, + "the provided config is already being used by another store" + )), + _ => Ok(()), + } + } + + pub fn config_digest_add(&self, digest: String) { + let mut digests = self.store_config_anti_collision_digests.write(); + digests.push(digest); + } + pub fn get_store(&self, name: &str) -> Option { let stores = self.stores.read(); if let Some(store) = stores.get(name) { diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index 6a274eb56..5896b8d4e 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -45,7 +45,7 @@ use nativelink_service::cas_server::CasServer; use nativelink_service::execution_server::ExecutionServer; use nativelink_service::health_server::HealthServer; use nativelink_service::worker_api_server::WorkerApiServer; -use nativelink_store::default_store_factory::store_factory; +use nativelink_store::default_store_factory::make_and_add_store_to_manager; use nativelink_store::store_manager::StoreManager; use nativelink_util::action_messages::WorkerId; use nativelink_util::common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit}; @@ -194,11 +194,14 @@ async fn inner_main( let health_component_name = format!("stores/{name}"); let mut health_register_store = health_registry_lock.sub_builder(&health_component_name); - let store = store_factory(&store_cfg, &store_manager, Some(&mut health_register_store)) - .await - .err_tip(|| format!("Failed to create store '{name}'"))?; - - store_manager.add_store(&name, store).err_tip(|| format!("Failed to add store to manager '{name}'"))?; + make_and_add_store_to_manager( + &name, + &store_cfg, + &store_manager, + Some(&mut health_register_store), + ) + .await + .err_tip(|| format!("Failed to create store '{name}'"))?; } }