From 31323605a221e5dae1d47b8394abd26316acf405 Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Mon, 24 Nov 2025 15:34:21 +0100 Subject: [PATCH 1/8] refactor: publish epoch nonce as separate message --- common/src/messages.rs | 1 + common/src/validation.rs | 13 ++++-- .../src/block_vrf_validator.rs | 36 ++++++++-------- .../src/ouroboros/praos.rs | 1 + .../src/ouroboros/tpraos.rs | 7 +++- .../src/ouroboros/vrf_validation.rs | 11 +++-- modules/block_vrf_validator/src/state.rs | 8 ++-- .../epochs_state/src/epoch_nonce_publisher.rs | 41 +++++++++++++++++++ modules/epochs_state/src/epochs_state.rs | 28 ++++++++++++- modules/epochs_state/src/state.rs | 7 +++- .../rest_blockfrost/src/handlers/epochs.rs | 2 +- 11 files changed, 123 insertions(+), 32 deletions(-) create mode 100644 modules/epochs_state/src/epoch_nonce_publisher.rs diff --git a/common/src/messages.rs b/common/src/messages.rs index 674187dc..39cd3f99 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -315,6 +315,7 @@ pub enum CardanoMessage { PotDeltas(PotDeltasMessage), // Changes to pot balances BlockInfoMessage(BlockTxsMessage), // Transaction Info (total count, total output, total fees in a block) EpochActivity(EpochActivityMessage), // Total fees and VRF keys for an epoch + EpochNonce(Option), // Epoch nonce for the current epoch DRepState(DRepStateMessage), // Active DReps at epoch end SPOState(SPOStateMessage), // Active SPOs at epoch end GovernanceProcedures(GovernanceProceduresMessage), // Governance procedures received diff --git a/common/src/validation.rs b/common/src/validation.rs index d92c06bd..139e8526 100644 --- a/common/src/validation.rs +++ b/common/src/validation.rs @@ -7,7 +7,10 @@ use std::array::TryFromSliceError; use thiserror::Error; -use crate::{protocol_params::Nonce, GenesisKeyhash, PoolId, Slot, VrfKeyHash}; +use crate::{ + protocol_params::Nonce, rational_number::RationalNumber, GenesisKeyhash, PoolId, Slot, + VrfKeyHash, +}; /// Validation error #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Error)] @@ -166,8 +169,12 @@ impl PartialEq for PraosBadVrfProofError { // ------------------------------------------------------------ VrfLeaderValueTooBigError #[derive(Error, Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)] pub enum VrfLeaderValueTooBigError { - #[error("VRF Leader Value Too Big")] - VrfLeaderValueTooBig, + #[error("VRF Leader Value Too Big: pool_id={pool_id}, active_stake={active_stake}, relative_stake={relative_stake}")] + VrfLeaderValueTooBig { + pool_id: PoolId, + active_stake: u64, + relative_stake: RationalNumber, + }, } // ------------------------------------------------------------ BadVrfProofError diff --git a/modules/block_vrf_validator/src/block_vrf_validator.rs b/modules/block_vrf_validator/src/block_vrf_validator.rs index ccfeb3f0..531942fa 100644 --- a/modules/block_vrf_validator/src/block_vrf_validator.rs +++ b/modules/block_vrf_validator/src/block_vrf_validator.rs @@ -33,8 +33,8 @@ const DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = ( "protocol-parameters-subscribe-topic", "cardano.protocol.parameters", ); -const DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC: (&str, &str) = - ("epoch-activity-subscribe-topic", "cardano.epoch.activity"); +const DEFAULT_EPOCH_NONCE_SUBSCRIBE_TOPIC: (&str, &str) = + ("epoch-nonce-subscribe-topic", "cardano.epoch.nonce"); const DEFAULT_SPO_STATE_SUBSCRIBE_TOPIC: (&str, &str) = ("spo-state-subscribe-topic", "cardano.spo.state"); const DEFAULT_SPDD_SUBSCRIBE_TOPIC: (&str, &str) = @@ -57,7 +57,7 @@ impl BlockVrfValidator { mut bootstrapped_subscription: Box>, mut block_subscription: Box>, mut protocol_parameters_subscription: Box>, - mut epoch_activity_subscription: Box>, + mut epoch_nonce_subscription: Box>, mut spo_state_subscription: Box>, mut spdd_subscription: Box>, ) -> Result<()> { @@ -90,7 +90,7 @@ impl BlockVrfValidator { if is_new_epoch { // read epoch boundary messages let protocol_parameters_message_f = protocol_parameters_subscription.read(); - let epoch_activity_message_f = epoch_activity_subscription.read(); + let epoch_nonce_message_f = epoch_nonce_subscription.read(); let spo_state_message_f = spo_state_subscription.read(); let spdd_msg_f = spdd_subscription.read(); @@ -107,17 +107,20 @@ impl BlockVrfValidator { _ => error!("Unexpected message type: {protocol_parameters_msg:?}"), }); - let (_, epoch_activity_msg) = epoch_activity_message_f.await?; + let (_, epoch_nonce_msg) = epoch_nonce_message_f.await?; let span = info_span!( - "block_vrf_validator.handle_epoch_activity", + "block_vrf_validator.handle_epoch_nonce", epoch = block_info.epoch ); - span.in_scope(|| match epoch_activity_msg.as_ref() { - Message::Cardano((block_info, CardanoMessage::EpochActivity(msg))) => { + span.in_scope(|| match epoch_nonce_msg.as_ref() { + Message::Cardano(( + block_info, + CardanoMessage::EpochNonce(active_nonce), + )) => { Self::check_sync(¤t_block, block_info); - state.handle_epoch_activity(msg); + state.handle_epoch_nonce(active_nonce); } - _ => error!("Unexpected message type: {epoch_activity_msg:?}"), + _ => error!("Unexpected message type: {epoch_nonce_msg:?}"), }); let (_, spo_state_msg) = spo_state_message_f.await?; @@ -195,10 +198,10 @@ impl BlockVrfValidator { .unwrap_or(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.1.to_string()); info!("Creating block subscription on '{block_subscribe_topic}'"); - let epoch_activity_subscribe_topic = config - .get_string(DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC.0) - .unwrap_or(DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC.1.to_string()); - info!("Creating epoch activity subscription on '{epoch_activity_subscribe_topic}'"); + let epoch_nonce_subscribe_topic = config + .get_string(DEFAULT_EPOCH_NONCE_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_EPOCH_NONCE_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating epoch nonce subscription on '{epoch_nonce_subscribe_topic}'"); let spo_state_subscribe_topic = config .get_string(DEFAULT_SPO_STATE_SUBSCRIBE_TOPIC.0) @@ -219,8 +222,7 @@ impl BlockVrfValidator { let protocol_parameters_subscription = context.subscribe(&protocol_parameters_subscribe_topic).await?; let block_subscription = context.subscribe(&block_subscribe_topic).await?; - let epoch_activity_subscription = - context.subscribe(&epoch_activity_subscribe_topic).await?; + let epoch_nonce_subscription = context.subscribe(&epoch_nonce_subscribe_topic).await?; let spo_state_subscription = context.subscribe(&spo_state_subscribe_topic).await?; let spdd_subscription = context.subscribe(&spdd_subscribe_topic).await?; @@ -238,7 +240,7 @@ impl BlockVrfValidator { bootstrapped_subscription, block_subscription, protocol_parameters_subscription, - epoch_activity_subscription, + epoch_nonce_subscription, spo_state_subscription, spdd_subscription, ) diff --git a/modules/block_vrf_validator/src/ouroboros/praos.rs b/modules/block_vrf_validator/src/ouroboros/praos.rs index e6275faf..f26f8682 100644 --- a/modules/block_vrf_validator/src/ouroboros/praos.rs +++ b/modules/block_vrf_validator/src/ouroboros/praos.rs @@ -70,6 +70,7 @@ pub fn validate_vrf_praos<'a>( }), Box::new(move || { validate_vrf_leader_value( + &pool_id, &header.leader_vrf_output().map_err(|_| { VrfValidationError::Other("Leader VRF Output is not set".to_string()) })?[..], diff --git a/modules/block_vrf_validator/src/ouroboros/tpraos.rs b/modules/block_vrf_validator/src/ouroboros/tpraos.rs index 65f21e3d..05157e3b 100644 --- a/modules/block_vrf_validator/src/ouroboros/tpraos.rs +++ b/modules/block_vrf_validator/src/ouroboros/tpraos.rs @@ -97,6 +97,7 @@ pub fn validate_vrf_tpraos<'a>( }), Box::new(move || { validate_vrf_leader_value( + &pool_id, &leader_vrf_cert.0.to_vec()[..], &relative_stake, &active_slots_coeff, @@ -543,7 +544,11 @@ mod tests { assert_eq!( result.unwrap_err(), Box::new(VrfValidationError::VrfLeaderValueTooBig( - VrfLeaderValueTooBigError::VrfLeaderValueTooBig + VrfLeaderValueTooBigError::VrfLeaderValueTooBig { + pool_id, + active_stake: 75284250207, + relative_stake: RationalNumber::new(75284250207, 10177811974823000), + } )) ); } diff --git a/modules/block_vrf_validator/src/ouroboros/vrf_validation.rs b/modules/block_vrf_validator/src/ouroboros/vrf_validation.rs index 5ca77b25..79a51dd9 100644 --- a/modules/block_vrf_validator/src/ouroboros/vrf_validation.rs +++ b/modules/block_vrf_validator/src/ouroboros/vrf_validation.rs @@ -159,6 +159,7 @@ pub fn validate_praos_vrf_proof( /// We are using Pallas Math Library /// pub fn validate_vrf_leader_value( + pool_id: &PoolId, leader_vrf_output: &[u8], leader_relative_stake: &RationalNumber, active_slot_coeff: &RationalNumber, @@ -166,7 +167,7 @@ pub fn validate_vrf_leader_value( let certified_leader_vrf = &FixedDecimal::from(leader_vrf_output); let output_size_bits = leader_vrf_output.len() * 8; let cert_nat_max = FixedDecimal::from(UBig::ONE << output_size_bits); - let leader_relative_stake = FixedDecimal::from(UBig::from(*leader_relative_stake.numer())) + let leader_relative_stake_de = FixedDecimal::from(UBig::from(*leader_relative_stake.numer())) / FixedDecimal::from(UBig::from(*leader_relative_stake.denom())); let active_slot_coeff = FixedDecimal::from(UBig::from(*active_slot_coeff.numer())) / FixedDecimal::from(UBig::from(*active_slot_coeff.denom())); @@ -174,12 +175,16 @@ pub fn validate_vrf_leader_value( let denominator = &cert_nat_max - certified_leader_vrf; let recip_q = &cert_nat_max / &denominator; let c = (&FixedDecimal::from(1u64) - &active_slot_coeff).ln(); - let x = -(leader_relative_stake * c); + let x = -(leader_relative_stake_de * c); let ordering = x.exp_cmp(1000, 3, &recip_q); match ordering.estimation { ExpOrdering::LT => Ok(()), ExpOrdering::GT | ExpOrdering::UNKNOWN => { - Err(VrfLeaderValueTooBigError::VrfLeaderValueTooBig) + Err(VrfLeaderValueTooBigError::VrfLeaderValueTooBig { + pool_id: *pool_id, + active_stake: *leader_relative_stake.numer(), + relative_stake: *leader_relative_stake, + }) } } } diff --git a/modules/block_vrf_validator/src/state.rs b/modules/block_vrf_validator/src/state.rs index 03d4a877..f8d5f94d 100644 --- a/modules/block_vrf_validator/src/state.rs +++ b/modules/block_vrf_validator/src/state.rs @@ -5,9 +5,7 @@ use std::sync::Arc; use crate::{ouroboros, snapshot::Snapshot}; use acropolis_common::{ genesis_values::GenesisValues, - messages::{ - EpochActivityMessage, ProtocolParamsMessage, SPOStakeDistributionMessage, SPOStateMessage, - }, + messages::{ProtocolParamsMessage, SPOStakeDistributionMessage, SPOStateMessage}, protocol_params::Nonce, rational_number::RationalNumber, validation::VrfValidationError, @@ -62,8 +60,8 @@ impl State { } } - pub fn handle_epoch_activity(&mut self, msg: &EpochActivityMessage) { - self.epoch_nonce = msg.nonce.clone(); + pub fn handle_epoch_nonce(&mut self, active_nonce: &Option) { + self.epoch_nonce = active_nonce.clone(); } pub fn handle_new_snapshot( diff --git a/modules/epochs_state/src/epoch_nonce_publisher.rs b/modules/epochs_state/src/epoch_nonce_publisher.rs new file mode 100644 index 00000000..c2509263 --- /dev/null +++ b/modules/epochs_state/src/epoch_nonce_publisher.rs @@ -0,0 +1,41 @@ +use acropolis_common::{ + messages::{CardanoMessage, Message}, + protocol_params::Nonce, + BlockInfo, +}; +use caryatid_sdk::Context; +use std::sync::Arc; + +/// Message publisher for Epoch Nonce Message +pub struct EpochNoncePublisher { + /// Module context + context: Arc>, + + /// Topic to publish on + topic: String, +} + +impl EpochNoncePublisher { + /// Construct with context and topic to publish on + pub fn new(context: Arc>, topic: String) -> Self { + Self { context, topic } + } + + /// Publish the Epoch Nonce Message + pub async fn publish( + &mut self, + block_info: &BlockInfo, + nonce: Option, + ) -> anyhow::Result<()> { + self.context + .message_bus + .publish( + &self.topic, + Arc::new(Message::Cardano(( + block_info.clone(), + CardanoMessage::EpochNonce(nonce), + ))), + ) + .await + } +} diff --git a/modules/epochs_state/src/epochs_state.rs b/modules/epochs_state/src/epochs_state.rs index 17a13017..c2d77d78 100644 --- a/modules/epochs_state/src/epochs_state.rs +++ b/modules/epochs_state/src/epochs_state.rs @@ -18,8 +18,11 @@ use std::sync::Arc; use tokio::sync::Mutex; use tracing::{error, info, info_span}; mod epoch_activity_publisher; +mod epoch_nonce_publisher; mod state; -use crate::epoch_activity_publisher::EpochActivityPublisher; +use crate::{ + epoch_activity_publisher::EpochActivityPublisher, epoch_nonce_publisher::EpochNoncePublisher, +}; use state::State; const DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC: (&str, &str) = ( @@ -37,6 +40,8 @@ const DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = ( const DEFAULT_EPOCH_ACTIVITY_PUBLISH_TOPIC: (&str, &str) = ("epoch-activity-publish-topic", "cardano.epoch.activity"); +const DEFAULT_EPOCH_NONCE_PUBLISH_TOPIC: (&str, &str) = + ("epoch-nonce-publish-topic", "cardano.epoch.nonce"); /// Epochs State module #[module( @@ -55,6 +60,7 @@ impl EpochsState { mut block_txs_subscription: Box>, mut protocol_parameters_subscription: Box>, mut epoch_activity_publisher: EpochActivityPublisher, + mut epoch_nonce_publisher: EpochNoncePublisher, ) -> Result<()> { let (_, bootstrapped_message) = bootstrapped_subscription.read().await?; let genesis = match bootstrapped_message.as_ref() { @@ -129,6 +135,18 @@ impl EpochsState { } }); + // At the beginning of epoch, publish the newly evolved active nonce + // for that epoch + if is_new_epoch { + let active_nonce = state.get_active_nonce(); + epoch_nonce_publisher + .publish(block_info, active_nonce) + .await + .unwrap_or_else(|e| { + error!("Failed to publish epoch nonce messages: {e}") + }); + } + let span = info_span!("epochs_state.handle_mint", block = block_info.number); span.in_scope(|| { if let Some(header) = header.as_ref() { @@ -191,6 +209,11 @@ impl EpochsState { .unwrap_or(DEFAULT_EPOCH_ACTIVITY_PUBLISH_TOPIC.1.to_string()); info!("Publishing EpochActivityMessage on '{epoch_activity_publish_topic}'"); + let epoch_nonce_publish_topic = config + .get_string(DEFAULT_EPOCH_NONCE_PUBLISH_TOPIC.0) + .unwrap_or(DEFAULT_EPOCH_NONCE_PUBLISH_TOPIC.1.to_string()); + info!("Publishing EpochNonceMessage on '{epoch_nonce_publish_topic}'"); + // query topic let epochs_query_topic = config .get_string(DEFAULT_EPOCHS_QUERY_TOPIC.0) @@ -214,6 +237,8 @@ impl EpochsState { // Publisher let epoch_activity_publisher = EpochActivityPublisher::new(context.clone(), epoch_activity_publish_topic); + let epoch_nonce_publisher = + EpochNoncePublisher::new(context.clone(), epoch_nonce_publish_topic); // handle epochs query context.handle(&epochs_query_topic, move |message| { @@ -261,6 +286,7 @@ impl EpochsState { block_txs_subscription, protocol_parameters_subscription, epoch_activity_publisher, + epoch_nonce_publisher, ) .await .unwrap_or_else(|e| error!("Failed: {e}")); diff --git a/modules/epochs_state/src/state.rs b/modules/epochs_state/src/state.rs index 3f379e0e..b783d0f8 100644 --- a/modules/epochs_state/src/state.rs +++ b/modules/epochs_state/src/state.rs @@ -5,7 +5,7 @@ use acropolis_common::{ genesis_values::GenesisValues, messages::{BlockTxsMessage, EpochActivityMessage, ProtocolParamsMessage}, params::EPOCH_LENGTH, - protocol_params::{Nonces, PraosParams}, + protocol_params::{Nonce, Nonces, PraosParams}, BlockHash, BlockInfo, PoolId, }; use anyhow::Result; @@ -238,6 +238,11 @@ impl State { epoch_activity } + /// This function returns active nonce of current epoch's state + pub fn get_active_nonce(&self) -> Option { + self.nonces.as_ref().map(|n| n.active.clone()) + } + pub fn get_epoch_info(&self) -> EpochActivityMessage { EpochActivityMessage { epoch: self.epoch, diff --git a/modules/rest_blockfrost/src/handlers/epochs.rs b/modules/rest_blockfrost/src/handlers/epochs.rs index 5cb9e37e..cdf9426e 100644 --- a/modules/rest_blockfrost/src/handlers/epochs.rs +++ b/modules/rest_blockfrost/src/handlers/epochs.rs @@ -630,7 +630,7 @@ pub async fn handle_epoch_total_blocks_blockfrost( ))); let epoch_info = query_state( &context, - &handlers_config.epochs_query_topic, + &handlers_config.historical_epochs_query_topic, epoch_info_msg, |message| match message { Message::StateQueryResponse(StateQueryResponse::Epochs( From 91453717aa2c7e4f73e9a583192928a5b68c5c4f Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Mon, 24 Nov 2025 16:40:33 +0100 Subject: [PATCH 2/8] fix: print leader value too big error --- common/src/validation.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/validation.rs b/common/src/validation.rs index 139e8526..7c299129 100644 --- a/common/src/validation.rs +++ b/common/src/validation.rs @@ -64,7 +64,7 @@ pub enum VrfValidationError { PraosBadVrfProof(#[from] PraosBadVrfProofError), /// **Cause:** The VRF output is too large for this pool's stake. /// The pool lost the slot lottery - #[error("VRF Leader Value Too Big")] + #[error("{0}")] VrfLeaderValueTooBig(#[from] VrfLeaderValueTooBigError), /// **Cause:** This slot is in the overlay schedule but marked as non-active. /// It's an intentional gap slot where no blocks should be produced. From 5c99e586351fa3286ea3166b9c39557c3f93e079 Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Tue, 25 Nov 2025 13:51:28 +0100 Subject: [PATCH 3/8] fix: cargo shear --- common/Cargo.toml | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/common/Cargo.toml b/common/Cargo.toml index fc5e948d..be8619f5 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -29,7 +29,6 @@ regex = "1" serde = { workspace = true, features = ["rc"] } serde_json = { workspace = true } serde_with = { workspace = true, features = ["base64"] } -tempfile = "3" tokio = { workspace = true } tracing = { workspace = true } futures = "0.3.31" @@ -40,9 +39,12 @@ rayon = "1.11.0" cryptoxide = "0.5.1" thiserror = "2.0.17" sha2 = "0.10.8" -caryatid_process = { workspace = true } -config = { workspace = true } [lib] crate-type = ["rlib"] path = "src/lib.rs" + +[dev-dependencies] +caryatid_process = { workspace = true } +config = { workspace = true } +tempfile = "3" From 13f84fbff958086104f66a0e52305414edaeb9c8 Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Tue, 25 Nov 2025 15:26:41 +0100 Subject: [PATCH 4/8] refactor: omnibus.toml --- processes/omnibus/omnibus.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index 0548466d..b760a71c 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -65,7 +65,6 @@ store-stake-addresses = false [module.spdd-state] # Enables active_stakes in /epochs/latest | {number} endpoints -# Enables /epochs/{number}/{next | previous} store-spdd = false [module.historical-accounts-state] From a526b1a1305a29747cac06fde571fae087262f5b Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Wed, 26 Nov 2025 13:36:50 +0100 Subject: [PATCH 5/8] fix: publish spdd message and store it before handling the first block's info --- modules/accounts_state/src/accounts_state.rs | 33 ++++++++++---------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/modules/accounts_state/src/accounts_state.rs b/modules/accounts_state/src/accounts_state.rs index f5eee598..9cbd1363 100644 --- a/modules/accounts_state/src/accounts_state.rs +++ b/modules/accounts_state/src/accounts_state.rs @@ -159,6 +159,23 @@ impl AccountsState { None => None, }; + // Publish SPDD message before anything else and store spdd history if enabled + if let Some(block_info) = current_block.as_ref() { + let spdd = state.generate_spdd(); + if let Err(e) = spo_publisher.publish_spdd(block_info, spdd).await { + error!("Error publishing SPO stake distribution: {e:#}") + } + + // if we store spdd history + let spdd_state = state.dump_spdd_state(); + if let Some(mut spdd_store) = spdd_store_guard { + // active stakes taken at beginning of epoch i is for epoch + 1 + if let Err(e) = spdd_store.store_spdd(block_info.epoch + 1, spdd_state) { + error!("Error storing SPDD state: {e:#}") + } + } + } + // Handle DRep let (_, message) = dreps_message_f.await?; match message.as_ref() { @@ -195,22 +212,6 @@ impl AccountsState { .handle_spo_state(spo_msg) .inspect_err(|e| error!("SPOState handling error: {e:#}")) .ok(); - - let spdd = state.generate_spdd(); - if let Err(e) = spo_publisher.publish_spdd(block_info, spdd).await { - error!("Error publishing SPO stake distribution: {e:#}") - } - - // if we store spdd history - let spdd_state = state.dump_spdd_state(); - if let Some(mut spdd_store) = spdd_store_guard { - // active stakes taken at beginning of epoch i is for epoch + 1 - if let Err(e) = - spdd_store.store_spdd(block_info.epoch + 1, spdd_state) - { - error!("Error storing SPDD state: {e:#}") - } - } } .instrument(span) .await; From 6c22adb1fb1d2fe864018bd18a78fa68aed6bd3c Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Wed, 26 Nov 2025 13:59:21 +0100 Subject: [PATCH 6/8] refactor: add spdd store config with clear on start option --- modules/accounts_state/src/accounts_state.rs | 32 ++++----- .../src/spo_distribution_store.rs | 66 ++++++++++--------- processes/omnibus/omnibus.toml | 1 + 3 files changed, 49 insertions(+), 50 deletions(-) diff --git a/modules/accounts_state/src/accounts_state.rs b/modules/accounts_state/src/accounts_state.rs index 9cbd1363..a0026037 100644 --- a/modules/accounts_state/src/accounts_state.rs +++ b/modules/accounts_state/src/accounts_state.rs @@ -8,7 +8,6 @@ use acropolis_common::{ BlockInfo, BlockStatus, }; use anyhow::Result; -use bigdecimal::Zero; use caryatid_sdk::{message_bus::Subscription, module, Context}; use config::Config; use std::sync::Arc; @@ -35,7 +34,7 @@ use acropolis_common::queries::accounts::{ use acropolis_common::queries::errors::QueryError; use verifier::Verifier; -use crate::spo_distribution_store::SPDDStore; +use crate::spo_distribution_store::{SPDDStore, SPDDStoreConfig}; mod spo_distribution_store; const DEFAULT_SPO_STATE_TOPIC: &str = "cardano.spo.state"; @@ -53,6 +52,7 @@ const DEFAULT_STAKE_REWARD_DELTAS_TOPIC: &str = "cardano.stake.reward.deltas"; const DEFAULT_SPDD_DB_PATH: (&str, &str) = ("spdd-db-path", "./fjall-spdd"); const DEFAULT_SPDD_RETENTION_EPOCHS: (&str, u64) = ("spdd-retention-epochs", 0); +const DEFAULT_SPDD_CLEAR_ON_START: (&str, bool) = ("spdd-clear-on-start", true); /// Accounts State module #[module( @@ -427,24 +427,18 @@ impl AccountsState { .unwrap_or(DEFAULT_STAKE_REWARD_DELTAS_TOPIC.to_string()); info!("Creating stake reward deltas publisher on '{stake_reward_deltas_topic}'"); + // SPDD configs let spdd_db_path = config.get_string(DEFAULT_SPDD_DB_PATH.0).unwrap_or(DEFAULT_SPDD_DB_PATH.1.to_string()); - - // Convert to absolute path if relative - let spdd_db_path = if std::path::Path::new(&spdd_db_path).is_absolute() { - spdd_db_path - } else { - let current_dir = std::env::current_dir() - .map_err(|e| anyhow::anyhow!("Failed to get current directory: {}", e))?; - current_dir.join(&spdd_db_path).to_string_lossy().to_string() - }; - - // Get SPDD retention epochs configuration + info!("SPDD database path: {spdd_db_path}"); let spdd_retention_epochs = config .get_int(DEFAULT_SPDD_RETENTION_EPOCHS.0) .unwrap_or(DEFAULT_SPDD_RETENTION_EPOCHS.1 as i64) .max(0) as u64; info!("SPDD retention epochs: {:?}", spdd_retention_epochs); + let spdd_clear_on_start = + config.get_bool(DEFAULT_SPDD_CLEAR_ON_START.0).unwrap_or(DEFAULT_SPDD_CLEAR_ON_START.1); + info!("SPDD clear on start: {spdd_clear_on_start}"); // Query topics let accounts_query_topic = config @@ -474,11 +468,13 @@ impl AccountsState { let history_tick = history.clone(); // Spdd store - let spdd_store = if !spdd_retention_epochs.is_zero() { - Some(Arc::new(Mutex::new(SPDDStore::load( - std::path::Path::new(&spdd_db_path), - spdd_retention_epochs, - )?))) + let spdd_store_config = SPDDStoreConfig { + path: spdd_db_path, + retention_epochs: spdd_retention_epochs, + clear_on_start: spdd_clear_on_start, + }; + let spdd_store = if spdd_store_config.is_enabled() { + Some(Arc::new(Mutex::new(SPDDStore::new(&spdd_store_config)?))) } else { None }; diff --git a/modules/accounts_state/src/spo_distribution_store.rs b/modules/accounts_state/src/spo_distribution_store.rs index d051eb02..20f4e8e7 100644 --- a/modules/accounts_state/src/spo_distribution_store.rs +++ b/modules/accounts_state/src/spo_distribution_store.rs @@ -1,5 +1,6 @@ use acropolis_common::{PoolId, StakeAddress}; use anyhow::Result; +use bigdecimal::Zero; use fjall::{Config, Keyspace, PartitionCreateOptions}; use std::collections::HashMap; @@ -43,6 +44,19 @@ fn encode_epoch_marker(epoch: u64) -> Vec { epoch.to_be_bytes().to_vec() } +#[derive(Debug, Clone)] +pub struct SPDDStoreConfig { + pub path: String, + pub retention_epochs: u64, + pub clear_on_start: bool, +} + +impl SPDDStoreConfig { + pub fn is_enabled(&self) -> bool { + !self.retention_epochs.is_zero() + } +} + pub struct SPDDStore { keyspace: Keyspace, /// Partition for all SPDD data @@ -58,10 +72,9 @@ pub struct SPDDStore { } impl SPDDStore { - #[allow(dead_code)] - pub fn new(path: impl AsRef, retention_epochs: u64) -> fjall::Result { - let path = path.as_ref(); - if path.exists() { + pub fn new(config: &SPDDStoreConfig) -> fjall::Result { + let path = std::path::Path::new(&config.path); + if config.clear_on_start && path.exists() { std::fs::remove_dir_all(path)?; } @@ -74,23 +87,7 @@ impl SPDDStore { keyspace, spdd, epoch_markers, - retention_epochs, - }) - } - - pub fn load(path: impl AsRef, retention_epochs: u64) -> fjall::Result { - let path = path.as_ref(); - - let keyspace = Config::new(path).open()?; - let spdd = keyspace.open_partition("spdd", PartitionCreateOptions::default())?; - let epoch_markers = - keyspace.open_partition("epoch_markers", PartitionCreateOptions::default())?; - - Ok(Self { - keyspace, - spdd, - epoch_markers, - retention_epochs, + retention_epochs: config.retention_epochs, }) } @@ -237,11 +234,18 @@ mod tests { StakeAddress::new(StakeCredential::AddrKeyHash(keyhash_224(&[byte])), Mainnet) } + fn spdd_store_config(retention_epochs: u64) -> SPDDStoreConfig { + SPDDStoreConfig { + path: TempDir::new().unwrap().path().to_string_lossy().into_owned(), + retention_epochs, + clear_on_start: true, + } + } + #[test] fn test_store_and_query_spdd() { - let temp_dir = TempDir::new().unwrap(); - let mut spdd_store = - SPDDStore::new(temp_dir.path(), 10).expect("Failed to create SPDD store"); + let config = spdd_store_config(10); + let mut spdd_store = SPDDStore::new(&config).expect("Failed to create SPDD store"); let mut spdd_state: HashMap> = HashMap::new(); spdd_state.insert( @@ -273,9 +277,8 @@ mod tests { #[test] fn test_retention_pruning() { - let temp_dir = TempDir::new().unwrap(); - let mut spdd_store = - SPDDStore::new(temp_dir.path(), 2).expect("Failed to create SPDD store"); + let config = spdd_store_config(2); + let mut spdd_store = SPDDStore::new(&config).expect("Failed to create SPDD store"); // Store epochs 1, 2, 3 for epoch in 1..=3 { @@ -302,8 +305,8 @@ mod tests { #[test] fn test_query_incomplete_epoch() { - let temp_dir = TempDir::new().unwrap(); - let spdd_store = SPDDStore::new(temp_dir.path(), 10).expect("Failed to create SPDD store"); + let config = spdd_store_config(10); + let spdd_store = SPDDStore::new(&config).expect("Failed to create SPDD store"); assert!(!spdd_store.is_epoch_complete(999).unwrap()); assert!(spdd_store.query_by_epoch(999).is_err()); @@ -312,9 +315,8 @@ mod tests { #[test] fn test_remove_epoch_data() { - let temp_dir = TempDir::new().unwrap(); - let mut spdd_store = - SPDDStore::new(temp_dir.path(), 10).expect("Failed to create SPDD store"); + let config = spdd_store_config(10); + let mut spdd_store = SPDDStore::new(&config).expect("Failed to create SPDD store"); let mut spdd_state: HashMap> = HashMap::new(); spdd_state.insert( diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index b760a71c..d7b094bf 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -121,6 +121,7 @@ write-full-cache = "false" # Enable /epochs/{number}/stakes & /epochs/{number}/stakes/{pool_id} endpoints spdd-retention-epochs = 0 spdd-db-path = "./fjall-spdd" +spdd-clear-on-start = true # Verify against captured CSV verify-pots-file = "../../modules/accounts_state/test-data/pots.mainnet.csv" verify-rewards-files = "../../modules/accounts_state/test-data/rewards.mainnet.{}.csv" From 0c2602b3134e2db6035f76b3b0432c980a880372 Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Wed, 26 Nov 2025 15:36:39 +0100 Subject: [PATCH 7/8] fix: remove reward in total active stakes; not sure this is the correct solution though --- modules/block_vrf_validator/src/snapshot.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/block_vrf_validator/src/snapshot.rs b/modules/block_vrf_validator/src/snapshot.rs index 72de3eb5..533f372c 100644 --- a/modules/block_vrf_validator/src/snapshot.rs +++ b/modules/block_vrf_validator/src/snapshot.rs @@ -26,7 +26,7 @@ impl From<(&SPOStateMessage, &SPOStakeDistributionMessage)> for Snapshot { .collect(); let active_stakes: HashMap = spdd_msg.spos.iter().map(|(pool_id, stake)| (*pool_id, stake.live)).collect(); - let total_active_stakes = active_stakes.values().sum(); + let total_active_stakes = spdd_msg.spos.iter().map(|(_, stake)| stake.active).sum(); Self { active_spos, active_stakes, From 08cc6fb8ee3ba0055ea2ffc1fdbcff7232eab03b Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Mon, 1 Dec 2025 04:38:32 +0100 Subject: [PATCH 8/8] fix: correct total active stakes --- modules/block_vrf_validator/src/snapshot.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/block_vrf_validator/src/snapshot.rs b/modules/block_vrf_validator/src/snapshot.rs index 533f372c..72de3eb5 100644 --- a/modules/block_vrf_validator/src/snapshot.rs +++ b/modules/block_vrf_validator/src/snapshot.rs @@ -26,7 +26,7 @@ impl From<(&SPOStateMessage, &SPOStakeDistributionMessage)> for Snapshot { .collect(); let active_stakes: HashMap = spdd_msg.spos.iter().map(|(pool_id, stake)| (*pool_id, stake.live)).collect(); - let total_active_stakes = spdd_msg.spos.iter().map(|(_, stake)| stake.active).sum(); + let total_active_stakes = active_stakes.values().sum(); Self { active_spos, active_stakes,