Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions common/src/queries/addresses.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use crate::queries::errors::QueryError;
use crate::{
Address, AddressTotals, NativeAssets, ShelleyAddress, TxIdentifier, UTxOIdentifier, ValueDelta,
};
use crate::{Address, AddressTotals, ShelleyAddress, TxIdentifier, UTxOIdentifier};

pub const DEFAULT_ADDRESS_QUERY_TOPIC: (&str, &str) =
("address-state-query-topic", "cardano.query.address");
Expand All @@ -13,7 +11,6 @@ pub enum AddressStateQuery {
GetAddressTransactions { address: Address },

// Accounts related queries
GetAddressesAssets { addresses: Vec<ShelleyAddress> },
GetAddressesTotals { addresses: Vec<ShelleyAddress> },
GetAddressesUTxOs { addresses: Vec<ShelleyAddress> },
}
Expand All @@ -25,8 +22,7 @@ pub enum AddressStateQueryResponse {
AddressTransactions(Vec<TxIdentifier>),

// Accounts related queries
AddressesAssets(NativeAssets),
AddressesTotals(ValueDelta),
AddressesTotals(AddressTotals),
AddressesUTxOs(Vec<UTxOIdentifier>),
Error(QueryError),
}
21 changes: 21 additions & 0 deletions common/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,19 @@ pub struct ValueMap {
pub assets: NativeAssetsMap,
}

impl AddAssign for ValueMap {
fn add_assign(&mut self, other: Self) {
self.lovelace += other.lovelace;

for (policy, assets) in other.assets {
let entry = self.assets.entry(policy).or_default();
for (asset_name, amount) in assets {
*entry.entry(asset_name).or_default() += amount;
}
}
}
}

#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
pub struct ValueDelta {
pub lovelace: i64,
Expand Down Expand Up @@ -2063,6 +2076,14 @@ pub struct AddressTotals {
pub tx_count: u64,
}

impl AddAssign for AddressTotals {
fn add_assign(&mut self, other: Self) {
self.sent += other.sent;
self.received += other.received;
self.tx_count += other.tx_count;
}
}

impl AddressTotals {
pub fn apply_delta(&mut self, delta: &ValueDelta) {
if delta.lovelace > 0 {
Expand Down
8 changes: 0 additions & 8 deletions modules/address_state/src/address_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,14 +251,6 @@ impl AddressState {
)),
}
}
AddressStateQuery::GetAddressesAssets { addresses } => {
match state.get_addresses_assets(addresses).await {
Ok(assets) => AddressStateQueryResponse::AddressesAssets(assets),
Err(e) => AddressStateQueryResponse::Error(QueryError::internal_error(
e.to_string(),
)),
}
}
AddressStateQuery::GetAddressesTotals { addresses } => {
match state.get_addresses_totals(addresses).await {
Ok(totals) => AddressStateQueryResponse::AddressesTotals(totals),
Expand Down
168 changes: 95 additions & 73 deletions modules/address_state/src/immutable_address_store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use std::{
collections::{HashMap, HashSet},
path::Path,
};
use std::{collections::HashMap, path::Path};

use crate::state::{AddressEntry, AddressStorageConfig, UtxoDelta};
use acropolis_common::{Address, AddressTotals, TxIdentifier, UTxOIdentifier};
Expand All @@ -16,6 +13,14 @@ const ADDRESS_UTXOS_EPOCH_COUNTER: &[u8] = b"utxos_epoch_last";
const ADDRESS_TXS_EPOCH_COUNTER: &[u8] = b"txs_epoch_last";
const ADDRESS_TOTALS_EPOCH_COUNTER: &[u8] = b"totals_epoch_last";

#[derive(Default)]
struct MergedDeltas {
created_utxos: Vec<UTxOIdentifier>,
spent_utxos: Vec<UTxOIdentifier>,
txs: Vec<TxIdentifier>,
totals: AddressTotals,
}

pub struct ImmutableAddressStore {
utxos: Partition,
txs: Partition,
Expand All @@ -26,7 +31,7 @@ pub struct ImmutableAddressStore {

impl ImmutableAddressStore {
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
let cfg = fjall::Config::new(path).max_write_buffer_size(512 * 1024 * 1024);
let cfg = fjall::Config::new(path).max_write_buffer_size(512 * 1024 * 1024).temporary(true);
let keyspace = Keyspace::open(cfg)?;

let utxos = keyspace.open_partition("address_utxos", PartitionCreateOptions::default())?;
Expand All @@ -43,8 +48,8 @@ impl ImmutableAddressStore {
})
}

/// Persists volatile UTxOs, transactions, and totals into their respective Fjall partitions for an entire epoch.
/// Skips any partitions that have already stored the given epoch.
/// Persists volatile UTxOs, transactions, and totals into their respective Fjall partitions
/// for an entire epoch. Skips any partitions that have already stored the given epoch.
/// All writes are batched and committed atomically, preventing on-disk corruption in case of failure.
pub async fn persist_epoch(&self, epoch: u64, config: &AddressStorageConfig) -> Result<()> {
let persist_utxos = config.store_info
Expand All @@ -55,7 +60,7 @@ impl ImmutableAddressStore {
&& !self.epoch_exists(self.totals.clone(), ADDRESS_TOTALS_EPOCH_COUNTER, epoch).await?;

if !(persist_utxos || persist_txs || persist_totals) {
debug!("no persistence needed for epoch {epoch} (already persisted or disabled)",);
debug!("no persistence needed for epoch {epoch} (already persisted or disabled)");
return Ok(());
}

Expand All @@ -67,70 +72,50 @@ impl ImmutableAddressStore {
let mut batch = self.keyspace.batch();
let mut change_count = 0;

for block_map in drained_blocks.into_iter() {
if block_map.is_empty() {
continue;
}
for (address, deltas) in Self::merge_block_deltas(drained_blocks) {
change_count += 1;
let addr_key = address.to_bytes_key()?;

for (addr, entry) in block_map {
change_count += 1;
let addr_key = addr.to_bytes_key()?;

if persist_utxos {
let mut live: HashSet<UTxOIdentifier> = self
.utxos
.get(&addr_key)?
.map(|bytes| decode(&bytes))
.transpose()?
.unwrap_or_default();

if let Some(deltas) = &entry.utxos {
for delta in deltas {
match delta {
UtxoDelta::Created(u) => {
live.insert(*u);
}
UtxoDelta::Spent(u) => {
live.remove(u);
}
}
}
}
if persist_utxos && (!deltas.created_utxos.is_empty() || !deltas.spent_utxos.is_empty())
{
let mut live: Vec<UTxOIdentifier> = self
.utxos
.get(&addr_key)?
.map(|bytes| decode(&bytes))
.transpose()?
.unwrap_or_default();

batch.insert(&self.utxos, &addr_key, to_vec(&live)?);
live.extend(&deltas.created_utxos);

for u in &deltas.spent_utxos {
live.retain(|x| x != u);
}

if persist_txs {
let mut live: Vec<TxIdentifier> = self
.txs
.get(&addr_key)?
.map(|bytes| decode(&bytes))
.transpose()?
.unwrap_or_default();
batch.insert(&self.utxos, &addr_key, to_vec(&live)?);
}

if let Some(txs_deltas) = &entry.transactions {
live.extend(txs_deltas.iter().cloned());
}
if persist_txs && !deltas.txs.is_empty() {
let mut live: Vec<TxIdentifier> = self
.txs
.get(&addr_key)?
.map(|bytes| decode(&bytes))
.transpose()?
.unwrap_or_default();

batch.insert(&self.txs, &addr_key, to_vec(&live)?);
}
live.extend(deltas.txs.iter().cloned());
batch.insert(&self.txs, &addr_key, to_vec(&live)?);
}

if persist_totals {
let mut live: AddressTotals = self
.totals
.get(&addr_key)?
.map(|bytes| decode(&bytes))
.transpose()?
.unwrap_or_default();

if let Some(deltas) = &entry.totals {
for delta in deltas {
live.apply_delta(delta);
}
}
if persist_totals && deltas.totals.tx_count != 0 {
let mut live: AddressTotals = self
.totals
.get(&addr_key)?
.map(|bytes| decode(&bytes))
.transpose()?
.unwrap_or_default();

batch.insert(&self.totals, &addr_key, to_vec(&live)?);
}
live += deltas.totals;
batch.insert(&self.totals, &addr_key, to_vec(&live)?);
}
}

Expand Down Expand Up @@ -173,7 +158,7 @@ impl ImmutableAddressStore {
pub async fn get_utxos(&self, address: &Address) -> Result<Option<Vec<UTxOIdentifier>>> {
let key = address.to_bytes_key()?;

let mut live: HashSet<UTxOIdentifier> =
let mut live: Vec<UTxOIdentifier> =
self.utxos.get(&key)?.map(|bytes| decode(&bytes)).transpose()?.unwrap_or_default();

let pending = self.pending.lock().await;
Expand All @@ -182,12 +167,8 @@ impl ImmutableAddressStore {
if let Some(deltas) = &entry.utxos {
for delta in deltas {
match delta {
UtxoDelta::Created(u) => {
live.insert(*u);
}
UtxoDelta::Spent(u) => {
live.remove(u);
}
UtxoDelta::Created(u) => live.push(*u),
UtxoDelta::Spent(u) => live.retain(|x| x != u),
}
}
}
Expand All @@ -197,8 +178,7 @@ impl ImmutableAddressStore {
if live.is_empty() {
Ok(None)
} else {
let vec: Vec<_> = live.into_iter().collect();
Ok(Some(vec))
Ok(Some(live))
}
}

Expand Down Expand Up @@ -311,4 +291,46 @@ impl ImmutableAddressStore {

Ok(exists)
}

fn merge_block_deltas(
drained_blocks: Vec<HashMap<Address, AddressEntry>>,
) -> HashMap<Address, MergedDeltas> {
let mut merged = HashMap::new();

for block_map in drained_blocks {
for (addr, entry) in block_map {
let target = merged.entry(addr.clone()).or_insert_with(MergedDeltas::default);
Copy link
Collaborator

@lowhung lowhung Nov 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we insert defaults as our fallback here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to populate an empty entry for an address if the current block delta is the first time the address is detected in the epoch.


// Remove UTxOs that are spent in the same epoch
if let Some(deltas) = &entry.utxos {
for delta in deltas {
match delta {
UtxoDelta::Created(u) => target.created_utxos.push(*u),
UtxoDelta::Spent(u) => {
if target.created_utxos.contains(u) {
target.created_utxos.retain(|x| x != u);
} else {
target.spent_utxos.push(*u);
}
}
}
}
}

// Merge Tx vectors
if let Some(txs) = &entry.transactions {
target.txs.extend(txs.iter().cloned());
}

// Sum totals
if let Some(totals) = &entry.totals {
for delta in totals {
target.totals.apply_delta(delta);
}
}
}
}

merged
}
}
20 changes: 10 additions & 10 deletions modules/address_state/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use std::{
};

use acropolis_common::{
Address, AddressDelta, AddressTotals, BlockInfo, NativeAssets, ShelleyAddress, TxIdentifier,
UTxOIdentifier, ValueDelta,
Address, AddressDelta, AddressTotals, BlockInfo, ShelleyAddress, TxIdentifier, UTxOIdentifier,
ValueDelta,
};
use anyhow::Result;

Expand Down Expand Up @@ -201,15 +201,15 @@ impl State {
Ok(())
}

pub async fn get_addresses_assets(
pub async fn get_addresses_totals(
&self,
_addresses: &[ShelleyAddress],
) -> Result<NativeAssets> {
Ok(NativeAssets::default())
}

pub async fn get_addresses_totals(&self, _addresses: &[ShelleyAddress]) -> Result<ValueDelta> {
Ok(ValueDelta::default())
addresses: &[ShelleyAddress],
) -> Result<AddressTotals> {
let mut totals = AddressTotals::default();
for addr in addresses {
totals += self.get_address_totals(&Address::Shelley(addr.clone())).await?;
}
Ok(totals)
}

pub async fn get_addresses_utxos(
Expand Down
Loading