Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 4 additions & 15 deletions common/examples/test_streaming_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use acropolis_common::ledger_state::SPOState;
use acropolis_common::snapshot::protocol_parameters::ProtocolParameters;
use acropolis_common::snapshot::streaming_snapshot::{
AccountsCallback, DRepCallback, DRepRecord, GovernanceProtocolParametersCallback, UtxoCallback,
UtxoEntry,
};
use acropolis_common::snapshot::utxo::UtxoEntry;
use acropolis_common::snapshot::EpochCallback;
use acropolis_common::snapshot::{
AccountState, GovernanceProposal, PoolCallback, ProposalCallback, SnapshotCallbacks,
Expand Down Expand Up @@ -52,12 +52,8 @@ impl UtxoCallback for CountingCallbacks {
if self.sample_utxos.len() < 10 {
if self.sample_utxos.len() < 10 {
eprintln!(
" UTXO #{}: {}:{} → {} ({} lovelace)",
self.utxo_count,
&utxo.tx_hash[..16],
utxo.output_index,
&utxo.address[..utxo.address.len().min(32)],
utxo.value
" UTXO #{}: {} → {:?}",
self.utxo_count, utxo.id, utxo.value
);
}
self.sample_utxos.push(utxo);
Expand Down Expand Up @@ -506,14 +502,7 @@ fn main() {
if !callbacks.sample_utxos.is_empty() {
println!("Sample UTXOs (first 10):");
for (i, utxo) in callbacks.sample_utxos.iter().enumerate() {
println!(
" {}: {}:{} → {} ({} lovelace)",
i + 1,
&utxo.tx_hash[..16],
utxo.output_index,
&utxo.address[..32],
utxo.value
);
println!(" {}: {} → {:?}", i + 1, utxo.id, utxo.value);
}
println!();
}
Expand Down
11 changes: 7 additions & 4 deletions common/src/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@ impl ByronAddress {
Ok(bs58::encode(buf).into_string())
}

pub fn from_string(s: &str) -> Result<Self> {
let bytes = bs58::decode(s).into_vec()?;
let mut dec = minicbor::Decoder::new(&bytes);

pub fn from_cbor(dec: &mut minicbor::Decoder) -> Result<Self> {
let len = dec.array()?.unwrap_or(0);
if len != 2 {
anyhow::bail!("Invalid Byron address CBOR array length");
Expand All @@ -66,6 +63,12 @@ impl ByronAddress {
Ok(address)
}

pub fn from_string(s: &str) -> Result<Self> {
let bytes = bs58::decode(s).into_vec()?;
let mut dec = minicbor::Decoder::new(&bytes);
Self::from_cbor(&mut dec)
}

pub fn to_bytes_key(&self) -> Result<Vec<u8>> {
let crc = self.compute_crc32();

Expand Down
9 changes: 9 additions & 0 deletions common/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,12 +436,21 @@ pub struct AccountsBootstrapMessage {
pub bootstrap_snapshots: Option<SnapshotsContainer>,
}

/// UTxO bootstrap message containing partial UTxO state
/// All data is in internal format, ready for direct use by the state module
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct UTxOPartialState {
/// UTxOs
pub utxos: Vec<(UTxOIdentifier, UTXOValue)>,
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum SnapshotStateMessage {
SPOState(SPOState),
EpochState(EpochBootstrapMessage),
AccountsState(AccountsBootstrapMessage),
UTxOPartialState(UTxOPartialState),
DRepState(DRepBootstrapMessage),
}

Expand Down
3 changes: 2 additions & 1 deletion common/src/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ pub mod mark_set_go;
mod parser;
pub mod protocol_parameters;
pub mod streaming_snapshot;
pub mod utxo;
pub use error::SnapshotError;

pub use parser::{compute_sha256, parse_manifest, validate_era, validate_integrity};

pub use streaming_snapshot::{
AccountState, AccountsBootstrapData, AccountsCallback, Anchor, DRepCallback, DRepInfo,
EpochCallback, GovernanceProposal, PoolCallback, ProposalCallback, SnapshotCallbacks,
SnapshotMetadata, StakeAddressState, StreamingSnapshotParser, UtxoCallback, UtxoEntry,
SnapshotMetadata, StakeAddressState, StreamingSnapshotParser, UtxoCallback,
};

pub use mark_set_go::{RawSnapshot, RawSnapshotsContainer, SnapshotsCallback, VMap};
195 changes: 17 additions & 178 deletions common/src/snapshot/streaming_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ use std::net::{Ipv4Addr, Ipv6Addr};
use tracing::info;

use crate::epoch_snapshot::SnapshotsContainer;
pub use crate::hash::Hash;
use crate::hash::Hash;
use crate::ledger_state::SPOState;
use crate::snapshot::protocol_parameters::ProtocolParameters;
use crate::snapshot::utxo::{SnapshotUTxO, UtxoEntry};
pub use crate::stake_addresses::{AccountState, StakeAddressState};
pub use crate::{
Constitution, DRepChoice, DRepCredential, DRepRecord, EpochBootstrapData, Lovelace,
Expand Down Expand Up @@ -512,27 +513,6 @@ impl<'b, C> minicbor::Decode<'b, C> for DRepState {
}
}

// -----------------------------------------------------------------------------
// Data Structures (based on OpenAPI schema)
// -----------------------------------------------------------------------------

/// UTXO entry with transaction hash, index, address, and value
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UtxoEntry {
/// Transaction hash (hex-encoded)
pub tx_hash: String,
/// Output index
pub output_index: u64,
/// Hex encoded Cardano addresses
pub address: String,
/// Lovelace amount
pub value: u64,
/// Optional inline datum (hex-encoded CBOR)
pub datum: Option<String>,
/// Optional script reference (hex-encoded CBOR)
pub script_ref: Option<String>,
}

// -----------------------------------------------------------------------------
// Ledger types for DState parsing
// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -1596,23 +1576,8 @@ impl StreamingSnapshotParser {
fn parse_single_utxo(decoder: &mut Decoder) -> Result<UtxoEntry> {
// Parse key: TransactionInput (array [tx_hash, output_index])
decoder.array().context("Failed to parse TxIn array")?;

let tx_hash_bytes = decoder.bytes().context("Failed to parse tx_hash")?;
let output_index = decoder.u64().context("Failed to parse output_index")?;
let tx_hash = hex::encode(tx_hash_bytes);

// Parse value: TransactionOutput
let (address, value) = Self::parse_transaction_output(decoder)
.context("Failed to parse transaction output")?;

Ok(UtxoEntry {
tx_hash,
output_index,
address,
value,
datum: None, // TODO: Extract from TxOut
script_ref: None, // TODO: Extract from TxOut
})
let utxo: SnapshotUTxO = decoder.decode().context("Failed to parse UTxO")?;
Ok(utxo.0)
}

/// VState = [dreps_map, committee_state, dormant_epoch]
Expand Down Expand Up @@ -1743,138 +1708,6 @@ impl StreamingSnapshotParser {
})
}

/// Stream UTXOs with per-entry callback
///
/// Parse a single TxOut from the CBOR decoder
fn parse_transaction_output(dec: &mut Decoder) -> Result<(String, u64)> {
// TxOut is typically an array [address, value, ...]
// or a map for Conway with optional fields

// Try array format first (most common)
match dec.datatype().context("Failed to read TxOut datatype")? {
Type::Array | Type::ArrayIndef => {
let arr_len = dec.array().context("Failed to parse TxOut array")?;
if arr_len == Some(0) {
return Err(anyhow!("empty TxOut array"));
}

// Element 0: Address (bytes)
let address_bytes = dec.bytes().context("Failed to parse address bytes")?;
let hex_address = hex::encode(address_bytes);

// Element 1: Value (coin or map)
let value = match dec.datatype().context("Failed to read value datatype")? {
Type::U8 | Type::U16 | Type::U32 | Type::U64 => {
// Simple ADA-only value
dec.u64().context("Failed to parse u64 value")?
}
Type::Array | Type::ArrayIndef => {
// Multi-asset: [coin, assets_map]
dec.array().context("Failed to parse value array")?;
let coin = dec.u64().context("Failed to parse coin amount")?;
// Skip the assets map
dec.skip().context("Failed to skip assets map")?;
coin
}
_ => {
return Err(anyhow!("unexpected value type"));
}
};

// Skip remaining fields (datum, script_ref)
if let Some(len) = arr_len {
for _ in 2..len {
dec.skip().context("Failed to skip TxOut field")?;
}
}

Ok((hex_address, value))
}
Type::Map | Type::MapIndef => {
// Map format (Conway with optional fields)
// Map keys: 0=address, 1=value, 2=datum, 3=script_ref
let map_len = dec.map().context("Failed to parse TxOut map")?;

let mut address = String::new();
let mut value = 0u64;
let mut found_address = false;
let mut found_value = false;

let entries = map_len.unwrap_or(4); // Assume max 4 entries if indefinite
for _ in 0..entries {
// Check for break in indefinite map
if map_len.is_none() && matches!(dec.datatype(), Ok(Type::Break)) {
dec.skip().ok(); // consume break
break;
}

// Read key
let key = match dec.u32() {
Ok(k) => k,
Err(_) => {
// Skip both key and value if key is not u32
dec.skip().ok();
dec.skip().ok();
continue;
}
};

// Read value based on key
match key {
0 => {
// Address
if let Ok(addr_bytes) = dec.bytes() {
address = hex::encode(addr_bytes);
found_address = true;
} else {
dec.skip().ok();
}
}
1 => {
// Value (coin or multi-asset)
match dec.datatype() {
Ok(Type::U8) | Ok(Type::U16) | Ok(Type::U32) | Ok(Type::U64) => {
if let Ok(coin) = dec.u64() {
value = coin;
found_value = true;
} else {
dec.skip().ok();
}
}
Ok(Type::Array) | Ok(Type::ArrayIndef) => {
// Multi-asset: [coin, assets_map]
if dec.array().is_ok() {
if let Ok(coin) = dec.u64() {
value = coin;
found_value = true;
}
dec.skip().ok(); // skip assets map
} else {
dec.skip().ok();
}
}
_ => {
dec.skip().ok();
}
}
}
_ => {
// datum (2), script_ref (3), or unknown - skip
dec.skip().ok();
}
}
}

if found_address && found_value {
Ok((address, value))
} else {
Err(anyhow!("map-based TxOut missing required fields"))
}
}
_ => Err(anyhow!("unexpected TxOut type")),
}
}

/// Parse snapshots using hybrid approach with memory-based parsing
/// Uses snapshot.rs functions to parse mark/set/go snapshots from buffer
/// We expect the following structure:
Expand Down Expand Up @@ -2041,6 +1874,8 @@ impl SnapshotsCallback for CollectingCallbacks {

#[cfg(test)]
mod tests {
use crate::{Address, NativeAssets, TxHash, UTXOValue, UTxOIdentifier, Value};

use super::*;

#[test]
Expand Down Expand Up @@ -2071,16 +1906,20 @@ mod tests {
// Test UTXO callback
callbacks
.on_utxo(UtxoEntry {
tx_hash: "abc123".to_string(),
output_index: 0,
address: "addr1...".to_string(),
value: 5000000,
datum: None,
script_ref: None,
id: UTxOIdentifier::new(TxHash::new(<[u8; 32]>::default()), 0),
value: UTXOValue {
address: Address::None,
value: Value {
lovelace: 5000000,
assets: NativeAssets::default(),
},
datum: None,
reference_script: None,
},
})
.unwrap();

assert_eq!(callbacks.utxos.len(), 1);
assert_eq!(callbacks.utxos[0].value, 5000000);
assert_eq!(callbacks.utxos[0].value.value.lovelace, 5000000);
}
}
Loading