diff --git a/modules/accounts_state/src/accounts_state.rs b/modules/accounts_state/src/accounts_state.rs index 32fccd43..2d57d7a9 100644 --- a/modules/accounts_state/src/accounts_state.rs +++ b/modules/accounts_state/src/accounts_state.rs @@ -8,6 +8,7 @@ use acropolis_common::{ BlockInfo, BlockStatus, }; use anyhow::Result; +use bigdecimal::Zero; use caryatid_sdk::{message_bus::Subscription, module, Context, Module}; use config::Config; use std::sync::Arc; @@ -49,9 +50,8 @@ const DEFAULT_SPO_REWARDS_TOPIC: &str = "cardano.spo.rewards"; const DEFAULT_PROTOCOL_PARAMETERS_TOPIC: &str = "cardano.protocol.parameters"; const DEFAULT_STAKE_REWARD_DELTAS_TOPIC: &str = "cardano.stake.reward.deltas"; -const DEFAULT_STORE_SPDD_HISTORY: (&str, bool) = ("store-spdd-history", false); const DEFAULT_SPDD_DB_PATH: (&str, &str) = ("spdd-db-path", "./spdd_db"); -const DEFAULT_SPDD_RETENTION_EPOCHS: &str = "spdd-retention-epochs"; +const DEFAULT_SPDD_RETENTION_EPOCHS: (&str, u64) = ("spdd-retention-epochs", 0); /// Accounts State module #[module( @@ -420,11 +420,6 @@ impl AccountsState { .unwrap_or(DEFAULT_STAKE_REWARD_DELTAS_TOPIC.to_string()); info!("Creating stake reward deltas subscriber on '{stake_reward_deltas_topic}'"); - // store spdd history config - let store_spdd_history = - config.get_bool(DEFAULT_STORE_SPDD_HISTORY.0).unwrap_or(DEFAULT_STORE_SPDD_HISTORY.1); - info!("Store SPDD history: {}", store_spdd_history); - let spdd_db_path = config.get_string(DEFAULT_SPDD_DB_PATH.0).unwrap_or(DEFAULT_SPDD_DB_PATH.1.to_string()); @@ -437,25 +432,13 @@ impl AccountsState { current_dir.join(&spdd_db_path).to_string_lossy().to_string() }; - // Get retention epochs configuration (None = unlimited) + // Get SPDD retention epochs configuration let spdd_retention_epochs = config - .get_int(DEFAULT_SPDD_RETENTION_EPOCHS) - .ok() - .and_then(|v| if v > 0 { Some(v as u64) } else { None }); + .get_int(DEFAULT_SPDD_RETENTION_EPOCHS.0) + .unwrap_or(DEFAULT_SPDD_RETENTION_EPOCHS.1 as i64) + 
.max(0) as u64; info!("SPDD retention epochs: {:?}", spdd_retention_epochs); - if store_spdd_history { - info!("SPDD database path: {}", spdd_db_path); - match spdd_retention_epochs { - Some(epochs) => info!( - "SPDD retention: {} epochs (~{} GB max)", - epochs, - (epochs as f64 * 0.12).ceil() - ), - None => info!("SPDD retention: unlimited (no automatic pruning)"), - } - } - // Query topics let accounts_query_topic = config .get_string(DEFAULT_ACCOUNTS_QUERY_TOPIC.0) @@ -484,7 +467,7 @@ impl AccountsState { let history_tick = history.clone(); // Spdd store - let spdd_store = if store_spdd_history { + let spdd_store = if !spdd_retention_epochs.is_zero() { Some(Arc::new(Mutex::new(SPDDStore::load( std::path::Path::new(&spdd_db_path), spdd_retention_epochs, diff --git a/modules/accounts_state/src/spo_distribution_store.rs b/modules/accounts_state/src/spo_distribution_store.rs index 0dcde887..9ae3a783 100644 --- a/modules/accounts_state/src/spo_distribution_store.rs +++ b/modules/accounts_state/src/spo_distribution_store.rs @@ -52,15 +52,12 @@ pub struct SPDDStore { /// Value: "complete" epoch_markers: fjall::PartitionHandle, - /// Maximum number of epochs to retain (None = unlimited) + /// Maximum number of epochs to retain (0 = SPDD history disabled) - retention_epochs: Option, + retention_epochs: u64, } impl SPDDStore { #[allow(dead_code)] - pub fn new( - path: impl AsRef, - retention_epochs: Option, - ) -> fjall::Result { + pub fn new(path: impl AsRef, retention_epochs: u64) -> fjall::Result { let path = path.as_ref(); if path.exists() { std::fs::remove_dir_all(path)?; @@ -79,10 +76,7 @@ impl SPDDStore { }) } - pub fn load( - path: impl AsRef, - retention_epochs: Option, - ) -> fjall::Result { + pub fn load(path: impl AsRef, retention_epochs: u64) -> fjall::Result { let path = path.as_ref(); let keyspace = Config::new(path).open()?; @@ -137,11 +131,9 @@ impl SPDDStore { let marker_key = encode_epoch_marker(epoch); self.epoch_markers.insert(marker_key, b"complete")?; - if let Some(retention) = self.retention_epochs { - if epoch >= 
retention { - let keep_from_epoch = epoch - retention + 1; - self.prune_epochs_before(keep_from_epoch)?; - } + if epoch >= self.retention_epochs { + let keep_from_epoch = epoch - self.retention_epochs + 1; + self.prune_epochs_before(keep_from_epoch)?; } Ok(()) @@ -235,8 +227,8 @@ mod tests { #[test] fn test_store_spdd_state() { - let mut spdd_store = SPDDStore::new(std::path::Path::new(DB_PATH), None) - .expect("Failed to create SPDD store"); + let mut spdd_store = + SPDDStore::new(std::path::Path::new(DB_PATH), 1).expect("Failed to create SPDD store"); let mut spdd_state: HashMap> = HashMap::new(); spdd_state.insert( vec![0x01; 28], @@ -258,7 +250,7 @@ mod tests { #[test] fn test_prune_old_epochs() { - let mut spdd_store = SPDDStore::new(std::path::Path::new("spdd_prune_test_db"), Some(2)) + let mut spdd_store = SPDDStore::new(std::path::Path::new("spdd_prune_test_db"), 2) .expect("Failed to create SPDD store"); for epoch in 1..=3 { diff --git a/modules/spdd_state/src/rest.rs b/modules/spdd_state/src/rest.rs new file mode 100644 index 00000000..4cc059dd --- /dev/null +++ b/modules/spdd_state/src/rest.rs @@ -0,0 +1,56 @@ +use crate::state::State; +use acropolis_common::serialization::Bech32WithHrp; +use acropolis_common::DelegatedStake; +use acropolis_common::{extract_strict_query_params, messages::RESTResponse}; +use anyhow::Result; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::Mutex; + +/// Handles /spdd +pub async fn handle_spdd( + state: Arc>, + params: HashMap, +) -> Result { + let locked = state.lock().await; + + extract_strict_query_params!(params, { + "epoch" => epoch: Option, + }); + + let spdd_opt = match epoch { + Some(epoch) => match locked.get_epoch(epoch) { + Some(spdd) => Some(spdd), + None => { + return Ok(RESTResponse::with_text( + 404, + &format!("SPDD not found for epoch {}", epoch), + )); + } + }, + None => locked.get_latest(), + }; + + if let Some(spdd) = spdd_opt { + let spdd: HashMap = spdd + .iter() + .map(|(k, v)| { + ( + 
k.to_bech32_with_hrp("pool").unwrap_or_else(|_| hex::encode(k)), + *v, + ) + }) + .collect(); + + match serde_json::to_string(&spdd) { + Ok(body) => Ok(RESTResponse::with_json(200, &body)), + Err(e) => Ok(RESTResponse::with_text( + 500, + &format!( + "Internal server error retrieving stake pool delegation distribution: {e}" + ), + )), + } + } else { + Ok(RESTResponse::with_json(200, "{}")) + } +} diff --git a/modules/spdd_state/src/spdd_state.rs b/modules/spdd_state/src/spdd_state.rs index 75b003dd..197c0234 100644 --- a/modules/spdd_state/src/spdd_state.rs +++ b/modules/spdd_state/src/spdd_state.rs @@ -3,6 +3,7 @@ use acropolis_common::{ messages::{CardanoMessage, Message, StateQuery, StateQueryResponse}, queries::spdd::{SPDDStateQuery, SPDDStateQueryResponse, DEFAULT_SPDD_QUERY_TOPIC}, + rest_helper::handle_rest_with_query_parameters, }; use anyhow::Result; use caryatid_sdk::{module, Context, Module}; @@ -12,8 +13,11 @@ use tokio::sync::Mutex; use tracing::{error, info, info_span, Instrument}; mod state; use state::State; +mod rest; +use rest::handle_spdd; const DEFAULT_SUBSCRIBE_TOPIC: &str = "cardano.spo.distribution"; +const DEFAULT_HANDLE_SPDD_TOPIC: (&str, &str) = ("handle-topic-spdd", "rest.get.spdd"); const DEFAULT_STORE_SPDD: (&str, bool) = ("store-spdd", false); /// SPDD State module @@ -32,6 +36,12 @@ impl SPDDState { config.get_string("subscribe-topic").unwrap_or(DEFAULT_SUBSCRIBE_TOPIC.to_string()); info!("Creating subscriber on '{subscribe_topic}'"); + // REST topic (not included in BF) + let handle_spdd_topic = config + .get_string(DEFAULT_HANDLE_SPDD_TOPIC.0) + .unwrap_or(DEFAULT_HANDLE_SPDD_TOPIC.1.to_string()); + info!("Creating request handler on '{}'", handle_spdd_topic); + // Query topic let spdd_query_topic = config .get_string(DEFAULT_SPDD_QUERY_TOPIC.0) @@ -43,6 +53,12 @@ impl SPDDState { let state_opt = if store_spdd { let state = Arc::new(Mutex::new(State::new())); + // Register /spdd REST endpoint + let state_rest = state.clone(); + 
handle_rest_with_query_parameters(context.clone(), &handle_spdd_topic, move |params| { + handle_spdd(state_rest.clone(), params) + }); + // Subscribe for spdd messages from accounts_state let state_handler = state.clone(); let mut message_subscription = context.subscribe(&subscribe_topic).await?; diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index 5fdc2f0e..b6d8a7f1 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -83,14 +83,9 @@ write-full-cache = "false" store-history = false [module.accounts-state] -# Store SPDD history -# Enable /epochs/{number}/stakes, /epochs/{number}/stakes/{pool_id} endpoints -store-spdd-history = false -# SPDD database path (only used when store-spdd-history is enabled) +# Enable /epochs/{number}/stakes & /epochs/{number}/stakes/{pool_id} endpoints +spdd-retention-epochs = 0 spdd-db-path = "./spdd_db" -# Number of epochs to retain in SPDD history -# Example: 73 -spdd-retention-epochs = "none" # Verify against captured CSV verify-pots-file = "../../modules/accounts_state/test-data/pots.mainnet.csv" verify-rewards-files = "../../modules/accounts_state/test-data/rewards.mainnet.{}.csv" diff --git a/tests/integration/.env.example b/tests/integration/.env.example new file mode 100644 index 00000000..7ecbf43b --- /dev/null +++ b/tests/integration/.env.example @@ -0,0 +1,3 @@ +MAINNET_DBSYNC_URL= +ACROPOLIS_REST_URL=http://127.0.0.1:4340 +SPDD_VALIDATION_START_EPOCH=208 \ No newline at end of file diff --git a/tests/integration/.gitignore b/tests/integration/.gitignore new file mode 100644 index 00000000..9a30b152 --- /dev/null +++ b/tests/integration/.gitignore @@ -0,0 +1,3 @@ +node_modules/ +package-lock.json +.env \ No newline at end of file diff --git a/tests/integration/README.md b/tests/integration/README.md new file mode 100644 index 00000000..c265642d --- /dev/null +++ b/tests/integration/README.md @@ -0,0 +1,38 @@ +# Running the SPDD validation test +1. 
Install dependencies +```bash +npm install +``` + +2. Set environment variables +Create a `.env` file in the `tests/integration/` with the following values: +```env +MAINNET_DBSYNC_URL=postgres://user:password@host:port/dbname +ACROPOLIS_REST_URL=https://your-acropolis-endpoint +SPDD_VALIDATION_START_EPOCH=208 +``` +If you do not currently operate a mainnet DB Sync server, Demeter provides free access to an instance + +3. Enable SPDD storage in `omnibus.toml` +```toml +[module.spdd-state] +store-spdd = true +``` + +4. Start Omnibus process and wait for sync +```bash +cd ~/acropolis/processes/omnibus +cargo run --release --bin acropolis_process_omnibus +``` + +5. Run the validator +```bash +npm run test:spdd +``` + +6. Observe output +This validator will: +* Compare Acropolis SPDD data with DB Sync epoch stake data +* Display total stake and per-pool differences +* Pause for review when mismatches are detected +* Stop automatically when Acropolis stops returning data diff --git a/tests/integration/package.json b/tests/integration/package.json new file mode 100644 index 00000000..8427e0e2 --- /dev/null +++ b/tests/integration/package.json @@ -0,0 +1,20 @@ +{ + "name": "acropolis-integration-tests", + "version": "1.0.0", + "type": "module", + "private": true, + "scripts": { + "test:spdd": "ts-node spdd.test.ts" + }, + "dependencies": { + "axios": "^1.12.2", + "dotenv": "^17.2.3", + "pg": "^8.16.3" + }, + "devDependencies": { + "@types/node": "^22.5.2", + "@types/pg": "^8.15.5", + "ts-node": "^10.9.2", + "typescript": "^5.9.3" + } +} diff --git a/tests/integration/spdd.test.ts b/tests/integration/spdd.test.ts new file mode 100644 index 00000000..e64cb4bc --- /dev/null +++ b/tests/integration/spdd.test.ts @@ -0,0 +1,157 @@ +import "dotenv/config"; +import axios from "axios"; +import { Client } from "pg"; +import readline from "readline"; + +const ACROPOLIS_URL = process.env.ACROPOLIS_REST_URL!; +const DBSYNC_URL = process.env.MAINNET_DBSYNC_URL!; +if (!ACROPOLIS_URL || 
!DBSYNC_URL) { + throw new Error("Missing required environment variables ACROPOLIS_REST_URL or MAINNET_DBSYNC_URL"); +} + +const START_EPOCH = Number(process.env.SPDD_VALIDATION_START_EPOCH); +if (Number.isNaN(START_EPOCH)) { + throw new Error("SPDD_VALIDATION_START_EPOCH must be a number"); +} + +async function pause() { + const rl = readline.createInterface({ input: process.stdin, output: process.stdout }); + + return new Promise((resolve) => { + rl.on("SIGINT", () => { + rl.close(); + process.exit(0); + }); + + rl.question("Press Enter to continue or Ctrl+C to stop...", () => { + rl.close(); + resolve(); + }); + }); +} + +async function queryDbSync(client: Client, epoch: number) { + const { rows } = await client.query( + ` + SELECT ph.view AS pool_id, SUM(es.amount)::bigint AS stake + FROM epoch_stake es + JOIN pool_hash ph ON es.pool_id = ph.id + WHERE es.epoch_no = ($1 + 2) + GROUP BY ph.view + `, + [epoch] + ); + return rows; +} + +async function queryAcropolis(epoch: number) { + const { data, status } = await axios.get(`${ACROPOLIS_URL}/spdd?epoch=${epoch}`, { + validateStatus: () => true, + }); + + if (status !== 200) { + throw new Error(`HTTP ${status}`); + } + + if (typeof data !== "object" || Array.isArray(data) || !data) + throw new Error("Invalid SPDD response"); + + const pools: { pool_id: string; stake: bigint }[] = []; + for (const [pool_id, info] of Object.entries(data)) { + if (!info || typeof info !== "object" || !("active" in info)) continue; + pools.push({ pool_id, stake: BigInt((info as any).active) }); + } + return pools; +} + +async function validateEpoch(db: Client, epoch: number) { + const dbPools = await queryDbSync(db, epoch); + if (!dbPools.length) { + console.log(`No db-sync data found for epoch ${epoch}. 
Exiting.`); + return false; + } + + const spddPools = await queryAcropolis(epoch); + const spddMap = new Map(spddPools.map((p) => [p.pool_id, p.stake])); + const dbMap = new Map(dbPools.map((p) => [p.pool_id, BigInt(p.stake)])); + + const dbTotal = dbPools.reduce((acc, p) => acc + BigInt(p.stake), 0n); + const apiTotal = spddPools.reduce((acc, p) => acc + p.stake, 0n); + + if (dbTotal !== apiTotal) { + console.log( + `❌ Total active stake mismatch for epoch ${epoch}:\n DB: ${dbTotal}\n SPDD: ${apiTotal}` + ); + } + + let matched = 0; + const missing: string[] = []; + const extra: string[] = []; + const mismatched: { id: string; db: bigint; spdd: bigint }[] = []; + + // Missing or mismatched pools in Acropolis SPDD + for (const { pool_id, stake } of dbPools) { + const expected = BigInt(stake); + const found = spddMap.get(pool_id); + if (found === undefined) missing.push(pool_id); + else if (found !== expected) mismatched.push({ id: pool_id, db: expected, spdd: found }); + else matched++; + } + + // Extra pools in Acropolis SPDD which do not exist in DB Sync + for (const { pool_id } of spddPools) { + if (!dbMap.has(pool_id)) extra.push(pool_id); + } + + const total = matched + mismatched.length + missing.length; + console.log( + `Epoch ${epoch}: ✅ ${matched} match, ⚠️ ${mismatched.length} mismatch, ❌ ${missing.length} missing, 🌀 ${extra.length} extra (total ${total})` + ); + + if (missing.length || mismatched.length || extra.length) { + if (missing.length) console.log(` Missing pools: ${missing.join(", ")}`); + if (extra.length) console.log(` Extra pools (in SPDD only): ${extra.join(", ")}`); + if (mismatched.length) { + console.log(` Mismatched pools:`); + for (const { id, db, spdd } of mismatched) { + console.log(` - ${id} (db: ${db}, spdd: ${spdd})`); + } + } + await pause(); + } + + return true; +} + +async function run() { + const db = new Client({ connectionString: DBSYNC_URL }); + await db.connect(); + + console.log("Validating Acropolis SPDD vs DB 
Sync...\n"); + + for (let epoch = START_EPOCH; ; epoch++) { + try { + const keepGoing = await validateEpoch(db, epoch); + if (!keepGoing) break; + } catch (err: any) { + if (err.message.startsWith("HTTP")) { + if (epoch == START_EPOCH) { + console.log(`store-spdd=false or sync has not reached epoch ${START_EPOCH}.`); + } else { + console.log(`Reached end of available epochs (${err.message}).`); + } + break; + } + console.error(`Stopping at epoch ${epoch}: ${err.message}`); + break; + } + } + + await db.end(); + console.log("\nFinished."); +} + +run().catch((err) => { + console.error(err.message || err); + process.exit(1); +});