Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-768 Store config uniqueness enforced by StoreManager #1555

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
50 changes: 50 additions & 0 deletions nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,20 @@ pub enum StoreSpec {
noop(NoopSpec),
}

impl StoreSpec {
// To enforce no duplicate connection configs for a store, add it to the matcher and implement
// disallow_duplicates_digest() on it. Returns `None` for stores that are not being enforced unique.
pub fn disallow_duplicates_digest(&self) -> Option<String> {
match self {
Self::experimental_s3_store(spec) => Some(spec.disallow_duplicates_digest()),
Self::filesystem(spec) => Some(spec.disallow_duplicates_digest()),
Self::grpc(spec) => Some(spec.disallow_duplicates_digest()),
Self::redis_store(spec) => Some(spec.disallow_duplicates_digest()),
_ => None,
}
}
}

/// Configuration for an individual shard of the store.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
Expand Down Expand Up @@ -514,6 +528,12 @@ pub struct FilesystemSpec {
pub block_size: u64,
}

impl FilesystemSpec {
fn disallow_duplicates_digest(&self) -> String {
format!("{}{}", self.content_path, self.temp_path)
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct FastSlowSpec {
Expand Down Expand Up @@ -787,6 +807,14 @@ pub struct S3Spec {
pub disable_http2: bool,
}

impl S3Spec {
pub fn disallow_duplicates_digest(&self) -> String {
let key_prefix = self.key_prefix.as_deref().unwrap_or_default();

format!("{}{}{}", self.region, self.bucket, key_prefix)
}
}

#[allow(non_camel_case_types)]
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub enum StoreType {
Expand Down Expand Up @@ -852,6 +880,22 @@ pub struct GrpcSpec {
pub connections_per_endpoint: usize,
}

impl GrpcSpec {
// todo: could improve duplication detection to individual endpoints to disallow accidental re-use
fn disallow_duplicates_digest(&self) -> String {
format!(
"{}{}",
self.instance_name,
self.endpoints
.clone()
.into_iter()
.map(|endpoint| endpoint.address)
.collect::<Vec<String>>()
.join(",")
)
}
}

/// The possible error codes that might occur on an upstream request.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum ErrorCode {
Expand Down Expand Up @@ -993,6 +1037,12 @@ pub struct RedisSpec {
pub retry: Retry,
}

impl RedisSpec {
fn disallow_duplicates_digest(&self) -> String {
format!("{}{}", self.addresses.clone().join(","), self.key_prefix)
}
}

#[derive(Debug, Default, Deserialize, Serialize, Clone, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum RedisMode {
Expand Down
1 change: 1 addition & 0 deletions nativelink-service/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ rust_test_suite(
"tests/bep_server_test.rs",
"tests/bytestream_server_test.rs",
"tests/cas_server_test.rs",
"tests/store_overlap_rules_test.rs",
"tests/worker_api_server_test.rs",
],
proc_macro_deps = [
Expand Down
1 change: 1 addition & 0 deletions nativelink-service/tests/.#store_overlap_rules_test.rs
33 changes: 16 additions & 17 deletions nativelink-service/tests/ac_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::{
digest_function, ActionResult, Digest, GetActionResultRequest, UpdateActionResultRequest,
};
use nativelink_service::ac_server::AcServer;
use nativelink_store::default_store_factory::store_factory;
use nativelink_store::default_store_factory::make_and_add_store_to_manager;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::common::DigestInfo;
use nativelink_util::store_trait::StoreLike;
Expand All @@ -48,29 +48,28 @@ async fn insert_into_store<T: Message>(
let data_len = store_data.len();
let digest = DigestInfo::try_new(hash, action_size)?;
store.update_oneshot(digest, store_data.freeze()).await?;

Ok(data_len.try_into().unwrap())
}

async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
let store_manager = Arc::new(StoreManager::new());
store_manager.add_store(
make_and_add_store_to_manager(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding, what is the reason why you moved away from store_factory rather than modifying store_factory?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still using store_factory to build the Store, but wrapping up in make_and_add_store_to_manager because it makes building and adding a Store to a StoreManager a single step from a public-facing perspective.

"main_cas",
store_factory(
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?,
);
store_manager.add_store(
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?;

make_and_add_store_to_manager(
"main_ac",
store_factory(
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?,
);
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?;

Ok(store_manager)
}

Expand Down
17 changes: 8 additions & 9 deletions nativelink-service/tests/bep_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use nativelink_proto::google::devtools::build::v1::{
PublishBuildToolEventStreamRequest, PublishLifecycleEventRequest, StreamId,
};
use nativelink_service::bep_server::BepServer;
use nativelink_store::default_store_factory::store_factory;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems heavy at first read but maybe it is an improvement to both the naming and the functionality. store_factory as a name says nothing about what its function is, but developers could go look at the definition. make_and_add_store_to_manager is definitely clearer.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

store_factory is still used but I made private and think that make_and_add_store_to_manager is better as a front-facing API because it makes and associates the Store with a StoreManager -- rather than leaving them as separate steps. The door is still open to manually build any Store instance though and call add_store on StoreManager.

use nativelink_store::default_store_factory::make_and_add_store_to_manager;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::buf_channel::make_buf_channel_pair;
use nativelink_util::channel_body_for_tests::ChannelBody;
Expand All @@ -52,15 +52,14 @@ const BEP_STORE_NAME: &str = "main_bep";
/// Utility function to construct a [`StoreManager`]
async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
let store_manager = Arc::new(StoreManager::new());
store_manager.add_store(
make_and_add_store_to_manager(
BEP_STORE_NAME,
store_factory(
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?,
);
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?;

Ok(store_manager)
}

Expand Down
17 changes: 8 additions & 9 deletions nativelink-service/tests/bytestream_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use nativelink_proto::google::bytestream::{
QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, WriteRequest, WriteResponse,
};
use nativelink_service::bytestream_server::ByteStreamServer;
use nativelink_store::default_store_factory::store_factory;
use nativelink_store::default_store_factory::make_and_add_store_to_manager;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::channel_body_for_tests::ChannelBody;
use nativelink_util::common::{encode_stream_proto, DigestInfo};
Expand All @@ -58,15 +58,14 @@ const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789a

async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
let store_manager = Arc::new(StoreManager::new());
store_manager.add_store(
make_and_add_store_to_manager(
"main_cas",
store_factory(
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?,
);
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?;

Ok(store_manager)
}

Expand Down
17 changes: 8 additions & 9 deletions nativelink-service/tests/cas_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::{
use nativelink_proto::google::rpc::Status as GrpcStatus;
use nativelink_service::cas_server::CasServer;
use nativelink_store::ac_utils::serialize_and_upload_message;
use nativelink_store::default_store_factory::store_factory;
use nativelink_store::default_store_factory::make_and_add_store_to_manager;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::common::DigestInfo;
use nativelink_util::digest_hasher::DigestHasherFunc;
Expand All @@ -47,15 +47,14 @@ const BAD_HASH: &str = "BAD_HASH";

async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
let store_manager = Arc::new(StoreManager::new());
store_manager.add_store(
make_and_add_store_to_manager(
"main_cas",
store_factory(
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?,
);
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?;

Ok(store_manager)
}

Expand Down
48 changes: 48 additions & 0 deletions nativelink-service/tests/store_overlap_rules_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2024 The NativeLink Authors. All rights reserved.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PERFECT!

//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use nativelink_config::stores::{MemorySpec, StoreSpec};
use nativelink_error::Error;
use nativelink_macro::nativelink_test;
use nativelink_store::default_store_factory::make_and_add_store_to_manager;
use nativelink_store::store_manager::StoreManager;

#[nativelink_test]
async fn same_datasource_disallowed_simple() -> Result<(), Error> {
let store_manager = Arc::new(StoreManager::new());
assert!(make_and_add_store_to_manager(
"main_cas",
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await
.is_ok());

assert!(make_and_add_store_to_manager(
"main_ac",
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await
.is_ok());

let existing_cas = store_manager.get_store("main_cas").unwrap();
store_manager.add_store("different_store", existing_cas)?;

Ok(())
}
19 changes: 18 additions & 1 deletion nativelink-store/src/default_store_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,24 @@ use crate::verify_store::VerifyStore;

type FutureMaybeStore<'a> = Box<dyn Future<Output = Result<Store, Error>> + 'a>;

pub fn store_factory<'a>(
pub async fn make_and_add_store_to_manager<'a>(
name: &'a str,
backend: &'a StoreSpec,
store_manager: &'a Arc<StoreManager>,
maybe_health_registry_builder: Option<&'a mut HealthRegistryBuilder>,
) -> Result<(), Error> {
if let Some(digest) = backend.disallow_duplicates_digest() {
store_manager.digest_not_already_present(&digest)?;
store_manager.config_digest_add(digest);
}

let store = store_factory(backend, store_manager, maybe_health_registry_builder).await?;
store_manager.add_store(name, store)?;

Ok(())
}

fn store_factory<'a>(
backend: &'a StoreSpec,
store_manager: &'a Arc<StoreManager>,
maybe_health_registry_builder: Option<&'a mut HealthRegistryBuilder>,
Expand Down
44 changes: 42 additions & 2 deletions nativelink-store/src/store_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
// limitations under the License.

use std::collections::HashMap;
use std::ptr;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this.. will remove


use nativelink_error::{make_err, Code, Error};
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
use nativelink_util::store_trait::Store;
use parking_lot::RwLock;
Expand All @@ -22,18 +24,56 @@ use parking_lot::RwLock;
pub struct StoreManager {
#[metric]
stores: RwLock<HashMap<String, Store>>,
store_config_anti_collision_digests: RwLock<Vec<String>>,
}

impl StoreManager {
pub fn new() -> StoreManager {
StoreManager {
stores: RwLock::new(HashMap::new()),
store_config_anti_collision_digests: RwLock::new(vec![]),
}
}

pub fn add_store(&self, name: &str, store: Store) {
pub fn add_store(&self, name: &str, store: Store) -> Result<(), Error> {
let mut stores = self.stores.write();
stores.insert(name.to_string(), store);

if stores.contains_key(name) {
return Err(make_err!(
Code::AlreadyExists,
"a store with the name '{}' already exists",
name
));
}

for existing_store in stores.values().into_iter() {
if ptr::eq(&store, existing_store) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will remove this check. Thanks to rust's memory model and the fact that in this design, ownership of the struct instance is passed to the StoreManager on line 58. We don't have Store instance pointer being passed around with a risk of passing the same pointer N times. We have a single instance of which ownership is transferred exactly once in guaranteed fashion (thanks Rust).

So instance protection is no longer a goal or needed. But I do think the config overlap detection is valuable.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Rust is an underrated comment.

return Err(make_err!(
Code::AlreadyExists,
"an instance of this store is already managed"
));
}
}

stores.insert(name.into(), store);

Ok(())
}

pub fn digest_not_already_present(&self, digest: &str) -> Result<(), Error> {
let digests = self.store_config_anti_collision_digests.read();
match digests.contains(&String::from(digest)) {
true => Err(make_err!(
Code::AlreadyExists,
"the provided config is already being used by another store"
)),
_ => Ok(()),
}
}

pub fn config_digest_add(&self, digest: String) {
let mut digests = self.store_config_anti_collision_digests.write();
digests.push(digest);
}

pub fn get_store(&self, name: &str) -> Option<Store> {
Expand Down
2 changes: 1 addition & 1 deletion nativelink-store/tests/redis_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,7 @@ async fn test_redis_fingerprint_metric() -> Result<(), Error> {
))
};

store_manager.add_store("redis_store", store);
store_manager.add_store("redis_store", store).unwrap();
};

let root_metrics = Arc::new(RwLock::new(RootMetricsTest {
Expand Down
Loading
Loading