Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(zcoin): impl balance event streaming #2076

Merged
merged 13 commits into from
Mar 29, 2024
82 changes: 56 additions & 26 deletions mm2src/coins/z_coin.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
pub mod storage;
mod z_balance_streaming;
mod z_coin_errors;
#[cfg(all(test, feature = "zhtlc-native-tests"))]
mod z_coin_native_tests;
mod z_htlc;
mod z_rpc;
mod z_tx_history;

use crate::coin_errors::{MyAddressError, ValidatePaymentResult};
use crate::my_tx_history_v2::{MyTxHistoryErrorV2, MyTxHistoryRequestV2, MyTxHistoryResponseV2};
use crate::rpc_command::init_withdraw::{InitWithdrawCoin, WithdrawInProgressStatus, WithdrawTaskHandleShared};
Expand All @@ -14,6 +23,7 @@ use crate::utxo::{sat_from_big_decimal, utxo_common, ActualTxFee, AdditionalTxDa
UtxoCommonOps, UtxoRpcMode, UtxoTxBroadcastOps, UtxoTxGenerationOps, VerboseTransactionFrom};
use crate::utxo::{UnsupportedAddr, UtxoFeeDetails};
use crate::z_coin::storage::{BlockDbImpl, WalletDbShared};
use crate::z_coin::z_balance_streaming::ZBalanceEventHandler;
use crate::z_coin::z_tx_history::{fetch_tx_history_from_db, ZCoinTxHistoryItem};
use crate::{BalanceError, BalanceFut, CheckIfMyPaymentSentArgs, CoinBalance, CoinFutSpawner, ConfirmPaymentInput,
DexFee, FeeApproxStage, FoundSwapTxSpend, HistorySyncState, MakerSwapTakerCoin, MarketCoinOps, MmCoin,
Expand Down Expand Up @@ -48,6 +58,7 @@ use keys::hash::H256;
use keys::{KeyPair, Message, Public};
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::*;
use mm2_event_stream::behaviour::{EventBehaviour, EventInitStatus};
use mm2_number::{BigDecimal, MmNumber};
#[cfg(test)] use mocktopus::macros::*;
use primitives::bytes::Bytes;
Expand All @@ -60,8 +71,10 @@ use std::convert::TryInto;
use std::iter;
use std::path::PathBuf;
use std::sync::Arc;
#[cfg(target_arch = "wasm32")]
use z_coin_errors::ZCoinBalanceError;
pub use z_coin_errors::*;
use z_htlc::{z_p2sh_spend, z_send_dex_fee, z_send_htlc};
use z_rpc::init_light_client;
pub use z_rpc::{FirstSyncBlock, SyncStatus};
use z_rpc::{SaplingSyncConnector, SaplingSyncGuard};
use zcash_client_backend::encoding::{decode_payment_address, encode_extended_spending_key, encode_payment_address};
use zcash_client_backend::wallet::{AccountId, SpendableNote};
Expand All @@ -78,13 +91,6 @@ use zcash_primitives::{constants::mainnet as z_mainnet_constants, sapling::Payme
zip32::ExtendedFullViewingKey, zip32::ExtendedSpendingKey};
use zcash_proofs::prover::LocalTxProver;

mod z_htlc;
use z_htlc::{z_p2sh_spend, z_send_dex_fee, z_send_htlc};

mod z_rpc;
use z_rpc::init_light_client;
pub use z_rpc::{FirstSyncBlock, SyncStatus};

cfg_native!(
use common::{async_blocking, sha256_digest};
use zcash_client_sqlite::error::SqliteClientError as ZcashClientError;
Expand All @@ -94,22 +100,14 @@ cfg_native!(
);

cfg_wasm32!(
use crate::z_coin::z_params::ZcashParamsWasmImpl;
use crate::z_coin::storage::ZcashParamsWasmImpl;
use common::executor::AbortOnDropHandle;
use futures::channel::oneshot;
use rand::rngs::OsRng;
use zcash_primitives::transaction::builder::TransactionMetadata;
use z_coin_errors::ZCoinBalanceError;
);

#[allow(unused)] mod z_coin_errors;
pub use z_coin_errors::*;

pub mod storage;
#[cfg(all(test, feature = "zhtlc-native-tests"))]
mod z_coin_native_tests;
#[cfg(target_arch = "wasm32")] mod z_params;
mod z_tx_history;

/// `ZP2SHSpendError` compatible `TransactionErr` handling macro.
macro_rules! try_ztx_s {
($e: expr) => {
Expand Down Expand Up @@ -209,6 +207,7 @@ pub struct ZCoinFields {
light_wallet_db: WalletDbShared,
consensus_params: ZcoinConsensusParams,
sync_state_connector: AsyncMutex<SaplingSyncConnector>,
z_balance_event_handler: Option<ZBalanceEventHandler>,
}

impl Transaction for ZTransaction {
Expand Down Expand Up @@ -654,6 +653,17 @@ impl ZCoin {
paging_options: request.paging_options,
})
}

async fn spawn_balance_stream_if_enabled(&self, ctx: &MmArc) -> Result<(), String> {
let coin = self.clone();
if let Some(stream_config) = &ctx.event_stream_configuration {
if let EventInitStatus::Failed(err) = EventBehaviour::spawn_if_active(coin, stream_config).await {
return ERR!("Failed spawning zcoin balance event with error: {}", err);
}
}

Ok(())
}
}

impl AsRef<UtxoCoinFields> for ZCoin {
Expand Down Expand Up @@ -875,10 +885,24 @@ impl<'a> UtxoCoinBuilder for ZCoinBuilder<'a> {
);

let blocks_db = self.init_blocks_db().await?;
let (z_balance_event_sender, z_balance_event_handler) = if self.ctx.event_stream_configuration.is_some() {
let (sender, receiver) = futures::channel::mpsc::unbounded();
(Some(sender), Some(Arc::new(AsyncMutex::new(receiver))))
} else {
(None, None)
};

let (sync_state_connector, light_wallet_db) = match &self.z_coin_params.mode {
#[cfg(not(target_arch = "wasm32"))]
ZcoinRpcMode::Native => {
init_native_client(&self, self.native_client()?, blocks_db, &z_spending_key).await?
init_native_client(
&self,
self.native_client()?,
blocks_db,
&z_spending_key,
z_balance_event_sender,
)
.await?
},
ZcoinRpcMode::Light {
light_wallet_d_servers,
Expand All @@ -893,11 +917,13 @@ impl<'a> UtxoCoinBuilder for ZCoinBuilder<'a> {
sync_params,
skip_sync_params.unwrap_or_default(),
&z_spending_key,
z_balance_event_sender,
)
.await?
},
};
let z_fields = ZCoinFields {

let z_fields = Arc::new(ZCoinFields {
dex_fee_addr,
my_z_addr,
my_z_addr_encoded,
Expand All @@ -907,12 +933,16 @@ impl<'a> UtxoCoinBuilder for ZCoinBuilder<'a> {
light_wallet_db,
consensus_params: self.protocol_info.consensus_params,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this clone is redundant? no?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, thanks.

sync_state_connector,
};
z_balance_event_handler,
});

Ok(ZCoin {
utxo_arc,
z_fields: Arc::new(z_fields),
})
let zcoin = ZCoin { utxo_arc, z_fields };
zcoin
.spawn_balance_stream_if_enabled(self.ctx)
.await
.map_to_mm(ZCoinBuildError::FailedSpawningBalanceEvents)?;

Ok(zcoin)
}
}

Expand Down
12 changes: 9 additions & 3 deletions mm2src/coins/z_coin/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@ pub mod blockdb;
pub use blockdb::*;

pub mod walletdb;
#[cfg(target_arch = "wasm32")] mod z_params;
#[cfg(target_arch = "wasm32")]
pub(crate) use z_params::ZcashParamsWasmImpl;

pub use walletdb::*;

use crate::z_coin::z_balance_streaming::ZBalanceEventSender;
use mm2_err_handle::mm_error::MmResult;
#[cfg(target_arch = "wasm32")]
use walletdb::wasm::storage::DataConnStmtCacheWasm;
Expand Down Expand Up @@ -55,7 +60,7 @@ pub struct CompactBlockRow {
#[derive(Clone)]
pub enum BlockProcessingMode {
Validate,
Scan(DataConnStmtCacheWrapper),
Scan(DataConnStmtCacheWrapper, Option<ZBalanceEventSender>),
}

/// Checks that the scanned blocks in the data database, when combined with the recent
Expand Down Expand Up @@ -114,7 +119,7 @@ pub async fn scan_cached_block(
params: &ZcoinConsensusParams,
block: &CompactBlock,
last_height: &mut BlockHeight,
) -> Result<(), ValidateBlocksError> {
) -> Result<usize, ValidateBlocksError> {
let mut data_guard = data.inner().clone();
// Fetch the ExtendedFullViewingKeys we are tracking
let extfvks = data_guard.get_extended_full_viewing_keys().await?;
Expand Down Expand Up @@ -201,5 +206,6 @@ pub async fn scan_cached_block(

*last_height = current_height;

Ok(())
// If there are any transactions in the block, return the transaction count
Ok(txs.len())
}
16 changes: 12 additions & 4 deletions mm2src/coins/z_coin/storage/blockdb/blockdb_idb_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::z_coin::storage::{scan_cached_block, validate_chain, BlockDbImpl, Blo
use crate::z_coin::z_coin_errors::ZcoinStorageError;

use async_trait::async_trait;
use futures_util::SinkExt;
use mm2_core::mm_ctx::MmArc;
use mm2_db::indexed_db::{BeBigUint, ConstructibleDb, DbIdentifier, DbInstance, DbLocked, DbUpgrader, IndexedDb,
IndexedDbBuilder, InitDbResult, MultiIndex, OnUpgradeResult, TableSignature};
Expand Down Expand Up @@ -123,7 +124,7 @@ impl BlockDbImpl {
}

/// Asynchronously rewinds the storage to a specified block height, effectively
/// removing data beyond the specified height from the storage.
/// removing data beyond the specified height from the storage.
pub async fn rewind_to_height(&self, height: BlockHeight) -> ZcoinStorageRes<usize> {
let locked_db = self.lock_db().await?;
let db_transaction = locked_db.get_inner().transaction().await?;
Expand Down Expand Up @@ -224,7 +225,7 @@ impl BlockDbImpl {
BlockProcessingMode::Validate => validate_from
.map(|(height, _)| height)
.unwrap_or(BlockHeight::from_u32(params.sapling_activation_height) - 1),
BlockProcessingMode::Scan(data) => data.inner().block_height_extrema().await.map(|opt| {
BlockProcessingMode::Scan(data, _) => data.inner().block_height_extrema().await.map(|opt| {
opt.map(|(_, max)| max)
.unwrap_or(BlockHeight::from_u32(params.sapling_activation_height) - 1)
})?,
Expand All @@ -250,8 +251,15 @@ impl BlockDbImpl {
BlockProcessingMode::Validate => {
validate_chain(block, &mut prev_height, &mut prev_hash).await?;
},
BlockProcessingMode::Scan(data) => {
scan_cached_block(data, &params, &block, &mut from_height).await?;
BlockProcessingMode::Scan(data, z_balance_change_sender) => {
let tx_size = scan_cached_block(data, &params, &block, &mut from_height).await?;
// If there is/are transactions present in the current scanned block(s),
// we trigger a `Triggered` event to update the balance change.
if tx_size > 0 {
if let Some(mut sender) = z_balance_change_sender.clone() {
sender.send(()).await.expect("No receiver is available/dropped");
};
};
},
}
}
Expand Down
14 changes: 11 additions & 3 deletions mm2src/coins/z_coin/storage/blockdb/blockdb_sql_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::z_coin::ZcoinConsensusParams;
use common::async_blocking;
use db_common::sqlite::rusqlite::{params, Connection};
use db_common::sqlite::{query_single_row, run_optimization_pragmas, rusqlite};
use futures_util::SinkExt;
use itertools::Itertools;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::*;
Expand Down Expand Up @@ -193,7 +194,7 @@ impl BlockDbImpl {
BlockProcessingMode::Validate => validate_from
.map(|(height, _)| height)
.unwrap_or(BlockHeight::from_u32(params.sapling_activation_height) - 1),
BlockProcessingMode::Scan(data) => {
BlockProcessingMode::Scan(data, _) => {
let data = data.inner();
data.block_height_extrema().await.map(|opt| {
opt.map(|(_, max)| max)
Expand Down Expand Up @@ -224,8 +225,15 @@ impl BlockDbImpl {
BlockProcessingMode::Validate => {
validate_chain(block, &mut prev_height, &mut prev_hash).await?;
},
BlockProcessingMode::Scan(data) => {
scan_cached_block(data, &params, &block, &mut from_height).await?;
BlockProcessingMode::Scan(data, z_balance_change_sender) => {
let tx_size = scan_cached_block(data, &params, &block, &mut from_height).await?;
// If there are transactions present in the current scanned block,
// we send a `Triggered` event to update the balance change.
if tx_size > 0 {
if let Some(mut sender) = z_balance_change_sender.clone() {
sender.send(()).await.expect("No receiver is available/dropped");
};
};
},
}
}
Expand Down
110 changes: 110 additions & 0 deletions mm2src/coins/z_coin/z_balance_streaming.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use crate::common::Future01CompatExt;
use crate::hd_wallet::AsyncMutex;
use crate::z_coin::ZCoin;
use crate::{MarketCoinOps, MmCoin};

use async_trait::async_trait;
use common::executor::{AbortSettings, SpawnAbortable};
use common::log::{error, info};
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::channel::oneshot;
use futures::channel::oneshot::{Receiver, Sender};
use futures_util::StreamExt;
use mm2_core::mm_ctx::MmArc;
use mm2_event_stream::behaviour::{EventBehaviour, EventInitStatus};
use mm2_event_stream::{Event, EventStreamConfiguration};
use std::sync::Arc;

pub type ZBalanceEventSender = UnboundedSender<()>;
pub type ZBalanceEventHandler = Arc<AsyncMutex<UnboundedReceiver<()>>>;

#[async_trait]
impl EventBehaviour for ZCoin {
const EVENT_NAME: &'static str = "COIN_BALANCE";
const ERROR_EVENT_NAME: &'static str = "COIN_BALANCE_ERROR";
Comment on lines +23 to +24
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for the future development: We may need to maintain all the available event names from a single source without needing to type them manually for each implementation.


async fn handle(self, _interval: f64, tx: Sender<EventInitStatus>) {
const RECEIVER_DROPPED_MSG: &str = "Receiver is dropped, which should never happen.";

macro_rules! send_status_on_err {
($match: expr, $sender: tt, $msg: literal) => {
match $match {
Some(t) => t,
None => {
$sender
.send(EventInitStatus::Failed($msg.to_owned()))
.expect(RECEIVER_DROPPED_MSG);
panic!("{}", $msg);
},
}
};
}

let ctx = send_status_on_err!(
MmArc::from_weak(&self.as_ref().ctx),
tx,
"MM context must have been initialized already."
);
let z_balance_change_handler = send_status_on_err!(
self.z_fields.z_balance_event_handler.as_ref(),
tx,
"Z balance change receiver can not be empty."
);

tx.send(EventInitStatus::Success).expect(RECEIVER_DROPPED_MSG);

// Locks the balance change handler, iterates through received events, and updates balance changes accordingly.
let mut bal = z_balance_change_handler.lock().await;
while (bal.next().await).is_some() {
match self.my_balance().compat().await {
Ok(balance) => {
let payload = json!({
"ticker": self.ticker(),
"address": self.my_z_address_encoded(),
"balance": { "spendable": balance.spendable, "unspendable": balance.unspendable }
});

ctx.stream_channel_controller
.broadcast(Event::new(Self::EVENT_NAME.to_string(), payload.to_string()))
.await;
},
Err(err) => {
let ticker = self.ticker();
error!("Failed getting balance for '{ticker}'. Error: {err}");
let e = serde_json::to_value(err).expect("Serialization should't fail.");
return ctx
.stream_channel_controller
.broadcast(Event::new(
format!("{}:{}", Self::ERROR_EVENT_NAME, ticker),
e.to_string(),
))
.await;
},
};
}
}

async fn spawn_if_active(self, config: &EventStreamConfiguration) -> EventInitStatus {
if let Some(event) = config.get_event(Self::EVENT_NAME) {
info!(
"{} event is activated for {} address {}. `stream_interval_seconds`({}) has no effect on this.",
Self::EVENT_NAME,
self.ticker(),
self.my_z_address_encoded(),
event.stream_interval_seconds
);

let (tx, rx): (Sender<EventInitStatus>, Receiver<EventInitStatus>) = oneshot::channel();
let fut = self.clone().handle(event.stream_interval_seconds, tx);
let settings =
AbortSettings::info_on_abort(format!("{} event is stopped for {}.", Self::EVENT_NAME, self.ticker()));
self.spawner().spawn_with_settings(fut, settings);

rx.await.unwrap_or_else(|e| {
EventInitStatus::Failed(format!("Event initialization status must be received: {}", e))
})
} else {
EventInitStatus::Inactive
}
}
}
Loading
Loading