Skip to content

Commit

Permalink
feat(example): use changeset staging with rpc polling example
Browse files Browse the repository at this point in the history
  • Loading branch information
evanlinjin authored and notmandatory committed Jun 13, 2024
1 parent 8063848 commit 6323d55
Showing 1 changed file with 15 additions and 9 deletions.
24 changes: 15 additions & 9 deletions example-crates/example_bitcoind_rpc_polling/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ use bdk_bitcoind_rpc::{
bitcoincore_rpc::{Auth, Client, RpcApi},
Emitter,
};
use bdk_chain::persist::PersistBackend;
use bdk_chain::persist::{PersistBackend, StageExt};
use bdk_chain::{
bitcoin::{constants::genesis_block, Block, Transaction},
indexed_tx_graph, keychain,
local_chain::{self, LocalChain},
ConfirmationTimeHeightAnchor, IndexedTxGraph,
Append, ConfirmationTimeHeightAnchor, IndexedTxGraph,
};
use example_cli::{
anyhow,
Expand Down Expand Up @@ -176,6 +176,7 @@ fn main() -> anyhow::Result<()> {
let chain_tip = chain.lock().unwrap().tip();
let rpc_client = rpc_args.new_client()?;
let mut emitter = Emitter::new(&rpc_client, chain_tip, fallback_height);
let mut db_stage = ChangeSet::default();

let mut last_db_commit = Instant::now();
let mut last_print = Instant::now();
Expand All @@ -185,17 +186,18 @@ fn main() -> anyhow::Result<()> {

let mut chain = chain.lock().unwrap();
let mut graph = graph.lock().unwrap();
let mut db = db.lock().unwrap();

let chain_changeset = chain
.apply_update(emission.checkpoint)
.expect("must always apply as we receive blocks in order from emitter");
let graph_changeset = graph.apply_block_relevant(&emission.block, height);
db.write_changes(&(chain_changeset, graph_changeset))?;
db_stage.append((chain_changeset, graph_changeset));

// commit staged db changes in intervals
if last_db_commit.elapsed() >= DB_COMMIT_DELAY {
let db = &mut *db.lock().unwrap();
last_db_commit = Instant::now();
db_stage.commit_to(db)?;
println!(
"[{:>10}s] committed to db (took {}s)",
start.elapsed().as_secs_f32(),
Expand Down Expand Up @@ -230,8 +232,11 @@ fn main() -> anyhow::Result<()> {
mempool_txs.iter().map(|(tx, time)| (tx, *time)),
);
{
let mut db = db.lock().unwrap();
db.write_changes(&(local_chain::ChangeSet::default(), graph_changeset))?;
let db = &mut *db.lock().unwrap();
db_stage.append_and_commit_to(
(local_chain::ChangeSet::default(), graph_changeset),
db,
)?;
}
}
RpcCommands::Live { rpc_args } => {
Expand Down Expand Up @@ -287,9 +292,9 @@ fn main() -> anyhow::Result<()> {
let mut tip_height = 0_u32;
let mut last_db_commit = Instant::now();
let mut last_print = Option::<Instant>::None;
let mut db_stage = ChangeSet::default();

for emission in rx {
let mut db = db.lock().unwrap();
let mut graph = graph.lock().unwrap();
let mut chain = chain.lock().unwrap();

Expand All @@ -314,11 +319,12 @@ fn main() -> anyhow::Result<()> {
continue;
}
};

db.write_changes(&changeset)?;
db_stage.append(changeset);

if last_db_commit.elapsed() >= DB_COMMIT_DELAY {
let db = &mut *db.lock().unwrap();
last_db_commit = Instant::now();
db_stage.commit_to(db)?;
println!(
"[{:>10}s] committed to db (took {}s)",
start.elapsed().as_secs_f32(),
Expand Down

0 comments on commit 6323d55

Please sign in to comment.