Skip to content

Commit

Permalink
Introduce RpcImportParams to make rpc sync more efficient
Browse files Browse the repository at this point in the history
`RpcImportParams` keeps track of the scriptPubKey derivation index to
start from for the next call to `importdescripts`/`importmulti`, thus
avoiding re-importing into Bitcoin Core.
  • Loading branch information
evanlinjin committed Sep 1, 2022
1 parent 6fd25bd commit 4dce1e6
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 16 deletions.
2 changes: 1 addition & 1 deletion src/blockchain/esplora/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl fmt::Display for EsploraError {
}

/// Configuration for an [`EsploraBlockchain`]
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq)]
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)]
pub struct EsploraBlockchainConfig {
/// Base URL of the esplora service
///
Expand Down
111 changes: 96 additions & 15 deletions src/blockchain/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
//! network: bdk::bitcoin::Network::Testnet,
//! wallet_name: "wallet_name".to_string(),
//! sync_params: None,
//! import_params: None,
//! };
//! let blockchain = RpcBlockchain::from_config(&config);
//! ```
Expand All @@ -47,8 +48,9 @@ use bitcoincore_rpc::json::{
use bitcoincore_rpc::jsonrpc::serde_json::{json, Value};
use bitcoincore_rpc::Auth as RpcAuth;
use bitcoincore_rpc::{Client, RpcApi};
use log::{debug, info};
use log::{debug, info, warn};
use serde::{Deserialize, Serialize};
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use std::path::PathBuf;
Expand All @@ -66,6 +68,8 @@ pub struct RpcBlockchain {
capabilities: HashSet<Capability>,
/// Sync parameters.
sync_params: RpcSyncParams,
/// Import parameters.
import_params: RefCell<RpcImportParams>,
}

impl Deref for RpcBlockchain {
Expand All @@ -89,6 +93,10 @@ pub struct RpcConfig {
pub wallet_name: String,
/// Sync parameters
pub sync_params: Option<RpcSyncParams>,
/// Initial `scriptPubKey` import parameters. [`RpcImportParams`] will be mutated in
/// [`RpcBlockchain`] for every sync to reflect what was imported. The updated params can be
/// obtained via [`RpcBlockchain::import_parameters`].
pub import_params: Option<RpcImportParams>,
}

/// Sync parameters for Bitcoin Core RPC.
Expand Down Expand Up @@ -120,6 +128,18 @@ impl Default for RpcSyncParams {
}
}

/// `scriptPubKey` import parameters for Bitcoin Core RPC.
///
/// This defines which derivation index(es) to start importing from so that BDK can avoid
/// re-importing `scriptPubKey`s into the Core wallet.
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
pub struct RpcImportParams {
/// External derivation index to start import from.
pub external_start_index: usize,
/// Internal derivation index to start import from.
pub internal_start_index: usize,
}

/// This struct is equivalent to [bitcoincore_rpc::Auth] but it implements [serde::Serialize]
/// To be removed once upstream equivalent is implementing Serialize (json serialization format
/// should be the same), see [rust-bitcoincore-rpc/pull/181](https://github.com/rust-bitcoin/rust-bitcoincore-rpc/pull/181)
Expand Down Expand Up @@ -153,6 +173,18 @@ impl From<Auth> for RpcAuth {
}
}

impl RpcBlockchain {
/// Returns the current import parameters.
pub fn get_import_parameters(&self) -> RpcImportParams {
*self.import_params.borrow()
}

/// Replaces the import parameters and returns the old value.
pub fn replace_import_parameters(&self, params: RpcImportParams) -> RpcImportParams {
self.import_params.replace(params)
}
}

impl Blockchain for RpcBlockchain {
fn get_capabilities(&self) -> HashSet<Capability> {
self.capabilities.clone()
Expand Down Expand Up @@ -198,7 +230,11 @@ impl WalletSync for RpcBlockchain {
D: BatchDatabase,
{
let batch = DbState::new(db, &self.sync_params, &*prog)?
.sync_with_core(&self.client, self.is_descriptors)?
.sync_with_core(
&self.client,
&mut *self.import_params.borrow_mut(),
self.is_descriptors,
)?
.as_db_batch()?;

db.commit_batch(batch)
Expand Down Expand Up @@ -274,6 +310,7 @@ impl ConfigurableBlockchain for RpcBlockchain {
capabilities,
is_descriptors,
sync_params: config.sync_params.clone().unwrap_or_default(),
import_params: RefCell::new(config.import_params.unwrap_or_default()),
})
}
}
Expand Down Expand Up @@ -314,7 +351,11 @@ struct DbState<'a, D> {

impl<'a, D: BatchDatabase> DbState<'a, D> {
/// Obtain [DbState] from [crate::database::Database].
fn new(db: &'a D, params: &'a RpcSyncParams, prog: &'a dyn Progress) -> Result<Self, Error> {
fn new(
db: &'a D,
sync_params: &'a RpcSyncParams,
prog: &'a dyn Progress,
) -> Result<Self, Error> {
let ext_spks = db.iter_script_pubkeys(Some(KeychainKind::External))?;
let int_spks = db.iter_script_pubkeys(Some(KeychainKind::Internal))?;

Expand All @@ -325,10 +366,10 @@ impl<'a, D: BatchDatabase> DbState<'a, D> {

// If at least one descriptor is derivable, we need to ensure scriptPubKeys are sufficiently
// cached.
if has_derivable && last_count < params.start_script_count {
if has_derivable && last_count < sync_params.start_script_count {
let inner_err = MissingCachedScripts {
last_count,
missing_count: params.start_script_count - last_count,
missing_count: sync_params.start_script_count - last_count,
};
debug!("requesting more spks with: {:?}", inner_err);
return Err(Error::MissingCachedScripts(inner_err));
Expand Down Expand Up @@ -359,7 +400,7 @@ impl<'a, D: BatchDatabase> DbState<'a, D> {

Ok(Self {
db,
params,
params: sync_params,
prog,
ext_spks,
int_spks,
Expand All @@ -374,7 +415,12 @@ impl<'a, D: BatchDatabase> DbState<'a, D> {

/// Sync states of [BatchDatabase] and Core wallet.
/// First we import all `scriptPubKey`s from database into core wallet
fn sync_with_core(&mut self, client: &Client, use_desc: bool) -> Result<&mut Self, Error> {
fn sync_with_core(
&mut self,
client: &Client,
import_params: &mut RpcImportParams,
use_desc: bool,
) -> Result<&mut Self, Error> {
// this tells Core wallet where to sync from for imported scripts
let start_epoch = if self.params.force_start_time {
self.params.start_time
Expand All @@ -384,23 +430,55 @@ impl<'a, D: BatchDatabase> DbState<'a, D> {
.map_or(self.params.start_time, |st| st.block_time.timestamp)
};

// sync scriptPubKeys from Database to Core wallet
let scripts_iter = self.ext_spks.iter().chain(&self.int_spks);
if use_desc {
import_descriptors(client, start_epoch, scripts_iter)?;
} else {
import_multi(client, start_epoch, scripts_iter)?;
// sync scriptPubKeys from Database into Core wallet, starting from derivation indexes
// defined in `import_params`
let (scripts_count, scripts_iter) = {
let ext_spks = self
.ext_spks
.iter()
.skip(import_params.external_start_index);
let int_spks = self
.int_spks
.iter()
.skip(import_params.internal_start_index);

let scripts_count = ext_spks.len() + int_spks.len();
let scripts_iter = ext_spks.chain(int_spks);
println!("scripts count: {}", scripts_count);

(scripts_count, scripts_iter)
};

if scripts_count > 0 {
if use_desc {
import_descriptors(client, start_epoch, scripts_iter)?;
} else {
import_multi(client, start_epoch, scripts_iter)?;
}
}

// wait for Core wallet to rescan (TODO: maybe make this async)
await_wallet_scan(client, self.params.poll_rate_sec, self.prog)?;

// update import_params
import_params.external_start_index = self.ext_spks.len();
import_params.internal_start_index = self.int_spks.len();

// obtain iterator of pagenated `listtransactions` RPC calls
const LIST_TX_PAGE_SIZE: usize = 100; // item count per page
let tx_iter = list_transactions(client, LIST_TX_PAGE_SIZE)?.filter(|item| {
// filter out conflicting transactions - only accept transactions that are already
// confirmed, or exists in mempool
item.info.confirmations > 0 || client.get_mempool_entry(&item.info.txid).is_ok()
let confirmed = item.info.confirmations > 0;
let in_mempool = client.get_mempool_entry(&item.info.txid).is_ok();

let keep = confirmed || in_mempool;
if !keep {
warn!(
"transaction {} is skipped: confirmed={}, in_mempool={}",
item.info.txid, confirmed, in_mempool
);
}
keep
});

// iterate through chronological results of `listtransactions`
Expand Down Expand Up @@ -865,6 +943,8 @@ impl BlockchainFactory for RpcBlockchainFactory {
checksum
),
sync_params: self.sync_params.clone(),
// TODO @evanlinjin: How can be set this individually for each build?
import_params: Default::default(),
})
}
}
Expand Down Expand Up @@ -892,6 +972,7 @@ mod test {
network: Network::Regtest,
wallet_name: format!("client-wallet-test-{}", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos() ),
sync_params: None,
import_params: None,
};
RpcBlockchain::from_config(&config).unwrap()
}
Expand Down
6 changes: 6 additions & 0 deletions src/testutils/blockchain_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,12 @@ macro_rules! bdk_blockchain_tests {

blockchain.broadcast(&tx1).expect("broadcasting first");
blockchain.broadcast(&tx2).expect("broadcasting replacement");

// TODO @evanlinjin: Core's `listtransactions` RPC call does not return conflicting
// unconfirmed transactions (unless we re-import associated scriptPubKey/descriptor)
#[cfg(feature = "rpc")]
blockchain.replace_import_parameters(Default::default());

receiver_wallet.sync(&blockchain, SyncOptions::default()).expect("syncing receiver");
assert_eq!(receiver_wallet.get_balance().expect("balance").untrusted_pending, 49_000, "should have received coins once and only once");
}
Expand Down

0 comments on commit 4dce1e6

Please sign in to comment.