From 4dce1e64a62c6478825e573626bf33bae5d916e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Thu, 1 Sep 2022 15:31:01 +0800 Subject: [PATCH] Introduce `RpcImportParams` to make rpc sync more efficient `RpcImportParams` keeps track of the scriptPubKey derivation index to start from for the next call to `importdescripts`/`importmulti`, thus avoiding re-importing into Bitcoin Core. --- src/blockchain/esplora/mod.rs | 2 +- src/blockchain/rpc.rs | 111 ++++++++++++++++++++++++++---- src/testutils/blockchain_tests.rs | 6 ++ 3 files changed, 103 insertions(+), 16 deletions(-) diff --git a/src/blockchain/esplora/mod.rs b/src/blockchain/esplora/mod.rs index 30d29d6477..eecd8dfe81 100644 --- a/src/blockchain/esplora/mod.rs +++ b/src/blockchain/esplora/mod.rs @@ -97,7 +97,7 @@ impl fmt::Display for EsploraError { } /// Configuration for an [`EsploraBlockchain`] -#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)] +#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)] pub struct EsploraBlockchainConfig { /// Base URL of the esplora service /// diff --git a/src/blockchain/rpc.rs b/src/blockchain/rpc.rs index 7fad78876e..4c3538e636 100644 --- a/src/blockchain/rpc.rs +++ b/src/blockchain/rpc.rs @@ -27,6 +27,7 @@ //! network: bdk::bitcoin::Network::Testnet, //! wallet_name: "wallet_name".to_string(), //! sync_params: None, +//! import_params: None, //! }; //! let blockchain = RpcBlockchain::from_config(&config); //! ``` @@ -47,8 +48,9 @@ use bitcoincore_rpc::json::{ use bitcoincore_rpc::jsonrpc::serde_json::{json, Value}; use bitcoincore_rpc::Auth as RpcAuth; use bitcoincore_rpc::{Client, RpcApi}; -use log::{debug, info}; +use log::{debug, info, warn}; use serde::{Deserialize, Serialize}; +use std::cell::RefCell; use std::collections::{HashMap, HashSet}; use std::ops::Deref; use std::path::PathBuf; @@ -66,6 +68,8 @@ pub struct RpcBlockchain { capabilities: HashSet, /// Sync parameters. sync_params: RpcSyncParams, + /// Import parameters. + import_params: RefCell, } impl Deref for RpcBlockchain { @@ -89,6 +93,10 @@ pub struct RpcConfig { pub wallet_name: String, /// Sync parameters pub sync_params: Option, + /// Initial `scriptPubKey` import parameters. [`RpcImportParams`] will be mutated in + /// [`RpcBlockchain`] for every sync to reflect what was imported. The updated params can be + /// obtained via [`RpcBlockchain::import_parameters`]. + pub import_params: Option, } /// Sync parameters for Bitcoin Core RPC. @@ -120,6 +128,18 @@ impl Default for RpcSyncParams { } } +/// `scriptPubKey` import parameters for Bitcoin Core RPC. +/// +/// This defines which derivation index(es) to start importing from so that BDK can avoid +/// re-importing `scriptPubKey`s into the Core wallet. +#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] +pub struct RpcImportParams { + /// External derivation index to start import from. + pub external_start_index: usize, + /// Internal derivation index to start import from. + pub internal_start_index: usize, +} + /// This struct is equivalent to [bitcoincore_rpc::Auth] but it implements [serde::Serialize] /// To be removed once upstream equivalent is implementing Serialize (json serialization format /// should be the same), see [rust-bitcoincore-rpc/pull/181](https://github.com/rust-bitcoin/rust-bitcoincore-rpc/pull/181) @@ -153,6 +173,18 @@ impl From for RpcAuth { } } +impl RpcBlockchain { + /// Returns the current import parameters. + pub fn get_import_parameters(&self) -> RpcImportParams { + *self.import_params.borrow() + } + + /// Replaces the import parameters and returns the old value. + pub fn replace_import_parameters(&self, params: RpcImportParams) -> RpcImportParams { + self.import_params.replace(params) + } +} + impl Blockchain for RpcBlockchain { fn get_capabilities(&self) -> HashSet { self.capabilities.clone() @@ -198,7 +230,11 @@ impl WalletSync for RpcBlockchain { D: BatchDatabase, { let batch = DbState::new(db, &self.sync_params, &*prog)? - .sync_with_core(&self.client, self.is_descriptors)? + .sync_with_core( + &self.client, + &mut *self.import_params.borrow_mut(), + self.is_descriptors, + )? .as_db_batch()?; db.commit_batch(batch) @@ -274,6 +310,7 @@ impl ConfigurableBlockchain for RpcBlockchain { capabilities, is_descriptors, sync_params: config.sync_params.clone().unwrap_or_default(), + import_params: RefCell::new(config.import_params.unwrap_or_default()), }) } } @@ -314,7 +351,11 @@ struct DbState<'a, D> { impl<'a, D: BatchDatabase> DbState<'a, D> { /// Obtain [DbState] from [crate::database::Database]. - fn new(db: &'a D, params: &'a RpcSyncParams, prog: &'a dyn Progress) -> Result { + fn new( + db: &'a D, + sync_params: &'a RpcSyncParams, + prog: &'a dyn Progress, + ) -> Result { let ext_spks = db.iter_script_pubkeys(Some(KeychainKind::External))?; let int_spks = db.iter_script_pubkeys(Some(KeychainKind::Internal))?; @@ -325,10 +366,10 @@ impl<'a, D: BatchDatabase> DbState<'a, D> { // If at least one descriptor is derivable, we need to ensure scriptPubKeys are sufficiently // cached. - if has_derivable && last_count < params.start_script_count { + if has_derivable && last_count < sync_params.start_script_count { let inner_err = MissingCachedScripts { last_count, - missing_count: params.start_script_count - last_count, + missing_count: sync_params.start_script_count - last_count, }; debug!("requesting more spks with: {:?}", inner_err); return Err(Error::MissingCachedScripts(inner_err)); @@ -359,7 +400,7 @@ impl<'a, D: BatchDatabase> DbState<'a, D> { Ok(Self { db, - params, + params: sync_params, prog, ext_spks, int_spks, @@ -374,7 +415,12 @@ impl<'a, D: BatchDatabase> DbState<'a, D> { /// Sync states of [BatchDatabase] and Core wallet. /// First we import all `scriptPubKey`s from database into core wallet - fn sync_with_core(&mut self, client: &Client, use_desc: bool) -> Result<&mut Self, Error> { + fn sync_with_core( + &mut self, + client: &Client, + import_params: &mut RpcImportParams, + use_desc: bool, + ) -> Result<&mut Self, Error> { // this tells Core wallet where to sync from for imported scripts let start_epoch = if self.params.force_start_time { self.params.start_time @@ -384,23 +430,55 @@ impl<'a, D: BatchDatabase> DbState<'a, D> { .map_or(self.params.start_time, |st| st.block_time.timestamp) }; - // sync scriptPubKeys from Database to Core wallet - let scripts_iter = self.ext_spks.iter().chain(&self.int_spks); - if use_desc { - import_descriptors(client, start_epoch, scripts_iter)?; - } else { - import_multi(client, start_epoch, scripts_iter)?; + // sync scriptPubKeys from Database into Core wallet, starting from derivation indexes + // defined in `import_params` + let (scripts_count, scripts_iter) = { + let ext_spks = self + .ext_spks + .iter() + .skip(import_params.external_start_index); + let int_spks = self + .int_spks + .iter() + .skip(import_params.internal_start_index); + + let scripts_count = ext_spks.len() + int_spks.len(); + let scripts_iter = ext_spks.chain(int_spks); + println!("scripts count: {}", scripts_count); + + (scripts_count, scripts_iter) + }; + + if scripts_count > 0 { + if use_desc { + import_descriptors(client, start_epoch, scripts_iter)?; + } else { + import_multi(client, start_epoch, scripts_iter)?; + } } - // wait for Core wallet to rescan (TODO: maybe make this async) await_wallet_scan(client, self.params.poll_rate_sec, self.prog)?; + // update import_params + import_params.external_start_index = self.ext_spks.len(); + import_params.internal_start_index = self.int_spks.len(); + // obtain iterator of pagenated `listtransactions` RPC calls const LIST_TX_PAGE_SIZE: usize = 100; // item count per page let tx_iter = list_transactions(client, LIST_TX_PAGE_SIZE)?.filter(|item| { // filter out conflicting transactions - only accept transactions that are already // confirmed, or exists in mempool - item.info.confirmations > 0 || client.get_mempool_entry(&item.info.txid).is_ok() + let confirmed = item.info.confirmations > 0; + let in_mempool = client.get_mempool_entry(&item.info.txid).is_ok(); + + let keep = confirmed || in_mempool; + if !keep { + warn!( + "transaction {} is skipped: confirmed={}, in_mempool={}", + item.info.txid, confirmed, in_mempool + ); + } + keep }); // iterate through chronological results of `listtransactions` @@ -865,6 +943,8 @@ impl BlockchainFactory for RpcBlockchainFactory { checksum ), sync_params: self.sync_params.clone(), + // TODO @evanlinjin: How can be set this individually for each build? + import_params: Default::default(), }) } } @@ -892,6 +972,7 @@ mod test { network: Network::Regtest, wallet_name: format!("client-wallet-test-{}", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos() ), sync_params: None, + import_params: None, }; RpcBlockchain::from_config(&config).unwrap() } diff --git a/src/testutils/blockchain_tests.rs b/src/testutils/blockchain_tests.rs index a3d7c2b171..2c18ac7f3a 100644 --- a/src/testutils/blockchain_tests.rs +++ b/src/testutils/blockchain_tests.rs @@ -755,6 +755,12 @@ macro_rules! bdk_blockchain_tests { blockchain.broadcast(&tx1).expect("broadcasting first"); blockchain.broadcast(&tx2).expect("broadcasting replacement"); + + // TODO @evanlinjin: Core's `listtransactions` RPC call does not return conflicting + // unconfirmed transactions (unless we re-import associated scriptPubKey/descriptor) + #[cfg(feature = "rpc")] + blockchain.replace_import_parameters(Default::default()); + receiver_wallet.sync(&blockchain, SyncOptions::default()).expect("syncing receiver"); assert_eq!(receiver_wallet.get_balance().expect("balance").untrusted_pending, 49_000, "should have received coins once and only once"); }