diff --git a/common/src/caryatid.rs b/common/src/caryatid.rs new file mode 100644 index 000000000..cedd87772 --- /dev/null +++ b/common/src/caryatid.rs @@ -0,0 +1,72 @@ +use std::sync::Arc; + +use anyhow::Result; +use caryatid_sdk::{async_trait, Context, MessageBounds, Subscription}; + +use crate::messages::{CardanoMessage, Message, StateTransitionMessage}; + +#[async_trait] +pub trait SubscriptionExt { + async fn read_ignoring_rollbacks(&mut self) -> Result<(String, Arc)>; +} + +#[async_trait] +impl SubscriptionExt for Box> { + async fn read_ignoring_rollbacks(&mut self) -> Result<(String, Arc)> { + loop { + let (stream, message) = self.read().await?; + if matches!( + message.as_ref(), + Message::Cardano(( + _, + CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)) + )) + ) { + continue; + } + break Ok((stream, message)); + } + } +} + +/// A utility to publish messages, which will only publish rollback messages if some work has been rolled back +pub struct RollbackAwarePublisher { + /// Module context + context: Arc>, + + /// Topic to publish on + topic: String, + + // At which slot did we publish our last non-rollback message + last_activity_at: Option, +} + +impl RollbackAwarePublisher { + pub fn new(context: Arc>, topic: String) -> Self { + Self { + context, + topic, + last_activity_at: None, + } + } + + pub async fn publish(&mut self, message: Arc) -> Result<()> { + match message.as_ref() { + Message::Cardano(( + block, + CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)), + )) => { + if self.last_activity_at.is_some_and(|slot| slot >= block.slot) { + self.last_activity_at = None; + self.context.publish(&self.topic, message).await?; + } + Ok(()) + } + Message::Cardano((block, _)) => { + self.last_activity_at = Some(block.slot); + self.context.publish(&self.topic, message).await + } + _ => self.context.publish(&self.topic, message).await, + } + } +} diff --git a/common/src/commands/chain_sync.rs b/common/src/commands/chain_sync.rs index 77a55f12b..2096de910 100644 --- a/common/src/commands/chain_sync.rs +++ b/common/src/commands/chain_sync.rs @@ -1,6 +1,6 @@ -use crate::{BlockHash, Slot}; +use crate::Point; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum ChainSyncCommand { - FindIntersect { slot: Slot, hash: BlockHash }, + FindIntersect(Point), } diff --git a/common/src/lib.rs b/common/src/lib.rs index cc5c9ba97..950e23597 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -2,6 +2,7 @@ pub mod address; pub mod calculations; +pub mod caryatid; pub mod cbor; pub mod cip19; pub mod commands; diff --git a/common/src/messages.rs b/common/src/messages.rs index 61b14e493..1190e00fe 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -1,8 +1,5 @@ //! Definition of Acropolis messages -// We don't use these messages in the acropolis_common crate itself -#![allow(dead_code)] - use crate::commands::chain_sync::ChainSyncCommand; use crate::commands::transactions::{TransactionsCommand, TransactionsCommandResponse}; use crate::genesis_values::GenesisValues; @@ -45,6 +42,13 @@ pub struct RawBlockMessage { pub body: Vec, } +/// Rollback message +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum StateTransitionMessage { + /// The chain has been rolled back to a specific point + Rollback(Point), +} + /// Snapshot completion message #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct SnapshotCompleteMessage { @@ -303,6 +307,7 @@ pub struct SPOStateMessage { #[allow(clippy::large_enum_variant)] pub enum CardanoMessage { BlockAvailable(RawBlockMessage), // Block body available + StateTransition(StateTransitionMessage), // Our position on the chain has changed BlockValidation(ValidationStatus), // Result of a block validation SnapshotComplete, // Mithril snapshot loaded ReceivedTxs(RawTxsMessage), // Transaction available diff --git a/common/src/types.rs b/common/src/types.rs index e5f35740c..81f849f9a 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -724,6 +724,17 @@ impl TxOutRef { /// Slot pub type Slot = u64; +/// Point on the chain +#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq)] +pub enum Point { + #[default] + Origin, + Specific { + hash: BlockHash, + slot: Slot, + }, +} + /// Amount of Ada, in Lovelace pub type Lovelace = u64; pub type LovelaceDelta = i64; diff --git a/modules/accounts_state/src/accounts_state.rs b/modules/accounts_state/src/accounts_state.rs index f5eee5989..d777cc50d 100644 --- a/modules/accounts_state/src/accounts_state.rs +++ b/modules/accounts_state/src/accounts_state.rs @@ -2,7 +2,8 @@ //! Manages stake and reward accounts state use acropolis_common::{ - messages::{CardanoMessage, Message, StateQuery, StateQueryResponse}, + caryatid::SubscriptionExt, + messages::{CardanoMessage, Message, StateQuery, StateQueryResponse, StateTransitionMessage}, queries::accounts::{DrepDelegators, PoolDelegators, DEFAULT_ACCOUNTS_QUERY_TOPIC}, state_history::{StateHistory, StateHistoryStore}, BlockInfo, BlockStatus, @@ -120,17 +121,13 @@ impl AccountsState { // Get a mutable state let mut state = history.lock().await.get_current_state(); - // Read per-block topics in parallel - let certs_message_f = certs_subscription.read(); - let stake_message_f = stake_subscription.read(); - let withdrawals_message_f = withdrawals_subscription.read(); let mut current_block: Option = None; // Use certs_message as the synchroniser, but we have to handle it after the // epoch things, because they apply to the new epoch, not the last - let (_, certs_message) = certs_message_f.await?; + let (_, certs_message) = certs_subscription.read().await?; let new_epoch = match certs_message.as_ref() { - Message::Cardano((block_info, _)) => { + Message::Cardano((block_info, CardanoMessage::TxCertificates(_))) => { // Handle rollbacks on this topic only if block_info.status == BlockStatus::RolledBack { state = history.lock().await.get_rolled_back_state(block_info.number); @@ -139,6 +136,16 @@ impl AccountsState { current_block = Some(block_info.clone()); block_info.new_epoch && block_info.epoch > 0 } + Message::Cardano(( + _, + CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)), + )) => { + drep_publisher.publish_rollback(certs_message.clone()).await?; + spo_publisher.publish_rollback(certs_message.clone()).await?; + spo_rewards_publisher.publish_rollback(certs_message.clone()).await?; + stake_reward_deltas_publisher.publish_rollback(certs_message.clone()).await?; + false + } _ => false, }; @@ -149,18 +156,13 @@ impl AccountsState { // Read from epoch-boundary messages only when it's a new epoch if new_epoch { - let dreps_message_f = drep_state_subscription.read(); - let spos_message_f = spos_subscription.read(); - let ea_message_f = ea_subscription.read(); - let params_message_f = parameters_subscription.read(); - let spdd_store_guard = match spdd_store.as_ref() { Some(s) => Some(s.lock().await), None => None, }; // Handle DRep - let (_, message) = dreps_message_f.await?; + let (_, message) = drep_state_subscription.read_ignoring_rollbacks().await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::DRepState(dreps_msg))) => { let span = info_span!( @@ -184,7 +186,7 @@ impl AccountsState { } // Handle SPOs - let (_, message) = spos_message_f.await?; + let (_, message) = spos_subscription.read_ignoring_rollbacks().await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::SPOState(spo_msg))) => { let span = @@ -219,7 +221,7 @@ impl AccountsState { _ => error!("Unexpected message type: {message:?}"), } - let (_, message) = params_message_f.await?; + let (_, message) = parameters_subscription.read_ignoring_rollbacks().await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::ProtocolParams(params_msg))) => { let span = info_span!( @@ -241,7 +243,7 @@ impl AccountsState { } // Handle epoch activity - let (_, message) = ea_message_f.await?; + let (_, message) = ea_subscription.read_ignoring_rollbacks().await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::EpochActivity(ea_msg))) => { let span = info_span!( @@ -297,11 +299,18 @@ impl AccountsState { .await; } + Message::Cardano(( + _, + CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)), + )) => { + // Ignore this, we already handled rollbacks + } + _ => error!("Unexpected message type: {certs_message:?}"), } // Handle withdrawals - let (_, message) = withdrawals_message_f.await?; + let (_, message) = withdrawals_subscription.read_ignoring_rollbacks().await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::Withdrawals(withdrawals_msg))) => { let span = info_span!( @@ -323,7 +332,7 @@ impl AccountsState { } // Handle stake address deltas - let (_, message) = stake_message_f.await?; + let (_, message) = stake_subscription.read_ignoring_rollbacks().await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::StakeAddressDeltas(deltas_msg))) => { let span = info_span!( diff --git a/modules/accounts_state/src/drep_distribution_publisher.rs b/modules/accounts_state/src/drep_distribution_publisher.rs index 7207ae0ec..31de1ceb4 100644 --- a/modules/accounts_state/src/drep_distribution_publisher.rs +++ b/modules/accounts_state/src/drep_distribution_publisher.rs @@ -1,3 +1,4 @@ +use acropolis_common::caryatid::RollbackAwarePublisher; use acropolis_common::messages::{ CardanoMessage, DRepDelegationDistribution, DRepStakeDistributionMessage, Message, }; @@ -6,18 +7,12 @@ use caryatid_sdk::Context; use std::sync::Arc; /// Message publisher for DRep Delegation Distribution (DRDD) -pub struct DRepDistributionPublisher { - /// Module context - context: Arc>, - - /// Topic to publish on - topic: String, -} +pub struct DRepDistributionPublisher(RollbackAwarePublisher); impl DRepDistributionPublisher { /// Construct with context and topic to publish on pub fn new(context: Arc>, topic: String) -> Self { - Self { context, topic } + Self(RollbackAwarePublisher::new(context, topic)) } /// Publish the DRep Delegation Distribution @@ -26,18 +21,19 @@ impl DRepDistributionPublisher { block: &BlockInfo, drdd: DRepDelegationDistribution, ) -> anyhow::Result<()> { - self.context - .message_bus - .publish( - &self.topic, - Arc::new(Message::Cardano(( - block.clone(), - CardanoMessage::DRepStakeDistribution(DRepStakeDistributionMessage { - epoch: block.epoch, - drdd, - }), - ))), - ) + self.0 + .publish(Arc::new(Message::Cardano(( + block.clone(), + CardanoMessage::DRepStakeDistribution(DRepStakeDistributionMessage { + epoch: block.epoch, + drdd, + }), + )))) .await } + + /// Publish a rollback message, if we have anything to roll back + pub async fn publish_rollback(&mut self, message: Arc) -> anyhow::Result<()> { + self.0.publish(message).await + } } diff --git a/modules/accounts_state/src/spo_distribution_publisher.rs b/modules/accounts_state/src/spo_distribution_publisher.rs index 47a8efdc6..049aac1b6 100644 --- a/modules/accounts_state/src/spo_distribution_publisher.rs +++ b/modules/accounts_state/src/spo_distribution_publisher.rs @@ -1,3 +1,4 @@ +use acropolis_common::caryatid::RollbackAwarePublisher; use acropolis_common::messages::{CardanoMessage, Message, SPOStakeDistributionMessage}; use acropolis_common::{BlockInfo, DelegatedStake, PoolId}; use caryatid_sdk::Context; @@ -5,18 +6,12 @@ use std::collections::BTreeMap; use std::sync::Arc; /// Message publisher for Stake Pool Delegation Distribution (SPDD) -pub struct SPODistributionPublisher { - /// Module context - context: Arc>, - - /// Topic to publish on - topic: String, -} +pub struct SPODistributionPublisher(RollbackAwarePublisher); impl SPODistributionPublisher { /// Construct with context and topic to publish on pub fn new(context: Arc>, topic: String) -> Self { - Self { context, topic } + Self(RollbackAwarePublisher::new(context, topic)) } /// Publish the SPDD @@ -25,18 +20,19 @@ impl SPODistributionPublisher { block: &BlockInfo, spos: BTreeMap, ) -> anyhow::Result<()> { - self.context - .message_bus - .publish( - &self.topic, - Arc::new(Message::Cardano(( - block.clone(), - CardanoMessage::SPOStakeDistribution(SPOStakeDistributionMessage { - epoch: block.epoch - 1, // End of the previous epoch - spos: spos.into_iter().collect(), - }), - ))), - ) + self.0 + .publish(Arc::new(Message::Cardano(( + block.clone(), + CardanoMessage::SPOStakeDistribution(SPOStakeDistributionMessage { + epoch: block.epoch - 1, // End of the previous epoch + spos: spos.into_iter().collect(), + }), + )))) .await } + + /// Publish a rollback message, if we have anything to roll back + pub async fn publish_rollback(&mut self, message: Arc) -> anyhow::Result<()> { + self.0.publish(message).await + } } diff --git a/modules/accounts_state/src/spo_rewards_publisher.rs b/modules/accounts_state/src/spo_rewards_publisher.rs index 42ace388a..60456918c 100644 --- a/modules/accounts_state/src/spo_rewards_publisher.rs +++ b/modules/accounts_state/src/spo_rewards_publisher.rs @@ -1,21 +1,16 @@ +use acropolis_common::caryatid::RollbackAwarePublisher; use acropolis_common::messages::{CardanoMessage, Message, SPORewardsMessage}; use acropolis_common::{BlockInfo, PoolId, SPORewards}; use caryatid_sdk::Context; use std::sync::Arc; /// Message publisher for Stake Pool Delegation Distribution (SPDD) -pub struct SPORewardsPublisher { - /// Module context - context: Arc>, - - /// Topic to publish on - topic: String, -} +pub struct SPORewardsPublisher(RollbackAwarePublisher); impl SPORewardsPublisher { /// Construct with context and topic to publish on pub fn new(context: Arc>, topic: String) -> Self { - Self { context, topic } + Self(RollbackAwarePublisher::new(context, topic)) } /// Publish the SPO rewards @@ -24,18 +19,19 @@ impl SPORewardsPublisher { block: &BlockInfo, spo_rewards: Vec<(PoolId, SPORewards)>, ) -> anyhow::Result<()> { - self.context - .message_bus - .publish( - &self.topic, - Arc::new(Message::Cardano(( - block.clone(), - CardanoMessage::SPORewards(SPORewardsMessage { - epoch: block.epoch - 1, // End of previous epoch - spos: spo_rewards.into_iter().collect(), - }), - ))), - ) + self.0 + .publish(Arc::new(Message::Cardano(( + block.clone(), + CardanoMessage::SPORewards(SPORewardsMessage { + epoch: block.epoch - 1, // End of previous epoch + spos: spo_rewards.into_iter().collect(), + }), + )))) .await } + + /// Publish a rollback message, if we have anything to roll back + pub async fn publish_rollback(&mut self, message: Arc) -> anyhow::Result<()> { + self.0.publish(message).await + } } diff --git a/modules/accounts_state/src/stake_reward_deltas_publisher.rs b/modules/accounts_state/src/stake_reward_deltas_publisher.rs index dce6e8471..911f08ba2 100644 --- a/modules/accounts_state/src/stake_reward_deltas_publisher.rs +++ b/modules/accounts_state/src/stake_reward_deltas_publisher.rs @@ -1,21 +1,16 @@ +use acropolis_common::caryatid::RollbackAwarePublisher; use acropolis_common::messages::{CardanoMessage, Message, StakeRewardDeltasMessage}; use acropolis_common::{BlockInfo, StakeRewardDelta}; use caryatid_sdk::Context; use std::sync::Arc; /// Message publisher for Stake Reward Deltas -pub struct StakeRewardDeltasPublisher { - /// Module context - context: Arc>, - - /// Topic to publish on - topic: String, -} +pub struct StakeRewardDeltasPublisher(RollbackAwarePublisher); impl StakeRewardDeltasPublisher { /// Construct with context and topic to publish on pub fn new(context: Arc>, topic: String) -> Self { - Self { context, topic } + Self(RollbackAwarePublisher::new(context, topic)) } /// Publish the Stake Diffs @@ -24,17 +19,18 @@ impl StakeRewardDeltasPublisher { block: &BlockInfo, stake_reward_deltas: Vec, ) -> anyhow::Result<()> { - self.context - .message_bus - .publish( - &self.topic, - Arc::new(Message::Cardano(( - block.clone(), - CardanoMessage::StakeRewardDeltas(StakeRewardDeltasMessage { - deltas: stake_reward_deltas, - }), - ))), - ) + self.0 + .publish(Arc::new(Message::Cardano(( + block.clone(), + CardanoMessage::StakeRewardDeltas(StakeRewardDeltasMessage { + deltas: stake_reward_deltas, + }), + )))) .await } + + /// Publish a rollback message, if we have anything to roll back + pub async fn publish_rollback(&mut self, message: Arc) -> anyhow::Result<()> { + self.0.publish(message).await + } } diff --git a/modules/address_state/src/address_state.rs b/modules/address_state/src/address_state.rs index ec746b0ec..f58a843fa 100644 --- a/modules/address_state/src/address_state.rs +++ b/modules/address_state/src/address_state.rs @@ -8,7 +8,7 @@ use crate::{ immutable_address_store::ImmutableAddressStore, state::{AddressStorageConfig, State}, }; -use acropolis_common::queries::errors::QueryError; +use acropolis_common::{caryatid::SubscriptionExt, queries::errors::QueryError}; use acropolis_common::{ messages::{CardanoMessage, Message, StateQuery, StateQueryResponse}, queries::addresses::{ @@ -72,7 +72,7 @@ impl AddressState { // Main loop of synchronised messages loop { // Address deltas are the synchroniser - let (_, deltas_msg) = address_deltas_subscription.read().await?; + let (_, deltas_msg) = address_deltas_subscription.read_ignoring_rollbacks().await?; let (current_block, new_epoch) = match deltas_msg.as_ref() { Message::Cardano((info, _)) => (info.clone(), info.new_epoch && info.epoch > 0), _ => continue, @@ -87,7 +87,7 @@ impl AddressState { // Read params message on epoch bounday to update rollback window // length if needed and set epoch start block for volatile pruning if new_epoch { - let (_, message) = params_subscription.read().await?; + let (_, message) = params_subscription.read_ignoring_rollbacks().await?; if let Message::Cardano((ref block_info, CardanoMessage::ProtocolParams(params))) = message.as_ref() { diff --git a/modules/assets_state/src/assets_state.rs b/modules/assets_state/src/assets_state.rs index 9cb25f6cf..2ef0a94ac 100644 --- a/modules/assets_state/src/assets_state.rs +++ b/modules/assets_state/src/assets_state.rs @@ -7,6 +7,7 @@ use crate::{ state::{AssetsStorageConfig, State, StoreTransactions}, }; use acropolis_common::{ + caryatid::SubscriptionExt, messages::{CardanoMessage, Message, StateQuery, StateQueryResponse}, queries::{ assets::{AssetsStateQuery, AssetsStateQueryResponse, DEFAULT_ASSETS_QUERY_TOPIC}, @@ -58,11 +59,11 @@ impl AssetsState { registry: Arc>, ) -> Result<()> { if let Some(sub) = utxo_deltas_subscription.as_mut() { - let _ = sub.read().await?; + let _ = sub.read_ignoring_rollbacks().await?; info!("Consumed initial message from utxo_deltas_subscription"); } if let Some(sub) = address_deltas_subscription.as_mut() { - let _ = sub.read().await?; + let _ = sub.read_ignoring_rollbacks().await?; info!("Consumed initial message from address_deltas_subscription"); } // Main loop of synchronised messages @@ -75,7 +76,7 @@ impl AssetsState { let current_block: BlockInfo; // Asset deltas are the synchroniser - let (_, asset_msg) = asset_deltas_subscription.read().await?; + let (_, asset_msg) = asset_deltas_subscription.read_ignoring_rollbacks().await?; match asset_msg.as_ref() { Message::Cardano((ref block_info, CardanoMessage::AssetDeltas(deltas_msg))) => { // rollback only on asset deltas @@ -118,7 +119,7 @@ impl AssetsState { // Handle UTxO deltas if subscription is registered (store-info or store-transactions enabled) if let Some(sub) = utxo_deltas_subscription.as_mut() { - let (_, utxo_msg) = sub.read().await?; + let (_, utxo_msg) = sub.read_ignoring_rollbacks().await?; match utxo_msg.as_ref() { Message::Cardano(( ref block_info, @@ -154,7 +155,7 @@ impl AssetsState { } if let Some(sub) = address_deltas_subscription.as_mut() { - let (_, address_msg) = sub.read().await?; + let (_, address_msg) = sub.read_ignoring_rollbacks().await?; match address_msg.as_ref() { Message::Cardano(( ref block_info, diff --git a/modules/block_kes_validator/src/block_kes_validator.rs b/modules/block_kes_validator/src/block_kes_validator.rs index 4de71696b..9d8c687f2 100644 --- a/modules/block_kes_validator/src/block_kes_validator.rs +++ b/modules/block_kes_validator/src/block_kes_validator.rs @@ -2,6 +2,7 @@ //! Validate KES signatures in the block header use acropolis_common::{ + caryatid::SubscriptionExt, messages::{CardanoMessage, Message}, state_history::{StateHistory, StateHistoryStore}, BlockInfo, BlockStatus, @@ -70,7 +71,7 @@ impl BlockKesValidator { let mut state = history.lock().await.get_or_init_with(State::new); let mut current_block: Option = None; - let (_, message) = block_subscription.read().await?; + let (_, message) = block_subscription.read_ignoring_rollbacks().await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::BlockAvailable(block_msg))) => { // handle rollback here @@ -82,10 +83,8 @@ impl BlockKesValidator { if is_new_epoch { // read epoch boundary messages - let protocol_parameters_message_f = protocol_parameters_subscription.read(); - let spo_state_message_f = spo_state_subscription.read(); - - let (_, protocol_parameters_msg) = protocol_parameters_message_f.await?; + let (_, protocol_parameters_msg) = + protocol_parameters_subscription.read_ignoring_rollbacks().await?; let span = info_span!( "block_kes_validator.handle_protocol_parameters", epoch = block_info.epoch @@ -98,7 +97,8 @@ impl BlockKesValidator { _ => error!("Unexpected message type: {protocol_parameters_msg:?}"), }); - let (_, spo_state_msg) = spo_state_message_f.await?; + let (_, spo_state_msg) = + spo_state_subscription.read_ignoring_rollbacks().await?; let span = info_span!( "block_kes_validator.handle_spo_state", epoch = block_info.epoch diff --git a/modules/block_unpacker/src/block_unpacker.rs b/modules/block_unpacker/src/block_unpacker.rs index 7e605db1d..a50072434 100644 --- a/modules/block_unpacker/src/block_unpacker.rs +++ b/modules/block_unpacker/src/block_unpacker.rs @@ -1,7 +1,7 @@ //! Acropolis Block unpacker module for Caryatid //! Unpacks block bodies into transactions -use acropolis_common::messages::{CardanoMessage, Message, RawTxsMessage}; +use acropolis_common::messages::{CardanoMessage, Message, RawTxsMessage, StateTransitionMessage}; use anyhow::Result; use caryatid_sdk::{module, Context}; use config::Config; @@ -81,6 +81,18 @@ impl BlockUnpacker { } } + Message::Cardano(( + _, + CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)), + )) => { + // forward the rollback downstream + context + .message_bus + .publish(&publish_topic, message) + .await + .unwrap_or_else(|e| error!("Failed to publish rollback: {e}")); + } + _ => error!("Unexpected message type: {message:?}"), } } diff --git a/modules/block_vrf_validator/src/block_vrf_validator.rs b/modules/block_vrf_validator/src/block_vrf_validator.rs index ccfeb3f0c..a6a84ad0d 100644 --- a/modules/block_vrf_validator/src/block_vrf_validator.rs +++ b/modules/block_vrf_validator/src/block_vrf_validator.rs @@ -2,6 +2,7 @@ //! Validate the VRF calculation in the block header use acropolis_common::{ + caryatid::SubscriptionExt, messages::{CardanoMessage, Message}, state_history::{StateHistory, StateHistoryStore}, BlockInfo, BlockStatus, @@ -77,7 +78,7 @@ impl BlockVrfValidator { let mut state = history.lock().await.get_or_init_with(State::new); let mut current_block: Option = None; - let (_, message) = block_subscription.read().await?; + let (_, message) = block_subscription.read_ignoring_rollbacks().await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::BlockAvailable(block_msg))) => { // handle rollback here @@ -89,12 +90,8 @@ impl BlockVrfValidator { if is_new_epoch { // read epoch boundary messages - let protocol_parameters_message_f = protocol_parameters_subscription.read(); - let epoch_activity_message_f = epoch_activity_subscription.read(); - let spo_state_message_f = spo_state_subscription.read(); - let spdd_msg_f = spdd_subscription.read(); - - let (_, protocol_parameters_msg) = protocol_parameters_message_f.await?; + let (_, protocol_parameters_msg) = + protocol_parameters_subscription.read_ignoring_rollbacks().await?; let span = info_span!( "block_vrf_validator.handle_protocol_parameters", epoch = block_info.epoch @@ -107,7 +104,8 @@ impl BlockVrfValidator { _ => error!("Unexpected message type: {protocol_parameters_msg:?}"), }); - let (_, epoch_activity_msg) = epoch_activity_message_f.await?; + let (_, epoch_activity_msg) = + epoch_activity_subscription.read_ignoring_rollbacks().await?; let span = info_span!( "block_vrf_validator.handle_epoch_activity", epoch = block_info.epoch @@ -120,8 +118,9 @@ impl BlockVrfValidator { _ => error!("Unexpected message type: {epoch_activity_msg:?}"), }); - let (_, spo_state_msg) = spo_state_message_f.await?; - let (_, spdd_msg) = spdd_msg_f.await?; + let (_, spo_state_msg) = + spo_state_subscription.read_ignoring_rollbacks().await?; + let (_, spdd_msg) = spdd_subscription.read_ignoring_rollbacks().await?; let span = info_span!( "block_vrf_validator.handle_new_snapshot", epoch = block_info.epoch diff --git a/modules/chain_store/src/chain_store.rs b/modules/chain_store/src/chain_store.rs index d43c46e25..a2f4225a1 100644 --- a/modules/chain_store/src/chain_store.rs +++ b/modules/chain_store/src/chain_store.rs @@ -9,6 +9,7 @@ use acropolis_codec::{ use acropolis_common::queries::blocks::TransactionHashesAndTimeStamps; use acropolis_common::queries::errors::QueryError; use acropolis_common::{ + caryatid::SubscriptionExt, crypto::keyhash_224, messages::{CardanoMessage, Message, StateQuery, StateQueryResponse}, queries::transactions::{ @@ -137,10 +138,6 @@ impl ChainStore { let mut new_blocks_subscription = context.subscribe(&new_blocks_topic).await?; let mut params_subscription = context.subscribe(¶ms_topic).await?; context.run(async move { - // Get promise of params message so the params queue is cleared and - // the message is ready as soon as possible when we need it - let mut params_message = params_subscription.read(); - // Validate the first stored block matches what is already persisted when clear-on-start is false let Ok((_, first_block_message)) = new_blocks_subscription.read().await else { return; @@ -160,7 +157,8 @@ impl ChainStore { if let Message::Cardano((block_info, _)) = message.as_ref() { if block_info.new_epoch { - let Ok((_, message)) = params_message.await else { + let Ok((_, message)) = params_subscription.read_ignoring_rollbacks().await + else { return; }; let mut history = history.lock().await; @@ -169,8 +167,6 @@ impl ChainStore { return; }; history.commit(block_info.number, state); - // Have the next params message ready for the next epoch - params_message = params_subscription.read(); } } } diff --git a/modules/consensus/src/consensus.rs b/modules/consensus/src/consensus.rs index 69f91c235..68460bf88 100644 --- a/modules/consensus/src/consensus.rs +++ b/modules/consensus/src/consensus.rs @@ -2,7 +2,7 @@ //! Maintains a favoured chain based on offered options from multiple sources use acropolis_common::{ - messages::{CardanoMessage, Message}, + messages::{CardanoMessage, Message, StateTransitionMessage}, validation::ValidationStatus, }; use anyhow::Result; @@ -134,6 +134,18 @@ impl Consensus { .await; } + Message::Cardano(( + _, + CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)), + )) => { + // Send rollback to all validators and state modules + context + .message_bus + .publish(&publish_blocks_topic, message.clone()) + .await + .unwrap_or_else(|e| error!("Failed to publish: {e}")); + } + _ => error!("Unexpected message type: {message:?}"), } } diff --git a/modules/drdd_state/src/drdd_state.rs b/modules/drdd_state/src/drdd_state.rs index 8d919aba9..fce95dc78 100644 --- a/modules/drdd_state/src/drdd_state.rs +++ b/modules/drdd_state/src/drdd_state.rs @@ -1,6 +1,7 @@ //! Acropolis DRDD state module for Caryatid //! Stores historical DRep delegation distributions use acropolis_common::{ + caryatid::SubscriptionExt, messages::{CardanoMessage, Message}, rest_helper::handle_rest_with_query_parameters, }; @@ -50,7 +51,8 @@ impl DRDDState { let mut message_subscription = context.subscribe(&subscribe_topic).await?; context.run(async move { loop { - let Ok((_, message)) = message_subscription.read().await else { + let Ok((_, message)) = message_subscription.read_ignoring_rollbacks().await + else { return; }; match message.as_ref() { diff --git a/modules/drep_state/src/drep_state.rs b/modules/drep_state/src/drep_state.rs index b1de308d4..51d202772 100644 --- a/modules/drep_state/src/drep_state.rs +++ b/modules/drep_state/src/drep_state.rs @@ -1,6 +1,8 @@ //! Acropolis DRep State module for Caryatid //! Accepts certificate events and derives the DRep State in memory +use acropolis_common::caryatid::SubscriptionExt; +use acropolis_common::messages::StateTransitionMessage; use acropolis_common::queries::errors::QueryError; use acropolis_common::{ messages::{CardanoMessage, Message, StateQuery, StateQueryResponse}, @@ -79,13 +81,10 @@ impl DRepState { }; let mut current_block: Option = None; - // Read per-block messages in parallel - let certs_message_f = certs_subscription.read(); - // Certificates are the synchroniser - let (_, certs_message) = certs_message_f.await?; + let (_, certs_message) = certs_subscription.read().await?; let new_epoch = match certs_message.as_ref() { - Message::Cardano((ref block_info, _)) => { + Message::Cardano((ref block_info, CardanoMessage::TxCertificates(_))) => { // rollback only on certs if block_info.status == BlockStatus::RolledBack { state = history.lock().await.get_rolled_back_state(block_info.number); @@ -93,6 +92,13 @@ impl DRepState { current_block = Some(block_info.clone()); block_info.new_epoch && block_info.epoch > 0 } + Message::Cardano(( + _, + CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)), + )) => { + drep_state_publisher.publish_rollback(certs_message.clone()).await?; + false + } _ => false, }; @@ -100,7 +106,7 @@ impl DRepState { if new_epoch { // Read params subscription if store-info is enabled to obtain DRep expiration param. Update expirations on epoch transition if let Some(sub) = params_subscription.as_mut() { - let (_, message) = sub.read().await?; + let (_, message) = sub.read_ignoring_rollbacks().await?; match message.as_ref() { Message::Cardano(( ref block_info, @@ -151,12 +157,19 @@ impl DRepState { .await; } + Message::Cardano(( + _, + CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)), + )) => { + // Do nothing, we handled the rollback earlier + } + _ => error!("Unexpected message type: {certs_message:?}"), } // Handle governance message if let Some(sub) = gov_subscription.as_mut() { - let (_, message) = sub.read().await?; + let (_, message) = sub.read_ignoring_rollbacks().await?; match message.as_ref() { Message::Cardano(( block_info, diff --git a/modules/drep_state/src/drep_state_publisher.rs b/modules/drep_state/src/drep_state_publisher.rs index 7a94a099b..48344b058 100644 --- a/modules/drep_state/src/drep_state_publisher.rs +++ b/modules/drep_state/src/drep_state_publisher.rs @@ -1,4 +1,5 @@ use acropolis_common::{ + caryatid::RollbackAwarePublisher, messages::{CardanoMessage, DRepStateMessage, Message}, BlockInfo, Credential, }; @@ -6,18 +7,12 @@ use caryatid_sdk::Context; use std::sync::Arc; /// Message publisher for DRep State -pub struct DRepStatePublisher { - /// Module context - context: Arc>, - - /// Topic to publish on - topic: String, -} +pub struct DRepStatePublisher(RollbackAwarePublisher); impl DRepStatePublisher { /// Construct with context and topic to publish on pub fn new(context: Arc>, topic: String) -> Self { - Self { context, topic } + Self(RollbackAwarePublisher::new(context, topic)) } /// Publish the DRep state @@ -26,18 +21,19 @@ impl DRepStatePublisher { block: &BlockInfo, dreps: Vec<(Credential, u64)>, ) -> anyhow::Result<()> { - self.context - .message_bus - .publish( - &self.topic, - Arc::new(Message::Cardano(( - block.clone(), - CardanoMessage::DRepState(DRepStateMessage { - epoch: block.epoch, - dreps, - }), - ))), - ) + self.0 + .publish(Arc::new(Message::Cardano(( + block.clone(), + CardanoMessage::DRepState(DRepStateMessage { + epoch: block.epoch, + dreps, + }), + )))) .await } + + /// Publish a rollback message, if we have anything to roll back + pub async fn publish_rollback(&mut self, message: Arc) -> anyhow::Result<()> { + self.0.publish(message).await + } } diff --git a/modules/epochs_state/src/epoch_activity_publisher.rs b/modules/epochs_state/src/epoch_activity_publisher.rs index 9341ccafe..31561eea3 100644 --- a/modules/epochs_state/src/epoch_activity_publisher.rs +++ b/modules/epochs_state/src/epoch_activity_publisher.rs @@ -1,4 +1,5 @@ use acropolis_common::{ + caryatid::RollbackAwarePublisher, messages::{CardanoMessage, EpochActivityMessage, Message}, BlockInfo, }; @@ -6,18 +7,12 @@ use caryatid_sdk::Context; use std::sync::Arc; /// Message publisher for Epoch Activity Message -pub struct EpochActivityPublisher { - /// Module context - context: Arc>, - - /// Topic to publish on - topic: String, -} +pub struct EpochActivityPublisher(RollbackAwarePublisher); impl EpochActivityPublisher { /// Construct with context and topic to publish on pub fn new(context: Arc>, topic: String) -> Self { - Self { context, topic } + Self(RollbackAwarePublisher::new(context, topic)) } /// Publish the Epoch Activity Message @@ -26,15 +21,16 @@ impl EpochActivityPublisher { block_info: &BlockInfo, ea: EpochActivityMessage, ) -> anyhow::Result<()> { - self.context - .message_bus - .publish( - &self.topic, - Arc::new(Message::Cardano(( - block_info.clone(), - CardanoMessage::EpochActivity(ea), - ))), - ) + self.0 + .publish(Arc::new(Message::Cardano(( + block_info.clone(), + CardanoMessage::EpochActivity(ea), + )))) .await } + + /// Publish a rollback message, if we have anything to roll back + pub async fn publish_rollback(&mut self, message: Arc) -> anyhow::Result<()> { + self.0.publish(message).await + } } diff --git a/modules/epochs_state/src/epochs_state.rs b/modules/epochs_state/src/epochs_state.rs index 17a130172..cf2f0d8be 100644 --- a/modules/epochs_state/src/epochs_state.rs +++ b/modules/epochs_state/src/epochs_state.rs @@ -2,11 +2,14 @@ //! Unpacks block bodies to get transaction fees use acropolis_common::{ - messages::{CardanoMessage, Message, StateQuery, StateQueryResponse}, - queries::epochs::{ - EpochsStateQuery, EpochsStateQueryResponse, LatestEpoch, DEFAULT_EPOCHS_QUERY_TOPIC, + caryatid::SubscriptionExt, + messages::{CardanoMessage, Message, StateQuery, StateQueryResponse, StateTransitionMessage}, + queries::{ + epochs::{ + EpochsStateQuery, EpochsStateQueryResponse, LatestEpoch, DEFAULT_EPOCHS_QUERY_TOPIC, + }, + errors::QueryError, }, - queries::errors::QueryError, state_history::{StateHistory, StateHistoryStore}, BlockInfo, BlockStatus, }; @@ -72,12 +75,8 @@ impl EpochsState { let mut state = history.lock().await.get_or_init_with(|| State::new(&genesis)); let mut current_block: Option = None; - // Read both topics in parallel - let block_message_f = block_subscription.read(); - let block_txs_message_f = block_txs_subscription.read(); - // Handle blocks first - let (_, message) = block_message_f.await?; + let (_, message) = block_subscription.read().await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::BlockAvailable(block_msg))) => { // handle rollback here @@ -90,7 +89,7 @@ impl EpochsState { // read protocol parameters if new epoch if is_new_epoch { let (_, protocol_parameters_msg) = - protocol_parameters_subscription.read().await?; + protocol_parameters_subscription.read_ignoring_rollbacks().await?; if let Message::Cardano((_, CardanoMessage::ProtocolParams(params))) = protocol_parameters_msg.as_ref() { @@ -137,11 +136,21 @@ impl EpochsState { }); } + Message::Cardano(( + _, + CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)), + )) => { + // publish epoch activity rollback message + epoch_activity_publisher.publish_rollback(message).await.unwrap_or_else(|e| { + error!("Failed to publish epoch activity rollback: {e}") + }); + } + _ => error!("Unexpected message type: {message:?}"), } // Handle block txs second so new epoch's state don't get counted in the last one - let (_, message) = block_txs_message_f.await?; + let (_, message) = block_txs_subscription.read_ignoring_rollbacks().await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::BlockInfoMessage(txs_msg))) => { let span = diff --git a/modules/governance_state/src/governance_state.rs b/modules/governance_state/src/governance_state.rs index 80d36fc59..cb57d454c 100644 --- a/modules/governance_state/src/governance_state.rs +++ b/modules/governance_state/src/governance_state.rs @@ -1,11 +1,13 @@ //! Acropolis Governance State module for Caryatid //! Accepts certificate events and derives the Governance State in memory +use acropolis_common::caryatid::SubscriptionExt; +use acropolis_common::messages::StateTransitionMessage; use acropolis_common::queries::errors::QueryError; use acropolis_common::{ messages::{ - CardanoMessage, DRepStakeDistributionMessage, GovernanceProceduresMessage, Message, - ProtocolParamsMessage, SPOStakeDistributionMessage, StateQuery, StateQueryResponse, + CardanoMessage, DRepStakeDistributionMessage, Message, ProtocolParamsMessage, + SPOStakeDistributionMessage, StateQuery, StateQueryResponse, }, queries::governance::{ GovernanceStateQuery, GovernanceStateQueryResponse, ProposalInfo, ProposalVotes, @@ -13,7 +15,7 @@ use acropolis_common::{ }, BlockInfo, }; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, bail, Result}; use caryatid_sdk::{message_bus::Subscription, module, Context}; use config::Config; use std::sync::Arc; @@ -82,23 +84,10 @@ impl GovernanceStateConfig { } impl GovernanceState { - async fn read_governance( - governance_s: &mut Box>, - ) -> Result<(BlockInfo, GovernanceProceduresMessage)> { - match governance_s.read().await?.1.as_ref() { - Message::Cardano((blk, CardanoMessage::GovernanceProcedures(msg))) => { - Ok((blk.clone(), msg.clone())) - } - msg => Err(anyhow!( - "Unexpected message {msg:?} for governance procedures topic" - )), - } - } - async fn read_parameters( parameters_s: &mut Box>, ) -> Result<(BlockInfo, ProtocolParamsMessage)> { - match parameters_s.read().await?.1.as_ref() { + match parameters_s.read_ignoring_rollbacks().await?.1.as_ref() { Message::Cardano((blk, CardanoMessage::ProtocolParams(params))) => { Ok((blk.clone(), params.clone())) } @@ -111,7 +100,7 @@ impl GovernanceState { async fn read_drep( drep_s: &mut Box>, ) -> Result<(BlockInfo, DRepStakeDistributionMessage)> { - match drep_s.read().await?.1.as_ref() { + match drep_s.read_ignoring_rollbacks().await?.1.as_ref() { Message::Cardano((blk, CardanoMessage::DRepStakeDistribution(distr))) => { Ok((blk.clone(), distr.clone())) } @@ -124,7 +113,7 @@ impl GovernanceState { async fn read_spo( spo_s: &mut Box>, ) -> Result<(BlockInfo, SPOStakeDistributionMessage)> { - match spo_s.read().await?.1.as_ref() { + match spo_s.read_ignoring_rollbacks().await?.1.as_ref() { Message::Cardano((blk, CardanoMessage::SPOStakeDistribution(distr))) => { Ok((blk.clone(), distr.clone())) } @@ -229,7 +218,21 @@ impl GovernanceState { }); loop { - let (blk_g, gov_procs) = Self::read_governance(&mut governance_s).await?; + let (_, message) = governance_s.read().await?; + let (blk_g, gov_procs) = match message.as_ref() { + Message::Cardano((blk, CardanoMessage::GovernanceProcedures(msg))) => { + (blk.clone(), msg.clone()) + } + Message::Cardano(( + _, + CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)), + )) => { + let mut state = state.lock().await; + state.publish_rollback(message).await?; + continue; + } + _ => bail!("Unexpected message {message:?} for governance procedures topic"), + }; let span = info_span!("governance_state.handle", block = blk_g.number); async { diff --git a/modules/governance_state/src/state.rs b/modules/governance_state/src/state.rs index 1735cbbfe..11b0b6a62 100644 --- a/modules/governance_state/src/state.rs +++ b/modules/governance_state/src/state.rs @@ -1,6 +1,7 @@ //! Acropolis Governance State: State storage use acropolis_common::{ + caryatid::RollbackAwarePublisher, messages::{ CardanoMessage, DRepStakeDistributionMessage, GovernanceOutcomesMessage, GovernanceProceduresMessage, Message, ProtocolParamsMessage, SPOStakeDistributionMessage, @@ -20,8 +21,7 @@ use crate::{ }; pub struct State { - pub enact_state_topic: String, - pub context: Arc>, + publisher: RollbackAwarePublisher, pub drep_stake_messages_count: usize, @@ -42,8 +42,7 @@ impl State { verification_output_file: Option, ) -> Self { Self { - context, - enact_state_topic, + publisher: RollbackAwarePublisher::new(context, enact_state_topic), drep_stake_messages_count: 0, @@ -226,22 +225,21 @@ impl State { ); } - pub async fn send(&self, block: &BlockInfo, message: GovernanceOutcomesMessage) -> Result<()> { + pub async fn send( + &mut self, + block: &BlockInfo, + message: GovernanceOutcomesMessage, + ) -> Result<()> { let packed_message = Arc::new(Message::Cardano(( block.clone(), CardanoMessage::GovernanceOutcomes(message), ))); - let context = self.context.clone(); - let enact_state_topic = self.enact_state_topic.clone(); - - tokio::spawn(async move { - context - .message_bus - .publish(&enact_state_topic, packed_message) - .await - .unwrap_or_else(|e| tracing::error!("Failed to publish: {e}")); - }); - Ok(()) + self.publisher.publish(packed_message).await + } + + /// Publish a rollback message, if we have anything to roll back + pub async fn publish_rollback(&mut self, message: Arc) -> anyhow::Result<()> { + self.publisher.publish(message).await } /// Get list of actual voting proposals diff --git a/modules/historical_accounts_state/src/historical_accounts_state.rs b/modules/historical_accounts_state/src/historical_accounts_state.rs index 64382af17..acc08661c 100644 --- a/modules/historical_accounts_state/src/historical_accounts_state.rs +++ b/modules/historical_accounts_state/src/historical_accounts_state.rs @@ -1,6 +1,7 @@ //! Acropolis historical accounts state module for Caryatid //! Manages optional state data needed for Blockfrost alignment +use acropolis_common::caryatid::SubscriptionExt; use acropolis_common::queries::accounts::{ AccountsStateQuery, AccountsStateQueryResponse, DEFAULT_HISTORICAL_ACCOUNTS_QUERY_TOPIC, }; @@ -85,15 +86,10 @@ impl HistoricalAccountsState { }); // Main loop of synchronised messages loop { - // Create all per-block message futures upfront before processing messages sequentially - let certs_message_f = certs_subscription.read(); - let stake_address_deltas_message_f = stake_address_deltas_subscription.read(); - let withdrawals_message_f = withdrawals_subscription.read(); - let mut current_block: Option = None; // Use certs_message as the synchroniser - let (_, certs_message) = certs_message_f.await?; + let (_, certs_message) = certs_subscription.read_ignoring_rollbacks().await?; let new_epoch = match certs_message.as_ref() { Message::Cardano((block_info, _)) => { // Handle rollbacks on this topic only @@ -111,7 +107,7 @@ impl HistoricalAccountsState { // Read from epoch-boundary messages only when it's a new epoch if new_epoch { - let (_, params_msg) = params_subscription.read().await?; + let (_, params_msg) = params_subscription.read_ignoring_rollbacks().await?; if let Message::Cardano((ref block_info, CardanoMessage::ProtocolParams(params))) = params_msg.as_ref() { @@ -123,7 +119,7 @@ impl HistoricalAccountsState { } } - let (_, rewards_msg) = rewards_subscription.read().await?; + let (_, rewards_msg) = rewards_subscription.read_ignoring_rollbacks().await?; if let Message::Cardano(( block_info, CardanoMessage::StakeRewardDeltas(rewards_msg), @@ -153,7 +149,7 @@ impl HistoricalAccountsState { } // Handle withdrawals - let (_, message) = withdrawals_message_f.await?; + let (_, message) = withdrawals_subscription.read_ignoring_rollbacks().await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::Withdrawals(withdrawals_msg))) => { let span = info_span!( @@ -171,7 +167,7 @@ impl HistoricalAccountsState { } // Handle address deltas - let (_, message) = stake_address_deltas_message_f.await?; + let (_, message) = stake_address_deltas_subscription.read_ignoring_rollbacks().await?; match message.as_ref() { Message::Cardano((block_info, CardanoMessage::StakeAddressDeltas(deltas_msg))) => { let span = info_span!( diff --git a/modules/historical_epochs_state/src/historical_epochs_state.rs b/modules/historical_epochs_state/src/historical_epochs_state.rs index 6a97f39bd..fdb851e3a 100644 --- a/modules/historical_epochs_state/src/historical_epochs_state.rs +++ b/modules/historical_epochs_state/src/historical_epochs_state.rs @@ -3,6 +3,7 @@ use crate::immutable_historical_epochs_state::ImmutableHistoricalEpochsState; use crate::state::{HistoricalEpochsStateConfig, State}; +use acropolis_common::caryatid::SubscriptionExt; use acropolis_common::messages::StateQuery; use acropolis_common::queries::epochs::{ EpochInfo, EpochsStateQuery, NextEpochs, PreviousEpochs, DEFAULT_HISTORICAL_EPOCHS_QUERY_TOPIC, @@ -70,9 +71,9 @@ impl HistoricalEpochsState { let mut current_block: Option = None; // Use certs_message as the synchroniser - let (_, blocks_message) = blocks_subscription.read().await?; + let (_, blocks_message) = blocks_subscription.read_ignoring_rollbacks().await?; let new_epoch = match blocks_message.as_ref() { - Message::Cardano((block_info, _)) => { + Message::Cardano((block_info, CardanoMessage::BlockAvailable(_))) => { // Handle rollbacks on this topic only let mut state = state_mutex.lock().await; if block_info.status == BlockStatus::RolledBack { @@ -87,10 +88,7 @@ impl HistoricalEpochsState { // Read from epoch-boundary messages only when it's a new epoch if new_epoch { - let params_message_f = params_subscription.read(); - let epoch_activity_message_f = epoch_activity_subscription.read(); - - let (_, params_msg) = params_message_f.await?; + let (_, params_msg) = params_subscription.read_ignoring_rollbacks().await?; match params_msg.as_ref() { Message::Cardano((block_info, CardanoMessage::ProtocolParams(params))) => { let span = info_span!( @@ -110,7 +108,8 @@ impl HistoricalEpochsState { _ => error!("Unexpected message type: {params_msg:?}"), } - let (_, epoch_activity_msg) = epoch_activity_message_f.await?; + let (_, epoch_activity_msg) = + epoch_activity_subscription.read_ignoring_rollbacks().await?; match epoch_activity_msg.as_ref() { Message::Cardano((block_info, CardanoMessage::EpochActivity(ea))) => { let span = info_span!( diff --git a/modules/indexer/src/indexer.rs b/modules/indexer/src/indexer.rs index 8877c6d5d..7199c47fd 100644 --- a/modules/indexer/src/indexer.rs +++ b/modules/indexer/src/indexer.rs @@ -5,6 +5,7 @@ use acropolis_common::{ commands::chain_sync::ChainSyncCommand, hash::Hash, messages::{Command, Message}, + Point, }; use anyhow::Result; use caryatid_sdk::{module, Context}; @@ -35,13 +36,13 @@ impl Indexer { // This is a placeholder to test dynamic sync context.run(async move { - let example = ChainSyncCommand::FindIntersect { + let example = ChainSyncCommand::FindIntersect(Point::Specific { slot: 4492799, hash: Hash::from_str( "f8084c61b6a238acec985b59310b6ecec49c0ab8352249afd7268da5cff2a457", ) .expect("Valid hash"), - }; + }); // Initial sync message (This will be read from config for first sync and from DB on subsequent runs) ctx.message_bus diff --git a/modules/parameters_state/src/parameters_state.rs b/modules/parameters_state/src/parameters_state.rs index 20db783a5..5ad95ede0 100644 --- a/modules/parameters_state/src/parameters_state.rs +++ b/modules/parameters_state/src/parameters_state.rs @@ -1,6 +1,7 @@ //! Acropolis Parameter State module for Caryatid //! Accepts certificate events and derives the Governance State in memory +use acropolis_common::messages::StateTransitionMessage; use acropolis_common::queries::errors::QueryError; use acropolis_common::{ messages::{CardanoMessage, Message, ProtocolParamsMessage, StateQuery, StateQueryResponse}, @@ -102,7 +103,8 @@ impl ParametersState { mut enact_s: Box>, ) -> Result<()> { loop { - match enact_s.read().await?.1.as_ref() { + let (_, message) = enact_s.read().await?; + match message.as_ref() { Message::Cardano((block, CardanoMessage::GovernanceOutcomes(gov))) => { let span = info_span!("parameters_state.handle", epoch = block.epoch); async { @@ -144,6 +146,13 @@ impl ParametersState { .instrument(span) .await?; } + Message::Cardano(( + _, + CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)), + )) => { + // forward the rollback downstream + config.context.publish(&config.protocol_parameters_topic, message).await?; + } msg => error!("Unexpected message {msg:?} for enact state topic"), } } diff --git a/modules/peer_network_interface/src/peer_network_interface.rs b/modules/peer_network_interface/src/peer_network_interface.rs index 7456565c1..c46c370fa 100644 --- a/modules/peer_network_interface/src/peer_network_interface.rs +++ b/modules/peer_network_interface/src/peer_network_interface.rs @@ -153,12 +153,15 @@ impl PeerNetworkInterface { events_sender: mpsc::Sender, ) -> Result<()> { while let Ok((_, msg)) = subscription.read().await { - if let Message::Command(Command::ChainSync(ChainSyncCommand::FindIntersect { - slot, - hash, - })) = msg.as_ref() + if let Message::Command(Command::ChainSync(ChainSyncCommand::FindIntersect(p))) = + msg.as_ref() { - let point = Point::new(*slot, hash.to_vec()); + let point = match p { + acropolis_common::Point::Origin => Point::Origin, + acropolis_common::Point::Specific { hash, slot } => { + Point::Specific(*slot, hash.to_vec()) + } + }; if events_sender.send(NetworkEvent::SyncPointUpdate { point }).await.is_err() { bail!("event channel closed"); diff --git a/modules/spdd_state/src/spdd_state.rs b/modules/spdd_state/src/spdd_state.rs index c1a9e4481..11596dabc 100644 --- a/modules/spdd_state/src/spdd_state.rs +++ b/modules/spdd_state/src/spdd_state.rs @@ -1,5 +1,6 @@ //! Acropolis SPDD state module for Caryatid //! Stores historical stake pool delegation distributions +use acropolis_common::caryatid::SubscriptionExt; use acropolis_common::queries::errors::QueryError; use acropolis_common::{ messages::{CardanoMessage, Message, StateQuery, StateQueryResponse}, @@ -66,7 +67,8 @@ impl SPDDState { let mut message_subscription = context.subscribe(&subscribe_topic).await?; context.run(async move { loop { - let Ok((_, message)) = message_subscription.read().await else { + let Ok((_, message)) = message_subscription.read_ignoring_rollbacks().await + else { return; }; match message.as_ref() { diff --git a/modules/spo_state/src/spo_state.rs b/modules/spo_state/src/spo_state.rs index d9ca05c6b..44392dd49 100644 --- a/modules/spo_state/src/spo_state.rs +++ b/modules/spo_state/src/spo_state.rs @@ -1,6 +1,8 @@ //! Acropolis SPO state module for Caryatid //! Accepts certificate events and derives the SPO state in memory +use acropolis_common::caryatid::SubscriptionExt; +use acropolis_common::messages::StateTransitionMessage; use acropolis_common::queries::errors::QueryError; use acropolis_common::{ ledger_state::SPOState as LedgerSPOState, @@ -111,17 +113,10 @@ impl SPOState { let mut state = history.lock().await.get_or_init_with(|| State::new(store_config)); let mut current_block: Option = None; - // read per-block topics in parallel - let certs_message_f = certificates_subscription.read(); - let block_message_f = block_subscription.read(); - let withdrawals_message_f = withdrawals_subscription.as_mut().map(|s| s.read()); - let governance_message_f = governance_subscription.as_mut().map(|s| s.read()); - let stake_deltas_message_f = stake_deltas_subscription.as_mut().map(|s| s.read()); - // Use certs_message as the synchroniser - let (_, certs_message) = certs_message_f.await?; + let (_, certs_message) = certificates_subscription.read().await?; let new_epoch = match certs_message.as_ref() { - Message::Cardano((block_info, _)) => { + Message::Cardano((block_info, CardanoMessage::TxCertificates(_))) => { // Handle rollbacks on this topic only if block_info.status == BlockStatus::RolledBack { state = history.lock().await.get_rolled_back_state(block_info.number); @@ -132,6 +127,14 @@ impl SPOState { block_info.new_epoch && block_info.epoch > 0 } + Message::Cardano(( + _, + CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)), + )) => { + spo_state_publisher.publish_rollback(certs_message.clone()).await?; + false + } + _ => { error!("Unexpected message type: {certs_message:?}"); false @@ -140,7 +143,7 @@ impl SPOState { // handle blocks (handle_mint) before handle_tx_certs // in case of epoch boundary - let (_, block_message) = block_message_f.await?; + let (_, block_message) = block_subscription.read_ignoring_rollbacks().await?; match block_message.as_ref() { Message::Cardano((block_info, CardanoMessage::BlockAvailable(block_msg))) => { let span = @@ -206,19 +209,20 @@ impl SPOState { .await; } + Message::Cardano(( + _, + CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)), + )) => { + // Do nothing, we handled rollback earlier + } + _ => error!("Unexpected message type: {certs_message:?}"), }; // read from epoch-boundary messages only when it's a new epoch if new_epoch { - let spdd_message_f = spdd_subscription.read(); - let spo_rewards_message_f = spo_rewards_subscription.as_mut().map(|s| s.read()); - let ea_message_f = epoch_activity_subscription.read(); - let stake_reward_deltas_message_f = - stake_reward_deltas_subscription.as_mut().map(|s| s.read()); - // Handle SPDD - let (_, spdd_message) = spdd_message_f.await?; + let (_, spdd_message) = spdd_subscription.read_ignoring_rollbacks().await?; if let Message::Cardano(( block_info, CardanoMessage::SPOStakeDistribution(spdd_message), @@ -233,8 +237,9 @@ impl SPOState { } // Handle SPO rewards - if let Some(spo_rewards_message_f) = spo_rewards_message_f { - let (_, spo_rewards_message) = spo_rewards_message_f.await?; + if let Some(spo_rewards_subscription) = spo_rewards_subscription.as_mut() { + let (_, spo_rewards_message) = + spo_rewards_subscription.read_ignoring_rollbacks().await?; if let Message::Cardano(( block_info, CardanoMessage::SPORewards(spo_rewards_message), @@ -251,8 +256,11 @@ impl SPOState { } // Handle Stake Reward Deltas - if let Some(stake_reward_deltas_message_f) = stake_reward_deltas_message_f { - let (_, stake_reward_deltas_message) = stake_reward_deltas_message_f.await?; + if let Some(stake_reward_deltas_subscription) = + stake_reward_deltas_subscription.as_mut() + { + let (_, stake_reward_deltas_message) = + stake_reward_deltas_subscription.read_ignoring_rollbacks().await?; if let Message::Cardano(( block_info, CardanoMessage::StakeRewardDeltas(stake_reward_deltas_message), @@ -274,7 +282,7 @@ impl SPOState { } // Handle EpochActivityMessage - let (_, ea_message) = ea_message_f.await?; + let (_, ea_message) = epoch_activity_subscription.read_ignoring_rollbacks().await?; if let Message::Cardano(( block_info, CardanoMessage::EpochActivity(epoch_activity_message), @@ -300,8 +308,8 @@ impl SPOState { } // Handle withdrawals - if let Some(withdrawals_message_f) = withdrawals_message_f { - let (_, message) = withdrawals_message_f.await?; + if let Some(withdrawals_subscription) = withdrawals_subscription.as_mut() { + let (_, message) = withdrawals_subscription.read_ignoring_rollbacks().await?; match message.as_ref() { Message::Cardano(( block_info, @@ -325,8 +333,8 @@ impl SPOState { } // Handle stake deltas - if let Some(stake_deltas_message_f) = stake_deltas_message_f { - let (_, message) = stake_deltas_message_f.await?; + if let Some(stake_deltas_subscription) = stake_deltas_subscription.as_mut() { + let (_, message) = stake_deltas_subscription.read_ignoring_rollbacks().await?; match message.as_ref() { Message::Cardano(( block_info, @@ -350,8 +358,8 @@ impl SPOState { } // Handle governance - if let Some(governance_message_f) = governance_message_f { - let (_, message) = governance_message_f.await?; + if let Some(governance_subscription) = governance_subscription.as_mut() { + let (_, message) = governance_subscription.read_ignoring_rollbacks().await?; match message.as_ref() { Message::Cardano(( block_info, diff --git a/modules/spo_state/src/spo_state_publisher.rs b/modules/spo_state/src/spo_state_publisher.rs index 700612071..f790e736d 100644 --- a/modules/spo_state/src/spo_state_publisher.rs +++ b/modules/spo_state/src/spo_state_publisher.rs @@ -1,24 +1,23 @@ -use acropolis_common::messages::Message; +use acropolis_common::{caryatid::RollbackAwarePublisher, messages::Message}; use caryatid_sdk::Context; use std::sync::Arc; /// Message publisher for SPO State -pub struct SPOStatePublisher { - /// Module context - context: Arc>, - - /// Topic to publish on - topic: String, -} +pub struct SPOStatePublisher(RollbackAwarePublisher); impl SPOStatePublisher { /// Construct with context and topic to publish on pub fn new(context: Arc>, topic: String) -> Self { - Self { context, topic } + Self(RollbackAwarePublisher::new(context, topic)) } /// Publish the DRep Delegation Distribution pub async fn publish(&mut self, message: Arc) -> anyhow::Result<()> { - self.context.message_bus.publish(&self.topic, message).await + self.0.publish(message).await + } + + /// Publish a rollback message, if we have anything to roll back + pub async fn publish_rollback(&mut self, message: Arc) -> anyhow::Result<()> { + self.0.publish(message).await } } diff --git a/modules/stake_delta_filter/src/stake_delta_filter.rs b/modules/stake_delta_filter/src/stake_delta_filter.rs index b2d42fb0b..caafcfc4a 100644 --- a/modules/stake_delta_filter/src/stake_delta_filter.rs +++ b/modules/stake_delta_filter/src/stake_delta_filter.rs @@ -2,7 +2,8 @@ //! Reads address deltas and filters out only stake addresses from it; also resolves pointer addresses. use acropolis_common::{ - messages::{CardanoMessage, Message}, + caryatid::SubscriptionExt, + messages::{CardanoMessage, Message, StateTransitionMessage}, NetworkId, }; use anyhow::{anyhow, Result}; @@ -162,7 +163,7 @@ impl StakeDeltaFilter { let mut subscription = params.context.subscribe(¶ms.clone().address_delta_topic).await?; params.context.clone().run(async move { - let publisher = DeltaPublisher::new(params.clone()); + let mut publisher = DeltaPublisher::new(params.clone()); loop { let Ok((_, message)) = subscription.read().await else { @@ -185,6 +186,16 @@ impl StakeDeltaFilter { .await; } + Message::Cardano(( + _, + CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)), + )) => { + publisher + .publish_rollback(message) + .await + .unwrap_or_else(|e| error!("Publish error: {e}")); + } + msg => error!( "Unexpected message type for {}: {msg:?}", ¶ms.address_delta_topic @@ -208,7 +219,7 @@ impl StakeDeltaFilter { let mut subscription = params.context.subscribe(¶ms.tx_certificates_topic).await?; params.clone().context.run(async move { loop { - let Ok((_, message)) = subscription.read().await else { + let Ok((_, message)) = subscription.read_ignoring_rollbacks().await else { return; }; match message.as_ref() { @@ -261,6 +272,18 @@ impl StakeDeltaFilter { .await; } + Message::Cardano(( + _, + CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)), + )) => { + let mut state = state_deltas.lock().await; + state + .handle_rollback(message) + .await + .inspect_err(|e| error!("Messaging handling error: {e}")) + .ok(); + } + _ => error!("Unexpected message type for {}: {message:?}", &topic), } } diff --git a/modules/stake_delta_filter/src/state.rs b/modules/stake_delta_filter/src/state.rs index c898ba0a6..9f3929114 100644 --- a/modules/stake_delta_filter/src/state.rs +++ b/modules/stake_delta_filter/src/state.rs @@ -2,6 +2,7 @@ use crate::StakeDeltaFilterParams; use crate::{process_message, PointerCache, Tracker}; +use acropolis_common::caryatid::RollbackAwarePublisher; use acropolis_common::{ messages::{ AddressDeltasMessage, CardanoMessage, Message, StakeAddressDeltasMessage, @@ -24,17 +25,18 @@ pub struct PointerOccurrence { pub occurrence: HashMap, BlockInfo, Address)>>, } -pub struct DeltaPublisher { - pub params: Arc, -} +pub struct DeltaPublisher(RollbackAwarePublisher); impl DeltaPublisher { pub fn new(params: Arc) -> Self { - Self { params } + Self(RollbackAwarePublisher::new( + params.context.clone(), + params.stake_address_delta_topic.clone(), + )) } pub async fn publish( - &self, + &mut self, block: &BlockInfo, message: StakeAddressDeltasMessage, ) -> Result<()> { @@ -42,16 +44,11 @@ impl DeltaPublisher { block.clone(), CardanoMessage::StakeAddressDeltas(message), ))); - let params = self.params.clone(); - - params - .context - .message_bus - .publish(¶ms.stake_address_delta_topic, packed_message) - .await - .unwrap_or_else(|e| tracing::error!("Failed to publish: {e}")); + self.0.publish(packed_message).await + } - Ok(()) + pub async fn publish_rollback(&mut self, message: Arc) -> Result<()> { + self.0.publish(message).await } } @@ -98,6 +95,10 @@ impl State { Ok(()) } + pub async fn handle_rollback(&mut self, message: Arc) -> Result<()> { + self.delta_publisher.publish_rollback(message).await + } + pub fn new(params: Arc) -> Self { Self { pointer_cache: PointerCache::new(), diff --git a/modules/tx_unpacker/src/tx_unpacker.rs b/modules/tx_unpacker/src/tx_unpacker.rs index 296be7e3d..183a95863 100644 --- a/modules/tx_unpacker/src/tx_unpacker.rs +++ b/modules/tx_unpacker/src/tx_unpacker.rs @@ -5,7 +5,7 @@ use acropolis_codec::*; use acropolis_common::{ messages::{ AssetDeltasMessage, BlockTxsMessage, CardanoMessage, GovernanceProceduresMessage, Message, - TxCertificatesMessage, UTXODeltasMessage, WithdrawalsMessage, + StateTransitionMessage, TxCertificatesMessage, UTXODeltasMessage, WithdrawalsMessage, }, *, }; @@ -466,6 +466,39 @@ impl TxUnpacker { }.instrument(span).await; } + Message::Cardano((_, CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)))) => { + let mut futures = Vec::new(); + if let Some(ref topic) = publish_utxo_deltas_topic { + futures.push(context.message_bus.publish(topic, message.clone())); + } + + if let Some(ref topic) = publish_asset_deltas_topic { + futures.push(context.message_bus.publish(topic, message.clone())); + } + + if let Some(ref topic) = publish_withdrawals_topic { + futures.push(context.message_bus.publish(topic, message.clone())); + } + + if let Some(ref topic) = publish_certificates_topic { + futures.push(context.message_bus.publish(topic, message.clone())); + } + + if let Some(ref topic) = publish_governance_procedures_topic { + futures.push(context.message_bus.publish(topic, message.clone())); + } + + if let Some(ref topic) = publish_block_txs_topic { + futures.push(context.message_bus.publish(topic, message.clone())); + } + + join_all(futures) + .await + .into_iter() + .filter_map(Result::err) + .for_each(|e| error!("Failed to publish: {e}")); + } + _ => error!("Unexpected message type: {message:?}") } } diff --git a/modules/utxo_state/src/address_delta_publisher.rs b/modules/utxo_state/src/address_delta_publisher.rs index b0ae78ced..6cf10e340 100644 --- a/modules/utxo_state/src/address_delta_publisher.rs +++ b/modules/utxo_state/src/address_delta_publisher.rs @@ -1,5 +1,6 @@ //! Address delta publisher for the UTXO state Acropolis module use acropolis_common::{ + caryatid::RollbackAwarePublisher, messages::{AddressDeltasMessage, CardanoMessage, Message}, AddressDelta, BlockInfo, }; @@ -14,23 +15,22 @@ use crate::state::AddressDeltaObserver; /// Address delta publisher pub struct AddressDeltaPublisher { - /// Module context - context: Arc>, - - /// Topic to publish on - topic: Option, - /// Accumulating deltas for the current block deltas: Mutex>, + + /// Publisher + publisher: Option>>, } impl AddressDeltaPublisher { /// Create pub fn new(context: Arc>, config: Arc) -> Self { Self { - context, - topic: config.get_string("address-delta-topic").ok(), deltas: Mutex::new(Vec::new()), + publisher: config + .get_string("address-delta-topic") + .ok() + .map(|topic| Mutex::new(RollbackAwarePublisher::new(context, topic))), } } } @@ -51,7 +51,7 @@ impl AddressDeltaObserver for AddressDeltaPublisher { async fn finalise_block(&self, block: &BlockInfo) { // Send out the accumulated deltas - if let Some(topic) = &self.topic { + if let Some(publisher) = &self.publisher { let mut deltas = self.deltas.lock().await; let message = AddressDeltasMessage { deltas: std::mem::take(&mut *deltas), @@ -59,11 +59,23 @@ impl AddressDeltaObserver for AddressDeltaPublisher { let message_enum = Message::Cardano((block.clone(), CardanoMessage::AddressDeltas(message))); - self.context - .message_bus - .publish(topic, Arc::new(message_enum)) + publisher + .lock() + .await + .publish(Arc::new(message_enum)) .await .unwrap_or_else(|e| error!("Failed to publish: {e}")); } } + + async fn rollback(&self, message: Arc) { + if let Some(publisher) = &self.publisher { + publisher + .lock() + .await + .publish(message) + .await + .unwrap_or_else(|e| error!("Failed to publish rollback: {e}")); + } + } } diff --git a/modules/utxo_state/src/state.rs b/modules/utxo_state/src/state.rs index f9c24c47d..88d03fc14 100644 --- a/modules/utxo_state/src/state.rs +++ b/modules/utxo_state/src/state.rs @@ -1,5 +1,6 @@ //! Acropolis UTXOState: State storage use crate::volatile_index::VolatileIndex; +use acropolis_common::messages::Message; use acropolis_common::{ messages::UTXODeltasMessage, params::SECURITY_PARAMETER_K, BlockInfo, BlockStatus, TxOutput, }; @@ -23,6 +24,9 @@ pub trait AddressDeltaObserver: Send + Sync { /// Finalise a block async fn finalise_block(&self, block: &BlockInfo); + + /// handle rollback + async fn rollback(&self, message: Arc); } /// Immutable UTXO store @@ -379,6 +383,13 @@ impl State { Ok(()) } + + pub async fn handle_rollback(&mut self, message: Arc) -> Result<()> { + if let Some(observer) = self.address_delta_observer.as_mut() { + observer.rollback(message).await; + } + Ok(()) + } } /// Internal helper used during `handle` aggregation for summing UTxO deltas. @@ -805,6 +816,8 @@ mod tests { } async fn finalise_block(&self, _block: &BlockInfo) {} + + async fn rollback(&self, _msg: Arc) {} } #[tokio::test] diff --git a/modules/utxo_state/src/utxo_state.rs b/modules/utxo_state/src/utxo_state.rs index bcd93b277..c5f398539 100644 --- a/modules/utxo_state/src/utxo_state.rs +++ b/modules/utxo_state/src/utxo_state.rs @@ -2,7 +2,7 @@ //! Accepts UTXO events and derives the current ledger state in memory use acropolis_common::{ - messages::{CardanoMessage, Message, StateQuery, StateQueryResponse}, + messages::{CardanoMessage, Message, StateQuery, StateQueryResponse, StateTransitionMessage}, queries::utxos::{UTxOStateQuery, UTxOStateQueryResponse, DEFAULT_UTXOS_QUERY_TOPIC}, }; use caryatid_sdk::{module, Context}; @@ -101,6 +101,18 @@ impl UTXOState { .await; } + Message::Cardano(( + _, + CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)), + )) => { + let mut state = state1.lock().await; + state + .handle_rollback(message) + .await + .inspect_err(|e| error!("Rollback handling error: {e}")) + .ok(); + } + _ => error!("Unexpected message type: {message:?}"), } }