diff --git a/common/src/queries/addresses.rs b/common/src/queries/addresses.rs index 4c53ca03..47a93733 100644 --- a/common/src/queries/addresses.rs +++ b/common/src/queries/addresses.rs @@ -1,7 +1,5 @@ use crate::queries::errors::QueryError; -use crate::{ - Address, AddressTotals, NativeAssets, ShelleyAddress, TxIdentifier, UTxOIdentifier, ValueDelta, -}; +use crate::{Address, AddressTotals, ShelleyAddress, TxIdentifier, UTxOIdentifier}; pub const DEFAULT_ADDRESS_QUERY_TOPIC: (&str, &str) = ("address-state-query-topic", "cardano.query.address"); @@ -13,7 +11,6 @@ pub enum AddressStateQuery { GetAddressTransactions { address: Address }, // Accounts related queries - GetAddressesAssets { addresses: Vec }, GetAddressesTotals { addresses: Vec }, GetAddressesUTxOs { addresses: Vec }, } @@ -25,8 +22,7 @@ pub enum AddressStateQueryResponse { AddressTransactions(Vec), // Accounts related queries - AddressesAssets(NativeAssets), - AddressesTotals(ValueDelta), + AddressesTotals(AddressTotals), AddressesUTxOs(Vec), Error(QueryError), } diff --git a/common/src/types.rs b/common/src/types.rs index 2a176cde..ea76d8b7 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -388,6 +388,19 @@ pub struct ValueMap { pub assets: NativeAssetsMap, } +impl AddAssign for ValueMap { + fn add_assign(&mut self, other: Self) { + self.lovelace += other.lovelace; + + for (policy, assets) in other.assets { + let entry = self.assets.entry(policy).or_default(); + for (asset_name, amount) in assets { + *entry.entry(asset_name).or_default() += amount; + } + } + } +} + #[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)] pub struct ValueDelta { pub lovelace: i64, @@ -2063,6 +2076,14 @@ pub struct AddressTotals { pub tx_count: u64, } +impl AddAssign for AddressTotals { + fn add_assign(&mut self, other: Self) { + self.sent += other.sent; + self.received += other.received; + self.tx_count += other.tx_count; + } +} + impl AddressTotals { pub fn apply_delta(&mut self, delta: &ValueDelta) { if delta.lovelace > 0 { diff --git a/modules/address_state/src/address_state.rs b/modules/address_state/src/address_state.rs index adde9176..5426ac7d 100644 --- a/modules/address_state/src/address_state.rs +++ b/modules/address_state/src/address_state.rs @@ -251,14 +251,6 @@ impl AddressState { )), } } - AddressStateQuery::GetAddressesAssets { addresses } => { - match state.get_addresses_assets(addresses).await { - Ok(assets) => AddressStateQueryResponse::AddressesAssets(assets), - Err(e) => AddressStateQueryResponse::Error(QueryError::internal_error( - e.to_string(), - )), - } - } AddressStateQuery::GetAddressesTotals { addresses } => { match state.get_addresses_totals(addresses).await { Ok(totals) => AddressStateQueryResponse::AddressesTotals(totals), diff --git a/modules/address_state/src/immutable_address_store.rs b/modules/address_state/src/immutable_address_store.rs index 86135e2e..b06084f4 100644 --- a/modules/address_state/src/immutable_address_store.rs +++ b/modules/address_state/src/immutable_address_store.rs @@ -1,7 +1,4 @@ -use std::{ - collections::{HashMap, HashSet}, - path::Path, -}; +use std::{collections::HashMap, path::Path}; use crate::state::{AddressEntry, AddressStorageConfig, UtxoDelta}; use acropolis_common::{Address, AddressTotals, TxIdentifier, UTxOIdentifier}; @@ -16,6 +13,14 @@ const ADDRESS_UTXOS_EPOCH_COUNTER: &[u8] = b"utxos_epoch_last"; const ADDRESS_TXS_EPOCH_COUNTER: &[u8] = b"txs_epoch_last"; const ADDRESS_TOTALS_EPOCH_COUNTER: &[u8] = b"totals_epoch_last"; +#[derive(Default)] +struct MergedDeltas { + created_utxos: Vec, + spent_utxos: Vec, + txs: Vec, + totals: AddressTotals, +} + pub struct ImmutableAddressStore { utxos: Partition, txs: Partition, @@ -26,7 +31,7 @@ pub struct ImmutableAddressStore { impl ImmutableAddressStore { pub fn new(path: impl AsRef) -> Result { - let cfg = fjall::Config::new(path).max_write_buffer_size(512 * 1024 * 1024); + let cfg = fjall::Config::new(path).max_write_buffer_size(512 * 1024 * 1024).temporary(true); let keyspace = Keyspace::open(cfg)?; let utxos = keyspace.open_partition("address_utxos", PartitionCreateOptions::default())?; @@ -43,8 +48,8 @@ impl ImmutableAddressStore { }) } - /// Persists volatile UTxOs, transactions, and totals into their respective Fjall partitions for an entire epoch. - /// Skips any partitions that have already stored the given epoch. + /// Persists volatile UTxOs, transactions, and totals into their respective Fjall partitions + /// for an entire epoch. Skips any partitions that have already stored the given epoch. /// All writes are batched and committed atomically, preventing on-disk corruption in case of failure. pub async fn persist_epoch(&self, epoch: u64, config: &AddressStorageConfig) -> Result<()> { let persist_utxos = config.store_info @@ -55,7 +60,7 @@ impl ImmutableAddressStore { && !self.epoch_exists(self.totals.clone(), ADDRESS_TOTALS_EPOCH_COUNTER, epoch).await?; if !(persist_utxos || persist_txs || persist_totals) { - debug!("no persistence needed for epoch {epoch} (already persisted or disabled)",); + debug!("no persistence needed for epoch {epoch} (already persisted or disabled)"); return Ok(()); } @@ -67,70 +72,50 @@ impl ImmutableAddressStore { let mut batch = self.keyspace.batch(); let mut change_count = 0; - for block_map in drained_blocks.into_iter() { - if block_map.is_empty() { - continue; - } + for (address, deltas) in Self::merge_block_deltas(drained_blocks) { + change_count += 1; + let addr_key = address.to_bytes_key()?; - for (addr, entry) in block_map { - change_count += 1; - let addr_key = addr.to_bytes_key()?; - - if persist_utxos { - let mut live: HashSet = self - .utxos - .get(&addr_key)? - .map(|bytes| decode(&bytes)) - .transpose()? - .unwrap_or_default(); - - if let Some(deltas) = &entry.utxos { - for delta in deltas { - match delta { - UtxoDelta::Created(u) => { - live.insert(*u); - } - UtxoDelta::Spent(u) => { - live.remove(u); - } - } - } - } + if persist_utxos && (!deltas.created_utxos.is_empty() || !deltas.spent_utxos.is_empty()) + { + let mut live: Vec = self + .utxos + .get(&addr_key)? + .map(|bytes| decode(&bytes)) + .transpose()? + .unwrap_or_default(); - batch.insert(&self.utxos, &addr_key, to_vec(&live)?); + live.extend(&deltas.created_utxos); + + for u in &deltas.spent_utxos { + live.retain(|x| x != u); } - if persist_txs { - let mut live: Vec = self - .txs - .get(&addr_key)? - .map(|bytes| decode(&bytes)) - .transpose()? - .unwrap_or_default(); + batch.insert(&self.utxos, &addr_key, to_vec(&live)?); + } - if let Some(txs_deltas) = &entry.transactions { - live.extend(txs_deltas.iter().cloned()); - } + if persist_txs && !deltas.txs.is_empty() { + let mut live: Vec = self + .txs + .get(&addr_key)? + .map(|bytes| decode(&bytes)) + .transpose()? + .unwrap_or_default(); - batch.insert(&self.txs, &addr_key, to_vec(&live)?); - } + live.extend(deltas.txs.iter().cloned()); + batch.insert(&self.txs, &addr_key, to_vec(&live)?); + } - if persist_totals { - let mut live: AddressTotals = self - .totals - .get(&addr_key)? - .map(|bytes| decode(&bytes)) - .transpose()? - .unwrap_or_default(); - - if let Some(deltas) = &entry.totals { - for delta in deltas { - live.apply_delta(delta); - } - } + if persist_totals && deltas.totals.tx_count != 0 { + let mut live: AddressTotals = self + .totals + .get(&addr_key)? + .map(|bytes| decode(&bytes)) + .transpose()? + .unwrap_or_default(); - batch.insert(&self.totals, &addr_key, to_vec(&live)?); - } + live += deltas.totals; + batch.insert(&self.totals, &addr_key, to_vec(&live)?); } } @@ -173,7 +158,7 @@ impl ImmutableAddressStore { pub async fn get_utxos(&self, address: &Address) -> Result>> { let key = address.to_bytes_key()?; - let mut live: HashSet = + let mut live: Vec = self.utxos.get(&key)?.map(|bytes| decode(&bytes)).transpose()?.unwrap_or_default(); let pending = self.pending.lock().await; @@ -182,12 +167,8 @@ impl ImmutableAddressStore { if let Some(deltas) = &entry.utxos { for delta in deltas { match delta { - UtxoDelta::Created(u) => { - live.insert(*u); - } - UtxoDelta::Spent(u) => { - live.remove(u); - } + UtxoDelta::Created(u) => live.push(*u), + UtxoDelta::Spent(u) => live.retain(|x| x != u), } } } @@ -197,8 +178,7 @@ impl ImmutableAddressStore { if live.is_empty() { Ok(None) } else { - let vec: Vec<_> = live.into_iter().collect(); - Ok(Some(vec)) + Ok(Some(live)) } } @@ -311,4 +291,46 @@ impl ImmutableAddressStore { Ok(exists) } + + fn merge_block_deltas( + drained_blocks: Vec>, + ) -> HashMap { + let mut merged = HashMap::new(); + + for block_map in drained_blocks { + for (addr, entry) in block_map { + let target = merged.entry(addr.clone()).or_insert_with(MergedDeltas::default); + + // Remove UTxOs that are spent in the same epoch + if let Some(deltas) = &entry.utxos { + for delta in deltas { + match delta { + UtxoDelta::Created(u) => target.created_utxos.push(*u), + UtxoDelta::Spent(u) => { + if target.created_utxos.contains(u) { + target.created_utxos.retain(|x| x != u); + } else { + target.spent_utxos.push(*u); + } + } + } + } + } + + // Merge Tx vectors + if let Some(txs) = &entry.transactions { + target.txs.extend(txs.iter().cloned()); + } + + // Sum totals + if let Some(totals) = &entry.totals { + for delta in totals { + target.totals.apply_delta(delta); + } + } + } + } + + merged + } } diff --git a/modules/address_state/src/state.rs b/modules/address_state/src/state.rs index 1069b964..7f92d72c 100644 --- a/modules/address_state/src/state.rs +++ b/modules/address_state/src/state.rs @@ -5,8 +5,8 @@ use std::{ }; use acropolis_common::{ - Address, AddressDelta, AddressTotals, BlockInfo, NativeAssets, ShelleyAddress, TxIdentifier, - UTxOIdentifier, ValueDelta, + Address, AddressDelta, AddressTotals, BlockInfo, ShelleyAddress, TxIdentifier, UTxOIdentifier, + ValueDelta, }; use anyhow::Result; @@ -201,15 +201,15 @@ impl State { Ok(()) } - pub async fn get_addresses_assets( + pub async fn get_addresses_totals( &self, - _addresses: &[ShelleyAddress], - ) -> Result { - Ok(NativeAssets::default()) - } - - pub async fn get_addresses_totals(&self, _addresses: &[ShelleyAddress]) -> Result { - Ok(ValueDelta::default()) + addresses: &[ShelleyAddress], + ) -> Result { + let mut totals = AddressTotals::default(); + for addr in addresses { + totals += self.get_address_totals(&Address::Shelley(addr.clone())).await?; + } + Ok(totals) } pub async fn get_addresses_utxos( diff --git a/modules/rest_blockfrost/src/handlers/accounts.rs b/modules/rest_blockfrost/src/handlers/accounts.rs index 6f392973..e2b7415f 100644 --- a/modules/rest_blockfrost/src/handlers/accounts.rs +++ b/modules/rest_blockfrost/src/handlers/accounts.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use crate::handlers_config::HandlersConfig; use crate::types::{ - AccountAddressREST, AccountRewardREST, AccountUTxOREST, AccountWithdrawalREST, + AccountAddressREST, AccountRewardREST, AccountUTxOREST, AccountWithdrawalREST, AmountList, DelegationUpdateREST, RegistrationUpdateREST, }; use acropolis_common::messages::{Message, RESTResponse, StateQuery, StateQueryResponse}; @@ -531,11 +531,92 @@ pub async fn handle_account_addresses_blockfrost( /// Handle `/accounts/{stake_address}/addresses/assets` Blockfrost-compatible endpoint pub async fn handle_account_assets_blockfrost( - _context: Arc>, - _params: Vec, - _handlers_config: Arc, + context: Arc>, + params: Vec, + handlers_config: Arc, ) -> Result { - Err(RESTError::not_implemented("Account assets not implemented")) + let account = parse_stake_address(¶ms)?; + + // Get addresses from historical accounts state + let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( + AccountsStateQuery::GetAccountAssociatedAddresses { account }, + ))); + let addresses = query_state( + &context, + &handlers_config.historical_accounts_query_topic, + msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::AccountAssociatedAddresses(addresses), + )) => Ok(Some(addresses)), + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::Error(QueryError::NotFound { .. }), + )) => Ok(None), + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::Error(e), + )) => Err(e), + _ => Err(QueryError::internal_error( + "Unexpected message type while retrieving account addresses", + )), + }, + ) + .await?; + + let Some(addresses) = addresses else { + return Ok(RESTResponse::with_text(404, "Account not found")); + }; + + // Get utxos from address state + let msg = Arc::new(Message::StateQuery(StateQuery::Addresses( + AddressStateQuery::GetAddressesUTxOs { addresses }, + ))); + let utxo_identifiers = query_state( + &context, + &handlers_config.addresses_query_topic, + msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Addresses( + AddressStateQueryResponse::AddressesUTxOs(utxos), + )) => Ok(utxos), + Message::StateQueryResponse(StateQueryResponse::Addresses( + AddressStateQueryResponse::Error(e), + )) => Err(e), + _ => Err(QueryError::internal_error( + "Unexpected message type while retrieving account UTxOs", + )), + }, + ) + .await?; + + // Get utxo balance sum from utxo state + let msg = Arc::new(Message::StateQuery(StateQuery::UTxOs( + UTxOStateQuery::GetUTxOsSum { utxo_identifiers }, + ))); + let utxos_balance = query_state( + &context, + &handlers_config.utxos_query_topic, + msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::UTxOs( + UTxOStateQueryResponse::UTxOsSum(sum), + )) => Ok(sum), + Message::StateQueryResponse(StateQueryResponse::UTxOs( + UTxOStateQueryResponse::Error(e), + )) => Err(e), + _ => Err(QueryError::internal_error( + "Unexpected message type while retrieving UTxO sum", + )), + }, + ) + .await?; + + let mut rest_response = AmountList::from(utxos_balance).0; + if !rest_response.is_empty() { + rest_response.drain(..1); + } + + let json = serde_json::to_string_pretty(&rest_response)?; + Ok(RESTResponse::with_json(200, &json)) } /// Handle `/accounts/{stake_address}/addresses/total` Blockfrost-compatible endpoint diff --git a/modules/rest_blockfrost/src/rest_blockfrost.rs b/modules/rest_blockfrost/src/rest_blockfrost.rs index 0465b65c..0a6ae340 100644 --- a/modules/rest_blockfrost/src/rest_blockfrost.rs +++ b/modules/rest_blockfrost/src/rest_blockfrost.rs @@ -345,7 +345,7 @@ impl BlockfrostREST { handle_account_assets_blockfrost, ); - // Handler for /accounts/{stake_address}/addreesses/total + // Handler for /accounts/{stake_address}/addresses/total register_handler( context.clone(), DEFAULT_HANDLE_ACCOUNT_TOTALS_TOPIC, diff --git a/modules/rest_blockfrost/src/types.rs b/modules/rest_blockfrost/src/types.rs index 2c9fc7a2..4c699fcf 100644 --- a/modules/rest_blockfrost/src/types.rs +++ b/modules/rest_blockfrost/src/types.rs @@ -6,7 +6,7 @@ use acropolis_common::{ rest_helper::ToCheckedF64, serialization::{Bech32WithHrp, DisplayFromBech32, PoolPrefix}, AssetAddressEntry, AssetMetadataStandard, AssetMintRecord, KeyHash, PolicyAsset, - PoolEpochState, PoolId, PoolUpdateAction, Relay, TxHash, Vote, VrfKeyHash, + PoolEpochState, PoolId, PoolUpdateAction, Relay, TxHash, ValueMap, Vote, VrfKeyHash, }; use anyhow::Result; use num_traits::ToPrimitive; @@ -854,6 +854,32 @@ impl From for AmountList { } } +impl From for AmountList { + fn from(value: ValueMap) -> Self { + let mut out = Vec::new(); + + out.push(AmountEntry { + unit: "lovelace".to_string(), + quantity: value.lovelace.to_string(), + }); + + for (policy_id, assets) in value.assets { + for (asset_name, amount) in assets { + out.push(AmountEntry { + unit: format!( + "{}{}", + hex::encode(policy_id), + hex::encode(asset_name.as_slice()) + ), + quantity: amount.to_string(), + }); + } + } + + Self(out) + } +} + #[derive(Serialize)] pub struct RegistrationUpdateREST { pub tx_hash: String, @@ -911,3 +937,11 @@ pub struct AccountUTxOREST { pub inline_datum: Option, pub reference_script_hash: Option, } + +#[derive(serde::Serialize)] +pub struct _AccountTotalsREST { + pub stake_address: String, + pub received_sum: AmountList, + pub sent_sum: AmountList, + pub tx_count: u64, +}