Skip to content

Commit 2790453

Browse files
authored
fix(storage): Correctly encoding FileStore keys (#4539)
Signed-off-by: Graham King <grahamk@nvidia.com>
1 parent f2769d8 commit 2790453

File tree

12 files changed

+89
-68
lines changed

12 files changed

+89
-68
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/bindings/python/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/llm/src/discovery/model_manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ impl ModelManager {
315315
.get_or_create_bucket(KV_ROUTERS_ROOT_PATH, None)
316316
.await?;
317317
let router_uuid = uuid::Uuid::new_v4();
318-
let router_key = Key::from_raw(format!("{}/{router_uuid}", endpoint.path()));
318+
let router_key = Key::new(format!("{}/{router_uuid}", endpoint.path()));
319319
let json_router_config = serde_json::to_vec_pretty(&kv_router_config.unwrap_or_default())?;
320320
router_bucket
321321
.insert(&router_key, json_router_config.into(), 0)

lib/llm/src/kv_router/subscriber.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -485,11 +485,11 @@ async fn cleanup_orphaned_consumers(
485485
.iter()
486486
.filter_map(|(key, _)| {
487487
// Check if key contains this component's path
488-
if !key.contains(&component_path) {
488+
if !key.as_ref().contains(&component_path) {
489489
return None;
490490
}
491491
// Extract the last part (should be the UUID)
492-
key.split('/').next_back().map(str::to_string)
492+
key.as_ref().split('/').next_back().map(str::to_string)
493493
})
494494
.collect();
495495

lib/runtime/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ nid = { version = "3.0.0", features = ["serde"] }
7979
nix = { version = "0.29", features = ["signal"] }
8080
nuid = { version = "0.5" }
8181
once_cell = { version = "1" }
82+
percent-encoding = { version = "2.3.2" } # also used by tonic, reqwest, axum, etc
8283
rayon = { version = "1.10" }
8384
regex = { version = "1" }
8485
socket2 = { version = "0.5.8" }

lib/runtime/examples/Cargo.lock

Lines changed: 4 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/runtime/src/discovery/kv_store.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ impl Discovery for KVStoreDiscovery {
184184
key_path
185185
);
186186
let bucket = self.store.get_or_create_bucket(bucket_name, None).await?;
187-
let key = crate::storage::key_value_store::Key::from_raw(key_path.clone());
187+
let key = crate::storage::key_value_store::Key::new(key_path.clone());
188188

189189
tracing::debug!(
190190
"KVStoreDiscovery::register: Inserting into bucket={}, key={}",
@@ -251,7 +251,7 @@ impl Discovery for KVStoreDiscovery {
251251
return Ok(());
252252
};
253253

254-
let key = crate::storage::key_value_store::Key::from_raw(key_path.clone());
254+
let key = crate::storage::key_value_store::Key::new(key_path.clone());
255255

256256
// Delete the entry from the bucket
257257
bucket.delete(&key).await?;
@@ -277,12 +277,12 @@ impl Discovery for KVStoreDiscovery {
277277

278278
// Filter by prefix and deserialize
279279
let mut instances = Vec::new();
280-
for (key_str, value) in entries {
281-
if Self::matches_prefix(&key_str, &prefix, bucket_name) {
280+
for (key, value) in entries {
281+
if Self::matches_prefix(key.as_ref(), &prefix, bucket_name) {
282282
match Self::parse_instance(&value) {
283283
Ok(instance) => instances.push(instance),
284284
Err(e) => {
285-
tracing::warn!(key = %key_str, error = %e, "Failed to parse discovery instance");
285+
tracing::warn!(%key, error = %e, "Failed to parse discovery instance");
286286
}
287287
}
288288
}

lib/runtime/src/storage/key_value_store.rs

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
//! Interface to a traditional key-value store such as etcd.
55
//! "key_value_store" spelt out because in AI land "KV" means something else.
66
7+
use std::borrow::Cow;
78
use std::pin::Pin;
89
use std::str::FromStr;
910
use std::sync::Arc;
@@ -12,10 +13,10 @@ use std::{collections::HashMap, path::PathBuf};
1213
use std::{env, fmt};
1314

1415
use crate::CancellationToken;
15-
use crate::slug::Slug;
1616
use crate::transports::etcd as etcd_transport;
1717
use async_trait::async_trait;
1818
use futures::StreamExt;
19+
use percent_encoding::{NON_ALPHANUMERIC, percent_decode_str, percent_encode};
1920
use serde::{Deserialize, Serialize};
2021

2122
mod mem;
@@ -29,27 +30,32 @@ pub use file::FileStore;
2930

3031
const WATCH_SEND_TIMEOUT: Duration = Duration::from_millis(100);
3132

32-
/// A key that is safe to use directly in the KV store.
33-
///
34-
/// TODO: Need to re-think this. etcd uses slash separators, so we often use from_raw
35-
/// to avoid the slug. But other impl's, particularly file, need a real slug.
36-
#[derive(Debug, Clone, PartialEq)]
33+
/// String we use as the Key in a key-value storage operation. Simple String wrapper
34+
/// that can encode / decode a string.
35+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3736
pub struct Key(String);
3837

3938
impl Key {
40-
pub fn new(s: &str) -> Key {
41-
Key(Slug::slugify(s).to_string())
39+
pub fn new(s: String) -> Key {
40+
Key(s)
4241
}
4342

44-
/// Create a Key without changing the string, it is assumed already KV store safe.
45-
pub fn from_raw(s: String) -> Key {
46-
Key(s)
43+
/// Takes a URL-safe percent-encoded string and creates a Key from it by decoding first.
44+
/// dynamo%2Fbackend%2Fgenerate%2F17216e63492ef21f becomes dynamo/backend/generate/17216e63492ef21f
45+
pub fn from_url_safe(s: &str) -> Key {
46+
Key(percent_decode_str(s).decode_utf8_lossy().to_string())
47+
}
48+
49+
/// A URL-safe percent-encoded representation of this key.
50+
/// e.g. dynamo/backend/generate/17216e63492ef21f becomes dynamo%2Fbackend%2Fgenerate%2F17216e63492ef21f
51+
pub fn url_safe(&self) -> Cow<'_, str> {
52+
percent_encode(self.0.as_bytes(), NON_ALPHANUMERIC).into()
4753
}
4854
}
4955

5056
impl From<&str> for Key {
5157
fn from(s: &str) -> Key {
52-
Key::new(s)
58+
Key::new(s.to_string())
5359
}
5460
}
5561

@@ -73,21 +79,21 @@ impl From<&Key> for String {
7379

7480
#[derive(Debug, Clone, PartialEq)]
7581
pub struct KeyValue {
76-
key: String,
82+
key: Key,
7783
value: bytes::Bytes,
7884
}
7985

8086
impl KeyValue {
81-
pub fn new(key: String, value: bytes::Bytes) -> Self {
87+
pub fn new(key: Key, value: bytes::Bytes) -> Self {
8288
KeyValue { key, value }
8389
}
8490

8591
pub fn key(&self) -> String {
86-
self.key.clone()
92+
self.key.clone().to_string()
8793
}
8894

8995
pub fn key_str(&self) -> &str {
90-
&self.key
96+
self.key.as_ref()
9197
}
9298

9399
pub fn value(&self) -> &[u8] {
@@ -394,6 +400,7 @@ impl KeyValueStoreManager {
394400
pub trait KeyValueBucket: Send + Sync {
395401
/// A bucket is a collection of key/value pairs.
396402
/// Insert a value into a bucket, if it doesn't exist already
403+
/// The Key should be the name of the item, not including the bucket name.
397404
async fn insert(
398405
&self,
399406
key: &Key,
@@ -402,9 +409,11 @@ pub trait KeyValueBucket: Send + Sync {
402409
) -> Result<StoreOutcome, StoreError>;
403410

404411
/// Fetch an item from the key-value storage
412+
/// The Key should be the name of the item, not including the bucket name.
405413
async fn get(&self, key: &Key) -> Result<Option<bytes::Bytes>, StoreError>;
406414

407415
/// Delete an item from the bucket
416+
/// The Key should be the name of the item, not including the bucket name.
408417
async fn delete(&self, key: &Key) -> Result<(), StoreError>;
409418

410419
/// A stream of items inserted into the bucket.
@@ -414,7 +423,10 @@ pub trait KeyValueBucket: Send + Sync {
414423
&self,
415424
) -> Result<Pin<Box<dyn futures::Stream<Item = WatchEvent> + Send + '_>>, StoreError>;
416425

417-
async fn entries(&self) -> Result<HashMap<String, bytes::Bytes>, StoreError>;
426+
/// The entries in this bucket.
427+
/// The Key includes the full path including the bucket name.
428+
/// That means you cannot directory get a Key from `entries` and pass it to `get` or `delete`.
429+
async fn entries(&self) -> Result<HashMap<Key, bytes::Bytes>, StoreError>;
418430
}
419431

420432
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
@@ -527,7 +539,7 @@ mod tests {
527539
let mut expected = Vec::with_capacity(3);
528540
for i in 1..=3 {
529541
let item = WatchEvent::Put(KeyValue::new(
530-
format!("test{i}"),
542+
Key::new(format!("test{i}")),
531543
format!("value{i}").into(),
532544
));
533545
expected.push(item);
@@ -596,7 +608,7 @@ mod tests {
596608
let mut rx1 = tap.subscribe();
597609
let mut rx2 = tap.subscribe();
598610

599-
let item = WatchEvent::Put(KeyValue::new("test1".to_string(), "GK".into()));
611+
let item = WatchEvent::Put(KeyValue::new(Key::new("test1".to_string()), "GK".into()));
600612
let item_clone = item.clone();
601613
let handle1 = tokio::spawn(async move {
602614
let b = rx1.recv().await.unwrap();

lib/runtime/src/storage/key_value_store/etcd.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ impl KeyValueBucket for EtcdBucket {
126126
etcd::WatchEvent::Put(kv) => {
127127
let (k, v) = kv.into_key_value();
128128
let key = match String::from_utf8(k) {
129-
Ok(k) => k,
129+
Ok(k) => Key::new(k),
130130
Err(err) => {
131131
tracing::error!(%err, prefix, "Invalid UTF8 in etcd key");
132132
continue;
@@ -138,21 +138,21 @@ impl KeyValueBucket for EtcdBucket {
138138
etcd::WatchEvent::Delete(kv) => {
139139
let (k, _) = kv.into_key_value();
140140
let key = match String::from_utf8(k) {
141-
Ok(k) => k,
141+
Ok(k) => Key::new(k),
142142
Err(err) => {
143143
tracing::error!(%err, prefix, "Invalid UTF8 in etcd key");
144144
continue;
145145
}
146146
};
147-
yield WatchEvent::Delete(Key::from_raw(key));
147+
yield WatchEvent::Delete(key);
148148
}
149149
}
150150
}
151151
};
152152
Ok(Box::pin(output))
153153
}
154154

155-
async fn entries(&self) -> Result<HashMap<String, bytes::Bytes>, StoreError> {
155+
async fn entries(&self) -> Result<HashMap<Key, bytes::Bytes>, StoreError> {
156156
let k = make_key(&self.bucket_name, &"".into());
157157
tracing::trace!("etcd entries: {k}");
158158

@@ -161,11 +161,11 @@ impl KeyValueBucket for EtcdBucket {
161161
.kv_get_prefix(k)
162162
.await
163163
.map_err(|e| StoreError::EtcdError(e.to_string()))?;
164-
let out: HashMap<String, bytes::Bytes> = resp
164+
let out: HashMap<Key, bytes::Bytes> = resp
165165
.into_iter()
166166
.map(|kv| {
167167
let (k, v) = kv.into_key_value();
168-
(String::from_utf8_lossy(&k).to_string(), v.into())
168+
(Key::new(String::from_utf8_lossy(&k).to_string()), v.into())
169169
})
170170
.collect();
171171

@@ -287,7 +287,7 @@ mod concurrent_create_tests {
287287
let barrier = Arc::new(Barrier::new(num_workers));
288288

289289
// Shared test data
290-
let test_key: Key = Key::new(&format!("concurrent_test_key_{}", uuid::Uuid::new_v4()));
290+
let test_key: Key = Key::new(format!("concurrent_test_key_{}", uuid::Uuid::new_v4()));
291291
let test_value = "test_value";
292292

293293
// Spawn multiple tasks that will all try to create the same key simultaneously

lib/runtime/src/storage/key_value_store/file.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ impl KeyValueBucket for Directory {
286286
value: bytes::Bytes,
287287
_revision: u64, // Not used. Maybe put in file name?
288288
) -> Result<StoreOutcome, StoreError> {
289-
let safe_key = Key::new(key.as_ref()); // because of from_raw
289+
let safe_key = key.url_safe();
290290
let full_path = self.p.join(safe_key.as_ref());
291291
self.owned_files.lock().insert(full_path.clone());
292292
let str_path = full_path.display().to_string();
@@ -298,7 +298,7 @@ impl KeyValueBucket for Directory {
298298

299299
/// Read a file from the directory
300300
async fn get(&self, key: &Key) -> Result<Option<bytes::Bytes>, StoreError> {
301-
let safe_key = Key::new(key.as_ref()); // because of from_raw
301+
let safe_key = key.url_safe();
302302
let full_path = self.p.join(safe_key.as_ref());
303303
if !full_path.exists() {
304304
return Ok(None);
@@ -313,7 +313,7 @@ impl KeyValueBucket for Directory {
313313

314314
/// Delete a file from the directory
315315
async fn delete(&self, key: &Key) -> Result<(), StoreError> {
316-
let safe_key = Key::new(key.as_ref()); // because of from_raw
316+
let safe_key = key.url_safe();
317317
let full_path = self.p.join(safe_key.as_ref());
318318
let str_path = full_path.display().to_string();
319319
if !full_path.exists() {
@@ -374,7 +374,7 @@ impl KeyValueBucket for Directory {
374374
let canonical_item_path = item_path.canonicalize().unwrap_or_else(|_| item_path.clone());
375375

376376
let key = match canonical_item_path.strip_prefix(&root) {
377-
Ok(stripped) => stripped.display().to_string().replace("_", "/"),
377+
Ok(stripped) => Key::from_url_safe(&stripped.display().to_string()),
378378
Err(err) => {
379379
// Possibly this should be a panic.
380380
// A key cannot be outside the file store root.
@@ -400,7 +400,7 @@ impl KeyValueBucket for Directory {
400400
yield WatchEvent::Put(item);
401401
}
402402
EventKind::Remove(event::RemoveKind::File) => {
403-
yield WatchEvent::Delete(Key::from_raw(key));
403+
yield WatchEvent::Delete(key);
404404
}
405405
_ => {
406406
// These happen every time the keep-alive updates last modified time
@@ -412,7 +412,7 @@ impl KeyValueBucket for Directory {
412412
}))
413413
}
414414

415-
async fn entries(&self) -> Result<HashMap<String, bytes::Bytes>, StoreError> {
415+
async fn entries(&self) -> Result<HashMap<Key, bytes::Bytes>, StoreError> {
416416
let contents = fs::read_dir(&self.p)
417417
.with_context(|| self.p.display().to_string())
418418
.map_err(a_to_fs_err)?;
@@ -437,7 +437,7 @@ impl KeyValueBucket for Directory {
437437
};
438438

439439
let key = match canonical_entry_path.strip_prefix(&self.root) {
440-
Ok(p) => p.to_string_lossy().to_string().replace("_", "/"),
440+
Ok(p) => Key::from_url_safe(&p.to_string_lossy()),
441441
Err(err) => {
442442
tracing::error!(
443443
error = %err,
@@ -482,17 +482,17 @@ mod tests {
482482
let m = FileStore::new(t.path());
483483
let bucket = m.get_or_create_bucket("v1/tests", None).await.unwrap();
484484
let _ = bucket
485-
.insert(&Key::new("key1/multi/part"), "value1".into(), 0)
485+
.insert(&Key::new("key1/multi/part".to_string()), "value1".into(), 0)
486486
.await
487487
.unwrap();
488488
let _ = bucket
489-
.insert(&Key::new("key2"), "value2".into(), 0)
489+
.insert(&Key::new("key2".to_string()), "value2".into(), 0)
490490
.await
491491
.unwrap();
492492
let entries = bucket.entries().await.unwrap();
493-
let keys: HashSet<String> = entries.into_keys().collect();
493+
let keys: HashSet<Key> = entries.into_keys().collect();
494494

495-
assert!(keys.contains("v1/tests/key1/multi/part"));
496-
assert!(keys.contains("v1/tests/key2"));
495+
assert!(keys.contains(&Key::new("v1/tests/key1/multi/part".to_string())));
496+
assert!(keys.contains(&Key::new("v1/tests/key2".to_string())));
497497
}
498498
}

0 commit comments

Comments
 (0)