Skip to content

Commit

Permalink
Introduce pagenated import logic to RpcBlockchain
Browse files Browse the repository at this point in the history
* Add `RpcSyncParams::page_size` that restricts req/resp array count
  for various RPC calls.
* Add `pagenated_import` function.
  • Loading branch information
evanlinjin committed Sep 1, 2022
1 parent 5319816 commit ff398ec
Showing 1 changed file with 49 additions and 20 deletions.
69 changes: 49 additions & 20 deletions src/blockchain/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ pub struct RpcSyncParams {
pub force_start_time: bool,
/// RPC poll rate (in seconds) to get state updates.
pub poll_rate_sec: u64,
/// Page size for RPC calls (`importdescriptors`, `importmulti` and `listtransactions`).
pub page_size: usize,
}

impl Default for RpcSyncParams {
Expand All @@ -123,7 +125,8 @@ impl Default for RpcSyncParams {
start_script_count: 100,
start_time: 0,
force_start_time: false,
poll_rate_sec: 3,
poll_rate_sec: 2,
page_size: 200,
}
}
}
Expand Down Expand Up @@ -432,7 +435,7 @@ impl<'a, D: BatchDatabase> DbState<'a, D> {

// sync scriptPubKeys from Database into Core wallet, starting from derivation indexes
// defined in `import_params`
let (scripts_count, scripts_iter) = {
let scripts_iter = {
let ext_spks = self
.ext_spks
.iter()
Expand All @@ -441,31 +444,25 @@ impl<'a, D: BatchDatabase> DbState<'a, D> {
.int_spks
.iter()
.skip(import_params.internal_start_index);

let scripts_count = ext_spks.len() + int_spks.len();
let scripts_iter = ext_spks.chain(int_spks);
println!("scripts count: {}", scripts_count);

(scripts_count, scripts_iter)
ext_spks.chain(int_spks)
};

if scripts_count > 0 {
if use_desc {
import_descriptors(client, start_epoch, scripts_iter)?;
} else {
import_multi(client, start_epoch, scripts_iter)?;
}
}

await_wallet_scan(client, self.params.poll_rate_sec, self.prog)?;
pagenated_import(
client,
use_desc,
start_epoch,
self.params.poll_rate_sec,
self.params.page_size,
scripts_iter,
self.prog,
)?;
// await_wallet_scan(client, self.params.poll_rate_sec, self.prog)?;

// update import_params
import_params.external_start_index = self.ext_spks.len();
import_params.internal_start_index = self.int_spks.len();

// obtain iterator of pagenated `listtransactions` RPC calls
const LIST_TX_PAGE_SIZE: usize = 100; // item count per page
let tx_iter = list_transactions(client, LIST_TX_PAGE_SIZE)?.filter(|item| {
let tx_iter = list_transactions(client, self.params.page_size)?.filter(|item| {
// filter out conflicting transactions - only accept transactions that are already
// confirmed, or exists in mempool
let confirmed = item.info.confirmations > 0;
Expand Down Expand Up @@ -789,6 +786,38 @@ where
Ok(())
}

fn pagenated_import<'a, S>(
client: &Client,
use_desc: bool,
start_epoch: u64,
poll_rate_sec: u64,
page_size: usize,
scripts_iter: S,
progress: &dyn Progress,
) -> Result<(), Error>
where
S: Iterator<Item = &'a Script> + Clone,
{
(0_usize..)
.map(|page_index| {
scripts_iter
.clone()
.skip(page_index * page_size)
.take(page_size)
.cloned()
.collect::<Vec<_>>()
})
.take_while(|scripts| !scripts.is_empty())
.try_for_each(|scripts| {
if use_desc {
import_descriptors(client, start_epoch, scripts.iter())?;
} else {
import_multi(client, start_epoch, scripts.iter())?;
}
await_wallet_scan(client, poll_rate_sec, progress)
})
}

/// Calls the `listtransactions` RPC method in `page_size`s and returns iterator of the tx results
/// in chronological order.
///
Expand Down

0 comments on commit ff398ec

Please sign in to comment.