diff --git a/common/examples/test_streaming_parser.rs b/common/examples/test_streaming_parser.rs index b25c29a7..7f98d96a 100644 --- a/common/examples/test_streaming_parser.rs +++ b/common/examples/test_streaming_parser.rs @@ -5,15 +5,18 @@ use acropolis_common::epoch_snapshot::SnapshotsContainer; use acropolis_common::ledger_state::SPOState; use acropolis_common::snapshot::protocol_parameters::ProtocolParameters; -use acropolis_common::snapshot::streaming_snapshot::GovernanceProtocolParametersCallback; +use acropolis_common::snapshot::streaming_snapshot::{ + AccountsCallback, DRepCallback, DRepRecord, GovernanceProtocolParametersCallback, UtxoCallback, + UtxoEntry, +}; use acropolis_common::snapshot::EpochCallback; use acropolis_common::snapshot::{ - AccountState, AccountsCallback, DRepCallback, DRepInfo, GovernanceProposal, PoolCallback, - ProposalCallback, SnapshotCallbacks, SnapshotMetadata, SnapshotsCallback, - StreamingSnapshotParser, UtxoCallback, UtxoEntry, + AccountState, GovernanceProposal, PoolCallback, ProposalCallback, SnapshotCallbacks, + SnapshotMetadata, SnapshotsCallback, StreamingSnapshotParser, }; -use acropolis_common::{NetworkId, PoolRegistration}; +use acropolis_common::{DRepCredential, NetworkId, PoolRegistration}; use anyhow::Result; +use std::collections::HashMap; use std::env; use std::time::Instant; use tracing::info; @@ -35,7 +38,7 @@ struct CountingCallbacks { sample_utxos: Vec, sample_pools: Vec, sample_accounts: Vec, - sample_dreps: Vec, + sample_dreps: Vec<(DRepCredential, DRepRecord)>, sample_proposals: Vec, gs_previous_params: Option, gs_current_params: Option, @@ -137,26 +140,27 @@ impl AccountsCallback for CountingCallbacks { } impl DRepCallback for CountingCallbacks { - fn on_dreps(&mut self, dreps: Vec) -> Result<()> { + fn on_dreps(&mut self, epoch: u64, dreps: HashMap) -> Result<()> { self.drep_count = dreps.len(); - eprintln!("Parsed {} DReps", self.drep_count); + eprintln!("Parsed {} DReps for epoch {}", self.drep_count, epoch); // Show first 10 DReps - for (i, drep) in dreps.iter().take(10).enumerate() { - if let Some(anchor) = &drep.anchor { + for (i, (cred, record)) in dreps.iter().take(10).enumerate() { + let drep_id = cred.to_drep_bech32().unwrap_or_else(|_| "invalid_cred".to_string()); + if let Some(anchor) = &record.anchor { eprintln!( " DRep #{}: {} (deposit: {}) - {}", i + 1, - drep.drep_id.to_drep_bech32().unwrap(), - drep.deposit, + drep_id, + record.deposit, anchor.url ); } else { eprintln!( " DRep #{}: {} (deposit: {})", i + 1, - drep.drep_id.to_drep_bech32().unwrap(), - drep.deposit + drep_id, + record.deposit ); } } @@ -548,14 +552,16 @@ fn main() { // Show sample DReps if !callbacks.sample_dreps.is_empty() { println!("Sample DReps (first 10):"); - for (i, drep) in callbacks.sample_dreps.iter().enumerate() { + for (i, (cred, record)) in callbacks.sample_dreps.iter().enumerate() { + let drep_id = + cred.to_drep_bech32().unwrap_or_else(|_| "invalid_cred".to_string()); print!( " {}: {} (deposit: {} lovelace)", i + 1, - drep.drep_id.to_drep_bech32().unwrap(), - drep.deposit + drep_id, + record.deposit ); - if let Some(anchor) = &drep.anchor { + if let Some(anchor) = &record.anchor { println!(" - {}", anchor.url); } else { println!(); diff --git a/common/src/drep.rs b/common/src/drep.rs new file mode 100644 index 00000000..699f8050 --- /dev/null +++ b/common/src/drep.rs @@ -0,0 +1,138 @@ +//! DRep (Delegated Representative) types and structures + +use crate::rational_number::RationalNumber; +use crate::types::{Credential, Lovelace}; +use serde_with::{hex::Hex, serde_as}; + +pub type DRepCredential = Credential; + +/// Anchor - verifiable link on-chain identifiers with off-chain content, +/// typically metadata that describes a DRep's identity, platform, or governance +/// philosophy. +#[serde_as] +#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)] +pub struct Anchor { + /// Metadata URL + pub url: String, + + /// Metadata hash + #[serde_as(as = "Hex")] + pub data_hash: Vec, +} + +impl<'b, C> minicbor::Decode<'b, C> for Anchor { + fn decode( + d: &mut minicbor::Decoder<'b>, + _ctx: &mut C, + ) -> Result { + d.array()?; + + // URL can be either bytes or text string (snapshot format uses bytes) + let url = match d.datatype()? { + minicbor::data::Type::Bytes => { + let url_bytes = d.bytes()?; + String::from_utf8_lossy(url_bytes).to_string() + } + minicbor::data::Type::String => d.str()?.to_string(), + _ => { + return Err(minicbor::decode::Error::message( + "Expected bytes or string for URL", + )) + } + }; + + // data_hash is encoded as direct bytes, not an array + let data_hash = d.bytes()?.to_vec(); + + Ok(Self { url, data_hash }) + } +} + +/// DRep Record - represents the current state of a DRep in the ledger +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct DRepRecord { + /// Deposit amount in lovelace + pub deposit: Lovelace, + /// Optional anchor (metadata reference) + pub anchor: Option, +} + +impl DRepRecord { + pub fn new(deposit: Lovelace, anchor: Option) -> Self { + Self { deposit, anchor } + } +} + +/// DRepChoice (=CDDL drep, badly named) +#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)] +pub enum DRepChoice { + /// Address key + Key(crate::KeyHash), + + /// Script key + Script(crate::KeyHash), + + /// Abstain + Abstain, + + /// No confidence + NoConfidence, +} + +/// DRep Registration = reg_drep_cert +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct DRepRegistration { + /// DRep credential + pub credential: DRepCredential, + + /// Deposit paid + pub deposit: Lovelace, + + /// Optional anchor + pub anchor: Option, +} + +/// DRep Deregistration = unreg_drep_cert +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct DRepDeregistration { + /// DRep credential + pub credential: DRepCredential, + + /// Deposit to refund + pub refund: Lovelace, +} + +/// DRep Update = update_drep_cert +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct DRepUpdate { + /// DRep credential + pub credential: DRepCredential, + + /// Optional anchor + pub anchor: Option, +} + +/// DRep voting thresholds for governance actions +#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq, Clone, minicbor::Decode)] +pub struct DRepVotingThresholds { + #[n(0)] + pub motion_no_confidence: RationalNumber, + #[n(1)] + pub committee_normal: RationalNumber, + #[n(2)] + pub committee_no_confidence: RationalNumber, + #[n(3)] + pub update_constitution: RationalNumber, + #[n(4)] + pub hard_fork_initiation: RationalNumber, + #[n(5)] + pub pp_network_group: RationalNumber, + #[n(6)] + pub pp_economic_group: RationalNumber, + #[n(7)] + pub pp_technical_group: RationalNumber, + #[n(8)] + pub pp_governance_group: RationalNumber, + #[n(9)] + pub treasury_withdrawal: RationalNumber, +} diff --git a/common/src/lib.rs b/common/src/lib.rs index fd505690..73eab19a 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -8,6 +8,7 @@ pub mod cip19; pub mod commands; pub mod configuration; pub mod crypto; +pub mod drep; pub mod epoch_snapshot; pub mod genesis_values; pub mod hash; @@ -32,5 +33,6 @@ pub mod validation; // Flattened re-exports pub use self::address::*; +pub use self::drep::*; pub use self::metadata::*; pub use self::types::*; diff --git a/common/src/messages.rs b/common/src/messages.rs index 75babc2b..959b6cb6 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -28,8 +28,8 @@ use crate::Pots; use std::collections::HashMap; use crate::cbor::u128_cbor_codec; -use crate::types::*; use crate::validation::ValidationStatus; +use crate::{types::*, DRepRecord}; // Caryatid core messages which we re-export use crate::epoch_snapshot::SnapshotsContainer; @@ -350,6 +350,12 @@ pub enum SnapshotMessage { Dump(SnapshotStateMessage), } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct DRepBootstrapMessage { + pub epoch: u64, + pub dreps: HashMap, +} + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct SnapshotDumpMessage { pub block_height: u64, @@ -436,6 +442,7 @@ pub enum SnapshotStateMessage { SPOState(SPOState), EpochState(EpochBootstrapMessage), AccountsState(AccountsBootstrapMessage), + DRepState(DRepBootstrapMessage), } // === Global message enum === diff --git a/common/src/snapshot/mark_set_go.rs b/common/src/snapshot/mark_set_go.rs index 9f3ac1eb..2cf8cf2d 100644 --- a/common/src/snapshot/mark_set_go.rs +++ b/common/src/snapshot/mark_set_go.rs @@ -75,7 +75,7 @@ impl RawSnapshot { ctx: &mut SnapshotContext, snapshot_name: &str, ) -> Result { - info!("Parsing snapshot {}", snapshot_name); + info!("Parsing snapshot {snapshot_name}"); match decoder.datatype().context("Failed to read snapshot datatype")? { minicbor::data::Type::Array => { decoder.array().context("Failed to parse snapshot array")?; diff --git a/common/src/snapshot/streaming_snapshot.rs b/common/src/snapshot/streaming_snapshot.rs index 90701e41..679d8e7b 100644 --- a/common/src/snapshot/streaming_snapshot.rs +++ b/common/src/snapshot/streaming_snapshot.rs @@ -24,7 +24,7 @@ use anyhow::{anyhow, Context, Result}; use minicbor::data::Type; use minicbor::Decoder; use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::fs::File; use std::io::{Read, Seek, SeekFrom}; use std::net::{Ipv4Addr, Ipv6Addr}; @@ -35,14 +35,12 @@ pub use crate::hash::Hash; use crate::ledger_state::SPOState; use crate::snapshot::protocol_parameters::ProtocolParameters; pub use crate::stake_addresses::{AccountState, StakeAddressState}; -use crate::{ - Constitution, DRepChoice, DRepCredential, EpochBootstrapData, PoolBlockProduction, PoolId, - PoolMetadata, Pots, Relay, -}; pub use crate::{ - Lovelace, MultiHostName, NetworkId, PoolRegistration, Ratio, SingleHostAddr, SingleHostName, - StakeAddress, StakeCredential, + Constitution, DRepChoice, DRepCredential, DRepRecord, EpochBootstrapData, Lovelace, + MultiHostName, NetworkId, PoolId, PoolMetadata, PoolRegistration, Ratio, Relay, SingleHostAddr, + SingleHostName, StakeAddress, StakeCredential, }; +use crate::{PoolBlockProduction, Pots}; // Import snapshot parsing support use super::mark_set_go::{RawSnapshotsContainer, SnapshotsCallback}; @@ -539,34 +537,6 @@ pub struct UtxoEntry { // Ledger types for DState parsing // ----------------------------------------------------------------------------- -/// Local newtype wrapper for DRepCredential to provide custom CBOR decoding -/// without conflicting with the main Credential type's Decode implementation. -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] -struct LocalDRepCredential(DRepCredential); - -impl<'b, C> minicbor::Decode<'b, C> for LocalDRepCredential { - fn decode(d: &mut Decoder<'b>, ctx: &mut C) -> Result { - d.array()?; - let variant = d.u16()?; - - match variant { - 0 => Ok(LocalDRepCredential(DRepCredential::AddrKeyHash( - d.decode_with(ctx)?, - ))), - 1 => Ok(LocalDRepCredential(DRepCredential::ScriptHash( - d.decode_with(ctx)?, - ))), - _ => Err(minicbor::decode::Error::message( - "invalid variant id for DRepCredential", - )), - } - } -} - -// ----------------------------------------------------------------------------- -// Data Structures (based on OpenAPI schema) -// ----------------------------------------------------------------------------- - /// DRep information #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DRepInfo { @@ -666,8 +636,8 @@ pub trait AccountsCallback { /// Callback invoked with bulk DRep data pub trait DRepCallback { - /// Called once with all DRep info - fn on_dreps(&mut self, dreps: Vec) -> Result<()>; + /// Called once with all DRep data + fn on_dreps(&mut self, epoch: u64, dreps: HashMap) -> Result<()>; } /// Callback invoked with bulk governance proposal data @@ -1255,7 +1225,7 @@ impl StreamingSnapshotParser { // Convert DRepInfo to (credential, deposit) tuples let drep_deposits: Vec<(DRepCredential, u64)> = - dreps.iter().map(|d| (d.drep_id.clone(), d.deposit)).collect(); + dreps.iter().map(|(cred, record)| (cred.clone(), record.deposit)).collect(); // Build the accounts bootstrap data let accounts_bootstrap_data = AccountsBootstrapData { @@ -1274,7 +1244,7 @@ impl StreamingSnapshotParser { // Emit bulk callbacks callbacks.on_pools(pools)?; - callbacks.on_dreps(dreps)?; + callbacks.on_dreps(epoch, dreps)?; callbacks.on_accounts(accounts_bootstrap_data)?; callbacks.on_proposals(Vec::new())?; // TODO: Parse from GovState @@ -1655,7 +1625,7 @@ impl StreamingSnapshotParser { } /// VState = [dreps_map, committee_state, dormant_epoch] - fn parse_vstate(decoder: &mut Decoder) -> Result> { + fn parse_vstate(decoder: &mut Decoder) -> Result> { // Parse VState array let vstate_len = decoder .array() @@ -1670,25 +1640,24 @@ impl StreamingSnapshotParser { // Parse DReps map [0]: StakeCredential -> DRepState // Using minicbor's Decode trait - much simpler than manual parsing! - let dreps_map: BTreeMap = decoder.decode()?; - - // Convert to DRepInfo - let dreps = dreps_map + let dreps_map: BTreeMap = decoder.decode()?; + let dreps: HashMap = dreps_map .into_iter() - .map(|(LocalDRepCredential(drep_id), state)| { + .map(|(cred, state)| { let anchor = match state.anchor { - StrictMaybe::Just(a) => Some(AnchorInfo { + StrictMaybe::Just(a) => Some(crate::Anchor { url: a.url, - data_hash: a.content_hash.to_string(), + data_hash: a.content_hash.to_vec(), }), StrictMaybe::Nothing => None, }; - DRepInfo { - drep_id, + let record = DRepRecord { deposit: state.deposit, anchor, - } + }; + + (cred, record) }) .collect(); @@ -1990,7 +1959,7 @@ pub struct CollectingCallbacks { pub utxos: Vec, pub pools: SPOState, pub accounts: Vec, - pub dreps: Vec, + pub dreps: HashMap, pub proposals: Vec, pub epoch: EpochBootstrapData, pub snapshots: Option, @@ -2028,7 +1997,7 @@ impl AccountsCallback for CollectingCallbacks { } impl DRepCallback for CollectingCallbacks { - fn on_dreps(&mut self, dreps: Vec) -> Result<()> { + fn on_dreps(&mut self, _epoch: u64, dreps: HashMap) -> Result<()> { self.dreps = dreps; Ok(()) } diff --git a/common/src/types.rs b/common/src/types.rs index ce38c32a..a0c64b87 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -2,6 +2,9 @@ // We don't use these types in the acropolis_common crate itself #![allow(dead_code)] +use crate::drep::{ + Anchor, DRepChoice, DRepDeregistration, DRepRegistration, DRepUpdate, DRepVotingThresholds, +}; use crate::hash::Hash; use crate::serialization::Bech32Conversion; use crate::{ @@ -1320,22 +1323,6 @@ pub struct Deregistration { pub refund: Lovelace, } -/// DRepChoice (=CDDL drep, badly named) -#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)] -pub enum DRepChoice { - /// Address key - Key(KeyHash), - - /// Script key - Script(KeyHash), - - /// Abstain - Abstain, - - /// No confidence - NoConfidence, -} - /// Vote delegation (simple, existing registration) = vote_deleg_cert #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct VoteDelegation { @@ -1403,82 +1390,8 @@ pub struct StakeRegistrationAndStakeAndVoteDelegation { pub deposit: Lovelace, } -/// Anchor -#[serde_as] -#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)] -pub struct Anchor { - /// Metadata URL - pub url: String, - - /// Metadata hash - #[serde_as(as = "Hex")] - pub data_hash: DataHash, -} - -impl<'b, C> minicbor::Decode<'b, C> for Anchor { - fn decode( - d: &mut minicbor::Decoder<'b>, - _ctx: &mut C, - ) -> Result { - d.array()?; - - // URL can be either bytes or text string (snapshot format uses bytes) - let url = match d.datatype()? { - minicbor::data::Type::Bytes => { - let url_bytes = d.bytes()?; - String::from_utf8_lossy(url_bytes).to_string() - } - minicbor::data::Type::String => d.str()?.to_string(), - _ => { - return Err(minicbor::decode::Error::message( - "Expected bytes or string for Anchor URL", - )) - } - }; - - // data_hash is encoded as direct bytes, not an array - let data_hash = d.bytes()?.to_vec(); - - Ok(Self { url, data_hash }) - } -} - -pub type DRepCredential = Credential; - -/// DRep Registration = reg_drep_cert -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct DRepRegistration { - /// DRep credential - pub credential: DRepCredential, - - /// Deposit paid - pub deposit: Lovelace, - - /// Optional anchor - pub anchor: Option, -} - -/// DRep Deregistration = unreg_drep_cert -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct DRepDeregistration { - /// DRep credential - pub credential: DRepCredential, - - /// Deposit to refund - pub refund: Lovelace, -} - -/// DRep Update = update_drep_cert -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct DRepUpdate { - /// DRep credential - pub credential: DRepCredential, - - /// Optional anchor - pub anchor: Option, -} - pub type CommitteeCredential = Credential; +pub use crate::drep::DRepCredential; /// Authorise a committee hot credential #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -1638,30 +1551,6 @@ pub struct PoolVotingThresholds { pub security_voting_threshold: RationalNumber, } -#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq, Clone, minicbor::Decode)] -pub struct DRepVotingThresholds { - #[n(0)] - pub motion_no_confidence: RationalNumber, - #[n(1)] - pub committee_normal: RationalNumber, - #[n(2)] - pub committee_no_confidence: RationalNumber, - #[n(3)] - pub update_constitution: RationalNumber, - #[n(4)] - pub hard_fork_initiation: RationalNumber, - #[n(5)] - pub pp_network_group: RationalNumber, - #[n(6)] - pub pp_economic_group: RationalNumber, - #[n(7)] - pub pp_technical_group: RationalNumber, - #[n(8)] - pub pp_governance_group: RationalNumber, - #[n(9)] - pub treasury_withdrawal: RationalNumber, -} - #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] pub struct SoftForkRule { pub init_thd: u64, diff --git a/modules/drep_state/Cargo.toml b/modules/drep_state/Cargo.toml index 047fe37a..13b31167 100644 --- a/modules/drep_state/Cargo.toml +++ b/modules/drep_state/Cargo.toml @@ -16,7 +16,6 @@ caryatid_sdk = { workspace = true } anyhow = { workspace = true } config = { workspace = true } serde = { workspace = true } -serde_with = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } diff --git a/modules/drep_state/src/drep_state.rs b/modules/drep_state/src/drep_state.rs index 51d20277..2c17c182 100644 --- a/modules/drep_state/src/drep_state.rs +++ b/modules/drep_state/src/drep_state.rs @@ -2,7 +2,8 @@ //! Accepts certificate events and derives the DRep State in memory use acropolis_common::caryatid::SubscriptionExt; -use acropolis_common::messages::StateTransitionMessage; +use acropolis_common::configuration::StartupMethod; +use acropolis_common::messages::{SnapshotMessage, SnapshotStateMessage, StateTransitionMessage}; use acropolis_common::queries::errors::QueryError; use acropolis_common::{ messages::{CardanoMessage, Message, StateQuery, StateQueryResponse}, @@ -34,6 +35,9 @@ const DEFAULT_GOVERNANCE_SUBSCRIBE_TOPIC: (&str, &str) = ("governance-subscribe-topic", "cardano.governance"); const DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = ("parameters-subscribe-topic", "cardano.protocol.parameters"); +/// Topic for receiving bootstrap data when starting from a CBOR dump snapshot +const DEFAULT_SNAPSHOT_SUBSCRIBE_TOPIC: (&str, &str) = + ("snapshot-subscribe-topic", "cardano.snapshot"); // Publisher topic const DEFAULT_DREP_STATE_TOPIC: &str = "cardano.drep.state"; @@ -72,6 +76,7 @@ impl DRepState { info!("Consumed initial genesis params from params_subscription"); } } + // Main loop of synchronised messages loop { // Get the current state snapshot @@ -273,6 +278,51 @@ impl DRepState { let ticker_history = history.clone(); let ctx_run = context.clone(); + // Subscribe for snapshot messages, if allowed + let snapshot_subscribe_topic = config + .get_string(DEFAULT_SNAPSHOT_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_SNAPSHOT_SUBSCRIBE_TOPIC.1.to_string()); + + let snapshot_subscription = if StartupMethod::from_config(config.as_ref()).is_snapshot() { + info!("Creating subscriber on '{snapshot_subscribe_topic}'"); + Some(context.subscribe(&snapshot_subscribe_topic).await?) + } else { + None + }; + + if let Some(mut subscription) = snapshot_subscription { + context.run(async move { + loop { + // Get the current state snapshot + let mut state = { + let mut h = history.lock().await; + h.get_or_init_with(|| State::new(storage_config)) + }; + + let Ok((_, message)) = subscription.read().await else { + return; + }; + + match message.as_ref() { + Message::Snapshot(SnapshotMessage::Startup) => { + info!("DRepState: Snapshot Startup message received"); + } + Message::Snapshot(SnapshotMessage::Bootstrap( + SnapshotStateMessage::DRepState(drep_msg), + )) => { + let drep_count = state.dreps.len(); + info!("DRepState: Snapshot Bootstrap message received {drep_count} DReps loaded"); + state.bootstrap(drep_msg); + // Commit the bootstrapped state to history to persist changes + history.lock().await.commit(drep_msg.epoch, state); + } + // There will be other snapshot messages that we're not interested in + _ => (), + } + } + }); + } + // Query handler context.handle(&drep_query_topic, move |message| { let history = query_history.clone(); diff --git a/modules/drep_state/src/state.rs b/modules/drep_state/src/state.rs index f2056721..30899fb7 100644 --- a/modules/drep_state/src/state.rs +++ b/modules/drep_state/src/state.rs @@ -1,28 +1,20 @@ //! Acropolis DRepState: State storage use acropolis_common::{ - messages::{Message, StateQuery, StateQueryResponse}, + messages::{DRepBootstrapMessage, Message, StateQuery, StateQueryResponse}, queries::{ accounts::{AccountsStateQuery, AccountsStateQueryResponse, DEFAULT_ACCOUNTS_QUERY_TOPIC}, get_query_topic, governance::{DRepActionUpdate, DRepUpdateEvent, VoteRecord}, }, - Anchor, DRepChoice, DRepCredential, Lovelace, StakeAddress, TxCertificate, + Anchor, DRepChoice, DRepCredential, DRepRecord, Lovelace, StakeAddress, TxCertificate, TxCertificateWithPos, TxHash, Voter, VotingProcedures, }; use anyhow::{anyhow, Result}; use caryatid_sdk::Context; -use serde_with::serde_as; use std::{collections::HashMap, sync::Arc}; use tracing::{error, info}; -#[serde_as] -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct DRepRecord { - pub deposit: Lovelace, - pub anchor: Option, -} - #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct HistoricalDRepState { // Populated from the reg field in: @@ -65,12 +57,6 @@ pub struct DRepRecordExtended { pub last_active_epoch: u64, } -impl DRepRecord { - pub fn new(deposit: Lovelace, anchor: Option) -> Self { - Self { deposit, anchor } - } -} - #[derive(Debug, Copy, Clone, Default)] pub struct DRepStorageConfig { pub store_info: bool, @@ -554,6 +540,29 @@ impl State { } } +impl State { + /// Initialize state from snapshot data + pub fn bootstrap(&mut self, drep_msg: &DRepBootstrapMessage) { + for (cred, record) in &drep_msg.dreps { + self.dreps.insert(cred.clone(), record.clone()); + // update historical state if enabled + if let Some(hist_map) = self.historical_dreps.as_mut() { + let cfg = self.config; + let entry = hist_map + .entry(cred.clone()) + .or_insert_with(|| HistoricalDRepState::from_config(&cfg)); + if let Some(info) = entry.info.as_mut() { + info.deposit = record.deposit; + info.expired = false; + info.retired = false; + info.active_epoch = None; + info.last_active_epoch = 0; // unknown from snapshot + } + } + } + } +} + fn drep_choice_to_credential(choice: &DRepChoice) -> Option { match choice { DRepChoice::Key(k) => Some(DRepCredential::AddrKeyHash(*k)), diff --git a/modules/snapshot_bootstrapper/src/publisher.rs b/modules/snapshot_bootstrapper/src/publisher.rs index 64b3b11c..21f32ff3 100644 --- a/modules/snapshot_bootstrapper/src/publisher.rs +++ b/modules/snapshot_bootstrapper/src/publisher.rs @@ -1,4 +1,5 @@ use acropolis_common::epoch_snapshot::SnapshotsContainer; +use acropolis_common::messages::DRepBootstrapMessage; use acropolis_common::protocol_params::{Nonces, PraosParams}; use acropolis_common::snapshot::protocol_parameters::ProtocolParameters; use acropolis_common::snapshot::{AccountsCallback, SnapshotsCallback}; @@ -11,14 +12,16 @@ use acropolis_common::{ }, params::EPOCH_LENGTH, snapshot::streaming_snapshot::{ - DRepCallback, DRepInfo, EpochCallback, GovernanceProposal, + DRepCallback, DRepRecord, EpochCallback, GovernanceProposal, GovernanceProtocolParametersCallback, PoolCallback, ProposalCallback, SnapshotCallbacks, SnapshotMetadata, UtxoCallback, UtxoEntry, }, - BlockInfo, EpochBootstrapData, + stake_addresses::AccountState, + BlockInfo, DRepCredential, EpochBootstrapData, }; use anyhow::Result; use caryatid_sdk::Context; +use std::collections::HashMap; use std::sync::Arc; use tracing::info; @@ -82,7 +85,8 @@ pub struct SnapshotPublisher { metadata: Option, utxo_count: u64, pools: SPOState, - dreps: Vec, + accounts: Vec, + dreps_len: usize, proposals: Vec, epoch_context: EpochContext, } @@ -101,7 +105,8 @@ impl SnapshotPublisher { metadata: None, utxo_count: 0, pools: SPOState::new(), - dreps: Vec::new(), + accounts: Vec::new(), + dreps_len: 0, proposals: Vec::new(), epoch_context, } @@ -258,10 +263,26 @@ impl AccountsCallback for SnapshotPublisher { } impl DRepCallback for SnapshotPublisher { - fn on_dreps(&mut self, dreps: Vec) -> Result<()> { - info!("Received {} DReps", dreps.len()); - self.dreps.extend(dreps); - // TODO: Accumulate DRep data if needed or send in chunks to DRepState processor + fn on_dreps(&mut self, epoch: u64, dreps: HashMap) -> Result<()> { + info!("Received {} DReps for epoch {}", dreps.len(), epoch); + self.dreps_len += dreps.len(); + // Send a message to the DRepState + let message = Arc::new(Message::Snapshot(SnapshotMessage::Bootstrap( + SnapshotStateMessage::DRepState(DRepBootstrapMessage { dreps, epoch }), + ))); + + // Clone what we need for the async task + let context = self.context.clone(); + let snapshot_topic = self.snapshot_topic.clone(); + + // See comment in AccountsCallback::on_accounts for why we block here. + tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(async move { + context.publish(&snapshot_topic, message).await.unwrap_or_else(|e| { + tracing::error!("Failed to publish DRepBootstrap message: {}", e) + }); + }) + }); Ok(()) } } @@ -403,7 +424,8 @@ impl SnapshotCallbacks for SnapshotPublisher { self.pools.updates.len(), self.pools.retiring.len() ); - info!(" - DReps: {}", self.dreps.len()); + info!(" - Accounts: {}", self.accounts.len()); + info!(" - DReps: {}", self.dreps_len); info!(" - Proposals: {}", self.proposals.len()); // Note: AccountsBootstrapMessage is now published via on_accounts callback