diff --git a/Cargo.lock b/Cargo.lock index f9e59df57..e564abca3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -240,7 +240,6 @@ dependencies = [ "caryatid_sdk", "config", "serde", - "serde_with 3.16.1", "tokio", "tracing", ] diff --git a/common/examples/test_streaming_parser.rs b/common/examples/test_streaming_parser.rs index 7f98d96a9..c5cef744b 100644 --- a/common/examples/test_streaming_parser.rs +++ b/common/examples/test_streaming_parser.rs @@ -7,8 +7,8 @@ use acropolis_common::ledger_state::SPOState; use acropolis_common::snapshot::protocol_parameters::ProtocolParameters; use acropolis_common::snapshot::streaming_snapshot::{ AccountsCallback, DRepCallback, DRepRecord, GovernanceProtocolParametersCallback, UtxoCallback, - UtxoEntry, }; +use acropolis_common::snapshot::utxo::UtxoEntry; use acropolis_common::snapshot::EpochCallback; use acropolis_common::snapshot::{ AccountState, GovernanceProposal, PoolCallback, ProposalCallback, SnapshotCallbacks, @@ -52,12 +52,8 @@ impl UtxoCallback for CountingCallbacks { if self.sample_utxos.len() < 10 { if self.sample_utxos.len() < 10 { eprintln!( - " UTXO #{}: {}:{} → {} ({} lovelace)", - self.utxo_count, - &utxo.tx_hash[..16], - utxo.output_index, - &utxo.address[..utxo.address.len().min(32)], - utxo.value + " UTXO #{}: {} → {:?}", + self.utxo_count, utxo.id, utxo.value ); } self.sample_utxos.push(utxo); @@ -506,14 +502,7 @@ fn main() { if !callbacks.sample_utxos.is_empty() { println!("Sample UTXOs (first 10):"); for (i, utxo) in callbacks.sample_utxos.iter().enumerate() { - println!( - " {}: {}:{} → {} ({} lovelace)", - i + 1, - &utxo.tx_hash[..16], - utxo.output_index, - &utxo.address[..32], - utxo.value - ); + println!(" {}: {} → {:?}", i + 1, utxo.id, utxo.value); } println!(); } diff --git a/common/src/address.rs b/common/src/address.rs index f770d2c1a..638c17da3 100644 --- a/common/src/address.rs +++ b/common/src/address.rs @@ -39,10 +39,7 @@ impl ByronAddress { Ok(bs58::encode(buf).into_string()) } - pub fn from_string(s: &str) -> Result { - let bytes = bs58::decode(s).into_vec()?; - let mut dec = minicbor::Decoder::new(&bytes); - + pub fn from_cbor(dec: &mut minicbor::Decoder) -> Result { let len = dec.array()?.unwrap_or(0); if len != 2 { anyhow::bail!("Invalid Byron address CBOR array length"); @@ -66,6 +63,12 @@ impl ByronAddress { Ok(address) } + pub fn from_string(s: &str) -> Result { + let bytes = bs58::decode(s).into_vec()?; + let mut dec = minicbor::Decoder::new(&bytes); + Self::from_cbor(&mut dec) + } + pub fn to_bytes_key(&self) -> Result> { let crc = self.compute_crc32(); diff --git a/common/src/messages.rs b/common/src/messages.rs index 959b6cb68..aef0d1e32 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -436,12 +436,21 @@ pub struct AccountsBootstrapMessage { pub bootstrap_snapshots: Option, } +/// UTxO bootstrap message containing partial UTxO state +/// All data is in internal format, ready for direct use by the state module +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct UTxOPartialState { + /// UTxOs + pub utxos: Vec<(UTxOIdentifier, UTXOValue)>, +} + #[allow(clippy::large_enum_variant)] #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum SnapshotStateMessage { SPOState(SPOState), EpochState(EpochBootstrapMessage), AccountsState(AccountsBootstrapMessage), + UTxOPartialState(UTxOPartialState), DRepState(DRepBootstrapMessage), } diff --git a/common/src/snapshot/mod.rs b/common/src/snapshot/mod.rs index cfd52ffd3..2bcedf84b 100644 --- a/common/src/snapshot/mod.rs +++ b/common/src/snapshot/mod.rs @@ -16,6 +16,7 @@ pub mod mark_set_go; mod parser; pub mod protocol_parameters; pub mod streaming_snapshot; +pub mod utxo; pub use error::SnapshotError; pub use parser::{compute_sha256, parse_manifest, validate_era, validate_integrity}; @@ -23,7 +24,7 @@ pub use parser::{compute_sha256, parse_manifest, validate_era, validate_integrit pub use streaming_snapshot::{ AccountState, AccountsBootstrapData, AccountsCallback, Anchor, DRepCallback, DRepInfo, EpochCallback, GovernanceProposal, PoolCallback, ProposalCallback, SnapshotCallbacks, - SnapshotMetadata, StakeAddressState, StreamingSnapshotParser, UtxoCallback, UtxoEntry, + SnapshotMetadata, StakeAddressState, StreamingSnapshotParser, UtxoCallback, }; pub use mark_set_go::{RawSnapshot, RawSnapshotsContainer, SnapshotsCallback, VMap}; diff --git a/common/src/snapshot/streaming_snapshot.rs b/common/src/snapshot/streaming_snapshot.rs index 4d4ec4815..0157ee414 100644 --- a/common/src/snapshot/streaming_snapshot.rs +++ b/common/src/snapshot/streaming_snapshot.rs @@ -31,9 +31,10 @@ use std::net::{Ipv4Addr, Ipv6Addr}; use tracing::info; use crate::epoch_snapshot::SnapshotsContainer; -pub use crate::hash::Hash; +use crate::hash::Hash; use crate::ledger_state::SPOState; use crate::snapshot::protocol_parameters::ProtocolParameters; +use crate::snapshot::utxo::{SnapshotUTxO, UtxoEntry}; pub use crate::stake_addresses::{AccountState, StakeAddressState}; pub use crate::{ Constitution, DRepChoice, DRepCredential, DRepRecord, EpochBootstrapData, Lovelace, @@ -512,27 +513,6 @@ impl<'b, C> minicbor::Decode<'b, C> for DRepState { } } -// ----------------------------------------------------------------------------- -// Data Structures (based on OpenAPI schema) -// ----------------------------------------------------------------------------- - -/// UTXO entry with transaction hash, index, address, and value -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct UtxoEntry { - /// Transaction hash (hex-encoded) - pub tx_hash: String, - /// Output index - pub output_index: u64, - /// Hex encoded Cardano addresses - pub address: String, - /// Lovelace amount - pub value: u64, - /// Optional inline datum (hex-encoded CBOR) - pub datum: Option, - /// Optional script reference (hex-encoded CBOR) - pub script_ref: Option, -} - // ----------------------------------------------------------------------------- // Ledger types for DState parsing // ----------------------------------------------------------------------------- @@ -1596,23 +1576,8 @@ impl StreamingSnapshotParser { fn parse_single_utxo(decoder: &mut Decoder) -> Result { // Parse key: TransactionInput (array [tx_hash, output_index]) decoder.array().context("Failed to parse TxIn array")?; - - let tx_hash_bytes = decoder.bytes().context("Failed to parse tx_hash")?; - let output_index = decoder.u64().context("Failed to parse output_index")?; - let tx_hash = hex::encode(tx_hash_bytes); - - // Parse value: TransactionOutput - let (address, value) = Self::parse_transaction_output(decoder) - .context("Failed to parse transaction output")?; - - Ok(UtxoEntry { - tx_hash, - output_index, - address, - value, - datum: None, // TODO: Extract from TxOut - script_ref: None, // TODO: Extract from TxOut - }) + let utxo: SnapshotUTxO = decoder.decode().context("Failed to parse UTxO")?; + Ok(utxo.0) } /// VState = [dreps_map, committee_state, dormant_epoch] @@ -1743,138 +1708,6 @@ impl StreamingSnapshotParser { }) } - /// Stream UTXOs with per-entry callback - /// - /// Parse a single TxOut from the CBOR decoder - fn parse_transaction_output(dec: &mut Decoder) -> Result<(String, u64)> { - // TxOut is typically an array [address, value, ...] - // or a map for Conway with optional fields - - // Try array format first (most common) - match dec.datatype().context("Failed to read TxOut datatype")? { - Type::Array | Type::ArrayIndef => { - let arr_len = dec.array().context("Failed to parse TxOut array")?; - if arr_len == Some(0) { - return Err(anyhow!("empty TxOut array")); - } - - // Element 0: Address (bytes) - let address_bytes = dec.bytes().context("Failed to parse address bytes")?; - let hex_address = hex::encode(address_bytes); - - // Element 1: Value (coin or map) - let value = match dec.datatype().context("Failed to read value datatype")? { - Type::U8 | Type::U16 | Type::U32 | Type::U64 => { - // Simple ADA-only value - dec.u64().context("Failed to parse u64 value")? - } - Type::Array | Type::ArrayIndef => { - // Multi-asset: [coin, assets_map] - dec.array().context("Failed to parse value array")?; - let coin = dec.u64().context("Failed to parse coin amount")?; - // Skip the assets map - dec.skip().context("Failed to skip assets map")?; - coin - } - _ => { - return Err(anyhow!("unexpected value type")); - } - }; - - // Skip remaining fields (datum, script_ref) - if let Some(len) = arr_len { - for _ in 2..len { - dec.skip().context("Failed to skip TxOut field")?; - } - } - - Ok((hex_address, value)) - } - Type::Map | Type::MapIndef => { - // Map format (Conway with optional fields) - // Map keys: 0=address, 1=value, 2=datum, 3=script_ref - let map_len = dec.map().context("Failed to parse TxOut map")?; - - let mut address = String::new(); - let mut value = 0u64; - let mut found_address = false; - let mut found_value = false; - - let entries = map_len.unwrap_or(4); // Assume max 4 entries if indefinite - for _ in 0..entries { - // Check for break in indefinite map - if map_len.is_none() && matches!(dec.datatype(), Ok(Type::Break)) { - dec.skip().ok(); // consume break - break; - } - - // Read key - let key = match dec.u32() { - Ok(k) => k, - Err(_) => { - // Skip both key and value if key is not u32 - dec.skip().ok(); - dec.skip().ok(); - continue; - } - }; - - // Read value based on key - match key { - 0 => { - // Address - if let Ok(addr_bytes) = dec.bytes() { - address = hex::encode(addr_bytes); - found_address = true; - } else { - dec.skip().ok(); - } - } - 1 => { - // Value (coin or multi-asset) - match dec.datatype() { - Ok(Type::U8) | Ok(Type::U16) | Ok(Type::U32) | Ok(Type::U64) => { - if let Ok(coin) = dec.u64() { - value = coin; - found_value = true; - } else { - dec.skip().ok(); - } - } - Ok(Type::Array) | Ok(Type::ArrayIndef) => { - // Multi-asset: [coin, assets_map] - if dec.array().is_ok() { - if let Ok(coin) = dec.u64() { - value = coin; - found_value = true; - } - dec.skip().ok(); // skip assets map - } else { - dec.skip().ok(); - } - } - _ => { - dec.skip().ok(); - } - } - } - _ => { - // datum (2), script_ref (3), or unknown - skip - dec.skip().ok(); - } - } - } - - if found_address && found_value { - Ok((address, value)) - } else { - Err(anyhow!("map-based TxOut missing required fields")) - } - } - _ => Err(anyhow!("unexpected TxOut type")), - } - } - /// Parse snapshots using hybrid approach with memory-based parsing /// Uses snapshot.rs functions to parse mark/set/go snapshots from buffer /// We expect the following structure: @@ -2041,6 +1874,8 @@ impl SnapshotsCallback for CollectingCallbacks { #[cfg(test)] mod tests { + use crate::{Address, NativeAssets, TxHash, UTXOValue, UTxOIdentifier, Value}; + use super::*; #[test] @@ -2071,16 +1906,20 @@ mod tests { // Test UTXO callback callbacks .on_utxo(UtxoEntry { - tx_hash: "abc123".to_string(), - output_index: 0, - address: "addr1...".to_string(), - value: 5000000, - datum: None, - script_ref: None, + id: UTxOIdentifier::new(TxHash::new(<[u8; 32]>::default()), 0), + value: UTXOValue { + address: Address::None, + value: Value { + lovelace: 5000000, + assets: NativeAssets::default(), + }, + datum: None, + reference_script: None, + }, }) .unwrap(); assert_eq!(callbacks.utxos.len(), 1); - assert_eq!(callbacks.utxos[0].value, 5000000); + assert_eq!(callbacks.utxos[0].value.value.lovelace, 5000000); } } diff --git a/common/src/snapshot/utxo.rs b/common/src/snapshot/utxo.rs new file mode 100644 index 000000000..7f60d71b4 --- /dev/null +++ b/common/src/snapshot/utxo.rs @@ -0,0 +1,506 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright © 2025, Acropolis team. + +//! UTXO types and CBOR decoding for snapshot parsing. +//! +//! This module handles the UTXO structures from the NewEpochState ledger state. +//! +//! CDDL specification: +//! ```cddl +//! utxo = {* transaction_input => transaction_output } +//! +//! transaction_input = [txin_transaction_id : transaction_id, txin_index : uint .size 2] +//! transaction_id = bytes +//! +//! transaction_output = shelley_transaction_output / babbage_transaction_output +//! shelley_transaction_output = [address, amount : value, ? hash32] +//! babbage_transaction_output = {0 : address, 1 : value, ? 2 : datum_option, ? 3 : script_ref} +//! +//! address = bytes +//! value = coin / [coin, multiasset] +//! coin = uint +//! multiasset = {* policy_id => {* asset_name => a0 } } +//! +//! datum_option = [0, hash32] / [1, data] +//! script_ref = #6.24(bytes .cbor script) +//! ``` + +use minicbor::data::Type; +use minicbor::Decoder; +use serde::{Deserialize, Serialize}; + +use crate::{ + Address, AssetName, ByronAddress, Datum, NativeAsset, NativeAssets, PolicyId, ReferenceScript, + ShelleyAddress, StakeAddress, TxHash, UTXOValue, UTxOIdentifier, Value, +}; + +// ============================================================================= +// Public Types +// ============================================================================= + +/// UTXO entry combining transaction input reference and output value. +/// +/// This is the primary type exposed to consumers of the snapshot parser. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UtxoEntry { + /// UTxO identifier (transaction hash + output index) + pub id: UTxOIdentifier, + /// UTxO value (address, lovelace, assets, datum, script_ref) + pub value: UTXOValue, +} + +// ============================================================================= +// CBOR Decoding Wrappers +// ============================================================================= +// +// These wrapper types provide minicbor::Decode implementations for parsing +// the snapshot CBOR format. They wrap the public types from crate::types. +// +// The wrappers are crate-private since consumers should use UtxoEntry directly. + +/// Wrapper for decoding a complete UTXO entry (input + output pair) +pub(crate) struct SnapshotUTxO(pub UtxoEntry); + +impl<'b, C> minicbor::Decode<'b, C> for SnapshotUTxO { + fn decode(d: &mut Decoder<'b>, _: &mut C) -> Result { + let id: SnapshotUTxOIdentifier = d.decode()?; + let value: SnapshotUTXOValue = d.decode()?; + Ok(Self(UtxoEntry { + id: id.0, + value: value.0, + })) + } +} + +// ============================================================================= +// Transaction Input Decoding +// ============================================================================= + +/// Wrapper for decoding transaction input (TxIn) +/// CDDL: transaction_input = [txin_transaction_id, txin_index] +struct SnapshotUTxOIdentifier(pub UTxOIdentifier); + +impl<'b, C> minicbor::Decode<'b, C> for SnapshotUTxOIdentifier { + fn decode(d: &mut Decoder<'b>, _: &mut C) -> Result { + let tx_hash = TxHash::try_from(d.bytes()?) + .map_err(|_| minicbor::decode::Error::message("Invalid TxHash (wrong size?)"))?; + let output_index = d.u64()? as u16; + Ok(Self(UTxOIdentifier { + tx_hash, + output_index, + })) + } +} + +// ============================================================================= +// Transaction Output Decoding +// ============================================================================= + +/// Wrapper for decoding transaction output (TxOut) +/// CDDL: transaction_output = shelley_transaction_output / babbage_transaction_output +struct SnapshotUTXOValue(pub UTXOValue); + +impl<'b, C> minicbor::Decode<'b, C> for SnapshotUTXOValue { + fn decode(d: &mut Decoder<'b>, _: &mut C) -> Result { + let datatype = d + .datatype() + .map_err(|_| minicbor::decode::Error::message("Failed to read TxOut datatype"))?; + + match datatype { + // Shelley format: [address, value, ? datum_hash] + Type::Array | Type::ArrayIndef => Self::decode_array_format(d), + // Babbage/Conway format: {0: address, 1: value, ? 2: datum, ? 3: script_ref} + Type::Map | Type::MapIndef => Self::decode_map_format(d), + _ => Err(minicbor::decode::Error::message("unexpected TxOut type")), + } + } +} + +impl SnapshotUTXOValue { + /// Decode Shelley-era array format: [address, value, ? datum_hash] + fn decode_array_format(d: &mut Decoder) -> Result { + let arr_len = d + .array() + .map_err(|_| minicbor::decode::Error::message("Failed to parse TxOut array"))?; + + if arr_len == Some(0) { + return Err(minicbor::decode::Error::message("empty TxOut array")); + } + + // Element 0: Address + let address: SnapshotAddress = d.decode()?; + + // Element 1: Value + let value: SnapshotValue = d.decode()?; + + // Element 2 (optional): Datum hash (Shelley style - just the hash, not datum_option) + let datum = if arr_len.map(|l| l > 2).unwrap_or(true) { + match d.datatype() { + Ok(Type::Bytes) => { + let hash_bytes = d.bytes()?; + if hash_bytes.len() == 32 { + Some(Datum::Hash(hash_bytes.to_vec())) + } else { + None + } + } + Ok(Type::Break) => None, + _ => { + d.skip().ok(); + None + } + } + } else { + None + }; + + // Skip remaining fields + if let Some(len) = arr_len { + for _ in 3..len { + d.skip() + .map_err(|_| minicbor::decode::Error::message("Failed to skip TxOut field"))?; + } + } + + Ok(Self(UTXOValue { + address: address.0, + value: value.0, + datum, + reference_script: None, + })) + } + + /// Decode Babbage/Conway-era map format: {0: address, 1: value, ? 2: datum, ? 3: script_ref} + fn decode_map_format(d: &mut Decoder) -> Result { + let map_len = + d.map().map_err(|_| minicbor::decode::Error::message("Failed to parse TxOut map"))?; + + let mut address: Option
= None; + let mut value: Option = None; + let mut datum: Option = None; + let mut reference_script: Option = None; + + let entries = map_len.unwrap_or(4); + for _ in 0..entries { + // Check for break in indefinite map + if map_len.is_none() && matches!(d.datatype(), Ok(Type::Break)) { + d.skip().ok(); + break; + } + + let key = match d.u32() { + Ok(k) => k, + Err(_) => { + d.skip().ok(); + d.skip().ok(); + continue; + } + }; + + match key { + 0 => address = Some(d.decode::()?.0), + 1 => value = Some(d.decode::()?.0), + 2 => datum = decode_datum_option(d)?, + 3 => reference_script = decode_script_ref(d)?, + _ => { + d.skip().ok(); + } + } + } + + match (address, value) { + (Some(address), Some(value)) => Ok(Self(UTXOValue { + address, + value, + datum, + reference_script, + })), + _ => Err(minicbor::decode::Error::message( + "map-based TxOut missing required fields", + )), + } + } +} + +// ============================================================================= +// Datum Decoding +// ============================================================================= + +/// Decode datum_option: [0, hash32] / [1, data] +fn decode_datum_option(d: &mut Decoder) -> Result, minicbor::decode::Error> { + d.array()?; + let variant = d.u8()?; + + match variant { + 0 => { + // Datum hash: [0, hash32] + let hash_bytes = d.bytes()?; + Ok(Some(Datum::Hash(hash_bytes.to_vec()))) + } + 1 => { + // Inline datum: [1, #6.24(bytes)] + // The datum may be wrapped in CBOR tag 24 (encoded CBOR) + if matches!(d.datatype(), Ok(Type::Tag)) { + let tag = d.tag()?; + if tag.as_u64() == 24 { + let datum_bytes = d.bytes()?.to_vec(); + return Ok(Some(Datum::Inline(datum_bytes))); + } + } + // Not tagged, read raw bytes or skip + match d.datatype() { + Ok(Type::Bytes) => { + let datum_bytes = d.bytes()?.to_vec(); + Ok(Some(Datum::Inline(datum_bytes))) + } + _ => { + // Complex inline datum - capture the CBOR + // For now, skip it + d.skip()?; + Ok(None) + } + } + } + _ => { + d.skip()?; + Ok(None) + } + } +} + +// ============================================================================= +// Script Reference Decoding +// ============================================================================= + +/// Decode script_ref: #6.24(bytes .cbor script) +/// Script format: [script_type, script_bytes] +/// script_type: 0 = Native, 1 = PlutusV1, 2 = PlutusV2, 3 = PlutusV3 +fn decode_script_ref(d: &mut Decoder) -> Result, minicbor::decode::Error> { + // Script ref is wrapped in CBOR tag 24 (encoded CBOR) + if !matches!(d.datatype(), Ok(Type::Tag)) { + d.skip()?; + return Ok(None); + } + + let tag = d.tag()?; + if tag.as_u64() != 24 { + d.skip()?; + return Ok(None); + } + + // The content is CBOR-encoded bytes containing [script_type, script_bytes] + let script_cbor = d.bytes()?; + let mut script_decoder = Decoder::new(script_cbor); + + // Parse [script_type, script_bytes] + if script_decoder.array().is_err() { + return Ok(None); + } + + let script_type = match script_decoder.u8() { + Ok(t) => t, + Err(_) => return Ok(None), + }; + + let script_bytes = match script_decoder.bytes() { + Ok(b) => b.to_vec(), + Err(_) => return Ok(None), + }; + + let reference_script = match script_type { + 0 => ReferenceScript::Native(script_bytes), + 1 => ReferenceScript::PlutusV1(script_bytes), + 2 => ReferenceScript::PlutusV2(script_bytes), + 3 => ReferenceScript::PlutusV3(script_bytes), + _ => return Ok(None), + }; + + Ok(Some(reference_script)) +} + +// ============================================================================= +// Address Decoding +// ============================================================================= + +/// Wrapper for decoding addresses from raw bytes +/// Handles Byron, Shelley, and Stake address formats +struct SnapshotAddress(pub Address); + +impl<'b, C> minicbor::Decode<'b, C> for SnapshotAddress { + fn decode(d: &mut Decoder<'b>, _: &mut C) -> Result { + let bytes = d + .bytes() + .map_err(|_| minicbor::decode::Error::message("Failed to read address bytes"))?; + + if bytes.is_empty() { + return Err(minicbor::decode::Error::message("Empty utxo address")); + } + + Self::parse_address_bytes(bytes) + } +} + +impl SnapshotAddress { + fn parse_address_bytes(bytes: &[u8]) -> Result { + match bytes[0] { + // Byron addresses start with 0x82 (CBOR array of 2) + 0x82 => Self::decode_byron(bytes), + // Shelley addresses: check header nibble + _ => Self::decode_shelley(bytes), + } + } + + fn decode_byron(bytes: &[u8]) -> Result { + let mut dec = minicbor::Decoder::new(bytes); + let byron = ByronAddress::from_cbor(&mut dec) + .map_err(|_| minicbor::decode::Error::message("Failed to read Byron address"))?; + Ok(Self(Address::Byron(byron))) + } + + fn decode_shelley(bytes: &[u8]) -> Result { + let header_type = (bytes[0] >> 4) & 0x0F; + + match header_type { + // Stake/reward addresses (type 14, 15) + 0b1110 | 0b1111 => { + let stake = StakeAddress::from_binary(bytes).map_err(|_| { + minicbor::decode::Error::message("Failed to read stake address") + })?; + Ok(Self(Address::Stake(stake))) + } + // Base, enterprise, pointer addresses (types 0-7) + _ => { + let shelley = ShelleyAddress::from_bytes_key(bytes).map_err(|_| { + minicbor::decode::Error::message("Failed to read Shelley address") + })?; + Ok(Self(Address::Shelley(shelley))) + } + } + } +} + +// ============================================================================= +// Value Decoding +// ============================================================================= + +/// Wrapper for decoding value (coin or multi-asset) +/// CDDL: value = coin / [coin, multiasset] +struct SnapshotValue(pub Value); + +impl<'b, C> minicbor::Decode<'b, C> for SnapshotValue { + fn decode(d: &mut Decoder<'b>, _: &mut C) -> Result { + let datatype = d + .datatype() + .map_err(|_| minicbor::decode::Error::message("Failed to read Value datatype"))?; + + match datatype { + // Simple coin-only value + Type::U8 | Type::U16 | Type::U32 | Type::U64 => { + let lovelace = d + .u64() + .map_err(|_| minicbor::decode::Error::message("Failed to parse coin amount"))?; + Ok(Self(Value { + lovelace, + assets: NativeAssets::default(), + })) + } + // Multi-asset: [coin, multiasset] + Type::Array | Type::ArrayIndef => { + d.array() + .map_err(|_| minicbor::decode::Error::message("Failed to parse value array"))?; + + let lovelace = d + .u64() + .map_err(|_| minicbor::decode::Error::message("Failed to parse coin amount"))?; + + let assets = decode_multiasset(d)?; + + Ok(Self(Value { lovelace, assets })) + } + _ => Err(minicbor::decode::Error::message( + "Unexpected Value datatype", + )), + } + } +} + +// ============================================================================= +// Multi-Asset Decoding +// ============================================================================= + +/// Decode multiasset: {* policy_id => {* asset_name => amount } } +fn decode_multiasset(d: &mut Decoder) -> Result { + let mut assets: NativeAssets = Vec::new(); + + let policy_map_len = d.map()?; + + match policy_map_len { + Some(len) => { + for _ in 0..len { + let (policy_id, policy_assets) = decode_policy_assets(d)?; + assets.push((policy_id, policy_assets)); + } + } + None => { + // Indefinite-length map + while !matches!(d.datatype(), Ok(Type::Break)) { + let (policy_id, policy_assets) = decode_policy_assets(d)?; + assets.push((policy_id, policy_assets)); + } + d.skip()?; // consume break + } + } + + Ok(assets) +} + +/// Decode a single policy's assets: policy_id => {* asset_name => amount } +fn decode_policy_assets( + d: &mut Decoder, +) -> Result<(PolicyId, Vec), minicbor::decode::Error> { + // Decode policy ID (28 bytes) + let policy_bytes = d.bytes()?; + if policy_bytes.len() != 28 { + return Err(minicbor::decode::Error::message(format!( + "invalid policy_id length: expected 28, got {}", + policy_bytes.len() + ))); + } + let policy_id: PolicyId = policy_bytes + .try_into() + .map_err(|_| minicbor::decode::Error::message("Failed to convert policy_id bytes"))?; + + // Decode asset map: {* asset_name => amount } + let mut policy_assets: Vec = Vec::new(); + let asset_map_len = d.map()?; + + match asset_map_len { + Some(len) => { + for _ in 0..len { + let asset = decode_native_asset(d)?; + policy_assets.push(asset); + } + } + None => { + // Indefinite-length map + while !matches!(d.datatype(), Ok(Type::Break)) { + let asset = decode_native_asset(d)?; + policy_assets.push(asset); + } + d.skip()?; // consume break + } + } + + Ok((policy_id, policy_assets)) +} + +/// Decode a single native asset: asset_name => amount +fn decode_native_asset(d: &mut Decoder) -> Result { + let name_bytes = d.bytes()?; + let name = AssetName::new(name_bytes) + .ok_or_else(|| minicbor::decode::Error::message("Asset name too long (max 32 bytes)"))?; + + let amount = d.u64()?; + + Ok(NativeAsset { name, amount }) +} diff --git a/modules/snapshot_bootstrapper/src/publisher.rs b/modules/snapshot_bootstrapper/src/publisher.rs index aa192b55d..fcf6fb7ee 100644 --- a/modules/snapshot_bootstrapper/src/publisher.rs +++ b/modules/snapshot_bootstrapper/src/publisher.rs @@ -2,22 +2,23 @@ use acropolis_common::epoch_snapshot::SnapshotsContainer; use acropolis_common::messages::DRepBootstrapMessage; use acropolis_common::protocol_params::{Nonces, PraosParams}; use acropolis_common::snapshot::protocol_parameters::ProtocolParameters; +use acropolis_common::snapshot::utxo::UtxoEntry; use acropolis_common::snapshot::{AccountsCallback, SnapshotsCallback}; use acropolis_common::{ genesis_values::GenesisValues, ledger_state::SPOState, messages::{ AccountsBootstrapMessage, CardanoMessage, EpochBootstrapMessage, Message, SnapshotMessage, - SnapshotStateMessage, + SnapshotStateMessage, UTxOPartialState, }, params::EPOCH_LENGTH, snapshot::streaming_snapshot::{ DRepCallback, DRepRecord, EpochCallback, GovernanceProposal, GovernanceProtocolParametersCallback, PoolCallback, ProposalCallback, SnapshotCallbacks, - SnapshotMetadata, UtxoCallback, UtxoEntry, + SnapshotMetadata, UtxoCallback, }, stake_addresses::AccountState, - BlockInfo, DRepCredential, EpochBootstrapData, + BlockInfo, DRepCredential, EpochBootstrapData, UTXOValue, UTxOIdentifier, }; use anyhow::Result; use caryatid_sdk::Context; @@ -25,6 +26,8 @@ use std::collections::HashMap; use std::sync::Arc; use tracing::info; +const UTXO_BATCH_SIZE: usize = 10_000; + /// External epoch context containing nonces and timing information. /// /// This data comes from bootstrap configuration files (nonces.json, headers/{slot}.{block_header_hash}.cbor, etc) @@ -84,6 +87,8 @@ pub struct SnapshotPublisher { snapshot_topic: String, metadata: Option, utxo_count: u64, + utxo_batch: Vec<(UTxOIdentifier, UTXOValue)>, + utxo_batches_published: u64, pools: SPOState, accounts: Vec, dreps_len: usize, @@ -104,6 +109,8 @@ impl SnapshotPublisher { snapshot_topic, metadata: None, utxo_count: 0, + utxo_batch: Vec::with_capacity(UTXO_BATCH_SIZE), + utxo_batches_published: 0, pools: SPOState::new(), accounts: Vec::new(), dreps_len: 0, @@ -156,17 +163,64 @@ impl SnapshotPublisher { praos_params: Some(PraosParams::mainnet()), } } + + fn complete_batchers(&mut self) { + if !self.utxo_batch.is_empty() { + self.publish_utxo_batch(); + } + } + + fn publish_utxo_batch(&mut self) { + let batch_size = self.utxo_batch.len(); + self.utxo_batches_published += 1; + + if self.utxo_batches_published == 1 { + info!( + "Publishing first UTXO batch with {} UTXOs to topic '{}'", + batch_size, self.snapshot_topic + ); + } else if self.utxo_batches_published.is_multiple_of(100) { + info!( + "Published {} UTXO batches ({} UTXOs total)", + self.utxo_batches_published, self.utxo_count + ); + } + + let message = Arc::new(Message::Snapshot(SnapshotMessage::Bootstrap( + SnapshotStateMessage::UTxOPartialState(UTxOPartialState { + utxos: self.utxo_batch.clone(), + }), + ))); + + // Clone what we need for the async task + let context = self.context.clone(); + let snapshot_topic = self.snapshot_topic.clone(); + + // Block on async publish since this callback is synchronous + tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(async move { + if let Err(e) = context.publish(&snapshot_topic, message).await { + tracing::error!("Failed to publish UTXO batch: {}", e); + } + }) + }); + self.utxo_batch.clear(); + } } impl UtxoCallback for SnapshotPublisher { - fn on_utxo(&mut self, _utxo: UtxoEntry) -> Result<()> { + fn on_utxo(&mut self, utxo: UtxoEntry) -> Result<()> { self.utxo_count += 1; // Log progress every million UTXOs if self.utxo_count.is_multiple_of(1_000_000) { info!("Processed {} UTXOs", self.utxo_count); } - // TODO: Accumulate UTXO data if needed or send in chunks to UTXOState processor + + self.utxo_batch.push((utxo.id, utxo.value)); + if self.utxo_batch.len() >= UTXO_BATCH_SIZE { + self.publish_utxo_batch(); + } Ok(()) } } @@ -234,6 +288,10 @@ impl AccountsCallback for SnapshotPublisher { let context = self.context.clone(); let snapshot_topic = self.snapshot_topic.clone(); + // IMPORTANT: Complete batching senders now before what is to come, to ensure all + // batched data is flushed. See next, more detailed, comment + self.complete_batchers(); + // IMPORTANT: We use block_in_place + block_on to ensure each publish completes // before the callback returns. This guarantees message ordering. // diff --git a/modules/utxo_state/src/utxo_state.rs b/modules/utxo_state/src/utxo_state.rs index c5f398539..ace861887 100644 --- a/modules/utxo_state/src/utxo_state.rs +++ b/modules/utxo_state/src/utxo_state.rs @@ -2,7 +2,10 @@ //! Accepts UTXO events and derives the current ledger state in memory use acropolis_common::{ - messages::{CardanoMessage, Message, StateQuery, StateQueryResponse, StateTransitionMessage}, + messages::{ + CardanoMessage, Message, SnapshotMessage, SnapshotStateMessage, StateQuery, + StateQueryResponse, StateTransitionMessage, + }, queries::utxos::{UTxOStateQuery, UTxOStateQueryResponse, DEFAULT_UTXOS_QUERY_TOPIC}, }; use caryatid_sdk::{module, Context}; @@ -37,6 +40,8 @@ use fake_immutable_utxo_store::FakeImmutableUTXOStore; const DEFAULT_SUBSCRIBE_TOPIC: &str = "cardano.utxo.deltas"; const DEFAULT_STORE: &str = "memory"; +const DEFAULT_SNAPSHOT_SUBSCRIBE_TOPIC: (&str, &str) = + ("snapshot-subscribe-topic", "cardano.snapshot"); /// UTXO state module #[module( @@ -54,6 +59,11 @@ impl UTXOState { config.get_string("subscribe-topic").unwrap_or(DEFAULT_SUBSCRIBE_TOPIC.to_string()); info!("Creating subscriber on '{subscribe_topic}'"); + let snapshot_topic = config + .get_string(DEFAULT_SNAPSHOT_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_SNAPSHOT_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating snapshot subscriber on '{snapshot_topic}'"); + let utxos_query_topic = config .get_string(DEFAULT_UTXOS_QUERY_TOPIC.0) .unwrap_or(DEFAULT_UTXOS_QUERY_TOPIC.1.to_string()); @@ -70,6 +80,7 @@ impl UTXOState { "fake" => Arc::new(FakeImmutableUTXOStore::new(config.clone())), _ => return Err(anyhow!("Unknown store type {store_type}")), }; + let snapshot_store = store.clone(); let mut state = State::new(store); // Create address delta publisher and pass it observations @@ -118,6 +129,60 @@ impl UTXOState { } }); + // Subscribe for snapshot messages + { + let mut subscription = context.subscribe(&snapshot_topic).await?; + let context = context.clone(); + let store = snapshot_store.clone(); + enum SnapshotState { + Preparing, + Started, + } + let mut snapshot_state = SnapshotState::Preparing; + let mut total_utxos_received = 0u64; + let mut batch_count = 0u64; + context.run(async move { + loop { + let Ok((_, message)) = subscription.read().await else { + return; + }; + + match message.as_ref() { + Message::Snapshot(SnapshotMessage::Startup) => { + info!("UTXO state received Snapshot Startup message"); + match snapshot_state { + SnapshotState::Preparing => snapshot_state = SnapshotState::Started, + _ => error!("Snapshot Startup message received but we have already left preparing state"), + } + } + Message::Snapshot(SnapshotMessage::Bootstrap( + SnapshotStateMessage::UTxOPartialState(utxo_state), + )) => { + let batch_size = utxo_state.utxos.len(); + batch_count += 1; + total_utxos_received += batch_size as u64; + + if batch_count == 1 { + info!("UTXO state received first UTxO batch with {} UTxOs", batch_size); + } else if batch_count.is_multiple_of(100) { + info!("UTXO state received {} batches, {} total UTxOs so far", batch_count, total_utxos_received); + } + + for (key, value) in &utxo_state.utxos { + if store.add_utxo(*key, value.clone()).await.is_err() { + error!("Failed to add snapshot utxo to state store"); + } + } + } + other => { + info!("UTXO state received other snapshot message: {:?} (total so far: {} UTxOs in {} batches)", + std::mem::discriminant(other), total_utxos_received, batch_count); + } + } + } + }); + } + // Query handler let state_query = state.clone(); context.handle(&utxos_query_topic, move |message| {