Skip to content

Commit

Permalink
refactor!(bdk): Update wallet start_sync, start_scan to return new Sy…
Browse files Browse the repository at this point in the history
…ncRequest, ScanRequest structs

Update esplora async_ext sync and scan to use new SyncRequest and ScanRequest
  • Loading branch information
notmandatory committed Nov 7, 2023
1 parent c97ddef commit 9e01dee
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 58 deletions.
94 changes: 55 additions & 39 deletions crates/bdk/src/wallet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use core::fmt;
use core::ops::Deref;
use miniscript::psbt::{PsbtExt, PsbtInputExt, PsbtInputSatisfier};

use bdk_chain::request::{ScanRequest, SyncRequest};
use bdk_chain::tx_graph::CalculateFeeError;
#[allow(unused_imports)]
use log::{debug, error, info, trace};
Expand Down Expand Up @@ -113,9 +114,20 @@ pub struct Update {
pub chain: Option<local_chain::Update>,
}

impl From<(BTreeMap<KeychainKind, u32>, TxGraph<ConfirmationTimeAnchor>, local_chain::Update)> for Update
impl
From<(
BTreeMap<KeychainKind, u32>,
TxGraph<ConfirmationTimeAnchor>,
local_chain::Update,
)> for Update
{
fn from((last_active_indices, graph, chain): (BTreeMap<KeychainKind, u32>, TxGraph<ConfirmationTimeAnchor>, local_chain::Update)) -> Self {
fn from(
(last_active_indices, graph, chain): (
BTreeMap<KeychainKind, u32>,
TxGraph<ConfirmationTimeAnchor>,
local_chain::Update,
),
) -> Self {
Self {
last_active_indices,
graph,
Expand All @@ -124,8 +136,7 @@ impl From<(BTreeMap<KeychainKind, u32>, TxGraph<ConfirmationTimeAnchor>, local_c
}
}

impl From<(TxGraph<ConfirmationTimeAnchor>, local_chain::Update)> for Update
{
impl From<(TxGraph<ConfirmationTimeAnchor>, local_chain::Update)> for Update {
fn from((graph, chain): (TxGraph<ConfirmationTimeAnchor>, local_chain::Update)) -> Self {
Self {
graph,
Expand Down Expand Up @@ -2013,54 +2024,59 @@ impl<D> Wallet<D> {
///
/// Collect the wallet keychain script pub keys, local chain, and previous chain tip data needed
/// to start a blockchain scan.
pub fn start_scan(&self) -> (BTreeMap<KeychainKind, impl Iterator<Item = (u32, ScriptBuf)> + Clone>,
&LocalChain, Option<CheckPoint>) {

let keychain_spks = self.spks_of_all_keychains();
let local_chain = self.local_chain();
let prev_tip = self.latest_checkpoint();
pub fn start_scan(
&self,
) -> ScanRequest<KeychainKind, impl Iterator<Item = (u32, ScriptBuf)> + Clone> {
let spks_by_keychain = self.spks_of_all_keychains();
let checkpoint = self.latest_checkpoint();

(keychain_spks, local_chain, prev_tip)
ScanRequest {
spks_by_keychain,
checkpoint,
}
}

/// Get data needed to start a wallet sync
///
/// Collect the wallet keychain script pub keys, local chain, previous chain tip, UTXOs, and
/// unconfirmed transaction id data needed to start a blockchain sync.
pub fn start_sync(&self, unused_spks_only: bool) -> (Vec<ScriptBuf>, &LocalChain, Option<CheckPoint>, Vec<OutPoint>, Vec<Txid>) {

let local_chain = self.local_chain();
let prev_tip = self.latest_checkpoint();
pub fn start_sync(&self, unused_spks_only: bool) -> SyncRequest {
let checkpoint = self.latest_checkpoint();

// Sync only unused SPKs
let spks = if unused_spks_only {
self.spk_index()
.unused_spks(..)
.map(|((_keychain, _index), script)| ScriptBuf::from(script))
.collect::<Vec<ScriptBuf>>()
}
// Sync all SPKs
else {
self.spk_index()
.all_spks()
.into_iter()
.map(|((_keychain, _index), script)| (*script).clone())
.collect::<Vec<ScriptBuf>>()
};
self.spk_index()
.unused_spks(..)
.map(|((_keychain, _index), script)| ScriptBuf::from(script))
.collect::<Vec<ScriptBuf>>()
}
// Sync all SPKs
else {
self.spk_index()
.all_spks()
.into_iter()
.map(|((_keychain, _index), script)| (*script).clone())
.collect::<Vec<ScriptBuf>>()
};

// Sync UTXOs
// We want to search for whether our UTXOs are spent, and spent by which transaction.
let outpoints: Vec<OutPoint> = self.list_unspent().map(|utxo| utxo.outpoint).collect();
// Sync UTXOs
// We want to search for whether our UTXOs are spent, and spent by which transaction.
let outpoints: Vec<OutPoint> = self.list_unspent().map(|utxo| utxo.outpoint).collect();

// Sync unconfirmed TX
// We want to search for whether our unconfirmed transactions are now confirmed.
let txids: Vec<Txid> = self
.transactions()
.filter(|canonical_tx| !canonical_tx.chain_position.is_confirmed())
.map(|canonical_tx| canonical_tx.tx_node.txid)
.collect();
// Sync unconfirmed TX
// We want to search for whether our unconfirmed transactions are now confirmed.
let txids: Vec<Txid> = self
.transactions()
.filter(|canonical_tx| !canonical_tx.chain_position.is_confirmed())
.map(|canonical_tx| canonical_tx.tx_node.txid)
.collect();

(spks, local_chain, prev_tip, outpoints, txids)
SyncRequest {
spks,
txids,
outpoints,
checkpoint,
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions crates/chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ mod spk_iter;
#[cfg(feature = "miniscript")]
pub use spk_iter::*;

/// Structures for requesting data needed to sync or scan the blockchain for chain related data.
pub mod request;

#[allow(unused_imports)]
#[macro_use]
extern crate alloc;
Expand Down
24 changes: 24 additions & 0 deletions crates/chain/src/request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use crate::local_chain::CheckPoint;
use alloc::vec::Vec;
use bitcoin::{OutPoint, ScriptBuf, Txid};
use std::collections::BTreeMap;

/// A list of blockchain entities for which we want to receive any related transaction data.
pub struct SyncRequest {
/// transactions that spend from or two these script pubkeys
pub spks: Vec<ScriptBuf>,
/// Transactions with these txids
pub txids: Vec<Txid>,
/// Transactions with these outpoints or spend from these outpoints
pub outpoints: Vec<OutPoint>,
/// The local chain checkpoint. The sync process will return a new chain that extends this one.
pub checkpoint: Option<CheckPoint>,
}

/// Script pubkeys indexed by their keychain.
pub struct ScanRequest<K, I> {
/// Iterators of script pubkeys indexed by the keychain index
pub spks_by_keychain: BTreeMap<K, I>,
/// The local chain checkpoint. The scan process will return a new chain that extends this one.
pub checkpoint: Option<CheckPoint>,
}
46 changes: 31 additions & 15 deletions crates/esplora/src/async_ext.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use async_trait::async_trait;
use bdk_chain::collections::btree_map;
use bdk_chain::local_chain::LocalChain;
use bdk_chain::request::{ScanRequest, SyncRequest};
use bdk_chain::{
bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid},
collections::{BTreeMap, BTreeSet},
Expand All @@ -8,7 +10,6 @@ use bdk_chain::{
};
use esplora_client::{Error, TxStatus};
use futures::{stream::FuturesOrdered, TryStreamExt};
use bdk_chain::local_chain::LocalChain;

use crate::{anchor_from_status, ASSUME_FINAL_DEPTH};

Expand All @@ -33,29 +34,37 @@ pub trait EsploraAsyncExt {
/// which is used to update a wallets [`KeychainTxOutIndex`].
/// * graph_update, contains an update to a wallet's internal [`TxGraph`].
/// * chain_update, contains an update to a wallet's internal [`LocalChain`].
async fn scan<K: Clone + Ord + Send>(
async fn scan<K: Clone + Ord + Send, I: Iterator<Item = (u32, ScriptBuf)> + Send>(
&self,
start_scan: (BTreeMap<K, impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send,>,
&LocalChain,
Option<CheckPoint>),
scan_request: ScanRequest<K, I>,
stop_gap: usize,
parallel_requests: usize,
) -> Result<(BTreeMap<K, u32>, TxGraph<ConfirmationTimeAnchor>, local_chain::Update), Error> {
let (keychain_spks, local_chain, local_tip) = start_scan;
) -> Result<
(
BTreeMap<K, u32>,
TxGraph<ConfirmationTimeAnchor>,
local_chain::Update,
),
Error,
> {
let (graph_update, last_active_indices) = self
.scan_txs_with_keychains(
keychain_spks,
scan_request.spks_by_keychain,
core::iter::empty(),
core::iter::empty(),
stop_gap,
parallel_requests,
)
.await?;

let missing_heights = graph_update.missing_heights(local_chain);
let local_chain = scan_request
.checkpoint
.map(|cp| LocalChain::from_tip(cp))
.unwrap_or_default();
let missing_heights = graph_update.missing_heights(&local_chain);

let chain_update = self
.update_local_chain(local_tip, missing_heights)
.update_local_chain(local_chain.tip(), missing_heights)
.await?;

Ok((last_active_indices, graph_update, chain_update))
Expand All @@ -69,16 +78,23 @@ pub trait EsploraAsyncExt {
/// * chain_update, contains an update to a wallet's internal [`LocalChain`].
async fn sync(
&self,
start_sync: (Vec<ScriptBuf>, &LocalChain, Option<CheckPoint>, Vec<OutPoint>, Vec<Txid>),
sync_request: SyncRequest,
parallel_requests: usize,
) -> Result<(TxGraph<ConfirmationTimeAnchor>, local_chain::Update), Error> {
let (spks, local_chain, local_tip, outpoints, txids) = start_sync;
let graph_update = self
.scan_txs(spks.into_iter(), txids.into_iter(), outpoints.into_iter(), parallel_requests)
.scan_txs(
sync_request.spks.into_iter(),
sync_request.txids.into_iter(),
sync_request.outpoints.into_iter(),
parallel_requests,
)
.await?;

let missing_heights = graph_update.missing_heights(local_chain);
let chain_update = self.update_local_chain(local_tip, missing_heights).await?;
let local_chain = LocalChain::from_tip(sync_request.checkpoint.unwrap());
let missing_heights = graph_update.missing_heights(&local_chain);
let chain_update = self
.update_local_chain(local_chain.tip(), missing_heights)
.await?;

Ok((graph_update, chain_update))
}
Expand Down
10 changes: 6 additions & 4 deletions example-crates/wallet_esplora_async/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// wallet restoration. It is a special case. Applications should use "sync" style updates
// after an initial scan.
if prompt("Scan wallet") {
let start_scan = wallet.start_scan();
let wallet_update = client.scan(start_scan, STOP_GAP, PARALLEL_REQUESTS).await?;
let scan_request = wallet.start_scan();
let wallet_update = client
.scan(scan_request, STOP_GAP, PARALLEL_REQUESTS)
.await?;
wallet.apply_update(wallet_update.into())?;
wallet.commit()?;
println!("Scan completed.");
Expand All @@ -52,8 +54,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// status or fetch missing transactions.
else {
let unused_spks_only = prompt("Sync only unused SPKs");
let start_sync = wallet.start_sync(unused_spks_only);
let wallet_update = client.sync(start_sync, PARALLEL_REQUESTS).await?;
let sync_request = wallet.start_sync(unused_spks_only);
let wallet_update = client.sync(sync_request, PARALLEL_REQUESTS).await?;
wallet.apply_update(wallet_update.into())?;
wallet.commit()?;
println!("Sync completed.");
Expand Down

0 comments on commit 9e01dee

Please sign in to comment.