From ef55c74277632524c51511df169258c486855956 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Mon, 1 Dec 2025 21:21:16 +0000 Subject: [PATCH 1/8] feat: custom indexer module with example index (WIP) Signed-off-by: William Hankins --- Cargo.lock | 57 +++++++--- common/src/commands/chain_sync.rs | 17 ++- .../{indexer => custom_indexer}/Cargo.toml | 5 +- modules/custom_indexer/config.default.toml | 3 + modules/custom_indexer/src/chain_indexer.rs | 107 ++++++++++++++++++ .../src/configuration.rs | 7 +- modules/custom_indexer/src/cursor_store.rs | 30 +++++ modules/custom_indexer/src/indexer.rs | 5 + modules/custom_indexer/src/managed_index.rs | 19 ++++ modules/indexer/config.default.toml | 2 - modules/indexer/src/indexer.rs | 67 ----------- modules/peer_network_interface/src/network.rs | 10 +- .../src/peer_network_interface.rs | 13 ++- processes/indexer/Cargo.toml | 7 +- processes/indexer/indexer.toml | 3 +- processes/indexer/src/indices/mod.rs | 1 + .../indexer/src/indices/pool_cost_index.rs | 69 +++++++++++ processes/indexer/src/main.rs | 48 +++++++- 18 files changed, 368 insertions(+), 102 deletions(-) rename modules/{indexer => custom_indexer}/Cargo.toml (79%) create mode 100644 modules/custom_indexer/config.default.toml create mode 100644 modules/custom_indexer/src/chain_indexer.rs rename modules/{indexer => custom_indexer}/src/configuration.rs (77%) create mode 100644 modules/custom_indexer/src/cursor_store.rs create mode 100644 modules/custom_indexer/src/indexer.rs create mode 100644 modules/custom_indexer/src/managed_index.rs delete mode 100644 modules/indexer/config.default.toml delete mode 100644 modules/indexer/src/indexer.rs create mode 100644 processes/indexer/src/indices/mod.rs create mode 100644 processes/indexer/src/indices/pool_cost_index.rs diff --git a/Cargo.lock b/Cargo.lock index cf4d93228..d61d41743 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -194,6 +194,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "acropolis_module_custom_indexer" +version = "0.1.0" +dependencies = [ + "acropolis_common", + "anyhow", + "caryatid_sdk", + "config", + "futures", + "pallas 0.33.0", + "serde", + "tokio", + "tracing", +] + [[package]] name = "acropolis_module_drdd_state" version = "0.1.0" @@ -304,18 +319,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "acropolis_module_indexer" -version = "0.1.0" -dependencies = [ - "acropolis_common", - "anyhow", - "caryatid_sdk", - "config", - "serde", - "tracing", -] - [[package]] name = "acropolis_module_mithril_snapshot_fetcher" version = "0.1.0" @@ -512,16 +515,21 @@ dependencies = [ name = "acropolis_process_indexer" version = "0.1.0" dependencies = [ + "acropolis_codec", "acropolis_common", "acropolis_module_block_unpacker", + "acropolis_module_custom_indexer", "acropolis_module_genesis_bootstrapper", - "acropolis_module_indexer", "acropolis_module_peer_network_interface", "anyhow", "caryatid_process", + "caryatid_sdk", "clap 4.5.51", "config", + "pallas 0.33.0", + "plutus-parser", "tokio", + "tracing", "tracing-subscriber", ] @@ -4855,6 +4863,29 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "plutus-parser" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c91c35ed696191fddf770ed73605614336cfa1458c21efae905aed0783c806e" +dependencies = [ + "indexmap 2.12.0", + "pallas-primitives 0.33.0", + "plutus-parser-derive", + "thiserror 2.0.17", +] + 
+[[package]] +name = "plutus-parser-derive" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb90dc0af619a3872f56e62da683792c43ccb940f55f76ed0f84b7e666bf813a" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.109", +] + [[package]] name = "polling" version = "3.11.0" diff --git a/common/src/commands/chain_sync.rs b/common/src/commands/chain_sync.rs index 77a55f12b..093b99a44 100644 --- a/common/src/commands/chain_sync.rs +++ b/common/src/commands/chain_sync.rs @@ -2,5 +2,20 @@ use crate::{BlockHash, Slot}; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum ChainSyncCommand { - FindIntersect { slot: Slot, hash: BlockHash }, + FindIntersect { point: Point }, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum Point { + Origin, + Specific(Slot, BlockHash), +} + +impl Point { + pub fn slot_or_default(&self) -> u64 { + match self { + Point::Origin => 0, + Point::Specific(slot, _) => *slot, + } + } } diff --git a/modules/indexer/Cargo.toml b/modules/custom_indexer/Cargo.toml similarity index 79% rename from modules/indexer/Cargo.toml rename to modules/custom_indexer/Cargo.toml index aef2fcd75..ea4b2af33 100644 --- a/modules/indexer/Cargo.toml +++ b/modules/custom_indexer/Cargo.toml @@ -1,7 +1,7 @@ # Acropolis indexer module [package] -name = "acropolis_module_indexer" +name = "acropolis_module_custom_indexer" version = "0.1.0" edition = "2021" authors = ["William Hankins "] @@ -16,7 +16,10 @@ caryatid_sdk = { workspace = true } anyhow = { workspace = true } config = { workspace = true } serde = { workspace = true, features = ["rc"] } +tokio = { workspace = true } tracing = { workspace = true } +pallas = { workspace = true} +futures = "0.3.31" [lib] path = "src/indexer.rs" diff --git a/modules/custom_indexer/config.default.toml b/modules/custom_indexer/config.default.toml new file mode 100644 index 000000000..1adcc0b21 --- /dev/null +++ b/modules/custom_indexer/config.default.toml @@ -0,0 +1,3 @@ +# The topic to publish sync commands on +sync-command-publisher-topic = "cardano.sync.command" +txs-subscribe-topic = "cardano.txs" \ No newline at end of file diff --git a/modules/custom_indexer/src/chain_indexer.rs b/modules/custom_indexer/src/chain_indexer.rs new file mode 100644 index 000000000..1c6ea0c89 --- /dev/null +++ b/modules/custom_indexer/src/chain_indexer.rs @@ -0,0 +1,107 @@ +use acropolis_common::{ + commands::chain_sync::{ChainSyncCommand, Point}, + messages::{CardanoMessage, Command, Message}, +}; +use anyhow::Result; +use caryatid_sdk::{async_trait, Context, Module}; +use config::Config; +use std::sync::Arc; +use tokio::sync::Mutex; + +use pallas::ledger::traverse::MultiEraTx; + +use crate::{ + configuration::CustomIndexerConfig, cursor_store::CursorStore, managed_index::ManagedIndex, +}; + +pub struct CustomIndexer { + index: Arc>, + cursor_store: Arc>, + tip: Arc>, +} + +impl CustomIndexer { + pub fn new(index: I, cursor_store: CS, start: Point) -> Self { + Self { + index: Arc::new(Mutex::new(index)), + cursor_store: Arc::new(Mutex::new(cursor_store)), + tip: Arc::new(Mutex::new(start)), + } + } +} + +#[async_trait] +impl Module for CustomIndexer +where + I: ManagedIndex, + CS: CursorStore, +{ + fn get_name(&self) -> &'static str { + "custom-indexer" + } + + fn get_description(&self) -> &'static str { + "Single external chain indexer module" + } + + async fn init(&self, context: Arc>, config: Arc) -> Result<()> { + let cfg = CustomIndexerConfig::try_load(&config)?; + let mut 
subscription = context.subscribe(&cfg.txs_subscribe_topic).await?; + let run_context = context.clone(); + + // Retrieve tip from cursor store with fallback to initial sync point + let start_point = { + let saved = { + let cs = self.cursor_store.lock().await; + cs.load().await? + }; + + let mut tip_guard = self.tip.lock().await; + if let Some(saved_point) = saved { + *tip_guard = saved_point.clone(); + saved_point + } else { + tip_guard.clone() + } + }; + + let index = Arc::clone(&self.index); + let cursor_store = Arc::clone(&self.cursor_store); + let tip = Arc::clone(&self.tip); + + context.run(async move { + // Publish initial sync point + let msg = Message::Command(Command::ChainSync(ChainSyncCommand::FindIntersect { + point: start_point.clone(), + })); + run_context.publish(&cfg.sync_command_publisher_topic, Arc::new(msg)).await?; + + // Forward received txs to index handlers + while let Ok((_, message)) = subscription.read().await { + if let Message::Cardano((block, CardanoMessage::ReceivedTxs(txs_msg))) = + message.as_ref() + { + // Call handle_onchain_tx on the index for all decoded txs + { + let mut idx = index.lock().await; + for raw_tx in &txs_msg.txs { + let tx = MultiEraTx::decode(raw_tx)?; + idx.handle_onchain_tx(block, &tx).await?; + } + } + + // Update and save tip + let new_tip = Point::Specific(block.slot, block.hash); + { + *tip.lock().await = new_tip.clone(); + cursor_store.lock().await.save(&new_tip).await?; + } + } + } + + Ok::<_, anyhow::Error>(()) + }); + + Ok(()) + } +} diff --git a/modules/indexer/src/configuration.rs b/modules/custom_indexer/src/configuration.rs similarity index 77% rename from modules/indexer/src/configuration.rs rename to modules/custom_indexer/src/configuration.rs index 6b2b57310..023c5b920 100644 --- a/modules/indexer/src/configuration.rs +++ b/modules/custom_indexer/src/configuration.rs @@ -3,11 +3,12 @@ use config::Config; #[derive(serde::Deserialize)] #[serde(rename_all = "kebab-case")] -pub struct IndexerConfig { - pub sync_command_topic: String, +pub struct CustomIndexerConfig { + pub sync_command_publisher_topic: String, + pub txs_subscribe_topic: String, } -impl IndexerConfig { +impl CustomIndexerConfig { pub fn try_load(config: &Config) -> Result { let full_config = Config::builder() .add_source(config::File::from_str( diff --git a/modules/custom_indexer/src/cursor_store.rs b/modules/custom_indexer/src/cursor_store.rs new file mode 100644 index 000000000..ca0983836 --- /dev/null +++ b/modules/custom_indexer/src/cursor_store.rs @@ -0,0 +1,30 @@ +use std::future::Future; + +use acropolis_common::commands::chain_sync::Point; +use anyhow::Result; + +pub trait CursorStore: Send + Sync + 'static { + fn load(&self) -> impl Future>> + Send; + fn save(&mut self, point: &Point) -> impl Future> + Send; +} + +pub struct InMemoryCursorStore { + cursor: Option, +} +impl InMemoryCursorStore { + pub fn new(cursor: Point) -> Self { + Self { + cursor: Some(cursor), + } + } +} +impl CursorStore for InMemoryCursorStore { + async fn load(&self) -> Result> { + Ok(self.cursor.clone()) + } + + async fn save(&mut self, cursor: &Point) -> Result<()> { + self.cursor = Some(cursor.clone()); + Ok(()) + } +} diff --git a/modules/custom_indexer/src/indexer.rs b/modules/custom_indexer/src/indexer.rs new file mode 100644 index 000000000..90ebb5ed9 --- /dev/null +++ b/modules/custom_indexer/src/indexer.rs @@ -0,0 +1,5 @@ +//! 
Acropolis indexer module for Caryatid +pub mod chain_indexer; +mod configuration; +pub mod cursor_store; +pub mod managed_index; diff --git a/modules/custom_indexer/src/managed_index.rs b/modules/custom_indexer/src/managed_index.rs new file mode 100644 index 000000000..e95de5400 --- /dev/null +++ b/modules/custom_indexer/src/managed_index.rs @@ -0,0 +1,19 @@ +use acropolis_common::BlockInfo; +use anyhow::Result; +use caryatid_sdk::async_trait; +use pallas::ledger::traverse::MultiEraTx; + +#[async_trait] +pub trait ManagedIndex: Send + Sync + 'static { + fn name(&self) -> String; + + async fn handle_onchain_tx(&mut self, info: &BlockInfo, tx: &MultiEraTx<'_>) -> Result<()> { + let _ = (info, tx); + Ok(()) + } + + async fn handle_rollback(&mut self, info: &BlockInfo) -> Result<()> { + let _ = info; + Ok(()) + } +} diff --git a/modules/indexer/config.default.toml b/modules/indexer/config.default.toml deleted file mode 100644 index e8549e768..000000000 --- a/modules/indexer/config.default.toml +++ /dev/null @@ -1,2 +0,0 @@ -# The topic to publish sync commands on -sync-command-topic = "cardano.sync.command" \ No newline at end of file diff --git a/modules/indexer/src/indexer.rs b/modules/indexer/src/indexer.rs deleted file mode 100644 index 8877c6d5d..000000000 --- a/modules/indexer/src/indexer.rs +++ /dev/null @@ -1,67 +0,0 @@ -//! Acropolis indexer module for Caryatid -mod configuration; - -use acropolis_common::{ - commands::chain_sync::ChainSyncCommand, - hash::Hash, - messages::{Command, Message}, -}; -use anyhow::Result; -use caryatid_sdk::{module, Context}; -use config::Config; -use std::{str::FromStr, sync::Arc}; -use tracing::info; - -use crate::configuration::IndexerConfig; - -/// Indexer module -#[module( - message_type(Message), - name = "indexer", - description = "Core indexer module for indexer process" -)] -pub struct Indexer; - -impl Indexer { - /// Async initialisation - pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { - let cfg = IndexerConfig::try_load(&config)?; - info!( - "Creating sync command publisher on '{}'", - cfg.sync_command_topic - ); - - let ctx = context.clone(); - - // This is a placeholder to test dynamic sync - context.run(async move { - let example = ChainSyncCommand::FindIntersect { - slot: 4492799, - hash: Hash::from_str( - "f8084c61b6a238acec985b59310b6ecec49c0ab8352249afd7268da5cff2a457", - ) - .expect("Valid hash"), - }; - - // Initial sync message (This will be read from config for first sync and from DB on subsequent runs) - ctx.message_bus - .publish( - &cfg.sync_command_topic, - Arc::new(Message::Command(Command::ChainSync(example.clone()))), - ) - .await - .unwrap(); - - // Simulate a later sync command to reset sync point to where we started - - ctx.message_bus - .publish( - &cfg.sync_command_topic, - Arc::new(Message::Command(Command::ChainSync(example))), - ) - .await - .unwrap(); - }); - Ok(()) - } -} diff --git a/modules/peer_network_interface/src/network.rs b/modules/peer_network_interface/src/network.rs index f6c4039eb..c5491e55b 100644 --- a/modules/peer_network_interface/src/network.rs +++ b/modules/peer_network_interface/src/network.rs @@ -9,7 +9,7 @@ use acropolis_common::BlockHash; use anyhow::{Context as _, Result, bail}; use pallas::network::miniprotocols::Point; use tokio::sync::mpsc; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; struct PeerData { conn: PeerConnection, @@ -102,6 +102,12 @@ impl NetworkManager { self.block_sink.last_epoch = Some(epoch); } + if let Some((&peer_id, _)) = 
self.peers.iter().next() { + self.set_preferred_upstream(peer_id); + } else { + warn!("Sync requested but no upstream peers available"); + } + self.sync_to_point(point); } } @@ -246,7 +252,7 @@ impl NetworkManager { self.block_sink.announce(header, body, rolled_back).await?; self.published_blocks += 1; if self.published_blocks.is_multiple_of(100) { - info!("Published block {}", header.number); + debug!("Published block {}", header.number); } self.chain.handle_block_published(); } diff --git a/modules/peer_network_interface/src/peer_network_interface.rs b/modules/peer_network_interface/src/peer_network_interface.rs index 7456565c1..9dfcae015 100644 --- a/modules/peer_network_interface/src/peer_network_interface.rs +++ b/modules/peer_network_interface/src/peer_network_interface.rs @@ -153,12 +153,15 @@ impl PeerNetworkInterface { events_sender: mpsc::Sender, ) -> Result<()> { while let Ok((_, msg)) = subscription.read().await { - if let Message::Command(Command::ChainSync(ChainSyncCommand::FindIntersect { - slot, - hash, - })) = msg.as_ref() + if let Message::Command(Command::ChainSync(ChainSyncCommand::FindIntersect { point })) = + msg.as_ref() { - let point = Point::new(*slot, hash.to_vec()); + let point = match point { + acropolis_common::commands::chain_sync::Point::Specific(slot, hash) => { + Point::new(*slot, hash.to_vec()) + } + _ => Point::Origin, + }; if events_sender.send(NetworkEvent::SyncPointUpdate { point }).await.is_err() { bail!("event channel closed"); diff --git a/processes/indexer/Cargo.toml b/processes/indexer/Cargo.toml index 3a368efbc..32682d75f 100644 --- a/processes/indexer/Cargo.toml +++ b/processes/indexer/Cargo.toml @@ -8,17 +8,22 @@ description = "Acropolis indexer process containing core modules" license = "Apache-2.0" [dependencies] +acropolis_codec = { path = "../../codec" } acropolis_common = { path = "../../common" } acropolis_module_genesis_bootstrapper = { path = "../../modules/genesis_bootstrapper" } acropolis_module_peer_network_interface = { path = "../../modules/peer_network_interface" } acropolis_module_block_unpacker = { path = "../../modules/block_unpacker" } -acropolis_module_indexer = { path = "../../modules/indexer" } +acropolis_module_custom_indexer = { path = "../../modules/custom_indexer" } caryatid_process = { workspace = true } +caryatid_sdk = { workspace = true } anyhow = { workspace = true } clap = { workspace = true } config = { workspace = true } +pallas = { workspace = true} +plutus-parser = { version = "0.1", default-features = false, features = ["derive", "pallas-v0_33"] } +tracing = { workspace = true } tracing-subscriber = { version = "0.3.20", features = ["registry", "env-filter"] } tokio = { workspace = true } diff --git a/processes/indexer/indexer.toml b/processes/indexer/indexer.toml index 013c8dac1..1989cc48d 100644 --- a/processes/indexer/indexer.toml +++ b/processes/indexer/indexer.toml @@ -3,6 +3,7 @@ [module.genesis-bootstrapper] [module.peer-network-interface] +block-topic = "cardano.block.proposed" # Skip validation for indexer sync-point = "dynamic" node-addresses = [ "backbone.cardano.iog.io:3001", @@ -13,7 +14,7 @@ magic-number = 764824073 [module.block-unpacker] -[module.indexer] +[module.custom-indexer] [startup] topic = "cardano.sequence.start" diff --git a/processes/indexer/src/indices/mod.rs b/processes/indexer/src/indices/mod.rs new file mode 100644 index 000000000..7277c0646 --- /dev/null +++ b/processes/indexer/src/indices/mod.rs @@ -0,0 +1 @@ +pub mod pool_cost_index; diff --git 
a/processes/indexer/src/indices/pool_cost_index.rs b/processes/indexer/src/indices/pool_cost_index.rs new file mode 100644 index 000000000..0a40dfba4 --- /dev/null +++ b/processes/indexer/src/indices/pool_cost_index.rs @@ -0,0 +1,69 @@ +use acropolis_codec::map_parameters::to_pool_id; +use acropolis_common::{BlockInfo, Lovelace, PoolId}; +use acropolis_module_custom_indexer::managed_index::ManagedIndex; +use anyhow::Result; +use caryatid_sdk::async_trait; +use pallas::ledger::primitives::{alonzo, conway}; +use pallas::ledger::traverse::{MultiEraCert, MultiEraTx}; +use std::collections::BTreeMap; +use tokio::sync::watch; + +#[derive(Clone)] +pub struct PoolCostState { + pub pools: BTreeMap, +} + +pub struct PoolCostIndex { + state: PoolCostState, + sender: watch::Sender, +} + +impl PoolCostIndex { + pub fn new(sender: watch::Sender) -> Self { + Self { + state: PoolCostState { + pools: BTreeMap::new(), + }, + sender, + } + } +} + +#[async_trait] +impl ManagedIndex for PoolCostIndex { + fn name(&self) -> String { + "pool-cost-index".into() + } + + async fn handle_onchain_tx(&mut self, _info: &BlockInfo, tx: &MultiEraTx<'_>) -> Result<()> { + for cert in tx.certs().iter() { + match cert { + MultiEraCert::AlonzoCompatible(cert) => match cert.as_ref().as_ref() { + alonzo::Certificate::PoolRegistration { operator, cost, .. } => { + self.state.pools.insert(to_pool_id(operator), *cost); + let _ = self.sender.send(self.state.clone()); + } + alonzo::Certificate::PoolRetirement(operator, ..) => { + self.state.pools.remove(&to_pool_id(operator)); + let _ = self.sender.send(self.state.clone()); + } + + _ => {} + }, + MultiEraCert::Conway(cert) => match cert.as_ref().as_ref() { + conway::Certificate::PoolRegistration { operator, cost, .. } => { + self.state.pools.insert(to_pool_id(operator), *cost); + let _ = self.sender.send(self.state.clone()); + } + conway::Certificate::PoolRetirement(operator, ..) 
=> { + self.state.pools.remove(&to_pool_id(operator)); + let _ = self.sender.send(self.state.clone()); + } + _ => {} + }, + _ => {} + } + } + Ok(()) + } +} diff --git a/processes/indexer/src/main.rs b/processes/indexer/src/main.rs index 50a3b458b..01a8400f2 100644 --- a/processes/indexer/src/main.rs +++ b/processes/indexer/src/main.rs @@ -1,15 +1,23 @@ -use acropolis_common::messages::Message; -use acropolis_module_indexer::Indexer; +use acropolis_common::{commands::chain_sync::Point, hash::Hash, messages::Message}; use anyhow::Result; use caryatid_process::Process; +use caryatid_sdk::module_registry::ModuleRegistry; use clap::Parser; use config::{Config, Environment, File}; -use std::sync::Arc; +use std::{collections::BTreeMap, str::FromStr, sync::Arc}; +use tokio::sync::watch; + +mod indices; use acropolis_module_block_unpacker::BlockUnpacker; +use acropolis_module_custom_indexer::{ + chain_indexer::CustomIndexer, cursor_store::InMemoryCursorStore, +}; use acropolis_module_genesis_bootstrapper::GenesisBootstrapper; use acropolis_module_peer_network_interface::PeerNetworkInterface; +use crate::indices::pool_cost_index::{PoolCostIndex, PoolCostState}; + #[derive(Debug, clap::Parser)] struct Args { #[arg(long, value_name = "PATH", default_value = "indexer.toml")] @@ -18,10 +26,9 @@ struct Args { #[tokio::main] async fn main() -> Result<()> { + // Get arguments and config let args = Args::parse(); - tracing_subscriber::fmt().with_env_filter("info").init(); - let config = Arc::new( Config::builder() .add_source(File::with_name(&args.config)) @@ -32,11 +39,40 @@ async fn main() -> Result<()> { let mut process = Process::::create(config).await; + // Core modules to fetch blocks and publish decoded transactions GenesisBootstrapper::register(&mut process); BlockUnpacker::register(&mut process); PeerNetworkInterface::register(&mut process); - Indexer::register(&mut process); + + // watch channel to send latest state to downstream process on index change + let (sender, receiver) = watch::channel(PoolCostState { + pools: BTreeMap::new(), + }); + + // Example receiver (This would likely be provided in initialization of a new module) + { + let mut rx = receiver.clone(); + tokio::spawn(async move { + while rx.changed().await.is_ok() { + let snapshot = rx.borrow().clone(); + tracing::info!("New PoolCostIndex state: {:?}", snapshot.pools); + } + }); + } + + // Initialize and register indexer + let shelley_start = Point::Specific( + 16588737, + Hash::from_str("4e9bbbb67e3ae262133d94c3da5bffce7b1127fc436e7433b87668dba34c354a")?, + ); + let indexer = CustomIndexer::new( + PoolCostIndex::new(sender), + InMemoryCursorStore::new(shelley_start.clone()), + shelley_start, + ); + process.register(Arc::new(indexer)); process.run().await?; + Ok(()) } From f930b9069226d009c00ec77599c8f51d200c86d3 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Mon, 1 Dec 2025 23:48:40 +0000 Subject: [PATCH 2/8] feat: Fjall backed indexer example and cursor store Signed-off-by: William Hankins --- Cargo.lock | 15 ++- modules/custom_indexer/Cargo.toml | 6 +- modules/custom_indexer/config.default.toml | 1 + modules/custom_indexer/src/chain_indexer.rs | 5 + modules/custom_indexer/src/cursor_store.rs | 68 +++++++++-- processes/indexer/.gitignore | 1 + processes/indexer/Cargo.toml | 2 + processes/indexer/indexer.toml | 4 + .../src/indices/fjall_pool_cost_index.rs | 107 ++++++++++++++++++ ..._index.rs => in_memory_pool_cost_index.rs} | 17 +-- processes/indexer/src/indices/mod.rs | 3 +- processes/indexer/src/main.rs | 32 ++++-- 12 files 
changed, 232 insertions(+), 29 deletions(-) create mode 100644 processes/indexer/.gitignore create mode 100644 processes/indexer/src/indices/fjall_pool_cost_index.rs rename processes/indexer/src/indices/{pool_cost_index.rs => in_memory_pool_cost_index.rs} (85%) diff --git a/Cargo.lock b/Cargo.lock index 892770944..3772a9df2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -200,8 +200,10 @@ version = "0.1.0" dependencies = [ "acropolis_common", "anyhow", + "bincode 1.3.3", "caryatid_sdk", "config", + "fjall", "futures", "pallas 0.33.0", "serde", @@ -522,10 +524,12 @@ dependencies = [ "acropolis_module_genesis_bootstrapper", "acropolis_module_peer_network_interface", "anyhow", + "bincode 1.3.3", "caryatid_process", "caryatid_sdk", "clap 4.5.51", "config", + "fjall", "pallas 0.33.0", "plutus-parser", "tokio", @@ -1221,6 +1225,15 @@ dependencies = [ "paste", ] +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bincode" version = "2.0.1" @@ -3768,7 +3781,7 @@ dependencies = [ "anyhow", "async-trait", "bech32 0.11.0", - "bincode", + "bincode 2.0.1", "blake2 0.10.6", "chrono", "ciborium", diff --git a/modules/custom_indexer/Cargo.toml b/modules/custom_indexer/Cargo.toml index ea4b2af33..a58b2c004 100644 --- a/modules/custom_indexer/Cargo.toml +++ b/modules/custom_indexer/Cargo.toml @@ -14,12 +14,14 @@ acropolis_common = { path = "../../common" } caryatid_sdk = { workspace = true } anyhow = { workspace = true } +bincode = "1" config = { workspace = true } +fjall = "2.7.0" +futures = "0.3.31" +pallas = { workspace = true} serde = { workspace = true, features = ["rc"] } tokio = { workspace = true } tracing = { workspace = true } -pallas = { workspace = true} -futures = "0.3.31" [lib] path = "src/indexer.rs" diff --git a/modules/custom_indexer/config.default.toml b/modules/custom_indexer/config.default.toml index 1adcc0b21..a30c059a7 100644 --- a/modules/custom_indexer/config.default.toml +++ b/modules/custom_indexer/config.default.toml @@ -1,3 +1,4 @@ # The topic to publish sync commands on sync-command-publisher-topic = "cardano.sync.command" +# The topic to receive txs on txs-subscribe-topic = "cardano.txs" \ No newline at end of file diff --git a/modules/custom_indexer/src/chain_indexer.rs b/modules/custom_indexer/src/chain_indexer.rs index 43d093593..da48d5600 100644 --- a/modules/custom_indexer/src/chain_indexer.rs +++ b/modules/custom_indexer/src/chain_indexer.rs @@ -8,6 +8,7 @@ use caryatid_sdk::{async_trait, Context, Module}; use config::Config; use std::sync::Arc; use tokio::sync::Mutex; +use tracing::info; use pallas::ledger::traverse::MultiEraTx; @@ -76,6 +77,10 @@ where start_point, ))); run_context.publish(&cfg.sync_command_publisher_topic, Arc::new(msg)).await?; + info!( + "Publishing initial sync command on {}", + cfg.sync_command_publisher_topic + ); // Forward received txs to index handlers while let Ok((_, message)) = subscription.read().await { diff --git a/modules/custom_indexer/src/cursor_store.rs b/modules/custom_indexer/src/cursor_store.rs index cf7ad7f5a..c1664c463 100644 --- a/modules/custom_indexer/src/cursor_store.rs +++ b/modules/custom_indexer/src/cursor_store.rs @@ -1,30 +1,82 @@ -use std::future::Future; +use std::{future::Future, path::Path, sync::Mutex}; use acropolis_common::Point; use anyhow::Result; +use fjall::{Config, Keyspace, Partition, PartitionCreateOptions}; pub trait 
CursorStore: Send + Sync + 'static { fn load(&self) -> impl Future>> + Send; - fn save(&mut self, point: &Point) -> impl Future> + Send; + fn save(&self, point: &Point) -> impl Future> + Send; } +// In memory cursor store (Not persisted across runs) pub struct InMemoryCursorStore { - cursor: Option, + cursor: Mutex>, } impl InMemoryCursorStore { - pub fn new(cursor: Point) -> Self { + pub fn new(point: Point) -> Self { Self { - cursor: Some(cursor), + cursor: Mutex::new(Some(point)), } } } impl CursorStore for InMemoryCursorStore { async fn load(&self) -> Result> { - Ok(self.cursor.clone()) + let guard = self.cursor.lock().map_err(|_| anyhow::anyhow!("cursor mutex poisoned"))?; + Ok(guard.as_ref().cloned()) } - async fn save(&mut self, cursor: &Point) -> Result<()> { - self.cursor = Some(cursor.clone()); + async fn save(&self, point: &Point) -> Result<()> { + let mut guard = self.cursor.lock().map_err(|_| anyhow::anyhow!("cursor mutex poisoned"))?; + *guard = Some(point.clone()); + Ok(()) + } +} + +// Fjall backed cursor store (Retains last stored point) +pub struct FjallCursorStore { + cursor: Partition, +} + +impl FjallCursorStore { + pub fn new(path: impl AsRef, point: Point) -> Result { + let cfg = Config::new(path); + let keyspace = Keyspace::open(cfg)?; + let partition = keyspace.open_partition("cursor", PartitionCreateOptions::default())?; + + // Use stored point if exists or initialize with provided point + match partition.get("cursor")? { + Some(bytes) => { + bincode::deserialize::(&bytes)?; + Ok(Self { cursor: partition }) + } + None => { + let raw = bincode::serialize(&point)?; + partition.insert("cursor", raw)?; + Ok(Self { cursor: partition }) + } + } + } +} + +impl CursorStore for FjallCursorStore { + async fn load(&self) -> Result> { + let raw = self.cursor.get("cursor")?; + + let Some(bytes) = raw else { + return Ok(None); + }; + + let point: Point = bincode::deserialize(&bytes)?; + + Ok(Some(point)) + } + + async fn save(&self, point: &Point) -> Result<()> { + let raw = bincode::serialize(point)?; + + self.cursor.insert("cursor", raw)?; + Ok(()) } } diff --git a/processes/indexer/.gitignore b/processes/indexer/.gitignore new file mode 100644 index 000000000..6e36ceea7 --- /dev/null +++ b/processes/indexer/.gitignore @@ -0,0 +1 @@ +fjall-*/ \ No newline at end of file diff --git a/processes/indexer/Cargo.toml b/processes/indexer/Cargo.toml index 32682d75f..776e08336 100644 --- a/processes/indexer/Cargo.toml +++ b/processes/indexer/Cargo.toml @@ -19,8 +19,10 @@ caryatid_process = { workspace = true } caryatid_sdk = { workspace = true } anyhow = { workspace = true } +bincode = "1" clap = { workspace = true } config = { workspace = true } +fjall = "2.7.0" pallas = { workspace = true} plutus-parser = { version = "0.1", default-features = false, features = ["derive", "pallas-v0_33"] } tracing = { workspace = true } diff --git a/processes/indexer/indexer.toml b/processes/indexer/indexer.toml index 1989cc48d..0a4e358b1 100644 --- a/processes/indexer/indexer.toml +++ b/processes/indexer/indexer.toml @@ -1,5 +1,9 @@ # Top-level configuration for Acropolis indexer process +[global.startup] +method = "snapshot" # Options: "mithril" | "snapshot" +topic = "cardano.sequence.start" + [module.genesis-bootstrapper] [module.peer-network-interface] diff --git a/processes/indexer/src/indices/fjall_pool_cost_index.rs b/processes/indexer/src/indices/fjall_pool_cost_index.rs new file mode 100644 index 000000000..ece76c618 --- /dev/null +++ b/processes/indexer/src/indices/fjall_pool_cost_index.rs 
@@ -0,0 +1,107 @@ +use acropolis_codec::map_parameters::to_pool_id; +use acropolis_common::{BlockInfo, Lovelace, PoolId}; +use acropolis_module_custom_indexer::managed_index::ManagedIndex; +use anyhow::Result; +use caryatid_sdk::async_trait; +use fjall::{Config, Keyspace, Partition, PartitionCreateOptions}; +use pallas::ledger::primitives::{alonzo, conway}; +use pallas::ledger::traverse::{MultiEraCert, MultiEraTx}; +use std::collections::BTreeMap; +use std::path::Path; +use tokio::sync::watch; + +#[derive(Clone)] +pub struct FjallPoolCostState { + pub pools: BTreeMap, +} + +pub struct FjallPoolCostIndex { + state: FjallPoolCostState, + sender: watch::Sender, + partition: Partition, +} + +impl FjallPoolCostIndex { + pub fn new(path: impl AsRef, sender: watch::Sender) -> Result { + // Open DB + let cfg = Config::new(path).max_write_buffer_size(512 * 1024 * 1024); + let keyspace = Keyspace::open(cfg)?; + let partition = keyspace.open_partition("pools", PartitionCreateOptions::default())?; + + // Read existing state into memory + let mut pools = BTreeMap::new(); + for item in partition.iter() { + let (key, val) = item?; + let pool_id = PoolId::try_from(key.as_ref())?; + let cost: Lovelace = bincode::deserialize(&val)?; + pools.insert(pool_id, cost); + } + + Ok(Self { + state: FjallPoolCostState { pools }, + sender, + partition, + }) + } +} + +#[async_trait] +impl ManagedIndex for FjallPoolCostIndex { + fn name(&self) -> String { + "pool-cost-index".into() + } + + async fn handle_onchain_tx(&mut self, _info: &BlockInfo, tx: &MultiEraTx<'_>) -> Result<()> { + for cert in tx.certs().iter() { + match cert { + MultiEraCert::AlonzoCompatible(cert) => match cert.as_ref().as_ref() { + alonzo::Certificate::PoolRegistration { operator, cost, .. } => { + let pool_id = to_pool_id(operator); + let key = pool_id.as_ref(); + let value = bincode::serialize(cost)?; + + self.state.pools.insert(pool_id, *cost); + self.partition.insert(key, value)?; + + let _ = self.sender.send(self.state.clone()); + } + alonzo::Certificate::PoolRetirement(operator, ..) => { + let pool_id = to_pool_id(operator); + let key = pool_id.as_ref(); + + self.state.pools.remove(&pool_id); + self.partition.remove(key)?; + + let _ = self.sender.send(self.state.clone()); + } + + _ => {} + }, + MultiEraCert::Conway(cert) => match cert.as_ref().as_ref() { + conway::Certificate::PoolRegistration { operator, cost, .. } => { + let pool_id = to_pool_id(operator); + let key = pool_id.as_ref(); + let value = bincode::serialize(cost)?; + + self.state.pools.insert(pool_id, *cost); + self.partition.insert(key, value)?; + + let _ = self.sender.send(self.state.clone()); + } + conway::Certificate::PoolRetirement(operator, ..) 
=> { + let pool_id = to_pool_id(operator); + let key = pool_id.as_ref(); + + self.state.pools.remove(&to_pool_id(operator)); + self.partition.remove(key)?; + + let _ = self.sender.send(self.state.clone()); + } + _ => {} + }, + _ => {} + } + } + Ok(()) + } +} diff --git a/processes/indexer/src/indices/pool_cost_index.rs b/processes/indexer/src/indices/in_memory_pool_cost_index.rs similarity index 85% rename from processes/indexer/src/indices/pool_cost_index.rs rename to processes/indexer/src/indices/in_memory_pool_cost_index.rs index 0a40dfba4..f95465b4b 100644 --- a/processes/indexer/src/indices/pool_cost_index.rs +++ b/processes/indexer/src/indices/in_memory_pool_cost_index.rs @@ -1,3 +1,4 @@ +#![allow(unused)] use acropolis_codec::map_parameters::to_pool_id; use acropolis_common::{BlockInfo, Lovelace, PoolId}; use acropolis_module_custom_indexer::managed_index::ManagedIndex; @@ -9,19 +10,19 @@ use std::collections::BTreeMap; use tokio::sync::watch; #[derive(Clone)] -pub struct PoolCostState { +pub struct InMemoryPoolCostState { pub pools: BTreeMap, } -pub struct PoolCostIndex { - state: PoolCostState, - sender: watch::Sender, +pub struct InMemoryPoolCostIndex { + state: InMemoryPoolCostState, + sender: watch::Sender, } -impl PoolCostIndex { - pub fn new(sender: watch::Sender) -> Self { +impl InMemoryPoolCostIndex { + pub fn new(sender: watch::Sender) -> Self { Self { - state: PoolCostState { + state: InMemoryPoolCostState { pools: BTreeMap::new(), }, sender, @@ -30,7 +31,7 @@ impl PoolCostIndex { } #[async_trait] -impl ManagedIndex for PoolCostIndex { +impl ManagedIndex for InMemoryPoolCostIndex { fn name(&self) -> String { "pool-cost-index".into() } diff --git a/processes/indexer/src/indices/mod.rs b/processes/indexer/src/indices/mod.rs index 7277c0646..72ffb5978 100644 --- a/processes/indexer/src/indices/mod.rs +++ b/processes/indexer/src/indices/mod.rs @@ -1 +1,2 @@ -pub mod pool_cost_index; +pub mod fjall_pool_cost_index; +pub mod in_memory_pool_cost_index; diff --git a/processes/indexer/src/main.rs b/processes/indexer/src/main.rs index e8043c777..ef9343e87 100644 --- a/processes/indexer/src/main.rs +++ b/processes/indexer/src/main.rs @@ -7,16 +7,19 @@ use config::{Config, Environment, File}; use std::{collections::BTreeMap, str::FromStr, sync::Arc}; use tokio::sync::watch; -mod indices; - use acropolis_module_block_unpacker::BlockUnpacker; -use acropolis_module_custom_indexer::{ - chain_indexer::CustomIndexer, cursor_store::InMemoryCursorStore, -}; +use acropolis_module_custom_indexer::chain_indexer::CustomIndexer; use acropolis_module_genesis_bootstrapper::GenesisBootstrapper; use acropolis_module_peer_network_interface::PeerNetworkInterface; -use crate::indices::pool_cost_index::{PoolCostIndex, PoolCostState}; +mod indices; + +#[allow(unused_imports)] +use crate::indices::fjall_pool_cost_index::{FjallPoolCostIndex, FjallPoolCostState}; +#[allow(unused_imports)] +use crate::indices::in_memory_pool_cost_index::{InMemoryPoolCostIndex, InMemoryPoolCostState}; +#[allow(unused_imports)] +use acropolis_module_custom_indexer::cursor_store::{FjallCursorStore, InMemoryCursorStore}; #[derive(Debug, clap::Parser)] struct Args { @@ -45,7 +48,7 @@ async fn main() -> Result<()> { PeerNetworkInterface::register(&mut process); // watch channel to send latest state to consumer on index change - let (sender, receiver) = watch::channel(PoolCostState { + let (sender, receiver) = watch::channel(FjallPoolCostState { pools: BTreeMap::new(), }); @@ -65,13 +68,24 @@ async fn main() -> Result<()> { hash: 
Hash::from_str("4e9bbbb67e3ae262133d94c3da5bffce7b1127fc436e7433b87668dba34c354a")?, slot: 16588737, }; + + // Fjall backed example indexer let indexer = CustomIndexer::new( - PoolCostIndex::new(sender), + FjallPoolCostIndex::new("fjall-pool-cost-index", sender)?, + FjallCursorStore::new("fjall-cursor-store", shelley_start.clone())?, + shelley_start, + ); + + // In memory example indexer + /* + let indexer = CustomIndexer::new( + InMemoryPoolCostIndex::new(sender), InMemoryCursorStore::new(shelley_start.clone()), shelley_start, ); - process.register(Arc::new(indexer)); + */ + process.register(Arc::new(indexer)); process.run().await?; Ok(()) From 55ec7d55741a06c6414f236e98636d8096297cd9 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Tue, 2 Dec 2025 20:03:35 +0000 Subject: [PATCH 3/8] fix: cleanup Signed-off-by: William Hankins --- modules/custom_indexer/Cargo.toml | 2 +- .../src/{managed_index.rs => chain_index.rs} | 8 +- modules/custom_indexer/src/chain_indexer.rs | 116 ------------- modules/custom_indexer/src/cursor_store.rs | 5 +- modules/custom_indexer/src/custom_indexer.rs | 163 ++++++++++++++++++ modules/custom_indexer/src/indexer.rs | 5 - .../src/indices/fjall_pool_cost_index.rs | 22 ++- .../src/indices/in_memory_pool_cost_index.rs | 21 ++- processes/indexer/src/main.rs | 8 +- 9 files changed, 207 insertions(+), 143 deletions(-) rename modules/custom_indexer/src/{managed_index.rs => chain_index.rs} (63%) delete mode 100644 modules/custom_indexer/src/chain_indexer.rs create mode 100644 modules/custom_indexer/src/custom_indexer.rs delete mode 100644 modules/custom_indexer/src/indexer.rs diff --git a/modules/custom_indexer/Cargo.toml b/modules/custom_indexer/Cargo.toml index a58b2c004..4f9247473 100644 --- a/modules/custom_indexer/Cargo.toml +++ b/modules/custom_indexer/Cargo.toml @@ -24,4 +24,4 @@ tokio = { workspace = true } tracing = { workspace = true } [lib] -path = "src/indexer.rs" +path = "src/custom_indexer.rs" diff --git a/modules/custom_indexer/src/managed_index.rs b/modules/custom_indexer/src/chain_index.rs similarity index 63% rename from modules/custom_indexer/src/managed_index.rs rename to modules/custom_indexer/src/chain_index.rs index e95de5400..f77bd697a 100644 --- a/modules/custom_indexer/src/managed_index.rs +++ b/modules/custom_indexer/src/chain_index.rs @@ -1,10 +1,10 @@ -use acropolis_common::BlockInfo; +use acropolis_common::{BlockInfo, Point}; use anyhow::Result; use caryatid_sdk::async_trait; use pallas::ledger::traverse::MultiEraTx; #[async_trait] -pub trait ManagedIndex: Send + Sync + 'static { +pub trait ChainIndex: Send + Sync + 'static { fn name(&self) -> String; async fn handle_onchain_tx(&mut self, info: &BlockInfo, tx: &MultiEraTx<'_>) -> Result<()> { @@ -12,8 +12,8 @@ pub trait ManagedIndex: Send + Sync + 'static { Ok(()) } - async fn handle_rollback(&mut self, info: &BlockInfo) -> Result<()> { - let _ = info; + async fn handle_rollback(&mut self, point: &Point) -> Result<()> { + let _ = point; Ok(()) } } diff --git a/modules/custom_indexer/src/chain_indexer.rs b/modules/custom_indexer/src/chain_indexer.rs deleted file mode 100644 index da48d5600..000000000 --- a/modules/custom_indexer/src/chain_indexer.rs +++ /dev/null @@ -1,116 +0,0 @@ -use acropolis_common::{ - commands::chain_sync::ChainSyncCommand, - messages::{CardanoMessage, Command, Message}, - Point, -}; -use anyhow::Result; -use caryatid_sdk::{async_trait, Context, Module}; -use config::Config; -use std::sync::Arc; -use tokio::sync::Mutex; -use tracing::info; - -use 
pallas::ledger::traverse::MultiEraTx; - -use crate::{ - configuration::CustomIndexerConfig, cursor_store::CursorStore, managed_index::ManagedIndex, -}; - -pub struct CustomIndexer { - index: Arc>, - cursor_store: Arc>, - tip: Arc>, -} - -impl CustomIndexer { - pub fn new(index: I, cursor_store: CS, start: Point) -> Self { - Self { - index: Arc::new(Mutex::new(index)), - cursor_store: Arc::new(Mutex::new(cursor_store)), - tip: Arc::new(Mutex::new(start)), - } - } -} - -#[async_trait] -impl Module for CustomIndexer -where - I: ManagedIndex, - CS: CursorStore, -{ - fn get_name(&self) -> &'static str { - "custom-indexer" - } - - fn get_description(&self) -> &'static str { - "Single external chain indexer module" - } - - async fn init(&self, context: Arc>, config: Arc) -> Result<()> { - let cfg = CustomIndexerConfig::try_load(&config)?; - let mut subscription = context.subscribe(&cfg.txs_subscribe_topic).await?; - let run_context = context.clone(); - - // Retrieve tip from cursor store with fallback to initial sync point - let start_point = { - let saved = { - let cs = self.cursor_store.lock().await; - cs.load().await? - }; - - let mut tip_guard = self.tip.lock().await; - if let Some(saved_point) = saved { - *tip_guard = saved_point.clone(); - saved_point - } else { - tip_guard.clone() - } - }; - - let index = Arc::clone(&self.index); - let cursor_store = Arc::clone(&self.cursor_store); - let tip = Arc::clone(&self.tip); - - context.run(async move { - // Publish initial sync point - let msg = Message::Command(Command::ChainSync(ChainSyncCommand::FindIntersect( - start_point, - ))); - run_context.publish(&cfg.sync_command_publisher_topic, Arc::new(msg)).await?; - info!( - "Publishing initial sync command on {}", - cfg.sync_command_publisher_topic - ); - - // Forward received txs to index handlers - while let Ok((_, message)) = subscription.read().await { - if let Message::Cardano((block, CardanoMessage::ReceivedTxs(txs_msg))) = - message.as_ref() - { - // Call handle_onchain_tx on the index for all decoded txs - { - let mut idx = index.lock().await; - for raw_tx in &txs_msg.txs { - let tx = MultiEraTx::decode(raw_tx)?; - idx.handle_onchain_tx(block, &tx).await?; - } - } - - // Update and save tip - let new_tip = Point::Specific { - hash: block.hash, - slot: block.slot, - }; - { - *tip.lock().await = new_tip.clone(); - cursor_store.lock().await.save(&new_tip).await?; - } - } - } - - Ok::<_, anyhow::Error>(()) - }); - - Ok(()) - } -} diff --git a/modules/custom_indexer/src/cursor_store.rs b/modules/custom_indexer/src/cursor_store.rs index c1664c463..1f0b7b932 100644 --- a/modules/custom_indexer/src/cursor_store.rs +++ b/modules/custom_indexer/src/cursor_store.rs @@ -46,10 +46,7 @@ impl FjallCursorStore { // Use stored point if exists or initialize with provided point match partition.get("cursor")? { - Some(bytes) => { - bincode::deserialize::(&bytes)?; - Ok(Self { cursor: partition }) - } + Some(_) => Ok(Self { cursor: partition }), None => { let raw = bincode::serialize(&point)?; partition.insert("cursor", raw)?; diff --git a/modules/custom_indexer/src/custom_indexer.rs b/modules/custom_indexer/src/custom_indexer.rs new file mode 100644 index 000000000..172b043c3 --- /dev/null +++ b/modules/custom_indexer/src/custom_indexer.rs @@ -0,0 +1,163 @@ +//! Acropolis custom indexer module for Caryatid +//! +//! This module lets downstream applications register `ChainIndex` implementations +//! that react to on-chain transactions. The indexer handles cursor persistence, +//! 
initial sync, and dispatching decoded transactions to user provided indices. + +pub mod chain_index; +mod configuration; +pub mod cursor_store; + +use std::sync::Arc; +use tokio::sync::Mutex; + +use anyhow::Result; +use config::Config; +use tracing::{error, info, warn}; + +use caryatid_sdk::{async_trait, Context, Module}; +use pallas::ledger::traverse::MultiEraTx; + +use acropolis_common::{ + commands::chain_sync::ChainSyncCommand, + messages::{CardanoMessage, Command, Message, StateTransitionMessage}, + Point, +}; + +use crate::{ + chain_index::ChainIndex, configuration::CustomIndexerConfig, cursor_store::CursorStore, +}; + +pub struct CustomIndexer { + index: Arc>, + cursor_store: Arc>, + tip: Arc>, +} + +impl CustomIndexer { + pub fn new(index: I, cursor_store: CS, start: Point) -> Self { + Self { + index: Arc::new(Mutex::new(index)), + cursor_store: Arc::new(Mutex::new(cursor_store)), + tip: Arc::new(Mutex::new(start)), + } + } +} + +#[async_trait] +impl Module for CustomIndexer +where + I: ChainIndex, + CS: CursorStore, +{ + fn get_name(&self) -> &'static str { + "custom-indexer" + } + + fn get_description(&self) -> &'static str { + "Single external chain indexer module" + } + + async fn init(&self, context: Arc>, config: Arc) -> Result<()> { + let cfg = CustomIndexerConfig::try_load(&config)?; + let mut subscription = context.subscribe(&cfg.txs_subscribe_topic).await?; + let run_context = context.clone(); + + // Retrieve tip from cursor store with fallback to initial sync point + let start_point = { + let saved = self.cursor_store.lock().await.load().await?; + let mut tip_guard = self.tip.lock().await; + let start_point = saved.unwrap_or_else(|| tip_guard.clone()); + *tip_guard = start_point.clone(); + start_point + }; + + let index = Arc::clone(&self.index); + let cursor_store = Arc::clone(&self.cursor_store); + let tip = Arc::clone(&self.tip); + + context.run(async move { + // Publish initial sync point + let msg = Message::Command(Command::ChainSync(ChainSyncCommand::FindIntersect( + start_point, + ))); + run_context.publish(&cfg.sync_command_publisher_topic, Arc::new(msg)).await?; + info!( + "Publishing initial sync command on {}", + cfg.sync_command_publisher_topic + ); + + // Forward received txs and rollback notifications to index handlers + loop { + match subscription.read().await { + Ok((_, message)) => { + match message.as_ref() { + Message::Cardano((block, CardanoMessage::ReceivedTxs(txs_msg))) => { + // Call handle_onchain_tx on the index for all decoded txs + let mut idx = index.lock().await; + for (tx_index, raw_tx) in txs_msg.txs.iter().enumerate() { + match MultiEraTx::decode(raw_tx) { + Ok(tx) => { + if let Err(e) = idx.handle_onchain_tx(block, &tx).await + { + warn!( + "Failed to index tx {} in block {}: {e:#}", + tx_index, block.number + ); + } + } + Err(_) => { + warn!( + "Failed to decode tx {} in block {}", + tx_index, block.number + ); + } + } + } + + // Update and save tip + let new_tip = Point::Specific { + hash: block.hash, + slot: block.slot, + }; + *tip.lock().await = new_tip.clone(); + cursor_store.lock().await.save(&new_tip).await?; + } + + Message::Cardano(( + _, + CardanoMessage::StateTransition(StateTransitionMessage::Rollback( + point, + )), + )) => { + // Call handle rollback on index + { + let mut idx = index.lock().await; + if let Err(e) = idx.handle_rollback(point).await { + error!("Failed to handle rollback at {:?}: {e:#}", point); + return Err(e); + } + } + + // Rollback tip and save + { + *tip.lock().await = point.clone(); + } + 
cursor_store.lock().await.save(point).await?; + } + _ => (), + } + } + Err(e) => { + error!("Subscription closed: {e:#}"); + break; + } + } + } + + Ok::<_, anyhow::Error>(()) + }); + + Ok(()) + } +} diff --git a/modules/custom_indexer/src/indexer.rs b/modules/custom_indexer/src/indexer.rs deleted file mode 100644 index 90ebb5ed9..000000000 --- a/modules/custom_indexer/src/indexer.rs +++ /dev/null @@ -1,5 +0,0 @@ -//! Acropolis indexer module for Caryatid -pub mod chain_indexer; -mod configuration; -pub mod cursor_store; -pub mod managed_index; diff --git a/processes/indexer/src/indices/fjall_pool_cost_index.rs b/processes/indexer/src/indices/fjall_pool_cost_index.rs index ece76c618..d93b03b4a 100644 --- a/processes/indexer/src/indices/fjall_pool_cost_index.rs +++ b/processes/indexer/src/indices/fjall_pool_cost_index.rs @@ -1,6 +1,7 @@ +#![allow(unused)] use acropolis_codec::map_parameters::to_pool_id; use acropolis_common::{BlockInfo, Lovelace, PoolId}; -use acropolis_module_custom_indexer::managed_index::ManagedIndex; +use acropolis_module_custom_indexer::chain_index::ChainIndex; use anyhow::Result; use caryatid_sdk::async_trait; use fjall::{Config, Keyspace, Partition, PartitionCreateOptions}; @@ -9,6 +10,7 @@ use pallas::ledger::traverse::{MultiEraCert, MultiEraTx}; use std::collections::BTreeMap; use std::path::Path; use tokio::sync::watch; +use tracing::warn; #[derive(Clone)] pub struct FjallPoolCostState { @@ -46,7 +48,7 @@ impl FjallPoolCostIndex { } #[async_trait] -impl ManagedIndex for FjallPoolCostIndex { +impl ChainIndex for FjallPoolCostIndex { fn name(&self) -> String { "pool-cost-index".into() } @@ -63,7 +65,9 @@ impl ManagedIndex for FjallPoolCostIndex { self.state.pools.insert(pool_id, *cost); self.partition.insert(key, value)?; - let _ = self.sender.send(self.state.clone()); + if self.sender.send(self.state.clone()).is_err() { + warn!("Pool cost state receiver dropped"); + } } alonzo::Certificate::PoolRetirement(operator, ..) => { let pool_id = to_pool_id(operator); @@ -72,7 +76,9 @@ impl ManagedIndex for FjallPoolCostIndex { self.state.pools.remove(&pool_id); self.partition.remove(key)?; - let _ = self.sender.send(self.state.clone()); + if self.sender.send(self.state.clone()).is_err() { + warn!("Pool cost state receiver dropped"); + } } _ => {} @@ -86,7 +92,9 @@ impl ManagedIndex for FjallPoolCostIndex { self.state.pools.insert(pool_id, *cost); self.partition.insert(key, value)?; - let _ = self.sender.send(self.state.clone()); + if self.sender.send(self.state.clone()).is_err() { + warn!("Pool cost state receiver dropped"); + } } conway::Certificate::PoolRetirement(operator, ..) 
=> { let pool_id = to_pool_id(operator); @@ -95,7 +103,9 @@ impl ManagedIndex for FjallPoolCostIndex { self.state.pools.remove(&to_pool_id(operator)); self.partition.remove(key)?; - let _ = self.sender.send(self.state.clone()); + if self.sender.send(self.state.clone()).is_err() { + warn!("Pool cost state receiver dropped"); + } } _ => {} }, diff --git a/processes/indexer/src/indices/in_memory_pool_cost_index.rs b/processes/indexer/src/indices/in_memory_pool_cost_index.rs index f95465b4b..5dcca4424 100644 --- a/processes/indexer/src/indices/in_memory_pool_cost_index.rs +++ b/processes/indexer/src/indices/in_memory_pool_cost_index.rs @@ -1,13 +1,14 @@ #![allow(unused)] use acropolis_codec::map_parameters::to_pool_id; use acropolis_common::{BlockInfo, Lovelace, PoolId}; -use acropolis_module_custom_indexer::managed_index::ManagedIndex; +use acropolis_module_custom_indexer::chain_index::ChainIndex; use anyhow::Result; use caryatid_sdk::async_trait; use pallas::ledger::primitives::{alonzo, conway}; use pallas::ledger::traverse::{MultiEraCert, MultiEraTx}; use std::collections::BTreeMap; use tokio::sync::watch; +use tracing::warn; #[derive(Clone)] pub struct InMemoryPoolCostState { @@ -31,7 +32,7 @@ impl InMemoryPoolCostIndex { } #[async_trait] -impl ManagedIndex for InMemoryPoolCostIndex { +impl ChainIndex for InMemoryPoolCostIndex { fn name(&self) -> String { "pool-cost-index".into() } @@ -42,11 +43,15 @@ impl ManagedIndex for InMemoryPoolCostIndex { MultiEraCert::AlonzoCompatible(cert) => match cert.as_ref().as_ref() { alonzo::Certificate::PoolRegistration { operator, cost, .. } => { self.state.pools.insert(to_pool_id(operator), *cost); - let _ = self.sender.send(self.state.clone()); + if self.sender.send(self.state.clone()).is_err() { + warn!("Pool cost state receiver dropped"); + } } alonzo::Certificate::PoolRetirement(operator, ..) => { self.state.pools.remove(&to_pool_id(operator)); - let _ = self.sender.send(self.state.clone()); + if self.sender.send(self.state.clone()).is_err() { + warn!("Pool cost state receiver dropped"); + } } _ => {} @@ -54,11 +59,15 @@ impl ManagedIndex for InMemoryPoolCostIndex { MultiEraCert::Conway(cert) => match cert.as_ref().as_ref() { conway::Certificate::PoolRegistration { operator, cost, .. } => { self.state.pools.insert(to_pool_id(operator), *cost); - let _ = self.sender.send(self.state.clone()); + if self.sender.send(self.state.clone()).is_err() { + warn!("Pool cost state receiver dropped"); + } } conway::Certificate::PoolRetirement(operator, ..) 
=> { self.state.pools.remove(&to_pool_id(operator)); - let _ = self.sender.send(self.state.clone()); + if self.sender.send(self.state.clone()).is_err() { + warn!("Pool cost state receiver dropped"); + } } _ => {} }, diff --git a/processes/indexer/src/main.rs b/processes/indexer/src/main.rs index ef9343e87..bd9d2c044 100644 --- a/processes/indexer/src/main.rs +++ b/processes/indexer/src/main.rs @@ -8,7 +8,7 @@ use std::{collections::BTreeMap, str::FromStr, sync::Arc}; use tokio::sync::watch; use acropolis_module_block_unpacker::BlockUnpacker; -use acropolis_module_custom_indexer::chain_indexer::CustomIndexer; +use acropolis_module_custom_indexer::CustomIndexer; use acropolis_module_genesis_bootstrapper::GenesisBootstrapper; use acropolis_module_peer_network_interface::PeerNetworkInterface; @@ -52,6 +52,12 @@ async fn main() -> Result<()> { pools: BTreeMap::new(), }); + /* Uncomment to test in memory indexer + let (sender, receiver) = watch::channel(InMemoryPoolCostState { + pools: BTreeMap::new(), + }); + */ + // Example receiver { let mut rx = receiver.clone(); From bf3933350d62c275e8233a2842657d5587df6d3b Mon Sep 17 00:00:00 2001 From: William Hankins Date: Tue, 2 Dec 2025 20:46:29 +0000 Subject: [PATCH 4/8] fix: cargo shear Signed-off-by: William Hankins --- Cargo.lock | 25 ------------------------- modules/custom_indexer/Cargo.toml | 1 - processes/indexer/Cargo.toml | 1 - 3 files changed, 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3772a9df2..492ecf77a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -204,7 +204,6 @@ dependencies = [ "caryatid_sdk", "config", "fjall", - "futures", "pallas 0.33.0", "serde", "tokio", @@ -531,7 +530,6 @@ dependencies = [ "config", "fjall", "pallas 0.33.0", - "plutus-parser", "tokio", "tracing", "tracing-subscriber", @@ -4869,29 +4867,6 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" -[[package]] -name = "plutus-parser" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c91c35ed696191fddf770ed73605614336cfa1458c21efae905aed0783c806e" -dependencies = [ - "indexmap 2.12.0", - "pallas-primitives 0.33.0", - "plutus-parser-derive", - "thiserror 2.0.17", -] - -[[package]] -name = "plutus-parser-derive" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb90dc0af619a3872f56e62da683792c43ccb940f55f76ed0f84b7e666bf813a" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.109", -] - [[package]] name = "polling" version = "3.11.0" diff --git a/modules/custom_indexer/Cargo.toml b/modules/custom_indexer/Cargo.toml index 4f9247473..29bcdc4a7 100644 --- a/modules/custom_indexer/Cargo.toml +++ b/modules/custom_indexer/Cargo.toml @@ -17,7 +17,6 @@ anyhow = { workspace = true } bincode = "1" config = { workspace = true } fjall = "2.7.0" -futures = "0.3.31" pallas = { workspace = true} serde = { workspace = true, features = ["rc"] } tokio = { workspace = true } diff --git a/processes/indexer/Cargo.toml b/processes/indexer/Cargo.toml index 776e08336..e979c2741 100644 --- a/processes/indexer/Cargo.toml +++ b/processes/indexer/Cargo.toml @@ -24,7 +24,6 @@ clap = { workspace = true } config = { workspace = true } fjall = "2.7.0" pallas = { workspace = true} -plutus-parser = { version = "0.1", default-features = false, features = ["derive", "pallas-v0_33"] } tracing = { workspace = true } tracing-subscriber = { version = "0.3.20", features = ["registry", 
"env-filter"] } tokio = { workspace = true } From d6e56fcc551a3cad79800e04f114b7f192c5184e Mon Sep 17 00:00:00 2001 From: William Hankins Date: Tue, 2 Dec 2025 20:59:46 +0000 Subject: [PATCH 5/8] fix: use pool_id instead of duplicating conversion from key hash Signed-off-by: William Hankins --- processes/indexer/src/indices/fjall_pool_cost_index.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processes/indexer/src/indices/fjall_pool_cost_index.rs b/processes/indexer/src/indices/fjall_pool_cost_index.rs index d93b03b4a..364c6208c 100644 --- a/processes/indexer/src/indices/fjall_pool_cost_index.rs +++ b/processes/indexer/src/indices/fjall_pool_cost_index.rs @@ -100,7 +100,7 @@ impl ChainIndex for FjallPoolCostIndex { let pool_id = to_pool_id(operator); let key = pool_id.as_ref(); - self.state.pools.remove(&to_pool_id(operator)); + self.state.pools.remove(&pool_id); self.partition.remove(key)?; if self.sender.send(self.state.clone()).is_err() { From 9e1b189fde9c52bd20f9d7810d80218f31a9e0dd Mon Sep 17 00:00:00 2001 From: Matthew Hounslow Date: Thu, 4 Dec 2025 11:27:37 -0800 Subject: [PATCH 6/8] Update .gitignore files with additional ignore patterns --- .gitignore | 4 ++++ modules/address_state/.gitignore | 1 + modules/historical_accounts_state/.gitignore | 1 + modules/snapshot_bootstrapper/.gitignore | 2 ++ processes/omnibus/.gitignore | 1 + 5 files changed, 9 insertions(+) create mode 100644 modules/snapshot_bootstrapper/.gitignore diff --git a/.gitignore b/.gitignore index f879b368f..9873e1e8a 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ restore.sh startup.sh configuration/ docker-compose.yaml +.DS_Store # Nix result @@ -13,3 +14,6 @@ result-* .direnv/ .envrc.local tests/fixtures/134092758.670ca68c3de580f8469677754a725e86ca72a7be381d3108569f0704a5fca327.cbor + +downloads/ +fjall-*/ diff --git a/modules/address_state/.gitignore b/modules/address_state/.gitignore index 6aed6c1a3..839abbcc9 100644 --- a/modules/address_state/.gitignore +++ b/modules/address_state/.gitignore @@ -1,2 +1,3 @@ # fjall immutable db fjall-*/ +db/ diff --git a/modules/historical_accounts_state/.gitignore b/modules/historical_accounts_state/.gitignore index 6aed6c1a3..839abbcc9 100644 --- a/modules/historical_accounts_state/.gitignore +++ b/modules/historical_accounts_state/.gitignore @@ -1,2 +1,3 @@ # fjall immutable db fjall-*/ +db/ diff --git a/modules/snapshot_bootstrapper/.gitignore b/modules/snapshot_bootstrapper/.gitignore new file mode 100644 index 000000000..65cd0d397 --- /dev/null +++ b/modules/snapshot_bootstrapper/.gitignore @@ -0,0 +1,2 @@ +# Ignore any downloaded NewEpochSnapshot files +data/mainnet/*.cbor diff --git a/processes/omnibus/.gitignore b/processes/omnibus/.gitignore index f7cf0f866..8d5cb4152 100644 --- a/processes/omnibus/.gitignore +++ b/processes/omnibus/.gitignore @@ -1,6 +1,7 @@ downloads cache upstream-cache +spdd_db # DB files fjall-*/ From cf24748a2bc6cbb9694d35adb933984018aed65e Mon Sep 17 00:00:00 2001 From: Matthew Hounslow Date: Thu, 4 Dec 2025 11:54:27 -0800 Subject: [PATCH 7/8] fix: update UTxOIdentifier initialization to use TxHash --- modules/address_state/src/state.rs | 16 +++++++------- modules/assets_state/src/state.rs | 24 +++++++++------------ modules/utxo_state/src/state.rs | 19 +++++++---------- modules/utxo_state/src/volatile_index.rs | 27 ++++++++++++------------ 4 files changed, 40 insertions(+), 46 deletions(-) diff --git a/modules/address_state/src/state.rs b/modules/address_state/src/state.rs index 350f4e133..0466cf38f 100644 --- 
a/modules/address_state/src/state.rs +++ b/modules/address_state/src/state.rs @@ -233,7 +233,7 @@ impl State { #[cfg(test)] mod tests { use super::*; - use acropolis_common::{Address, AddressDelta, UTxOIdentifier, Value}; + use acropolis_common::{Address, AddressDelta, TxHash, UTxOIdentifier, Value}; use tempfile::tempdir; fn dummy_address() -> Address { @@ -284,7 +284,7 @@ mod tests { let mut state = setup_state_and_store().await?; let addr = dummy_address(); - let utxo = UTxOIdentifier::new(0, 0, 0); + let utxo = UTxOIdentifier::new(TxHash::default(), 0); let tx_id = TxIdentifier::new(0, 0); let deltas = vec![delta(&addr, tx_id, vec![], vec![utxo], 0, 1)]; @@ -295,7 +295,7 @@ mod tests { let utxos = state.get_address_utxos(&addr).await?; assert!(utxos.is_some()); assert_eq!(utxos.as_ref().unwrap().len(), 1); - assert_eq!(utxos.as_ref().unwrap()[0], UTxOIdentifier::new(0, 0, 0)); + assert_eq!(utxos.as_ref().unwrap()[0], UTxOIdentifier::new(TxHash::default(), 0)); // Drain volatile to immutable state.volatile.epoch_start_block = 1; @@ -305,7 +305,7 @@ mod tests { let utxos = state.get_address_utxos(&addr).await?; assert!(utxos.is_some()); assert_eq!(utxos.as_ref().unwrap().len(), 1); - assert_eq!(utxos.as_ref().unwrap()[0], UTxOIdentifier::new(0, 0, 0)); + assert_eq!(utxos.as_ref().unwrap()[0], UTxOIdentifier::new(TxHash::default(), 0)); // Perisist immutable to disk state.immutable.persist_epoch(0, &state.config).await?; @@ -314,7 +314,7 @@ mod tests { let utxos = state.get_address_utxos(&addr).await?; assert!(utxos.is_some()); assert_eq!(utxos.as_ref().unwrap().len(), 1); - assert_eq!(utxos.as_ref().unwrap()[0], UTxOIdentifier::new(0, 0, 0)); + assert_eq!(utxos.as_ref().unwrap()[0], UTxOIdentifier::new(TxHash::default(), 0)); Ok(()) } @@ -326,7 +326,7 @@ mod tests { let mut state = setup_state_and_store().await?; let addr = dummy_address(); - let utxo = UTxOIdentifier::new(0, 0, 0); + let utxo = UTxOIdentifier::new(TxHash::default(), 0); let tx_id_create = TxIdentifier::new(0, 0); let tx_id_spend = TxIdentifier::new(1, 0); @@ -377,8 +377,8 @@ mod tests { let mut state = setup_state_and_store().await?; let addr = dummy_address(); - let utxo_old = UTxOIdentifier::new(0, 0, 0); - let utxo_new = UTxOIdentifier::new(0, 1, 0); + let utxo_old = UTxOIdentifier::new(TxHash::default(), 0); + let utxo_new = UTxOIdentifier::new(TxHash::default(), 1); let tx_id_create_old = TxIdentifier::new(0, 0); let tx_id_spend_old_create_new = TxIdentifier::new(1, 0); diff --git a/modules/assets_state/src/state.rs b/modules/assets_state/src/state.rs index 303fd4dd7..a355a3675 100644 --- a/modules/assets_state/src/state.rs +++ b/modules/assets_state/src/state.rs @@ -688,11 +688,7 @@ mod tests { asset_registry::{AssetId, AssetRegistry}, state::{AssetsStorageConfig, State, StoreTransactions, CIP67_LABEL_222, CIP68_LABEL_100}, }; - use acropolis_common::{ - Address, AddressDelta, AssetInfoRecord, AssetMetadata, AssetMetadataStandard, AssetName, - Datum, NativeAsset, NativeAssetDelta, PolicyId, ShelleyAddress, TxIdentifier, TxOutput, - TxUTxODeltas, UTxOIdentifier, Value, - }; + use acropolis_common::{Address, AddressDelta, AssetInfoRecord, AssetMetadata, AssetMetadataStandard, AssetName, Datum, NativeAsset, NativeAssetDelta, PolicyId, ShelleyAddress, TxHash, TxIdentifier, TxOutput, TxUTxODeltas, UTxOIdentifier, Value}; use serde_cbor::Value as CborValue; fn dummy_policy(byte: u8) -> PolicyId { @@ -845,7 +841,7 @@ mod tests { fn make_output(policy_id: PolicyId, asset_name: AssetName, datum: Option>) -> TxOutput { 
TxOutput { - utxo_identifier: UTxOIdentifier::new(0, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), address: dummy_address(), value: Value { lovelace: 0, @@ -1263,7 +1259,7 @@ mod tests { StoreTransactions::None, ); - let input = UTxOIdentifier::new(0, 0, 0); + let input = UTxOIdentifier::new(TxHash::default(), 0); let output = make_output(policy_id, name, None); let tx_deltas = TxUTxODeltas { @@ -1428,7 +1424,7 @@ mod tests { tx_identifier, inputs: Vec::new(), outputs: vec![TxOutput { - utxo_identifier: UTxOIdentifier::new(0, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), ..output.clone() }], }; @@ -1436,7 +1432,7 @@ mod tests { tx_identifier, inputs: Vec::new(), outputs: vec![TxOutput { - utxo_identifier: UTxOIdentifier::new(0, 0, 1), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 1), ..output }], }; @@ -1470,7 +1466,7 @@ mod tests { tx_identifier: TxIdentifier::new(9, 0), inputs: Vec::new(), outputs: vec![TxOutput { - utxo_identifier: UTxOIdentifier::new(9, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), ..out1 }], }; @@ -1478,7 +1474,7 @@ mod tests { tx_identifier: TxIdentifier::new(10, 0), inputs: Vec::new(), outputs: vec![TxOutput { - utxo_identifier: UTxOIdentifier::new(10, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), ..out2 }], }; @@ -1513,7 +1509,7 @@ mod tests { tx_identifier: TxIdentifier::new(9, 0), inputs: Vec::new(), outputs: vec![TxOutput { - utxo_identifier: UTxOIdentifier::new(9, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), ..base_output.clone() }], }; @@ -1521,7 +1517,7 @@ mod tests { tx_identifier: TxIdentifier::new(8, 0), inputs: Vec::new(), outputs: vec![TxOutput { - utxo_identifier: UTxOIdentifier::new(8, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), ..base_output.clone() }], }; @@ -1529,7 +1525,7 @@ mod tests { tx_identifier: TxIdentifier::new(7, 0), inputs: Vec::new(), outputs: vec![TxOutput { - utxo_identifier: UTxOIdentifier::new(7, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), ..base_output }], }; diff --git a/modules/utxo_state/src/state.rs b/modules/utxo_state/src/state.rs index 6343281b3..2817ae632 100644 --- a/modules/utxo_state/src/state.rs +++ b/modules/utxo_state/src/state.rs @@ -399,10 +399,7 @@ struct AddressTxMap { mod tests { use super::*; use crate::InMemoryImmutableUTXOStore; - use acropolis_common::{ - Address, AssetName, BlockHash, ByronAddress, Datum, Era, NativeAsset, ReferenceScript, - TxUTxODeltas, Value, - }; + use acropolis_common::{Address, AssetName, BlockHash, ByronAddress, Datum, Era, NativeAsset, ReferenceScript, TxHash, TxUTxODeltas, Value}; use config::Config; use tokio::sync::Mutex; @@ -449,7 +446,7 @@ mod tests { let reference_script_bytes = vec![0xde, 0xad, 0xbe, 0xef]; let output = TxOutput { - utxo_identifier: UTxOIdentifier::new(0, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), address: create_address(99), value: Value::new( 42, @@ -523,7 +520,7 @@ mod tests { async fn observe_input_spends_utxo() { let mut state = new_state(); let output = TxOutput { - utxo_identifier: UTxOIdentifier::new(0, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), address: create_address(99), value: Value::new( 42, @@ -562,7 +559,7 @@ mod tests { async fn rollback_removes_future_created_utxos() { let mut state = new_state(); let output = TxOutput { - utxo_identifier: UTxOIdentifier::new(0, 0, 0), + utxo_identifier: 
UTxOIdentifier::new(TxHash::default(), 0), address: create_address(99), value: Value::new( 42, @@ -603,7 +600,7 @@ mod tests { // Create the UTXO in block 10 let output = TxOutput { - utxo_identifier: UTxOIdentifier::new(0, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), address: create_address(99), value: Value::new( 42, @@ -653,7 +650,7 @@ mod tests { async fn prune_shifts_new_utxos_into_immutable() { let mut state = new_state(); let output = TxOutput { - utxo_identifier: UTxOIdentifier::new(0, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), address: create_address(99), value: Value::new( 42, @@ -701,7 +698,7 @@ mod tests { async fn prune_deletes_old_spent_utxos() { let mut state = new_state(); let output = TxOutput { - utxo_identifier: UTxOIdentifier::new(0, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), address: create_address(99), value: Value::new( 42, @@ -820,7 +817,7 @@ mod tests { state.register_address_delta_observer(observer.clone()); let output = TxOutput { - utxo_identifier: UTxOIdentifier::new(0, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), address: create_address(99), value: Value::new( 42, diff --git a/modules/utxo_state/src/volatile_index.rs b/modules/utxo_state/src/volatile_index.rs index 14a853a0e..d8981ccd7 100644 --- a/modules/utxo_state/src/volatile_index.rs +++ b/modules/utxo_state/src/volatile_index.rs @@ -97,6 +97,7 @@ impl VolatileIndex { // -- Tests -- #[cfg(test)] mod tests { + use acropolis_common::TxHash; use super::*; #[test] @@ -140,47 +141,47 @@ mod tests { assert_eq!(Some(1), index.first_block); assert_eq!(2, index.blocks.len()); - let utxo = UTxOIdentifier::new(42, 42, 42); + let utxo = UTxOIdentifier::new(TxHash::default(), 422); index.add_utxo(&utxo); assert!(index.blocks[0].is_empty()); assert!(!index.blocks[1].is_empty()); - assert_eq!(42, index.blocks[1][0].output_index()); + assert_eq!(422, index.blocks[1][0].output_index); } #[test] fn prune_before_deletes_and_calls_back_with_utxos() { let mut index = VolatileIndex::new(); index.add_block(1); - index.add_utxo(&UTxOIdentifier::new(1, 1, 1)); - index.add_utxo(&UTxOIdentifier::new(2, 2, 2)); + index.add_utxo(&UTxOIdentifier::new(TxHash::default(), 1)); + index.add_utxo(&UTxOIdentifier::new(TxHash::default(), 2)); index.add_block(2); - index.add_utxo(&UTxOIdentifier::new(3, 3, 3)); + index.add_utxo(&UTxOIdentifier::new(TxHash::default(), 3)); let pruned = index.prune_before(2); assert_eq!(Some(2), index.first_block); assert_eq!(1, index.blocks.len()); assert_eq!(2, pruned.len()); - assert_eq!(1, pruned[0].output_index()); - assert_eq!(2, pruned[1].output_index()); + assert_eq!(1, pruned[0].output_index); + assert_eq!(2, pruned[1].output_index); } #[test] fn prune_on_or_after_deletes_and_calls_back_with_utxos() { let mut index = VolatileIndex::new(); index.add_block(1); - index.add_utxo(&UTxOIdentifier::new(1, 1, 1)); - index.add_utxo(&UTxOIdentifier::new(2, 2, 2)); + index.add_utxo(&UTxOIdentifier::new(TxHash::default(), 1)); + index.add_utxo(&UTxOIdentifier::new(TxHash::default(), 2)); index.add_block(2); - index.add_utxo(&UTxOIdentifier::new(3, 3, 3)); + index.add_utxo(&UTxOIdentifier::new(TxHash::default(), 3)); let pruned = index.prune_on_or_after(1); assert_eq!(Some(1), index.first_block); assert_eq!(0, index.blocks.len()); assert_eq!(3, pruned.len()); // Note reverse order of blocks - assert_eq!(3, pruned[0].output_index()); - assert_eq!(1, pruned[1].output_index()); - assert_eq!(2, pruned[2].output_index()); 
+ assert_eq!(3, pruned[0].output_index); + assert_eq!(1, pruned[1].output_index); + assert_eq!(2, pruned[2].output_index); } } From 232cbaa8566892f35ba5daf0e70ed3885cf21a3e Mon Sep 17 00:00:00 2001 From: Matthew Hounslow Date: Thu, 4 Dec 2025 12:01:13 -0800 Subject: [PATCH 8/8] Apply rustfmt and organize imports --- common/src/queries/blocks.rs | 3 +-- common/src/types.rs | 12 +++++++++--- modules/address_state/src/state.rs | 15 ++++++++++++--- modules/assets_state/src/state.rs | 6 +++++- modules/chain_store/src/chain_store.rs | 8 +++++--- .../src/genesis_bootstrapper.rs | 4 ++-- modules/rest_blockfrost/src/handlers/accounts.rs | 4 +++- modules/rest_blockfrost/src/handlers/addresses.rs | 8 ++++++-- modules/rest_blockfrost/src/types.rs | 4 ++-- modules/utxo_state/src/state.rs | 10 +++++----- modules/utxo_state/src/volatile_index.rs | 2 +- 11 files changed, 51 insertions(+), 25 deletions(-) diff --git a/common/src/queries/blocks.rs b/common/src/queries/blocks.rs index 52b310db9..fd7aad93c 100644 --- a/common/src/queries/blocks.rs +++ b/common/src/queries/blocks.rs @@ -2,8 +2,7 @@ use crate::queries::errors::QueryError; use crate::{ queries::misc::Order, serialization::{Bech32Conversion, Bech32WithHrp}, - Address, BlockHash, GenesisDelegate, HeavyDelegate, KeyHash, TxHash, TxIdentifier, - VrfKeyHash, + Address, BlockHash, GenesisDelegate, HeavyDelegate, KeyHash, TxHash, TxIdentifier, VrfKeyHash, }; use cryptoxide::hashing::blake2b::Blake2b; use serde::ser::{Serialize, SerializeStruct, Serializer}; diff --git a/common/src/types.rs b/common/src/types.rs index 060333f91..4973e4a26 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -2410,12 +2410,18 @@ mod tests { #[test] fn test_utxo_identifier_to_bytes() -> Result<()> { - let tx_hash = TxHash::try_from(hex::decode("000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f").unwrap()).unwrap(); + let tx_hash = TxHash::try_from( + hex::decode("000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f") + .unwrap(), + ) + .unwrap(); let output_index = 42; let utxo = UTxOIdentifier::new(tx_hash, output_index); let bytes = utxo.to_bytes(); - assert_eq!(hex::encode(bytes), - "000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f002a"); + assert_eq!( + hex::encode(bytes), + "000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f002a" + ); Ok(()) } diff --git a/modules/address_state/src/state.rs b/modules/address_state/src/state.rs index 0466cf38f..31d4330f0 100644 --- a/modules/address_state/src/state.rs +++ b/modules/address_state/src/state.rs @@ -295,7 +295,10 @@ mod tests { let utxos = state.get_address_utxos(&addr).await?; assert!(utxos.is_some()); assert_eq!(utxos.as_ref().unwrap().len(), 1); - assert_eq!(utxos.as_ref().unwrap()[0], UTxOIdentifier::new(TxHash::default(), 0)); + assert_eq!( + utxos.as_ref().unwrap()[0], + UTxOIdentifier::new(TxHash::default(), 0) + ); // Drain volatile to immutable state.volatile.epoch_start_block = 1; @@ -305,7 +308,10 @@ mod tests { let utxos = state.get_address_utxos(&addr).await?; assert!(utxos.is_some()); assert_eq!(utxos.as_ref().unwrap().len(), 1); - assert_eq!(utxos.as_ref().unwrap()[0], UTxOIdentifier::new(TxHash::default(), 0)); + assert_eq!( + utxos.as_ref().unwrap()[0], + UTxOIdentifier::new(TxHash::default(), 0) + ); // Perisist immutable to disk state.immutable.persist_epoch(0, &state.config).await?; @@ -314,7 +320,10 @@ mod tests { let utxos = state.get_address_utxos(&addr).await?; assert!(utxos.is_some()); assert_eq!(utxos.as_ref().unwrap().len(), 1); - 
assert_eq!(utxos.as_ref().unwrap()[0], UTxOIdentifier::new(TxHash::default(), 0)); + assert_eq!( + utxos.as_ref().unwrap()[0], + UTxOIdentifier::new(TxHash::default(), 0) + ); Ok(()) } diff --git a/modules/assets_state/src/state.rs b/modules/assets_state/src/state.rs index a355a3675..b7fb2a7d2 100644 --- a/modules/assets_state/src/state.rs +++ b/modules/assets_state/src/state.rs @@ -688,7 +688,11 @@ mod tests { asset_registry::{AssetId, AssetRegistry}, state::{AssetsStorageConfig, State, StoreTransactions, CIP67_LABEL_222, CIP68_LABEL_100}, }; - use acropolis_common::{Address, AddressDelta, AssetInfoRecord, AssetMetadata, AssetMetadataStandard, AssetName, Datum, NativeAsset, NativeAssetDelta, PolicyId, ShelleyAddress, TxHash, TxIdentifier, TxOutput, TxUTxODeltas, UTxOIdentifier, Value}; + use acropolis_common::{ + Address, AddressDelta, AssetInfoRecord, AssetMetadata, AssetMetadataStandard, AssetName, + Datum, NativeAsset, NativeAssetDelta, PolicyId, ShelleyAddress, TxHash, TxIdentifier, + TxOutput, TxUTxODeltas, UTxOIdentifier, Value, + }; use serde_cbor::Value as CborValue; fn dummy_policy(byte: u8) -> PolicyId { diff --git a/modules/chain_store/src/chain_store.rs b/modules/chain_store/src/chain_store.rs index ba805a7aa..9a3a36f87 100644 --- a/modules/chain_store/src/chain_store.rs +++ b/modules/chain_store/src/chain_store.rs @@ -434,9 +434,11 @@ impl ChainStore { // TODO! Look up TxHash to get block hash, and index of Tx in block } - Ok(BlocksStateQueryResponse::BlockHashesAndIndexOfTransactionHashes( - block_hashes_and_indexes, - )) + Ok( + BlocksStateQueryResponse::BlockHashesAndIndexOfTransactionHashes( + block_hashes_and_indexes, + ), + ) } BlocksStateQuery::GetTransactionHashesAndTimestamps { tx_ids } => { let mut tx_hashes = Vec::with_capacity(tx_ids.len()); diff --git a/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs b/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs index 47a57a555..2c4ec8520 100644 --- a/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs +++ b/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs @@ -9,8 +9,8 @@ use acropolis_common::{ UTXODeltasMessage, }, Address, BlockHash, BlockInfo, BlockStatus, ByronAddress, Era, GenesisDelegates, Lovelace, - LovelaceDelta, Pot, PotDelta, TxHash, TxIdentifier, TxOutput, TxUTxODeltas, - UTxOIdentifier, Value, + LovelaceDelta, Pot, PotDelta, TxHash, TxIdentifier, TxOutput, TxUTxODeltas, UTxOIdentifier, + Value, }; use anyhow::Result; use blake2::{digest::consts::U32, Blake2b, Digest}; diff --git a/modules/rest_blockfrost/src/handlers/accounts.rs b/modules/rest_blockfrost/src/handlers/accounts.rs index 819658af5..518baf99f 100644 --- a/modules/rest_blockfrost/src/handlers/accounts.rs +++ b/modules/rest_blockfrost/src/handlers/accounts.rs @@ -783,7 +783,9 @@ pub async fn handle_account_utxos_blockfrost( msg, |message| match message { Message::StateQueryResponse(StateQueryResponse::Blocks( - BlocksStateQueryResponse::BlockHashesAndIndexOfTransactionHashes(hashes_and_indexes), + BlocksStateQueryResponse::BlockHashesAndIndexOfTransactionHashes( + hashes_and_indexes, + ), )) => Ok(hashes_and_indexes), Message::StateQueryResponse(StateQueryResponse::Blocks( BlocksStateQueryResponse::Error(e), diff --git a/modules/rest_blockfrost/src/handlers/addresses.rs b/modules/rest_blockfrost/src/handlers/addresses.rs index 20a020782..4c520df07 100644 --- a/modules/rest_blockfrost/src/handlers/addresses.rs +++ b/modules/rest_blockfrost/src/handlers/addresses.rs @@ -328,7 +328,9 @@ pub async fn 
handle_address_utxos_blockfrost( msg, |message| match message { Message::StateQueryResponse(StateQueryResponse::Blocks( - BlocksStateQueryResponse::BlockHashesAndIndexOfTransactionHashes(hashes_and_indexes), + BlocksStateQueryResponse::BlockHashesAndIndexOfTransactionHashes( + hashes_and_indexes, + ), )) => Ok(hashes_and_indexes), Message::StateQueryResponse(StateQueryResponse::Blocks( BlocksStateQueryResponse::Error(e), @@ -468,7 +470,9 @@ pub async fn handle_address_asset_utxos_blockfrost( msg, |message| match message { Message::StateQueryResponse(StateQueryResponse::Blocks( - BlocksStateQueryResponse::BlockHashesAndIndexOfTransactionHashes(hashes_and_indexes), + BlocksStateQueryResponse::BlockHashesAndIndexOfTransactionHashes( + hashes_and_indexes, + ), )) => Ok(hashes_and_indexes), Message::StateQueryResponse(StateQueryResponse::Blocks( BlocksStateQueryResponse::Error(e), diff --git a/modules/rest_blockfrost/src/types.rs b/modules/rest_blockfrost/src/types.rs index 9b8d8fde1..6d4b70b91 100644 --- a/modules/rest_blockfrost/src/types.rs +++ b/modules/rest_blockfrost/src/types.rs @@ -9,8 +9,8 @@ use acropolis_common::{ rest_helper::ToCheckedF64, serialization::{Bech32WithHrp, DisplayFromBech32, PoolPrefix}, AssetAddressEntry, AssetMetadataStandard, AssetMintRecord, Datum, KeyHash, PolicyAsset, - PoolEpochState, PoolId, PoolUpdateAction, ReferenceScript, Relay, TxHash, UTXOValue, - ValueMap, Vote, VrfKeyHash, + PoolEpochState, PoolId, PoolUpdateAction, ReferenceScript, Relay, TxHash, UTXOValue, ValueMap, + Vote, VrfKeyHash, }; use anyhow::Result; use blake2::{Blake2b512, Digest}; diff --git a/modules/utxo_state/src/state.rs b/modules/utxo_state/src/state.rs index 2817ae632..a8bebb7f3 100644 --- a/modules/utxo_state/src/state.rs +++ b/modules/utxo_state/src/state.rs @@ -239,10 +239,7 @@ impl State { self.volatile_created.add_utxo(&key); if self.volatile_utxos.insert(key, value).is_some() { - error!( - "Saw UTXO {} before", - output.utxo_identifier - ); + error!("Saw UTXO {} before", output.utxo_identifier); } } BlockStatus::Bootstrap | BlockStatus::Immutable => { @@ -399,7 +396,10 @@ struct AddressTxMap { mod tests { use super::*; use crate::InMemoryImmutableUTXOStore; - use acropolis_common::{Address, AssetName, BlockHash, ByronAddress, Datum, Era, NativeAsset, ReferenceScript, TxHash, TxUTxODeltas, Value}; + use acropolis_common::{ + Address, AssetName, BlockHash, ByronAddress, Datum, Era, NativeAsset, ReferenceScript, + TxHash, TxUTxODeltas, Value, + }; use config::Config; use tokio::sync::Mutex; diff --git a/modules/utxo_state/src/volatile_index.rs b/modules/utxo_state/src/volatile_index.rs index d8981ccd7..b853a28e0 100644 --- a/modules/utxo_state/src/volatile_index.rs +++ b/modules/utxo_state/src/volatile_index.rs @@ -97,8 +97,8 @@ impl VolatileIndex { // -- Tests -- #[cfg(test)] mod tests { - use acropolis_common::TxHash; use super::*; + use acropolis_common::TxHash; #[test] fn new_index_is_empty() {
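
For context on the UTxOIdentifier rework carried by patches 7 and 8: a UTxO is now
identified by the hash of the transaction that created it plus its output index,
replacing the earlier (block, tx, output) triple, and output_index is read as a plain
field rather than through an accessor. A minimal usage sketch follows, assuming only
the types and calls visible in the diffs above (the wrapping main function is
illustrative; the hex literal and expected encoding reuse the values from the new
common/src/types.rs test):

    use acropolis_common::{TxHash, UTxOIdentifier};

    fn main() {
        // Build the identifier from the creating transaction's hash and the output index.
        let tx_hash = TxHash::try_from(
            hex::decode("000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f")
                .unwrap(),
        )
        .unwrap();
        let utxo = UTxOIdentifier::new(tx_hash, 42);

        // output_index is accessed directly as a field (see the volatile_index tests).
        assert_eq!(utxo.output_index, 42);

        // to_bytes() concatenates the 32-byte tx hash with the two-byte index,
        // yielding "...002a" for index 42, as checked by the new types.rs test.
        assert_eq!(
            hex::encode(utxo.to_bytes()),
            "000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f002a"
        );
    }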