Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 7 additions & 24 deletions modules/accounts_state/src/accounts_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use acropolis_common::{
BlockInfo, BlockStatus,
};
use anyhow::Result;
use bigdecimal::Zero;
use caryatid_sdk::{message_bus::Subscription, module, Context, Module};
use config::Config;
use std::sync::Arc;
Expand Down Expand Up @@ -49,9 +50,8 @@ const DEFAULT_SPO_REWARDS_TOPIC: &str = "cardano.spo.rewards";
const DEFAULT_PROTOCOL_PARAMETERS_TOPIC: &str = "cardano.protocol.parameters";
const DEFAULT_STAKE_REWARD_DELTAS_TOPIC: &str = "cardano.stake.reward.deltas";

const DEFAULT_STORE_SPDD_HISTORY: (&str, bool) = ("store-spdd-history", false);
const DEFAULT_SPDD_DB_PATH: (&str, &str) = ("spdd-db-path", "./spdd_db");
const DEFAULT_SPDD_RETENTION_EPOCHS: &str = "spdd-retention-epochs";
const DEFAULT_SPDD_RETENTION_EPOCHS: (&str, u64) = ("spdd-retention-epochs", 0);

/// Accounts State module
#[module(
Expand Down Expand Up @@ -420,11 +420,6 @@ impl AccountsState {
.unwrap_or(DEFAULT_STAKE_REWARD_DELTAS_TOPIC.to_string());
info!("Creating stake reward deltas subscriber on '{stake_reward_deltas_topic}'");

// store spdd history config
let store_spdd_history =
config.get_bool(DEFAULT_STORE_SPDD_HISTORY.0).unwrap_or(DEFAULT_STORE_SPDD_HISTORY.1);
info!("Store SPDD history: {}", store_spdd_history);

let spdd_db_path =
config.get_string(DEFAULT_SPDD_DB_PATH.0).unwrap_or(DEFAULT_SPDD_DB_PATH.1.to_string());

Expand All @@ -437,25 +432,13 @@ impl AccountsState {
current_dir.join(&spdd_db_path).to_string_lossy().to_string()
};

// Get retention epochs configuration (None = unlimited)
// Get SPDD retention epochs configuration
let spdd_retention_epochs = config
.get_int(DEFAULT_SPDD_RETENTION_EPOCHS)
.ok()
.and_then(|v| if v > 0 { Some(v as u64) } else { None });
.get_int(DEFAULT_SPDD_RETENTION_EPOCHS.0)
.unwrap_or(DEFAULT_SPDD_RETENTION_EPOCHS.1 as i64)
.max(0) as u64;
info!("SPDD retention epochs: {:?}", spdd_retention_epochs);

if store_spdd_history {
info!("SPDD database path: {}", spdd_db_path);
match spdd_retention_epochs {
Some(epochs) => info!(
"SPDD retention: {} epochs (~{} GB max)",
epochs,
(epochs as f64 * 0.12).ceil()
),
None => info!("SPDD retention: unlimited (no automatic pruning)"),
}
}

// Query topics
let accounts_query_topic = config
.get_string(DEFAULT_ACCOUNTS_QUERY_TOPIC.0)
Expand Down Expand Up @@ -484,7 +467,7 @@ impl AccountsState {
let history_tick = history.clone();

// Spdd store
let spdd_store = if store_spdd_history {
let spdd_store = if !spdd_retention_epochs.is_zero() {
Some(Arc::new(Mutex::new(SPDDStore::load(
std::path::Path::new(&spdd_db_path),
spdd_retention_epochs,
Expand Down
26 changes: 9 additions & 17 deletions modules/accounts_state/src/spo_distribution_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,12 @@ pub struct SPDDStore {
/// Value: "complete"
epoch_markers: fjall::PartitionHandle,
/// Maximum number of epochs to retain (None = unlimited)
retention_epochs: Option<u64>,
retention_epochs: u64,
}

impl SPDDStore {
#[allow(dead_code)]
pub fn new(
path: impl AsRef<std::path::Path>,
retention_epochs: Option<u64>,
) -> fjall::Result<Self> {
pub fn new(path: impl AsRef<std::path::Path>, retention_epochs: u64) -> fjall::Result<Self> {
let path = path.as_ref();
if path.exists() {
std::fs::remove_dir_all(path)?;
Expand All @@ -79,10 +76,7 @@ impl SPDDStore {
})
}

pub fn load(
path: impl AsRef<std::path::Path>,
retention_epochs: Option<u64>,
) -> fjall::Result<Self> {
pub fn load(path: impl AsRef<std::path::Path>, retention_epochs: u64) -> fjall::Result<Self> {
let path = path.as_ref();

let keyspace = Config::new(path).open()?;
Expand Down Expand Up @@ -137,11 +131,9 @@ impl SPDDStore {
let marker_key = encode_epoch_marker(epoch);
self.epoch_markers.insert(marker_key, b"complete")?;

if let Some(retention) = self.retention_epochs {
if epoch >= retention {
let keep_from_epoch = epoch - retention + 1;
self.prune_epochs_before(keep_from_epoch)?;
}
if epoch >= self.retention_epochs {
let keep_from_epoch = epoch - self.retention_epochs + 1;
self.prune_epochs_before(keep_from_epoch)?;
}

Ok(())
Expand Down Expand Up @@ -235,8 +227,8 @@ mod tests {

#[test]
fn test_store_spdd_state() {
let mut spdd_store = SPDDStore::new(std::path::Path::new(DB_PATH), None)
.expect("Failed to create SPDD store");
let mut spdd_store =
SPDDStore::new(std::path::Path::new(DB_PATH), 1).expect("Failed to create SPDD store");
let mut spdd_state: HashMap<PoolId, Vec<(AddrKeyhash, u64)>> = HashMap::new();
spdd_state.insert(
vec![0x01; 28],
Expand All @@ -258,7 +250,7 @@ mod tests {

#[test]
fn test_prune_old_epochs() {
let mut spdd_store = SPDDStore::new(std::path::Path::new("spdd_prune_test_db"), Some(2))
let mut spdd_store = SPDDStore::new(std::path::Path::new("spdd_prune_test_db"), 2)
.expect("Failed to create SPDD store");

for epoch in 1..=3 {
Expand Down
56 changes: 56 additions & 0 deletions modules/spdd_state/src/rest.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use crate::state::State;
use acropolis_common::serialization::Bech32WithHrp;
use acropolis_common::DelegatedStake;
use acropolis_common::{extract_strict_query_params, messages::RESTResponse};
use anyhow::Result;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;

/// Handles /spdd
pub async fn handle_spdd(
state: Arc<Mutex<State>>,
params: HashMap<String, String>,
) -> Result<RESTResponse> {
let locked = state.lock().await;

extract_strict_query_params!(params, {
"epoch" => epoch: Option<u64>,
});

let spdd_opt = match epoch {
Some(epoch) => match locked.get_epoch(epoch) {
Some(spdd) => Some(spdd),
None => {
return Ok(RESTResponse::with_text(
404,
&format!("SPDD not found for epoch {}", epoch),
));
}
},
None => locked.get_latest(),
};

if let Some(spdd) = spdd_opt {
let spdd: HashMap<String, DelegatedStake> = spdd
.iter()
.map(|(k, v)| {
(
k.to_bech32_with_hrp("pool").unwrap_or_else(|_| hex::encode(k)),
*v,
)
})
.collect();

match serde_json::to_string(&spdd) {
Ok(body) => Ok(RESTResponse::with_json(200, &body)),
Err(e) => Ok(RESTResponse::with_text(
500,
&format!(
"Internal server error retrieving stake pool delegation distribution: {e}"
),
)),
}
} else {
Ok(RESTResponse::with_json(200, "{}"))
}
}
16 changes: 16 additions & 0 deletions modules/spdd_state/src/spdd_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use acropolis_common::{
messages::{CardanoMessage, Message, StateQuery, StateQueryResponse},
queries::spdd::{SPDDStateQuery, SPDDStateQueryResponse, DEFAULT_SPDD_QUERY_TOPIC},
rest_helper::handle_rest_with_query_parameters,
};
use anyhow::Result;
use caryatid_sdk::{module, Context, Module};
Expand All @@ -12,8 +13,11 @@ use tokio::sync::Mutex;
use tracing::{error, info, info_span, Instrument};
mod state;
use state::State;
mod rest;
use rest::handle_spdd;

const DEFAULT_SUBSCRIBE_TOPIC: &str = "cardano.spo.distribution";
const DEFAULT_HANDLE_SPDD_TOPIC: (&str, &str) = ("handle-topic-spdd", "rest.get.spdd");
const DEFAULT_STORE_SPDD: (&str, bool) = ("store-spdd", false);

/// SPDD State module
Expand All @@ -32,6 +36,12 @@ impl SPDDState {
config.get_string("subscribe-topic").unwrap_or(DEFAULT_SUBSCRIBE_TOPIC.to_string());
info!("Creating subscriber on '{subscribe_topic}'");

// REST topic (not included in BF)
let handle_spdd_topic = config
.get_string(DEFAULT_HANDLE_SPDD_TOPIC.0)
.unwrap_or(DEFAULT_HANDLE_SPDD_TOPIC.1.to_string());
info!("Creating request handler on '{}'", handle_spdd_topic);

// Query topic
let spdd_query_topic = config
.get_string(DEFAULT_SPDD_QUERY_TOPIC.0)
Expand All @@ -43,6 +53,12 @@ impl SPDDState {
let state_opt = if store_spdd {
let state = Arc::new(Mutex::new(State::new()));

// Register /spdd REST endpoint
let state_rest = state.clone();
handle_rest_with_query_parameters(context.clone(), &handle_spdd_topic, move |params| {
handle_spdd(state_rest.clone(), params)
});

// Subscribe for spdd messages from accounts_state
let state_handler = state.clone();
let mut message_subscription = context.subscribe(&subscribe_topic).await?;
Expand Down
9 changes: 2 additions & 7 deletions processes/omnibus/omnibus.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,9 @@ write-full-cache = "false"
store-history = false

[module.accounts-state]
# Store SPDD history
# Enable /epochs/{number}/stakes, /epochs/{number}/stakes/{pool_id} endpoints
store-spdd-history = false
# SPDD database path (only used when store-spdd-history is enabled)
# Enable /epochs/{number}/stakes & /epochs/{number}/stakes/{pool_id} endpoints
spdd-retention-epochs = 0
spdd-db-path = "./spdd_db"
# Number of epochs to retain in SPDD history
# Example: 73
spdd-retention-epochs = "none"
# Verify against captured CSV
verify-pots-file = "../../modules/accounts_state/test-data/pots.mainnet.csv"
verify-rewards-files = "../../modules/accounts_state/test-data/rewards.mainnet.{}.csv"
Expand Down
3 changes: 3 additions & 0 deletions tests/integration/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
MAINNET_DBSYNC_URL=
ACROPOLIS_REST_URL=http://127.0.0.1:4340
SPDD_VALIDATION_START_EPOCH=208
3 changes: 3 additions & 0 deletions tests/integration/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
node_modules/
package-lock.json
.env
38 changes: 38 additions & 0 deletions tests/integration/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Running the SPDD validation test
1. Install dependencies
```bash
npm install
```

2. Set environment variables
Create a `.env` file in the `tests/integration/` with the following values:
```env
MAINNET_DBSYNC_URL=postgres://user:password@host:port/dbname
ACROPOLIS_REST_URL=https://your-acropolis-endpoint
SPDD_VALIDATION_START_EPOCH=208
```
If you do not currently operate a mainnet DB Sync server, Demeter provides free access to an instance

3. Enable SPDD storage in `omnibus.toml`
```toml
[module.spdd-state]
store-spdd = true
```

4. Start Omnibus process and wait for sync
```bash
cd ~/acropolis/processes/omnibus
cargo run --release --bin acropolis_process_omnibus
```

5. Run the validator
```bash
npm run test:spdd
```

6. Observe output
This validator will:
* Compare Acropolis SPDD data with DB Sync epoch stake data
* Display total stake and per-pool differences
* Pause for review when mismatches are detected
* Stop automatically when Acropolis stops returning data
20 changes: 20 additions & 0 deletions tests/integration/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"name": "acropolis-integration-tests",
"version": "1.0.0",
"type": "module",
"private": true,
"scripts": {
"test:spdd": "ts-node spdd.test.ts"
},
"dependencies": {
"axios": "^1.12.2",
"dotenv": "^17.2.3",
"pg": "^8.16.3"
},
"devDependencies": {
"@types/node": "^22.5.2",
"@types/pg": "^8.15.5",
"ts-node": "^10.9.2",
"typescript": "^5.9.3"
}
}
Loading