diff --git a/Cargo.lock b/Cargo.lock index dee8c9ce..024099ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -206,8 +206,11 @@ dependencies = [ "caryatid_sdk", "config", "fjall", + "futures", + "hex", "pallas 0.33.0", "serde", + "thiserror 2.0.17", "tokio", "tracing", ] diff --git a/modules/custom_indexer/Cargo.toml b/modules/custom_indexer/Cargo.toml index 29bcdc4a..dfed2df0 100644 --- a/modules/custom_indexer/Cargo.toml +++ b/modules/custom_indexer/Cargo.toml @@ -17,10 +17,15 @@ anyhow = { workspace = true } bincode = "1" config = { workspace = true } fjall = "2.7.0" +futures = "0.3" pallas = { workspace = true} serde = { workspace = true, features = ["rc"] } +thiserror = "2.0.17" tokio = { workspace = true } tracing = { workspace = true } +[dev-dependencies] +hex = "0.4" + [lib] path = "src/custom_indexer.rs" diff --git a/modules/custom_indexer/src/chain_index.rs b/modules/custom_indexer/src/chain_index.rs index f77bd697..3bc151d5 100644 --- a/modules/custom_indexer/src/chain_index.rs +++ b/modules/custom_indexer/src/chain_index.rs @@ -16,4 +16,6 @@ pub trait ChainIndex: Send + Sync + 'static { let _ = point; Ok(()) } + + async fn reset(&mut self, start: &Point) -> Result; } diff --git a/modules/custom_indexer/src/cursor_store.rs b/modules/custom_indexer/src/cursor_store.rs index 1f0b7b93..f1db929e 100644 --- a/modules/custom_indexer/src/cursor_store.rs +++ b/modules/custom_indexer/src/cursor_store.rs @@ -1,79 +1,158 @@ -use std::{future::Future, path::Path, sync::Mutex}; +use std::{collections::HashMap, future::Future, path::Path}; use acropolis_common::Point; use anyhow::Result; use fjall::{Config, Keyspace, Partition, PartitionCreateOptions}; +use tokio::sync::Mutex; +use tracing::warn; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct CursorEntry { + pub tip: Point, + pub halted: bool, +} pub trait CursorStore: Send + Sync + 'static { - fn load(&self) -> impl Future>> + Send; - fn save(&self, point: &Point) -> impl Future> + Send; + fn load(&self) -> impl Future>> + Send; + fn save( + &self, + entries: &HashMap, + ) -> impl Future> + Send; } // In memory cursor store (Not persisted across runs) pub struct InMemoryCursorStore { - cursor: Mutex>, + entries: Mutex>, } impl InMemoryCursorStore { - pub fn new(point: Point) -> Self { + pub fn new() -> Self { Self { - cursor: Mutex::new(Some(point)), + entries: Mutex::new(HashMap::new()), } } } +impl Default for InMemoryCursorStore { + fn default() -> Self { + Self::new() + } +} + impl CursorStore for InMemoryCursorStore { - async fn load(&self) -> Result> { - let guard = self.cursor.lock().map_err(|_| anyhow::anyhow!("cursor mutex poisoned"))?; - Ok(guard.as_ref().cloned()) + async fn load(&self) -> Result> { + let guard = self.entries.lock().await; + Ok(guard.clone()) } - async fn save(&self, point: &Point) -> Result<()> { - let mut guard = self.cursor.lock().map_err(|_| anyhow::anyhow!("cursor mutex poisoned"))?; - *guard = Some(point.clone()); + async fn save(&self, entries: &HashMap) -> Result<(), CursorSaveError> { + let mut guard = self.entries.lock().await; + *guard = entries.clone(); Ok(()) } } // Fjall backed cursor store (Retains last stored point) +const CURSOR_PREFIX: &str = "cursor/"; + pub struct FjallCursorStore { - cursor: Partition, + partition: Partition, } impl FjallCursorStore { - pub fn new(path: impl AsRef, point: Point) -> Result { + pub fn new(path: impl AsRef) -> Result { let cfg = Config::new(path); let keyspace = Keyspace::open(cfg)?; let partition = keyspace.open_partition("cursor", PartitionCreateOptions::default())?; - // Use stored point if exists or initialize with provided point - match partition.get("cursor")? { - Some(_) => Ok(Self { cursor: partition }), - None => { - let raw = bincode::serialize(&point)?; - partition.insert("cursor", raw)?; - Ok(Self { cursor: partition }) - } - } + Ok(Self { partition }) + } + + fn key_for(name: &str) -> String { + format!("{CURSOR_PREFIX}{name}") + } + + fn name_from_key(key: &[u8]) -> Option { + let s = std::str::from_utf8(key).ok()?; + s.strip_prefix(CURSOR_PREFIX).map(|n| n.to_string()) + } + + fn prefix_iter( + &self, + ) -> impl Iterator> + '_ { + self.partition.prefix(CURSOR_PREFIX) } } impl CursorStore for FjallCursorStore { - async fn load(&self) -> Result> { - let raw = self.cursor.get("cursor")?; + async fn load(&self) -> Result> { + let mut out = HashMap::new(); + for next in self.prefix_iter() { + let (key_bytes, val_bytes) = match next { + Ok(r) => r, + Err(e) => { + warn!("CursorStore: failed to read row: {:#}", e); + continue; + } + }; - let Some(bytes) = raw else { - return Ok(None); - }; + let Some(name) = Self::name_from_key(&key_bytes) else { + warn!("CursorStore: invalid or non-matching key"); + continue; + }; - let point: Point = bincode::deserialize(&bytes)?; + let point = match bincode::deserialize::(&val_bytes) { + Ok(p) => p, + Err(e) => { + warn!( + "CursorStore: failed to deserialize cursor for '{}': {:#}", + name, e + ); + continue; + } + }; + out.insert(name, point); + } - Ok(Some(point)) + Ok(out) } - async fn save(&self, point: &Point) -> Result<()> { - let raw = bincode::serialize(point)?; + async fn save(&self, entries: &HashMap) -> Result<(), CursorSaveError> { + let mut failed = Vec::new(); - self.cursor.insert("cursor", raw)?; + for (name, entry) in entries { + let key = Self::key_for(name); - Ok(()) + let val = match bincode::serialize(entry) { + Ok(v) => v, + Err(e) => { + warn!( + "CursorStore: failed to serialize cursor for '{}': {:#}", + name, e + ); + failed.push(name.clone()); + continue; + } + }; + + if let Err(e) = self.partition.insert(&key, val) { + warn!( + "CursorStore: failed to write cursor for '{}': {:#}", + name, e + ); + failed.push(name.clone()); + continue; + } + } + + if failed.is_empty() { + Ok(()) + } else { + Err(CursorSaveError { failed }) + } } } + +#[derive(Debug, thiserror::Error)] +#[error("Failed to save cursor tips for: {failed:?}")] +pub struct CursorSaveError { + pub failed: Vec, +} diff --git a/modules/custom_indexer/src/custom_indexer.rs b/modules/custom_indexer/src/custom_indexer.rs index 172b043c..80431cea 100644 --- a/modules/custom_indexer/src/custom_indexer.rs +++ b/modules/custom_indexer/src/custom_indexer.rs @@ -7,47 +7,138 @@ pub mod chain_index; mod configuration; pub mod cursor_store; +mod index_actor; +mod utils; -use std::sync::Arc; -use tokio::sync::Mutex; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::{mpsc, Mutex}; use anyhow::Result; use config::Config; -use tracing::{error, info, warn}; +use tracing::{error, warn}; use caryatid_sdk::{async_trait, Context, Module}; -use pallas::ledger::traverse::MultiEraTx; use acropolis_common::{ - commands::chain_sync::ChainSyncCommand, - messages::{CardanoMessage, Command, Message, StateTransitionMessage}, + messages::{CardanoMessage, Message, StateTransitionMessage}, Point, }; use crate::{ - chain_index::ChainIndex, configuration::CustomIndexerConfig, cursor_store::CursorStore, + chain_index::ChainIndex, + configuration::CustomIndexerConfig, + cursor_store::{CursorEntry, CursorStore}, + index_actor::{index_actor, IndexCommand, IndexResult}, + utils::{change_sync_point, send_rollback_to_indexers, send_txs_to_indexers}, }; -pub struct CustomIndexer { - index: Arc>, - cursor_store: Arc>, - tip: Arc>, +type IndexSenders = HashMap>; +type SharedSenders = Arc>; +type IndexResponse = ( + String, + Result, +); + +struct IndexWrapper { + index: Box, + tip: Point, + default_start: Point, + halted: bool, +} + +pub struct CustomIndexer { + senders: SharedSenders, + cursor_store: Arc, } -impl CustomIndexer { - pub fn new(index: I, cursor_store: CS, start: Point) -> Self { +impl CustomIndexer { + pub fn new(cursor_store: CS) -> Self { Self { - index: Arc::new(Mutex::new(index)), - cursor_store: Arc::new(Mutex::new(cursor_store)), - tip: Arc::new(Mutex::new(start)), + senders: Arc::new(Mutex::new(HashMap::new())), + cursor_store: Arc::new(cursor_store), + } + } + + pub async fn add_index( + &self, + mut index: I, + default_start: Point, + force_restart: bool, + ) -> Result<()> { + let name = index.name(); + + let mut indexes = self.senders.lock().await; + if indexes.contains_key(&name) { + warn!("CustomIndexer: index '{name}' already exists, skipping add_index"); + return Ok(()); + } + + let mut cursors = self.cursor_store.load().await?; + + let mut entry = cursors.get(&name).cloned().unwrap_or(CursorEntry { + tip: default_start.clone(), + halted: false, + }); + + if force_restart || entry.halted { + index.reset(&default_start).await?; + entry.tip = default_start.clone(); + entry.halted = true; + } + + cursors.insert(name.clone(), entry.clone()); + self.cursor_store.save(&cursors).await?; + + let wrapper = IndexWrapper { + index: Box::new(index), + tip: entry.tip.clone(), + default_start, + halted: false, + }; + + let (tx, rx) = mpsc::channel(32); + tokio::spawn(index_actor(wrapper, rx)); + indexes.insert(name.clone(), tx); + + Ok(()) + } + + async fn compute_start_point(&self) -> Result { + let saved_tips = self.cursor_store.load().await?; + let index_names: Vec = { + let senders = self.senders.lock().await; + senders.keys().cloned().collect() + }; + + let mut min_point = None; + for index in index_names { + let index_entry = saved_tips + .get(&index) + .unwrap_or(&CursorEntry { + tip: Point::Origin, + halted: false, + }) + .clone(); + min_point = match min_point { + None => Some(index_entry.tip), + Some(current) => { + if index_entry.tip.slot() < current.slot() { + Some(index_entry.tip) + } else { + Some(current) + } + } + }; } + Ok(min_point.unwrap_or(Point::Origin)) } } #[async_trait] -impl Module for CustomIndexer +impl Module for CustomIndexer where - I: ChainIndex, CS: CursorStore, { fn get_name(&self) -> &'static str { @@ -63,29 +154,15 @@ where let mut subscription = context.subscribe(&cfg.txs_subscribe_topic).await?; let run_context = context.clone(); - // Retrieve tip from cursor store with fallback to initial sync point - let start_point = { - let saved = self.cursor_store.lock().await.load().await?; - let mut tip_guard = self.tip.lock().await; - let start_point = saved.unwrap_or_else(|| tip_guard.clone()); - *tip_guard = start_point.clone(); - start_point - }; - - let index = Arc::clone(&self.index); + let senders = Arc::clone(&self.senders); let cursor_store = Arc::clone(&self.cursor_store); - let tip = Arc::clone(&self.tip); + + // Get the lowest tip from added indexes to determine where chain sync should begin + let start_point = self.compute_start_point().await?; context.run(async move { // Publish initial sync point - let msg = Message::Command(Command::ChainSync(ChainSyncCommand::FindIntersect( - start_point, - ))); - run_context.publish(&cfg.sync_command_publisher_topic, Arc::new(msg)).await?; - info!( - "Publishing initial sync command on {}", - cfg.sync_command_publisher_topic - ); + change_sync_point(start_point, run_context.clone(), &cfg.sync_command_publisher_topic).await?; // Forward received txs and rollback notifications to index handlers loop { @@ -93,35 +170,21 @@ where Ok((_, message)) => { match message.as_ref() { Message::Cardano((block, CardanoMessage::ReceivedTxs(txs_msg))) => { - // Call handle_onchain_tx on the index for all decoded txs - let mut idx = index.lock().await; - for (tx_index, raw_tx) in txs_msg.txs.iter().enumerate() { - match MultiEraTx::decode(raw_tx) { - Ok(tx) => { - if let Err(e) = idx.handle_onchain_tx(block, &tx).await - { - warn!( - "Failed to index tx {} in block {}: {e:#}", - tx_index, block.number - ); - } - } - Err(_) => { - warn!( - "Failed to decode tx {} in block {}", - tx_index, block.number - ); - } - } - } + let txs: Vec> = txs_msg + .txs + .iter() + .map(|tx| Arc::<[u8]>::from(tx.as_slice())) + .collect(); + + // Send txs to all index tasks + let responses = send_txs_to_indexers(&senders, block, &txs).await; + + // Get responses with new tips and any halts that occured + let new_entries = process_tx_responses(responses, block.slot).await; + + // Save the new entries to the cursor store + cursor_store.save(&new_entries).await?; - // Update and save tip - let new_tip = Point::Specific { - hash: block.hash, - slot: block.slot, - }; - *tip.lock().await = new_tip.clone(); - cursor_store.lock().await.save(&new_tip).await?; } Message::Cardano(( @@ -130,22 +193,33 @@ where point, )), )) => { - // Call handle rollback on index - { - let mut idx = index.lock().await; - if let Err(e) = idx.handle_rollback(point).await { - error!("Failed to handle rollback at {:?}: {e:#}", point); - return Err(e); - } - } + // Inform indexes of a rollback + let responses = send_rollback_to_indexers(&senders, point).await; + + // Get responses with new tips and any indexes which failed to rollback and could not be reset + let (new_tips, to_remove) = process_rollback_responses( + responses, + run_context.clone(), + &cfg.sync_command_publisher_topic, + ).await?; + + // Save the new entries to the cursor store + cursor_store.save(&new_tips).await?; + + // Remove any indexes which were unable to rollback or reset successfully + if !to_remove.is_empty() { + let mut guard = senders.lock().await; - // Rollback tip and save - { - *tip.lock().await = point.clone(); + for name in &to_remove { + if guard.remove(name).is_some() { + warn!("Removed sender for '{name}' due to fatal reset error"); + } else { + warn!("Tried to remove sender for '{name}' but it wasn't present"); + } + } } - cursor_store.lock().await.save(point).await?; } - _ => (), + _ => error!("Unexpected message type: {message:?}"), } } Err(e) => { @@ -161,3 +235,75 @@ where Ok(()) } } + +async fn process_tx_responses + Send>( + mut results: FuturesUnordered, + block_slot: u64, +) -> HashMap { + let mut new_tips = HashMap::new(); + + while let Some((name, result)) = results.next().await { + match result { + Ok(IndexResult::Success { entry }) => { + new_tips.insert(name, entry); + } + Ok(IndexResult::DecodeError { entry, reason }) => { + error!( + "Failed to decode tx at slot {} for index '{}': {}", + block_slot, name, reason + ); + new_tips.insert(name, entry); + } + Ok(IndexResult::HandleError { entry, reason }) => { + error!( + "Failed to handle tx at slot {} for index '{}': {}", + block_slot, name, reason + ); + new_tips.insert(name, entry); + } + Ok(IndexResult::Halted) => { + warn!("Index '{}' is halted", name); + } + Err(_) => { + error!("Actor for index '{}' dropped unexpectedly", name); + } + _ => error!("Unexpected index result type: {result:?}"), + } + } + + new_tips +} + +async fn process_rollback_responses + Send>( + mut results: FuturesUnordered, + run_context: Arc>, + sync_topic: &str, +) -> Result<(HashMap, Vec)> { + let mut new_tips = HashMap::new(); + let mut to_remove = Vec::new(); + + while let Some((name, result)) = results.next().await { + match result { + Ok(IndexResult::Success { entry }) => { + new_tips.insert(name, entry); + } + Ok(IndexResult::Reset { entry }) => { + // Update tip and publish sync command to start fetching blocks from this point + new_tips.insert(name.clone(), entry.clone()); + change_sync_point(entry.tip, run_context.clone(), &sync_topic.to_string()).await?; + } + Ok(IndexResult::FatalResetError { entry, reason }) => { + // Update tip and add index for removal from senders list + new_tips.insert(name.clone(), entry); + to_remove.push(name.clone()); + error!("{name} failed to reset, halting and retrying next run: {reason}"); + } + Err(_) => { + error!("Actor for {name} index dropped"); + } + _ => error!("Unexpected index result type: {result:?}"), + } + } + + Ok((new_tips, to_remove)) +} diff --git a/modules/custom_indexer/src/index_actor.rs b/modules/custom_indexer/src/index_actor.rs new file mode 100644 index 00000000..6f7177d9 --- /dev/null +++ b/modules/custom_indexer/src/index_actor.rs @@ -0,0 +1,466 @@ +use std::sync::Arc; + +use acropolis_common::{BlockInfo, Point}; +use tokio::sync::{mpsc, oneshot}; + +use pallas::ledger::traverse::MultiEraTx; + +use crate::{cursor_store::CursorEntry, IndexWrapper}; + +pub enum IndexCommand { + ApplyTxs { + block: BlockInfo, + txs: Vec>, + response_tx: oneshot::Sender, + }, + Rollback { + point: Point, + response_tx: oneshot::Sender, + }, +} + +#[derive(Debug)] +pub enum IndexResult { + Success { entry: CursorEntry }, + DecodeError { entry: CursorEntry, reason: String }, + HandleError { entry: CursorEntry, reason: String }, + Reset { entry: CursorEntry }, + Halted, + FatalResetError { entry: CursorEntry, reason: String }, +} + +pub async fn index_actor(mut wrapper: IndexWrapper, mut rx: mpsc::Receiver) { + while let Some(cmd) = rx.recv().await { + match cmd { + IndexCommand::ApplyTxs { + block, + txs, + response_tx, + } => { + let result = handle_apply_txs(&mut wrapper, block, txs).await; + let _ = response_tx.send(result); + } + + IndexCommand::Rollback { point, response_tx } => { + let result = handle_rollback(&mut wrapper, point).await; + let _ = response_tx.send(result); + } + } + } +} + +async fn handle_apply_txs( + wrapper: &mut IndexWrapper, + block: acropolis_common::BlockInfo, + txs: Vec>, +) -> IndexResult { + // If the index is halted early return and continue waiting for a rollback event + if wrapper.halted { + return IndexResult::Halted; + } + + // If the index has a tip greater than the current set of transactions early return and continue waiting for chainsync to catch up + if block.slot <= wrapper.tip.slot() { + return IndexResult::Success { + entry: CursorEntry { + tip: wrapper.tip.clone(), + halted: false, + }, + }; + } + + // Decode the transactions and call handle_onchain_tx for each, halting if decode or the handler return an error + for raw in txs { + let decoded = match MultiEraTx::decode(raw.as_ref()) { + Ok(tx) => tx, + Err(e) => { + wrapper.halted = true; + return IndexResult::DecodeError { + entry: CursorEntry { + tip: wrapper.tip.clone(), + halted: true, + }, + reason: e.to_string(), + }; + } + }; + if let Err(e) = wrapper.index.handle_onchain_tx(&block, &decoded).await { + wrapper.halted = true; + return IndexResult::HandleError { + entry: CursorEntry { + tip: wrapper.tip.clone(), + halted: true, + }, + reason: e.to_string(), + }; + } + } + + // Update index tip and return success + wrapper.tip = Point::Specific { + hash: block.hash, + slot: block.slot, + }; + IndexResult::Success { + entry: CursorEntry { + tip: wrapper.tip.clone(), + halted: false, + }, + } +} + +async fn handle_rollback(wrapper: &mut IndexWrapper, point: Point) -> IndexResult { + match wrapper.index.handle_rollback(&point).await { + Ok(_) => { + // If the rollback is successful, remove the halt (if any), update the tip, and return success + wrapper.halted = false; + wrapper.tip = point.clone(); + IndexResult::Success { + entry: CursorEntry { + tip: wrapper.tip.clone(), + halted: false, + }, + } + } + // If the rollback failed, attempt to reset the index + Err(_) => match wrapper.index.reset(&wrapper.default_start).await { + // If reset successful, remove the halt (if any), update the tip and return reset so the manager can send a FindIntersect command + Ok(point) => { + wrapper.tip = point; + wrapper.halted = false; + IndexResult::Reset { + entry: CursorEntry { + tip: wrapper.tip.clone(), + halted: false, + }, + } + } + // If the reset fails, return a fatal error to remove the index from the manager (On next run the index will attempt to reset again) + Err(e) => IndexResult::FatalResetError { + entry: CursorEntry { + tip: wrapper.tip.clone(), + halted: true, + }, + reason: e.to_string(), + }, + }, + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use acropolis_common::{BlockHash, BlockInfo, BlockIntent, BlockStatus, Era, Point}; + use caryatid_sdk::async_trait; + use pallas::ledger::traverse::MultiEraTx; + use tokio::sync::{mpsc, oneshot}; + + use crate::{ + chain_index::ChainIndex, + cursor_store::InMemoryCursorStore, + index_actor::{IndexCommand, IndexResult}, + CustomIndexer, + }; + + #[derive(Default)] + pub struct MockIndex { + pub on_tx: Option anyhow::Result<()> + Send + Sync>>, + pub on_rollback: Option anyhow::Result<()> + Send + Sync>>, + pub on_reset: Option anyhow::Result + Send + Sync>>, + } + + #[async_trait] + impl ChainIndex for MockIndex { + fn name(&self) -> String { + "mock-index".into() + } + + async fn handle_onchain_tx( + &mut self, + _info: &BlockInfo, + _tx: &MultiEraTx<'_>, + ) -> anyhow::Result<()> { + if let Some(f) = &self.on_tx { + f() + } else { + Ok(()) + } + } + + async fn handle_rollback(&mut self, _point: &Point) -> anyhow::Result<()> { + if let Some(f) = &self.on_rollback { + f() + } else { + Ok(()) + } + } + async fn reset(&mut self, start: &Point) -> anyhow::Result { + if let Some(f) = &self.on_reset { + f() + } else { + Ok(start.clone()) + } + } + } + + fn test_block(slot: u64) -> BlockInfo { + BlockInfo { + status: BlockStatus::Volatile, + intent: BlockIntent::none(), + slot, + number: 1, + hash: BlockHash::default(), + epoch: 0, + epoch_slot: 0, + new_epoch: false, + tip_slot: None, + timestamp: 0, + era: Era::Conway, + } + } + + fn valid_tx() -> Arc<[u8]> { + let raw_tx = hex::decode( + "84a600d9010281825820565573dcde964aa30e7e307531ee6c6f8e47279dcbade4b4301e9ef291b6791601018282583901b786e57fa44f9707d023719c60b712a3ebbaf89a932ee87ea4de39ce65f459f57e462edc82d90225fac6162f4757c226ad50a7adf230e4c81b0000000ac336383982583901b786e57fa44f9707d023719c60b712a3ebbaf89a932ee87ea4de39ce65f459f57e462edc82d90225fac6162f4757c226ad50a7adf230e4c81a004c4b40021a0002aac1031a0a0d7b1705a1581de165f459f57e462edc82d90225fac6162f4757c226ad50a7adf230e4c81a42fa31010801a100d9010282825820ed67aef668355b2f6220aeb7b5118adeb31b7cf0de7d9a4bb4ea0aac7bdfea5a58406718e1a35b9fae1c91d0ca08b90c0270bcd0e98b9df2b826b0ea6b9742b93631e0f2c43d098a9a8fdd58f1ba44c649d397ca32bd207a9d3fa784611694184904825820086b567b1b34bd97e1a79c46533ed4e771e170848a50983297605f1d7fe6acb8584040fe7d3108c4eaca8484ef9590a52214dae09af501aa84cba4f093c590acdd2c9c15977fc381c0224306567e775d2c7e62a65319fcf504657221e7648411bd0af5f6" + ).unwrap(); + Arc::from(raw_tx.as_slice()) + } + + async fn setup_indexer( + mock: MockIndex, + ) -> ( + Arc>, + mpsc::Sender, + ) { + let cursor_store = InMemoryCursorStore::new(); + let indexer = Arc::new(CustomIndexer::new(cursor_store)); + + indexer.add_index(mock, Point::Origin, false).await.expect("add_index failed"); + + let sender = { + let senders = indexer.senders.lock().await; + senders.get("mock-index").expect("index not registered").clone() + }; + + (indexer, sender) + } + + async fn send_apply( + sender: &mpsc::Sender, + block: BlockInfo, + txs: Vec>, + ) -> IndexResult { + let (tx, rx) = oneshot::channel(); + sender + .send(IndexCommand::ApplyTxs { + block, + txs, + response_tx: tx, + }) + .await + .expect("actor dropped"); + rx.await.expect("oneshot dropped") + } + + async fn send_rollback(sender: &mpsc::Sender, point: Point) -> IndexResult { + let (tx, rx) = oneshot::channel(); + sender + .send(IndexCommand::Rollback { + point, + response_tx: tx, + }) + .await + .expect("actor dropped"); + rx.await.expect("oneshot dropped") + } + + #[tokio::test] + async fn apply_txs_handle_error_sets_halt() { + let mock = MockIndex { + on_tx: Some(Box::new(|| Err(anyhow::anyhow!("handle error response")))), + ..Default::default() + }; + + let (_indexer, sender) = setup_indexer(mock).await; + let (resp_tx, resp_rx) = oneshot::channel(); + + sender + .send(IndexCommand::ApplyTxs { + block: test_block(1), + txs: vec![valid_tx()], + response_tx: resp_tx, + }) + .await + .expect("actor dropped unexpectedly"); + + let result = resp_rx.await.expect("oneshot dropped"); + + match result { + IndexResult::HandleError { entry, reason } => { + assert!(entry.halted); + assert!(reason.contains("handle error response")); + } + other => panic!("Expected HandleError, got {:?}", other), + } + } + + #[tokio::test] + async fn apply_txs_decode_error_sets_halt() { + let mock = MockIndex { + on_tx: None, + ..Default::default() + }; + + let (_indexer, sender) = setup_indexer(mock).await; + + match send_apply(&sender, test_block(1), vec![Arc::from([0u8; 1].as_slice())]).await { + IndexResult::DecodeError { entry, reason } => { + assert!(entry.halted); + assert!(!reason.is_empty()); + } + other => panic!("Expected DecodeError, got {:?}", other), + } + } + + #[tokio::test] + async fn apply_txs_skips_when_halted() { + let mock = MockIndex { + on_tx: Some(Box::new(|| Err(anyhow::anyhow!("handle error response")))), + ..Default::default() + }; + + let (_indexer, sender) = setup_indexer(mock).await; + + match send_apply(&sender, test_block(1), vec![valid_tx()]).await { + IndexResult::HandleError { entry, .. } => { + assert!(entry.halted); + } + other => panic!("Expected HandleError on first call, got {:?}", other), + } + + match send_apply(&sender, test_block(2), vec![valid_tx()]).await { + IndexResult::Halted => {} + other => panic!("Expected Halted, got {:?}", other), + } + } + + #[tokio::test] + async fn apply_txs_updates_tip_on_success() { + let mock = MockIndex { + on_tx: Some(Box::new(|| Ok(()))), + ..Default::default() + }; + + let (_indexer, sender) = setup_indexer(mock).await; + + match send_apply(&sender, test_block(50), vec![valid_tx()]).await { + IndexResult::Success { entry } => { + assert_eq!(entry.tip.slot(), 50); + assert!(!entry.halted, "index should not be halted on success"); + } + other => panic!("Expected Success, got {:?}", other), + } + } + + #[tokio::test] + async fn rollback_updates_tip_and_clears_halt_on_success() { + let mock = MockIndex { + on_tx: Some(Box::new(|| Err(anyhow::anyhow!("boom")))), + on_rollback: Some(Box::new(|| Ok(()))), + ..Default::default() + }; + + let (_indexer, sender) = setup_indexer(mock).await; + + match send_apply(&sender, test_block(1), vec![valid_tx()]).await { + IndexResult::HandleError { entry, .. } => assert!(entry.halted), + other => panic!("Expected HandleError, got {:?}", other), + } + + let rollback_point = Point::Specific { + hash: [9u8; 32].into(), + slot: 12345, + }; + + match send_rollback(&sender, rollback_point.clone()).await { + IndexResult::Success { entry } => { + assert_eq!(entry.tip, rollback_point); + assert!(!entry.halted); + } + other => panic!("Expected Success, got {:?}", other), + } + } + + #[tokio::test] + async fn rollback_fails_then_reset_succeeds_clears_halt_and_updates_tip() { + let mock = MockIndex { + on_tx: Some(Box::new(|| Err(anyhow::anyhow!("fail tx")))), + on_rollback: Some(Box::new(|| Err(anyhow::anyhow!("rollback failed")))), + on_reset: Some(Box::new(|| { + Ok(Point::Specific { + hash: [3u8; 32].into(), + slot: 123, + }) + })), + }; + + let (_indexer, sender) = setup_indexer(mock).await; + + match send_apply(&sender, test_block(1), vec![valid_tx()]).await { + IndexResult::HandleError { entry, .. } => assert!(entry.halted), + other => panic!("Expected HandleError, got {:?}", other), + } + + match send_rollback( + &sender, + Point::Specific { + hash: [7u8; 32].into(), + slot: 123, + }, + ) + .await + { + IndexResult::Reset { entry } => { + assert_eq!(entry.tip.slot(), 123); + assert!(!entry.halted); + } + other => panic!("Expected Reset, got {:?}", other), + } + } + + #[tokio::test] + async fn rollback_fails_then_reset_fails_halts() { + let mock = MockIndex { + on_tx: Some(Box::new(|| Err(anyhow::anyhow!("tx boom")))), + on_rollback: Some(Box::new(|| Err(anyhow::anyhow!("rollback boom")))), + on_reset: Some(Box::new(|| Err(anyhow::anyhow!("reset boom")))), + }; + + let (_indexer, sender) = setup_indexer(mock).await; + + match send_apply(&sender, test_block(1), vec![valid_tx()]).await { + IndexResult::HandleError { entry, .. } => assert!(entry.halted), + other => panic!("Expected HandleError, got {:?}", other), + } + + match send_rollback( + &sender, + Point::Specific { + hash: [9u8; 32].into(), + slot: 123, + }, + ) + .await + { + IndexResult::FatalResetError { entry, reason } => { + assert!(entry.halted, "halt must remain true after failed reset"); + assert!( + reason.contains("reset boom"), + "expected reset failure reason in: {reason}" + ); + } + other => panic!("Expected FatalResetError, got {:?}", other), + } + } +} diff --git a/modules/custom_indexer/src/utils.rs b/modules/custom_indexer/src/utils.rs new file mode 100644 index 00000000..859c6ea9 --- /dev/null +++ b/modules/custom_indexer/src/utils.rs @@ -0,0 +1,97 @@ +use std::sync::Arc; + +use acropolis_common::commands::chain_sync::ChainSyncCommand; +use acropolis_common::messages::{Command, Message}; +use acropolis_common::{BlockInfo, Point}; +use anyhow::Result; +use caryatid_sdk::Context; +use futures::stream::FuturesUnordered; +use tokio::sync::oneshot; +use tracing::info; + +use crate::index_actor::{IndexCommand, IndexResult}; +use crate::SharedSenders; + +pub async fn change_sync_point( + point: Point, + context: Arc>, + topic: &String, +) -> Result<()> { + let msg = Message::Command(Command::ChainSync(ChainSyncCommand::FindIntersect( + point.clone(), + ))); + context.publish(topic, Arc::new(msg)).await?; + info!( + "Publishing sync command on {} for slot {}", + topic, + point.slot() + ); + + Ok(()) +} + +pub async fn send_txs_to_indexers( + senders: &SharedSenders, + block: &BlockInfo, + txs: &[Arc<[u8]>], +) -> FuturesUnordered< + impl futures::Future< + Output = ( + String, + Result, + ), + >, +> { + let senders_snapshot: Vec<_> = { + let map = senders.lock().await; + map.iter().map(|(n, tx)| (n.clone(), tx.clone())).collect() + }; + + let futs = FuturesUnordered::new(); + for (name, sender) in senders_snapshot { + let (tx_resp, rx_resp) = oneshot::channel(); + + let cmd = IndexCommand::ApplyTxs { + block: block.clone(), + txs: txs.to_vec(), + response_tx: tx_resp, + }; + + let _ = sender.send(cmd).await; + + futs.push(async move { (name, rx_resp.await) }); + } + futs +} + +pub async fn send_rollback_to_indexers( + senders: &SharedSenders, + point: &Point, +) -> FuturesUnordered< + impl futures::Future< + Output = ( + String, + Result, + ), + >, +> { + let senders_snapshot: Vec<_> = { + let map = senders.lock().await; + map.iter().map(|(n, tx)| (n.clone(), tx.clone())).collect() + }; + + let futs = FuturesUnordered::new(); + for (name, sender) in senders_snapshot { + let (tx_resp, rx_resp) = oneshot::channel(); + + let cmd = IndexCommand::Rollback { + point: point.clone(), + response_tx: tx_resp, + }; + + let _ = sender.send(cmd).await; + + futs.push(async move { (name, rx_resp.await) }); + } + futs +} diff --git a/processes/indexer/src/indices/fjall_pool_cost_index.rs b/processes/indexer/src/indices/fjall_pool_cost_index.rs index 6c6e3f55..02e89709 100644 --- a/processes/indexer/src/indices/fjall_pool_cost_index.rs +++ b/processes/indexer/src/indices/fjall_pool_cost_index.rs @@ -1,6 +1,6 @@ #![allow(unused)] use acropolis_codec::utils::to_pool_id; -use acropolis_common::{BlockInfo, Lovelace, PoolId}; +use acropolis_common::{BlockInfo, Lovelace, Point, PoolId}; use acropolis_module_custom_indexer::chain_index::ChainIndex; use anyhow::Result; use caryatid_sdk::async_trait; @@ -50,10 +50,11 @@ impl FjallPoolCostIndex { #[async_trait] impl ChainIndex for FjallPoolCostIndex { fn name(&self) -> String { - "pool-cost-index".into() + "in-memory-pool-cost-index".into() } async fn handle_onchain_tx(&mut self, _info: &BlockInfo, tx: &MultiEraTx<'_>) -> Result<()> { + let mut changed = false; for cert in tx.certs().iter() { match cert { MultiEraCert::AlonzoCompatible(cert) => match cert.as_ref().as_ref() { @@ -64,10 +65,7 @@ impl ChainIndex for FjallPoolCostIndex { self.state.pools.insert(pool_id, *cost); self.partition.insert(key, value)?; - - if self.sender.send(self.state.clone()).is_err() { - warn!("Pool cost state receiver dropped"); - } + changed = true; } alonzo::Certificate::PoolRetirement(operator, ..) => { let pool_id = to_pool_id(operator); @@ -75,10 +73,7 @@ impl ChainIndex for FjallPoolCostIndex { self.state.pools.remove(&pool_id); self.partition.remove(key)?; - - if self.sender.send(self.state.clone()).is_err() { - warn!("Pool cost state receiver dropped"); - } + changed = true; } _ => {} @@ -91,10 +86,7 @@ impl ChainIndex for FjallPoolCostIndex { self.state.pools.insert(pool_id, *cost); self.partition.insert(key, value)?; - - if self.sender.send(self.state.clone()).is_err() { - warn!("Pool cost state receiver dropped"); - } + changed = true; } conway::Certificate::PoolRetirement(operator, ..) => { let pool_id = to_pool_id(operator); @@ -102,16 +94,29 @@ impl ChainIndex for FjallPoolCostIndex { self.state.pools.remove(&pool_id); self.partition.remove(key)?; - - if self.sender.send(self.state.clone()).is_err() { - warn!("Pool cost state receiver dropped"); - } + changed = true; } _ => {} }, _ => {} } } + + if changed && self.sender.send(self.state.clone()).is_err() { + warn!("Pool cost state receiver dropped"); + } + Ok(()) } + + async fn reset(&mut self, start: &Point) -> Result { + self.state.pools = BTreeMap::new(); + + for item in self.partition.iter() { + let (key, _) = item?; + self.partition.remove(key.as_ref())?; + } + + Ok(start.clone()) + } } diff --git a/processes/indexer/src/indices/in_memory_pool_cost_index.rs b/processes/indexer/src/indices/in_memory_pool_cost_index.rs index 1e9bf6ea..f0392b70 100644 --- a/processes/indexer/src/indices/in_memory_pool_cost_index.rs +++ b/processes/indexer/src/indices/in_memory_pool_cost_index.rs @@ -1,6 +1,6 @@ #![allow(unused)] use acropolis_codec::utils::to_pool_id; -use acropolis_common::{BlockInfo, Lovelace, PoolId}; +use acropolis_common::{BlockInfo, Lovelace, Point, PoolId}; use acropolis_module_custom_indexer::chain_index::ChainIndex; use anyhow::Result; use caryatid_sdk::async_trait; @@ -34,24 +34,21 @@ impl InMemoryPoolCostIndex { #[async_trait] impl ChainIndex for InMemoryPoolCostIndex { fn name(&self) -> String { - "pool-cost-index".into() + "fjall-pool-cost-index".into() } async fn handle_onchain_tx(&mut self, _info: &BlockInfo, tx: &MultiEraTx<'_>) -> Result<()> { + let mut changed = false; for cert in tx.certs().iter() { match cert { MultiEraCert::AlonzoCompatible(cert) => match cert.as_ref().as_ref() { alonzo::Certificate::PoolRegistration { operator, cost, .. } => { self.state.pools.insert(to_pool_id(operator), *cost); - if self.sender.send(self.state.clone()).is_err() { - warn!("Pool cost state receiver dropped"); - } + changed = true; } alonzo::Certificate::PoolRetirement(operator, ..) => { self.state.pools.remove(&to_pool_id(operator)); - if self.sender.send(self.state.clone()).is_err() { - warn!("Pool cost state receiver dropped"); - } + changed = true; } _ => {} @@ -59,21 +56,25 @@ impl ChainIndex for InMemoryPoolCostIndex { MultiEraCert::Conway(cert) => match cert.as_ref().as_ref() { conway::Certificate::PoolRegistration { operator, cost, .. } => { self.state.pools.insert(to_pool_id(operator), *cost); - if self.sender.send(self.state.clone()).is_err() { - warn!("Pool cost state receiver dropped"); - } + changed = true; } conway::Certificate::PoolRetirement(operator, ..) => { self.state.pools.remove(&to_pool_id(operator)); - if self.sender.send(self.state.clone()).is_err() { - warn!("Pool cost state receiver dropped"); - } + changed = true; } _ => {} }, _ => {} } } + if changed && self.sender.send(self.state.clone()).is_err() { + warn!("Pool cost state receiver dropped"); + } Ok(()) } + + async fn reset(&mut self, start: &Point) -> Result { + self.state.pools = BTreeMap::new(); + Ok(start.clone()) + } } diff --git a/processes/indexer/src/main.rs b/processes/indexer/src/main.rs index bd9d2c04..532a5578 100644 --- a/processes/indexer/src/main.rs +++ b/processes/indexer/src/main.rs @@ -47,24 +47,30 @@ async fn main() -> Result<()> { BlockUnpacker::register(&mut process); PeerNetworkInterface::register(&mut process); - // watch channel to send latest state to consumer on index change - let (sender, receiver) = watch::channel(FjallPoolCostState { + let (sender_1, receiver_1) = watch::channel(InMemoryPoolCostState { pools: BTreeMap::new(), }); - - /* Uncomment to test in memory indexer - let (sender, receiver) = watch::channel(InMemoryPoolCostState { + let (sender_2, receiver_2) = watch::channel(FjallPoolCostState { pools: BTreeMap::new(), }); - */ // Example receiver { - let mut rx = receiver.clone(); tokio::spawn(async move { - while rx.changed().await.is_ok() { - let snapshot = rx.borrow().clone(); - tracing::info!("New PoolCostIndex state: {:?}", snapshot.pools); + let mut r1 = receiver_1.clone(); + let mut r2 = receiver_2.clone(); + + loop { + tokio::select! { + _ = r1.changed() => { + let state = r1.borrow_and_update().clone(); + tracing::info!("Index 1 updated: {:?}", state.pools); + } + _ = r2.changed() => { + let state = r2.borrow_and_update().clone(); + tracing::info!("Index 2 updated: {:?}", state.pools); + } + } } }); } @@ -75,23 +81,24 @@ async fn main() -> Result<()> { slot: 16588737, }; - // Fjall backed example indexer - let indexer = CustomIndexer::new( - FjallPoolCostIndex::new("fjall-pool-cost-index", sender)?, - FjallCursorStore::new("fjall-cursor-store", shelley_start.clone())?, - shelley_start, - ); - - // In memory example indexer - /* - let indexer = CustomIndexer::new( - InMemoryPoolCostIndex::new(sender), - InMemoryCursorStore::new(shelley_start.clone()), - shelley_start, - ); - */ - - process.register(Arc::new(indexer)); + let indexer = Arc::new(CustomIndexer::new(FjallCursorStore::new( + "fjall-cursor-store", + )?)); + process.register(indexer.clone()); + indexer + .add_index( + InMemoryPoolCostIndex::new(sender_1), + shelley_start.clone(), + true, + ) + .await?; + indexer + .add_index( + FjallPoolCostIndex::new("fjall-pool-cost-index", sender_2)?, + shelley_start, + false, + ) + .await?; process.run().await?; Ok(())