Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 47 additions & 25 deletions modules/accounts_state/src/accounts_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use anyhow::Result;
use caryatid_sdk::{message_bus::Subscription, module, Context};
use config::Config;
use std::sync::Arc;
use tokio::{join, sync::Mutex};
use tokio::sync::Mutex;
use tracing::{error, info, info_span, Instrument};

mod drep_distribution_publisher;
Expand Down Expand Up @@ -155,23 +155,52 @@ impl AccountsState {
}

// Read from epoch-boundary messages only when it's a new epoch
// NEWEPOCH ticks
if new_epoch {
let spdd_store_guard = match spdd_store.as_ref() {
Some(s) => Some(s.lock().await),
None => None,
// Applies reewards from previous epoch
let previous_epoch_rewards_result = state
.complete_previous_epoch_rewards_calculation(verifier)
.await
.inspect_err(|e| error!("Previous epoch rewards calculation error: {e:#}"))
.ok();

let mut stake_reward_deltas = if let Some(block_info) = current_block.as_ref() {
if let Some((spo_rewards, stake_reward_deltas)) = previous_epoch_rewards_result
{
// publish spo rewards
spo_rewards_publisher
.publish_spo_rewards(block_info, spo_rewards)
.await
.inspect_err(|e| error!("Error publishing SPO rewards: {e:#}"))
.ok();
stake_reward_deltas
} else {
Vec::new()
}
} else {
Vec::new()
};

// Publish SPDD message before anything else and store spdd history if enabled
// EPOCH rule
// a. SNAP: Take the snapshot and pool distribution
// rotate the snapshots (mark, set, go)
// b. POOLREAP: for any retiring pools, refund,
// remove from pool registry, clear delegations

if let Some(block_info) = current_block.as_ref() {
let spdd = state.generate_spdd();
if let Err(e) = spo_publisher.publish_spdd(block_info, spdd).await {
error!("Error publishing SPO stake distribution: {e:#}")
}

// if we store spdd history
let spdd_state = state.dump_spdd_state();
// store spdd history if enabled
let spdd_store_guard = match spdd_store.as_ref() {
Some(s) => Some(s.lock().await),
None => None,
};
if let Some(mut spdd_store) = spdd_store_guard {
// active stakes taken at beginning of epoch i is for epoch + 1
let spdd_state = state.dump_spdd_state();
// stakes distribution taken at beginning of epoch i is active for epoch + 1
if let Err(e) = spdd_store.store_spdd(block_info.epoch + 1, spdd_state) {
error!("Error storing SPDD state: {e:#}")
}
Expand Down Expand Up @@ -258,23 +287,16 @@ impl AccountsState {
.await
.inspect_err(|e| error!("EpochActivity handling error: {e:#}"))
.ok();
if let Some((spo_rewards, stake_reward_deltas)) = after_epoch_result {
// SPO Rewards Future
let spo_rewards_future = spo_rewards_publisher
.publish_spo_rewards(block_info, spo_rewards);
// Stake Reward Deltas Future
let stake_reward_deltas_future = stake_reward_deltas_publisher
.publish_stake_reward_deltas(block_info, stake_reward_deltas);

// publish in parallel
let (spo_rewards_result, stake_reward_deltas_result) =
join!(spo_rewards_future, stake_reward_deltas_future);
spo_rewards_result.unwrap_or_else(|e| {
error!("Error publishing SPO rewards: {e:#}")
});
stake_reward_deltas_result.unwrap_or_else(|e| {
error!("Error publishing stake reward deltas: {e:#}")
});
if let Some(refund_deltas) = after_epoch_result {
// publish stake reward deltas
stake_reward_deltas.extend(refund_deltas);
stake_reward_deltas_publisher
.publish_stake_reward_deltas(block_info, stake_reward_deltas)
.await
.inspect_err(|e| {
error!("Error publishing stake reward deltas: {e:#}")
})
.ok();
}
}
.instrument(span)
Expand Down
33 changes: 24 additions & 9 deletions modules/accounts_state/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,20 +590,21 @@ impl State {
Ok(())
}

/// Handle an EpochActivityMessage giving total fees and block counts by SPO for
/// the just-ended epoch
/// This also returns SPO rewards for publishing to the SPDD topic (For epoch N)
/// and stake reward deltas for publishing to the StakeRewardDeltas topic (For epoch N)
pub async fn handle_epoch_activity(
/// Complete the previous epoch rewards calculation
/// And apply the rewards to the stake_addresses
/// This function is called at NEWEPOCH tick from epoch N-1 to N
///
/// This also returns SPO rewards (from epoch N-1) for publishing to the SPDD topic
/// and stake reward deltas for publishing to the StakeRewardDeltas topic
pub async fn complete_previous_epoch_rewards_calculation(
&mut self,
ea_msg: &EpochActivityMessage,
verifier: &Verifier,
) -> Result<(Vec<(PoolId, SPORewards)>, Vec<StakeRewardDelta>)> {
let mut spo_rewards: Vec<(PoolId, SPORewards)> = Vec::new();
// Collect stake addresses reward deltas
let mut spo_rewards: Vec<(PoolId, SPORewards)> = Vec::new();
let mut reward_deltas = Vec::<StakeRewardDelta>::new();

// Check previous epoch work is done
// Check previous epoch rewards calculation is done
let mut task = {
match self.epoch_rewards_task.lock() {
Ok(mut task) => task.take(),
Expand Down Expand Up @@ -667,6 +668,20 @@ impl State {
}
};

Ok((spo_rewards, reward_deltas))
}

/// Handle an EpochActivityMessage giving total fees and block counts by SPO for
/// the just-ended epoch
///
/// This returns stake reward deltas (Refund for pools retiring at epoch N) for publishing to the StakeRewardDeltas topic
pub async fn handle_epoch_activity(
&mut self,
ea_msg: &EpochActivityMessage,
verifier: &Verifier,
) -> Result<Vec<StakeRewardDelta>> {
let mut reward_deltas = Vec::<StakeRewardDelta>::new();

// Map block counts, filtering out SPOs we don't know (OBFT in early Shelley)
let spo_blocks: HashMap<PoolId, usize> = ea_msg
.spo_blocks
Expand All @@ -683,7 +698,7 @@ impl State {
verifier,
)?);

Ok((spo_rewards, reward_deltas))
Ok(reward_deltas)
}

/// Handle an SPOStateMessage with the full set of SPOs valid at the end of the last
Expand Down