Skip to content

Commit

Permalink
store flow improvements / anti-collision
Browse files Browse the repository at this point in the history
  • Loading branch information
mkeen committed Dec 29, 2024
1 parent 324df36 commit 9f67019
Show file tree
Hide file tree
Showing 10 changed files with 378 additions and 52 deletions.
57 changes: 57 additions & 0 deletions nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,21 @@ pub enum StoreSpec {
noop(NoopSpec),
}

impl StoreSpec {
// To enforce no duplicate connection configs for a store, add it to the matcher and implement
// disallow_duplicates_digest() on it. Returns `None` for stores that are not being enforced unique.
pub fn disallow_duplicates_digest(&self) -> Option<String> {
match self {
Self::memory(spec) => Some(spec.disallow_duplicates_digest()),
Self::experimental_s3_store(spec) => Some(spec.disallow_duplicates_digest()),
Self::filesystem(spec) => Some(spec.disallow_duplicates_digest()),
Self::grpc(spec) => Some(spec.disallow_duplicates_digest()),
Self::redis_store(spec) => Some(spec.disallow_duplicates_digest()),
_ => None,
}
}
}

/// Configuration for an individual shard of the store.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
Expand Down Expand Up @@ -514,6 +529,12 @@ pub struct FilesystemSpec {
pub block_size: u64,
}

impl FilesystemSpec {
fn disallow_duplicates_digest(&self) -> String {
format!("{}{}", self.content_path, self.temp_path)
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct FastSlowSpec {
Expand All @@ -535,6 +556,12 @@ pub struct MemorySpec {
pub eviction_policy: Option<EvictionPolicy>,
}

impl MemorySpec {
pub fn disallow_duplicates_digest(&self) -> String {
"InMemoryStore".into()
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct DedupSpec {
Expand Down Expand Up @@ -787,6 +814,14 @@ pub struct S3Spec {
pub disable_http2: bool,
}

impl S3Spec {
pub fn disallow_duplicates_digest(&self) -> String {
let key_prefix = self.key_prefix.as_deref().unwrap_or_default();

format!("{}{}{}", self.region, self.bucket, key_prefix)
}
}

#[allow(non_camel_case_types)]
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub enum StoreType {
Expand Down Expand Up @@ -852,6 +887,22 @@ pub struct GrpcSpec {
pub connections_per_endpoint: usize,
}

impl GrpcSpec {
// todo: could improve duplication detection to individual endpoints to disallow accidental re-use
fn disallow_duplicates_digest(&self) -> String {
format!(
"{}{}",
self.instance_name,
self.endpoints
.clone()
.into_iter()
.map(|endpoint| endpoint.address)
.collect::<Vec<String>>()
.join(",")
)
}
}

/// The possible error codes that might occur on an upstream request.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum ErrorCode {
Expand Down Expand Up @@ -993,6 +1044,12 @@ pub struct RedisSpec {
pub retry: Retry,
}

impl RedisSpec {
fn disallow_duplicates_digest(&self) -> String {
format!("{}{}", self.addresses.clone().join(","), self.key_prefix)
}
}

#[derive(Debug, Default, Deserialize, Serialize, Clone, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum RedisMode {
Expand Down
1 change: 1 addition & 0 deletions nativelink-service/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ rust_test_suite(
"tests/bytestream_server_test.rs",
"tests/cas_server_test.rs",
"tests/worker_api_server_test.rs",
"tests/store_overlap_rules_test.rs",
],
proc_macro_deps = [
"//nativelink-macro",
Expand Down
32 changes: 15 additions & 17 deletions nativelink-service/tests/ac_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::{
digest_function, ActionResult, Digest, GetActionResultRequest, UpdateActionResultRequest,
};
use nativelink_service::ac_server::AcServer;
use nativelink_store::default_store_factory::store_factory;
use nativelink_store::default_store_factory::make_and_add_store_to_manager;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::common::DigestInfo;
use nativelink_util::store_trait::StoreLike;
Expand Down Expand Up @@ -53,24 +53,22 @@ async fn insert_into_store<T: Message>(

async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
let store_manager = Arc::new(StoreManager::new());
store_manager.add_store(
make_and_add_store_to_manager(
"main_cas",
store_factory(
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?,
);
store_manager.add_store(
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?;

make_and_add_store_to_manager(
"main_ac",
store_factory(
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?,
);
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?;

Ok(store_manager)
}

Expand Down
17 changes: 8 additions & 9 deletions nativelink-service/tests/bep_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use nativelink_proto::google::devtools::build::v1::{
PublishBuildToolEventStreamRequest, PublishLifecycleEventRequest, StreamId,
};
use nativelink_service::bep_server::BepServer;
use nativelink_store::default_store_factory::store_factory;
use nativelink_store::default_store_factory::make_and_add_store_to_manager;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::buf_channel::make_buf_channel_pair;
use nativelink_util::channel_body_for_tests::ChannelBody;
Expand All @@ -52,15 +52,14 @@ const BEP_STORE_NAME: &str = "main_bep";
/// Utility function to construct a [`StoreManager`]
async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
let store_manager = Arc::new(StoreManager::new());
store_manager.add_store(
make_and_add_store_to_manager(
BEP_STORE_NAME,
store_factory(
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?,
);
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?;

Ok(store_manager)
}

Expand Down
17 changes: 8 additions & 9 deletions nativelink-service/tests/bytestream_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use nativelink_proto::google::bytestream::{
QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, WriteRequest, WriteResponse,
};
use nativelink_service::bytestream_server::ByteStreamServer;
use nativelink_store::default_store_factory::store_factory;
use nativelink_store::default_store_factory::make_and_add_store_to_manager;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::channel_body_for_tests::ChannelBody;
use nativelink_util::common::{encode_stream_proto, DigestInfo};
Expand All @@ -58,15 +58,14 @@ const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789a

async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
let store_manager = Arc::new(StoreManager::new());
store_manager.add_store(
make_and_add_store_to_manager(
"main_cas",
store_factory(
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?,
);
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?;

Ok(store_manager)
}

Expand Down
17 changes: 8 additions & 9 deletions nativelink-service/tests/cas_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::{
use nativelink_proto::google::rpc::Status as GrpcStatus;
use nativelink_service::cas_server::CasServer;
use nativelink_store::ac_utils::serialize_and_upload_message;
use nativelink_store::default_store_factory::store_factory;
use nativelink_store::default_store_factory::make_and_add_store_to_manager;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::common::DigestInfo;
use nativelink_util::digest_hasher::DigestHasherFunc;
Expand All @@ -47,15 +47,14 @@ const BAD_HASH: &str = "BAD_HASH";

async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
let store_manager = Arc::new(StoreManager::new());
store_manager.add_store(
make_and_add_store_to_manager(
"main_cas",
store_factory(
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?,
);
&StoreSpec::memory(MemorySpec::default()),
&store_manager,
None,
)
.await?;

Ok(store_manager)
}

Expand Down
Loading

0 comments on commit 9f67019

Please sign in to comment.