Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 30 additions & 18 deletions common/examples/test_streaming_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@
//
// Usage: cargo run --example test_streaming_parser --release -- <snapshot_path>

use acropolis_common::ledger_state::SPOState;
use acropolis_common::snapshot::protocol_parameters::ProtocolParameters;
use acropolis_common::snapshot::streaming_snapshot::GovernanceProtocolParametersCallback;
use acropolis_common::snapshot::EpochCallback;
use acropolis_common::snapshot::{
AccountState, DRepCallback, DRepInfo, GovernanceProposal, PoolCallback, PoolInfo,
ProposalCallback, RawSnapshotsContainer, SnapshotCallbacks, SnapshotMetadata,
SnapshotsCallback, StakeCallback, StreamingSnapshotParser, UtxoCallback, UtxoEntry,
AccountState, DRepCallback, DRepInfo, GovernanceProposal, PoolCallback, ProposalCallback,
RawSnapshotsContainer, SnapshotCallbacks, SnapshotMetadata, SnapshotsCallback, StakeCallback,
StreamingSnapshotParser, UtxoCallback, UtxoEntry,
};
use acropolis_common::{NetworkId, PoolRegistration};
use anyhow::Result;
use std::env;
use std::time::Instant;
Expand All @@ -24,11 +26,13 @@ struct CountingCallbacks {
metadata: Option<SnapshotMetadata>,
utxo_count: u64,
pool_count: usize,
future_pool_count: usize,
retiring_pool_count: usize,
account_count: usize,
drep_count: usize,
proposal_count: usize,
sample_utxos: Vec<UtxoEntry>,
sample_pools: Vec<PoolInfo>,
sample_pools: Vec<PoolRegistration>,
sample_accounts: Vec<AccountState>,
sample_dreps: Vec<DRepInfo>,
sample_proposals: Vec<GovernanceProposal>,
Expand Down Expand Up @@ -63,24 +67,32 @@ impl UtxoCallback for CountingCallbacks {
}

impl PoolCallback for CountingCallbacks {
fn on_pools(&mut self, pools: Vec<PoolInfo>) -> Result<()> {
self.pool_count = pools.len();
eprintln!("Parsed {} stake pools", pools.len());
fn on_pools(&mut self, pools: SPOState) -> Result<()> {
self.pool_count = pools.pools.len();
self.future_pool_count = pools.updates.len();
self.retiring_pool_count = pools.retiring.len();
eprintln!(
"Parsed {} stake pools (future: {}, retiring: {}))",
pools.pools.len(),
pools.updates.len(),
pools.retiring.len()
);

// Show first 10 pools
for (i, pool) in pools.iter().take(10).enumerate() {
// Keep first 10 for summary
self.sample_pools = pools.pools.into_iter().take(10).map(|(_, v)| v).collect();

// Show sample pools
for (i, pool) in self.sample_pools.clone().iter().enumerate() {
eprintln!(
" Pool #{}: {} (pledge: {}, cost: {}, margin: {:.2}%)",
" Pool #{}: {} (pledge: {}, cost: {}, margin: {:?})",
i + 1,
pool.pool_id,
pool.operator,
pool.pledge,
pool.cost,
pool.margin * 100.0
pool.margin
);
}

// Keep first 10 for summary
self.sample_pools = pools.into_iter().take(10).collect();
Ok(())
}
}
Expand Down Expand Up @@ -496,7 +508,7 @@ fn main() {
println!("Starting parse...");
let start = Instant::now();

match parser.parse(&mut callbacks) {
match parser.parse(&mut callbacks, NetworkId::Mainnet) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pulling the networkid out is nice.

Ok(()) => {
let duration = start.elapsed();
println!("Parse completed successfully in {duration:.2?}");
Expand Down Expand Up @@ -571,12 +583,12 @@ fn main() {
println!("Sample Pools (first 10):");
for (i, pool) in callbacks.sample_pools.iter().enumerate() {
println!(
" {}: {} (pledge: {}, cost: {}, margin: {:.2}%)",
" {}: {} (pledge: {}, cost: {}, margin: {:?})",
i + 1,
pool.pool_id,
pool.operator,
pool.pledge,
pool.cost,
pool.margin * 100.0
pool.margin
);
}
println!();
Expand Down
16 changes: 16 additions & 0 deletions common/src/ledger_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,22 @@ pub struct SPOState {
pub retiring: BTreeMap<PoolId, u64>,
}

impl SPOState {
pub fn new() -> Self {
Self {
pools: BTreeMap::new(),
updates: BTreeMap::new(),
retiring: BTreeMap::new(),
}
}

pub fn extend(&mut self, extension: &Self) {
self.pools.extend(extension.pools.clone());
self.updates.extend(extension.updates.clone());
self.retiring.extend(extension.retiring.clone());
}
}

pub struct DRepState {}

pub struct ProposalState {}
Expand Down
156 changes: 14 additions & 142 deletions common/src/snapshot/mark_set_go.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,16 @@
// ================================================================================================

use anyhow::{Context, Error, Result};
use log::{error, info};
use log::info;

use minicbor::Decoder;
use serde::Serialize;

use types::Ratio;

pub use crate::hash::Hash;
use crate::snapshot::pool_params::PoolParams;
use crate::snapshot::streaming_snapshot;
use crate::snapshot::streaming_snapshot::{SnapshotContext, SnapshotPoolRegistration};
pub use crate::stake_addresses::{AccountState, StakeAddressState};
use crate::PoolId;
pub use crate::StakeCredential;
use crate::{address::StakeAddress, types, NetworkId, PoolRegistration};
use crate::{PoolId, PoolRegistration};

/// VMap<K, V> representation for CBOR Map types
#[derive(Debug, Clone, PartialEq, Serialize)]
Expand Down Expand Up @@ -91,7 +87,11 @@ pub struct Snapshot {

impl Snapshot {
/// Parse a single snapshot (Mark, Set, or Go)
pub fn parse_single_snapshot(decoder: &mut Decoder, snapshot_name: &str) -> Result<Snapshot> {
pub fn parse_single_snapshot(
decoder: &mut Decoder,
ctx: &mut SnapshotContext,
snapshot_name: &str,
) -> Result<Snapshot> {
info!(" {snapshot_name} snapshot - checking data type...");

// Check what type we have - could be array, map, or simple value
Expand Down Expand Up @@ -119,148 +119,20 @@ impl Snapshot {
info!(
" {snapshot_name} snapshot - parsing snapshot_pool_registration..."
);

// pool_registration (third element)
let pools: VMap<PoolId, PoolParams> = decoder
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice to see all this ugly code go away.

.decode()
let pools: VMap<PoolId, SnapshotPoolRegistration> = decoder
.decode_with(ctx)
.context("Failed to parse snapshot_pool_registration")?;
let registration = VMap(
pools
.0
.into_iter()
.map(|(pool_id, params)| {
// Convert RewardAccount (Vec<u8>) to StakeAddress (arbitralily chosen over ScripHash)
let reward_account =
StakeAddress::from_binary(&params.reward_account.0)
.unwrap_or_else(|_|
{
error!("Failed to parse reward account for pool {pool_id}, using default");
StakeAddress::default()
}
);

// Convert Set<AddrKeyhash> to Vec<StakeAddress>
let pool_owners: Vec<StakeAddress> = params
.owners
.0
.into_iter()
.map(|keyhash| {
StakeAddress::new(
StakeCredential::AddrKeyHash(keyhash),
NetworkId::Mainnet, // TODO: Make network configurable or get it from parameters
)
})
.collect();

// Convert Vec<streaming_snapshot::Relay> to Vec<types::Relay>
let relays: Vec<types::Relay> = params
.relays
.into_iter()
.map(|relay| match relay {
streaming_snapshot::Relay::SingleHostAddr(
port,
ipv4,
ipv6,
) => {
let port_opt = match port {
streaming_snapshot::Nullable::Some(p) => {
Some(p as u16)
}
_ => None,
};
let ipv4_opt = match ipv4 {
streaming_snapshot::Nullable::Some(ip)
if ip.0.len() == 4 =>
{
Some(std::net::Ipv4Addr::new(
ip.0[0], ip.0[1], ip.0[2], ip.0[3],
))
}
_ => None,
};
let ipv6_opt = match ipv6 {
streaming_snapshot::Nullable::Some(ip)
if ip.0.len() == 16 =>
{
let b = &ip.0;
Some(std::net::Ipv6Addr::from([
b[0], b[1], b[2], b[3], b[4], b[5],
b[6], b[7], b[8], b[9], b[10], b[11],
b[12], b[13], b[14], b[15],
]))
}
_ => None,
};
types::Relay::SingleHostAddr(
types::SingleHostAddr {
port: port_opt,
ipv4: ipv4_opt,
ipv6: ipv6_opt,
},
)
}
streaming_snapshot::Relay::SingleHostName(
port,
hostname,
) => {
let port_opt = match port {
streaming_snapshot::Nullable::Some(p) => {
Some(p as u16)
}
_ => None,
};
types::Relay::SingleHostName(
types::SingleHostName {
port: port_opt,
dns_name: hostname,
},
)
}
streaming_snapshot::Relay::MultiHostName(hostname) => {
types::Relay::MultiHostName(types::MultiHostName {
dns_name: hostname,
})
}
})
.collect();

// Convert Nullable<PoolMetadata> to Option<PoolMetadata>
let pool_metadata = match params.metadata {
streaming_snapshot::Nullable::Some(meta) => {
Some(types::PoolMetadata {
url: meta.url,
hash: meta.hash.to_vec(),
})
}
_ => None,
};

(
pool_id,
PoolRegistration {
operator: params.id,
vrf_key_hash: params.vrf,
pledge: params.pledge,
cost: params.cost,
margin: Ratio {
numerator: params.margin.numerator,
denominator: params.margin.denominator,
},
reward_account,
pool_owners,
relays,
pool_metadata,
},
)
})
.collect(),
);

info!(" {snapshot_name} snapshot - parse completed successfully.");

Ok(Snapshot {
snapshot_stake,
snapshot_delegations: delegations,
snapshot_pool_params: registration,
snapshot_pool_params: VMap(
pools.0.into_iter().map(|(k, v)| (k, v.0)).collect(),
),
})
}
other_type => {
Expand Down
7 changes: 3 additions & 4 deletions common/src/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ mod decode;
mod error;
pub mod mark_set_go;
mod parser;
pub mod pool_params;
pub mod protocol_parameters;
pub mod streaming_snapshot;

Expand All @@ -27,9 +26,9 @@ pub use parser::{compute_sha256, parse_manifest, validate_era, validate_integrit
// Re-export streaming snapshot APIs
pub use streaming_snapshot::{
AccountState, Anchor, CollectingCallbacks, DRepCallback, DRepInfo, EpochCallback,
GovernanceProposal, PoolCallback, PoolInfo, PoolMetadata, PotBalances, ProposalCallback, Relay,
SnapshotCallbacks, SnapshotMetadata, StakeAddressState, StakeCallback, StreamingSnapshotParser,
UtxoCallback, UtxoEntry,
GovernanceProposal, PoolCallback, PotBalances, ProposalCallback, Relay, SnapshotCallbacks,
SnapshotMetadata, StakeAddressState, StakeCallback, StreamingSnapshotParser, UtxoCallback,
UtxoEntry,
};

// Re-export snapshot types
Expand Down
Loading