Skip to content
This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

Commit

Permalink
Merge #3197
Browse files Browse the repository at this point in the history
3197: Self-implemented batching for script_get_history r=da-kami a=da-kami

This is an attempt to fix the memory growth caused by `rust-electrum-client`'s `batch_script_get_history` implementation.
We chunk up the scripts to be checked into batches and call `script_get_history` for the batches in parallel in separate worker threads.
The results are communicated back using a channel and accumulated before processing.

Includes tests that simulate the behavior. The tests rely on a mainnet Electrum instance and are ignored on CI.

Co-authored-by: Daniel Karzel <daniel@comit.network>
  • Loading branch information
bors[bot] and da-kami authored Oct 19, 2022
2 parents 6af7ff9 + c020c98 commit 0516c5a
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,4 @@ xtras = { path = "../xtras" }
[dev-dependencies]
serde_test = "1"
time = { version = "0.3.15", features = ["std"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
156 changes: 148 additions & 8 deletions crates/daemon/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use bdk::bitcoin::Txid;
use bdk::descriptor::Descriptor;
use bdk::electrum_client;
use bdk::electrum_client::ElectrumApi;
use bdk::electrum_client::GetHistoryRes;
use bdk::miniscript::DescriptorTrait;
use btsieve::ScriptStatus;
use btsieve::State;
Expand All @@ -24,6 +25,7 @@ use model::CET_TIMELOCK;
use serde_json::Value;
use sqlite_db;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tracing::Instrument;
use xtra_productivity::xtra_productivity;
Expand All @@ -34,6 +36,14 @@ const CLOSE_FINALITY_CONFIRMATIONS: u32 = 3;
const COMMIT_FINALITY_CONFIRMATIONS: u32 = 1;
const CET_FINALITY_CONFIRMATIONS: u32 = 3;
const REFUND_FINALITY_CONFIRMATIONS: u32 = 3;
const BATCH_SIZE: usize = 25;

/// Electrum client timeout in seconds
///
/// This timeout is used when establishing the connection and for all requests of the electrum
/// client. We explicitly set the timeout because otherwise the underlying TCP connection timeout is
/// used which is hard to be predicted.
const ELECTRUM_CLIENT_TIMEOUT_SECS: u8 = 120;

pub struct MonitorAfterContractSetup {
order_id: OrderId,
Expand Down Expand Up @@ -107,7 +117,7 @@ pub struct Sync;
// -> Might as well just send out all events independent of sending to the cfd actor.
pub struct Actor {
executor: command::Executor,
client: bdk::electrum_client::Client,
client: Arc<bdk::electrum_client::Client>,
state: State<Event>,
db: sqlite_db::Connection,
}
Expand Down Expand Up @@ -328,8 +338,13 @@ impl Actor {
electrum_rpc_url: String,
executor: command::Executor,
) -> Result<Self> {
let client = bdk::electrum_client::Client::new(&electrum_rpc_url)
.context("Failed to initialize Electrum RPC client")?;
let client = bdk::electrum_client::Client::from_config(
&electrum_rpc_url,
electrum_client::ConfigBuilder::new()
.timeout(Some(ELECTRUM_CLIENT_TIMEOUT_SECS))?
.build(),
)
.context("Failed to initialize Electrum RPC client")?;

// Initially fetch the latest block for storing the height.
// We do not act on this subscription after this call.
Expand All @@ -340,7 +355,7 @@ impl Actor {
.into();

Ok(Self {
client,
client: Arc::new(client),
executor,
state: State::new(latest_block),
db,
Expand Down Expand Up @@ -466,10 +481,13 @@ impl Actor {

tracing::trace!("Updating status of {num_transactions} transactions",);

let histories = self
.client
.batch_script_get_history(self.state.monitoring_scripts())
.context("Failed to get script histories")?;
let scripts = self
.state
.monitoring_scripts()
.cloned()
.collect::<Vec<Script>>();

let histories = batch_script_get_history(self.client.clone(), scripts).await;

let mut ready_events = self.state.update(
latest_block_height,
Expand Down Expand Up @@ -1011,3 +1029,125 @@ static TRANSACTION_BROADCAST_COUNTER: conquer_once::Lazy<prometheus::IntCounterV
)
.unwrap()
});

async fn batch_script_get_history(
client: Arc<electrum_client::Client>,
scripts: Vec<Script>,
) -> Vec<Vec<GetHistoryRes>> {
let (tx_script_updates, mut rx_script_updates) = tokio::sync::mpsc::channel(BATCH_SIZE * 4);

let scripts_len = scripts.len();
let batches = scripts.chunks(BATCH_SIZE).map(|batch| batch.to_owned());

// It's important to move here so the sender gets dropped and the receiver finishes correctly
batches.for_each(move |batch| {
let tx_script_updates = tx_script_updates.clone();
let client = client.clone();

tokio::task::spawn_blocking({
move || {
for script in batch {
match client.script_get_history(&script) {
Ok(script_history_response) => {
// We use blocking_send to stay within a sync context here
// One should not use async code in a spawn_blocking block
if let Err(e) = tx_script_updates.blocking_send(script_history_response)
{
tracing::error!(
"Error when processing script_get_history response: {e:#}"
)
}
}
Err(e) => {
tracing::error!("Error when fetching script history: {e:#}")
}
}
}
}
});
});

let mut histories = Vec::with_capacity(scripts_len);
while let Some(script_history) = rx_script_updates.recv().await {
histories.push(script_history)
}

histories
}

#[cfg(test)]
mod test {
use crate::monitor::batch_script_get_history;
use crate::monitor::ELECTRUM_CLIENT_TIMEOUT_SECS;
use bdk::bitcoin;
use bdk::bitcoin::Script;
use bdk::electrum_client;
use std::str::FromStr;
use std::sync::Arc;
use std::time::SystemTime;
use tracing_subscriber::util::SubscriberInitExt;

fn get_test_server() -> String {
std::env::var("TEST_ELECTRUM_SERVER")
.unwrap_or_else(|_| "electrum.blockstream.info:50001".into())
}

/// Test sanity of batch_script_get_history by simulating what the production code does
///
/// Ignored on CI because it requires a mainnet Electrum instance to run properly
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[ignore]
async fn given_many_scripts_then_batch_script_get_history_works() {
// Mt.Gox hack address
let script = bitcoin::Address::from_str("1FeexV6bAHb8ybZjqQMjJrcCrHGW9sb6uF")
.unwrap()
.script_pubkey();

let mut scripts = Vec::new();

for _ in 0..100 {
scripts.push(script.clone());
}

test_batch_script_get_history(scripts).await;
}

#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[ignore]
async fn given_script_not_on_chain_then_batch_script_get_history_returns_empty_response() {
// Some testnet address (to simulate a script where we get an empty response)
let script = bitcoin::Address::from_str("2MsQEtPJ6JJZszMYrD6udjUyTDFLczWQrv9")
.unwrap()
.script_pubkey();

test_batch_script_get_history(vec![script]).await;
}

async fn test_batch_script_get_history(scripts: Vec<Script>) {
let _guard = tracing_subscriber::fmt()
.with_env_filter("info")
.with_test_writer()
.set_default();

tracing::info!("Test runner started...");
let start_time = SystemTime::now();

let client = bdk::electrum_client::Client::from_config(
get_test_server().as_str(),
electrum_client::ConfigBuilder::new()
.timeout(Some(ELECTRUM_CLIENT_TIMEOUT_SECS))
.unwrap()
.build(),
)
.unwrap();

let scripts_len = scripts.len();
let rx_script_history = batch_script_get_history(Arc::new(client), scripts).await;

assert_eq!(scripts_len, rx_script_history.len());

let end_time = SystemTime::now();
let execution_duration = end_time.duration_since(start_time).unwrap();
tracing::info!("Total execution duration: {execution_duration:?}");
}
}

0 comments on commit 0516c5a

Please sign in to comment.