diff --git a/src/chain/bitcoind.rs b/src/chain/bitcoind.rs index b87ee13ed..fc5f7048f 100644 --- a/src/chain/bitcoind.rs +++ b/src/chain/bitcoind.rs @@ -5,7 +5,7 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -use super::WalletSyncStatus; +use super::{periodically_archive_fully_resolved_monitors, WalletSyncStatus}; use crate::config::{ BitcoindRestClientConfig, Config, FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS, TX_BROADCAST_TIMEOUT_SECS, @@ -306,6 +306,19 @@ impl BitcoindChainSource { })?; } + let res = self + .poll_and_update_listeners_inner(channel_manager, chain_monitor, output_sweeper) + .await; + + self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res); + + res + } + + async fn poll_and_update_listeners_inner( + &self, channel_manager: Arc, chain_monitor: Arc, + output_sweeper: Arc, + ) -> Result<(), Error> { let latest_chain_tip_opt = self.latest_chain_tip.read().unwrap().clone(); let chain_tip = if let Some(tip) = latest_chain_tip_opt { tip @@ -317,9 +330,7 @@ impl BitcoindChainSource { }, Err(e) => { log_error!(self.logger, "Failed to poll for chain data: {:?}", e); - let res = Err(Error::TxSyncFailed); - self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res); - return res; + return Err(Error::TxSyncFailed); }, } }; @@ -329,7 +340,7 @@ impl BitcoindChainSource { let chain_listener = ChainListener { onchain_wallet: Arc::clone(&self.onchain_wallet), channel_manager: Arc::clone(&channel_manager), - chain_monitor, + chain_monitor: Arc::clone(&chain_monitor), output_sweeper, }; let mut spv_client = @@ -344,13 +355,19 @@ impl BitcoindChainSource { now.elapsed().unwrap().as_millis() ); *self.latest_chain_tip.write().unwrap() = Some(tip); + + periodically_archive_fully_resolved_monitors( + Arc::clone(&channel_manager), + chain_monitor, + Arc::clone(&self.kv_store), + Arc::clone(&self.logger), + Arc::clone(&self.node_metrics), + )?; }, Ok(_) => {}, Err(e) => { log_error!(self.logger, "Failed to poll for chain data: {:?}", e); - let res = Err(Error::TxSyncFailed); - self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res); - return res; + return Err(Error::TxSyncFailed); }, } @@ -376,9 +393,7 @@ impl BitcoindChainSource { }, Err(e) => { log_error!(self.logger, "Failed to poll for mempool transactions: {:?}", e); - let res = Err(Error::TxSyncFailed); - self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res); - return res; + return Err(Error::TxSyncFailed); }, } @@ -388,24 +403,13 @@ impl BitcoindChainSource { locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt; locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt; - let write_res = write_node_metrics( + write_node_metrics( &*locked_node_metrics, Arc::clone(&self.kv_store), Arc::clone(&self.logger), - ); - match write_res { - Ok(()) => (), - Err(e) => { - log_error!(self.logger, "Failed to persist node metrics: {}", e); - let res = Err(Error::PersistenceFailed); - self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res); - return res; - }, - } + )?; - let res = Ok(()); - self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res); - res + Ok(()) } pub(super) async fn update_fee_rate_estimates(&self) -> Result<(), Error> { diff --git a/src/chain/electrum.rs b/src/chain/electrum.rs index 44a637cc3..6193c67b3 100644 --- a/src/chain/electrum.rs +++ b/src/chain/electrum.rs @@ -103,16 +103,6 @@ impl ElectrumChainSource { } pub(crate) async fn sync_onchain_wallet(&self) -> Result<(), Error> { - let electrum_client: Arc = - if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() { - Arc::clone(client) - } else { - debug_assert!( - false, - "We should have started the chain source before syncing the onchain wallet" - ); - return Err(Error::FeerateEstimationUpdateFailed); - }; let receiver_res = { let mut status_lock = self.onchain_wallet_sync_status.lock().unwrap(); status_lock.register_or_subscribe_pending_sync() @@ -126,6 +116,24 @@ impl ElectrumChainSource { })?; } + let res = self.sync_onchain_wallet_inner().await; + + self.onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); + + res + } + + async fn sync_onchain_wallet_inner(&self) -> Result<(), Error> { + let electrum_client: Arc = + if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() { + Arc::clone(client) + } else { + debug_assert!( + false, + "We should have started the chain source before syncing the onchain wallet" + ); + return Err(Error::FeerateEstimationUpdateFailed); + }; // If this is our first sync, do a full scan with the configured gap limit. // Otherwise just do an incremental sync. let incremental_sync = @@ -179,8 +187,6 @@ impl ElectrumChainSource { apply_wallet_update(update_res, now) }; - self.onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); - res } @@ -188,26 +194,6 @@ impl ElectrumChainSource { &self, channel_manager: Arc, chain_monitor: Arc, output_sweeper: Arc, ) -> Result<(), Error> { - let electrum_client: Arc = - if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() { - Arc::clone(client) - } else { - debug_assert!( - false, - "We should have started the chain source before syncing the lightning wallet" - ); - return Err(Error::TxSyncFailed); - }; - - let sync_cman = Arc::clone(&channel_manager); - let sync_cmon = Arc::clone(&chain_monitor); - let sync_sweeper = Arc::clone(&output_sweeper); - let confirmables = vec![ - sync_cman as Arc, - sync_cmon as Arc, - sync_sweeper as Arc, - ]; - let receiver_res = { let mut status_lock = self.lightning_wallet_sync_status.lock().unwrap(); status_lock.register_or_subscribe_pending_sync() @@ -221,6 +207,38 @@ impl ElectrumChainSource { })?; } + let res = + self.sync_lightning_wallet_inner(channel_manager, chain_monitor, output_sweeper).await; + + self.lightning_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); + + res + } + + async fn sync_lightning_wallet_inner( + &self, channel_manager: Arc, chain_monitor: Arc, + output_sweeper: Arc, + ) -> Result<(), Error> { + let sync_cman = Arc::clone(&channel_manager); + let sync_cmon = Arc::clone(&chain_monitor); + let sync_sweeper = Arc::clone(&output_sweeper); + let confirmables = vec![ + sync_cman as Arc, + sync_cmon as Arc, + sync_sweeper as Arc, + ]; + + let electrum_client: Arc = + if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() { + Arc::clone(client) + } else { + debug_assert!( + false, + "We should have started the chain source before syncing the lightning wallet" + ); + return Err(Error::TxSyncFailed); + }; + let res = electrum_client.sync_confirmables(confirmables).await; if let Ok(_) = res { @@ -245,8 +263,6 @@ impl ElectrumChainSource { )?; } - self.lightning_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); - res } diff --git a/src/chain/esplora.rs b/src/chain/esplora.rs index 3a911394c..5932426b7 100644 --- a/src/chain/esplora.rs +++ b/src/chain/esplora.rs @@ -112,115 +112,108 @@ impl EsploraChainSource { })?; } - let res = { - // If this is our first sync, do a full scan with the configured gap limit. - // Otherwise just do an incremental sync. - let incremental_sync = - self.node_metrics.read().unwrap().latest_onchain_wallet_sync_timestamp.is_some(); - - macro_rules! get_and_apply_wallet_update { - ($sync_future: expr) => {{ - let now = Instant::now(); - match $sync_future.await { - Ok(res) => match res { - Ok(update) => match self.onchain_wallet.apply_update(update) { - Ok(()) => { - log_info!( - self.logger, - "{} of on-chain wallet finished in {}ms.", - if incremental_sync { "Incremental sync" } else { "Sync" }, - now.elapsed().as_millis() - ); - let unix_time_secs_opt = SystemTime::now() - .duration_since(UNIX_EPOCH) - .ok() - .map(|d| d.as_secs()); - { - let mut locked_node_metrics = self.node_metrics.write().unwrap(); - locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&self.kv_store), - Arc::clone(&self.logger) - )?; - } - Ok(()) - }, - Err(e) => Err(e), - }, - Err(e) => match *e { - esplora_client::Error::Reqwest(he) => { - log_error!( - self.logger, - "{} of on-chain wallet failed due to HTTP connection error: {}", - if incremental_sync { "Incremental sync" } else { "Sync" }, - he - ); - Err(Error::WalletOperationFailed) - }, - _ => { - log_error!( - self.logger, - "{} of on-chain wallet failed due to Esplora error: {}", - if incremental_sync { "Incremental sync" } else { "Sync" }, - e - ); - Err(Error::WalletOperationFailed) - }, - }, - }, - Err(e) => { - log_error!( - self.logger, - "{} of on-chain wallet timed out: {}", - if incremental_sync { "Incremental sync" } else { "Sync" }, - e - ); - Err(Error::WalletOperationTimeout) - }, - } - }} - } - - if incremental_sync { - let sync_request = self.onchain_wallet.get_incremental_sync_request(); - let wallet_sync_timeout_fut = tokio::time::timeout( - Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), - self.esplora_client.sync(sync_request, BDK_CLIENT_CONCURRENCY), - ); - get_and_apply_wallet_update!(wallet_sync_timeout_fut) - } else { - let full_scan_request = self.onchain_wallet.get_full_scan_request(); - let wallet_sync_timeout_fut = tokio::time::timeout( - Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), - self.esplora_client.full_scan( - full_scan_request, - BDK_CLIENT_STOP_GAP, - BDK_CLIENT_CONCURRENCY, - ), - ); - get_and_apply_wallet_update!(wallet_sync_timeout_fut) - } - }; + let res = self.sync_onchain_wallet_inner().await; self.onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); res } + async fn sync_onchain_wallet_inner(&self) -> Result<(), Error> { + // If this is our first sync, do a full scan with the configured gap limit. + // Otherwise just do an incremental sync. + let incremental_sync = + self.node_metrics.read().unwrap().latest_onchain_wallet_sync_timestamp.is_some(); + + macro_rules! get_and_apply_wallet_update { + ($sync_future: expr) => {{ + let now = Instant::now(); + match $sync_future.await { + Ok(res) => match res { + Ok(update) => match self.onchain_wallet.apply_update(update) { + Ok(()) => { + log_info!( + self.logger, + "{} of on-chain wallet finished in {}ms.", + if incremental_sync { "Incremental sync" } else { "Sync" }, + now.elapsed().as_millis() + ); + let unix_time_secs_opt = SystemTime::now() + .duration_since(UNIX_EPOCH) + .ok() + .map(|d| d.as_secs()); + { + let mut locked_node_metrics = self.node_metrics.write().unwrap(); + locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt; + write_node_metrics( + &*locked_node_metrics, + Arc::clone(&self.kv_store), + Arc::clone(&self.logger) + )?; + } + Ok(()) + }, + Err(e) => Err(e), + }, + Err(e) => match *e { + esplora_client::Error::Reqwest(he) => { + log_error!( + self.logger, + "{} of on-chain wallet failed due to HTTP connection error: {}", + if incremental_sync { "Incremental sync" } else { "Sync" }, + he + ); + Err(Error::WalletOperationFailed) + }, + _ => { + log_error!( + self.logger, + "{} of on-chain wallet failed due to Esplora error: {}", + if incremental_sync { "Incremental sync" } else { "Sync" }, + e + ); + Err(Error::WalletOperationFailed) + }, + }, + }, + Err(e) => { + log_error!( + self.logger, + "{} of on-chain wallet timed out: {}", + if incremental_sync { "Incremental sync" } else { "Sync" }, + e + ); + Err(Error::WalletOperationTimeout) + }, + } + }} + } + + if incremental_sync { + let sync_request = self.onchain_wallet.get_incremental_sync_request(); + let wallet_sync_timeout_fut = tokio::time::timeout( + Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), + self.esplora_client.sync(sync_request, BDK_CLIENT_CONCURRENCY), + ); + get_and_apply_wallet_update!(wallet_sync_timeout_fut) + } else { + let full_scan_request = self.onchain_wallet.get_full_scan_request(); + let wallet_sync_timeout_fut = tokio::time::timeout( + Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), + self.esplora_client.full_scan( + full_scan_request, + BDK_CLIENT_STOP_GAP, + BDK_CLIENT_CONCURRENCY, + ), + ); + get_and_apply_wallet_update!(wallet_sync_timeout_fut) + } + } + pub(super) async fn sync_lightning_wallet( &self, channel_manager: Arc, chain_monitor: Arc, output_sweeper: Arc, ) -> Result<(), Error> { - let sync_cman = Arc::clone(&channel_manager); - let sync_cmon = Arc::clone(&chain_monitor); - let sync_sweeper = Arc::clone(&output_sweeper); - let confirmables = vec![ - &*sync_cman as &(dyn Confirm + Sync + Send), - &*sync_cmon as &(dyn Confirm + Sync + Send), - &*sync_sweeper as &(dyn Confirm + Sync + Send), - ]; - let receiver_res = { let mut status_lock = self.lightning_wallet_sync_status.lock().unwrap(); status_lock.register_or_subscribe_pending_sync() @@ -233,58 +226,74 @@ impl EsploraChainSource { Error::WalletOperationFailed })?; } - let res = { - let timeout_fut = tokio::time::timeout( - Duration::from_secs(LDK_WALLET_SYNC_TIMEOUT_SECS), - self.tx_sync.sync(confirmables), - ); - let now = Instant::now(); - match timeout_fut.await { - Ok(res) => match res { - Ok(()) => { - log_info!( - self.logger, - "Sync of Lightning wallet finished in {}ms.", - now.elapsed().as_millis() - ); - let unix_time_secs_opt = - SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); - { - let mut locked_node_metrics = self.node_metrics.write().unwrap(); - locked_node_metrics.latest_lightning_wallet_sync_timestamp = - unix_time_secs_opt; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&self.kv_store), - Arc::clone(&self.logger), - )?; - } - - periodically_archive_fully_resolved_monitors( - Arc::clone(&channel_manager), - Arc::clone(&chain_monitor), + let res = + self.sync_lightning_wallet_inner(channel_manager, chain_monitor, output_sweeper).await; + + self.lightning_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); + + res + } + + async fn sync_lightning_wallet_inner( + &self, channel_manager: Arc, chain_monitor: Arc, + output_sweeper: Arc, + ) -> Result<(), Error> { + let sync_cman = Arc::clone(&channel_manager); + let sync_cmon = Arc::clone(&chain_monitor); + let sync_sweeper = Arc::clone(&output_sweeper); + let confirmables = vec![ + &*sync_cman as &(dyn Confirm + Sync + Send), + &*sync_cmon as &(dyn Confirm + Sync + Send), + &*sync_sweeper as &(dyn Confirm + Sync + Send), + ]; + + let timeout_fut = tokio::time::timeout( + Duration::from_secs(LDK_WALLET_SYNC_TIMEOUT_SECS), + self.tx_sync.sync(confirmables), + ); + let now = Instant::now(); + match timeout_fut.await { + Ok(res) => match res { + Ok(()) => { + log_info!( + self.logger, + "Sync of Lightning wallet finished in {}ms.", + now.elapsed().as_millis() + ); + + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + { + let mut locked_node_metrics = self.node_metrics.write().unwrap(); + locked_node_metrics.latest_lightning_wallet_sync_timestamp = + unix_time_secs_opt; + write_node_metrics( + &*locked_node_metrics, Arc::clone(&self.kv_store), Arc::clone(&self.logger), - Arc::clone(&self.node_metrics), )?; - Ok(()) - }, - Err(e) => { - log_error!(self.logger, "Sync of Lightning wallet failed: {}", e); - Err(e.into()) - }, + } + + periodically_archive_fully_resolved_monitors( + Arc::clone(&channel_manager), + Arc::clone(&chain_monitor), + Arc::clone(&self.kv_store), + Arc::clone(&self.logger), + Arc::clone(&self.node_metrics), + )?; + Ok(()) }, Err(e) => { - log_error!(self.logger, "Lightning wallet sync timed out: {}", e); - Err(Error::TxSyncTimeout) + log_error!(self.logger, "Sync of Lightning wallet failed: {}", e); + Err(e.into()) }, - } - }; - - self.lightning_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); - - res + }, + Err(e) => { + log_error!(self.logger, "Lightning wallet sync timed out: {}", e); + Err(Error::TxSyncTimeout) + }, + } } pub(crate) async fn update_fee_rate_estimates(&self) -> Result<(), Error> {