diff --git a/src/builder.rs b/src/builder.rs index 30a1649d2..85ec70d18 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -5,11 +5,11 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -use crate::chain::{ChainSource, DEFAULT_ESPLORA_SERVER_URL}; +use crate::chain::ChainSource; use crate::config::{ default_user_config, may_announce_channel, AnnounceError, BitcoindRestClientConfig, Config, - ElectrumSyncConfig, EsploraSyncConfig, DEFAULT_LOG_FILENAME, DEFAULT_LOG_LEVEL, - WALLET_KEYS_SEED_LEN, + ElectrumSyncConfig, EsploraSyncConfig, DEFAULT_ESPLORA_SERVER_URL, DEFAULT_LOG_FILENAME, + DEFAULT_LOG_LEVEL, WALLET_KEYS_SEED_LEN, }; use crate::connection::ConnectionManager; diff --git a/src/chain/bitcoind.rs b/src/chain/bitcoind.rs index 98e77cac7..b87ee13ed 100644 --- a/src/chain/bitcoind.rs +++ b/src/chain/bitcoind.rs @@ -5,26 +5,578 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -use crate::types::{ChainMonitor, ChannelManager, Sweeper, Wallet}; +use super::WalletSyncStatus; -use base64::prelude::BASE64_STANDARD; -use base64::Engine; -use bitcoin::{BlockHash, FeeRate, Transaction, Txid}; +use crate::config::{ + BitcoindRestClientConfig, Config, FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS, TX_BROADCAST_TIMEOUT_SECS, +}; +use crate::fee_estimator::{ + apply_post_estimation_adjustments, get_all_conf_targets, get_num_block_defaults_for_target, + ConfirmationTarget, OnchainFeeEstimator, +}; +use crate::io::utils::write_node_metrics; +use crate::logger::{log_bytes, log_error, log_info, log_trace, LdkLogger, Logger}; +use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; +use crate::{Error, NodeMetrics}; + +use lightning::chain::chaininterface::ConfirmationTarget as LdkConfirmationTarget; use lightning::chain::Listen; +use lightning::util::ser::Writeable; + use lightning_block_sync::gossip::UtxoSource; use lightning_block_sync::http::{HttpEndpoint, JsonResponse}; -use lightning_block_sync::poll::ValidatedBlockHeader; +use lightning_block_sync::init::{synchronize_listeners, validate_best_block_header}; +use lightning_block_sync::poll::{ChainPoller, ChainTip, ValidatedBlockHeader}; use lightning_block_sync::rest::RestClient; use lightning_block_sync::rpc::{RpcClient, RpcError}; use lightning_block_sync::{ AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource, Cache, }; +use lightning_block_sync::{BlockSourceErrorKind, SpvClient}; use serde::Serialize; +use base64::prelude::BASE64_STANDARD; +use base64::Engine; +use bitcoin::{BlockHash, FeeRate, Network, Transaction, Txid}; + use std::collections::{HashMap, VecDeque}; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex, RwLock}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; + +const CHAIN_POLLING_INTERVAL_SECS: u64 = 2; + +pub(super) struct BitcoindChainSource { + api_client: Arc, + header_cache: tokio::sync::Mutex, + latest_chain_tip: RwLock>, + onchain_wallet: Arc, + wallet_polling_status: Mutex, + fee_estimator: Arc, + tx_broadcaster: Arc, + kv_store: Arc, + config: Arc, + logger: Arc, + node_metrics: Arc>, +} + +impl BitcoindChainSource { + pub(crate) fn new_rpc( + rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String, + onchain_wallet: Arc, fee_estimator: Arc, + tx_broadcaster: Arc, kv_store: Arc, config: Arc, + logger: Arc, node_metrics: Arc>, + ) -> Self { + let api_client = Arc::new(BitcoindClient::new_rpc( + rpc_host.clone(), + rpc_port.clone(), + rpc_user.clone(), + rpc_password.clone(), + )); + + let header_cache = tokio::sync::Mutex::new(BoundedHeaderCache::new()); + let latest_chain_tip = RwLock::new(None); + let wallet_polling_status = Mutex::new(WalletSyncStatus::Completed); + Self { + api_client, + header_cache, + latest_chain_tip, + onchain_wallet, + wallet_polling_status, + fee_estimator, + tx_broadcaster, + kv_store, + config, + logger: Arc::clone(&logger), + node_metrics, + } + } + + pub(crate) fn new_rest( + rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String, + onchain_wallet: Arc, fee_estimator: Arc, + tx_broadcaster: Arc, kv_store: Arc, config: Arc, + rest_client_config: BitcoindRestClientConfig, logger: Arc, + node_metrics: Arc>, + ) -> Self { + let api_client = Arc::new(BitcoindClient::new_rest( + rest_client_config.rest_host, + rest_client_config.rest_port, + rpc_host, + rpc_port, + rpc_user, + rpc_password, + )); + + let header_cache = tokio::sync::Mutex::new(BoundedHeaderCache::new()); + let latest_chain_tip = RwLock::new(None); + let wallet_polling_status = Mutex::new(WalletSyncStatus::Completed); + + Self { + api_client, + header_cache, + latest_chain_tip, + wallet_polling_status, + onchain_wallet, + fee_estimator, + tx_broadcaster, + kv_store, + config, + logger: Arc::clone(&logger), + node_metrics, + } + } + + pub(super) fn as_utxo_source(&self) -> Arc { + self.api_client.utxo_source() + } + + pub(super) async fn continuously_sync_wallets( + &self, mut stop_sync_receiver: tokio::sync::watch::Receiver<()>, + channel_manager: Arc, chain_monitor: Arc, + output_sweeper: Arc, + ) { + // First register for the wallet polling status to make sure `Node::sync_wallets` calls + // wait on the result before proceeding. + { + let mut status_lock = self.wallet_polling_status.lock().unwrap(); + if status_lock.register_or_subscribe_pending_sync().is_some() { + debug_assert!(false, "Sync already in progress. This should never happen."); + } + } + + log_info!( + self.logger, + "Starting initial synchronization of chain listeners. This might take a while..", + ); + + let mut backoff = CHAIN_POLLING_INTERVAL_SECS; + const MAX_BACKOFF_SECS: u64 = 300; + + loop { + let channel_manager_best_block_hash = channel_manager.current_best_block().block_hash; + let sweeper_best_block_hash = output_sweeper.current_best_block().block_hash; + let onchain_wallet_best_block_hash = + self.onchain_wallet.current_best_block().block_hash; + + let mut chain_listeners = vec![ + ( + onchain_wallet_best_block_hash, + &*self.onchain_wallet as &(dyn Listen + Send + Sync), + ), + (channel_manager_best_block_hash, &*channel_manager as &(dyn Listen + Send + Sync)), + (sweeper_best_block_hash, &*output_sweeper as &(dyn Listen + Send + Sync)), + ]; + + // TODO: Eventually we might want to see if we can synchronize `ChannelMonitor`s + // before giving them to `ChainMonitor` it the first place. However, this isn't + // trivial as we load them on initialization (in the `Builder`) and only gain + // network access during `start`. For now, we just make sure we get the worst known + // block hash and sychronize them via `ChainMonitor`. + if let Some(worst_channel_monitor_block_hash) = chain_monitor + .list_monitors() + .iter() + .flat_map(|(txo, _)| chain_monitor.get_monitor(*txo)) + .map(|m| m.current_best_block()) + .min_by_key(|b| b.height) + .map(|b| b.block_hash) + { + chain_listeners.push(( + worst_channel_monitor_block_hash, + &*chain_monitor as &(dyn Listen + Send + Sync), + )); + } + + let mut locked_header_cache = self.header_cache.lock().await; + let now = SystemTime::now(); + match synchronize_listeners( + self.api_client.as_ref(), + self.config.network, + &mut *locked_header_cache, + chain_listeners.clone(), + ) + .await + { + Ok(chain_tip) => { + { + log_info!( + self.logger, + "Finished synchronizing listeners in {}ms", + now.elapsed().unwrap().as_millis() + ); + *self.latest_chain_tip.write().unwrap() = Some(chain_tip); + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + let mut locked_node_metrics = self.node_metrics.write().unwrap(); + locked_node_metrics.latest_lightning_wallet_sync_timestamp = + unix_time_secs_opt; + locked_node_metrics.latest_onchain_wallet_sync_timestamp = + unix_time_secs_opt; + write_node_metrics( + &*locked_node_metrics, + Arc::clone(&self.kv_store), + Arc::clone(&self.logger), + ) + .unwrap_or_else(|e| { + log_error!(self.logger, "Failed to persist node metrics: {}", e); + }); + } + break; + }, + + Err(e) => { + log_error!(self.logger, "Failed to synchronize chain listeners: {:?}", e); + if e.kind() == BlockSourceErrorKind::Transient { + log_info!( + self.logger, + "Transient error syncing chain listeners: {:?}. Retrying in {} seconds.", + e, + backoff + ); + tokio::time::sleep(Duration::from_secs(backoff)).await; + backoff = std::cmp::min(backoff * 2, MAX_BACKOFF_SECS); + } else { + log_error!( + self.logger, + "Persistent error syncing chain listeners: {:?}. Retrying in {} seconds.", + e, + MAX_BACKOFF_SECS + ); + tokio::time::sleep(Duration::from_secs(MAX_BACKOFF_SECS)).await; + } + }, + } + } + + // Now propagate the initial result to unblock waiting subscribers. + self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(Ok(())); + + let mut chain_polling_interval = + tokio::time::interval(Duration::from_secs(CHAIN_POLLING_INTERVAL_SECS)); + chain_polling_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + let mut fee_rate_update_interval = + tokio::time::interval(Duration::from_secs(CHAIN_POLLING_INTERVAL_SECS)); + // When starting up, we just blocked on updating, so skip the first tick. + fee_rate_update_interval.reset(); + fee_rate_update_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + log_info!(self.logger, "Starting continuous polling for chain updates."); + + // Start the polling loop. + loop { + tokio::select! { + _ = stop_sync_receiver.changed() => { + log_trace!( + self.logger, + "Stopping polling for new chain data.", + ); + return; + } + _ = chain_polling_interval.tick() => { + let _ = self.poll_and_update_listeners( + Arc::clone(&channel_manager), + Arc::clone(&chain_monitor), + Arc::clone(&output_sweeper) + ).await; + } + _ = fee_rate_update_interval.tick() => { + let _ = self.update_fee_rate_estimates().await; + } + } + } + } + + pub(super) async fn poll_and_update_listeners( + &self, channel_manager: Arc, chain_monitor: Arc, + output_sweeper: Arc, + ) -> Result<(), Error> { + let receiver_res = { + let mut status_lock = self.wallet_polling_status.lock().unwrap(); + status_lock.register_or_subscribe_pending_sync() + }; + + if let Some(mut sync_receiver) = receiver_res { + log_info!(self.logger, "Sync in progress, skipping."); + return sync_receiver.recv().await.map_err(|e| { + debug_assert!(false, "Failed to receive wallet polling result: {:?}", e); + log_error!(self.logger, "Failed to receive wallet polling result: {:?}", e); + Error::WalletOperationFailed + })?; + } + + let latest_chain_tip_opt = self.latest_chain_tip.read().unwrap().clone(); + let chain_tip = if let Some(tip) = latest_chain_tip_opt { + tip + } else { + match validate_best_block_header(self.api_client.as_ref()).await { + Ok(tip) => { + *self.latest_chain_tip.write().unwrap() = Some(tip); + tip + }, + Err(e) => { + log_error!(self.logger, "Failed to poll for chain data: {:?}", e); + let res = Err(Error::TxSyncFailed); + self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res); + return res; + }, + } + }; + + let mut locked_header_cache = self.header_cache.lock().await; + let chain_poller = ChainPoller::new(Arc::clone(&self.api_client), self.config.network); + let chain_listener = ChainListener { + onchain_wallet: Arc::clone(&self.onchain_wallet), + channel_manager: Arc::clone(&channel_manager), + chain_monitor, + output_sweeper, + }; + let mut spv_client = + SpvClient::new(chain_tip, chain_poller, &mut *locked_header_cache, &chain_listener); + + let now = SystemTime::now(); + match spv_client.poll_best_tip().await { + Ok((ChainTip::Better(tip), true)) => { + log_trace!( + self.logger, + "Finished polling best tip in {}ms", + now.elapsed().unwrap().as_millis() + ); + *self.latest_chain_tip.write().unwrap() = Some(tip); + }, + Ok(_) => {}, + Err(e) => { + log_error!(self.logger, "Failed to poll for chain data: {:?}", e); + let res = Err(Error::TxSyncFailed); + self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res); + return res; + }, + } + + let cur_height = channel_manager.current_best_block().height; + + let now = SystemTime::now(); + let unconfirmed_txids = self.onchain_wallet.get_unconfirmed_txids(); + match self.api_client.get_updated_mempool_transactions(cur_height, unconfirmed_txids).await + { + Ok((unconfirmed_txs, evicted_txids)) => { + log_trace!( + self.logger, + "Finished polling mempool of size {} and {} evicted transactions in {}ms", + unconfirmed_txs.len(), + evicted_txids.len(), + now.elapsed().unwrap().as_millis() + ); + self.onchain_wallet + .apply_mempool_txs(unconfirmed_txs, evicted_txids) + .unwrap_or_else(|e| { + log_error!(self.logger, "Failed to apply mempool transactions: {:?}", e); + }); + }, + Err(e) => { + log_error!(self.logger, "Failed to poll for mempool transactions: {:?}", e); + let res = Err(Error::TxSyncFailed); + self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res); + return res; + }, + } + + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + let mut locked_node_metrics = self.node_metrics.write().unwrap(); + locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt; + locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt; + + let write_res = write_node_metrics( + &*locked_node_metrics, + Arc::clone(&self.kv_store), + Arc::clone(&self.logger), + ); + match write_res { + Ok(()) => (), + Err(e) => { + log_error!(self.logger, "Failed to persist node metrics: {}", e); + let res = Err(Error::PersistenceFailed); + self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res); + return res; + }, + } + + let res = Ok(()); + self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res); + res + } + + pub(super) async fn update_fee_rate_estimates(&self) -> Result<(), Error> { + macro_rules! get_fee_rate_update { + ($estimation_fut: expr) => {{ + let update_res = tokio::time::timeout( + Duration::from_secs(FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS), + $estimation_fut, + ) + .await + .map_err(|e| { + log_error!(self.logger, "Updating fee rate estimates timed out: {}", e); + Error::FeerateEstimationUpdateTimeout + })?; + update_res + }}; + } + let confirmation_targets = get_all_conf_targets(); + + let mut new_fee_rate_cache = HashMap::with_capacity(10); + let now = Instant::now(); + for target in confirmation_targets { + let fee_rate_update_res = match target { + ConfirmationTarget::Lightning( + LdkConfirmationTarget::MinAllowedAnchorChannelRemoteFee, + ) => { + let estimation_fut = self.api_client.get_mempool_minimum_fee_rate(); + get_fee_rate_update!(estimation_fut) + }, + ConfirmationTarget::Lightning(LdkConfirmationTarget::MaximumFeeEstimate) => { + let num_blocks = get_num_block_defaults_for_target(target); + let estimation_mode = FeeRateEstimationMode::Conservative; + let estimation_fut = + self.api_client.get_fee_estimate_for_target(num_blocks, estimation_mode); + get_fee_rate_update!(estimation_fut) + }, + ConfirmationTarget::Lightning(LdkConfirmationTarget::UrgentOnChainSweep) => { + let num_blocks = get_num_block_defaults_for_target(target); + let estimation_mode = FeeRateEstimationMode::Conservative; + let estimation_fut = + self.api_client.get_fee_estimate_for_target(num_blocks, estimation_mode); + get_fee_rate_update!(estimation_fut) + }, + _ => { + // Otherwise, we default to economical block-target estimate. + let num_blocks = get_num_block_defaults_for_target(target); + let estimation_mode = FeeRateEstimationMode::Economical; + let estimation_fut = + self.api_client.get_fee_estimate_for_target(num_blocks, estimation_mode); + get_fee_rate_update!(estimation_fut) + }, + }; + + let fee_rate = match (fee_rate_update_res, self.config.network) { + (Ok(rate), _) => rate, + (Err(e), Network::Bitcoin) => { + // Strictly fail on mainnet. + log_error!(self.logger, "Failed to retrieve fee rate estimates: {}", e); + return Err(Error::FeerateEstimationUpdateFailed); + }, + (Err(e), n) if n == Network::Regtest || n == Network::Signet => { + // On regtest/signet we just fall back to the usual 1 sat/vb == 250 + // sat/kwu default. + log_error!( + self.logger, + "Failed to retrieve fee rate estimates: {}. Falling back to default of 1 sat/vb.", + e, + ); + FeeRate::from_sat_per_kwu(250) + }, + (Err(e), _) => { + // On testnet `estimatesmartfee` can be unreliable so we just skip in + // case of a failure, which will have us falling back to defaults. + log_error!( + self.logger, + "Failed to retrieve fee rate estimates: {}. Falling back to defaults.", + e, + ); + return Ok(()); + }, + }; + + // LDK 0.0.118 introduced changes to the `ConfirmationTarget` semantics that + // require some post-estimation adjustments to the fee rates, which we do here. + let adjusted_fee_rate = apply_post_estimation_adjustments(target, fee_rate); + + new_fee_rate_cache.insert(target, adjusted_fee_rate); + + log_trace!( + self.logger, + "Fee rate estimation updated for {:?}: {} sats/kwu", + target, + adjusted_fee_rate.to_sat_per_kwu(), + ); + } + + if self.fee_estimator.set_fee_rate_cache(new_fee_rate_cache) { + // We only log if the values changed, as it might be very spammy otherwise. + log_info!( + self.logger, + "Fee rate cache update finished in {}ms.", + now.elapsed().as_millis() + ); + } + + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + { + let mut locked_node_metrics = self.node_metrics.write().unwrap(); + locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt; + write_node_metrics( + &*locked_node_metrics, + Arc::clone(&self.kv_store), + Arc::clone(&self.logger), + )?; + } + + Ok(()) + } + + pub(crate) async fn process_broadcast_queue(&self) { + // While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28 + // features, we should eventually switch to use `submitpackage` via the + // `rust-bitcoind-json-rpc` crate rather than just broadcasting individual + // transactions. + let mut receiver = self.tx_broadcaster.get_broadcast_queue().await; + while let Some(next_package) = receiver.recv().await { + for tx in &next_package { + let txid = tx.compute_txid(); + let timeout_fut = tokio::time::timeout( + Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS), + self.api_client.broadcast_transaction(tx), + ); + match timeout_fut.await { + Ok(res) => match res { + Ok(id) => { + debug_assert_eq!(id, txid); + log_trace!(self.logger, "Successfully broadcast transaction {}", txid); + }, + Err(e) => { + log_error!( + self.logger, + "Failed to broadcast transaction {}: {}", + txid, + e + ); + log_trace!( + self.logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx.encode()) + ); + }, + }, + Err(e) => { + log_error!( + self.logger, + "Failed to broadcast transaction due to timeout {}: {}", + txid, + e + ); + log_trace!( + self.logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx.encode()) + ); + }, + } + } + } + } +} pub enum BitcoindClient { Rpc { diff --git a/src/chain/electrum.rs b/src/chain/electrum.rs index 9882e652b..44a637cc3 100644 --- a/src/chain/electrum.rs +++ b/src/chain/electrum.rs @@ -5,16 +5,21 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. +use super::{periodically_archive_fully_resolved_monitors, WalletSyncStatus}; + use crate::config::{ - Config, BDK_CLIENT_STOP_GAP, BDK_WALLET_SYNC_TIMEOUT_SECS, FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS, - LDK_WALLET_SYNC_TIMEOUT_SECS, TX_BROADCAST_TIMEOUT_SECS, + Config, ElectrumSyncConfig, BDK_CLIENT_STOP_GAP, BDK_WALLET_SYNC_TIMEOUT_SECS, + FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS, LDK_WALLET_SYNC_TIMEOUT_SECS, TX_BROADCAST_TIMEOUT_SECS, }; use crate::error::Error; use crate::fee_estimator::{ apply_post_estimation_adjustments, get_all_conf_targets, get_num_block_defaults_for_target, - ConfirmationTarget, + ConfirmationTarget, OnchainFeeEstimator, }; +use crate::io::utils::write_node_metrics; use crate::logger::{log_bytes, log_error, log_info, log_trace, LdkLogger, Logger}; +use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; +use crate::NodeMetrics; use lightning::chain::{Confirm, Filter, WatchedOutput}; use lightning::util::ser::Writeable; @@ -25,6 +30,7 @@ use bdk_chain::bdk_core::spk_client::FullScanResponse as BdkFullScanResponse; use bdk_chain::bdk_core::spk_client::SyncRequest as BdkSyncRequest; use bdk_chain::bdk_core::spk_client::SyncResponse as BdkSyncResponse; use bdk_wallet::KeychainKind as BdkKeyChainKind; +use bdk_wallet::Update as BdkUpdate; use bdk_electrum::BdkElectrumClient; @@ -32,17 +38,358 @@ use electrum_client::Client as ElectrumClient; use electrum_client::ConfigBuilder as ElectrumConfigBuilder; use electrum_client::{Batch, ElectrumApi}; -use bitcoin::{FeeRate, Network, Script, Transaction, Txid}; +use bitcoin::{FeeRate, Network, Script, ScriptBuf, Transaction, Txid}; use std::collections::HashMap; -use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::sync::{Arc, Mutex, RwLock}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; const BDK_ELECTRUM_CLIENT_BATCH_SIZE: usize = 5; const ELECTRUM_CLIENT_NUM_RETRIES: u8 = 3; const ELECTRUM_CLIENT_TIMEOUT_SECS: u8 = 10; -pub(crate) struct ElectrumRuntimeClient { +pub(super) struct ElectrumChainSource { + server_url: String, + pub(super) sync_config: ElectrumSyncConfig, + electrum_runtime_status: RwLock, + onchain_wallet: Arc, + onchain_wallet_sync_status: Mutex, + lightning_wallet_sync_status: Mutex, + fee_estimator: Arc, + tx_broadcaster: Arc, + kv_store: Arc, + config: Arc, + logger: Arc, + node_metrics: Arc>, +} + +impl ElectrumChainSource { + pub(super) fn new( + server_url: String, sync_config: ElectrumSyncConfig, onchain_wallet: Arc, + fee_estimator: Arc, tx_broadcaster: Arc, + kv_store: Arc, config: Arc, logger: Arc, + node_metrics: Arc>, + ) -> Self { + let electrum_runtime_status = RwLock::new(ElectrumRuntimeStatus::new()); + let onchain_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed); + let lightning_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed); + Self { + server_url, + sync_config, + electrum_runtime_status, + onchain_wallet, + onchain_wallet_sync_status, + lightning_wallet_sync_status, + fee_estimator, + tx_broadcaster, + kv_store, + config, + logger: Arc::clone(&logger), + node_metrics, + } + } + + pub(super) fn start(&self, runtime: Arc) -> Result<(), Error> { + self.electrum_runtime_status.write().unwrap().start( + self.server_url.clone(), + Arc::clone(&runtime), + Arc::clone(&self.config), + Arc::clone(&self.logger), + ) + } + + pub(super) fn stop(&self) { + self.electrum_runtime_status.write().unwrap().stop(); + } + + pub(crate) async fn sync_onchain_wallet(&self) -> Result<(), Error> { + let electrum_client: Arc = + if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() { + Arc::clone(client) + } else { + debug_assert!( + false, + "We should have started the chain source before syncing the onchain wallet" + ); + return Err(Error::FeerateEstimationUpdateFailed); + }; + let receiver_res = { + let mut status_lock = self.onchain_wallet_sync_status.lock().unwrap(); + status_lock.register_or_subscribe_pending_sync() + }; + if let Some(mut sync_receiver) = receiver_res { + log_info!(self.logger, "Sync in progress, skipping."); + return sync_receiver.recv().await.map_err(|e| { + debug_assert!(false, "Failed to receive wallet sync result: {:?}", e); + log_error!(self.logger, "Failed to receive wallet sync result: {:?}", e); + Error::WalletOperationFailed + })?; + } + + // If this is our first sync, do a full scan with the configured gap limit. + // Otherwise just do an incremental sync. + let incremental_sync = + self.node_metrics.read().unwrap().latest_onchain_wallet_sync_timestamp.is_some(); + + let apply_wallet_update = + |update_res: Result, now: Instant| match update_res { + Ok(update) => match self.onchain_wallet.apply_update(update) { + Ok(()) => { + log_info!( + self.logger, + "{} of on-chain wallet finished in {}ms.", + if incremental_sync { "Incremental sync" } else { "Sync" }, + now.elapsed().as_millis() + ); + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + { + let mut locked_node_metrics = self.node_metrics.write().unwrap(); + locked_node_metrics.latest_onchain_wallet_sync_timestamp = + unix_time_secs_opt; + write_node_metrics( + &*locked_node_metrics, + Arc::clone(&self.kv_store), + Arc::clone(&self.logger), + )?; + } + Ok(()) + }, + Err(e) => Err(e), + }, + Err(e) => Err(e), + }; + + let cached_txs = self.onchain_wallet.get_cached_txs(); + + let res = if incremental_sync { + let incremental_sync_request = self.onchain_wallet.get_incremental_sync_request(); + let incremental_sync_fut = electrum_client + .get_incremental_sync_wallet_update(incremental_sync_request, cached_txs); + + let now = Instant::now(); + let update_res = incremental_sync_fut.await.map(|u| u.into()); + apply_wallet_update(update_res, now) + } else { + let full_scan_request = self.onchain_wallet.get_full_scan_request(); + let full_scan_fut = + electrum_client.get_full_scan_wallet_update(full_scan_request, cached_txs); + let now = Instant::now(); + let update_res = full_scan_fut.await.map(|u| u.into()); + apply_wallet_update(update_res, now) + }; + + self.onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); + + res + } + + pub(crate) async fn sync_lightning_wallet( + &self, channel_manager: Arc, chain_monitor: Arc, + output_sweeper: Arc, + ) -> Result<(), Error> { + let electrum_client: Arc = + if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() { + Arc::clone(client) + } else { + debug_assert!( + false, + "We should have started the chain source before syncing the lightning wallet" + ); + return Err(Error::TxSyncFailed); + }; + + let sync_cman = Arc::clone(&channel_manager); + let sync_cmon = Arc::clone(&chain_monitor); + let sync_sweeper = Arc::clone(&output_sweeper); + let confirmables = vec![ + sync_cman as Arc, + sync_cmon as Arc, + sync_sweeper as Arc, + ]; + + let receiver_res = { + let mut status_lock = self.lightning_wallet_sync_status.lock().unwrap(); + status_lock.register_or_subscribe_pending_sync() + }; + if let Some(mut sync_receiver) = receiver_res { + log_info!(self.logger, "Sync in progress, skipping."); + return sync_receiver.recv().await.map_err(|e| { + debug_assert!(false, "Failed to receive wallet sync result: {:?}", e); + log_error!(self.logger, "Failed to receive wallet sync result: {:?}", e); + Error::TxSyncFailed + })?; + } + + let res = electrum_client.sync_confirmables(confirmables).await; + + if let Ok(_) = res { + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + { + let mut locked_node_metrics = self.node_metrics.write().unwrap(); + locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt; + write_node_metrics( + &*locked_node_metrics, + Arc::clone(&self.kv_store), + Arc::clone(&self.logger), + )?; + } + + periodically_archive_fully_resolved_monitors( + Arc::clone(&channel_manager), + Arc::clone(&chain_monitor), + Arc::clone(&self.kv_store), + Arc::clone(&self.logger), + Arc::clone(&self.node_metrics), + )?; + } + + self.lightning_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); + + res + } + + pub(crate) async fn update_fee_rate_estimates(&self) -> Result<(), Error> { + let electrum_client: Arc = if let Some(client) = + self.electrum_runtime_status.read().unwrap().client().as_ref() + { + Arc::clone(client) + } else { + debug_assert!(false, "We should have started the chain source before updating fees"); + return Err(Error::FeerateEstimationUpdateFailed); + }; + + let now = Instant::now(); + + let new_fee_rate_cache = electrum_client.get_fee_rate_cache_update().await?; + self.fee_estimator.set_fee_rate_cache(new_fee_rate_cache); + + log_info!( + self.logger, + "Fee rate cache update finished in {}ms.", + now.elapsed().as_millis() + ); + + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + { + let mut locked_node_metrics = self.node_metrics.write().unwrap(); + locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt; + write_node_metrics( + &*locked_node_metrics, + Arc::clone(&self.kv_store), + Arc::clone(&self.logger), + )?; + } + + Ok(()) + } + + pub(crate) async fn process_broadcast_queue(&self) { + let electrum_client: Arc = + if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() { + Arc::clone(client) + } else { + debug_assert!(false, "We should have started the chain source before broadcasting"); + return; + }; + + let mut receiver = self.tx_broadcaster.get_broadcast_queue().await; + while let Some(next_package) = receiver.recv().await { + for tx in next_package { + electrum_client.broadcast(tx).await; + } + } + } +} + +impl Filter for ElectrumChainSource { + fn register_tx(&self, txid: &Txid, script_pubkey: &Script) { + self.electrum_runtime_status.write().unwrap().register_tx(txid, script_pubkey) + } + fn register_output(&self, output: lightning::chain::WatchedOutput) { + self.electrum_runtime_status.write().unwrap().register_output(output) + } +} + +enum ElectrumRuntimeStatus { + Started(Arc), + Stopped { + pending_registered_txs: Vec<(Txid, ScriptBuf)>, + pending_registered_outputs: Vec, + }, +} + +impl ElectrumRuntimeStatus { + fn new() -> Self { + let pending_registered_txs = Vec::new(); + let pending_registered_outputs = Vec::new(); + Self::Stopped { pending_registered_txs, pending_registered_outputs } + } + + pub(super) fn start( + &mut self, server_url: String, runtime: Arc, config: Arc, + logger: Arc, + ) -> Result<(), Error> { + match self { + Self::Stopped { pending_registered_txs, pending_registered_outputs } => { + let client = Arc::new(ElectrumRuntimeClient::new( + server_url.clone(), + runtime, + config, + logger, + )?); + + // Apply any pending `Filter` entries + for (txid, script_pubkey) in pending_registered_txs.drain(..) { + client.register_tx(&txid, &script_pubkey); + } + + for output in pending_registered_outputs.drain(..) { + client.register_output(output) + } + + *self = Self::Started(client); + }, + Self::Started(_) => { + debug_assert!(false, "We shouldn't call start if we're already started") + }, + } + Ok(()) + } + + pub(super) fn stop(&mut self) { + *self = Self::new() + } + + fn client(&self) -> Option> { + match self { + Self::Started(client) => Some(Arc::clone(&client)), + Self::Stopped { .. } => None, + } + } + + fn register_tx(&mut self, txid: &Txid, script_pubkey: &Script) { + match self { + Self::Started(client) => client.register_tx(txid, script_pubkey), + Self::Stopped { pending_registered_txs, .. } => { + pending_registered_txs.push((*txid, script_pubkey.to_owned())) + }, + } + } + + fn register_output(&mut self, output: lightning::chain::WatchedOutput) { + match self { + Self::Started(client) => client.register_output(output), + Self::Stopped { pending_registered_outputs, .. } => { + pending_registered_outputs.push(output) + }, + } + } +} + +struct ElectrumRuntimeClient { electrum_client: Arc, bdk_electrum_client: Arc>, tx_sync: Arc>>, @@ -52,7 +399,7 @@ pub(crate) struct ElectrumRuntimeClient { } impl ElectrumRuntimeClient { - pub(crate) fn new( + fn new( server_url: String, runtime: Arc, config: Arc, logger: Arc, ) -> Result { @@ -82,7 +429,7 @@ impl ElectrumRuntimeClient { Ok(Self { electrum_client, bdk_electrum_client, tx_sync, runtime, config, logger }) } - pub(crate) async fn sync_confirmables( + async fn sync_confirmables( &self, confirmables: Vec>, ) -> Result<(), Error> { let now = Instant::now(); @@ -116,7 +463,7 @@ impl ElectrumRuntimeClient { Ok(res) } - pub(crate) async fn get_full_scan_wallet_update( + async fn get_full_scan_wallet_update( &self, request: BdkFullScanRequest, cached_txs: impl IntoIterator>>, ) -> Result, Error> { @@ -150,7 +497,7 @@ impl ElectrumRuntimeClient { }) } - pub(crate) async fn get_incremental_sync_wallet_update( + async fn get_incremental_sync_wallet_update( &self, request: BdkSyncRequest<(BdkKeyChainKind, u32)>, cached_txs: impl IntoIterator>>, ) -> Result { @@ -179,7 +526,7 @@ impl ElectrumRuntimeClient { }) } - pub(crate) async fn broadcast(&self, tx: Transaction) { + async fn broadcast(&self, tx: Transaction) { let electrum_client = Arc::clone(&self.electrum_client); let txid = tx.compute_txid(); @@ -221,7 +568,7 @@ impl ElectrumRuntimeClient { } } - pub(crate) async fn get_fee_rate_cache_update( + async fn get_fee_rate_cache_update( &self, ) -> Result, Error> { let electrum_client = Arc::clone(&self.electrum_client); diff --git a/src/chain/esplora.rs b/src/chain/esplora.rs new file mode 100644 index 000000000..3a911394c --- /dev/null +++ b/src/chain/esplora.rs @@ -0,0 +1,448 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use super::{periodically_archive_fully_resolved_monitors, WalletSyncStatus}; + +use crate::config::{ + Config, EsploraSyncConfig, BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP, + BDK_WALLET_SYNC_TIMEOUT_SECS, DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS, + FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS, LDK_WALLET_SYNC_TIMEOUT_SECS, TX_BROADCAST_TIMEOUT_SECS, +}; +use crate::fee_estimator::{ + apply_post_estimation_adjustments, get_all_conf_targets, get_num_block_defaults_for_target, + OnchainFeeEstimator, +}; +use crate::io::utils::write_node_metrics; +use crate::logger::{log_bytes, log_error, log_info, log_trace, LdkLogger, Logger}; +use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; +use crate::{Error, NodeMetrics}; + +use lightning::chain::{Confirm, Filter, WatchedOutput}; +use lightning::util::ser::Writeable; + +use lightning_transaction_sync::EsploraSyncClient; + +use bdk_esplora::EsploraAsyncExt; + +use esplora_client::AsyncClient as EsploraAsyncClient; + +use bitcoin::{FeeRate, Network, Script, Txid}; + +use std::collections::HashMap; +use std::sync::{Arc, Mutex, RwLock}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; + +pub(super) struct EsploraChainSource { + pub(super) sync_config: EsploraSyncConfig, + esplora_client: EsploraAsyncClient, + onchain_wallet: Arc, + onchain_wallet_sync_status: Mutex, + tx_sync: Arc>>, + lightning_wallet_sync_status: Mutex, + fee_estimator: Arc, + tx_broadcaster: Arc, + kv_store: Arc, + config: Arc, + logger: Arc, + node_metrics: Arc>, +} + +impl EsploraChainSource { + pub(crate) fn new( + server_url: String, headers: HashMap, sync_config: EsploraSyncConfig, + onchain_wallet: Arc, fee_estimator: Arc, + tx_broadcaster: Arc, kv_store: Arc, config: Arc, + logger: Arc, node_metrics: Arc>, + ) -> Self { + // FIXME / TODO: We introduced this to make `bdk_esplora` work separately without updating + // `lightning-transaction-sync`. We should revert this as part of of the upgrade to LDK 0.2. + let mut client_builder_0_11 = esplora_client_0_11::Builder::new(&server_url); + client_builder_0_11 = client_builder_0_11.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS); + + for (header_name, header_value) in &headers { + client_builder_0_11 = client_builder_0_11.header(header_name, header_value); + } + + let esplora_client_0_11 = client_builder_0_11.build_async().unwrap(); + let tx_sync = + Arc::new(EsploraSyncClient::from_client(esplora_client_0_11, Arc::clone(&logger))); + + let mut client_builder = esplora_client::Builder::new(&server_url); + client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS); + + for (header_name, header_value) in &headers { + client_builder = client_builder.header(header_name, header_value); + } + + let esplora_client = client_builder.build_async().unwrap(); + + let onchain_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed); + let lightning_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed); + Self { + sync_config, + esplora_client, + onchain_wallet, + onchain_wallet_sync_status, + tx_sync, + lightning_wallet_sync_status, + fee_estimator, + tx_broadcaster, + kv_store, + config, + logger, + node_metrics, + } + } + + pub(super) async fn sync_onchain_wallet(&self) -> Result<(), Error> { + let receiver_res = { + let mut status_lock = self.onchain_wallet_sync_status.lock().unwrap(); + status_lock.register_or_subscribe_pending_sync() + }; + if let Some(mut sync_receiver) = receiver_res { + log_info!(self.logger, "Sync in progress, skipping."); + return sync_receiver.recv().await.map_err(|e| { + debug_assert!(false, "Failed to receive wallet sync result: {:?}", e); + log_error!(self.logger, "Failed to receive wallet sync result: {:?}", e); + Error::WalletOperationFailed + })?; + } + + let res = { + // If this is our first sync, do a full scan with the configured gap limit. + // Otherwise just do an incremental sync. + let incremental_sync = + self.node_metrics.read().unwrap().latest_onchain_wallet_sync_timestamp.is_some(); + + macro_rules! get_and_apply_wallet_update { + ($sync_future: expr) => {{ + let now = Instant::now(); + match $sync_future.await { + Ok(res) => match res { + Ok(update) => match self.onchain_wallet.apply_update(update) { + Ok(()) => { + log_info!( + self.logger, + "{} of on-chain wallet finished in {}ms.", + if incremental_sync { "Incremental sync" } else { "Sync" }, + now.elapsed().as_millis() + ); + let unix_time_secs_opt = SystemTime::now() + .duration_since(UNIX_EPOCH) + .ok() + .map(|d| d.as_secs()); + { + let mut locked_node_metrics = self.node_metrics.write().unwrap(); + locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt; + write_node_metrics( + &*locked_node_metrics, + Arc::clone(&self.kv_store), + Arc::clone(&self.logger) + )?; + } + Ok(()) + }, + Err(e) => Err(e), + }, + Err(e) => match *e { + esplora_client::Error::Reqwest(he) => { + log_error!( + self.logger, + "{} of on-chain wallet failed due to HTTP connection error: {}", + if incremental_sync { "Incremental sync" } else { "Sync" }, + he + ); + Err(Error::WalletOperationFailed) + }, + _ => { + log_error!( + self.logger, + "{} of on-chain wallet failed due to Esplora error: {}", + if incremental_sync { "Incremental sync" } else { "Sync" }, + e + ); + Err(Error::WalletOperationFailed) + }, + }, + }, + Err(e) => { + log_error!( + self.logger, + "{} of on-chain wallet timed out: {}", + if incremental_sync { "Incremental sync" } else { "Sync" }, + e + ); + Err(Error::WalletOperationTimeout) + }, + } + }} + } + + if incremental_sync { + let sync_request = self.onchain_wallet.get_incremental_sync_request(); + let wallet_sync_timeout_fut = tokio::time::timeout( + Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), + self.esplora_client.sync(sync_request, BDK_CLIENT_CONCURRENCY), + ); + get_and_apply_wallet_update!(wallet_sync_timeout_fut) + } else { + let full_scan_request = self.onchain_wallet.get_full_scan_request(); + let wallet_sync_timeout_fut = tokio::time::timeout( + Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), + self.esplora_client.full_scan( + full_scan_request, + BDK_CLIENT_STOP_GAP, + BDK_CLIENT_CONCURRENCY, + ), + ); + get_and_apply_wallet_update!(wallet_sync_timeout_fut) + } + }; + + self.onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); + + res + } + + pub(super) async fn sync_lightning_wallet( + &self, channel_manager: Arc, chain_monitor: Arc, + output_sweeper: Arc, + ) -> Result<(), Error> { + let sync_cman = Arc::clone(&channel_manager); + let sync_cmon = Arc::clone(&chain_monitor); + let sync_sweeper = Arc::clone(&output_sweeper); + let confirmables = vec![ + &*sync_cman as &(dyn Confirm + Sync + Send), + &*sync_cmon as &(dyn Confirm + Sync + Send), + &*sync_sweeper as &(dyn Confirm + Sync + Send), + ]; + + let receiver_res = { + let mut status_lock = self.lightning_wallet_sync_status.lock().unwrap(); + status_lock.register_or_subscribe_pending_sync() + }; + if let Some(mut sync_receiver) = receiver_res { + log_info!(self.logger, "Sync in progress, skipping."); + return sync_receiver.recv().await.map_err(|e| { + debug_assert!(false, "Failed to receive wallet sync result: {:?}", e); + log_error!(self.logger, "Failed to receive wallet sync result: {:?}", e); + Error::WalletOperationFailed + })?; + } + let res = { + let timeout_fut = tokio::time::timeout( + Duration::from_secs(LDK_WALLET_SYNC_TIMEOUT_SECS), + self.tx_sync.sync(confirmables), + ); + let now = Instant::now(); + match timeout_fut.await { + Ok(res) => match res { + Ok(()) => { + log_info!( + self.logger, + "Sync of Lightning wallet finished in {}ms.", + now.elapsed().as_millis() + ); + + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + { + let mut locked_node_metrics = self.node_metrics.write().unwrap(); + locked_node_metrics.latest_lightning_wallet_sync_timestamp = + unix_time_secs_opt; + write_node_metrics( + &*locked_node_metrics, + Arc::clone(&self.kv_store), + Arc::clone(&self.logger), + )?; + } + + periodically_archive_fully_resolved_monitors( + Arc::clone(&channel_manager), + Arc::clone(&chain_monitor), + Arc::clone(&self.kv_store), + Arc::clone(&self.logger), + Arc::clone(&self.node_metrics), + )?; + Ok(()) + }, + Err(e) => { + log_error!(self.logger, "Sync of Lightning wallet failed: {}", e); + Err(e.into()) + }, + }, + Err(e) => { + log_error!(self.logger, "Lightning wallet sync timed out: {}", e); + Err(Error::TxSyncTimeout) + }, + } + }; + + self.lightning_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); + + res + } + + pub(crate) async fn update_fee_rate_estimates(&self) -> Result<(), Error> { + let now = Instant::now(); + let estimates = tokio::time::timeout( + Duration::from_secs(FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS), + self.esplora_client.get_fee_estimates(), + ) + .await + .map_err(|e| { + log_error!(self.logger, "Updating fee rate estimates timed out: {}", e); + Error::FeerateEstimationUpdateTimeout + })? + .map_err(|e| { + log_error!(self.logger, "Failed to retrieve fee rate estimates: {}", e); + Error::FeerateEstimationUpdateFailed + })?; + + if estimates.is_empty() && self.config.network == Network::Bitcoin { + // Ensure we fail if we didn't receive any estimates. + log_error!( + self.logger, + "Failed to retrieve fee rate estimates: empty fee estimates are dissallowed on Mainnet.", + ); + return Err(Error::FeerateEstimationUpdateFailed); + } + + let confirmation_targets = get_all_conf_targets(); + + let mut new_fee_rate_cache = HashMap::with_capacity(10); + for target in confirmation_targets { + let num_blocks = get_num_block_defaults_for_target(target); + + // Convert the retrieved fee rate and fall back to 1 sat/vb if we fail or it + // yields less than that. This is mostly necessary to continue on + // `signet`/`regtest` where we might not get estimates (or bogus values). + let converted_estimate_sat_vb = + esplora_client::convert_fee_rate(num_blocks, estimates.clone()) + .map_or(1.0, |converted| converted.max(1.0)); + + let fee_rate = FeeRate::from_sat_per_kwu((converted_estimate_sat_vb * 250.0) as u64); + + // LDK 0.0.118 introduced changes to the `ConfirmationTarget` semantics that + // require some post-estimation adjustments to the fee rates, which we do here. + let adjusted_fee_rate = apply_post_estimation_adjustments(target, fee_rate); + + new_fee_rate_cache.insert(target, adjusted_fee_rate); + + log_trace!( + self.logger, + "Fee rate estimation updated for {:?}: {} sats/kwu", + target, + adjusted_fee_rate.to_sat_per_kwu(), + ); + } + + self.fee_estimator.set_fee_rate_cache(new_fee_rate_cache); + + log_info!( + self.logger, + "Fee rate cache update finished in {}ms.", + now.elapsed().as_millis() + ); + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + { + let mut locked_node_metrics = self.node_metrics.write().unwrap(); + locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt; + write_node_metrics( + &*locked_node_metrics, + Arc::clone(&self.kv_store), + Arc::clone(&self.logger), + )?; + } + + Ok(()) + } + + pub(crate) async fn process_broadcast_queue(&self) { + let mut receiver = self.tx_broadcaster.get_broadcast_queue().await; + while let Some(next_package) = receiver.recv().await { + for tx in &next_package { + let txid = tx.compute_txid(); + let timeout_fut = tokio::time::timeout( + Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS), + self.esplora_client.broadcast(tx), + ); + match timeout_fut.await { + Ok(res) => match res { + Ok(()) => { + log_trace!(self.logger, "Successfully broadcast transaction {}", txid); + }, + Err(e) => match e { + esplora_client::Error::HttpResponse { status, message } => { + if status == 400 { + // Log 400 at lesser level, as this often just means bitcoind already knows the + // transaction. + // FIXME: We can further differentiate here based on the error + // message which will be available with rust-esplora-client 0.7 and + // later. + log_trace!( + self.logger, + "Failed to broadcast due to HTTP connection error: {}", + message + ); + } else { + log_error!( + self.logger, + "Failed to broadcast due to HTTP connection error: {} - {}", + status, + message + ); + } + log_trace!( + self.logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx.encode()) + ); + }, + _ => { + log_error!( + self.logger, + "Failed to broadcast transaction {}: {}", + txid, + e + ); + log_trace!( + self.logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx.encode()) + ); + }, + }, + }, + Err(e) => { + log_error!( + self.logger, + "Failed to broadcast transaction due to timeout {}: {}", + txid, + e + ); + log_trace!( + self.logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx.encode()) + ); + }, + } + } + } + } +} + +impl Filter for EsploraChainSource { + fn register_tx(&self, txid: &Txid, script_pubkey: &Script) { + self.tx_sync.register_tx(txid, script_pubkey); + } + fn register_output(&self, output: WatchedOutput) { + self.tx_sync.register_output(output); + } +} diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 2f8eeaac4..91cce1fe3 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -7,56 +7,30 @@ mod bitcoind; mod electrum; +mod esplora; -use crate::chain::bitcoind::{ - BitcoindClient, BoundedHeaderCache, ChainListener, FeeRateEstimationMode, -}; -use crate::chain::electrum::ElectrumRuntimeClient; +use crate::chain::bitcoind::BitcoindChainSource; +use crate::chain::electrum::ElectrumChainSource; +use crate::chain::esplora::EsploraChainSource; use crate::config::{ BackgroundSyncConfig, BitcoindRestClientConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, - BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP, BDK_WALLET_SYNC_TIMEOUT_SECS, - FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS, LDK_WALLET_SYNC_TIMEOUT_SECS, - RESOLVED_CHANNEL_MONITOR_ARCHIVAL_INTERVAL, TX_BROADCAST_TIMEOUT_SECS, - WALLET_SYNC_INTERVAL_MINIMUM_SECS, -}; -use crate::fee_estimator::{ - apply_post_estimation_adjustments, get_all_conf_targets, get_num_block_defaults_for_target, - ConfirmationTarget, OnchainFeeEstimator, + RESOLVED_CHANNEL_MONITOR_ARCHIVAL_INTERVAL, WALLET_SYNC_INTERVAL_MINIMUM_SECS, }; +use crate::fee_estimator::OnchainFeeEstimator; use crate::io::utils::write_node_metrics; -use crate::logger::{log_bytes, log_error, log_info, log_trace, LdkLogger, Logger}; +use crate::logger::{log_info, log_trace, LdkLogger, Logger}; use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; use crate::{Error, NodeMetrics}; -use lightning::chain::chaininterface::ConfirmationTarget as LdkConfirmationTarget; -use lightning::chain::{Confirm, Filter, Listen, WatchedOutput}; -use lightning::util::ser::Writeable; - -use lightning_transaction_sync::EsploraSyncClient; +use lightning::chain::Filter; use lightning_block_sync::gossip::UtxoSource; -use lightning_block_sync::init::{synchronize_listeners, validate_best_block_header}; -use lightning_block_sync::poll::{ChainPoller, ChainTip, ValidatedBlockHeader}; -use lightning_block_sync::{BlockSourceErrorKind, SpvClient}; - -use bdk_esplora::EsploraAsyncExt; -use bdk_wallet::Update as BdkUpdate; - -use esplora_client::AsyncClient as EsploraAsyncClient; -use bitcoin::{FeeRate, Network, Script, ScriptBuf, Txid}; +use bitcoin::{Script, Txid}; use std::collections::HashMap; -use std::sync::{Arc, Mutex, RwLock}; -use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; - -// The default Esplora server we're using. -pub(crate) const DEFAULT_ESPLORA_SERVER_URL: &str = "https://blockstream.info/api"; - -// The default Esplora client timeout we're using. -pub(crate) const DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS: u64 = 10; - -const CHAIN_POLLING_INTERVAL_SECS: u64 = 2; +use std::sync::{Arc, RwLock}; +use std::time::Duration; pub(crate) enum WalletSyncStatus { Completed, @@ -111,124 +85,15 @@ impl WalletSyncStatus { } } -pub(crate) enum ElectrumRuntimeStatus { - Started(Arc), - Stopped { - pending_registered_txs: Vec<(Txid, ScriptBuf)>, - pending_registered_outputs: Vec, - }, +pub(crate) struct ChainSource { + kind: ChainSourceKind, + logger: Arc, } -impl ElectrumRuntimeStatus { - pub(crate) fn new() -> Self { - let pending_registered_txs = Vec::new(); - let pending_registered_outputs = Vec::new(); - Self::Stopped { pending_registered_txs, pending_registered_outputs } - } - - pub(crate) fn start( - &mut self, server_url: String, runtime: Arc, config: Arc, - logger: Arc, - ) -> Result<(), Error> { - match self { - Self::Stopped { pending_registered_txs, pending_registered_outputs } => { - let client = Arc::new(ElectrumRuntimeClient::new( - server_url.clone(), - runtime, - config, - logger, - )?); - - // Apply any pending `Filter` entries - for (txid, script_pubkey) in pending_registered_txs.drain(..) { - client.register_tx(&txid, &script_pubkey); - } - - for output in pending_registered_outputs.drain(..) { - client.register_output(output) - } - - *self = Self::Started(client); - }, - Self::Started(_) => { - debug_assert!(false, "We shouldn't call start if we're already started") - }, - } - Ok(()) - } - - pub(crate) fn stop(&mut self) { - *self = Self::new() - } - - pub(crate) fn client(&self) -> Option> { - match self { - Self::Started(client) => Some(Arc::clone(&client)), - Self::Stopped { .. } => None, - } - } - - fn register_tx(&mut self, txid: &Txid, script_pubkey: &Script) { - match self { - Self::Started(client) => client.register_tx(txid, script_pubkey), - Self::Stopped { pending_registered_txs, .. } => { - pending_registered_txs.push((*txid, script_pubkey.to_owned())) - }, - } - } - - fn register_output(&mut self, output: lightning::chain::WatchedOutput) { - match self { - Self::Started(client) => client.register_output(output), - Self::Stopped { pending_registered_outputs, .. } => { - pending_registered_outputs.push(output) - }, - } - } -} - -pub(crate) enum ChainSource { - Esplora { - sync_config: EsploraSyncConfig, - esplora_client: EsploraAsyncClient, - onchain_wallet: Arc, - onchain_wallet_sync_status: Mutex, - tx_sync: Arc>>, - lightning_wallet_sync_status: Mutex, - fee_estimator: Arc, - tx_broadcaster: Arc, - kv_store: Arc, - config: Arc, - logger: Arc, - node_metrics: Arc>, - }, - Electrum { - server_url: String, - sync_config: ElectrumSyncConfig, - electrum_runtime_status: RwLock, - onchain_wallet: Arc, - onchain_wallet_sync_status: Mutex, - lightning_wallet_sync_status: Mutex, - fee_estimator: Arc, - tx_broadcaster: Arc, - kv_store: Arc, - config: Arc, - logger: Arc, - node_metrics: Arc>, - }, - Bitcoind { - api_client: Arc, - header_cache: tokio::sync::Mutex, - latest_chain_tip: RwLock>, - onchain_wallet: Arc, - wallet_polling_status: Mutex, - fee_estimator: Arc, - tx_broadcaster: Arc, - kv_store: Arc, - config: Arc, - logger: Arc, - node_metrics: Arc>, - }, +enum ChainSourceKind { + Esplora(EsploraChainSource), + Electrum(ElectrumChainSource), + Bitcoind(BitcoindChainSource), } impl ChainSource { @@ -238,44 +103,20 @@ impl ChainSource { tx_broadcaster: Arc, kv_store: Arc, config: Arc, logger: Arc, node_metrics: Arc>, ) -> Self { - // FIXME / TODO: We introduced this to make `bdk_esplora` work separately without updating - // `lightning-transaction-sync`. We should revert this as part of of the upgrade to LDK 0.2. - let mut client_builder_0_11 = esplora_client_0_11::Builder::new(&server_url); - client_builder_0_11 = client_builder_0_11.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS); - - for (header_name, header_value) in &headers { - client_builder_0_11 = client_builder_0_11.header(header_name, header_value); - } - - let esplora_client_0_11 = client_builder_0_11.build_async().unwrap(); - let tx_sync = - Arc::new(EsploraSyncClient::from_client(esplora_client_0_11, Arc::clone(&logger))); - - let mut client_builder = esplora_client::Builder::new(&server_url); - client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS); - - for (header_name, header_value) in &headers { - client_builder = client_builder.header(header_name, header_value); - } - - let esplora_client = client_builder.build_async().unwrap(); - - let onchain_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed); - let lightning_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed); - Self::Esplora { + let esplora_chain_source = EsploraChainSource::new( + server_url, + headers, sync_config, - esplora_client, onchain_wallet, - onchain_wallet_sync_status, - tx_sync, - lightning_wallet_sync_status, fee_estimator, tx_broadcaster, kv_store, config, - logger, + Arc::clone(&logger), node_metrics, - } + ); + let kind = ChainSourceKind::Esplora(esplora_chain_source); + Self { kind, logger } } pub(crate) fn new_electrum( @@ -284,23 +125,19 @@ impl ChainSource { kv_store: Arc, config: Arc, logger: Arc, node_metrics: Arc>, ) -> Self { - let electrum_runtime_status = RwLock::new(ElectrumRuntimeStatus::new()); - let onchain_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed); - let lightning_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed); - Self::Electrum { + let electrum_chain_source = ElectrumChainSource::new( server_url, sync_config, - electrum_runtime_status, onchain_wallet, - onchain_wallet_sync_status, - lightning_wallet_sync_status, fee_estimator, tx_broadcaster, kv_store, config, - logger, + Arc::clone(&logger), node_metrics, - } + ); + let kind = ChainSourceKind::Electrum(electrum_chain_source); + Self { kind, logger } } pub(crate) fn new_bitcoind_rpc( @@ -309,29 +146,21 @@ impl ChainSource { tx_broadcaster: Arc, kv_store: Arc, config: Arc, logger: Arc, node_metrics: Arc>, ) -> Self { - let api_client = Arc::new(BitcoindClient::new_rpc( - rpc_host.clone(), - rpc_port.clone(), - rpc_user.clone(), - rpc_password.clone(), - )); - - let header_cache = tokio::sync::Mutex::new(BoundedHeaderCache::new()); - let latest_chain_tip = RwLock::new(None); - let wallet_polling_status = Mutex::new(WalletSyncStatus::Completed); - Self::Bitcoind { - api_client, - header_cache, - latest_chain_tip, + let bitcoind_chain_source = BitcoindChainSource::new_rpc( + rpc_host, + rpc_port, + rpc_user, + rpc_password, onchain_wallet, - wallet_polling_status, fee_estimator, tx_broadcaster, kv_store, config, - logger, + Arc::clone(&logger), node_metrics, - } + ); + let kind = ChainSourceKind::Bitcoind(bitcoind_chain_source); + Self { kind, logger } } pub(crate) fn new_bitcoind_rest( @@ -341,43 +170,28 @@ impl ChainSource { rest_client_config: BitcoindRestClientConfig, logger: Arc, node_metrics: Arc>, ) -> Self { - let api_client = Arc::new(BitcoindClient::new_rest( - rest_client_config.rest_host, - rest_client_config.rest_port, + let bitcoind_chain_source = BitcoindChainSource::new_rest( rpc_host, rpc_port, rpc_user, rpc_password, - )); - - let header_cache = tokio::sync::Mutex::new(BoundedHeaderCache::new()); - let latest_chain_tip = RwLock::new(None); - let wallet_polling_status = Mutex::new(WalletSyncStatus::Completed); - - Self::Bitcoind { - api_client, - header_cache, - latest_chain_tip, - wallet_polling_status, onchain_wallet, fee_estimator, tx_broadcaster, kv_store, config, - logger, + rest_client_config, + Arc::clone(&logger), node_metrics, - } + ); + let kind = ChainSourceKind::Bitcoind(bitcoind_chain_source); + Self { kind, logger } } pub(crate) fn start(&self, runtime: Arc) -> Result<(), Error> { - match self { - Self::Electrum { server_url, electrum_runtime_status, config, logger, .. } => { - electrum_runtime_status.write().unwrap().start( - server_url.clone(), - Arc::clone(&runtime), - Arc::clone(&config), - Arc::clone(&logger), - )?; + match &self.kind { + ChainSourceKind::Electrum(electrum_chain_source) => { + electrum_chain_source.start(runtime)? }, _ => { // Nothing to do for other chain sources. @@ -387,10 +201,8 @@ impl ChainSource { } pub(crate) fn stop(&self) { - match self { - Self::Electrum { electrum_runtime_status, .. } => { - electrum_runtime_status.write().unwrap().stop(); - }, + match &self.kind { + ChainSourceKind::Electrum(electrum_chain_source) => electrum_chain_source.stop(), _ => { // Nothing to do for other chain sources. }, @@ -398,223 +210,81 @@ impl ChainSource { } pub(crate) fn as_utxo_source(&self) -> Option> { - match self { - Self::Bitcoind { api_client, .. } => Some(api_client.utxo_source()), + match &self.kind { + ChainSourceKind::Bitcoind(bitcoind_chain_source) => { + Some(bitcoind_chain_source.as_utxo_source()) + }, _ => None, } } + pub(crate) fn is_transaction_based(&self) -> bool { + match &self.kind { + ChainSourceKind::Esplora(_) => true, + ChainSourceKind::Electrum { .. } => true, + ChainSourceKind::Bitcoind { .. } => false, + } + } + pub(crate) async fn continuously_sync_wallets( - &self, mut stop_sync_receiver: tokio::sync::watch::Receiver<()>, + &self, stop_sync_receiver: tokio::sync::watch::Receiver<()>, channel_manager: Arc, chain_monitor: Arc, output_sweeper: Arc, ) { - match self { - Self::Esplora { sync_config, logger, .. } => { - if let Some(background_sync_config) = sync_config.background_sync_config.as_ref() { + match &self.kind { + ChainSourceKind::Esplora(esplora_chain_source) => { + if let Some(background_sync_config) = + esplora_chain_source.sync_config.background_sync_config.as_ref() + { self.start_tx_based_sync_loop( stop_sync_receiver, channel_manager, chain_monitor, output_sweeper, background_sync_config, - Arc::clone(&logger), + Arc::clone(&self.logger), ) .await } else { // Background syncing is disabled log_info!( - logger, + self.logger, "Background syncing is disabled. Manual syncing required for onchain wallet, lightning wallet, and fee rate updates.", ); return; } }, - Self::Electrum { sync_config, logger, .. } => { - if let Some(background_sync_config) = sync_config.background_sync_config.as_ref() { + ChainSourceKind::Electrum(electrum_chain_source) => { + if let Some(background_sync_config) = + electrum_chain_source.sync_config.background_sync_config.as_ref() + { self.start_tx_based_sync_loop( stop_sync_receiver, channel_manager, chain_monitor, output_sweeper, background_sync_config, - Arc::clone(&logger), + Arc::clone(&self.logger), ) .await } else { // Background syncing is disabled log_info!( - logger, + self.logger, "Background syncing is disabled. Manual syncing required for onchain wallet, lightning wallet, and fee rate updates.", ); return; } }, - Self::Bitcoind { - api_client, - header_cache, - latest_chain_tip, - onchain_wallet, - wallet_polling_status, - kv_store, - config, - logger, - node_metrics, - .. - } => { - // First register for the wallet polling status to make sure `Node::sync_wallets` calls - // wait on the result before proceeding. - { - let mut status_lock = wallet_polling_status.lock().unwrap(); - if status_lock.register_or_subscribe_pending_sync().is_some() { - debug_assert!(false, "Sync already in progress. This should never happen."); - } - } - - log_info!( - logger, - "Starting initial synchronization of chain listeners. This might take a while..", - ); - - let mut backoff = CHAIN_POLLING_INTERVAL_SECS; - const MAX_BACKOFF_SECS: u64 = 300; - - loop { - let channel_manager_best_block_hash = - channel_manager.current_best_block().block_hash; - let sweeper_best_block_hash = output_sweeper.current_best_block().block_hash; - let onchain_wallet_best_block_hash = - onchain_wallet.current_best_block().block_hash; - - let mut chain_listeners = vec![ - ( - onchain_wallet_best_block_hash, - &**onchain_wallet as &(dyn Listen + Send + Sync), - ), - ( - channel_manager_best_block_hash, - &*channel_manager as &(dyn Listen + Send + Sync), - ), - (sweeper_best_block_hash, &*output_sweeper as &(dyn Listen + Send + Sync)), - ]; - - // TODO: Eventually we might want to see if we can synchronize `ChannelMonitor`s - // before giving them to `ChainMonitor` it the first place. However, this isn't - // trivial as we load them on initialization (in the `Builder`) and only gain - // network access during `start`. For now, we just make sure we get the worst known - // block hash and sychronize them via `ChainMonitor`. - if let Some(worst_channel_monitor_block_hash) = chain_monitor - .list_monitors() - .iter() - .flat_map(|(txo, _)| chain_monitor.get_monitor(*txo)) - .map(|m| m.current_best_block()) - .min_by_key(|b| b.height) - .map(|b| b.block_hash) - { - chain_listeners.push(( - worst_channel_monitor_block_hash, - &*chain_monitor as &(dyn Listen + Send + Sync), - )); - } - - let mut locked_header_cache = header_cache.lock().await; - let now = SystemTime::now(); - match synchronize_listeners( - api_client.as_ref(), - config.network, - &mut *locked_header_cache, - chain_listeners.clone(), + ChainSourceKind::Bitcoind(bitcoind_chain_source) => { + bitcoind_chain_source + .continuously_sync_wallets( + stop_sync_receiver, + channel_manager, + chain_monitor, + output_sweeper, ) .await - { - Ok(chain_tip) => { - { - log_info!( - logger, - "Finished synchronizing listeners in {}ms", - now.elapsed().unwrap().as_millis() - ); - *latest_chain_tip.write().unwrap() = Some(chain_tip); - let unix_time_secs_opt = SystemTime::now() - .duration_since(UNIX_EPOCH) - .ok() - .map(|d| d.as_secs()); - let mut locked_node_metrics = node_metrics.write().unwrap(); - locked_node_metrics.latest_lightning_wallet_sync_timestamp = - unix_time_secs_opt; - locked_node_metrics.latest_onchain_wallet_sync_timestamp = - unix_time_secs_opt; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&kv_store), - Arc::clone(&logger), - ) - .unwrap_or_else(|e| { - log_error!(logger, "Failed to persist node metrics: {}", e); - }); - } - break; - }, - - Err(e) => { - log_error!(logger, "Failed to synchronize chain listeners: {:?}", e); - if e.kind() == BlockSourceErrorKind::Transient { - log_info!( - logger, - "Transient error syncing chain listeners: {:?}. Retrying in {} seconds.", - e, - backoff - ); - tokio::time::sleep(Duration::from_secs(backoff)).await; - backoff = std::cmp::min(backoff * 2, MAX_BACKOFF_SECS); - } else { - log_error!( - logger, - "Persistent error syncing chain listeners: {:?}. Retrying in {} seconds.", - e, - MAX_BACKOFF_SECS - ); - tokio::time::sleep(Duration::from_secs(MAX_BACKOFF_SECS)).await; - } - }, - } - } - - // Now propagate the initial result to unblock waiting subscribers. - wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(Ok(())); - - let mut chain_polling_interval = - tokio::time::interval(Duration::from_secs(CHAIN_POLLING_INTERVAL_SECS)); - chain_polling_interval - .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - - let mut fee_rate_update_interval = - tokio::time::interval(Duration::from_secs(CHAIN_POLLING_INTERVAL_SECS)); - // When starting up, we just blocked on updating, so skip the first tick. - fee_rate_update_interval.reset(); - fee_rate_update_interval - .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - - log_info!(logger, "Starting continuous polling for chain updates."); - - // Start the polling loop. - loop { - tokio::select! { - _ = stop_sync_receiver.changed() => { - log_trace!( - logger, - "Stopping polling for new chain data.", - ); - return; - } - _ = chain_polling_interval.tick() => { - let _ = self.poll_and_update_listeners(Arc::clone(&channel_manager), Arc::clone(&chain_monitor), Arc::clone(&output_sweeper)).await; - } - _ = fee_rate_update_interval.tick() => { - let _ = self.update_fee_rate_estimates().await; - } - } - } }, } } @@ -681,213 +351,14 @@ impl ChainSource { // Synchronize the onchain wallet via transaction-based protocols (i.e., Esplora, Electrum, // etc.) pub(crate) async fn sync_onchain_wallet(&self) -> Result<(), Error> { - match self { - Self::Esplora { - esplora_client, - onchain_wallet, - onchain_wallet_sync_status, - kv_store, - logger, - node_metrics, - .. - } => { - let receiver_res = { - let mut status_lock = onchain_wallet_sync_status.lock().unwrap(); - status_lock.register_or_subscribe_pending_sync() - }; - if let Some(mut sync_receiver) = receiver_res { - log_info!(logger, "Sync in progress, skipping."); - return sync_receiver.recv().await.map_err(|e| { - debug_assert!(false, "Failed to receive wallet sync result: {:?}", e); - log_error!(logger, "Failed to receive wallet sync result: {:?}", e); - Error::WalletOperationFailed - })?; - } - - let res = { - // If this is our first sync, do a full scan with the configured gap limit. - // Otherwise just do an incremental sync. - let incremental_sync = - node_metrics.read().unwrap().latest_onchain_wallet_sync_timestamp.is_some(); - - macro_rules! get_and_apply_wallet_update { - ($sync_future: expr) => {{ - let now = Instant::now(); - match $sync_future.await { - Ok(res) => match res { - Ok(update) => match onchain_wallet.apply_update(update) { - Ok(()) => { - log_info!( - logger, - "{} of on-chain wallet finished in {}ms.", - if incremental_sync { "Incremental sync" } else { "Sync" }, - now.elapsed().as_millis() - ); - let unix_time_secs_opt = SystemTime::now() - .duration_since(UNIX_EPOCH) - .ok() - .map(|d| d.as_secs()); - { - let mut locked_node_metrics = node_metrics.write().unwrap(); - locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt; - write_node_metrics(&*locked_node_metrics, Arc::clone(&kv_store), Arc::clone(&logger))?; - } - Ok(()) - }, - Err(e) => Err(e), - }, - Err(e) => match *e { - esplora_client::Error::Reqwest(he) => { - log_error!( - logger, - "{} of on-chain wallet failed due to HTTP connection error: {}", - if incremental_sync { "Incremental sync" } else { "Sync" }, - he - ); - Err(Error::WalletOperationFailed) - }, - _ => { - log_error!( - logger, - "{} of on-chain wallet failed due to Esplora error: {}", - if incremental_sync { "Incremental sync" } else { "Sync" }, - e - ); - Err(Error::WalletOperationFailed) - }, - }, - }, - Err(e) => { - log_error!( - logger, - "{} of on-chain wallet timed out: {}", - if incremental_sync { "Incremental sync" } else { "Sync" }, - e - ); - Err(Error::WalletOperationTimeout) - }, - } - }} - } - - if incremental_sync { - let sync_request = onchain_wallet.get_incremental_sync_request(); - let wallet_sync_timeout_fut = tokio::time::timeout( - Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), - esplora_client.sync(sync_request, BDK_CLIENT_CONCURRENCY), - ); - get_and_apply_wallet_update!(wallet_sync_timeout_fut) - } else { - let full_scan_request = onchain_wallet.get_full_scan_request(); - let wallet_sync_timeout_fut = tokio::time::timeout( - Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), - esplora_client.full_scan( - full_scan_request, - BDK_CLIENT_STOP_GAP, - BDK_CLIENT_CONCURRENCY, - ), - ); - get_and_apply_wallet_update!(wallet_sync_timeout_fut) - } - }; - - onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); - - res + match &self.kind { + ChainSourceKind::Esplora(esplora_chain_source) => { + esplora_chain_source.sync_onchain_wallet().await }, - Self::Electrum { - electrum_runtime_status, - onchain_wallet, - onchain_wallet_sync_status, - kv_store, - logger, - node_metrics, - .. - } => { - let electrum_client: Arc = if let Some(client) = - electrum_runtime_status.read().unwrap().client().as_ref() - { - Arc::clone(client) - } else { - debug_assert!( - false, - "We should have started the chain source before syncing the onchain wallet" - ); - return Err(Error::FeerateEstimationUpdateFailed); - }; - let receiver_res = { - let mut status_lock = onchain_wallet_sync_status.lock().unwrap(); - status_lock.register_or_subscribe_pending_sync() - }; - if let Some(mut sync_receiver) = receiver_res { - log_info!(logger, "Sync in progress, skipping."); - return sync_receiver.recv().await.map_err(|e| { - debug_assert!(false, "Failed to receive wallet sync result: {:?}", e); - log_error!(logger, "Failed to receive wallet sync result: {:?}", e); - Error::WalletOperationFailed - })?; - } - - // If this is our first sync, do a full scan with the configured gap limit. - // Otherwise just do an incremental sync. - let incremental_sync = - node_metrics.read().unwrap().latest_onchain_wallet_sync_timestamp.is_some(); - - let apply_wallet_update = - |update_res: Result, now: Instant| match update_res { - Ok(update) => match onchain_wallet.apply_update(update) { - Ok(()) => { - log_info!( - logger, - "{} of on-chain wallet finished in {}ms.", - if incremental_sync { "Incremental sync" } else { "Sync" }, - now.elapsed().as_millis() - ); - let unix_time_secs_opt = SystemTime::now() - .duration_since(UNIX_EPOCH) - .ok() - .map(|d| d.as_secs()); - { - let mut locked_node_metrics = node_metrics.write().unwrap(); - locked_node_metrics.latest_onchain_wallet_sync_timestamp = - unix_time_secs_opt; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&kv_store), - Arc::clone(&logger), - )?; - } - Ok(()) - }, - Err(e) => Err(e), - }, - Err(e) => Err(e), - }; - - let cached_txs = onchain_wallet.get_cached_txs(); - - let res = if incremental_sync { - let incremental_sync_request = onchain_wallet.get_incremental_sync_request(); - let incremental_sync_fut = electrum_client - .get_incremental_sync_wallet_update(incremental_sync_request, cached_txs); - - let now = Instant::now(); - let update_res = incremental_sync_fut.await.map(|u| u.into()); - apply_wallet_update(update_res, now) - } else { - let full_scan_request = onchain_wallet.get_full_scan_request(); - let full_scan_fut = - electrum_client.get_full_scan_wallet_update(full_scan_request, cached_txs); - let now = Instant::now(); - let update_res = full_scan_fut.await.map(|u| u.into()); - apply_wallet_update(update_res, now) - }; - - onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); - - res + ChainSourceKind::Electrum(electrum_chain_source) => { + electrum_chain_source.sync_onchain_wallet().await }, - Self::Bitcoind { .. } => { + ChainSourceKind::Bitcoind { .. } => { // In BitcoindRpc mode we sync lightning and onchain wallet in one go via // `ChainPoller`. So nothing to do here. unreachable!("Onchain wallet will be synced via chain polling") @@ -901,163 +372,18 @@ impl ChainSource { &self, channel_manager: Arc, chain_monitor: Arc, output_sweeper: Arc, ) -> Result<(), Error> { - match self { - Self::Esplora { - tx_sync, - lightning_wallet_sync_status, - kv_store, - logger, - node_metrics, - .. - } => { - let sync_cman = Arc::clone(&channel_manager); - let sync_cmon = Arc::clone(&chain_monitor); - let sync_sweeper = Arc::clone(&output_sweeper); - let confirmables = vec![ - &*sync_cman as &(dyn Confirm + Sync + Send), - &*sync_cmon as &(dyn Confirm + Sync + Send), - &*sync_sweeper as &(dyn Confirm + Sync + Send), - ]; - - let receiver_res = { - let mut status_lock = lightning_wallet_sync_status.lock().unwrap(); - status_lock.register_or_subscribe_pending_sync() - }; - if let Some(mut sync_receiver) = receiver_res { - log_info!(logger, "Sync in progress, skipping."); - return sync_receiver.recv().await.map_err(|e| { - debug_assert!(false, "Failed to receive wallet sync result: {:?}", e); - log_error!(logger, "Failed to receive wallet sync result: {:?}", e); - Error::WalletOperationFailed - })?; - } - let res = { - let timeout_fut = tokio::time::timeout( - Duration::from_secs(LDK_WALLET_SYNC_TIMEOUT_SECS), - tx_sync.sync(confirmables), - ); - let now = Instant::now(); - match timeout_fut.await { - Ok(res) => match res { - Ok(()) => { - log_info!( - logger, - "Sync of Lightning wallet finished in {}ms.", - now.elapsed().as_millis() - ); - - let unix_time_secs_opt = SystemTime::now() - .duration_since(UNIX_EPOCH) - .ok() - .map(|d| d.as_secs()); - { - let mut locked_node_metrics = node_metrics.write().unwrap(); - locked_node_metrics.latest_lightning_wallet_sync_timestamp = - unix_time_secs_opt; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&kv_store), - Arc::clone(&logger), - )?; - } - - periodically_archive_fully_resolved_monitors( - Arc::clone(&channel_manager), - Arc::clone(&chain_monitor), - Arc::clone(&kv_store), - Arc::clone(&logger), - Arc::clone(&node_metrics), - )?; - Ok(()) - }, - Err(e) => { - log_error!(logger, "Sync of Lightning wallet failed: {}", e); - Err(e.into()) - }, - }, - Err(e) => { - log_error!(logger, "Lightning wallet sync timed out: {}", e); - Err(Error::TxSyncTimeout) - }, - } - }; - - lightning_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); - - res + match &self.kind { + ChainSourceKind::Esplora(esplora_chain_source) => { + esplora_chain_source + .sync_lightning_wallet(channel_manager, chain_monitor, output_sweeper) + .await }, - Self::Electrum { - electrum_runtime_status, - lightning_wallet_sync_status, - kv_store, - logger, - node_metrics, - .. - } => { - let electrum_client: Arc = if let Some(client) = - electrum_runtime_status.read().unwrap().client().as_ref() - { - Arc::clone(client) - } else { - debug_assert!( - false, - "We should have started the chain source before syncing the lightning wallet" - ); - return Err(Error::TxSyncFailed); - }; - - let sync_cman = Arc::clone(&channel_manager); - let sync_cmon = Arc::clone(&chain_monitor); - let sync_sweeper = Arc::clone(&output_sweeper); - let confirmables = vec![ - sync_cman as Arc, - sync_cmon as Arc, - sync_sweeper as Arc, - ]; - - let receiver_res = { - let mut status_lock = lightning_wallet_sync_status.lock().unwrap(); - status_lock.register_or_subscribe_pending_sync() - }; - if let Some(mut sync_receiver) = receiver_res { - log_info!(logger, "Sync in progress, skipping."); - return sync_receiver.recv().await.map_err(|e| { - debug_assert!(false, "Failed to receive wallet sync result: {:?}", e); - log_error!(logger, "Failed to receive wallet sync result: {:?}", e); - Error::TxSyncFailed - })?; - } - - let res = electrum_client.sync_confirmables(confirmables).await; - - if let Ok(_) = res { - let unix_time_secs_opt = - SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); - { - let mut locked_node_metrics = node_metrics.write().unwrap(); - locked_node_metrics.latest_lightning_wallet_sync_timestamp = - unix_time_secs_opt; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&kv_store), - Arc::clone(&logger), - )?; - } - - periodically_archive_fully_resolved_monitors( - Arc::clone(&channel_manager), - Arc::clone(&chain_monitor), - Arc::clone(&kv_store), - Arc::clone(&logger), - Arc::clone(&node_metrics), - )?; - } - - lightning_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); - - res + ChainSourceKind::Electrum(electrum_chain_source) => { + electrum_chain_source + .sync_lightning_wallet(channel_manager, chain_monitor, output_sweeper) + .await }, - Self::Bitcoind { .. } => { + ChainSourceKind::Bitcoind { .. } => { // In BitcoindRpc mode we sync lightning and onchain wallet in one go via // `ChainPoller`. So nothing to do here. unreachable!("Lightning wallet will be synced via chain polling") @@ -1069,570 +395,49 @@ impl ChainSource { &self, channel_manager: Arc, chain_monitor: Arc, output_sweeper: Arc, ) -> Result<(), Error> { - match self { - Self::Esplora { .. } => { + match &self.kind { + ChainSourceKind::Esplora { .. } => { // In Esplora mode we sync lightning and onchain wallets via // `sync_onchain_wallet` and `sync_lightning_wallet`. So nothing to do here. unreachable!("Listeners will be synced via transction-based syncing") }, - Self::Electrum { .. } => { + ChainSourceKind::Electrum { .. } => { // In Electrum mode we sync lightning and onchain wallets via // `sync_onchain_wallet` and `sync_lightning_wallet`. So nothing to do here. unreachable!("Listeners will be synced via transction-based syncing") }, - Self::Bitcoind { - api_client, - header_cache, - latest_chain_tip, - onchain_wallet, - wallet_polling_status, - kv_store, - config, - logger, - node_metrics, - .. - } => { - let receiver_res = { - let mut status_lock = wallet_polling_status.lock().unwrap(); - status_lock.register_or_subscribe_pending_sync() - }; - - if let Some(mut sync_receiver) = receiver_res { - log_info!(logger, "Sync in progress, skipping."); - return sync_receiver.recv().await.map_err(|e| { - debug_assert!(false, "Failed to receive wallet polling result: {:?}", e); - log_error!(logger, "Failed to receive wallet polling result: {:?}", e); - Error::WalletOperationFailed - })?; - } - - let latest_chain_tip_opt = latest_chain_tip.read().unwrap().clone(); - let chain_tip = if let Some(tip) = latest_chain_tip_opt { - tip - } else { - match validate_best_block_header(api_client.as_ref()).await { - Ok(tip) => { - *latest_chain_tip.write().unwrap() = Some(tip); - tip - }, - Err(e) => { - log_error!(logger, "Failed to poll for chain data: {:?}", e); - let res = Err(Error::TxSyncFailed); - wallet_polling_status - .lock() - .unwrap() - .propagate_result_to_subscribers(res); - return res; - }, - } - }; - - let mut locked_header_cache = header_cache.lock().await; - let chain_poller = ChainPoller::new(Arc::clone(&api_client), config.network); - let chain_listener = ChainListener { - onchain_wallet: Arc::clone(&onchain_wallet), - channel_manager: Arc::clone(&channel_manager), - chain_monitor, - output_sweeper, - }; - let mut spv_client = SpvClient::new( - chain_tip, - chain_poller, - &mut *locked_header_cache, - &chain_listener, - ); - - let now = SystemTime::now(); - match spv_client.poll_best_tip().await { - Ok((ChainTip::Better(tip), true)) => { - log_trace!( - logger, - "Finished polling best tip in {}ms", - now.elapsed().unwrap().as_millis() - ); - *latest_chain_tip.write().unwrap() = Some(tip); - }, - Ok(_) => {}, - Err(e) => { - log_error!(logger, "Failed to poll for chain data: {:?}", e); - let res = Err(Error::TxSyncFailed); - wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res); - return res; - }, - } - - let cur_height = channel_manager.current_best_block().height; - - let now = SystemTime::now(); - let unconfirmed_txids = onchain_wallet.get_unconfirmed_txids(); - match api_client - .get_updated_mempool_transactions(cur_height, unconfirmed_txids) + ChainSourceKind::Bitcoind(bitcoind_chain_source) => { + bitcoind_chain_source + .poll_and_update_listeners(channel_manager, chain_monitor, output_sweeper) .await - { - Ok((unconfirmed_txs, evicted_txids)) => { - log_trace!( - logger, - "Finished polling mempool of size {} and {} evicted transactions in {}ms", - unconfirmed_txs.len(), - evicted_txids.len(), - now.elapsed().unwrap().as_millis() - ); - onchain_wallet - .apply_mempool_txs(unconfirmed_txs, evicted_txids) - .unwrap_or_else(|e| { - log_error!(logger, "Failed to apply mempool transactions: {:?}", e); - }); - }, - Err(e) => { - log_error!(logger, "Failed to poll for mempool transactions: {:?}", e); - let res = Err(Error::TxSyncFailed); - wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res); - return res; - }, - } - - let unix_time_secs_opt = - SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); - let mut locked_node_metrics = node_metrics.write().unwrap(); - locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt; - locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt; - - let write_res = write_node_metrics( - &*locked_node_metrics, - Arc::clone(&kv_store), - Arc::clone(&logger), - ); - match write_res { - Ok(()) => (), - Err(e) => { - log_error!(logger, "Failed to persist node metrics: {}", e); - let res = Err(Error::PersistenceFailed); - wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res); - return res; - }, - } - - let res = Ok(()); - wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res); - res }, } } pub(crate) async fn update_fee_rate_estimates(&self) -> Result<(), Error> { - match self { - Self::Esplora { - esplora_client, - fee_estimator, - config, - kv_store, - logger, - node_metrics, - .. - } => { - let now = Instant::now(); - let estimates = tokio::time::timeout( - Duration::from_secs(FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS), - esplora_client.get_fee_estimates(), - ) - .await - .map_err(|e| { - log_error!(logger, "Updating fee rate estimates timed out: {}", e); - Error::FeerateEstimationUpdateTimeout - })? - .map_err(|e| { - log_error!(logger, "Failed to retrieve fee rate estimates: {}", e); - Error::FeerateEstimationUpdateFailed - })?; - - if estimates.is_empty() && config.network == Network::Bitcoin { - // Ensure we fail if we didn't receive any estimates. - log_error!( - logger, - "Failed to retrieve fee rate estimates: empty fee estimates are dissallowed on Mainnet.", - ); - return Err(Error::FeerateEstimationUpdateFailed); - } - - let confirmation_targets = get_all_conf_targets(); - - let mut new_fee_rate_cache = HashMap::with_capacity(10); - for target in confirmation_targets { - let num_blocks = get_num_block_defaults_for_target(target); - - // Convert the retrieved fee rate and fall back to 1 sat/vb if we fail or it - // yields less than that. This is mostly necessary to continue on - // `signet`/`regtest` where we might not get estimates (or bogus values). - let converted_estimate_sat_vb = - esplora_client::convert_fee_rate(num_blocks, estimates.clone()) - .map_or(1.0, |converted| converted.max(1.0)); - - let fee_rate = - FeeRate::from_sat_per_kwu((converted_estimate_sat_vb * 250.0) as u64); - - // LDK 0.0.118 introduced changes to the `ConfirmationTarget` semantics that - // require some post-estimation adjustments to the fee rates, which we do here. - let adjusted_fee_rate = apply_post_estimation_adjustments(target, fee_rate); - - new_fee_rate_cache.insert(target, adjusted_fee_rate); - - log_trace!( - logger, - "Fee rate estimation updated for {:?}: {} sats/kwu", - target, - adjusted_fee_rate.to_sat_per_kwu(), - ); - } - - fee_estimator.set_fee_rate_cache(new_fee_rate_cache); - - log_info!( - logger, - "Fee rate cache update finished in {}ms.", - now.elapsed().as_millis() - ); - let unix_time_secs_opt = - SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); - { - let mut locked_node_metrics = node_metrics.write().unwrap(); - locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&kv_store), - Arc::clone(&logger), - )?; - } - - Ok(()) + match &self.kind { + ChainSourceKind::Esplora(esplora_chain_source) => { + esplora_chain_source.update_fee_rate_estimates().await }, - Self::Electrum { - electrum_runtime_status, - fee_estimator, - kv_store, - logger, - node_metrics, - .. - } => { - let electrum_client: Arc = if let Some(client) = - electrum_runtime_status.read().unwrap().client().as_ref() - { - Arc::clone(client) - } else { - debug_assert!( - false, - "We should have started the chain source before updating fees" - ); - return Err(Error::FeerateEstimationUpdateFailed); - }; - - let now = Instant::now(); - - let new_fee_rate_cache = electrum_client.get_fee_rate_cache_update().await?; - fee_estimator.set_fee_rate_cache(new_fee_rate_cache); - - log_info!( - logger, - "Fee rate cache update finished in {}ms.", - now.elapsed().as_millis() - ); - - let unix_time_secs_opt = - SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); - { - let mut locked_node_metrics = node_metrics.write().unwrap(); - locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&kv_store), - Arc::clone(&logger), - )?; - } - - Ok(()) + ChainSourceKind::Electrum(electrum_chain_source) => { + electrum_chain_source.update_fee_rate_estimates().await }, - Self::Bitcoind { - api_client, - fee_estimator, - config, - kv_store, - logger, - node_metrics, - .. - } => { - macro_rules! get_fee_rate_update { - ($estimation_fut: expr) => {{ - let update_res = tokio::time::timeout( - Duration::from_secs(FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS), - $estimation_fut, - ) - .await - .map_err(|e| { - log_error!(logger, "Updating fee rate estimates timed out: {}", e); - Error::FeerateEstimationUpdateTimeout - })?; - update_res - }}; - } - let confirmation_targets = get_all_conf_targets(); - - let mut new_fee_rate_cache = HashMap::with_capacity(10); - let now = Instant::now(); - for target in confirmation_targets { - let fee_rate_update_res = match target { - ConfirmationTarget::Lightning( - LdkConfirmationTarget::MinAllowedAnchorChannelRemoteFee, - ) => { - let estimation_fut = api_client.get_mempool_minimum_fee_rate(); - get_fee_rate_update!(estimation_fut) - }, - ConfirmationTarget::Lightning( - LdkConfirmationTarget::MaximumFeeEstimate, - ) => { - let num_blocks = get_num_block_defaults_for_target(target); - let estimation_mode = FeeRateEstimationMode::Conservative; - let estimation_fut = - api_client.get_fee_estimate_for_target(num_blocks, estimation_mode); - get_fee_rate_update!(estimation_fut) - }, - ConfirmationTarget::Lightning( - LdkConfirmationTarget::UrgentOnChainSweep, - ) => { - let num_blocks = get_num_block_defaults_for_target(target); - let estimation_mode = FeeRateEstimationMode::Conservative; - let estimation_fut = - api_client.get_fee_estimate_for_target(num_blocks, estimation_mode); - get_fee_rate_update!(estimation_fut) - }, - _ => { - // Otherwise, we default to economical block-target estimate. - let num_blocks = get_num_block_defaults_for_target(target); - let estimation_mode = FeeRateEstimationMode::Economical; - let estimation_fut = - api_client.get_fee_estimate_for_target(num_blocks, estimation_mode); - get_fee_rate_update!(estimation_fut) - }, - }; - - let fee_rate = match (fee_rate_update_res, config.network) { - (Ok(rate), _) => rate, - (Err(e), Network::Bitcoin) => { - // Strictly fail on mainnet. - log_error!(logger, "Failed to retrieve fee rate estimates: {}", e); - return Err(Error::FeerateEstimationUpdateFailed); - }, - (Err(e), n) if n == Network::Regtest || n == Network::Signet => { - // On regtest/signet we just fall back to the usual 1 sat/vb == 250 - // sat/kwu default. - log_error!( - logger, - "Failed to retrieve fee rate estimates: {}. Falling back to default of 1 sat/vb.", - e, - ); - FeeRate::from_sat_per_kwu(250) - }, - (Err(e), _) => { - // On testnet `estimatesmartfee` can be unreliable so we just skip in - // case of a failure, which will have us falling back to defaults. - log_error!( - logger, - "Failed to retrieve fee rate estimates: {}. Falling back to defaults.", - e, - ); - return Ok(()); - }, - }; - - // LDK 0.0.118 introduced changes to the `ConfirmationTarget` semantics that - // require some post-estimation adjustments to the fee rates, which we do here. - let adjusted_fee_rate = apply_post_estimation_adjustments(target, fee_rate); - - new_fee_rate_cache.insert(target, adjusted_fee_rate); - - log_trace!( - logger, - "Fee rate estimation updated for {:?}: {} sats/kwu", - target, - adjusted_fee_rate.to_sat_per_kwu(), - ); - } - - if fee_estimator.set_fee_rate_cache(new_fee_rate_cache) { - // We only log if the values changed, as it might be very spammy otherwise. - log_info!( - logger, - "Fee rate cache update finished in {}ms.", - now.elapsed().as_millis() - ); - } - - let unix_time_secs_opt = - SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); - { - let mut locked_node_metrics = node_metrics.write().unwrap(); - locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&kv_store), - Arc::clone(&logger), - )?; - } - - Ok(()) + ChainSourceKind::Bitcoind(bitcoind_chain_source) => { + bitcoind_chain_source.update_fee_rate_estimates().await }, } } pub(crate) async fn process_broadcast_queue(&self) { - match self { - Self::Esplora { esplora_client, tx_broadcaster, logger, .. } => { - let mut receiver = tx_broadcaster.get_broadcast_queue().await; - while let Some(next_package) = receiver.recv().await { - for tx in &next_package { - let txid = tx.compute_txid(); - let timeout_fut = tokio::time::timeout( - Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS), - esplora_client.broadcast(tx), - ); - match timeout_fut.await { - Ok(res) => match res { - Ok(()) => { - log_trace!( - logger, - "Successfully broadcast transaction {}", - txid - ); - }, - Err(e) => match e { - esplora_client::Error::HttpResponse { status, message } => { - if status == 400 { - // Log 400 at lesser level, as this often just means bitcoind already knows the - // transaction. - // FIXME: We can further differentiate here based on the error - // message which will be available with rust-esplora-client 0.7 and - // later. - log_trace!( - logger, - "Failed to broadcast due to HTTP connection error: {}", - message - ); - } else { - log_error!( - logger, - "Failed to broadcast due to HTTP connection error: {} - {}", - status, message - ); - } - log_trace!( - logger, - "Failed broadcast transaction bytes: {}", - log_bytes!(tx.encode()) - ); - }, - _ => { - log_error!( - logger, - "Failed to broadcast transaction {}: {}", - txid, - e - ); - log_trace!( - logger, - "Failed broadcast transaction bytes: {}", - log_bytes!(tx.encode()) - ); - }, - }, - }, - Err(e) => { - log_error!( - logger, - "Failed to broadcast transaction due to timeout {}: {}", - txid, - e - ); - log_trace!( - logger, - "Failed broadcast transaction bytes: {}", - log_bytes!(tx.encode()) - ); - }, - } - } - } + match &self.kind { + ChainSourceKind::Esplora(esplora_chain_source) => { + esplora_chain_source.process_broadcast_queue().await }, - Self::Electrum { electrum_runtime_status, tx_broadcaster, .. } => { - let electrum_client: Arc = if let Some(client) = - electrum_runtime_status.read().unwrap().client().as_ref() - { - Arc::clone(client) - } else { - debug_assert!( - false, - "We should have started the chain source before broadcasting" - ); - return; - }; - - let mut receiver = tx_broadcaster.get_broadcast_queue().await; - while let Some(next_package) = receiver.recv().await { - for tx in next_package { - electrum_client.broadcast(tx).await; - } - } + ChainSourceKind::Electrum(electrum_chain_source) => { + electrum_chain_source.process_broadcast_queue().await }, - Self::Bitcoind { api_client, tx_broadcaster, logger, .. } => { - // While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28 - // features, we should eventually switch to use `submitpackage` via the - // `rust-bitcoind-json-rpc` crate rather than just broadcasting individual - // transactions. - let mut receiver = tx_broadcaster.get_broadcast_queue().await; - while let Some(next_package) = receiver.recv().await { - for tx in &next_package { - let txid = tx.compute_txid(); - let timeout_fut = tokio::time::timeout( - Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS), - api_client.broadcast_transaction(tx), - ); - match timeout_fut.await { - Ok(res) => match res { - Ok(id) => { - debug_assert_eq!(id, txid); - log_trace!( - logger, - "Successfully broadcast transaction {}", - txid - ); - }, - Err(e) => { - log_error!( - logger, - "Failed to broadcast transaction {}: {}", - txid, - e - ); - log_trace!( - logger, - "Failed broadcast transaction bytes: {}", - log_bytes!(tx.encode()) - ); - }, - }, - Err(e) => { - log_error!( - logger, - "Failed to broadcast transaction due to timeout {}: {}", - txid, - e - ); - log_trace!( - logger, - "Failed broadcast transaction bytes: {}", - log_bytes!(tx.encode()) - ); - }, - } - } - } + ChainSourceKind::Bitcoind(bitcoind_chain_source) => { + bitcoind_chain_source.process_broadcast_queue().await }, } } @@ -1640,21 +445,25 @@ impl ChainSource { impl Filter for ChainSource { fn register_tx(&self, txid: &Txid, script_pubkey: &Script) { - match self { - Self::Esplora { tx_sync, .. } => tx_sync.register_tx(txid, script_pubkey), - Self::Electrum { electrum_runtime_status, .. } => { - electrum_runtime_status.write().unwrap().register_tx(txid, script_pubkey) + match &self.kind { + ChainSourceKind::Esplora(esplora_chain_source) => { + esplora_chain_source.register_tx(txid, script_pubkey) + }, + ChainSourceKind::Electrum(electrum_chain_source) => { + electrum_chain_source.register_tx(txid, script_pubkey) }, - Self::Bitcoind { .. } => (), + ChainSourceKind::Bitcoind { .. } => (), } } fn register_output(&self, output: lightning::chain::WatchedOutput) { - match self { - Self::Esplora { tx_sync, .. } => tx_sync.register_output(output), - Self::Electrum { electrum_runtime_status, .. } => { - electrum_runtime_status.write().unwrap().register_output(output) + match &self.kind { + ChainSourceKind::Esplora(esplora_chain_source) => { + esplora_chain_source.register_output(output) + }, + ChainSourceKind::Electrum(electrum_chain_source) => { + electrum_chain_source.register_output(output) }, - Self::Bitcoind { .. } => (), + ChainSourceKind::Bitcoind { .. } => (), } } } diff --git a/src/config.rs b/src/config.rs index 7b7ed8156..a5048e64f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -39,6 +39,12 @@ pub const DEFAULT_LOG_FILENAME: &'static str = "ldk_node.log"; /// The default storage directory. pub const DEFAULT_STORAGE_DIR_PATH: &str = "/tmp/ldk_node"; +// The default Esplora server we're using. +pub(crate) const DEFAULT_ESPLORA_SERVER_URL: &str = "https://blockstream.info/api"; + +// The default Esplora client timeout we're using. +pub(crate) const DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS: u64 = 10; + // The 'stop gap' parameter used by BDK's wallet sync. This seems to configure the threshold // number of derivation indexes after which BDK stops looking for new scripts belonging to the wallet. pub(crate) const BDK_CLIENT_STOP_GAP: usize = 20; diff --git a/src/lib.rs b/src/lib.rs index 0a53fbbb3..89a17ab03 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1250,27 +1250,17 @@ impl Node { tokio::task::block_in_place(move || { tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap().block_on( async move { - match chain_source.as_ref() { - ChainSource::Esplora { .. } => { - chain_source.update_fee_rate_estimates().await?; - chain_source - .sync_lightning_wallet(sync_cman, sync_cmon, sync_sweeper) - .await?; - chain_source.sync_onchain_wallet().await?; - }, - ChainSource::Electrum { .. } => { - chain_source.update_fee_rate_estimates().await?; - chain_source - .sync_lightning_wallet(sync_cman, sync_cmon, sync_sweeper) - .await?; - chain_source.sync_onchain_wallet().await?; - }, - ChainSource::Bitcoind { .. } => { - chain_source.update_fee_rate_estimates().await?; - chain_source - .poll_and_update_listeners(sync_cman, sync_cmon, sync_sweeper) - .await?; - }, + if chain_source.is_transaction_based() { + chain_source.update_fee_rate_estimates().await?; + chain_source + .sync_lightning_wallet(sync_cman, sync_cmon, sync_sweeper) + .await?; + chain_source.sync_onchain_wallet().await?; + } else { + chain_source.update_fee_rate_estimates().await?; + chain_source + .poll_and_update_listeners(sync_cman, sync_cmon, sync_sweeper) + .await?; } Ok(()) },