Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a config option to prefix keys in Redis stores #981

Merged
merged 1 commit into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,15 @@ pub struct RedisStore {
/// Default: 10
#[serde(default)]
pub connection_timeout_s: u64,

/// An optional prefix to prepend to all keys in this store.
///
/// Setting this value can make it convenient to query or
/// organize your data according to the shared prefix.
///
/// Default: (Empty String / No Prefix)
#[serde(default)]
pub key_prefix: String,
}

/// Retry configuration. This configuration is exponential and each iteration
Expand Down
57 changes: 50 additions & 7 deletions nativelink-store/src/redis_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthS
use nativelink_util::metrics_utils::{Collector, CollectorState, MetricsComponent, Registry};
use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo};
use redis::aio::{ConnectionLike, ConnectionManager};
use redis::AsyncCommands;
use redis::{AsyncCommands, ToRedisArgs};

use crate::cas_utils::is_zero_digest;

Expand All @@ -44,6 +44,11 @@ pub enum LazyConnection<T: ConnectionLike + Unpin + Clone + Send + Sync> {
/// A [`StoreDriver`] backed by Redis, generic over the connection type so
/// tests can substitute a mock connection.
pub struct RedisStore<T: ConnectionLike + Unpin + Clone + Send + Sync = ConnectionManager> {
    /// The Redis connection, created lazily on first use.
    lazy_conn: ArcCell<LazyConnection<T>>,
    /// Produces unique temporary key names for in-flight uploads (renamed to
    /// the final key once the upload completes).
    temp_name_generator_fn: fn() -> String,

    /// A common prefix to prepend to all keys before they are sent to Redis.
    ///
    /// See [`RedisStore::key_prefix`](`nativelink_config::stores::RedisStore::key_prefix`).
    key_prefix: String,
}

impl RedisStore {
Expand Down Expand Up @@ -77,9 +82,10 @@ impl RedisStore {

let lazy_conn = LazyConnection::Future(conn_fut);

Ok(RedisStore::new_with_conn_and_name_generator(
Ok(RedisStore::new_with_conn_and_name_generator_and_prefix(
lazy_conn,
|| uuid::Uuid::new_v4().to_string(),
config.key_prefix.clone(),
))
}
}
Expand All @@ -88,10 +94,23 @@ impl<T: ConnectionLike + Unpin + Clone + Send + Sync> RedisStore<T> {
pub fn new_with_conn_and_name_generator(
lazy_conn: LazyConnection<T>,
temp_name_generator_fn: fn() -> String,
) -> RedisStore<T> {
RedisStore::new_with_conn_and_name_generator_and_prefix(
lazy_conn,
temp_name_generator_fn,
String::new(),
)
}

/// Creates a store that namespaces every Redis key with `key_prefix`.
///
/// An empty `key_prefix` leaves keys untouched.
pub fn new_with_conn_and_name_generator_and_prefix(
    lazy_conn: LazyConnection<T>,
    temp_name_generator_fn: fn() -> String,
    key_prefix: String,
) -> RedisStore<T> {
    let lazy_conn = ArcCell::new(Arc::new(lazy_conn));
    RedisStore {
        lazy_conn,
        key_prefix,
        temp_name_generator_fn,
    }
}

Expand All @@ -104,6 +123,30 @@ impl<T: ConnectionLike + Unpin + Clone + Send + Sync> RedisStore<T> {
.set(Arc::new(LazyConnection::Connection(result.clone())));
result
}

/// Encode a [`StoreKey`] so it can be sent to Redis, prepending
/// `self.key_prefix` to the key body.
fn encode_key(&self, key: StoreKey) -> impl ToRedisArgs {
    // TODO(caass): Once https://github.com/redis-rs/redis-rs/pull/1219 makes it
    // into a release, this can return a `Cow<str>` that borrows `key.as_str()`
    // whenever `self.key_prefix` is empty, avoiding the unconditional
    // allocation below.
    let body = key.as_str();

    // One exact-size allocation: prefix followed by the key body.
    let mut namespaced = String::with_capacity(self.key_prefix.len() + body.len());
    namespaced += self.key_prefix.as_str();
    namespaced += &body;
    namespaced
}
}

#[async_trait]
Expand All @@ -128,7 +171,7 @@ impl<T: ConnectionLike + Unpin + Clone + Send + Sync + 'static> StoreDriver for
zero_digest_indexes.push(index);
}

pipe.strlen(key.as_str().as_ref());
pipe.strlen(self.encode_key(key.borrow()));
});

let digest_sizes = pipe
Expand Down Expand Up @@ -182,7 +225,7 @@ impl<T: ConnectionLike + Unpin + Clone + Send + Sync + 'static> StoreDriver for
return Ok(());
}
if force_recv {
conn.append(key.as_str().as_ref(), &chunk[..])
conn.append(self.encode_key(key.borrow()), &chunk[..])
.await
.map_err(from_redis_err)
.err_tip(|| "In RedisStore::update() single chunk")?;
Expand All @@ -209,7 +252,7 @@ impl<T: ConnectionLike + Unpin + Clone + Send + Sync + 'static> StoreDriver for

pipe.cmd("RENAME")
.arg(temp_key.get_or_init(make_temp_name))
.arg(key.as_str().as_ref());
.arg(self.encode_key(key));
pipe.query_async(&mut conn)
.await
.map_err(from_redis_err)
Expand Down Expand Up @@ -237,7 +280,7 @@ impl<T: ConnectionLike + Unpin + Clone + Send + Sync + 'static> StoreDriver for
let mut conn = self.get_conn().await?;
if length == Some(0) {
let exists = conn
.exists::<_, bool>(key.as_str().as_ref())
.exists::<_, bool>(self.encode_key(key.borrow()))
.await
.map_err(from_redis_err)
.err_tip(|| "In RedisStore::get_part::zero_exists")?;
Expand All @@ -264,7 +307,7 @@ impl<T: ConnectionLike + Unpin + Clone + Send + Sync + 'static> StoreDriver for
let current_end =
std::cmp::min(current_start.saturating_add(READ_CHUNK_SIZE), end_position) - 1;
let chunk = conn
.getrange::<_, Bytes>(key.as_str().as_ref(), current_start, current_end)
.getrange::<_, Bytes>(self.encode_key(key.borrow()), current_start, current_end)
.await
.map_err(from_redis_err)
.err_tip(|| "In RedisStore::get_part::getrange")?;
Expand Down
75 changes: 75 additions & 0 deletions nativelink-store/tests/redis_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,55 @@ async fn upload_and_get_data() -> Result<(), Error> {
Ok(())
}

#[nativelink_test]
async fn upload_and_get_data_with_prefix() -> Result<(), Error> {
    let data = Bytes::from_static(b"14");
    let prefix = "TEST_PREFIX-";

    let digest = DigestInfo::try_new(VALID_HASH1, 2)?;
    // All commands must address the *prefixed* key, never the bare digest.
    let packed_hash_hex = format!("{prefix}{}-{}", digest.hash_str(), digest.size_bytes);

    let chunk_data = "14";

    let redis_connection = MockRedisConnectionBuilder::new()
        .pipe(&[("APPEND", &[TEMP_UUID, chunk_data], Ok(&[redis::Value::Nil]))])
        .cmd("APPEND", &[&packed_hash_hex, ""], Ok(""))
        .pipe(&[(
            "RENAME",
            &[TEMP_UUID, &packed_hash_hex],
            Ok(&[redis::Value::Nil]),
        )])
        .pipe(&[(
            "STRLEN",
            &[&packed_hash_hex],
            Ok(&[redis::Value::Bulk(vec![redis::Value::Int(2)])]),
        )])
        .cmd("GETRANGE", &[&packed_hash_hex, "0", "1"], Ok("14"))
        .build();

    let store = RedisStore::new_with_conn_and_name_generator_and_prefix(
        LazyConnection::Connection(Ok(redis_connection)),
        mock_uuid_generator,
        prefix.to_string(),
    );

    store.update_oneshot(digest, data.clone()).await?;

    let result = store.has(digest).await?;
    assert!(
        result.is_some(),
        "Expected redis store to have hash: {VALID_HASH1}",
    );

    // Fix: was `data.clone().len()` — cloning `Bytes` just to read its length
    // is a needless allocation/refcount bump (clippy::redundant_clone).
    let result = store
        .get_part_unchunked(digest, 0, Some(data.len()))
        .await?;

    assert_eq!(result, data, "Expected redis store to have updated value",);

    Ok(())
}

#[nativelink_test]
async fn upload_empty_data() -> Result<(), Error> {
let data = Bytes::from_static(b"");
Expand All @@ -149,6 +198,32 @@ async fn upload_empty_data() -> Result<(), Error> {
Ok(())
}

#[nativelink_test]
async fn upload_empty_data_with_prefix() -> Result<(), Error> {
    let data = Bytes::from_static(b"");
    let prefix = "TEST_PREFIX-";

    let digest = ZERO_BYTE_DIGESTS[0];

    // No expectations registered: zero-byte digests are short-circuited and
    // must never reach Redis, prefixed or not.
    let redis_connection = MockRedisConnectionBuilder::new().build();

    let store = RedisStore::new_with_conn_and_name_generator_and_prefix(
        LazyConnection::Connection(Ok(redis_connection)),
        mock_uuid_generator,
        prefix.to_string(),
    );

    store.update_oneshot(digest, data).await?;

    let result = store.has(digest).await?;
    // Fix: the failure message previously interpolated `VALID_HASH1`, which is
    // not the digest this test uses; report the actual zero-byte digest hash.
    assert!(
        result.is_some(),
        "Expected redis store to have hash: {}",
        digest.hash_str(),
    );

    Ok(())
}

#[nativelink_test]
async fn test_uploading_large_data() -> Result<(), Error> {
// Requires multiple chunks as data is larger than 64K
Expand Down
Loading