Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
c8bdaea
feat: use explicit Point type
SupernaviX Nov 25, 2025
bb922e1
feat: forward rollbacks over cardano.block.proposed
SupernaviX Nov 25, 2025
072e47c
feat: handle rollbacks on cardano.epoch.activity
SupernaviX Nov 25, 2025
e5174b1
feat: handle rollbacks on cardano.txs
SupernaviX Nov 25, 2025
081cd94
feat: handle rollbacks on cardano.utxo.deltas
SupernaviX Nov 25, 2025
6f0b25d
feat: handle rollbacks on cardano.asset.deltas
SupernaviX Nov 25, 2025
b4fc28c
feat: handle rollbacks on cardano.withdrawals
SupernaviX Nov 25, 2025
a41d394
feat: handle rollbacks on cardano.certificates
SupernaviX Nov 25, 2025
0566222
feat: handle rollbacks on cardano.stake.deltas
SupernaviX Nov 26, 2025
ff13036
feat: handle rollbacks on cardano.governance
SupernaviX Nov 26, 2025
318b61a
feat: handle rollbacks on cardano.block.txs
SupernaviX Nov 26, 2025
8592f5c
feat: handle rollbacks on cardano.address.delta
SupernaviX Nov 26, 2025
f14aba2
feat: handle rollbacks on cardano.drep.distribution
SupernaviX Nov 26, 2025
2374fe5
feat: handle rollbacks on cardano.spo.distribution
SupernaviX Nov 26, 2025
3ec902f
feat: handle rollbacks on cardano.spo.rewards
SupernaviX Nov 26, 2025
5e35bd7
feat: handle rollbacks on cardano.stake.reward.deltas
SupernaviX Nov 26, 2025
3db893b
feat: handle rollbacks on cardano.drep.state
SupernaviX Nov 26, 2025
ae2fe9b
feat: handle rollbacks on cardano.spo.state
SupernaviX Nov 26, 2025
6f88664
feat: handle rollbacks on cardano.enact.state
SupernaviX Nov 26, 2025
d8832c0
feat: handle rollbacks on cardano.protocol.parameters
SupernaviX Nov 26, 2025
48d0c2a
fix: use helper consistently
SupernaviX Nov 26, 2025
61cd082
fix: remove duplicate code
SupernaviX Nov 26, 2025
a763b47
feat: replace Rollback with StateTransition
SupernaviX Nov 26, 2025
3b39b87
fix: address comments
SupernaviX Nov 28, 2025
605e6e4
Merge branch 'main' into sg/chain-sync-tweaks
SupernaviX Dec 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions common/src/caryatid.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use std::sync::Arc;

use anyhow::Result;
use caryatid_sdk::{async_trait, Context, MessageBounds, Subscription};

use crate::messages::{CardanoMessage, Message, StateTransitionMessage};

#[async_trait]
pub trait SubscriptionExt<M: MessageBounds> {
async fn read_ignoring_rollbacks(&mut self) -> Result<(String, Arc<M>)>;
}

#[async_trait]
impl SubscriptionExt<Message> for Box<dyn Subscription<Message>> {
async fn read_ignoring_rollbacks(&mut self) -> Result<(String, Arc<Message>)> {
loop {
let (stream, message) = self.read().await?;
if matches!(
message.as_ref(),
Message::Cardano((
_,
CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_))
))
) {
continue;
}
break Ok((stream, message));
}
}
}

/// A utility to publish messages, which will only publish rollback messages if some work has been rolled back
pub struct RollbackAwarePublisher<M: MessageBounds> {
/// Module context
context: Arc<Context<M>>,

/// Topic to publish on
topic: String,

// At which slot did we publish our last non-rollback message
last_activity_at: Option<u64>,
}

impl RollbackAwarePublisher<Message> {
pub fn new(context: Arc<Context<Message>>, topic: String) -> Self {
Self {
context,
topic,
last_activity_at: None,
}
}

pub async fn publish(&mut self, message: Arc<Message>) -> Result<()> {
match message.as_ref() {
Message::Cardano((
block,
CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)),
)) => {
if self.last_activity_at.is_some_and(|slot| slot >= block.slot) {
self.last_activity_at = None;
self.context.publish(&self.topic, message).await?;
}
Ok(())
}
Message::Cardano((block, _)) => {
self.last_activity_at = Some(block.slot);
self.context.publish(&self.topic, message).await
}
_ => self.context.publish(&self.topic, message).await,
}
}
}
4 changes: 2 additions & 2 deletions common/src/commands/chain_sync.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{BlockHash, Slot};
use crate::Point;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum ChainSyncCommand {
FindIntersect { slot: Slot, hash: BlockHash },
FindIntersect(Point),
}
1 change: 1 addition & 0 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

pub mod address;
pub mod calculations;
pub mod caryatid;
pub mod cbor;
pub mod cip19;
pub mod commands;
Expand Down
11 changes: 8 additions & 3 deletions common/src/messages.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
//! Definition of Acropolis messages

// We don't use these messages in the acropolis_common crate itself
#![allow(dead_code)]

use crate::commands::chain_sync::ChainSyncCommand;
use crate::commands::transactions::{TransactionsCommand, TransactionsCommandResponse};
use crate::genesis_values::GenesisValues;
Expand Down Expand Up @@ -45,6 +42,13 @@ pub struct RawBlockMessage {
pub body: Vec<u8>,
}

/// Rollback message
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum StateTransitionMessage {
/// The chain has been rolled back to a specific point
Rollback(Point),
}

/// Snapshot completion message
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SnapshotCompleteMessage {
Expand Down Expand Up @@ -303,6 +307,7 @@ pub struct SPOStateMessage {
#[allow(clippy::large_enum_variant)]
pub enum CardanoMessage {
BlockAvailable(RawBlockMessage), // Block body available
StateTransition(StateTransitionMessage), // Our position on the chain has changed
BlockValidation(ValidationStatus), // Result of a block validation
SnapshotComplete, // Mithril snapshot loaded
ReceivedTxs(RawTxsMessage), // Transaction available
Expand Down
11 changes: 11 additions & 0 deletions common/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,17 @@ impl TxOutRef {
/// Slot
pub type Slot = u64;

/// Point on the chain
#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq)]
pub enum Point {
#[default]
Origin,
Specific {
hash: BlockHash,
slot: Slot,
},
}

/// Amount of Ada, in Lovelace
pub type Lovelace = u64;
pub type LovelaceDelta = i64;
Expand Down
45 changes: 27 additions & 18 deletions modules/accounts_state/src/accounts_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
//! Manages stake and reward accounts state

use acropolis_common::{
messages::{CardanoMessage, Message, StateQuery, StateQueryResponse},
caryatid::SubscriptionExt,
messages::{CardanoMessage, Message, StateQuery, StateQueryResponse, StateTransitionMessage},
queries::accounts::{DrepDelegators, PoolDelegators, DEFAULT_ACCOUNTS_QUERY_TOPIC},
state_history::{StateHistory, StateHistoryStore},
BlockInfo, BlockStatus,
Expand Down Expand Up @@ -120,17 +121,13 @@ impl AccountsState {
// Get a mutable state
let mut state = history.lock().await.get_current_state();

// Read per-block topics in parallel
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see you're cleaning this up too, nice!

let certs_message_f = certs_subscription.read();
let stake_message_f = stake_subscription.read();
let withdrawals_message_f = withdrawals_subscription.read();
let mut current_block: Option<BlockInfo> = None;

// Use certs_message as the synchroniser, but we have to handle it after the
// epoch things, because they apply to the new epoch, not the last
let (_, certs_message) = certs_message_f.await?;
let (_, certs_message) = certs_subscription.read().await?;
let new_epoch = match certs_message.as_ref() {
Message::Cardano((block_info, _)) => {
Message::Cardano((block_info, CardanoMessage::TxCertificates(_))) => {
// Handle rollbacks on this topic only
if block_info.status == BlockStatus::RolledBack {
state = history.lock().await.get_rolled_back_state(block_info.number);
Expand All @@ -139,6 +136,16 @@ impl AccountsState {
current_block = Some(block_info.clone());
block_info.new_epoch && block_info.epoch > 0
}
Message::Cardano((
_,
CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)),
)) => {
drep_publisher.publish_rollback(certs_message.clone()).await?;
spo_publisher.publish_rollback(certs_message.clone()).await?;
spo_rewards_publisher.publish_rollback(certs_message.clone()).await?;
stake_reward_deltas_publisher.publish_rollback(certs_message.clone()).await?;
false
}
_ => false,
};

Expand All @@ -149,18 +156,13 @@ impl AccountsState {

// Read from epoch-boundary messages only when it's a new epoch
if new_epoch {
let dreps_message_f = drep_state_subscription.read();
let spos_message_f = spos_subscription.read();
let ea_message_f = ea_subscription.read();
let params_message_f = parameters_subscription.read();

let spdd_store_guard = match spdd_store.as_ref() {
Some(s) => Some(s.lock().await),
None => None,
};

// Handle DRep
let (_, message) = dreps_message_f.await?;
let (_, message) = drep_state_subscription.read_ignoring_rollbacks().await?;
match message.as_ref() {
Message::Cardano((block_info, CardanoMessage::DRepState(dreps_msg))) => {
let span = info_span!(
Expand All @@ -184,7 +186,7 @@ impl AccountsState {
}

// Handle SPOs
let (_, message) = spos_message_f.await?;
let (_, message) = spos_subscription.read_ignoring_rollbacks().await?;
match message.as_ref() {
Message::Cardano((block_info, CardanoMessage::SPOState(spo_msg))) => {
let span =
Expand Down Expand Up @@ -219,7 +221,7 @@ impl AccountsState {
_ => error!("Unexpected message type: {message:?}"),
}

let (_, message) = params_message_f.await?;
let (_, message) = parameters_subscription.read_ignoring_rollbacks().await?;
match message.as_ref() {
Message::Cardano((block_info, CardanoMessage::ProtocolParams(params_msg))) => {
let span = info_span!(
Expand All @@ -241,7 +243,7 @@ impl AccountsState {
}

// Handle epoch activity
let (_, message) = ea_message_f.await?;
let (_, message) = ea_subscription.read_ignoring_rollbacks().await?;
match message.as_ref() {
Message::Cardano((block_info, CardanoMessage::EpochActivity(ea_msg))) => {
let span = info_span!(
Expand Down Expand Up @@ -297,11 +299,18 @@ impl AccountsState {
.await;
}

Message::Cardano((
_,
CardanoMessage::StateTransition(StateTransitionMessage::Rollback(_)),
)) => {
// Ignore this, we already handled rollbacks
}

_ => error!("Unexpected message type: {certs_message:?}"),
}

// Handle withdrawals
let (_, message) = withdrawals_message_f.await?;
let (_, message) = withdrawals_subscription.read_ignoring_rollbacks().await?;
match message.as_ref() {
Message::Cardano((block_info, CardanoMessage::Withdrawals(withdrawals_msg))) => {
let span = info_span!(
Expand All @@ -323,7 +332,7 @@ impl AccountsState {
}

// Handle stake address deltas
let (_, message) = stake_message_f.await?;
let (_, message) = stake_subscription.read_ignoring_rollbacks().await?;
match message.as_ref() {
Message::Cardano((block_info, CardanoMessage::StakeAddressDeltas(deltas_msg))) => {
let span = info_span!(
Expand Down
36 changes: 16 additions & 20 deletions modules/accounts_state/src/drep_distribution_publisher.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use acropolis_common::caryatid::RollbackAwarePublisher;
use acropolis_common::messages::{
CardanoMessage, DRepDelegationDistribution, DRepStakeDistributionMessage, Message,
};
Expand All @@ -6,18 +7,12 @@ use caryatid_sdk::Context;
use std::sync::Arc;

/// Message publisher for DRep Delegation Distribution (DRDD)
pub struct DRepDistributionPublisher {
/// Module context
context: Arc<Context<Message>>,

/// Topic to publish on
topic: String,
}
pub struct DRepDistributionPublisher(RollbackAwarePublisher<Message>);

impl DRepDistributionPublisher {
/// Construct with context and topic to publish on
pub fn new(context: Arc<Context<Message>>, topic: String) -> Self {
Self { context, topic }
Self(RollbackAwarePublisher::new(context, topic))
}

/// Publish the DRep Delegation Distribution
Expand All @@ -26,18 +21,19 @@ impl DRepDistributionPublisher {
block: &BlockInfo,
drdd: DRepDelegationDistribution,
) -> anyhow::Result<()> {
self.context
.message_bus
.publish(
&self.topic,
Arc::new(Message::Cardano((
block.clone(),
CardanoMessage::DRepStakeDistribution(DRepStakeDistributionMessage {
epoch: block.epoch,
drdd,
}),
))),
)
self.0
.publish(Arc::new(Message::Cardano((
block.clone(),
CardanoMessage::DRepStakeDistribution(DRepStakeDistributionMessage {
epoch: block.epoch,
drdd,
}),
))))
.await
}

/// Publish a rollback message, if we have anything to roll back
pub async fn publish_rollback(&mut self, message: Arc<Message>) -> anyhow::Result<()> {
self.0.publish(message).await
}
}
36 changes: 16 additions & 20 deletions modules/accounts_state/src/spo_distribution_publisher.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,17 @@
use acropolis_common::caryatid::RollbackAwarePublisher;
use acropolis_common::messages::{CardanoMessage, Message, SPOStakeDistributionMessage};
use acropolis_common::{BlockInfo, DelegatedStake, PoolId};
use caryatid_sdk::Context;
use std::collections::BTreeMap;
use std::sync::Arc;

/// Message publisher for Stake Pool Delegation Distribution (SPDD)
pub struct SPODistributionPublisher {
/// Module context
context: Arc<Context<Message>>,

/// Topic to publish on
topic: String,
}
pub struct SPODistributionPublisher(RollbackAwarePublisher<Message>);

impl SPODistributionPublisher {
/// Construct with context and topic to publish on
pub fn new(context: Arc<Context<Message>>, topic: String) -> Self {
Self { context, topic }
Self(RollbackAwarePublisher::new(context, topic))
}

/// Publish the SPDD
Expand All @@ -25,18 +20,19 @@ impl SPODistributionPublisher {
block: &BlockInfo,
spos: BTreeMap<PoolId, DelegatedStake>,
) -> anyhow::Result<()> {
self.context
.message_bus
.publish(
&self.topic,
Arc::new(Message::Cardano((
block.clone(),
CardanoMessage::SPOStakeDistribution(SPOStakeDistributionMessage {
epoch: block.epoch - 1, // End of the previous epoch
spos: spos.into_iter().collect(),
}),
))),
)
self.0
.publish(Arc::new(Message::Cardano((
block.clone(),
CardanoMessage::SPOStakeDistribution(SPOStakeDistributionMessage {
epoch: block.epoch - 1, // End of the previous epoch
spos: spos.into_iter().collect(),
}),
))))
.await
}

/// Publish a rollback message, if we have anything to roll back
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there some trick to make this common across all publishers? In C++ it would just be a RollbackEnabledPublisher superclass (I miss inheritance sometimes!)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no formal concept of a "publisher" in this codebase at all, just a bunch of existing structs named Publisher with similar logic. I moved that logic into a RollbackAwarePublisher struct, which all of the existing structs are now just wrappers for.

pub async fn publish_rollback(&mut self, message: Arc<Message>) -> anyhow::Result<()> {
self.0.publish(message).await
}
}
Loading