Skip to content

Commit 80daed4

Browse files
authored
Merge pull request #86 from buffrr/wallet-cbf
Support syncing with compact block filters BIP-157/BIP-158
2 parents 9b58d36 + 20b374e commit 80daed4

File tree

12 files changed

+618
-83
lines changed

12 files changed

+618
-83
lines changed

client/src/bin/spaced.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use spaces_client::{
88
rpc::{AsyncChainState, RpcServerImpl, WalletLoadRequest, WalletManager},
99
source::{BitcoinBlockSource, BitcoinRpc},
1010
store,
11-
sync::Spaced,
11+
spaces::Spaced,
1212
wallets::RpcWallet,
1313
};
1414
use store::LiveSnapshot;
@@ -53,14 +53,15 @@ impl Composer {
5353
}
5454
}
5555

56-
async fn setup_rpc_wallet(&mut self, spaced: &Spaced, rx: mpsc::Receiver<WalletLoadRequest>) {
56+
async fn setup_rpc_wallet(&mut self, spaced: &Spaced, rx: mpsc::Receiver<WalletLoadRequest>, cbf: bool) {
5757
let wallet_service = RpcWallet::service(
5858
spaced.network,
5959
spaced.rpc.clone(),
6060
spaced.chain.state.clone(),
6161
rx,
6262
self.shutdown.clone(),
6363
spaced.num_workers,
64+
cbf
6465
);
6566

6667
self.services.spawn(async move {
@@ -107,7 +108,7 @@ impl Composer {
107108
.map_err(|e| anyhow!("RPC Server error: {}", e))
108109
});
109110

110-
self.setup_rpc_wallet(spaced, wallet_loader_rx).await;
111+
self.setup_rpc_wallet(spaced, wallet_loader_rx, spaced.cbf).await;
111112
}
112113

113114
async fn setup_sync_service(&mut self, mut spaced: Spaced) {

client/src/cbf.rs

+291
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
use std::collections::{BTreeMap, HashSet, VecDeque};
2+
use std::time::Duration;
3+
use anyhow::anyhow;
4+
use log::info;
5+
use tokio::time::Instant;
6+
use spaces_protocol::bitcoin::BlockHash;
7+
use spaces_protocol::constants::ChainAnchor;
8+
use spaces_wallet::bdk_wallet::chain::{local_chain, BlockId, ConfirmationBlockTime, IndexedTxGraph, TxUpdate};
9+
use spaces_wallet::bdk_wallet::chain::keychain_txout::KeychainTxOutIndex;
10+
use spaces_wallet::bdk_wallet::{KeychainKind, Update};
11+
use spaces_wallet::bitcoin::bip158::BlockFilter;
12+
use spaces_wallet::bitcoin::ScriptBuf;
13+
use spaces_wallet::SpacesWallet;
14+
use crate::client::{BlockSource, BlockchainInfo};
15+
use crate::source::BitcoinBlockSource;
16+
use crate::wallets::WalletProgressUpdate;
17+
18+
pub struct CompactFilterSync {
19+
graph: IndexedTxGraph<ConfirmationBlockTime, KeychainTxOutIndex<KeychainKind>>,
20+
chain: local_chain::LocalChain,
21+
chain_changeset: BTreeMap<u32, Option<BlockHash>>,
22+
scripts: HashSet<ScriptBuf>,
23+
last_peek_index: u32,
24+
initial_tip: ChainAnchor,
25+
queued_blocks: BTreeMap<u32, BlockHash>,
26+
queued_filters: VecDeque<u32>,
27+
filters_tip: u32,
28+
block_matches: u32,
29+
total_filters: u32,
30+
wait: Option<Instant>,
31+
state: SyncState,
32+
}
33+
34+
enum SyncState {
35+
SyncChecks,
36+
LoadFilterRange(BlockchainInfo),
37+
ProcessFilters,
38+
QueueBlocks,
39+
WaitForBlocks,
40+
ProcessBlocks,
41+
ApplyUpdate,
42+
Synced,
43+
}
44+
45+
impl CompactFilterSync {
46+
pub fn new(wallet: &SpacesWallet) -> Self {
47+
let initial_tip = {
48+
let tip = wallet.local_chain().tip();
49+
ChainAnchor { height: tip.height(), hash: tip.hash() }
50+
};
51+
52+
let mut cbf = Self {
53+
graph: IndexedTxGraph::new(wallet.spk_index().clone()),
54+
chain: wallet.local_chain().clone(),
55+
chain_changeset: BTreeMap::new(),
56+
scripts: HashSet::new(),
57+
last_peek_index: 0,
58+
initial_tip,
59+
queued_blocks: BTreeMap::new(),
60+
queued_filters: Default::default(),
61+
filters_tip: 0,
62+
block_matches: 0,
63+
total_filters: 0,
64+
wait: None,
65+
state: SyncState::SyncChecks,
66+
};
67+
cbf.load_scripts(wallet);
68+
cbf
69+
}
70+
71+
fn load_scripts(&mut self, wallet: &SpacesWallet) {
72+
let lookahead = wallet.spk_index().lookahead();
73+
let mut max_idx = 0;
74+
for keychain in [KeychainKind::External, KeychainKind::Internal] {
75+
let last_revealed = wallet
76+
.spk_index()
77+
.last_revealed_index(keychain)
78+
.unwrap_or(0);
79+
let chain_limit = last_revealed + lookahead;
80+
for idx in 0..=chain_limit {
81+
let script = wallet.peek_address(keychain, idx).script_pubkey();
82+
self.scripts.insert(script);
83+
}
84+
max_idx = max_idx.max(chain_limit);
85+
}
86+
self.last_peek_index = max_idx;
87+
}
88+
89+
/// Expand scripts by an additional fixed window beyond the last peek
90+
fn load_more_scripts(&mut self, wallet: &SpacesWallet) {
91+
let end = self.last_peek_index + 10;
92+
for keychain in [KeychainKind::External, KeychainKind::Internal] {
93+
for idx in self.last_peek_index..=end {
94+
let script = wallet.peek_address(keychain, idx).script_pubkey();
95+
self.scripts.insert(script);
96+
}
97+
}
98+
self.last_peek_index = end;
99+
}
100+
101+
pub fn synced(&self) -> bool {
102+
matches!(self.state, SyncState::Synced)
103+
}
104+
105+
pub fn sync_next(
106+
&mut self,
107+
wallet: &mut SpacesWallet,
108+
source: &BitcoinBlockSource,
109+
progress: &mut WalletProgressUpdate,
110+
) -> anyhow::Result<()> {
111+
if self.wait.is_some_and(|w| w.elapsed() < Duration::from_secs(10)) {
112+
return Ok(());
113+
}
114+
self.wait = None;
115+
116+
match &self.state {
117+
SyncState::SyncChecks => {
118+
let info = source.get_blockchain_info()?;
119+
if info.headers != info.blocks {
120+
info!("Source still syncing, retrying...");
121+
*progress = WalletProgressUpdate::Syncing;
122+
self.wait = Some(Instant::now());
123+
return Ok(());
124+
}
125+
if info.filters != info.filter_headers {
126+
info!("Filters syncing, retrying...");
127+
*progress = WalletProgressUpdate::CbfFilterSync {
128+
total: info.filter_headers.unwrap_or(0),
129+
completed: info.filters.unwrap_or(0),
130+
};
131+
self.wait = Some(Instant::now());
132+
return Ok(());
133+
}
134+
// if wallet already past filter headers, we're done
135+
if let Some(filter_headers) = info.filter_headers {
136+
if self.initial_tip.height >= filter_headers {
137+
info!("wallet({}): tip {} >= filters {}, cbf done", wallet.name(), self.initial_tip.height, filter_headers);
138+
self.state = SyncState::Synced;
139+
return Ok(());
140+
}
141+
}
142+
self.state = SyncState::LoadFilterRange(info);
143+
}
144+
SyncState::LoadFilterRange(info) => {
145+
let checkpoint = info
146+
.checkpoint
147+
.ok_or_else(|| anyhow!("filter sync: checkpoint missing"))?;
148+
if self.initial_tip.height < checkpoint.height {
149+
return Err(anyhow!(
150+
"Wallet birthday {} < checkpoint {}", self.initial_tip.height, checkpoint.height
151+
));
152+
}
153+
154+
let start = self.initial_tip.height;
155+
let end = info
156+
.prune_height
157+
.ok_or(anyhow!("Prune height missing"))?;
158+
let available_filters = info.filters.ok_or(anyhow!("Filters missing"))?;
159+
if end > available_filters {
160+
return Err(anyhow!("Prune height {} > {} available filters", end, available_filters));
161+
}
162+
163+
if start >= end {
164+
return Ok(());
165+
}
166+
for height in start..=end {
167+
self.queued_filters.push_back(height);
168+
}
169+
self.filters_tip = end;
170+
self.total_filters = self.queued_filters.len() as u32;
171+
self.state = SyncState::ProcessFilters;
172+
}
173+
SyncState::ProcessFilters => {
174+
let height = match self.queued_filters.pop_front() {
175+
None => {
176+
self.state = SyncState::QueueBlocks;
177+
return Ok(());
178+
}
179+
Some(f) => f,
180+
};
181+
let idx_filter = source.get_block_filter_by_height(height)?;
182+
let idx_filter = idx_filter
183+
.ok_or_else(|| anyhow!("filter sync: block filter missing {}", height))?;
184+
let filter = BlockFilter::new(&idx_filter.content);
185+
if filter.match_any(&idx_filter.hash, self.scripts.iter().map(|s| s.as_bytes()))? {
186+
self.queued_blocks.insert(height, idx_filter.hash);
187+
self.load_more_scripts(wallet);
188+
self.block_matches += 1;
189+
info!("wallet({}) processed block filter {} - match found", wallet.name(), height);
190+
} else {
191+
info!("wallet({}) processed block filter {} - no match", wallet.name(), height);
192+
}
193+
*progress = WalletProgressUpdate::CbfProcessFilters {
194+
total: self.total_filters,
195+
completed: self.total_filters - self.queued_filters.len() as u32,
196+
};
197+
}
198+
SyncState::QueueBlocks => {
199+
if !self.queued_blocks.is_empty() {
200+
let heights: Vec<u32> = self.queued_blocks.keys().copied().collect();
201+
info!("wallet({}): queueing {} blocks", wallet.name(), heights.len());
202+
source.queue_blocks(heights)?;
203+
}
204+
self.state = SyncState::WaitForBlocks;
205+
}
206+
SyncState::WaitForBlocks => {
207+
let info = source.get_blockchain_info()?;
208+
let status = info
209+
.block_queue
210+
.as_ref()
211+
.ok_or_else(|| anyhow!("filter sync: block queue missing"))?;
212+
213+
if status.pending > 0 {
214+
info!("wallet({}): waiting for {} pending blocks", wallet.name(), status.pending);
215+
216+
// The client has a global state for pending blocks in the queue
217+
// so we cap it just in case other things are queuing blocks
218+
// at the same time
219+
let pending = std::cmp::min(status.pending, self.block_matches);
220+
*progress = WalletProgressUpdate::CbfDownloadMatchingBlocks {
221+
total: self.block_matches,
222+
completed: self.block_matches - pending,
223+
};
224+
225+
self.wait = Some(Instant::now());
226+
return Ok(());
227+
}
228+
229+
if status.completed < self.queued_blocks.len() as u32 {
230+
return Err(anyhow!(
231+
"incomplete downloads: {} of {}", status.completed, self.queued_blocks.len()
232+
));
233+
}
234+
self.state = SyncState::ProcessBlocks;
235+
}
236+
SyncState::ProcessBlocks => {
237+
let (height, hash) = match self.queued_blocks.pop_first() {
238+
None => {
239+
*progress = WalletProgressUpdate::CbfApplyUpdate;
240+
self.state = SyncState::ApplyUpdate;
241+
return Ok(());
242+
}
243+
Some(f) => f,
244+
};
245+
info!("wallet({}): processing block {} {}", wallet.name(), height, hash);
246+
let block = source.get_block(&hash)?
247+
.ok_or(anyhow!("block {} {} not found", height, hash))?;
248+
self.chain_changeset.insert(height, Some(hash));
249+
let _ = self.graph.apply_block_relevant(&block, height);
250+
*progress = WalletProgressUpdate::CbfProcessMatchingBlocks {
251+
total: self.block_matches,
252+
completed: self.block_matches - self.queued_blocks.len() as u32 ,
253+
};
254+
}
255+
SyncState::ApplyUpdate => {
256+
info!("wallet({}): updating wallet tip to {}", wallet.name(), self.filters_tip);
257+
let filters_anchor = BlockId {
258+
height: self.filters_tip,
259+
hash: source.get_block_hash(self.filters_tip)?,
260+
};
261+
262+
let update = self.get_scan_response();
263+
wallet.apply_update(update)?;
264+
wallet.insert_checkpoint(filters_anchor)?;
265+
info!("wallet({}): compact filter sync portion complete at {}", wallet.name(), self.filters_tip);
266+
self.state = SyncState::Synced;
267+
// Only CBF portion is done
268+
*progress = WalletProgressUpdate::Syncing
269+
}
270+
SyncState::Synced => {},
271+
}
272+
Ok(())
273+
}
274+
275+
// based on https://github.com/bitcoindevkit/bdk-kyoto/blob/master/src/lib.rs#L137
276+
fn get_scan_response(&mut self) -> Update {
277+
let changes = std::mem::take(&mut self.chain_changeset);
278+
self.chain
279+
.apply_changeset(&local_chain::ChangeSet::from(changes))
280+
.expect("initialized from genesis");
281+
let tx_update = TxUpdate::from(self.graph.graph().clone());
282+
let graph = std::mem::take(&mut self.graph);
283+
let last_indices = graph.index.last_used_indices();
284+
self.graph = IndexedTxGraph::new(graph.index);
285+
Update {
286+
tx_update,
287+
last_active_indices: last_indices,
288+
chain: Some(self.chain.tip()),
289+
}
290+
}
291+
}

0 commit comments

Comments
 (0)