Skip to content
1 change: 1 addition & 0 deletions common/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ pub enum CardanoMessage {
PotDeltas(PotDeltasMessage), // Changes to pot balances
BlockInfoMessage(BlockTxsMessage), // Transaction Info (total count, total output, total fees in a block)
EpochActivity(EpochActivityMessage), // Total fees and VRF keys for an epoch
EpochNonce(Option<Nonce>), // Epoch nonce for the current epoch
DRepState(DRepStateMessage), // Active DReps at epoch end
SPOState(SPOStateMessage), // Active SPOs at epoch end
GovernanceProcedures(GovernanceProceduresMessage), // Governance procedures received
Expand Down
15 changes: 11 additions & 4 deletions common/src/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use std::array::TryFromSliceError;

use thiserror::Error;

use crate::{protocol_params::Nonce, GenesisKeyhash, PoolId, Slot, VrfKeyHash};
use crate::{
protocol_params::Nonce, rational_number::RationalNumber, GenesisKeyhash, PoolId, Slot,
VrfKeyHash,
};

/// Validation error
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Error)]
Expand Down Expand Up @@ -61,7 +64,7 @@ pub enum VrfValidationError {
PraosBadVrfProof(#[from] PraosBadVrfProofError),
/// **Cause:** The VRF output is too large for this pool's stake.
/// The pool lost the slot lottery
#[error("VRF Leader Value Too Big")]
#[error("{0}")]
VrfLeaderValueTooBig(#[from] VrfLeaderValueTooBigError),
/// **Cause:** This slot is in the overlay schedule but marked as non-active.
/// It's an intentional gap slot where no blocks should be produced.
Expand Down Expand Up @@ -166,8 +169,12 @@ impl PartialEq for PraosBadVrfProofError {
// ------------------------------------------------------------ VrfLeaderValueTooBigError
#[derive(Error, Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum VrfLeaderValueTooBigError {
#[error("VRF Leader Value Too Big")]
VrfLeaderValueTooBig,
#[error("VRF Leader Value Too Big: pool_id={pool_id}, active_stake={active_stake}, relative_stake={relative_stake}")]
VrfLeaderValueTooBig {
pool_id: PoolId,
active_stake: u64,
relative_stake: RationalNumber,
},
}

// ------------------------------------------------------------ BadVrfProofError
Expand Down
65 changes: 31 additions & 34 deletions modules/accounts_state/src/accounts_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use acropolis_common::{
BlockInfo, BlockStatus,
};
use anyhow::Result;
use bigdecimal::Zero;
use caryatid_sdk::{message_bus::Subscription, module, Context};
use config::Config;
use std::sync::Arc;
Expand All @@ -36,7 +35,7 @@ use acropolis_common::queries::accounts::{
use acropolis_common::queries::errors::QueryError;
use verifier::Verifier;

use crate::spo_distribution_store::SPDDStore;
use crate::spo_distribution_store::{SPDDStore, SPDDStoreConfig};
mod spo_distribution_store;

const DEFAULT_SPO_STATE_TOPIC: &str = "cardano.spo.state";
Expand All @@ -54,6 +53,7 @@ const DEFAULT_STAKE_REWARD_DELTAS_TOPIC: &str = "cardano.stake.reward.deltas";

const DEFAULT_SPDD_DB_PATH: (&str, &str) = ("spdd-db-path", "./fjall-spdd");
const DEFAULT_SPDD_RETENTION_EPOCHS: (&str, u64) = ("spdd-retention-epochs", 0);
const DEFAULT_SPDD_CLEAR_ON_START: (&str, bool) = ("spdd-clear-on-start", true);

/// Accounts State module
#[module(
Expand Down Expand Up @@ -161,6 +161,23 @@ impl AccountsState {
None => None,
};

// Publish SPDD message before anything else and store spdd history if enabled
if let Some(block_info) = current_block.as_ref() {
let spdd = state.generate_spdd();
if let Err(e) = spo_publisher.publish_spdd(block_info, spdd).await {
error!("Error publishing SPO stake distribution: {e:#}")
}

// if we store spdd history
let spdd_state = state.dump_spdd_state();
if let Some(mut spdd_store) = spdd_store_guard {
// active stakes taken at beginning of epoch i is for epoch + 1
if let Err(e) = spdd_store.store_spdd(block_info.epoch + 1, spdd_state) {
error!("Error storing SPDD state: {e:#}")
}
}
}

// Handle DRep
let (_, message) = drep_state_subscription.read_ignoring_rollbacks().await?;
match message.as_ref() {
Expand Down Expand Up @@ -197,22 +214,6 @@ impl AccountsState {
.handle_spo_state(spo_msg)
.inspect_err(|e| error!("SPOState handling error: {e:#}"))
.ok();

let spdd = state.generate_spdd();
if let Err(e) = spo_publisher.publish_spdd(block_info, spdd).await {
error!("Error publishing SPO stake distribution: {e:#}")
}

// if we store spdd history
let spdd_state = state.dump_spdd_state();
if let Some(mut spdd_store) = spdd_store_guard {
// active stakes taken at beginning of epoch i is for epoch + 1
if let Err(e) =
spdd_store.store_spdd(block_info.epoch + 1, spdd_state)
{
error!("Error storing SPDD state: {e:#}")
}
}
}
.instrument(span)
.await;
Expand Down Expand Up @@ -435,24 +436,18 @@ impl AccountsState {
.unwrap_or(DEFAULT_STAKE_REWARD_DELTAS_TOPIC.to_string());
info!("Creating stake reward deltas publisher on '{stake_reward_deltas_topic}'");

// SPDD configs
let spdd_db_path =
config.get_string(DEFAULT_SPDD_DB_PATH.0).unwrap_or(DEFAULT_SPDD_DB_PATH.1.to_string());

// Convert to absolute path if relative
let spdd_db_path = if std::path::Path::new(&spdd_db_path).is_absolute() {
spdd_db_path
} else {
let current_dir = std::env::current_dir()
.map_err(|e| anyhow::anyhow!("Failed to get current directory: {}", e))?;
current_dir.join(&spdd_db_path).to_string_lossy().to_string()
};

// Get SPDD retention epochs configuration
info!("SPDD database path: {spdd_db_path}");
let spdd_retention_epochs = config
.get_int(DEFAULT_SPDD_RETENTION_EPOCHS.0)
.unwrap_or(DEFAULT_SPDD_RETENTION_EPOCHS.1 as i64)
.max(0) as u64;
info!("SPDD retention epochs: {:?}", spdd_retention_epochs);
let spdd_clear_on_start =
config.get_bool(DEFAULT_SPDD_CLEAR_ON_START.0).unwrap_or(DEFAULT_SPDD_CLEAR_ON_START.1);
info!("SPDD clear on start: {spdd_clear_on_start}");

// Query topics
let accounts_query_topic = config
Expand Down Expand Up @@ -482,11 +477,13 @@ impl AccountsState {
let history_tick = history.clone();

// Spdd store
let spdd_store = if !spdd_retention_epochs.is_zero() {
Some(Arc::new(Mutex::new(SPDDStore::load(
std::path::Path::new(&spdd_db_path),
spdd_retention_epochs,
)?)))
let spdd_store_config = SPDDStoreConfig {
path: spdd_db_path,
retention_epochs: spdd_retention_epochs,
clear_on_start: spdd_clear_on_start,
};
let spdd_store = if spdd_store_config.is_enabled() {
Some(Arc::new(Mutex::new(SPDDStore::new(&spdd_store_config)?)))
} else {
None
};
Expand Down
66 changes: 34 additions & 32 deletions modules/accounts_state/src/spo_distribution_store.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use acropolis_common::{PoolId, StakeAddress};
use anyhow::Result;
use bigdecimal::Zero;
use fjall::{Config, Keyspace, PartitionCreateOptions};
use std::collections::HashMap;

Expand Down Expand Up @@ -43,6 +44,19 @@ fn encode_epoch_marker(epoch: u64) -> Vec<u8> {
epoch.to_be_bytes().to_vec()
}

#[derive(Debug, Clone)]
pub struct SPDDStoreConfig {
pub path: String,
pub retention_epochs: u64,
pub clear_on_start: bool,
}

impl SPDDStoreConfig {
pub fn is_enabled(&self) -> bool {
!self.retention_epochs.is_zero()
}
}

pub struct SPDDStore {
keyspace: Keyspace,
/// Partition for all SPDD data
Expand All @@ -58,10 +72,9 @@ pub struct SPDDStore {
}

impl SPDDStore {
#[allow(dead_code)]
pub fn new(path: impl AsRef<std::path::Path>, retention_epochs: u64) -> fjall::Result<Self> {
let path = path.as_ref();
if path.exists() {
pub fn new(config: &SPDDStoreConfig) -> fjall::Result<Self> {
let path = std::path::Path::new(&config.path);
if config.clear_on_start && path.exists() {
std::fs::remove_dir_all(path)?;
}

Expand All @@ -74,23 +87,7 @@ impl SPDDStore {
keyspace,
spdd,
epoch_markers,
retention_epochs,
})
}

pub fn load(path: impl AsRef<std::path::Path>, retention_epochs: u64) -> fjall::Result<Self> {
let path = path.as_ref();

let keyspace = Config::new(path).open()?;
let spdd = keyspace.open_partition("spdd", PartitionCreateOptions::default())?;
let epoch_markers =
keyspace.open_partition("epoch_markers", PartitionCreateOptions::default())?;

Ok(Self {
keyspace,
spdd,
epoch_markers,
retention_epochs,
retention_epochs: config.retention_epochs,
})
}

Expand Down Expand Up @@ -237,11 +234,18 @@ mod tests {
StakeAddress::new(StakeCredential::AddrKeyHash(keyhash_224(&[byte])), Mainnet)
}

fn spdd_store_config(retention_epochs: u64) -> SPDDStoreConfig {
SPDDStoreConfig {
path: TempDir::new().unwrap().path().to_string_lossy().into_owned(),
retention_epochs,
clear_on_start: true,
}
}

#[test]
fn test_store_and_query_spdd() {
let temp_dir = TempDir::new().unwrap();
let mut spdd_store =
SPDDStore::new(temp_dir.path(), 10).expect("Failed to create SPDD store");
let config = spdd_store_config(10);
let mut spdd_store = SPDDStore::new(&config).expect("Failed to create SPDD store");

let mut spdd_state: HashMap<PoolId, Vec<(StakeAddress, u64)>> = HashMap::new();
spdd_state.insert(
Expand Down Expand Up @@ -273,9 +277,8 @@ mod tests {

#[test]
fn test_retention_pruning() {
let temp_dir = TempDir::new().unwrap();
let mut spdd_store =
SPDDStore::new(temp_dir.path(), 2).expect("Failed to create SPDD store");
let config = spdd_store_config(2);
let mut spdd_store = SPDDStore::new(&config).expect("Failed to create SPDD store");

// Store epochs 1, 2, 3
for epoch in 1..=3 {
Expand All @@ -302,8 +305,8 @@ mod tests {

#[test]
fn test_query_incomplete_epoch() {
let temp_dir = TempDir::new().unwrap();
let spdd_store = SPDDStore::new(temp_dir.path(), 10).expect("Failed to create SPDD store");
let config = spdd_store_config(10);
let spdd_store = SPDDStore::new(&config).expect("Failed to create SPDD store");

assert!(!spdd_store.is_epoch_complete(999).unwrap());
assert!(spdd_store.query_by_epoch(999).is_err());
Expand All @@ -312,9 +315,8 @@ mod tests {

#[test]
fn test_remove_epoch_data() {
let temp_dir = TempDir::new().unwrap();
let mut spdd_store =
SPDDStore::new(temp_dir.path(), 10).expect("Failed to create SPDD store");
let config = spdd_store_config(10);
let mut spdd_store = SPDDStore::new(&config).expect("Failed to create SPDD store");

let mut spdd_state: HashMap<PoolId, Vec<(StakeAddress, u64)>> = HashMap::new();
spdd_state.insert(
Expand Down
41 changes: 22 additions & 19 deletions modules/block_vrf_validator/src/block_vrf_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ const DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = (
"protocol-parameters-subscribe-topic",
"cardano.protocol.parameters",
);
const DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC: (&str, &str) =
("epoch-activity-subscribe-topic", "cardano.epoch.activity");
const DEFAULT_EPOCH_NONCE_SUBSCRIBE_TOPIC: (&str, &str) =
("epoch-nonce-subscribe-topic", "cardano.epoch.nonce");
const DEFAULT_SPO_STATE_SUBSCRIBE_TOPIC: (&str, &str) =
("spo-state-subscribe-topic", "cardano.spo.state");
const DEFAULT_SPDD_SUBSCRIBE_TOPIC: (&str, &str) =
Expand All @@ -58,7 +58,7 @@ impl BlockVrfValidator {
mut bootstrapped_subscription: Box<dyn Subscription<Message>>,
mut block_subscription: Box<dyn Subscription<Message>>,
mut protocol_parameters_subscription: Box<dyn Subscription<Message>>,
mut epoch_activity_subscription: Box<dyn Subscription<Message>>,
mut epoch_nonce_subscription: Box<dyn Subscription<Message>>,
mut spo_state_subscription: Box<dyn Subscription<Message>>,
mut spdd_subscription: Box<dyn Subscription<Message>>,
) -> Result<()> {
Expand Down Expand Up @@ -90,8 +90,10 @@ impl BlockVrfValidator {

if is_new_epoch {
// read epoch boundary messages
let (_, protocol_parameters_msg) =
protocol_parameters_subscription.read_ignoring_rollbacks().await?;
let protocol_parameters_message_f = protocol_parameters_subscription.read();
let epoch_nonce_message_f = epoch_nonce_subscription.read();

let (_, protocol_parameters_msg) = protocol_parameters_message_f.await?;
let span = info_span!(
"block_vrf_validator.handle_protocol_parameters",
epoch = block_info.epoch
Expand All @@ -104,18 +106,20 @@ impl BlockVrfValidator {
_ => error!("Unexpected message type: {protocol_parameters_msg:?}"),
});

let (_, epoch_activity_msg) =
epoch_activity_subscription.read_ignoring_rollbacks().await?;
let (_, epoch_nonce_msg) = epoch_nonce_message_f.await?;
let span = info_span!(
"block_vrf_validator.handle_epoch_activity",
"block_vrf_validator.handle_epoch_nonce",
epoch = block_info.epoch
);
span.in_scope(|| match epoch_activity_msg.as_ref() {
Message::Cardano((block_info, CardanoMessage::EpochActivity(msg))) => {
span.in_scope(|| match epoch_nonce_msg.as_ref() {
Message::Cardano((
block_info,
CardanoMessage::EpochNonce(active_nonce),
)) => {
Self::check_sync(&current_block, block_info);
state.handle_epoch_activity(msg);
state.handle_epoch_nonce(active_nonce);
}
_ => error!("Unexpected message type: {epoch_activity_msg:?}"),
_ => error!("Unexpected message type: {epoch_nonce_msg:?}"),
});

let (_, spo_state_msg) =
Expand Down Expand Up @@ -194,10 +198,10 @@ impl BlockVrfValidator {
.unwrap_or(DEFAULT_BLOCK_SUBSCRIBE_TOPIC.1.to_string());
info!("Creating block subscription on '{block_subscribe_topic}'");

let epoch_activity_subscribe_topic = config
.get_string(DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC.0)
.unwrap_or(DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC.1.to_string());
info!("Creating epoch activity subscription on '{epoch_activity_subscribe_topic}'");
let epoch_nonce_subscribe_topic = config
.get_string(DEFAULT_EPOCH_NONCE_SUBSCRIBE_TOPIC.0)
.unwrap_or(DEFAULT_EPOCH_NONCE_SUBSCRIBE_TOPIC.1.to_string());
info!("Creating epoch nonce subscription on '{epoch_nonce_subscribe_topic}'");

let spo_state_subscribe_topic = config
.get_string(DEFAULT_SPO_STATE_SUBSCRIBE_TOPIC.0)
Expand All @@ -218,8 +222,7 @@ impl BlockVrfValidator {
let protocol_parameters_subscription =
context.subscribe(&protocol_parameters_subscribe_topic).await?;
let block_subscription = context.subscribe(&block_subscribe_topic).await?;
let epoch_activity_subscription =
context.subscribe(&epoch_activity_subscribe_topic).await?;
let epoch_nonce_subscription = context.subscribe(&epoch_nonce_subscribe_topic).await?;
let spo_state_subscription = context.subscribe(&spo_state_subscribe_topic).await?;
let spdd_subscription = context.subscribe(&spdd_subscribe_topic).await?;

Expand All @@ -237,7 +240,7 @@ impl BlockVrfValidator {
bootstrapped_subscription,
block_subscription,
protocol_parameters_subscription,
epoch_activity_subscription,
epoch_nonce_subscription,
spo_state_subscription,
spdd_subscription,
)
Expand Down
1 change: 1 addition & 0 deletions modules/block_vrf_validator/src/ouroboros/praos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub fn validate_vrf_praos<'a>(
}),
Box::new(move || {
validate_vrf_leader_value(
&pool_id,
&header.leader_vrf_output().map_err(|_| {
VrfValidationError::Other("Leader VRF Output is not set".to_string())
})?[..],
Expand Down
Loading