From ddf9941ac9cdbf6a09ce7d787d612791b2316638 Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Thu, 11 Dec 2025 14:54:19 +0100 Subject: [PATCH] fix: apply previous epoch rewards before snapshot --- modules/accounts_state/src/accounts_state.rs | 72 +++++++++++++------- modules/accounts_state/src/state.rs | 33 ++++++--- 2 files changed, 71 insertions(+), 34 deletions(-) diff --git a/modules/accounts_state/src/accounts_state.rs b/modules/accounts_state/src/accounts_state.rs index 11c31b62..af120830 100644 --- a/modules/accounts_state/src/accounts_state.rs +++ b/modules/accounts_state/src/accounts_state.rs @@ -12,7 +12,7 @@ use anyhow::Result; use caryatid_sdk::{message_bus::Subscription, module, Context}; use config::Config; use std::sync::Arc; -use tokio::{join, sync::Mutex}; +use tokio::sync::Mutex; use tracing::{error, info, info_span, Instrument}; mod drep_distribution_publisher; @@ -155,23 +155,52 @@ impl AccountsState { } // Read from epoch-boundary messages only when it's a new epoch + // NEWEPOCH ticks if new_epoch { - let spdd_store_guard = match spdd_store.as_ref() { - Some(s) => Some(s.lock().await), - None => None, + // Applies reewards from previous epoch + let previous_epoch_rewards_result = state + .complete_previous_epoch_rewards_calculation(verifier) + .await + .inspect_err(|e| error!("Previous epoch rewards calculation error: {e:#}")) + .ok(); + + let mut stake_reward_deltas = if let Some(block_info) = current_block.as_ref() { + if let Some((spo_rewards, stake_reward_deltas)) = previous_epoch_rewards_result + { + // publish spo rewards + spo_rewards_publisher + .publish_spo_rewards(block_info, spo_rewards) + .await + .inspect_err(|e| error!("Error publishing SPO rewards: {e:#}")) + .ok(); + stake_reward_deltas + } else { + Vec::new() + } + } else { + Vec::new() }; - // Publish SPDD message before anything else and store spdd history if enabled + // EPOCH rule + // a. SNAP: Take the snapshot and pool distribution + // rotate the snapshots (mark, set, go) + // b. POOLREAP: for any retiring pools, refund, + // remove from pool registry, clear delegations + if let Some(block_info) = current_block.as_ref() { let spdd = state.generate_spdd(); if let Err(e) = spo_publisher.publish_spdd(block_info, spdd).await { error!("Error publishing SPO stake distribution: {e:#}") } - // if we store spdd history - let spdd_state = state.dump_spdd_state(); + // store spdd history if enabled + let spdd_store_guard = match spdd_store.as_ref() { + Some(s) => Some(s.lock().await), + None => None, + }; if let Some(mut spdd_store) = spdd_store_guard { - // active stakes taken at beginning of epoch i is for epoch + 1 + let spdd_state = state.dump_spdd_state(); + // stakes distribution taken at beginning of epoch i is active for epoch + 1 if let Err(e) = spdd_store.store_spdd(block_info.epoch + 1, spdd_state) { error!("Error storing SPDD state: {e:#}") } @@ -258,23 +287,16 @@ impl AccountsState { .await .inspect_err(|e| error!("EpochActivity handling error: {e:#}")) .ok(); - if let Some((spo_rewards, stake_reward_deltas)) = after_epoch_result { - // SPO Rewards Future - let spo_rewards_future = spo_rewards_publisher - .publish_spo_rewards(block_info, spo_rewards); - // Stake Reward Deltas Future - let stake_reward_deltas_future = stake_reward_deltas_publisher - .publish_stake_reward_deltas(block_info, stake_reward_deltas); - - // publish in parallel - let (spo_rewards_result, stake_reward_deltas_result) = - join!(spo_rewards_future, stake_reward_deltas_future); - spo_rewards_result.unwrap_or_else(|e| { - error!("Error publishing SPO rewards: {e:#}") - }); - stake_reward_deltas_result.unwrap_or_else(|e| { - error!("Error publishing stake reward deltas: {e:#}") - }); + if let Some(refund_deltas) = after_epoch_result { + // publish stake reward deltas + stake_reward_deltas.extend(refund_deltas); + stake_reward_deltas_publisher + .publish_stake_reward_deltas(block_info, stake_reward_deltas) + .await + .inspect_err(|e| { + error!("Error publishing stake reward deltas: {e:#}") + }) + .ok(); } } .instrument(span) diff --git a/modules/accounts_state/src/state.rs b/modules/accounts_state/src/state.rs index 40d5dbb7..a5e4ccd0 100644 --- a/modules/accounts_state/src/state.rs +++ b/modules/accounts_state/src/state.rs @@ -590,20 +590,21 @@ impl State { Ok(()) } - /// Handle an EpochActivityMessage giving total fees and block counts by SPO for - /// the just-ended epoch - /// This also returns SPO rewards for publishing to the SPDD topic (For epoch N) - /// and stake reward deltas for publishing to the StakeRewardDeltas topic (For epoch N) - pub async fn handle_epoch_activity( + /// Complete the previous epoch rewards calculation + /// And apply the rewards to the stake_addresses + /// This function is called at NEWEPOCH tick from epoch N-1 to N + /// + /// This also returns SPO rewards (from epoch N-1) for publishing to the SPDD topic + /// and stake reward deltas for publishing to the StakeRewardDeltas topic + pub async fn complete_previous_epoch_rewards_calculation( &mut self, - ea_msg: &EpochActivityMessage, verifier: &Verifier, ) -> Result<(Vec<(PoolId, SPORewards)>, Vec)> { - let mut spo_rewards: Vec<(PoolId, SPORewards)> = Vec::new(); // Collect stake addresses reward deltas + let mut spo_rewards: Vec<(PoolId, SPORewards)> = Vec::new(); let mut reward_deltas = Vec::::new(); - // Check previous epoch work is done + // Check previous epoch rewards calculation is done let mut task = { match self.epoch_rewards_task.lock() { Ok(mut task) => task.take(), @@ -667,6 +668,20 @@ impl State { } }; + Ok((spo_rewards, reward_deltas)) + } + + /// Handle an EpochActivityMessage giving total fees and block counts by SPO for + /// the just-ended epoch + /// + /// This returns stake reward deltas (Refund for pools retiring at epoch N) for publishing to the StakeRewardDeltas topic + pub async fn handle_epoch_activity( + &mut self, + ea_msg: &EpochActivityMessage, + verifier: &Verifier, + ) -> Result> { + let mut reward_deltas = Vec::::new(); + // Map block counts, filtering out SPOs we don't know (OBFT in early Shelley) let spo_blocks: HashMap = ea_msg .spo_blocks @@ -683,7 +698,7 @@ impl State { verifier, )?); - Ok((spo_rewards, reward_deltas)) + Ok(reward_deltas) } /// Handle an SPOStateMessage with the full set of SPOs valid at the end of the last