From e56a0bfec7640fd94abe7744165aec038ccfc923 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Tue, 9 Dec 2025 18:35:42 +0000 Subject: [PATCH 1/2] feat: add raw-bytes transaction handler to allow indexes to use independent Pallas versions Signed-off-by: William Hankins --- modules/custom_indexer/src/chain_index.rs | 34 +++++++++++++++++++ modules/custom_indexer/src/index_actor.rs | 40 +++++++++++++++-------- 2 files changed, 61 insertions(+), 13 deletions(-) diff --git a/modules/custom_indexer/src/chain_index.rs b/modules/custom_indexer/src/chain_index.rs index 3bc151d52..474af25e7 100644 --- a/modules/custom_indexer/src/chain_index.rs +++ b/modules/custom_indexer/src/chain_index.rs @@ -5,17 +5,51 @@ use pallas::ledger::traverse::MultiEraTx; #[async_trait] pub trait ChainIndex: Send + Sync + 'static { + /// A human-readable identifier for the index. + /// Used for logging, error messages, and cursor store keys. fn name(&self) -> String; + /// High-level transaction handler. + /// + /// The indexer runtime calls this when `wants_raw_bytes() == false`. + /// Implementors receive a fully decoded `MultiEraTx` using Acropolis’s + /// Pallas dependency. Most indexes should override this unless they + /// need to control decoding themselves. async fn handle_onchain_tx(&mut self, info: &BlockInfo, tx: &MultiEraTx<'_>) -> Result<()> { let _ = (info, tx); Ok(()) } + /// Low-level transaction handler that receives raw CBOR bytes. + /// + /// The indexer runtime calls this when `wants_raw_bytes() == true`. + /// Implementors can parse the bytes using their own Pallas versions, + /// bypass decoding in the runtime, or operate directly on the CBOR. + async fn handle_onchain_tx_bytes(&mut self, info: &BlockInfo, raw_tx: &[u8]) -> Result<()> { + let _ = (info, raw_tx); + Ok(()) + } + + /// Called when the chain rolls back to a point. + /// + /// Implementations must remove or revert any state derived from slots + /// greater than `point`. Failing to do so will corrupt index state. async fn handle_rollback(&mut self, point: &Point) -> Result<()> { let _ = point; Ok(()) } + /// Selects between decoded-transaction mode and raw-bytes mode. + /// + /// `false` (default): runtime decodes transactions and calls `handle_onchain_tx`. + /// `true`: runtime skips decoding and calls `handle_onchain_tx_bytes`. + fn wants_raw_bytes(&self) -> bool { + false + } + + /// Resets the index to a known chain point. + /// + /// Most implementations return `start` unchanged. However, more advanced + /// indexes may choose a different reset point based on internal state. async fn reset(&mut self, start: &Point) -> Result; } diff --git a/modules/custom_indexer/src/index_actor.rs b/modules/custom_indexer/src/index_actor.rs index 6f7177d97..fc15ed535 100644 --- a/modules/custom_indexer/src/index_actor.rs +++ b/modules/custom_indexer/src/index_actor.rs @@ -70,12 +70,12 @@ async fn handle_apply_txs( } // Decode the transactions and call handle_onchain_tx for each, halting if decode or the handler return an error + let raw_mode = wrapper.index.wants_raw_bytes(); for raw in txs { - let decoded = match MultiEraTx::decode(raw.as_ref()) { - Ok(tx) => tx, - Err(e) => { + if raw_mode { + if let Err(e) = wrapper.index.handle_onchain_tx_bytes(&block, &raw).await { wrapper.halted = true; - return IndexResult::DecodeError { + return IndexResult::HandleError { entry: CursorEntry { tip: wrapper.tip.clone(), halted: true, @@ -83,16 +83,30 @@ async fn handle_apply_txs( reason: e.to_string(), }; } - }; - if let Err(e) = wrapper.index.handle_onchain_tx(&block, &decoded).await { - wrapper.halted = true; - return IndexResult::HandleError { - entry: CursorEntry { - tip: wrapper.tip.clone(), - halted: true, - }, - reason: e.to_string(), + } else { + let decoded = match MultiEraTx::decode(raw.as_ref()) { + Ok(tx) => tx, + Err(e) => { + wrapper.halted = true; + return IndexResult::DecodeError { + entry: CursorEntry { + tip: wrapper.tip.clone(), + halted: true, + }, + reason: e.to_string(), + }; + } }; + if let Err(e) = wrapper.index.handle_onchain_tx(&block, &decoded).await { + wrapper.halted = true; + return IndexResult::HandleError { + entry: CursorEntry { + tip: wrapper.tip.clone(), + halted: true, + }, + reason: e.to_string(), + }; + } } } From d823216c9e22bcaf839180819d1df9042f5e99cd Mon Sep 17 00:00:00 2001 From: William Hankins Date: Tue, 9 Dec 2025 19:47:05 +0000 Subject: [PATCH 2/2] refactor: unify tx handling by decoding in default handle_onchain_tx_bytes Signed-off-by: William Hankins --- modules/custom_indexer/src/chain_index.rs | 27 +++------ modules/custom_indexer/src/custom_indexer.rs | 7 --- modules/custom_indexer/src/index_actor.rs | 64 +++----------------- 3 files changed, 17 insertions(+), 81 deletions(-) diff --git a/modules/custom_indexer/src/chain_index.rs b/modules/custom_indexer/src/chain_index.rs index 474af25e7..e86fd83d0 100644 --- a/modules/custom_indexer/src/chain_index.rs +++ b/modules/custom_indexer/src/chain_index.rs @@ -11,23 +11,22 @@ pub trait ChainIndex: Send + Sync + 'static { /// High-level transaction handler. /// - /// The indexer runtime calls this when `wants_raw_bytes() == false`. - /// Implementors receive a fully decoded `MultiEraTx` using Acropolis’s - /// Pallas dependency. Most indexes should override this unless they - /// need to control decoding themselves. + /// Most indexes override this async fn handle_onchain_tx(&mut self, info: &BlockInfo, tx: &MultiEraTx<'_>) -> Result<()> { let _ = (info, tx); Ok(()) } - /// Low-level transaction handler that receives raw CBOR bytes. + /// Low-level raw-bytes handler. /// - /// The indexer runtime calls this when `wants_raw_bytes() == true`. - /// Implementors can parse the bytes using their own Pallas versions, - /// bypass decoding in the runtime, or operate directly on the CBOR. + /// Default behavior: + /// - decode the tx using Pallas + /// - call the high-level handler + /// + /// Indexes that want raw bytes override this and bypass decoding entirely. async fn handle_onchain_tx_bytes(&mut self, info: &BlockInfo, raw_tx: &[u8]) -> Result<()> { - let _ = (info, raw_tx); - Ok(()) + let tx = MultiEraTx::decode(raw_tx)?; + self.handle_onchain_tx(info, &tx).await } /// Called when the chain rolls back to a point. @@ -39,14 +38,6 @@ pub trait ChainIndex: Send + Sync + 'static { Ok(()) } - /// Selects between decoded-transaction mode and raw-bytes mode. - /// - /// `false` (default): runtime decodes transactions and calls `handle_onchain_tx`. - /// `true`: runtime skips decoding and calls `handle_onchain_tx_bytes`. - fn wants_raw_bytes(&self) -> bool { - false - } - /// Resets the index to a known chain point. /// /// Most implementations return `start` unchanged. However, more advanced diff --git a/modules/custom_indexer/src/custom_indexer.rs b/modules/custom_indexer/src/custom_indexer.rs index 80431ceaa..bb5088708 100644 --- a/modules/custom_indexer/src/custom_indexer.rs +++ b/modules/custom_indexer/src/custom_indexer.rs @@ -247,13 +247,6 @@ async fn process_tx_responses + Send> Ok(IndexResult::Success { entry }) => { new_tips.insert(name, entry); } - Ok(IndexResult::DecodeError { entry, reason }) => { - error!( - "Failed to decode tx at slot {} for index '{}': {}", - block_slot, name, reason - ); - new_tips.insert(name, entry); - } Ok(IndexResult::HandleError { entry, reason }) => { error!( "Failed to handle tx at slot {} for index '{}': {}", diff --git a/modules/custom_indexer/src/index_actor.rs b/modules/custom_indexer/src/index_actor.rs index fc15ed535..5e2fb33a9 100644 --- a/modules/custom_indexer/src/index_actor.rs +++ b/modules/custom_indexer/src/index_actor.rs @@ -3,8 +3,6 @@ use std::sync::Arc; use acropolis_common::{BlockInfo, Point}; use tokio::sync::{mpsc, oneshot}; -use pallas::ledger::traverse::MultiEraTx; - use crate::{cursor_store::CursorEntry, IndexWrapper}; pub enum IndexCommand { @@ -22,7 +20,6 @@ pub enum IndexCommand { #[derive(Debug)] pub enum IndexResult { Success { entry: CursorEntry }, - DecodeError { entry: CursorEntry, reason: String }, HandleError { entry: CursorEntry, reason: String }, Reset { entry: CursorEntry }, Halted, @@ -70,43 +67,16 @@ async fn handle_apply_txs( } // Decode the transactions and call handle_onchain_tx for each, halting if decode or the handler return an error - let raw_mode = wrapper.index.wants_raw_bytes(); for raw in txs { - if raw_mode { - if let Err(e) = wrapper.index.handle_onchain_tx_bytes(&block, &raw).await { - wrapper.halted = true; - return IndexResult::HandleError { - entry: CursorEntry { - tip: wrapper.tip.clone(), - halted: true, - }, - reason: e.to_string(), - }; - } - } else { - let decoded = match MultiEraTx::decode(raw.as_ref()) { - Ok(tx) => tx, - Err(e) => { - wrapper.halted = true; - return IndexResult::DecodeError { - entry: CursorEntry { - tip: wrapper.tip.clone(), - halted: true, - }, - reason: e.to_string(), - }; - } + if let Err(e) = wrapper.index.handle_onchain_tx_bytes(&block, &raw).await { + wrapper.halted = true; + return IndexResult::HandleError { + entry: CursorEntry { + tip: wrapper.tip.clone(), + halted: true, + }, + reason: e.to_string(), }; - if let Err(e) = wrapper.index.handle_onchain_tx(&block, &decoded).await { - wrapper.halted = true; - return IndexResult::HandleError { - entry: CursorEntry { - tip: wrapper.tip.clone(), - halted: true, - }, - reason: e.to_string(), - }; - } } } @@ -319,24 +289,6 @@ mod tests { } } - #[tokio::test] - async fn apply_txs_decode_error_sets_halt() { - let mock = MockIndex { - on_tx: None, - ..Default::default() - }; - - let (_indexer, sender) = setup_indexer(mock).await; - - match send_apply(&sender, test_block(1), vec![Arc::from([0u8; 1].as_slice())]).await { - IndexResult::DecodeError { entry, reason } => { - assert!(entry.halted); - assert!(!reason.is_empty()); - } - other => panic!("Expected DecodeError, got {:?}", other), - } - } - #[tokio::test] async fn apply_txs_skips_when_halted() { let mock = MockIndex {