diff --git a/common/examples/test_streaming_parser.rs b/common/examples/test_streaming_parser.rs index d5089e58..3b6d5924 100644 --- a/common/examples/test_streaming_parser.rs +++ b/common/examples/test_streaming_parser.rs @@ -2,14 +2,16 @@ // // Usage: cargo run --example test_streaming_parser --release -- +use acropolis_common::ledger_state::SPOState; use acropolis_common::snapshot::protocol_parameters::ProtocolParameters; use acropolis_common::snapshot::streaming_snapshot::GovernanceProtocolParametersCallback; use acropolis_common::snapshot::EpochCallback; use acropolis_common::snapshot::{ - AccountState, DRepCallback, DRepInfo, GovernanceProposal, PoolCallback, PoolInfo, - ProposalCallback, RawSnapshotsContainer, SnapshotCallbacks, SnapshotMetadata, - SnapshotsCallback, StakeCallback, StreamingSnapshotParser, UtxoCallback, UtxoEntry, + AccountState, DRepCallback, DRepInfo, GovernanceProposal, PoolCallback, ProposalCallback, + RawSnapshotsContainer, SnapshotCallbacks, SnapshotMetadata, SnapshotsCallback, StakeCallback, + StreamingSnapshotParser, UtxoCallback, UtxoEntry, }; +use acropolis_common::{NetworkId, PoolRegistration}; use anyhow::Result; use std::env; use std::time::Instant; @@ -24,11 +26,13 @@ struct CountingCallbacks { metadata: Option, utxo_count: u64, pool_count: usize, + future_pool_count: usize, + retiring_pool_count: usize, account_count: usize, drep_count: usize, proposal_count: usize, sample_utxos: Vec, - sample_pools: Vec, + sample_pools: Vec, sample_accounts: Vec, sample_dreps: Vec, sample_proposals: Vec, @@ -63,24 +67,32 @@ impl UtxoCallback for CountingCallbacks { } impl PoolCallback for CountingCallbacks { - fn on_pools(&mut self, pools: Vec) -> Result<()> { - self.pool_count = pools.len(); - eprintln!("Parsed {} stake pools", pools.len()); + fn on_pools(&mut self, pools: SPOState) -> Result<()> { + self.pool_count = pools.pools.len(); + self.future_pool_count = pools.updates.len(); + self.retiring_pool_count = pools.retiring.len(); + eprintln!( + "Parsed {} stake pools (future: {}, retiring: {}))", + pools.pools.len(), + pools.updates.len(), + pools.retiring.len() + ); - // Show first 10 pools - for (i, pool) in pools.iter().take(10).enumerate() { + // Keep first 10 for summary + self.sample_pools = pools.pools.into_iter().take(10).map(|(_, v)| v).collect(); + + // Show sample pools + for (i, pool) in self.sample_pools.clone().iter().enumerate() { eprintln!( - " Pool #{}: {} (pledge: {}, cost: {}, margin: {:.2}%)", + " Pool #{}: {} (pledge: {}, cost: {}, margin: {:?})", i + 1, - pool.pool_id, + pool.operator, pool.pledge, pool.cost, - pool.margin * 100.0 + pool.margin ); } - // Keep first 10 for summary - self.sample_pools = pools.into_iter().take(10).collect(); Ok(()) } } @@ -496,7 +508,7 @@ fn main() { println!("Starting parse..."); let start = Instant::now(); - match parser.parse(&mut callbacks) { + match parser.parse(&mut callbacks, NetworkId::Mainnet) { Ok(()) => { let duration = start.elapsed(); println!("Parse completed successfully in {duration:.2?}"); @@ -571,12 +583,12 @@ fn main() { println!("Sample Pools (first 10):"); for (i, pool) in callbacks.sample_pools.iter().enumerate() { println!( - " {}: {} (pledge: {}, cost: {}, margin: {:.2}%)", + " {}: {} (pledge: {}, cost: {}, margin: {:?})", i + 1, - pool.pool_id, + pool.operator, pool.pledge, pool.cost, - pool.margin * 100.0 + pool.margin ); } println!(); diff --git a/common/src/ledger_state.rs b/common/src/ledger_state.rs index 648c6595..a44849a1 100644 --- a/common/src/ledger_state.rs +++ b/common/src/ledger_state.rs @@ -38,6 +38,22 @@ pub struct SPOState { pub retiring: BTreeMap, } +impl SPOState { + pub fn new() -> Self { + Self { + pools: BTreeMap::new(), + updates: BTreeMap::new(), + retiring: BTreeMap::new(), + } + } + + pub fn extend(&mut self, extension: &Self) { + self.pools.extend(extension.pools.clone()); + self.updates.extend(extension.updates.clone()); + self.retiring.extend(extension.retiring.clone()); + } +} + pub struct DRepState {} pub struct ProposalState {} diff --git a/common/src/snapshot/mark_set_go.rs b/common/src/snapshot/mark_set_go.rs index 400557e1..d75aaaee 100644 --- a/common/src/snapshot/mark_set_go.rs +++ b/common/src/snapshot/mark_set_go.rs @@ -3,20 +3,16 @@ // ================================================================================================ use anyhow::{Context, Error, Result}; -use log::{error, info}; +use log::info; use minicbor::Decoder; use serde::Serialize; -use types::Ratio; - pub use crate::hash::Hash; -use crate::snapshot::pool_params::PoolParams; -use crate::snapshot::streaming_snapshot; +use crate::snapshot::streaming_snapshot::{SnapshotContext, SnapshotPoolRegistration}; pub use crate::stake_addresses::{AccountState, StakeAddressState}; -use crate::PoolId; pub use crate::StakeCredential; -use crate::{address::StakeAddress, types, NetworkId, PoolRegistration}; +use crate::{PoolId, PoolRegistration}; /// VMap representation for CBOR Map types #[derive(Debug, Clone, PartialEq, Serialize)] @@ -91,7 +87,11 @@ pub struct Snapshot { impl Snapshot { /// Parse a single snapshot (Mark, Set, or Go) - pub fn parse_single_snapshot(decoder: &mut Decoder, snapshot_name: &str) -> Result { + pub fn parse_single_snapshot( + decoder: &mut Decoder, + ctx: &mut SnapshotContext, + snapshot_name: &str, + ) -> Result { info!(" {snapshot_name} snapshot - checking data type..."); // Check what type we have - could be array, map, or simple value @@ -119,148 +119,20 @@ impl Snapshot { info!( " {snapshot_name} snapshot - parsing snapshot_pool_registration..." ); + // pool_registration (third element) - let pools: VMap = decoder - .decode() + let pools: VMap = decoder + .decode_with(ctx) .context("Failed to parse snapshot_pool_registration")?; - let registration = VMap( - pools - .0 - .into_iter() - .map(|(pool_id, params)| { - // Convert RewardAccount (Vec) to StakeAddress (arbitralily chosen over ScripHash) - let reward_account = - StakeAddress::from_binary(¶ms.reward_account.0) - .unwrap_or_else(|_| - { - error!("Failed to parse reward account for pool {pool_id}, using default"); - StakeAddress::default() - } - ); - - // Convert Set to Vec - let pool_owners: Vec = params - .owners - .0 - .into_iter() - .map(|keyhash| { - StakeAddress::new( - StakeCredential::AddrKeyHash(keyhash), - NetworkId::Mainnet, // TODO: Make network configurable or get it from parameters - ) - }) - .collect(); - - // Convert Vec to Vec - let relays: Vec = params - .relays - .into_iter() - .map(|relay| match relay { - streaming_snapshot::Relay::SingleHostAddr( - port, - ipv4, - ipv6, - ) => { - let port_opt = match port { - streaming_snapshot::Nullable::Some(p) => { - Some(p as u16) - } - _ => None, - }; - let ipv4_opt = match ipv4 { - streaming_snapshot::Nullable::Some(ip) - if ip.0.len() == 4 => - { - Some(std::net::Ipv4Addr::new( - ip.0[0], ip.0[1], ip.0[2], ip.0[3], - )) - } - _ => None, - }; - let ipv6_opt = match ipv6 { - streaming_snapshot::Nullable::Some(ip) - if ip.0.len() == 16 => - { - let b = &ip.0; - Some(std::net::Ipv6Addr::from([ - b[0], b[1], b[2], b[3], b[4], b[5], - b[6], b[7], b[8], b[9], b[10], b[11], - b[12], b[13], b[14], b[15], - ])) - } - _ => None, - }; - types::Relay::SingleHostAddr( - types::SingleHostAddr { - port: port_opt, - ipv4: ipv4_opt, - ipv6: ipv6_opt, - }, - ) - } - streaming_snapshot::Relay::SingleHostName( - port, - hostname, - ) => { - let port_opt = match port { - streaming_snapshot::Nullable::Some(p) => { - Some(p as u16) - } - _ => None, - }; - types::Relay::SingleHostName( - types::SingleHostName { - port: port_opt, - dns_name: hostname, - }, - ) - } - streaming_snapshot::Relay::MultiHostName(hostname) => { - types::Relay::MultiHostName(types::MultiHostName { - dns_name: hostname, - }) - } - }) - .collect(); - - // Convert Nullable to Option - let pool_metadata = match params.metadata { - streaming_snapshot::Nullable::Some(meta) => { - Some(types::PoolMetadata { - url: meta.url, - hash: meta.hash.to_vec(), - }) - } - _ => None, - }; - - ( - pool_id, - PoolRegistration { - operator: params.id, - vrf_key_hash: params.vrf, - pledge: params.pledge, - cost: params.cost, - margin: Ratio { - numerator: params.margin.numerator, - denominator: params.margin.denominator, - }, - reward_account, - pool_owners, - relays, - pool_metadata, - }, - ) - }) - .collect(), - ); info!(" {snapshot_name} snapshot - parse completed successfully."); Ok(Snapshot { snapshot_stake, snapshot_delegations: delegations, - snapshot_pool_params: registration, + snapshot_pool_params: VMap( + pools.0.into_iter().map(|(k, v)| (k, v.0)).collect(), + ), }) } other_type => { diff --git a/common/src/snapshot/mod.rs b/common/src/snapshot/mod.rs index d835b7f3..b52f1bc9 100644 --- a/common/src/snapshot/mod.rs +++ b/common/src/snapshot/mod.rs @@ -14,7 +14,6 @@ mod decode; mod error; pub mod mark_set_go; mod parser; -pub mod pool_params; pub mod protocol_parameters; pub mod streaming_snapshot; @@ -27,9 +26,9 @@ pub use parser::{compute_sha256, parse_manifest, validate_era, validate_integrit // Re-export streaming snapshot APIs pub use streaming_snapshot::{ AccountState, Anchor, CollectingCallbacks, DRepCallback, DRepInfo, EpochCallback, - GovernanceProposal, PoolCallback, PoolInfo, PoolMetadata, PotBalances, ProposalCallback, Relay, - SnapshotCallbacks, SnapshotMetadata, StakeAddressState, StakeCallback, StreamingSnapshotParser, - UtxoCallback, UtxoEntry, + GovernanceProposal, PoolCallback, PotBalances, ProposalCallback, Relay, SnapshotCallbacks, + SnapshotMetadata, StakeAddressState, StakeCallback, StreamingSnapshotParser, UtxoCallback, + UtxoEntry, }; // Re-export snapshot types diff --git a/common/src/snapshot/pool_params.rs b/common/src/snapshot/pool_params.rs deleted file mode 100644 index ac85acca..00000000 --- a/common/src/snapshot/pool_params.rs +++ /dev/null @@ -1,277 +0,0 @@ -// Copyright 2025 PRAGMA -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use super::streaming_snapshot::{ - cbor, Coin, Nullable, PoolMetadata, Relay, RewardAccount, Set, UnitInterval, -}; -use crate::types::AddrKeyhash; -use crate::{PoolId, VrfKeyHash}; - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct PoolParams { - pub id: PoolId, - pub vrf: VrfKeyHash, - pub pledge: Coin, - pub cost: Coin, - pub margin: UnitInterval, - pub reward_account: RewardAccount, - pub owners: Set, - pub relays: Vec, - pub metadata: Nullable, -} - -impl cbor::encode::Encode for PoolParams { - fn encode( - &self, - e: &mut cbor::Encoder, - ctx: &mut C, - ) -> Result<(), cbor::encode::Error> { - e.array(9)?; - e.encode_with(self.id, ctx)?; - e.encode_with(self.vrf, ctx)?; - e.encode_with(self.pledge, ctx)?; - e.encode_with(self.cost, ctx)?; - e.encode_with(&self.margin, ctx)?; - e.encode_with(&self.reward_account, ctx)?; - e.encode_with(&self.owners, ctx)?; - e.encode_with(&self.relays, ctx)?; - e.encode_with(&self.metadata, ctx)?; - Ok(()) - } -} - -impl<'b, C> cbor::decode::Decode<'b, C> for PoolParams { - fn decode(d: &mut cbor::Decoder<'b>, ctx: &mut C) -> Result { - let _len = d.array()?; - Ok(PoolParams { - id: d.decode_with(ctx)?, - vrf: d.decode_with(ctx)?, - pledge: d.decode_with(ctx)?, - cost: d.decode_with(ctx)?, - margin: d.decode_with(ctx)?, - reward_account: d.decode_with(ctx)?, - owners: d.decode_with(ctx)?, - relays: d.decode_with(ctx)?, - metadata: d.decode_with(ctx)?, - }) - } -} - -// Serialize implementation requires pallas_addresses which is not currently a dependency -// TODO: Add pallas_addresses or implement Bech32 encoding differently -/* -impl serde::Serialize for PoolParams { - fn serialize(&self, serializer: S) -> Result { - use pallas_addresses::Address; - use serde::ser::SerializeStruct; - use std::collections::BTreeMap; - - fn as_lovelace_map(n: u64) -> BTreeMap> { - let mut lovelace = BTreeMap::new(); - lovelace.insert("lovelace".to_string(), n); - let mut ada = BTreeMap::new(); - ada.insert("ada".to_string(), lovelace); - ada - } - - fn as_string_ratio(r: &UnitInterval) -> String { - format!("{}/{}", r.numerator, r.denominator) - } - - fn as_bech32_addr(bytes: &[u8]) -> Result { - Address::from_bytes(bytes).and_then(|addr| addr.to_bech32()) - } - - struct WrapRelay<'a>(&'a Relay); - - impl serde::Serialize for WrapRelay<'_> { - fn serialize(&self, serializer: S) -> Result { - match self.0 { - Relay::SingleHostAddr(port, ipv4, ipv6) => { - let mut s = serializer.serialize_struct("Relay::SingleHostAddr", 4)?; - s.serialize_field("type", "ipAddress")?; - if let Nullable::Some(ipv4) = ipv4 { - s.serialize_field( - "ipv4", - &format!("{}.{}.{}.{}", ipv4[0], ipv4[1], ipv4[2], ipv4[3]), - )?; - } - if let Nullable::Some(ipv6) = ipv6 { - let bytes: [u8; 16] = [ - ipv6[3], ipv6[2], ipv6[1], ipv6[0], // 1st fragment - ipv6[7], ipv6[6], ipv6[5], ipv6[4], // 2nd fragment - ipv6[11], ipv6[10], ipv6[9], ipv6[8], // 3rd fragment - ipv6[15], ipv6[14], ipv6[13], ipv6[12], // 4th fragment - ]; - s.serialize_field( - "ipv6", - &format!("{}", std::net::Ipv6Addr::from(bytes)), - )?; - } - if let Nullable::Some(port) = port { - s.serialize_field("port", port)?; - } - s.end() - } - Relay::SingleHostName(port, hostname) => { - let mut s = serializer.serialize_struct("Relay::SingleHostName", 3)?; - s.serialize_field("type", "hostname")?; - s.serialize_field("hostname", hostname)?; - if let Nullable::Some(port) = port { - s.serialize_field("port", port)?; - } - s.end() - } - Relay::MultiHostName(hostname) => { - let mut s = serializer.serialize_struct("Relay::MultiHostName", 2)?; - s.serialize_field("type", "hostname")?; - s.serialize_field("hostname", hostname)?; - s.end() - } - } - } - } - - let mut s = serializer.serialize_struct("PoolParams", 9)?; - s.serialize_field("id", &hex::encode(self.id))?; - s.serialize_field("vrfVerificationKeyHash", &hex::encode(self.vrf))?; - s.serialize_field("pledge", &as_lovelace_map(self.pledge))?; - s.serialize_field("cost", &as_lovelace_map(self.cost))?; - s.serialize_field("margin", &as_string_ratio(&self.margin))?; - s.serialize_field( - "rewardAccount", - &as_bech32_addr(&self.reward_account).map_err(serde::ser::Error::custom)?, - )?; - s.serialize_field( - "owners", - &self.owners.iter().map(hex::encode).collect::>(), - )?; - s.serialize_field( - "relays", - &self - .relays - .iter() - .map(WrapRelay) - .collect::>>(), - )?; - if let Nullable::Some(metadata) = &self.metadata { - s.serialize_field("metadata", metadata)?; - } - s.end() - } -} -*/ - -// TODO: Fix test module imports after resolving type locations -/* -#[cfg(any(test, feature = "test-utils"))] -pub mod tests { - use super::*; - use crate::{ - Hash, IPv4, IPv6, Nullable, PoolId, Port, RationalNumber, Relay, prop_cbor_roundtrip, - }; - use proptest::{prelude::*, prop_compose}; - - prop_cbor_roundtrip!(PoolParams, any_pool_params()); - - prop_compose! { - /// Generates arbitrary `PoolId` values using random 28-byte arrays. - pub fn any_pool_id()( - bytes in any::<[u8; 28]>(), - ) -> PoolId { - Hash::from(bytes) - } - } - - fn any_nullable_port() -> impl Strategy> { - prop_oneof![ - Just(Nullable::Undefined), - Just(Nullable::Null), - any::().prop_map(Nullable::Some), - ] - } - - fn any_nullable_ipv4() -> impl Strategy> { - prop_oneof![ - Just(Nullable::Undefined), - Just(Nullable::Null), - any::<[u8; 4]>().prop_map(|a| Nullable::Some(Vec::from(a).into())), - ] - } - - fn any_nullable_ipv6() -> impl Strategy> { - prop_oneof![ - Just(Nullable::Undefined), - Just(Nullable::Null), - any::<[u8; 16]>().prop_map(|a| Nullable::Some(Vec::from(a).into())), - ] - } - - prop_compose! { - fn single_host_addr()( - port in any_nullable_port(), - ipv4 in any_nullable_ipv4(), - ipv6 in any_nullable_ipv6() - ) -> Relay { - Relay::SingleHostAddr(port, ipv4, ipv6) - } - } - - prop_compose! { - fn single_host_name()( - port in any_nullable_port(), - dnsname in any::(), - ) -> Relay { - Relay::SingleHostName(port, dnsname) - } - } - - prop_compose! { - fn multi_host_name()( - dnsname in any::(), - ) -> Relay { - Relay::MultiHostName(dnsname) - } - } - - fn any_relay() -> BoxedStrategy { - prop_oneof![single_host_addr(), single_host_name(), multi_host_name(),].boxed() - } - - prop_compose! { - pub fn any_pool_params()( - id in any_pool_id(), - vrf in any::<[u8; 32]>(), - pledge in any::(), - cost in any::(), - margin in 0..100u64, - reward_account in any::<[u8; 28]>(), - owners in any::>(), - relays in proptest::collection::vec(any_relay(), 0..10), - ) -> PoolParams { - PoolParams { - id, - vrf: Hash::new(vrf), - pledge, - cost, - margin: RationalNumber { numerator: margin, denominator: 100 }, - reward_account: [&[0xF0], &reward_account[..]].concat().into(), - owners: owners.into_iter().map(|h| h.into()).collect::>>().into(), - relays, - metadata: Nullable::Null, - } - } - } -} -*/ diff --git a/common/src/snapshot/streaming_snapshot.rs b/common/src/snapshot/streaming_snapshot.rs index e231969e..727ba449 100644 --- a/common/src/snapshot/streaming_snapshot.rs +++ b/common/src/snapshot/streaming_snapshot.rs @@ -27,12 +27,17 @@ use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::fs::File; use std::io::{Read, Seek, SeekFrom}; +use std::net::{Ipv4Addr, Ipv6Addr}; use tracing::info; pub use crate::hash::Hash; +use crate::ledger_state::SPOState; use crate::snapshot::protocol_parameters::ProtocolParameters; pub use crate::stake_addresses::{AccountState, StakeAddressState}; -pub use crate::StakeCredential; +pub use crate::{ + Constitution, EpochBootstrapData, Lovelace, MultiHostName, NetworkId, PoolId, PoolMetadata, + PoolRegistration, Ratio, Relay, SingleHostAddr, SingleHostName, StakeAddress, StakeCredential, +}; // Import snapshot parsing support use super::mark_set_go::{RawSnapshotsContainer, SnapshotsCallback}; @@ -42,7 +47,6 @@ use super::mark_set_go::{RawSnapshotsContainer, SnapshotsCallback}; // ----------------------------------------------------------------------------- pub type Epoch = u64; -pub type Lovelace = u64; /* * This was replaced with the StakeCredential defined in types.rs, but the implementation here is much @@ -176,27 +180,27 @@ impl<'b, C> minicbor::Decode<'b, C> for Anchor { /// Set type (encoded as array, sometimes with CBOR tag 258) #[derive(Debug, Clone, PartialEq, Eq)] -pub struct Set(pub Vec); +pub struct SnapshotSet(pub Vec); -impl Set { +impl SnapshotSet { pub fn iter(&self) -> std::slice::Iter<'_, T> { self.0.iter() } } -impl From> for Set { +impl From> for SnapshotSet { fn from(vec: Vec) -> Self { - Set(vec) + SnapshotSet(vec) } } -impl From> for Vec { - fn from(set: Set) -> Self { +impl From> for Vec { + fn from(set: SnapshotSet) -> Self { set.0 } } -impl<'b, C, T> minicbor::Decode<'b, C> for Set +impl<'b, C, T> minicbor::Decode<'b, C> for SnapshotSet where T: minicbor::Decode<'b, C>, { @@ -207,11 +211,11 @@ where } let vec: Vec = d.decode_with(ctx)?; - Ok(Set(vec)) + Ok(SnapshotSet(vec)) } } -impl minicbor::Encode for Set +impl minicbor::Encode for SnapshotSet where T: minicbor::Encode, { @@ -290,7 +294,7 @@ impl minicbor::Encode for DRep { #[derive(Debug)] pub struct Account { pub rewards_and_deposit: StrictMaybe<(Lovelace, Lovelace)>, - pub pointers: Set<(u64, u64, u64)>, + pub pointers: SnapshotSet<(u64, u64, u64)>, pub pool: StrictMaybe, pub drep: StrictMaybe, } @@ -308,180 +312,121 @@ impl<'b, C> minicbor::Decode<'b, C> for Account { } // ----------------------------------------------------------------------------- -// Type aliases for pool_params compatibility +// Type decoders for snapshot compatibility // ----------------------------------------------------------------------------- pub use crate::types::AddrKeyhash; pub use crate::types::ScriptHash; -use crate::{Constitution, EpochBootstrapData, PoolId}; -/// Alias minicbor as cbor for pool_params module -pub use minicbor as cbor; -/// Coin amount (Lovelace) -pub type Coin = u64; - -/// Reward account (stake address bytes) - wrapper to handle CBOR bytes encoding -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct RewardAccount(pub Vec); - -impl<'b, C> minicbor::Decode<'b, C> for RewardAccount { - fn decode(d: &mut Decoder<'b>, _ctx: &mut C) -> Result { - let bytes = d.bytes()?; - Ok(RewardAccount(bytes.to_vec())) - } +pub struct SnapshotContext { + pub network: NetworkId, } -impl minicbor::Encode for RewardAccount { - fn encode( - &self, - e: &mut minicbor::Encoder, - _ctx: &mut C, - ) -> Result<(), minicbor::encode::Error> { - e.bytes(&self.0)?; - Ok(()) +impl AsRef for SnapshotContext { + fn as_ref(&self) -> &Self { + self } } -/// Unit interval (rational number for pool margin) -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct UnitInterval { - pub numerator: u64, - pub denominator: u64, -} - -impl<'b, C> minicbor::Decode<'b, C> for UnitInterval { - fn decode(d: &mut Decoder<'b>, _ctx: &mut C) -> Result { - // UnitInterval might be tagged (tag 30 for rational) - if matches!(d.datatype()?, Type::Tag) { - d.tag()?; - } - d.array()?; - let numerator = d.u64()?; - let denominator = d.u64()?; - Ok(UnitInterval { - numerator, - denominator, - }) - } -} - -impl minicbor::Encode for UnitInterval { - fn encode( - &self, - e: &mut minicbor::Encoder, - _ctx: &mut C, - ) -> Result<(), minicbor::encode::Error> { - e.tag(minicbor::data::Tag::new(30))?; - e.array(2)?; - e.u64(self.numerator)?; - e.u64(self.denominator)?; - Ok(()) - } -} +struct SnapshotOption(pub Option); -/// Nullable type (like Maybe but with explicit null vs undefined) -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum Nullable { - Undefined, - Null, - Some(T), -} - -impl<'b, C, T> minicbor::Decode<'b, C> for Nullable +impl<'b, C, T> minicbor::Decode<'b, C> for SnapshotOption where T: minicbor::Decode<'b, C>, { fn decode(d: &mut Decoder<'b>, ctx: &mut C) -> Result { match d.datatype()? { - Type::Null => { - d.skip()?; - Ok(Nullable::Null) - } - Type::Undefined => { + Type::Null | Type::Undefined => { d.skip()?; - Ok(Nullable::Undefined) + Ok(SnapshotOption(None)) } _ => { - let value = T::decode(d, ctx)?; - Ok(Nullable::Some(value)) + let t = T::decode(d, ctx)?; + Ok(SnapshotOption(Some(t))) } } } } -impl minicbor::Encode for Nullable +pub struct SnapshotPoolRegistration(pub PoolRegistration); + +impl<'b, C> minicbor::Decode<'b, C> for SnapshotPoolRegistration where - T: minicbor::Encode, + C: AsRef, { - fn encode( - &self, - e: &mut minicbor::Encoder, - ctx: &mut C, - ) -> Result<(), minicbor::encode::Error> { - match self { - Nullable::Undefined => e.undefined()?.ok(), - Nullable::Null => e.null()?.ok(), - Nullable::Some(v) => v.encode(e, ctx), - } + fn decode(d: &mut Decoder<'b>, ctx: &mut C) -> Result { + let _len = d.array()?; + Ok(Self(PoolRegistration { + operator: d.decode_with(ctx)?, + vrf_key_hash: d.decode_with(ctx)?, + pledge: d.decode_with(ctx)?, + cost: d.decode_with(ctx)?, + margin: (SnapshotRatio::decode(d, ctx)?).0, + reward_account: (SnapshotStakeAddress::decode(d, ctx)?).0, + pool_owners: (SnapshotSet::::decode(d, ctx)?) + .0 + .into_iter() + .map(|a| a.0) + .collect(), + relays: (Vec::::decode(d, ctx)?).into_iter().map(|r| r.0).collect(), + pool_metadata: (SnapshotOption::::decode(d, ctx)?).0.map(|m| m.0), + })) } } -// Network types for pool relays -pub type Port = u32; - -/// IPv4 address (4 bytes, encoded as CBOR bytes) -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct IPv4(pub Vec); +struct SnapshotRatio(pub Ratio); -impl<'b, C> minicbor::Decode<'b, C> for IPv4 { +impl<'b, C> minicbor::Decode<'b, C> for SnapshotRatio { fn decode(d: &mut Decoder<'b>, _ctx: &mut C) -> Result { - let bytes = d.bytes()?; - Ok(IPv4(bytes.to_vec())) + // UnitInterval might be tagged (tag 30 for rational) + if matches!(d.datatype()?, Type::Tag) { + d.tag()?; + } + d.array()?; + let numerator = d.u64()?; + let denominator = d.u64()?; + Ok(Self(Ratio { + numerator, + denominator, + })) } } -impl minicbor::Encode for IPv4 { - fn encode( - &self, - e: &mut minicbor::Encoder, - _ctx: &mut C, - ) -> Result<(), minicbor::encode::Error> { - e.bytes(&self.0)?; - Ok(()) - } -} +// Network types for pool relays +pub type SnapshotPort = u32; -/// IPv6 address (16 bytes, encoded as CBOR bytes) -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct IPv6(pub Vec); +struct SnapshotStakeAddress(pub StakeAddress); -impl<'b, C> minicbor::Decode<'b, C> for IPv6 { +impl<'b, C> minicbor::Decode<'b, C> for SnapshotStakeAddress { fn decode(d: &mut Decoder<'b>, _ctx: &mut C) -> Result { let bytes = d.bytes()?; - Ok(IPv6(bytes.to_vec())) + let bytes = bytes.to_vec(); + Ok(Self(StakeAddress::from_binary(&bytes).map_err(|e| { + minicbor::decode::Error::message(e.to_string()) + })?)) } } -impl minicbor::Encode for IPv6 { - fn encode( - &self, - e: &mut minicbor::Encoder, - _ctx: &mut C, - ) -> Result<(), minicbor::encode::Error> { - e.bytes(&self.0)?; - Ok(()) +struct SnapshotStakeAddressFromCred(pub StakeAddress); + +impl<'b, C> minicbor::Decode<'b, C> for SnapshotStakeAddressFromCred +where + C: AsRef, +{ + fn decode(d: &mut Decoder<'b>, ctx: &mut C) -> Result { + let bytes = d.bytes()?; + let bytes = Hash::<28>::try_from(bytes) + .map_err(|e| minicbor::decode::Error::message(e.to_string()))?; + Ok(Self(StakeAddress::new( + StakeCredential::AddrKeyHash(bytes), + ctx.as_ref().network.clone(), + ))) } } -/// Pool relay types (for CBOR encoding/decoding) -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum Relay { - SingleHostAddr(Nullable, Nullable, Nullable), - SingleHostName(Nullable, String), - MultiHostName(String), -} +struct SnapshotRelay(pub Relay); -impl<'b, C> minicbor::Decode<'b, C> for Relay { +impl<'b, C> minicbor::Decode<'b, C> for SnapshotRelay { fn decode(d: &mut Decoder<'b>, ctx: &mut C) -> Result { d.array()?; let tag = d.u32()?; @@ -489,85 +434,42 @@ impl<'b, C> minicbor::Decode<'b, C> for Relay { match tag { 0 => { // SingleHostAddr - let port = Nullable::::decode(d, ctx)?; - let ipv4 = Nullable::::decode(d, ctx)?; - let ipv6 = Nullable::::decode(d, ctx)?; - Ok(Relay::SingleHostAddr(port, ipv4, ipv6)) + let port = (Option::::decode(d, ctx)?).map(|p| p as u16); + let ipv4 = Option::::decode(d, ctx)?; + let ipv6 = Option::::decode(d, ctx)?; + Ok(Self(Relay::SingleHostAddr(SingleHostAddr { + port, + ipv4, + ipv6, + }))) } 1 => { // SingleHostName - let port = Nullable::::decode(d, ctx)?; - let hostname = d.str()?.to_string(); - Ok(Relay::SingleHostName(port, hostname)) + let port = (Option::::decode(d, ctx)?).map(|p| p as u16); + let dns_name = d.str()?.to_string(); + Ok(Self(Relay::SingleHostName(SingleHostName { + port, + dns_name, + }))) } 2 => { // MultiHostName - let hostname = d.str()?.to_string(); - Ok(Relay::MultiHostName(hostname)) + let dns_name = d.str()?.to_string(); + Ok(Self(Relay::MultiHostName(MultiHostName { dns_name }))) } _ => Err(minicbor::decode::Error::message("Invalid relay tag")), } } } -impl minicbor::Encode for Relay { - fn encode( - &self, - e: &mut minicbor::Encoder, - ctx: &mut C, - ) -> Result<(), minicbor::encode::Error> { - match self { - Relay::SingleHostAddr(port, ipv4, ipv6) => { - e.array(4)?; - e.u32(0)?; - port.encode(e, ctx)?; - ipv4.encode(e, ctx)?; - ipv6.encode(e, ctx)?; - Ok(()) - } - Relay::SingleHostName(port, hostname) => { - e.array(3)?; - e.u32(1)?; - port.encode(e, ctx)?; - e.str(hostname)?; - Ok(()) - } - Relay::MultiHostName(hostname) => { - e.array(2)?; - e.u32(2)?; - e.str(hostname)?; - Ok(()) - } - } - } -} - -/// Pool metadata (for CBOR encoding/decoding) -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] -pub struct PoolMetadata { - pub url: String, - pub hash: Hash<32>, -} +struct SnapshotPoolMetadata(pub PoolMetadata); -impl<'b, C> minicbor::Decode<'b, C> for PoolMetadata { +impl<'b, C> minicbor::Decode<'b, C> for SnapshotPoolMetadata { fn decode(d: &mut Decoder<'b>, ctx: &mut C) -> Result { d.array()?; let url = d.str()?.to_string(); - let hash = Hash::<32>::decode(d, ctx)?; - Ok(PoolMetadata { url, hash }) - } -} - -impl minicbor::Encode for PoolMetadata { - fn encode( - &self, - e: &mut minicbor::Encoder, - ctx: &mut C, - ) -> Result<(), minicbor::encode::Error> { - e.array(2)?; - e.str(&self.url)?; - self.hash.encode(e, ctx)?; - Ok(()) + let hash = (Hash::<32>::decode(d, ctx)?).to_vec(); + Ok(SnapshotPoolMetadata(PoolMetadata { url, hash })) } } @@ -581,7 +483,7 @@ pub struct DRepState { pub expiry: Epoch, pub anchor: StrictMaybe, pub deposit: Lovelace, - pub delegators: Set, + pub delegators: SnapshotSet, } impl<'b, C> minicbor::Decode<'b, C> for DRepState { @@ -600,7 +502,7 @@ impl<'b, C> minicbor::Decode<'b, C> for DRepState { if matches!(d.datatype()?, Type::Tag) { d.tag()?; // skip the tag } - let delegators = Set::::decode(d, ctx)?; + let delegators = SnapshotSet::::decode(d, ctx)?; Ok(DRepState { expiry, @@ -662,58 +564,6 @@ impl<'b, C> minicbor::Decode<'b, C> for DRepCredential { // Data Structures (based on OpenAPI schema) // ----------------------------------------------------------------------------- -/// Stake pool information -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct PoolInfo { - /// Bech32-encoded pool ID - pub pool_id: String, - /// Hex-encoded VRF key hash - pub vrf_key_hash: String, - /// Pledge amount in Lovelace - pub pledge: u64, - /// Fixed cost in Lovelace - pub cost: u64, - /// Pool margin (0.0 to 1.0) - pub margin: f64, - /// Bech32-encoded reward account - pub reward_account: String, - /// List of pool owner stake addresses - pub pool_owners: Vec, - /// Pool relay information - pub relays: Vec, - /// Pool metadata (URL and hash) - pub pool_metadata: Option, - /// Optional retirement epoch - pub retirement_epoch: Option, -} - -/// Pool relay information (for API/JSON output) -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "PascalCase")] -pub enum ApiRelay { - SingleHostAddr { - port: Option, - ipv4: Option, - ipv6: Option, - }, - SingleHostName { - port: Option, - dns_name: String, - }, - MultiHostName { - dns_name: String, - }, -} - -/// Pool metadata anchor (for API/JSON output) -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ApiPoolMetadata { - /// IPFS or HTTP(S) URL - pub url: String, - /// Hex-encoded hash - pub hash: String, -} - /// DRep information #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DRepInfo { @@ -796,7 +646,7 @@ pub trait EpochCallback { /// Callback invoked with bulk stake pool data pub trait PoolCallback { /// Called once with all pool data - fn on_pools(&mut self, pools: Vec) -> Result<()>; + fn on_pools(&mut self, spo_state: SPOState) -> Result<()>; } /// Callback invoked with bulk stake account data @@ -855,7 +705,7 @@ struct ParsedMetadata { epoch: u64, treasury: u64, reserves: u64, - pools: Vec, + pools: SPOState, dreps: Vec, accounts: Vec, blocks_previous_epoch: Vec, @@ -868,7 +718,7 @@ struct ParsedMetadataWithoutUtxoPosition { epoch: u64, treasury: u64, reserves: u64, - pools: Vec, + pools: SPOState, dreps: Vec, accounts: Vec, blocks_previous_epoch: Vec, @@ -948,10 +798,12 @@ impl StreamingSnapshotParser { /// 5: StakeDistr, /// ] /// ``` - pub fn parse(&self, callbacks: &mut C) -> Result<()> { + pub fn parse(&self, callbacks: &mut C, network: NetworkId) -> Result<()> { let file = File::open(&self.file_path) .context(format!("Failed to open snapshot file: {}", self.file_path))?; + let mut ctx = SnapshotContext { network }; + let mut chunked_reader = ChunkedCborReader::new(file, self.chunk_size)?; // Phase 1: Parse metadata efficiently using larger buffer to handle protocol parameters @@ -1078,8 +930,8 @@ impl StreamingSnapshotParser { Self::parse_vstate(&mut decoder).context("Failed to parse VState for DReps")?; // Parse PState [3][1][0][1] for pools - let pools = - Self::parse_pstate(&mut decoder).context("Failed to parse PState for pools")?; + let pools = Self::parse_pstate(&mut decoder, &mut ctx) + .context("Failed to parse PState for pools")?; // Parse DState [3][1][0][2] for accounts/delegations // DState is an array: [unified_rewards, fut_gen_deleg, gen_deleg, instant_rewards] @@ -1357,7 +1209,7 @@ impl StreamingSnapshotParser { // Finally, attempt to parse mark/set/go snapshots (EpochState[4]) let snapshots_result = - Self::parse_snapshots_with_hybrid_approach(&mut remainder_decoder, epoch); + Self::parse_snapshots_with_hybrid_approach(&mut remainder_decoder, &mut ctx, epoch); match &snapshots_result { Ok(raw_snapshots) => { @@ -1833,7 +1685,7 @@ impl StreamingSnapshotParser { /// Parse PState to extract stake pools /// PState = [pools_map, future_pools_map, retiring_map, deposits_map] - pub fn parse_pstate(decoder: &mut Decoder) -> Result> { + pub fn parse_pstate(decoder: &mut Decoder, ctx: &mut SnapshotContext) -> Result { // Parse PState array let pstate_len = decoder .array() @@ -1852,17 +1704,17 @@ impl StreamingSnapshotParser { decoder.tag()?; // skip tag if present } - let mut pools_map = BTreeMap::new(); + let mut pools = BTreeMap::new(); match decoder.map()? { Some(pool_count) => { // Definite-length map for i in 0..pool_count { - let pool_id: Hash<28> = - decoder.decode().context(format!("Failed to decode pool ID #{i}"))?; - let params: super::pool_params::PoolParams = decoder - .decode() - .context(format!("Failed to decode pool params for pool #{i}"))?; - pools_map.insert(pool_id, params); + let pool_id: PoolId = + decoder.decode().context(format!("Failed to decode pool id #{i}"))?; + let pool: SnapshotPoolRegistration = decoder + .decode_with(ctx) + .context(format!("Failed to decode pool for pool #{i}"))?; + pools.insert(pool_id, pool.0); } } None => { @@ -1875,13 +1727,13 @@ impl StreamingSnapshotParser { break; } _ => { - let pool_id: Hash<28> = decoder + let pool_id: PoolId = decoder .decode() - .context(format!("Failed to decode pool ID #{count}"))?; - let params: super::pool_params::PoolParams = decoder.decode().context( - format!("Failed to decode pool params for pool #{count}"), - )?; - pools_map.insert(pool_id, params); + .context(format!("Failed to decode pool id #{count}"))?; + let pool: SnapshotPoolRegistration = decoder + .decode_with(ctx) + .context(format!("Failed to decode pool for pool #{count}"))?; + pools.insert(pool_id, pool.0); count += 1; } } @@ -1893,103 +1745,25 @@ impl StreamingSnapshotParser { if matches!(decoder.datatype()?, Type::Tag) { decoder.tag()?; } - let _pools_updates: BTreeMap, super::pool_params::PoolParams> = - decoder.decode()?; + let updates: BTreeMap = decoder.decode_with(ctx)?; + let updates = updates.into_iter().map(|(id, pool)| (id, pool.0)).collect(); // Parse retiring map [2]: PoolId -> Epoch if matches!(decoder.datatype()?, Type::Tag) { decoder.tag()?; } - let pools_retirements: BTreeMap, Epoch> = decoder.decode()?; - - // Convert to PoolInfo for API compatibility - let pools = pools_map - .into_iter() - .map(|(pool_id, params)| { - // Convert relay types from ledger format to API format - let relays: Vec = params - .relays - .iter() - .map(|relay| match relay { - Relay::SingleHostAddr(port, ipv4, ipv6) => { - let port_opt = match port { - Nullable::Some(p) => u16::try_from(*p).ok(), - _ => None, - }; - let ipv4_opt = match ipv4 { - Nullable::Some(bytes) if bytes.0.len() == 4 => Some(format!( - "{}.{}.{}.{}", - bytes.0[0], bytes.0[1], bytes.0[2], bytes.0[3] - )), - _ => None, - }; - let ipv6_opt = match ipv6 { - Nullable::Some(bytes) if bytes.0.len() == 16 => { - // Convert big-endian byte array to IPv6 string - let b = &bytes.0; - let addr = std::net::Ipv6Addr::from([ - b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7], b[8], b[9], - b[10], b[11], b[12], b[13], b[14], b[15], - ]); - Some(addr.to_string()) - } - _ => None, - }; - ApiRelay::SingleHostAddr { - port: port_opt, - ipv4: ipv4_opt, - ipv6: ipv6_opt, - } - } - Relay::SingleHostName(port, hostname) => { - let port_opt = match port { - Nullable::Some(p) => Some(*p as u16), - _ => None, - }; - ApiRelay::SingleHostName { - port: port_opt, - dns_name: hostname.clone(), - } - } - Relay::MultiHostName(hostname) => ApiRelay::MultiHostName { - dns_name: hostname.clone(), - }, - }) - .collect(); - - // Convert metadata from ledger format to API format - let pool_metadata = match ¶ms.metadata { - Nullable::Some(meta) => Some(ApiPoolMetadata { - url: meta.url.clone(), - hash: meta.hash.to_string(), - }), - _ => None, - }; - - // Look up retirement epoch - let retirement_epoch = pools_retirements.get(&pool_id).copied(); - - PoolInfo { - pool_id: pool_id.to_string(), - vrf_key_hash: params.vrf.to_string(), - pledge: params.pledge, - cost: params.cost, - margin: (params.margin.numerator as f64) / (params.margin.denominator as f64), - reward_account: hex::encode(¶ms.reward_account.0), - pool_owners: params.owners.iter().map(|h| h.to_string()).collect(), - relays, - pool_metadata, - retirement_epoch, - } - }) - .collect(); + let retiring: BTreeMap = decoder.decode()?; // Skip any remaining PState elements (like deposits) for i in 3..pstate_len { decoder.skip().context(format!("Failed to skip PState[{i}]"))?; } - Ok(pools) + Ok(SPOState { + pools, + updates, + retiring, + }) } /// Stream UTXOs with per-entry callback @@ -2133,6 +1907,7 @@ impl StreamingSnapshotParser { /// Epoch State / Snapshots / Fee fn parse_snapshots_with_hybrid_approach( decoder: &mut Decoder, + ctx: &mut SnapshotContext, _epoch: u64, ) -> Result { info!(" Starting snapshots parsing..."); @@ -2157,19 +1932,20 @@ impl StreamingSnapshotParser { // Parse Mark snapshot [0] info!(" Parsing Mark snapshot..."); - let mark_snapshot = super::mark_set_go::Snapshot::parse_single_snapshot(decoder, "Mark") - .context("Failed to parse Mark snapshot")?; + let mark_snapshot = + super::mark_set_go::Snapshot::parse_single_snapshot(decoder, ctx, "Mark") + .context("Failed to parse Mark snapshot")?; parsed_snapshots.push(mark_snapshot); // Parse Set snapshot [1] info!(" Parsing Set snapshot..."); - let set_snapshot = super::mark_set_go::Snapshot::parse_single_snapshot(decoder, "Set") + let set_snapshot = super::mark_set_go::Snapshot::parse_single_snapshot(decoder, ctx, "Set") .context("Failed to parse Set snapshot")?; parsed_snapshots.push(set_snapshot); // Parse Go snapshot [2] info!(" Parsing Go snapshot..."); - let go_snapshot = super::mark_set_go::Snapshot::parse_single_snapshot(decoder, "Go") + let go_snapshot = super::mark_set_go::Snapshot::parse_single_snapshot(decoder, ctx, "Go") .context("Failed to parse Go snapshot")?; parsed_snapshots.push(go_snapshot); @@ -2236,7 +2012,7 @@ pub struct SnapshotsInfo { pub struct CollectingCallbacks { pub metadata: Option, pub utxos: Vec, - pub pools: Vec, + pub pools: SPOState, pub accounts: Vec, pub dreps: Vec, pub proposals: Vec, @@ -2262,7 +2038,7 @@ impl EpochCallback for CollectingCallbacks { } impl PoolCallback for CollectingCallbacks { - fn on_pools(&mut self, pools: Vec) -> Result<()> { + fn on_pools(&mut self, pools: SPOState) -> Result<()> { self.pools = pools; Ok(()) } diff --git a/modules/snapshot_bootstrapper/src/bootstrapper.rs b/modules/snapshot_bootstrapper/src/bootstrapper.rs index 67d004b4..61eec3e9 100644 --- a/modules/snapshot_bootstrapper/src/bootstrapper.rs +++ b/modules/snapshot_bootstrapper/src/bootstrapper.rs @@ -124,7 +124,9 @@ impl SnapshotBootstrapper { let parser = StreamingSnapshotParser::new( bootstrap_ctx.snapshot_path().to_string_lossy().into_owned(), ); - parser.parse(&mut publisher).map_err(|e| BootstrapError::Parse(e.to_string()))?; + parser + .parse(&mut publisher, cfg.network.into()) + .map_err(|e| BootstrapError::Parse(e.to_string()))?; info!("Parsed snapshot in {:.2?}", start.elapsed()); publisher.publish_completion(bootstrap_ctx.block_info).await?; diff --git a/modules/snapshot_bootstrapper/src/publisher.rs b/modules/snapshot_bootstrapper/src/publisher.rs index 2dbf3f45..3c137940 100644 --- a/modules/snapshot_bootstrapper/src/publisher.rs +++ b/modules/snapshot_bootstrapper/src/publisher.rs @@ -1,16 +1,19 @@ use acropolis_common::protocol_params::{Nonces, PraosParams}; use acropolis_common::snapshot::protocol_parameters::ProtocolParameters; -use acropolis_common::snapshot::{RawSnapshotsContainer, SnapshotsCallback}; use acropolis_common::{ genesis_values::GenesisValues, + ledger_state::SPOState, messages::{ CardanoMessage, EpochBootstrapMessage, Message, SnapshotMessage, SnapshotStateMessage, }, params::EPOCH_LENGTH, - snapshot::streaming_snapshot::{ - DRepCallback, DRepInfo, EpochCallback, GovernanceProposal, - GovernanceProtocolParametersCallback, PoolCallback, PoolInfo, ProposalCallback, - SnapshotCallbacks, SnapshotMetadata, StakeCallback, UtxoCallback, UtxoEntry, + snapshot::{ + streaming_snapshot::{ + DRepCallback, DRepInfo, EpochCallback, GovernanceProposal, + GovernanceProtocolParametersCallback, PoolCallback, ProposalCallback, + SnapshotCallbacks, SnapshotMetadata, StakeCallback, UtxoCallback, UtxoEntry, + }, + RawSnapshotsContainer, SnapshotsCallback, }, stake_addresses::AccountState, BlockInfo, EpochBootstrapData, @@ -79,7 +82,7 @@ pub struct SnapshotPublisher { snapshot_topic: String, metadata: Option, utxo_count: u64, - pools: Vec, + pools: SPOState, accounts: Vec, dreps: Vec, proposals: Vec, @@ -99,7 +102,7 @@ impl SnapshotPublisher { snapshot_topic, metadata: None, utxo_count: 0, - pools: Vec::new(), + pools: SPOState::new(), accounts: Vec::new(), dreps: Vec::new(), proposals: Vec::new(), @@ -163,10 +166,30 @@ impl UtxoCallback for SnapshotPublisher { } impl PoolCallback for SnapshotPublisher { - fn on_pools(&mut self, pools: Vec) -> Result<()> { - info!("Received {} pools", pools.len()); - self.pools.extend(pools); - // TODO: Accumulate pool data if needed or send in chunks to PoolState processor + fn on_pools(&mut self, pools: SPOState) -> Result<()> { + info!( + "Received pools (current: {}, future: {}, retiring: {})", + pools.pools.len(), + pools.updates.len(), + pools.retiring.len() + ); + self.pools.extend(&pools); + + let message = Arc::new(Message::Snapshot(SnapshotMessage::Bootstrap( + SnapshotStateMessage::SPOState(pools), + ))); + + // Clone what we need for the async task + let context = self.context.clone(); + let snapshot_topic = self.snapshot_topic.clone(); + + // Spawn async publish task since this callback is synchronous + tokio::spawn(async move { + if let Err(e) = context.publish(&snapshot_topic, message).await { + tracing::error!("Failed to publish SPO bootstrap: {}", e); + } + }); + Ok(()) } } @@ -313,7 +336,12 @@ impl SnapshotCallbacks for SnapshotPublisher { info!("Snapshot parsing completed"); info!("Final statistics:"); info!(" - UTXOs processed: {}", self.utxo_count); - info!(" - Pools: {}", self.pools.len()); + info!( + " - Pools: {} (future: {}, retiring: {})", + self.pools.pools.len(), + self.pools.updates.len(), + self.pools.retiring.len() + ); info!(" - Accounts: {}", self.accounts.len()); info!(" - DReps: {}", self.dreps.len()); info!(" - Proposals: {}", self.proposals.len()); diff --git a/modules/spo_state/src/spo_state.rs b/modules/spo_state/src/spo_state.rs index c9a6912f..6f1c9fa0 100644 --- a/modules/spo_state/src/spo_state.rs +++ b/modules/spo_state/src/spo_state.rs @@ -733,40 +733,54 @@ impl SPOState { let mut subscription = context.subscribe(&snapshot_topic).await?; let context_snapshot = context.clone(); let history = history_snapshot.clone(); + enum SnapshotState { + Preparing, + Started, + } + let mut snapshot_state = SnapshotState::Preparing; context.run(async move { - let Ok((_, message)) = subscription.read().await else { - return; - }; - - let mut guard = history.lock().await; - match message.as_ref() { - Message::Snapshot(SnapshotMessage::Bootstrap( - SnapshotStateMessage::SPOState(spo_state), - )) => { - guard.clear(); - guard.commit_forced(spo_state.clone().into()); - } - Message::Snapshot(SnapshotMessage::DumpRequest(SnapshotDumpMessage { - block_height, - })) => { - info!("inspecting state at block height {}", block_height); - let maybe_spo_state = - guard.get_by_index_reverse(*block_height).map(LedgerSPOState::from); - - if let Some(spo_state) = maybe_spo_state { - context_snapshot - .message_bus - .publish( - &snapshot_topic, - Arc::new(Message::Snapshot(SnapshotMessage::Dump( - SnapshotStateMessage::SPOState(spo_state), - ))), - ) - .await - .unwrap_or_else(|e| error!("failed to publish snapshot dump: {e}")) + loop { + let Ok((_, message)) = subscription.read().await else { + return; + }; + + let mut guard = history.lock().await; + match message.as_ref() { + Message::Snapshot(SnapshotMessage::Startup) => { + match snapshot_state { + SnapshotState::Preparing => snapshot_state = SnapshotState::Started, + _ => error!("Snapshot Startup message received but we have already left preparing state"), + } + } + Message::Snapshot(SnapshotMessage::Bootstrap( + SnapshotStateMessage::SPOState(spo_state), + )) => { + guard.clear(); + guard.commit_forced(spo_state.clone().into()); } + Message::Snapshot(SnapshotMessage::DumpRequest(SnapshotDumpMessage { + block_height, + })) => { + info!("inspecting state at block height {}", block_height); + let maybe_spo_state = + guard.get_by_index_reverse(*block_height).map(LedgerSPOState::from); + + if let Some(spo_state) = maybe_spo_state { + context_snapshot + .message_bus + .publish( + &snapshot_topic, + Arc::new(Message::Snapshot(SnapshotMessage::Dump( + SnapshotStateMessage::SPOState(spo_state), + ))), + ) + .await + .unwrap_or_else(|e| error!("failed to publish snapshot dump: {e}")) + } + } + // There will be other snapshot messages that we're not interested in + _ => () } - _ => error!("Unexpected message type: {message:?}"), } }); }