diff --git a/nativelink-config/src/stores.rs b/nativelink-config/src/stores.rs index db145255e..a3d6c29eb 100644 --- a/nativelink-config/src/stores.rs +++ b/nativelink-config/src/stores.rs @@ -879,6 +879,15 @@ pub struct RedisStore { /// Default: 10 #[serde(default)] pub connection_timeout_s: u64, + + /// An optional prefix to prepend to all keys in this store. + /// + /// Setting this value can make it convenient to query or + /// organize your data according to the shared prefix. + /// + /// Default: (Empty String / No Prefix) + #[serde(default)] + pub key_prefix: String, } /// Retry configuration. This configuration is exponential and each iteration diff --git a/nativelink-store/src/redis_store.rs b/nativelink-store/src/redis_store.rs index 9814f9178..3ba3ec75b 100644 --- a/nativelink-store/src/redis_store.rs +++ b/nativelink-store/src/redis_store.rs @@ -28,7 +28,7 @@ use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthS use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry}; use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo}; use redis::aio::{ConnectionLike, ConnectionManager}; -use redis::AsyncCommands; +use redis::{AsyncCommands, ToRedisArgs}; use crate::cas_utils::is_zero_digest; @@ -44,6 +44,11 @@ pub enum LazyConnection { pub struct RedisStore { lazy_conn: ArcCell>, temp_name_generator_fn: fn() -> String, + + /// A common prefix to prepend to all keys before they are sent to Redis. + /// + /// See [`RedisStore::key_prefix`](`nativelink_config::stores::RedisStore::key_prefix`). 
+ key_prefix: String, } impl RedisStore { @@ -77,9 +82,10 @@ impl RedisStore { let lazy_conn = LazyConnection::Future(conn_fut); - Ok(RedisStore::new_with_conn_and_name_generator( + Ok(RedisStore::new_with_conn_and_name_generator_and_prefix( lazy_conn, || uuid::Uuid::new_v4().to_string(), + config.key_prefix.clone(), )) } } @@ -88,10 +94,23 @@ impl RedisStore { pub fn new_with_conn_and_name_generator( lazy_conn: LazyConnection, temp_name_generator_fn: fn() -> String, + ) -> RedisStore { + RedisStore::new_with_conn_and_name_generator_and_prefix( + lazy_conn, + temp_name_generator_fn, + String::new(), + ) + } + + pub fn new_with_conn_and_name_generator_and_prefix( + lazy_conn: LazyConnection, + temp_name_generator_fn: fn() -> String, + key_prefix: String, ) -> RedisStore { RedisStore { lazy_conn: ArcCell::new(Arc::new(lazy_conn)), temp_name_generator_fn, + key_prefix, } } @@ -104,6 +123,30 @@ impl RedisStore { .set(Arc::new(LazyConnection::Connection(result.clone()))); result } + + /// Encode a [`StoreKey`] so it can be sent to Redis. 
+ fn encode_key(&self, key: StoreKey) -> impl ToRedisArgs { + // TODO(caass): Once https://github.com/redis-rs/redis-rs/pull/1219 makes it into a release, + // this can be changed to + // ```rust + // if self.key_prefix.is_empty() { + // key.as_str() + // } else { + // let mut encoded_key = String::with_capacity(self.key_prefix.len() + key_body.len()); + // encoded_key.push_str(&self.key_prefix); + // encoded_key.push_str(&key_body); + // Cow::Owned(encoded_key) + // } + //``` + // to avoid an allocation + let key_body = key.as_str(); + + let mut encoded_key = String::with_capacity(self.key_prefix.len() + key_body.len()); + encoded_key.push_str(&self.key_prefix); + encoded_key.push_str(&key_body); + + encoded_key + } } #[async_trait] @@ -128,7 +171,7 @@ impl StoreDriver for zero_digest_indexes.push(index); } - pipe.strlen(key.as_str().as_ref()); + pipe.strlen(self.encode_key(key.borrow())); }); let digest_sizes = pipe @@ -182,7 +225,7 @@ impl StoreDriver for return Ok(()); } if force_recv { - conn.append(key.as_str().as_ref(), &chunk[..]) + conn.append(self.encode_key(key.borrow()), &chunk[..]) .await .map_err(from_redis_err) .err_tip(|| "In RedisStore::update() single chunk")?; @@ -209,7 +252,7 @@ impl StoreDriver for pipe.cmd("RENAME") .arg(temp_key.get_or_init(make_temp_name)) - .arg(key.as_str().as_ref()); + .arg(self.encode_key(key)); pipe.query_async(&mut conn) .await .map_err(from_redis_err) @@ -237,7 +280,7 @@ impl StoreDriver for let mut conn = self.get_conn().await?; if length == Some(0) { let exists = conn - .exists::<_, bool>(key.as_str().as_ref()) + .exists::<_, bool>(self.encode_key(key.borrow())) .await .map_err(from_redis_err) .err_tip(|| "In RedisStore::get_part::zero_exists")?; @@ -264,7 +307,7 @@ impl StoreDriver for let current_end = std::cmp::min(current_start.saturating_add(READ_CHUNK_SIZE), end_position) - 1; let chunk = conn - .getrange::<_, Bytes>(key.as_str().as_ref(), current_start, current_end) + .getrange::<_, 
Bytes>(self.encode_key(key.borrow()), current_start, current_end) .await .map_err(from_redis_err) .err_tip(|| "In RedisStore::get_part::getrange")?; diff --git a/nativelink-store/tests/redis_store_test.rs b/nativelink-store/tests/redis_store_test.rs index 0918dd0b6..706ebf773 100644 --- a/nativelink-store/tests/redis_store_test.rs +++ b/nativelink-store/tests/redis_store_test.rs @@ -125,6 +125,55 @@ async fn upload_and_get_data() -> Result<(), Error> { Ok(()) } +#[nativelink_test] +async fn upload_and_get_data_with_prefix() -> Result<(), Error> { + let data = Bytes::from_static(b"14"); + let prefix = "TEST_PREFIX-"; + + let digest = DigestInfo::try_new(VALID_HASH1, 2)?; + let packed_hash_hex = format!("{prefix}{}-{}", digest.hash_str(), digest.size_bytes); + + let chunk_data = "14"; + + let redis_connection = MockRedisConnectionBuilder::new() + .pipe(&[("APPEND", &[TEMP_UUID, chunk_data], Ok(&[redis::Value::Nil]))]) + .cmd("APPEND", &[&packed_hash_hex, ""], Ok("")) + .pipe(&[( + "RENAME", + &[TEMP_UUID, &packed_hash_hex], + Ok(&[redis::Value::Nil]), + )]) + .pipe(&[( + "STRLEN", + &[&packed_hash_hex], + Ok(&[redis::Value::Bulk(vec![redis::Value::Int(2)])]), + )]) + .cmd("GETRANGE", &[&packed_hash_hex, "0", "1"], Ok("14")) + .build(); + + let store = RedisStore::new_with_conn_and_name_generator_and_prefix( + LazyConnection::Connection(Ok(redis_connection)), + mock_uuid_generator, + prefix.to_string(), + ); + + store.update_oneshot(digest, data.clone()).await?; + + let result = store.has(digest).await?; + assert!( + result.is_some(), + "Expected redis store to have hash: {VALID_HASH1}", + ); + + let result = store + .get_part_unchunked(digest, 0, Some(data.clone().len())) + .await?; + + assert_eq!(result, data, "Expected redis store to have updated value",); + + Ok(()) +} + #[nativelink_test] async fn upload_empty_data() -> Result<(), Error> { let data = Bytes::from_static(b""); @@ -149,6 +198,32 @@ async fn upload_empty_data() -> Result<(), Error> { Ok(()) } 
+#[nativelink_test] +async fn upload_empty_data_with_prefix() -> Result<(), Error> { + let data = Bytes::from_static(b""); + let prefix = "TEST_PREFIX-"; + + let digest = ZERO_BYTE_DIGESTS[0]; + + let redis_connection = MockRedisConnectionBuilder::new().build(); + + let store = RedisStore::new_with_conn_and_name_generator_and_prefix( + LazyConnection::Connection(Ok(redis_connection)), + mock_uuid_generator, + prefix.to_string(), + ); + + store.update_oneshot(digest, data).await?; + + let result = store.has(digest).await?; + assert!( + result.is_some(), + "Expected redis store to have hash: {VALID_HASH1}", + ); + + Ok(()) +} + #[nativelink_test] async fn test_uploading_large_data() -> Result<(), Error> { // Requires multiple chunks as data is larger than 64K