diff --git a/nativelink-config/src/stores.rs b/nativelink-config/src/stores.rs index 7e6e31821..c6293a387 100644 --- a/nativelink-config/src/stores.rs +++ b/nativelink-config/src/stores.rs @@ -432,6 +432,20 @@ pub enum StoreSpec { noop(NoopSpec), } +impl StoreSpec { + // To enforce no duplicate connection configs for a store, add it to the matcher and implement + // disallow_duplicates_digest() on it. Returns `None` for stores whose uniqueness is not enforced. + pub fn disallow_duplicates_digest(&self) -> Option<String> { + match self { + Self::experimental_s3_store(spec) => Some(spec.disallow_duplicates_digest()), + Self::filesystem(spec) => Some(spec.disallow_duplicates_digest()), + Self::grpc(spec) => Some(spec.disallow_duplicates_digest()), + Self::redis_store(spec) => Some(spec.disallow_duplicates_digest()), + _ => None, + } + } +} + /// Configuration for an individual shard of the store. #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] @@ -514,6 +528,12 @@ pub struct FilesystemSpec { pub block_size: u64, } +impl FilesystemSpec { + fn disallow_duplicates_digest(&self) -> String { + format!("{}{}", self.content_path, self.temp_path) + } +} + #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] pub struct FastSlowSpec { @@ -787,6 +807,14 @@ pub struct S3Spec { pub disable_http2: bool, } +impl S3Spec { + pub fn disallow_duplicates_digest(&self) -> String { + let key_prefix = self.key_prefix.as_deref().unwrap_or_default(); + + format!("{}{}{}", self.region, self.bucket, key_prefix) + } +} + #[allow(non_camel_case_types)] #[derive(Serialize, Deserialize, Debug, Clone, Copy)] pub enum StoreType { @@ -852,6 +880,22 @@ pub struct GrpcSpec { pub connections_per_endpoint: usize, } +impl GrpcSpec { + // todo: could improve duplication detection to individual endpoints to disallow accidental re-use + fn disallow_duplicates_digest(&self) -> String { + format!( + "{}{}", + self.instance_name, + self.endpoints + .clone() +
.into_iter() + .map(|endpoint| endpoint.address) + .collect::<Vec<_>>() + .join(",") + ) + } +} + /// The possible error codes that might occur on an upstream request. #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub enum ErrorCode { @@ -993,6 +1037,12 @@ pub struct RedisSpec { pub retry: Retry, } +impl RedisSpec { + fn disallow_duplicates_digest(&self) -> String { + format!("{}{}", self.addresses.clone().join(","), self.key_prefix) + } +} + #[derive(Debug, Default, Deserialize, Serialize, Clone, PartialEq, Eq)] #[serde(rename_all = "lowercase")] pub enum RedisMode { diff --git a/nativelink-service/BUILD.bazel b/nativelink-service/BUILD.bazel index 99374b8bd..92209cf17 100644 --- a/nativelink-service/BUILD.bazel +++ b/nativelink-service/BUILD.bazel @@ -52,6 +52,7 @@ rust_test_suite( "tests/bep_server_test.rs", "tests/bytestream_server_test.rs", "tests/cas_server_test.rs", + "tests/store_overlap_rules_test.rs", "tests/worker_api_server_test.rs", ], proc_macro_deps = [ diff --git a/nativelink-service/tests/.#store_overlap_rules_test.rs b/nativelink-service/tests/.#store_overlap_rules_test.rs new file mode 120000 index 000000000..7e7432617 --- /dev/null +++ b/nativelink-service/tests/.#store_overlap_rules_test.rs @@ -0,0 +1 @@ +mkeen@lab.245673:1735391239 \ No newline at end of file diff --git a/nativelink-service/tests/ac_server_test.rs b/nativelink-service/tests/ac_server_test.rs index ac7bb9b80..918a10afb 100644 --- a/nativelink-service/tests/ac_server_test.rs +++ b/nativelink-service/tests/ac_server_test.rs @@ -25,7 +25,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::{ digest_function, ActionResult, Digest, GetActionResultRequest, UpdateActionResultRequest, }; use nativelink_service::ac_server::AcServer; -use nativelink_store::default_store_factory::store_factory; +use nativelink_store::default_store_factory::make_and_add_store_to_manager; use nativelink_store::store_manager::StoreManager; use nativelink_util::common::DigestInfo; use
nativelink_util::store_trait::StoreLike; @@ -48,29 +48,28 @@ async fn insert_into_store( let data_len = store_data.len(); let digest = DigestInfo::try_new(hash, action_size)?; store.update_oneshot(digest, store_data.freeze()).await?; + Ok(data_len.try_into().unwrap()) } async fn make_store_manager() -> Result<Arc<StoreManager>, Error> { let store_manager = Arc::new(StoreManager::new()); - store_manager.add_store( + make_and_add_store_to_manager( "main_cas", - store_factory( - &StoreSpec::memory(MemorySpec::default()), - &store_manager, - None, - ) - .await?, - ); - store_manager.add_store( + &StoreSpec::memory(MemorySpec::default()), + &store_manager, + None, + ) + .await?; + + make_and_add_store_to_manager( "main_ac", - store_factory( - &StoreSpec::memory(MemorySpec::default()), - &store_manager, - None, - ) - .await?, - ); + &StoreSpec::memory(MemorySpec::default()), + &store_manager, + None, + ) + .await?; + Ok(store_manager) } diff --git a/nativelink-service/tests/bep_server_test.rs b/nativelink-service/tests/bep_server_test.rs index fe120a549..befe01054 100644 --- a/nativelink-service/tests/bep_server_test.rs +++ b/nativelink-service/tests/bep_server_test.rs @@ -35,7 +35,7 @@ use nativelink_proto::google::devtools::build::v1::{ PublishBuildToolEventStreamRequest, PublishLifecycleEventRequest, StreamId, }; use nativelink_service::bep_server::BepServer; -use nativelink_store::default_store_factory::store_factory; +use nativelink_store::default_store_factory::make_and_add_store_to_manager; use nativelink_store::store_manager::StoreManager; use nativelink_util::buf_channel::make_buf_channel_pair; use nativelink_util::channel_body_for_tests::ChannelBody; @@ -52,15 +52,14 @@ const BEP_STORE_NAME: &str = "main_bep"; /// Utility function to construct a [`StoreManager`] async fn make_store_manager() -> Result<Arc<StoreManager>, Error> { let store_manager = Arc::new(StoreManager::new()); - store_manager.add_store( + make_and_add_store_to_manager( BEP_STORE_NAME, - store_factory( -
&StoreSpec::memory(MemorySpec::default()), - &store_manager, - None, - ) - .await?, - ); + &StoreSpec::memory(MemorySpec::default()), + &store_manager, + None, + ) + .await?; + Ok(store_manager) } diff --git a/nativelink-service/tests/bytestream_server_test.rs b/nativelink-service/tests/bytestream_server_test.rs index 97154e804..991bef6b5 100644 --- a/nativelink-service/tests/bytestream_server_test.rs +++ b/nativelink-service/tests/bytestream_server_test.rs @@ -34,7 +34,7 @@ use nativelink_proto::google::bytestream::{ QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, WriteRequest, WriteResponse, }; use nativelink_service::bytestream_server::ByteStreamServer; -use nativelink_store::default_store_factory::store_factory; +use nativelink_store::default_store_factory::make_and_add_store_to_manager; use nativelink_store::store_manager::StoreManager; use nativelink_util::channel_body_for_tests::ChannelBody; use nativelink_util::common::{encode_stream_proto, DigestInfo}; @@ -58,15 +58,14 @@ const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789a async fn make_store_manager() -> Result<Arc<StoreManager>, Error> { let store_manager = Arc::new(StoreManager::new()); - store_manager.add_store( + make_and_add_store_to_manager( "main_cas", - store_factory( - &StoreSpec::memory(MemorySpec::default()), - &store_manager, - None, - ) - .await?, - ); + &StoreSpec::memory(MemorySpec::default()), + &store_manager, + None, + ) + .await?; + Ok(store_manager) } diff --git a/nativelink-service/tests/cas_server_test.rs b/nativelink-service/tests/cas_server_test.rs index a9b6ffe34..642f7054e 100644 --- a/nativelink-service/tests/cas_server_test.rs +++ b/nativelink-service/tests/cas_server_test.rs @@ -30,7 +30,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::{ use nativelink_proto::google::rpc::Status as GrpcStatus; use nativelink_service::cas_server::CasServer; use nativelink_store::ac_utils::serialize_and_upload_message; -use
nativelink_store::default_store_factory::store_factory; +use nativelink_store::default_store_factory::make_and_add_store_to_manager; use nativelink_store::store_manager::StoreManager; use nativelink_util::common::DigestInfo; use nativelink_util::digest_hasher::DigestHasherFunc; @@ -47,15 +47,14 @@ const BAD_HASH: &str = "BAD_HASH"; async fn make_store_manager() -> Result<Arc<StoreManager>, Error> { let store_manager = Arc::new(StoreManager::new()); - store_manager.add_store( + make_and_add_store_to_manager( "main_cas", - store_factory( - &StoreSpec::memory(MemorySpec::default()), - &store_manager, - None, - ) - .await?, - ); + &StoreSpec::memory(MemorySpec::default()), + &store_manager, + None, + ) + .await?; + Ok(store_manager) } diff --git a/nativelink-service/tests/store_overlap_rules_test.rs b/nativelink-service/tests/store_overlap_rules_test.rs new file mode 100644 index 000000000..385fea763 --- /dev/null +++ b/nativelink-service/tests/store_overlap_rules_test.rs @@ -0,0 +1,48 @@ +// Copyright 2024 The NativeLink Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +use std::sync::Arc; + +use nativelink_config::stores::{MemorySpec, StoreSpec}; +use nativelink_error::Error; +use nativelink_macro::nativelink_test; +use nativelink_store::default_store_factory::make_and_add_store_to_manager; +use nativelink_store::store_manager::StoreManager; + +#[nativelink_test] +async fn same_datasource_disallowed_simple() -> Result<(), Error> { + let store_manager = Arc::new(StoreManager::new()); + assert!(make_and_add_store_to_manager( + "main_cas", + &StoreSpec::memory(MemorySpec::default()), + &store_manager, + None, + ) + .await + .is_ok()); + + assert!(make_and_add_store_to_manager( + "main_ac", + &StoreSpec::memory(MemorySpec::default()), + &store_manager, + None, + ) + .await + .is_ok()); + + let existing_cas = store_manager.get_store("main_cas").unwrap(); + store_manager.add_store("different_store", existing_cas)?; + + Ok(()) +} diff --git a/nativelink-store/src/default_store_factory.rs b/nativelink-store/src/default_store_factory.rs index 506ef6752..db1d89e91 100644 --- a/nativelink-store/src/default_store_factory.rs +++ b/nativelink-store/src/default_store_factory.rs @@ -42,7 +42,24 @@ use crate::verify_store::VerifyStore; type FutureMaybeStore<'a> = Box<dyn Future<Output = Result<Store, Error>> + 'a>; -pub fn store_factory<'a>( +pub async fn make_and_add_store_to_manager<'a>( + name: &'a str, + backend: &'a StoreSpec, + store_manager: &'a Arc<StoreManager>, + maybe_health_registry_builder: Option<&'a mut HealthRegistryBuilder>, +) -> Result<(), Error> { + if let Some(digest) = backend.disallow_duplicates_digest() { + store_manager.digest_not_already_present(&digest)?; + store_manager.config_digest_add(digest); + } + + let store = store_factory(backend, store_manager, maybe_health_registry_builder).await?; + store_manager.add_store(name, store)?; + + Ok(()) +} + +fn store_factory<'a>( backend: &'a StoreSpec, store_manager: &'a Arc<StoreManager>, maybe_health_registry_builder: Option<&'a mut HealthRegistryBuilder>, diff --git a/nativelink-store/src/store_manager.rs
b/nativelink-store/src/store_manager.rs index 32efda709..2ef5c5f96 100644 --- a/nativelink-store/src/store_manager.rs +++ b/nativelink-store/src/store_manager.rs @@ -13,7 +13,9 @@ // limitations under the License. use std::collections::HashMap; +use std::ptr; +use nativelink_error::{make_err, Code, Error}; use nativelink_metric::{MetricsComponent, RootMetricsComponent}; use nativelink_util::store_trait::Store; use parking_lot::RwLock; @@ -22,18 +24,56 @@ use parking_lot::RwLock; pub struct StoreManager { #[metric] stores: RwLock<HashMap<String, Store>>, + store_config_anti_collision_digests: RwLock<Vec<String>>, } impl StoreManager { pub fn new() -> StoreManager { StoreManager { stores: RwLock::new(HashMap::new()), + store_config_anti_collision_digests: RwLock::new(vec![]), } } - pub fn add_store(&self, name: &str, store: Store) { + pub fn add_store(&self, name: &str, store: Store) -> Result<(), Error> { let mut stores = self.stores.write(); - stores.insert(name.to_string(), store); + + if stores.contains_key(name) { + return Err(make_err!( + Code::AlreadyExists, + "a store with the name '{}' already exists", + name + )); + } + + for existing_store in stores.values().into_iter() { + if ptr::eq(&store, existing_store) { + return Err(make_err!( + Code::AlreadyExists, + "an instance of this store is already managed" + )); + } + } + + stores.insert(name.into(), store); + + Ok(()) + } + + pub fn digest_not_already_present(&self, digest: &str) -> Result<(), Error> { + let digests = self.store_config_anti_collision_digests.read(); + match digests.contains(&String::from(digest)) { + true => Err(make_err!( + Code::AlreadyExists, + "the provided config is already being used by another store" + )), + _ => Ok(()), + } + } + + pub fn config_digest_add(&self, digest: String) { + let mut digests = self.store_config_anti_collision_digests.write(); + digests.push(digest); } pub fn get_store(&self, name: &str) -> Option<Store> { diff --git a/nativelink-store/tests/redis_store_test.rs
b/nativelink-store/tests/redis_store_test.rs index 48f1cbdfa..2b2d71c85 100644 --- a/nativelink-store/tests/redis_store_test.rs +++ b/nativelink-store/tests/redis_store_test.rs @@ -883,7 +883,7 @@ async fn test_redis_fingerprint_metric() -> Result<(), Error> { )) }; - store_manager.add_store("redis_store", store); + store_manager.add_store("redis_store", store).unwrap(); }; let root_metrics = Arc::new(RwLock::new(RootMetricsTest { diff --git a/nativelink-store/tests/ref_store_test.rs b/nativelink-store/tests/ref_store_test.rs index 18b1463a0..a26752a0d 100644 --- a/nativelink-store/tests/ref_store_test.rs +++ b/nativelink-store/tests/ref_store_test.rs @@ -15,9 +15,10 @@ use std::ptr::from_ref; use std::sync::Arc; -use nativelink_config::stores::{MemorySpec, RefSpec}; +use nativelink_config::stores::{MemorySpec, RefSpec, StoreSpec}; use nativelink_error::Error; use nativelink_macro::nativelink_test; +use nativelink_store::default_store_factory::make_and_add_store_to_manager; use nativelink_store::memory_store::MemoryStore; use nativelink_store::ref_store::RefStore; use nativelink_store::store_manager::StoreManager; @@ -27,19 +28,24 @@ use pretty_assertions::assert_eq; const VALID_HASH1: &str = "0123456789abcdef000000000000000000010000000000000123456789abcdef"; -fn setup_stores() -> (Arc<StoreManager>, Store, Store) { +async fn setup_stores() -> (Arc<StoreManager>, Store, Store) { let store_manager = Arc::new(StoreManager::new()); - let memory_store = Store::new(MemoryStore::new(&MemorySpec::default())); - store_manager.add_store("foo", memory_store.clone()); + let memory_store_spec = StoreSpec::memory(MemorySpec::default()); + + make_and_add_store_to_manager("foo", &memory_store_spec, &store_manager, None) + .await + .unwrap(); + + let ref_store_spec = StoreSpec::ref_store(RefSpec { name: "foo".into() }); + + make_and_add_store_to_manager("bar", &ref_store_spec, &store_manager, None) + .await + .unwrap(); + + let memory_store = store_manager.get_store("foo").unwrap(); + let ref_store =
store_manager.get_store("bar").unwrap(); - let ref_store = Store::new(RefStore::new( - &RefSpec { - name: "foo".to_string(), - }, - Arc::downgrade(&store_manager), - )); - store_manager.add_store("bar", ref_store.clone()); (store_manager, memory_store, ref_store) } @@ -47,7 +53,7 @@ fn setup_stores() -> (Arc<StoreManager>, Store, Store) { async fn has_test() -> Result<(), Error> { const VALUE1: &str = "13"; - let (_store_manager, memory_store, ref_store) = setup_stores(); + let (_store_manager, memory_store, ref_store) = setup_stores().await; { // Insert data into memory store. @@ -77,7 +83,7 @@ async fn has_test() -> Result<(), Error> { async fn get_test() -> Result<(), Error> { const VALUE1: &str = "13"; - let (_store_manager, memory_store, ref_store) = setup_stores(); + let (_store_manager, memory_store, ref_store) = setup_stores().await; { // Insert data into memory store. @@ -108,7 +114,7 @@ async fn get_test() -> Result<(), Error> { async fn update_test() -> Result<(), Error> { const VALUE1: &str = "13"; - let (_store_manager, memory_store, ref_store) = setup_stores(); + let (_store_manager, memory_store, ref_store) = setup_stores().await; { // Insert data into ref_store.
@@ -140,7 +146,7 @@ async fn inner_store_test() -> Result<(), Error> { let store_manager = Arc::new(StoreManager::new()); let memory_store = Store::new(MemoryStore::new(&MemorySpec::default())); - store_manager.add_store("mem_store", memory_store.clone()); + store_manager.add_store("mem_store", memory_store.clone())?; let ref_store_inner = Store::new(RefStore::new( &RefSpec { @@ -148,7 +154,7 @@ async fn inner_store_test() -> Result<(), Error> { }, Arc::downgrade(&store_manager), )); - store_manager.add_store("ref_store_inner", ref_store_inner.clone()); + store_manager.add_store("ref_store_inner", ref_store_inner.clone())?; let ref_store_outer = Store::new(RefStore::new( &RefSpec { @@ -156,7 +162,7 @@ async fn inner_store_test() -> Result<(), Error> { }, Arc::downgrade(&store_manager), )); - store_manager.add_store("ref_store_outer", ref_store_outer.clone()); + store_manager.add_store("ref_store_outer", ref_store_outer.clone())?; // Ensure the result of inner_store() points to exact same memory store. assert_eq!( diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index c41a74072..81f657741 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -12,11 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+ + + use std::collections::{HashMap, HashSet}; + + + use std::net::SocketAddr; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; + + + + use async_lock::Mutex as AsyncMutex; use axum::Router; use clap::Parser; @@ -45,7 +55,7 @@ use nativelink_service::cas_server::CasServer; use nativelink_service::execution_server::ExecutionServer; use nativelink_service::health_server::HealthServer; use nativelink_service::worker_api_server::WorkerApiServer; -use nativelink_store::default_store_factory::store_factory; +use nativelink_store::default_store_factory::make_and_add_store_to_manager; use nativelink_store::store_manager::StoreManager; use nativelink_util::action_messages::WorkerId; use nativelink_util::common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit}; @@ -194,10 +204,14 @@ async fn inner_main( let health_component_name = format!("stores/{name}"); let mut health_register_store = health_registry_lock.sub_builder(&health_component_name); - let store = store_factory(&store_cfg, &store_manager, Some(&mut health_register_store)) - .await - .err_tip(|| format!("Failed to create store '{name}'"))?; - store_manager.add_store(&name, store); + make_and_add_store_to_manager( + &name, + &store_cfg, + &store_manager, + Some(&mut health_register_store), + ) + .await + .err_tip(|| format!("Failed to create store '{name}'"))?; } }