Merged (changes from all commits)
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions modules/custom_indexer/Cargo.toml
@@ -17,10 +17,15 @@ anyhow = { workspace = true }
bincode = "1"
config = { workspace = true }
fjall = "2.7.0"
futures = "0.3"
pallas = { workspace = true}
serde = { workspace = true, features = ["rc"] }
thiserror = "2.0.17"
tokio = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
hex = "0.4"

[lib]
path = "src/custom_indexer.rs"
2 changes: 2 additions & 0 deletions modules/custom_indexer/src/chain_index.rs
@@ -16,4 +16,6 @@ pub trait ChainIndex: Send + Sync + 'static {
let _ = point;
Ok(())
}

async fn reset(&mut self, start: &Point) -> Result<Point>;
Comment on lines +19 to +20
Collaborator:
Is there a reason we return a point from this? I'm not sure what it makes sense to return besides the start it was passed.

Collaborator (Author):

The intent here was to give implementations the option to adjust their replay point on reset. For example, an index that only needs the last N blocks could implement reset to resume from a later point than the one provided. If we don't expect any index to diverge from the provided start, then returning a point isn't needed. I can switch it to () unless we want to support this kind of behavior.

}
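
To make the trade-off discussed above concrete, here is a minimal sketch of the behavior the author describes: an index that keeps its own checkpoint and may return a point later than the requested start. The `RollingIndex` type, its fields, and the `newer_than` helper are hypothetical and not part of this PR; the other `ChainIndex` methods are omitted.

use acropolis_common::Point;
use anyhow::Result;

// Hypothetical index that only needs recent history; not part of this PR.
struct RollingIndex {
    // Newest point this index has already applied, if any.
    checkpoint: Option<Point>,
}

impl RollingIndex {
    // Shape of the proposed `reset`: an implementation may return a point
    // later than the requested `start`, and replay then resumes from there.
    async fn reset(&mut self, start: &Point) -> Result<Point> {
        match &self.checkpoint {
            Some(cp) if newer_than(cp, start) => Ok(cp.clone()),
            _ => {
                self.checkpoint = Some(start.clone());
                Ok(start.clone())
            }
        }
    }
}

// Stand-in for a real comparison between two points (e.g. by slot).
fn newer_than(_candidate: &Point, _start: &Point) -> bool {
    false
}

If no index is expected to diverge from the provided start, returning `()` instead keeps the trait simpler, as the reviewer suggests.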
147 changes: 113 additions & 34 deletions modules/custom_indexer/src/cursor_store.rs
@@ -1,79 +1,158 @@
use std::{future::Future, path::Path, sync::Mutex};
use std::{collections::HashMap, future::Future, path::Path};

use acropolis_common::Point;
use anyhow::Result;
use fjall::{Config, Keyspace, Partition, PartitionCreateOptions};
use tokio::sync::Mutex;
use tracing::warn;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CursorEntry {
pub tip: Point,
pub halted: bool,
}

pub trait CursorStore: Send + Sync + 'static {
fn load(&self) -> impl Future<Output = Result<Option<Point>>> + Send;
fn save(&self, point: &Point) -> impl Future<Output = Result<()>> + Send;
fn load(&self) -> impl Future<Output = Result<HashMap<String, CursorEntry>>> + Send;
fn save(
&self,
entries: &HashMap<String, CursorEntry>,
) -> impl Future<Output = Result<(), CursorSaveError>> + Send;
}

// In memory cursor store (Not persisted across runs)
pub struct InMemoryCursorStore {
cursor: Mutex<Option<Point>>,
entries: Mutex<HashMap<String, CursorEntry>>,
}
impl InMemoryCursorStore {
pub fn new(point: Point) -> Self {
pub fn new() -> Self {
Self {
cursor: Mutex::new(Some(point)),
entries: Mutex::new(HashMap::new()),
}
}
}
impl Default for InMemoryCursorStore {
fn default() -> Self {
Self::new()
}
}

impl CursorStore for InMemoryCursorStore {
async fn load(&self) -> Result<Option<Point>> {
let guard = self.cursor.lock().map_err(|_| anyhow::anyhow!("cursor mutex poisoned"))?;
Ok(guard.as_ref().cloned())
async fn load(&self) -> Result<HashMap<String, CursorEntry>> {
let guard = self.entries.lock().await;
Ok(guard.clone())
}

async fn save(&self, point: &Point) -> Result<()> {
let mut guard = self.cursor.lock().map_err(|_| anyhow::anyhow!("cursor mutex poisoned"))?;
*guard = Some(point.clone());
async fn save(&self, entries: &HashMap<String, CursorEntry>) -> Result<(), CursorSaveError> {
let mut guard = self.entries.lock().await;
*guard = entries.clone();
Ok(())
}
}

// Fjall backed cursor store (Retains last stored point)
const CURSOR_PREFIX: &str = "cursor/";

pub struct FjallCursorStore {
cursor: Partition,
partition: Partition,
}

impl FjallCursorStore {
pub fn new(path: impl AsRef<Path>, point: Point) -> Result<Self> {
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
let cfg = Config::new(path);
let keyspace = Keyspace::open(cfg)?;
let partition = keyspace.open_partition("cursor", PartitionCreateOptions::default())?;

// Use stored point if exists or initialize with provided point
match partition.get("cursor")? {
Some(_) => Ok(Self { cursor: partition }),
None => {
let raw = bincode::serialize(&point)?;
partition.insert("cursor", raw)?;
Ok(Self { cursor: partition })
}
}
Ok(Self { partition })
}

fn key_for(name: &str) -> String {
format!("{CURSOR_PREFIX}{name}")
}

fn name_from_key(key: &[u8]) -> Option<String> {
let s = std::str::from_utf8(key).ok()?;
s.strip_prefix(CURSOR_PREFIX).map(|n| n.to_string())
}

fn prefix_iter(
&self,
) -> impl Iterator<Item = fjall::Result<(fjall::Slice, fjall::Slice)>> + '_ {
self.partition.prefix(CURSOR_PREFIX)
}
}

impl CursorStore for FjallCursorStore {
async fn load(&self) -> Result<Option<Point>> {
let raw = self.cursor.get("cursor")?;
async fn load(&self) -> Result<HashMap<String, CursorEntry>> {
let mut out = HashMap::new();
for next in self.prefix_iter() {
let (key_bytes, val_bytes) = match next {
Ok(r) => r,
Err(e) => {
warn!("CursorStore: failed to read row: {:#}", e);
continue;
}
};

let Some(bytes) = raw else {
return Ok(None);
};
let Some(name) = Self::name_from_key(&key_bytes) else {
warn!("CursorStore: invalid or non-matching key");
continue;
};

let point: Point = bincode::deserialize(&bytes)?;
let point = match bincode::deserialize::<CursorEntry>(&val_bytes) {
Ok(p) => p,
Err(e) => {
warn!(
"CursorStore: failed to deserialize cursor for '{}': {:#}",
name, e
);
continue;
}
};
out.insert(name, point);
}

Ok(Some(point))
Ok(out)
}

async fn save(&self, point: &Point) -> Result<()> {
let raw = bincode::serialize(point)?;
async fn save(&self, entries: &HashMap<String, CursorEntry>) -> Result<(), CursorSaveError> {
let mut failed = Vec::new();

self.cursor.insert("cursor", raw)?;
for (name, entry) in entries {
let key = Self::key_for(name);

Ok(())
let val = match bincode::serialize(entry) {
Ok(v) => v,
Err(e) => {
warn!(
"CursorStore: failed to serialize cursor for '{}': {:#}",
name, e
);
failed.push(name.clone());
continue;
}
};

if let Err(e) = self.partition.insert(&key, val) {
warn!(
"CursorStore: failed to write cursor for '{}': {:#}",
name, e
);
failed.push(name.clone());
continue;
}
}

if failed.is_empty() {
Ok(())
} else {
Err(CursorSaveError { failed })
}
}
}

#[derive(Debug, thiserror::Error)]
#[error("Failed to save cursor tips for: {failed:?}")]
pub struct CursorSaveError {
Collaborator:

[nit] Could leverage thiserror here

Suggested change
pub struct CursorSaveError {
#[derive(Debug, thiserror::Error)]
#[error("Failed to save cursor tips for: {failed:?}")]
pub struct CursorSaveError {
pub failed: Vec<String>,
}

Collaborator (Author):

Thanks for this suggestion! Switched to using thiserror in 7b5d429.

pub failed: Vec<String>,
}
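
As a rough usage sketch of the reworked multi-cursor API (not part of the PR): it assumes `CursorStore`, `CursorEntry`, and `InMemoryCursorStore` are in scope (their exact module path is not shown in this diff), that `tip` is a `Point` obtained elsewhere, and that the index name "utxo_index" is purely illustrative.

use std::collections::HashMap;

use acropolis_common::Point;

// Sketch only: save and reload named cursors with the new API.
// Assumes CursorStore, CursorEntry, and InMemoryCursorStore are imported.
async fn checkpoint_example(tip: Point) -> anyhow::Result<()> {
    let store = InMemoryCursorStore::new();

    // One entry per named index; `halted` marks an index that stopped early.
    let mut entries = HashMap::new();
    entries.insert(
        "utxo_index".to_string(), // illustrative index name
        CursorEntry { tip, halted: false },
    );

    // `save` now takes the whole map and reports per-entry failures
    // through CursorSaveError (always Ok for the in-memory store).
    store.save(&entries).await?;

    // `load` returns every stored cursor keyed by index name.
    let restored = store.load().await?;
    assert_eq!(restored.len(), 1);
    Ok(())
}

The Fjall-backed store follows the same contract but persists each entry under a "cursor/<name>" key, as the prefix handling in the diff above shows.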