diff --git a/.gitignore b/.gitignore index f879b368f..9873e1e8a 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ restore.sh startup.sh configuration/ docker-compose.yaml +.DS_Store # Nix result @@ -13,3 +14,6 @@ result-* .direnv/ .envrc.local tests/fixtures/134092758.670ca68c3de580f8469677754a725e86ca72a7be381d3108569f0704a5fca327.cbor + +downloads/ +fjall-*/ diff --git a/Cargo.lock b/Cargo.lock index 03198dc67..dee8c9cea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -196,6 +196,22 @@ dependencies = [ "tracing", ] +[[package]] +name = "acropolis_module_custom_indexer" +version = "0.1.0" +dependencies = [ + "acropolis_common", + "anyhow", + "bincode 1.3.3", + "caryatid_sdk", + "config", + "fjall", + "pallas 0.33.0", + "serde", + "tokio", + "tracing", +] + [[package]] name = "acropolis_module_drdd_state" version = "0.1.0" @@ -306,18 +322,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "acropolis_module_indexer" -version = "0.1.0" -dependencies = [ - "acropolis_common", - "anyhow", - "caryatid_sdk", - "config", - "serde", - "tracing", -] - [[package]] name = "acropolis_module_mithril_snapshot_fetcher" version = "0.1.0" @@ -518,16 +522,22 @@ dependencies = [ name = "acropolis_process_indexer" version = "0.1.0" dependencies = [ + "acropolis_codec", "acropolis_common", "acropolis_module_block_unpacker", + "acropolis_module_custom_indexer", "acropolis_module_genesis_bootstrapper", - "acropolis_module_indexer", "acropolis_module_peer_network_interface", "anyhow", + "bincode 1.3.3", "caryatid_process", + "caryatid_sdk", "clap 4.5.51", "config", + "fjall", + "pallas 0.33.0", "tokio", + "tracing", "tracing-subscriber", ] @@ -1219,6 +1229,15 @@ dependencies = [ "paste", ] +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bincode" version = "2.0.1" @@ -3796,7 +3815,7 @@ dependencies = [ "anyhow", "async-trait", "bech32 0.11.0", - "bincode", + "bincode 2.0.1", "blake2 0.10.6", "chrono", "ciborium", diff --git a/common/src/queries/blocks.rs b/common/src/queries/blocks.rs index 52b310db9..fd7aad93c 100644 --- a/common/src/queries/blocks.rs +++ b/common/src/queries/blocks.rs @@ -2,8 +2,7 @@ use crate::queries::errors::QueryError; use crate::{ queries::misc::Order, serialization::{Bech32Conversion, Bech32WithHrp}, - Address, BlockHash, GenesisDelegate, HeavyDelegate, KeyHash, TxHash, TxIdentifier, - VrfKeyHash, + Address, BlockHash, GenesisDelegate, HeavyDelegate, KeyHash, TxHash, TxIdentifier, VrfKeyHash, }; use cryptoxide::hashing::blake2b::Blake2b; use serde::ser::{Serialize, SerializeStruct, Serializer}; diff --git a/common/src/types.rs b/common/src/types.rs index 060333f91..4973e4a26 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -2410,12 +2410,18 @@ mod tests { #[test] fn test_utxo_identifier_to_bytes() -> Result<()> { - let tx_hash = TxHash::try_from(hex::decode("000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f").unwrap()).unwrap(); + let tx_hash = TxHash::try_from( + hex::decode("000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f") + .unwrap(), + ) + .unwrap(); let output_index = 42; let utxo = UTxOIdentifier::new(tx_hash, output_index); let bytes = utxo.to_bytes(); - assert_eq!(hex::encode(bytes), - "000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f002a"); + assert_eq!( + hex::encode(bytes), + "000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f002a" + ); Ok(()) } diff --git a/modules/address_state/.gitignore b/modules/address_state/.gitignore index 6aed6c1a3..839abbcc9 100644 --- a/modules/address_state/.gitignore +++ b/modules/address_state/.gitignore @@ -1,2 +1,3 @@ # fjall immutable db fjall-*/ +db/ diff --git a/modules/address_state/src/state.rs b/modules/address_state/src/state.rs index 350f4e133..31d4330f0 100644 --- a/modules/address_state/src/state.rs +++ b/modules/address_state/src/state.rs @@ -233,7 +233,7 @@ impl State { #[cfg(test)] mod tests { use super::*; - use acropolis_common::{Address, AddressDelta, UTxOIdentifier, Value}; + use acropolis_common::{Address, AddressDelta, TxHash, UTxOIdentifier, Value}; use tempfile::tempdir; fn dummy_address() -> Address { @@ -284,7 +284,7 @@ mod tests { let mut state = setup_state_and_store().await?; let addr = dummy_address(); - let utxo = UTxOIdentifier::new(0, 0, 0); + let utxo = UTxOIdentifier::new(TxHash::default(), 0); let tx_id = TxIdentifier::new(0, 0); let deltas = vec![delta(&addr, tx_id, vec![], vec![utxo], 0, 1)]; @@ -295,7 +295,10 @@ mod tests { let utxos = state.get_address_utxos(&addr).await?; assert!(utxos.is_some()); assert_eq!(utxos.as_ref().unwrap().len(), 1); - assert_eq!(utxos.as_ref().unwrap()[0], UTxOIdentifier::new(0, 0, 0)); + assert_eq!( + utxos.as_ref().unwrap()[0], + UTxOIdentifier::new(TxHash::default(), 0) + ); // Drain volatile to immutable state.volatile.epoch_start_block = 1; @@ -305,7 +308,10 @@ mod tests { let utxos = state.get_address_utxos(&addr).await?; assert!(utxos.is_some()); assert_eq!(utxos.as_ref().unwrap().len(), 1); - assert_eq!(utxos.as_ref().unwrap()[0], UTxOIdentifier::new(0, 0, 0)); + assert_eq!( + utxos.as_ref().unwrap()[0], + UTxOIdentifier::new(TxHash::default(), 0) + ); // Perisist immutable to disk state.immutable.persist_epoch(0, &state.config).await?; @@ -314,7 +320,10 @@ mod tests { let utxos = state.get_address_utxos(&addr).await?; assert!(utxos.is_some()); assert_eq!(utxos.as_ref().unwrap().len(), 1); - assert_eq!(utxos.as_ref().unwrap()[0], UTxOIdentifier::new(0, 0, 0)); + assert_eq!( + utxos.as_ref().unwrap()[0], + UTxOIdentifier::new(TxHash::default(), 0) + ); Ok(()) } @@ -326,7 +335,7 @@ mod tests { let mut state = setup_state_and_store().await?; let addr = dummy_address(); - let utxo = UTxOIdentifier::new(0, 0, 0); + let utxo = UTxOIdentifier::new(TxHash::default(), 0); let tx_id_create = TxIdentifier::new(0, 0); let tx_id_spend = TxIdentifier::new(1, 0); @@ -377,8 +386,8 @@ mod tests { let mut state = setup_state_and_store().await?; let addr = dummy_address(); - let utxo_old = UTxOIdentifier::new(0, 0, 0); - let utxo_new = UTxOIdentifier::new(0, 1, 0); + let utxo_old = UTxOIdentifier::new(TxHash::default(), 0); + let utxo_new = UTxOIdentifier::new(TxHash::default(), 1); let tx_id_create_old = TxIdentifier::new(0, 0); let tx_id_spend_old_create_new = TxIdentifier::new(1, 0); diff --git a/modules/assets_state/src/state.rs b/modules/assets_state/src/state.rs index 303fd4dd7..b7fb2a7d2 100644 --- a/modules/assets_state/src/state.rs +++ b/modules/assets_state/src/state.rs @@ -690,8 +690,8 @@ mod tests { }; use acropolis_common::{ Address, AddressDelta, AssetInfoRecord, AssetMetadata, AssetMetadataStandard, AssetName, - Datum, NativeAsset, NativeAssetDelta, PolicyId, ShelleyAddress, TxIdentifier, TxOutput, - TxUTxODeltas, UTxOIdentifier, Value, + Datum, NativeAsset, NativeAssetDelta, PolicyId, ShelleyAddress, TxHash, TxIdentifier, + TxOutput, TxUTxODeltas, UTxOIdentifier, Value, }; use serde_cbor::Value as CborValue; @@ -845,7 +845,7 @@ mod tests { fn make_output(policy_id: PolicyId, asset_name: AssetName, datum: Option>) -> TxOutput { TxOutput { - utxo_identifier: UTxOIdentifier::new(0, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), address: dummy_address(), value: Value { lovelace: 0, @@ -1263,7 +1263,7 @@ mod tests { StoreTransactions::None, ); - let input = UTxOIdentifier::new(0, 0, 0); + let input = UTxOIdentifier::new(TxHash::default(), 0); let output = make_output(policy_id, name, None); let tx_deltas = TxUTxODeltas { @@ -1428,7 +1428,7 @@ mod tests { tx_identifier, inputs: Vec::new(), outputs: vec![TxOutput { - utxo_identifier: UTxOIdentifier::new(0, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), ..output.clone() }], }; @@ -1436,7 +1436,7 @@ mod tests { tx_identifier, inputs: Vec::new(), outputs: vec![TxOutput { - utxo_identifier: UTxOIdentifier::new(0, 0, 1), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 1), ..output }], }; @@ -1470,7 +1470,7 @@ mod tests { tx_identifier: TxIdentifier::new(9, 0), inputs: Vec::new(), outputs: vec![TxOutput { - utxo_identifier: UTxOIdentifier::new(9, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), ..out1 }], }; @@ -1478,7 +1478,7 @@ mod tests { tx_identifier: TxIdentifier::new(10, 0), inputs: Vec::new(), outputs: vec![TxOutput { - utxo_identifier: UTxOIdentifier::new(10, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), ..out2 }], }; @@ -1513,7 +1513,7 @@ mod tests { tx_identifier: TxIdentifier::new(9, 0), inputs: Vec::new(), outputs: vec![TxOutput { - utxo_identifier: UTxOIdentifier::new(9, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), ..base_output.clone() }], }; @@ -1521,7 +1521,7 @@ mod tests { tx_identifier: TxIdentifier::new(8, 0), inputs: Vec::new(), outputs: vec![TxOutput { - utxo_identifier: UTxOIdentifier::new(8, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), ..base_output.clone() }], }; @@ -1529,7 +1529,7 @@ mod tests { tx_identifier: TxIdentifier::new(7, 0), inputs: Vec::new(), outputs: vec![TxOutput { - utxo_identifier: UTxOIdentifier::new(7, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), ..base_output }], }; diff --git a/modules/chain_store/src/chain_store.rs b/modules/chain_store/src/chain_store.rs index ba805a7aa..9a3a36f87 100644 --- a/modules/chain_store/src/chain_store.rs +++ b/modules/chain_store/src/chain_store.rs @@ -434,9 +434,11 @@ impl ChainStore { // TODO! Look up TxHash to get block hash, and index of Tx in block } - Ok(BlocksStateQueryResponse::BlockHashesAndIndexOfTransactionHashes( - block_hashes_and_indexes, - )) + Ok( + BlocksStateQueryResponse::BlockHashesAndIndexOfTransactionHashes( + block_hashes_and_indexes, + ), + ) } BlocksStateQuery::GetTransactionHashesAndTimestamps { tx_ids } => { let mut tx_hashes = Vec::with_capacity(tx_ids.len()); diff --git a/modules/indexer/Cargo.toml b/modules/custom_indexer/Cargo.toml similarity index 72% rename from modules/indexer/Cargo.toml rename to modules/custom_indexer/Cargo.toml index aef2fcd75..29bcdc4a7 100644 --- a/modules/indexer/Cargo.toml +++ b/modules/custom_indexer/Cargo.toml @@ -1,7 +1,7 @@ # Acropolis indexer module [package] -name = "acropolis_module_indexer" +name = "acropolis_module_custom_indexer" version = "0.1.0" edition = "2021" authors = ["William Hankins "] @@ -14,9 +14,13 @@ acropolis_common = { path = "../../common" } caryatid_sdk = { workspace = true } anyhow = { workspace = true } +bincode = "1" config = { workspace = true } +fjall = "2.7.0" +pallas = { workspace = true} serde = { workspace = true, features = ["rc"] } +tokio = { workspace = true } tracing = { workspace = true } [lib] -path = "src/indexer.rs" +path = "src/custom_indexer.rs" diff --git a/modules/custom_indexer/config.default.toml b/modules/custom_indexer/config.default.toml new file mode 100644 index 000000000..a30c059a7 --- /dev/null +++ b/modules/custom_indexer/config.default.toml @@ -0,0 +1,4 @@ +# The topic to publish sync commands on +sync-command-publisher-topic = "cardano.sync.command" +# The topic to receive txs on +txs-subscribe-topic = "cardano.txs" \ No newline at end of file diff --git a/modules/custom_indexer/src/chain_index.rs b/modules/custom_indexer/src/chain_index.rs new file mode 100644 index 000000000..f77bd697a --- /dev/null +++ b/modules/custom_indexer/src/chain_index.rs @@ -0,0 +1,19 @@ +use acropolis_common::{BlockInfo, Point}; +use anyhow::Result; +use caryatid_sdk::async_trait; +use pallas::ledger::traverse::MultiEraTx; + +#[async_trait] +pub trait ChainIndex: Send + Sync + 'static { + fn name(&self) -> String; + + async fn handle_onchain_tx(&mut self, info: &BlockInfo, tx: &MultiEraTx<'_>) -> Result<()> { + let _ = (info, tx); + Ok(()) + } + + async fn handle_rollback(&mut self, point: &Point) -> Result<()> { + let _ = point; + Ok(()) + } +} diff --git a/modules/indexer/src/configuration.rs b/modules/custom_indexer/src/configuration.rs similarity index 77% rename from modules/indexer/src/configuration.rs rename to modules/custom_indexer/src/configuration.rs index 6b2b57310..023c5b920 100644 --- a/modules/indexer/src/configuration.rs +++ b/modules/custom_indexer/src/configuration.rs @@ -3,11 +3,12 @@ use config::Config; #[derive(serde::Deserialize)] #[serde(rename_all = "kebab-case")] -pub struct IndexerConfig { - pub sync_command_topic: String, +pub struct CustomIndexerConfig { + pub sync_command_publisher_topic: String, + pub txs_subscribe_topic: String, } -impl IndexerConfig { +impl CustomIndexerConfig { pub fn try_load(config: &Config) -> Result { let full_config = Config::builder() .add_source(config::File::from_str( diff --git a/modules/custom_indexer/src/cursor_store.rs b/modules/custom_indexer/src/cursor_store.rs new file mode 100644 index 000000000..1f0b7b932 --- /dev/null +++ b/modules/custom_indexer/src/cursor_store.rs @@ -0,0 +1,79 @@ +use std::{future::Future, path::Path, sync::Mutex}; + +use acropolis_common::Point; +use anyhow::Result; +use fjall::{Config, Keyspace, Partition, PartitionCreateOptions}; + +pub trait CursorStore: Send + Sync + 'static { + fn load(&self) -> impl Future>> + Send; + fn save(&self, point: &Point) -> impl Future> + Send; +} + +// In memory cursor store (Not persisted across runs) +pub struct InMemoryCursorStore { + cursor: Mutex>, +} +impl InMemoryCursorStore { + pub fn new(point: Point) -> Self { + Self { + cursor: Mutex::new(Some(point)), + } + } +} +impl CursorStore for InMemoryCursorStore { + async fn load(&self) -> Result> { + let guard = self.cursor.lock().map_err(|_| anyhow::anyhow!("cursor mutex poisoned"))?; + Ok(guard.as_ref().cloned()) + } + + async fn save(&self, point: &Point) -> Result<()> { + let mut guard = self.cursor.lock().map_err(|_| anyhow::anyhow!("cursor mutex poisoned"))?; + *guard = Some(point.clone()); + Ok(()) + } +} + +// Fjall backed cursor store (Retains last stored point) +pub struct FjallCursorStore { + cursor: Partition, +} + +impl FjallCursorStore { + pub fn new(path: impl AsRef, point: Point) -> Result { + let cfg = Config::new(path); + let keyspace = Keyspace::open(cfg)?; + let partition = keyspace.open_partition("cursor", PartitionCreateOptions::default())?; + + // Use stored point if exists or initialize with provided point + match partition.get("cursor")? { + Some(_) => Ok(Self { cursor: partition }), + None => { + let raw = bincode::serialize(&point)?; + partition.insert("cursor", raw)?; + Ok(Self { cursor: partition }) + } + } + } +} + +impl CursorStore for FjallCursorStore { + async fn load(&self) -> Result> { + let raw = self.cursor.get("cursor")?; + + let Some(bytes) = raw else { + return Ok(None); + }; + + let point: Point = bincode::deserialize(&bytes)?; + + Ok(Some(point)) + } + + async fn save(&self, point: &Point) -> Result<()> { + let raw = bincode::serialize(point)?; + + self.cursor.insert("cursor", raw)?; + + Ok(()) + } +} diff --git a/modules/custom_indexer/src/custom_indexer.rs b/modules/custom_indexer/src/custom_indexer.rs new file mode 100644 index 000000000..172b043c3 --- /dev/null +++ b/modules/custom_indexer/src/custom_indexer.rs @@ -0,0 +1,163 @@ +//! Acropolis custom indexer module for Caryatid +//! +//! This module lets downstream applications register `ChainIndex` implementations +//! that react to on-chain transactions. The indexer handles cursor persistence, +//! initial sync, and dispatching decoded transactions to user provided indices. + +pub mod chain_index; +mod configuration; +pub mod cursor_store; + +use std::sync::Arc; +use tokio::sync::Mutex; + +use anyhow::Result; +use config::Config; +use tracing::{error, info, warn}; + +use caryatid_sdk::{async_trait, Context, Module}; +use pallas::ledger::traverse::MultiEraTx; + +use acropolis_common::{ + commands::chain_sync::ChainSyncCommand, + messages::{CardanoMessage, Command, Message, StateTransitionMessage}, + Point, +}; + +use crate::{ + chain_index::ChainIndex, configuration::CustomIndexerConfig, cursor_store::CursorStore, +}; + +pub struct CustomIndexer { + index: Arc>, + cursor_store: Arc>, + tip: Arc>, +} + +impl CustomIndexer { + pub fn new(index: I, cursor_store: CS, start: Point) -> Self { + Self { + index: Arc::new(Mutex::new(index)), + cursor_store: Arc::new(Mutex::new(cursor_store)), + tip: Arc::new(Mutex::new(start)), + } + } +} + +#[async_trait] +impl Module for CustomIndexer +where + I: ChainIndex, + CS: CursorStore, +{ + fn get_name(&self) -> &'static str { + "custom-indexer" + } + + fn get_description(&self) -> &'static str { + "Single external chain indexer module" + } + + async fn init(&self, context: Arc>, config: Arc) -> Result<()> { + let cfg = CustomIndexerConfig::try_load(&config)?; + let mut subscription = context.subscribe(&cfg.txs_subscribe_topic).await?; + let run_context = context.clone(); + + // Retrieve tip from cursor store with fallback to initial sync point + let start_point = { + let saved = self.cursor_store.lock().await.load().await?; + let mut tip_guard = self.tip.lock().await; + let start_point = saved.unwrap_or_else(|| tip_guard.clone()); + *tip_guard = start_point.clone(); + start_point + }; + + let index = Arc::clone(&self.index); + let cursor_store = Arc::clone(&self.cursor_store); + let tip = Arc::clone(&self.tip); + + context.run(async move { + // Publish initial sync point + let msg = Message::Command(Command::ChainSync(ChainSyncCommand::FindIntersect( + start_point, + ))); + run_context.publish(&cfg.sync_command_publisher_topic, Arc::new(msg)).await?; + info!( + "Publishing initial sync command on {}", + cfg.sync_command_publisher_topic + ); + + // Forward received txs and rollback notifications to index handlers + loop { + match subscription.read().await { + Ok((_, message)) => { + match message.as_ref() { + Message::Cardano((block, CardanoMessage::ReceivedTxs(txs_msg))) => { + // Call handle_onchain_tx on the index for all decoded txs + let mut idx = index.lock().await; + for (tx_index, raw_tx) in txs_msg.txs.iter().enumerate() { + match MultiEraTx::decode(raw_tx) { + Ok(tx) => { + if let Err(e) = idx.handle_onchain_tx(block, &tx).await + { + warn!( + "Failed to index tx {} in block {}: {e:#}", + tx_index, block.number + ); + } + } + Err(_) => { + warn!( + "Failed to decode tx {} in block {}", + tx_index, block.number + ); + } + } + } + + // Update and save tip + let new_tip = Point::Specific { + hash: block.hash, + slot: block.slot, + }; + *tip.lock().await = new_tip.clone(); + cursor_store.lock().await.save(&new_tip).await?; + } + + Message::Cardano(( + _, + CardanoMessage::StateTransition(StateTransitionMessage::Rollback( + point, + )), + )) => { + // Call handle rollback on index + { + let mut idx = index.lock().await; + if let Err(e) = idx.handle_rollback(point).await { + error!("Failed to handle rollback at {:?}: {e:#}", point); + return Err(e); + } + } + + // Rollback tip and save + { + *tip.lock().await = point.clone(); + } + cursor_store.lock().await.save(point).await?; + } + _ => (), + } + } + Err(e) => { + error!("Subscription closed: {e:#}"); + break; + } + } + } + + Ok::<_, anyhow::Error>(()) + }); + + Ok(()) + } +} diff --git a/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs b/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs index 47a57a555..2c4ec8520 100644 --- a/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs +++ b/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs @@ -9,8 +9,8 @@ use acropolis_common::{ UTXODeltasMessage, }, Address, BlockHash, BlockInfo, BlockStatus, ByronAddress, Era, GenesisDelegates, Lovelace, - LovelaceDelta, Pot, PotDelta, TxHash, TxIdentifier, TxOutput, TxUTxODeltas, - UTxOIdentifier, Value, + LovelaceDelta, Pot, PotDelta, TxHash, TxIdentifier, TxOutput, TxUTxODeltas, UTxOIdentifier, + Value, }; use anyhow::Result; use blake2::{digest::consts::U32, Blake2b, Digest}; diff --git a/modules/historical_accounts_state/.gitignore b/modules/historical_accounts_state/.gitignore index 6aed6c1a3..839abbcc9 100644 --- a/modules/historical_accounts_state/.gitignore +++ b/modules/historical_accounts_state/.gitignore @@ -1,2 +1,3 @@ # fjall immutable db fjall-*/ +db/ diff --git a/modules/indexer/config.default.toml b/modules/indexer/config.default.toml deleted file mode 100644 index e8549e768..000000000 --- a/modules/indexer/config.default.toml +++ /dev/null @@ -1,2 +0,0 @@ -# The topic to publish sync commands on -sync-command-topic = "cardano.sync.command" \ No newline at end of file diff --git a/modules/indexer/src/indexer.rs b/modules/indexer/src/indexer.rs deleted file mode 100644 index 7199c47fd..000000000 --- a/modules/indexer/src/indexer.rs +++ /dev/null @@ -1,68 +0,0 @@ -//! Acropolis indexer module for Caryatid -mod configuration; - -use acropolis_common::{ - commands::chain_sync::ChainSyncCommand, - hash::Hash, - messages::{Command, Message}, - Point, -}; -use anyhow::Result; -use caryatid_sdk::{module, Context}; -use config::Config; -use std::{str::FromStr, sync::Arc}; -use tracing::info; - -use crate::configuration::IndexerConfig; - -/// Indexer module -#[module( - message_type(Message), - name = "indexer", - description = "Core indexer module for indexer process" -)] -pub struct Indexer; - -impl Indexer { - /// Async initialisation - pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { - let cfg = IndexerConfig::try_load(&config)?; - info!( - "Creating sync command publisher on '{}'", - cfg.sync_command_topic - ); - - let ctx = context.clone(); - - // This is a placeholder to test dynamic sync - context.run(async move { - let example = ChainSyncCommand::FindIntersect(Point::Specific { - slot: 4492799, - hash: Hash::from_str( - "f8084c61b6a238acec985b59310b6ecec49c0ab8352249afd7268da5cff2a457", - ) - .expect("Valid hash"), - }); - - // Initial sync message (This will be read from config for first sync and from DB on subsequent runs) - ctx.message_bus - .publish( - &cfg.sync_command_topic, - Arc::new(Message::Command(Command::ChainSync(example.clone()))), - ) - .await - .unwrap(); - - // Simulate a later sync command to reset sync point to where we started - - ctx.message_bus - .publish( - &cfg.sync_command_topic, - Arc::new(Message::Command(Command::ChainSync(example))), - ) - .await - .unwrap(); - }); - Ok(()) - } -} diff --git a/modules/peer_network_interface/src/network.rs b/modules/peer_network_interface/src/network.rs index 535eabf4b..f01748422 100644 --- a/modules/peer_network_interface/src/network.rs +++ b/modules/peer_network_interface/src/network.rs @@ -104,6 +104,12 @@ impl NetworkManager { self.block_sink.last_epoch = Some(epoch); } + if let Some((&peer_id, _)) = self.peers.iter().next() { + self.set_preferred_upstream(peer_id); + } else { + warn!("Sync requested but no upstream peers available"); + } + self.sync_to_point(point); } } diff --git a/modules/rest_blockfrost/src/handlers/accounts.rs b/modules/rest_blockfrost/src/handlers/accounts.rs index 819658af5..518baf99f 100644 --- a/modules/rest_blockfrost/src/handlers/accounts.rs +++ b/modules/rest_blockfrost/src/handlers/accounts.rs @@ -783,7 +783,9 @@ pub async fn handle_account_utxos_blockfrost( msg, |message| match message { Message::StateQueryResponse(StateQueryResponse::Blocks( - BlocksStateQueryResponse::BlockHashesAndIndexOfTransactionHashes(hashes_and_indexes), + BlocksStateQueryResponse::BlockHashesAndIndexOfTransactionHashes( + hashes_and_indexes, + ), )) => Ok(hashes_and_indexes), Message::StateQueryResponse(StateQueryResponse::Blocks( BlocksStateQueryResponse::Error(e), diff --git a/modules/rest_blockfrost/src/handlers/addresses.rs b/modules/rest_blockfrost/src/handlers/addresses.rs index 20a020782..4c520df07 100644 --- a/modules/rest_blockfrost/src/handlers/addresses.rs +++ b/modules/rest_blockfrost/src/handlers/addresses.rs @@ -328,7 +328,9 @@ pub async fn handle_address_utxos_blockfrost( msg, |message| match message { Message::StateQueryResponse(StateQueryResponse::Blocks( - BlocksStateQueryResponse::BlockHashesAndIndexOfTransactionHashes(hashes_and_indexes), + BlocksStateQueryResponse::BlockHashesAndIndexOfTransactionHashes( + hashes_and_indexes, + ), )) => Ok(hashes_and_indexes), Message::StateQueryResponse(StateQueryResponse::Blocks( BlocksStateQueryResponse::Error(e), @@ -468,7 +470,9 @@ pub async fn handle_address_asset_utxos_blockfrost( msg, |message| match message { Message::StateQueryResponse(StateQueryResponse::Blocks( - BlocksStateQueryResponse::BlockHashesAndIndexOfTransactionHashes(hashes_and_indexes), + BlocksStateQueryResponse::BlockHashesAndIndexOfTransactionHashes( + hashes_and_indexes, + ), )) => Ok(hashes_and_indexes), Message::StateQueryResponse(StateQueryResponse::Blocks( BlocksStateQueryResponse::Error(e), diff --git a/modules/rest_blockfrost/src/types.rs b/modules/rest_blockfrost/src/types.rs index 9b8d8fde1..6d4b70b91 100644 --- a/modules/rest_blockfrost/src/types.rs +++ b/modules/rest_blockfrost/src/types.rs @@ -9,8 +9,8 @@ use acropolis_common::{ rest_helper::ToCheckedF64, serialization::{Bech32WithHrp, DisplayFromBech32, PoolPrefix}, AssetAddressEntry, AssetMetadataStandard, AssetMintRecord, Datum, KeyHash, PolicyAsset, - PoolEpochState, PoolId, PoolUpdateAction, ReferenceScript, Relay, TxHash, UTXOValue, - ValueMap, Vote, VrfKeyHash, + PoolEpochState, PoolId, PoolUpdateAction, ReferenceScript, Relay, TxHash, UTXOValue, ValueMap, + Vote, VrfKeyHash, }; use anyhow::Result; use blake2::{Blake2b512, Digest}; diff --git a/modules/snapshot_bootstrapper/.gitignore b/modules/snapshot_bootstrapper/.gitignore new file mode 100644 index 000000000..65cd0d397 --- /dev/null +++ b/modules/snapshot_bootstrapper/.gitignore @@ -0,0 +1,2 @@ +# Ignore any downloaded NewEpochSnapshot files +data/mainnet/*.cbor diff --git a/modules/utxo_state/src/state.rs b/modules/utxo_state/src/state.rs index 6343281b3..a8bebb7f3 100644 --- a/modules/utxo_state/src/state.rs +++ b/modules/utxo_state/src/state.rs @@ -239,10 +239,7 @@ impl State { self.volatile_created.add_utxo(&key); if self.volatile_utxos.insert(key, value).is_some() { - error!( - "Saw UTXO {} before", - output.utxo_identifier - ); + error!("Saw UTXO {} before", output.utxo_identifier); } } BlockStatus::Bootstrap | BlockStatus::Immutable => { @@ -401,7 +398,7 @@ mod tests { use crate::InMemoryImmutableUTXOStore; use acropolis_common::{ Address, AssetName, BlockHash, ByronAddress, Datum, Era, NativeAsset, ReferenceScript, - TxUTxODeltas, Value, + TxHash, TxUTxODeltas, Value, }; use config::Config; use tokio::sync::Mutex; @@ -449,7 +446,7 @@ mod tests { let reference_script_bytes = vec![0xde, 0xad, 0xbe, 0xef]; let output = TxOutput { - utxo_identifier: UTxOIdentifier::new(0, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), address: create_address(99), value: Value::new( 42, @@ -523,7 +520,7 @@ mod tests { async fn observe_input_spends_utxo() { let mut state = new_state(); let output = TxOutput { - utxo_identifier: UTxOIdentifier::new(0, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), address: create_address(99), value: Value::new( 42, @@ -562,7 +559,7 @@ mod tests { async fn rollback_removes_future_created_utxos() { let mut state = new_state(); let output = TxOutput { - utxo_identifier: UTxOIdentifier::new(0, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), address: create_address(99), value: Value::new( 42, @@ -603,7 +600,7 @@ mod tests { // Create the UTXO in block 10 let output = TxOutput { - utxo_identifier: UTxOIdentifier::new(0, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), address: create_address(99), value: Value::new( 42, @@ -653,7 +650,7 @@ mod tests { async fn prune_shifts_new_utxos_into_immutable() { let mut state = new_state(); let output = TxOutput { - utxo_identifier: UTxOIdentifier::new(0, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), address: create_address(99), value: Value::new( 42, @@ -701,7 +698,7 @@ mod tests { async fn prune_deletes_old_spent_utxos() { let mut state = new_state(); let output = TxOutput { - utxo_identifier: UTxOIdentifier::new(0, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), address: create_address(99), value: Value::new( 42, @@ -820,7 +817,7 @@ mod tests { state.register_address_delta_observer(observer.clone()); let output = TxOutput { - utxo_identifier: UTxOIdentifier::new(0, 0, 0), + utxo_identifier: UTxOIdentifier::new(TxHash::default(), 0), address: create_address(99), value: Value::new( 42, diff --git a/modules/utxo_state/src/volatile_index.rs b/modules/utxo_state/src/volatile_index.rs index 14a853a0e..b853a28e0 100644 --- a/modules/utxo_state/src/volatile_index.rs +++ b/modules/utxo_state/src/volatile_index.rs @@ -98,6 +98,7 @@ impl VolatileIndex { #[cfg(test)] mod tests { use super::*; + use acropolis_common::TxHash; #[test] fn new_index_is_empty() { @@ -140,47 +141,47 @@ mod tests { assert_eq!(Some(1), index.first_block); assert_eq!(2, index.blocks.len()); - let utxo = UTxOIdentifier::new(42, 42, 42); + let utxo = UTxOIdentifier::new(TxHash::default(), 422); index.add_utxo(&utxo); assert!(index.blocks[0].is_empty()); assert!(!index.blocks[1].is_empty()); - assert_eq!(42, index.blocks[1][0].output_index()); + assert_eq!(422, index.blocks[1][0].output_index); } #[test] fn prune_before_deletes_and_calls_back_with_utxos() { let mut index = VolatileIndex::new(); index.add_block(1); - index.add_utxo(&UTxOIdentifier::new(1, 1, 1)); - index.add_utxo(&UTxOIdentifier::new(2, 2, 2)); + index.add_utxo(&UTxOIdentifier::new(TxHash::default(), 1)); + index.add_utxo(&UTxOIdentifier::new(TxHash::default(), 2)); index.add_block(2); - index.add_utxo(&UTxOIdentifier::new(3, 3, 3)); + index.add_utxo(&UTxOIdentifier::new(TxHash::default(), 3)); let pruned = index.prune_before(2); assert_eq!(Some(2), index.first_block); assert_eq!(1, index.blocks.len()); assert_eq!(2, pruned.len()); - assert_eq!(1, pruned[0].output_index()); - assert_eq!(2, pruned[1].output_index()); + assert_eq!(1, pruned[0].output_index); + assert_eq!(2, pruned[1].output_index); } #[test] fn prune_on_or_after_deletes_and_calls_back_with_utxos() { let mut index = VolatileIndex::new(); index.add_block(1); - index.add_utxo(&UTxOIdentifier::new(1, 1, 1)); - index.add_utxo(&UTxOIdentifier::new(2, 2, 2)); + index.add_utxo(&UTxOIdentifier::new(TxHash::default(), 1)); + index.add_utxo(&UTxOIdentifier::new(TxHash::default(), 2)); index.add_block(2); - index.add_utxo(&UTxOIdentifier::new(3, 3, 3)); + index.add_utxo(&UTxOIdentifier::new(TxHash::default(), 3)); let pruned = index.prune_on_or_after(1); assert_eq!(Some(1), index.first_block); assert_eq!(0, index.blocks.len()); assert_eq!(3, pruned.len()); // Note reverse order of blocks - assert_eq!(3, pruned[0].output_index()); - assert_eq!(1, pruned[1].output_index()); - assert_eq!(2, pruned[2].output_index()); + assert_eq!(3, pruned[0].output_index); + assert_eq!(1, pruned[1].output_index); + assert_eq!(2, pruned[2].output_index); } } diff --git a/processes/indexer/.gitignore b/processes/indexer/.gitignore new file mode 100644 index 000000000..6e36ceea7 --- /dev/null +++ b/processes/indexer/.gitignore @@ -0,0 +1 @@ +fjall-*/ \ No newline at end of file diff --git a/processes/indexer/Cargo.toml b/processes/indexer/Cargo.toml index 3a368efbc..e979c2741 100644 --- a/processes/indexer/Cargo.toml +++ b/processes/indexer/Cargo.toml @@ -8,17 +8,23 @@ description = "Acropolis indexer process containing core modules" license = "Apache-2.0" [dependencies] +acropolis_codec = { path = "../../codec" } acropolis_common = { path = "../../common" } acropolis_module_genesis_bootstrapper = { path = "../../modules/genesis_bootstrapper" } acropolis_module_peer_network_interface = { path = "../../modules/peer_network_interface" } acropolis_module_block_unpacker = { path = "../../modules/block_unpacker" } -acropolis_module_indexer = { path = "../../modules/indexer" } +acropolis_module_custom_indexer = { path = "../../modules/custom_indexer" } caryatid_process = { workspace = true } +caryatid_sdk = { workspace = true } anyhow = { workspace = true } +bincode = "1" clap = { workspace = true } config = { workspace = true } +fjall = "2.7.0" +pallas = { workspace = true} +tracing = { workspace = true } tracing-subscriber = { version = "0.3.20", features = ["registry", "env-filter"] } tokio = { workspace = true } diff --git a/processes/indexer/indexer.toml b/processes/indexer/indexer.toml index 013c8dac1..0a4e358b1 100644 --- a/processes/indexer/indexer.toml +++ b/processes/indexer/indexer.toml @@ -1,8 +1,13 @@ # Top-level configuration for Acropolis indexer process +[global.startup] +method = "snapshot" # Options: "mithril" | "snapshot" +topic = "cardano.sequence.start" + [module.genesis-bootstrapper] [module.peer-network-interface] +block-topic = "cardano.block.proposed" # Skip validation for indexer sync-point = "dynamic" node-addresses = [ "backbone.cardano.iog.io:3001", @@ -13,7 +18,7 @@ magic-number = 764824073 [module.block-unpacker] -[module.indexer] +[module.custom-indexer] [startup] topic = "cardano.sequence.start" diff --git a/processes/indexer/src/indices/fjall_pool_cost_index.rs b/processes/indexer/src/indices/fjall_pool_cost_index.rs new file mode 100644 index 000000000..364c6208c --- /dev/null +++ b/processes/indexer/src/indices/fjall_pool_cost_index.rs @@ -0,0 +1,117 @@ +#![allow(unused)] +use acropolis_codec::map_parameters::to_pool_id; +use acropolis_common::{BlockInfo, Lovelace, PoolId}; +use acropolis_module_custom_indexer::chain_index::ChainIndex; +use anyhow::Result; +use caryatid_sdk::async_trait; +use fjall::{Config, Keyspace, Partition, PartitionCreateOptions}; +use pallas::ledger::primitives::{alonzo, conway}; +use pallas::ledger::traverse::{MultiEraCert, MultiEraTx}; +use std::collections::BTreeMap; +use std::path::Path; +use tokio::sync::watch; +use tracing::warn; + +#[derive(Clone)] +pub struct FjallPoolCostState { + pub pools: BTreeMap, +} + +pub struct FjallPoolCostIndex { + state: FjallPoolCostState, + sender: watch::Sender, + partition: Partition, +} + +impl FjallPoolCostIndex { + pub fn new(path: impl AsRef, sender: watch::Sender) -> Result { + // Open DB + let cfg = Config::new(path).max_write_buffer_size(512 * 1024 * 1024); + let keyspace = Keyspace::open(cfg)?; + let partition = keyspace.open_partition("pools", PartitionCreateOptions::default())?; + + // Read existing state into memory + let mut pools = BTreeMap::new(); + for item in partition.iter() { + let (key, val) = item?; + let pool_id = PoolId::try_from(key.as_ref())?; + let cost: Lovelace = bincode::deserialize(&val)?; + pools.insert(pool_id, cost); + } + + Ok(Self { + state: FjallPoolCostState { pools }, + sender, + partition, + }) + } +} + +#[async_trait] +impl ChainIndex for FjallPoolCostIndex { + fn name(&self) -> String { + "pool-cost-index".into() + } + + async fn handle_onchain_tx(&mut self, _info: &BlockInfo, tx: &MultiEraTx<'_>) -> Result<()> { + for cert in tx.certs().iter() { + match cert { + MultiEraCert::AlonzoCompatible(cert) => match cert.as_ref().as_ref() { + alonzo::Certificate::PoolRegistration { operator, cost, .. } => { + let pool_id = to_pool_id(operator); + let key = pool_id.as_ref(); + let value = bincode::serialize(cost)?; + + self.state.pools.insert(pool_id, *cost); + self.partition.insert(key, value)?; + + if self.sender.send(self.state.clone()).is_err() { + warn!("Pool cost state receiver dropped"); + } + } + alonzo::Certificate::PoolRetirement(operator, ..) => { + let pool_id = to_pool_id(operator); + let key = pool_id.as_ref(); + + self.state.pools.remove(&pool_id); + self.partition.remove(key)?; + + if self.sender.send(self.state.clone()).is_err() { + warn!("Pool cost state receiver dropped"); + } + } + + _ => {} + }, + MultiEraCert::Conway(cert) => match cert.as_ref().as_ref() { + conway::Certificate::PoolRegistration { operator, cost, .. } => { + let pool_id = to_pool_id(operator); + let key = pool_id.as_ref(); + let value = bincode::serialize(cost)?; + + self.state.pools.insert(pool_id, *cost); + self.partition.insert(key, value)?; + + if self.sender.send(self.state.clone()).is_err() { + warn!("Pool cost state receiver dropped"); + } + } + conway::Certificate::PoolRetirement(operator, ..) => { + let pool_id = to_pool_id(operator); + let key = pool_id.as_ref(); + + self.state.pools.remove(&pool_id); + self.partition.remove(key)?; + + if self.sender.send(self.state.clone()).is_err() { + warn!("Pool cost state receiver dropped"); + } + } + _ => {} + }, + _ => {} + } + } + Ok(()) + } +} diff --git a/processes/indexer/src/indices/in_memory_pool_cost_index.rs b/processes/indexer/src/indices/in_memory_pool_cost_index.rs new file mode 100644 index 000000000..5dcca4424 --- /dev/null +++ b/processes/indexer/src/indices/in_memory_pool_cost_index.rs @@ -0,0 +1,79 @@ +#![allow(unused)] +use acropolis_codec::map_parameters::to_pool_id; +use acropolis_common::{BlockInfo, Lovelace, PoolId}; +use acropolis_module_custom_indexer::chain_index::ChainIndex; +use anyhow::Result; +use caryatid_sdk::async_trait; +use pallas::ledger::primitives::{alonzo, conway}; +use pallas::ledger::traverse::{MultiEraCert, MultiEraTx}; +use std::collections::BTreeMap; +use tokio::sync::watch; +use tracing::warn; + +#[derive(Clone)] +pub struct InMemoryPoolCostState { + pub pools: BTreeMap, +} + +pub struct InMemoryPoolCostIndex { + state: InMemoryPoolCostState, + sender: watch::Sender, +} + +impl InMemoryPoolCostIndex { + pub fn new(sender: watch::Sender) -> Self { + Self { + state: InMemoryPoolCostState { + pools: BTreeMap::new(), + }, + sender, + } + } +} + +#[async_trait] +impl ChainIndex for InMemoryPoolCostIndex { + fn name(&self) -> String { + "pool-cost-index".into() + } + + async fn handle_onchain_tx(&mut self, _info: &BlockInfo, tx: &MultiEraTx<'_>) -> Result<()> { + for cert in tx.certs().iter() { + match cert { + MultiEraCert::AlonzoCompatible(cert) => match cert.as_ref().as_ref() { + alonzo::Certificate::PoolRegistration { operator, cost, .. } => { + self.state.pools.insert(to_pool_id(operator), *cost); + if self.sender.send(self.state.clone()).is_err() { + warn!("Pool cost state receiver dropped"); + } + } + alonzo::Certificate::PoolRetirement(operator, ..) => { + self.state.pools.remove(&to_pool_id(operator)); + if self.sender.send(self.state.clone()).is_err() { + warn!("Pool cost state receiver dropped"); + } + } + + _ => {} + }, + MultiEraCert::Conway(cert) => match cert.as_ref().as_ref() { + conway::Certificate::PoolRegistration { operator, cost, .. } => { + self.state.pools.insert(to_pool_id(operator), *cost); + if self.sender.send(self.state.clone()).is_err() { + warn!("Pool cost state receiver dropped"); + } + } + conway::Certificate::PoolRetirement(operator, ..) => { + self.state.pools.remove(&to_pool_id(operator)); + if self.sender.send(self.state.clone()).is_err() { + warn!("Pool cost state receiver dropped"); + } + } + _ => {} + }, + _ => {} + } + } + Ok(()) + } +} diff --git a/processes/indexer/src/indices/mod.rs b/processes/indexer/src/indices/mod.rs new file mode 100644 index 000000000..72ffb5978 --- /dev/null +++ b/processes/indexer/src/indices/mod.rs @@ -0,0 +1,2 @@ +pub mod fjall_pool_cost_index; +pub mod in_memory_pool_cost_index; diff --git a/processes/indexer/src/main.rs b/processes/indexer/src/main.rs index 50a3b458b..bd9d2c044 100644 --- a/processes/indexer/src/main.rs +++ b/processes/indexer/src/main.rs @@ -1,15 +1,26 @@ -use acropolis_common::messages::Message; -use acropolis_module_indexer::Indexer; +use acropolis_common::{hash::Hash, messages::Message, Point}; use anyhow::Result; use caryatid_process::Process; +use caryatid_sdk::module_registry::ModuleRegistry; use clap::Parser; use config::{Config, Environment, File}; -use std::sync::Arc; +use std::{collections::BTreeMap, str::FromStr, sync::Arc}; +use tokio::sync::watch; use acropolis_module_block_unpacker::BlockUnpacker; +use acropolis_module_custom_indexer::CustomIndexer; use acropolis_module_genesis_bootstrapper::GenesisBootstrapper; use acropolis_module_peer_network_interface::PeerNetworkInterface; +mod indices; + +#[allow(unused_imports)] +use crate::indices::fjall_pool_cost_index::{FjallPoolCostIndex, FjallPoolCostState}; +#[allow(unused_imports)] +use crate::indices::in_memory_pool_cost_index::{InMemoryPoolCostIndex, InMemoryPoolCostState}; +#[allow(unused_imports)] +use acropolis_module_custom_indexer::cursor_store::{FjallCursorStore, InMemoryCursorStore}; + #[derive(Debug, clap::Parser)] struct Args { #[arg(long, value_name = "PATH", default_value = "indexer.toml")] @@ -18,10 +29,9 @@ struct Args { #[tokio::main] async fn main() -> Result<()> { + // Get arguments and config let args = Args::parse(); - tracing_subscriber::fmt().with_env_filter("info").init(); - let config = Arc::new( Config::builder() .add_source(File::with_name(&args.config)) @@ -32,11 +42,57 @@ async fn main() -> Result<()> { let mut process = Process::::create(config).await; + // Core modules to fetch blocks and publish decoded transactions GenesisBootstrapper::register(&mut process); BlockUnpacker::register(&mut process); PeerNetworkInterface::register(&mut process); - Indexer::register(&mut process); + // watch channel to send latest state to consumer on index change + let (sender, receiver) = watch::channel(FjallPoolCostState { + pools: BTreeMap::new(), + }); + + /* Uncomment to test in memory indexer + let (sender, receiver) = watch::channel(InMemoryPoolCostState { + pools: BTreeMap::new(), + }); + */ + + // Example receiver + { + let mut rx = receiver.clone(); + tokio::spawn(async move { + while rx.changed().await.is_ok() { + let snapshot = rx.borrow().clone(); + tracing::info!("New PoolCostIndex state: {:?}", snapshot.pools); + } + }); + } + + // Initialize and register indexer + let shelley_start = Point::Specific { + hash: Hash::from_str("4e9bbbb67e3ae262133d94c3da5bffce7b1127fc436e7433b87668dba34c354a")?, + slot: 16588737, + }; + + // Fjall backed example indexer + let indexer = CustomIndexer::new( + FjallPoolCostIndex::new("fjall-pool-cost-index", sender)?, + FjallCursorStore::new("fjall-cursor-store", shelley_start.clone())?, + shelley_start, + ); + + // In memory example indexer + /* + let indexer = CustomIndexer::new( + InMemoryPoolCostIndex::new(sender), + InMemoryCursorStore::new(shelley_start.clone()), + shelley_start, + ); + */ + + process.register(Arc::new(indexer)); process.run().await?; + Ok(()) } diff --git a/processes/omnibus/.gitignore b/processes/omnibus/.gitignore index f7cf0f866..8d5cb4152 100644 --- a/processes/omnibus/.gitignore +++ b/processes/omnibus/.gitignore @@ -1,6 +1,7 @@ downloads cache upstream-cache +spdd_db # DB files fjall-*/