Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions modules/custom_indexer/src/chain_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,42 @@ use pallas::ledger::traverse::MultiEraTx;

#[async_trait]
pub trait ChainIndex: Send + Sync + 'static {
/// A human-readable identifier for the index.
/// Used for logging, error messages, and cursor store keys.
fn name(&self) -> String;

/// High-level transaction handler.
///
/// Most indexes override this
async fn handle_onchain_tx(&mut self, info: &BlockInfo, tx: &MultiEraTx<'_>) -> Result<()> {
let _ = (info, tx);
Ok(())
}

/// Low-level raw-bytes handler.
///
/// Default behavior:
/// - decode the tx using Pallas
/// - call the high-level handler
///
/// Indexes that want raw bytes override this and bypass decoding entirely.
async fn handle_onchain_tx_bytes(&mut self, info: &BlockInfo, raw_tx: &[u8]) -> Result<()> {
let tx = MultiEraTx::decode(raw_tx)?;
self.handle_onchain_tx(info, &tx).await
}

/// Called when the chain rolls back to a point.
///
/// Implementations must remove or revert any state derived from slots
/// greater than `point`. Failing to do so will corrupt index state.
async fn handle_rollback(&mut self, point: &Point) -> Result<()> {
let _ = point;
Ok(())
}

/// Resets the index to a known chain point.
///
/// Most implementations return `start` unchanged. However, more advanced
/// indexes may choose a different reset point based on internal state.
async fn reset(&mut self, start: &Point) -> Result<Point>;
}
7 changes: 0 additions & 7 deletions modules/custom_indexer/src/custom_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,13 +247,6 @@ async fn process_tx_responses<F: futures::Future<Output = IndexResponse> + Send>
Ok(IndexResult::Success { entry }) => {
new_tips.insert(name, entry);
}
Ok(IndexResult::DecodeError { entry, reason }) => {
error!(
"Failed to decode tx at slot {} for index '{}': {}",
block_slot, name, reason
);
new_tips.insert(name, entry);
}
Ok(IndexResult::HandleError { entry, reason }) => {
error!(
"Failed to handle tx at slot {} for index '{}': {}",
Expand Down
36 changes: 1 addition & 35 deletions modules/custom_indexer/src/index_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ use std::sync::Arc;
use acropolis_common::{BlockInfo, Point};
use tokio::sync::{mpsc, oneshot};

use pallas::ledger::traverse::MultiEraTx;

use crate::{cursor_store::CursorEntry, IndexWrapper};

pub enum IndexCommand {
Expand All @@ -22,7 +20,6 @@ pub enum IndexCommand {
#[derive(Debug)]
pub enum IndexResult {
Success { entry: CursorEntry },
DecodeError { entry: CursorEntry, reason: String },
HandleError { entry: CursorEntry, reason: String },
Reset { entry: CursorEntry },
Halted,
Expand Down Expand Up @@ -71,20 +68,7 @@ async fn handle_apply_txs(

// Decode the transactions and call handle_onchain_tx for each, halting if decode or the handler return an error
for raw in txs {
let decoded = match MultiEraTx::decode(raw.as_ref()) {
Ok(tx) => tx,
Err(e) => {
wrapper.halted = true;
return IndexResult::DecodeError {
entry: CursorEntry {
tip: wrapper.tip.clone(),
halted: true,
},
reason: e.to_string(),
};
}
};
if let Err(e) = wrapper.index.handle_onchain_tx(&block, &decoded).await {
if let Err(e) = wrapper.index.handle_onchain_tx_bytes(&block, &raw).await {
wrapper.halted = true;
return IndexResult::HandleError {
entry: CursorEntry {
Expand Down Expand Up @@ -305,24 +289,6 @@ mod tests {
}
}

#[tokio::test]
async fn apply_txs_decode_error_sets_halt() {
let mock = MockIndex {
on_tx: None,
..Default::default()
};

let (_indexer, sender) = setup_indexer(mock).await;

match send_apply(&sender, test_block(1), vec![Arc::from([0u8; 1].as_slice())]).await {
IndexResult::DecodeError { entry, reason } => {
assert!(entry.halted);
assert!(!reason.is_empty());
}
other => panic!("Expected DecodeError, got {:?}", other),
}
}

#[tokio::test]
async fn apply_txs_skips_when_halted() {
let mock = MockIndex {
Expand Down