Skip to content

Commit

Permalink
fix: topic index encoding (#25)
Browse files Browse the repository at this point in the history
Scru128s may include 0xFF
I was thinking of Scru128s as UTF-8 strings, but we use the direct byte encoding
  • Loading branch information
marvin-j97 authored Nov 28, 2024
1 parent 02938f6 commit cbf70a8
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,15 @@ pub enum FollowOption {
WithHeartbeat(Duration),
}

// TODO: split_once is unstable as of 2024-11-28
fn split_once<'a, T, F>(slice: &'a [T], pred: F) -> Option<(&'a [T], &'a [T])>
where
F: FnMut(&T) -> bool,
{
let index = slice.iter().position(pred)?;
Some((&slice[..index], &slice[index + 1..]))
}

#[derive(Clone)]
pub struct Store {
pub path: PathBuf,
Expand Down Expand Up @@ -349,7 +358,8 @@ impl Store {

for kv in self.topic_index.prefix(prefix).rev() {
let (k, _) = kv.unwrap();
let frame_id = k.split(|&c| c == 0xFF).nth(1).unwrap();

let (_topic, frame_id) = split_once(&k, |&c| c == 0xFF).unwrap();

// Join back to "primary index"
if let Some(value) = self.frame_partition.get(frame_id).unwrap() {
Expand Down Expand Up @@ -379,7 +389,7 @@ impl Store {
};

let mut batch = self.keyspace.batch();
batch.remove(&self.frame_partition, id.to_bytes());
batch.remove(&self.frame_partition, id.as_bytes());
batch.remove(&self.topic_index, Self::topic_index_key(&frame));
batch.commit()
}
Expand Down Expand Up @@ -411,7 +421,7 @@ impl Store {
let encoded: Vec<u8> = serde_json::to_vec(&frame).unwrap();

let mut batch = self.keyspace.batch();
batch.insert(&self.frame_partition, frame.id.to_bytes(), encoded);
batch.insert(&self.frame_partition, frame.id.as_bytes(), encoded);
batch.insert(&self.topic_index, Self::topic_index_key(&frame), b"");
batch.commit().unwrap();
self.keyspace.persist(fjall::PersistMode::SyncAll).unwrap();
Expand Down

0 comments on commit cbf70a8

Please sign in to comment.