diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 4f09b7ced..992899000 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -42,6 +42,8 @@ interface LDKNode { void start(); [Throws=NodeError] void stop(); + NodeStatus status(); + Config config(); Event? next_event(); Event wait_next_event(); [Async] @@ -97,7 +99,6 @@ interface LDKNode { [Throws=NodeError] string sign_message([ByRef]sequence msg); boolean verify_signature([ByRef]sequence msg, [ByRef]string sig, [ByRef]PublicKey pkey); - boolean is_running(); }; [Error] @@ -137,6 +138,22 @@ enum NodeError { "LiquidityFeeTooHigh", }; +dictionary NodeStatus { + boolean is_running; + boolean is_listening; + BestBlock current_best_block; + u64? latest_wallet_sync_timestamp; + u64? latest_onchain_wallet_sync_timestamp; + u64? latest_fee_rate_cache_update_timestamp; + u64? latest_rgs_snapshot_timestamp; + u64? latest_node_announcement_broadcast_timestamp; +}; + +dictionary BestBlock { + BlockHash block_hash; + u32 height; +}; + [Error] enum BuildError { "InvalidSeedBytes", diff --git a/src/builder.rs b/src/builder.rs index a09b2563f..161c3bbbb 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -65,6 +65,7 @@ use std::fmt; use std::fs; use std::io::Cursor; use std::path::PathBuf; +use std::sync::atomic::AtomicBool; use std::sync::{Arc, Mutex, RwLock}; use std::time::SystemTime; @@ -945,6 +946,13 @@ fn build_with_store_internal( let (stop_sender, _) = tokio::sync::watch::channel(()); + let is_listening = Arc::new(AtomicBool::new(false)); + let latest_wallet_sync_timestamp = Arc::new(RwLock::new(None)); + let latest_onchain_wallet_sync_timestamp = Arc::new(RwLock::new(None)); + let latest_fee_rate_cache_update_timestamp = Arc::new(RwLock::new(None)); + let latest_rgs_snapshot_timestamp = Arc::new(RwLock::new(None)); + let latest_node_announcement_broadcast_timestamp = Arc::new(RwLock::new(None)); + Ok(Node { runtime, stop_sender, @@ -968,6 +976,12 @@ fn build_with_store_internal( scorer, peer_store, payment_store, + is_listening, + latest_wallet_sync_timestamp, + latest_onchain_wallet_sync_timestamp, + latest_fee_rate_cache_update_timestamp, + latest_rgs_snapshot_timestamp, + latest_node_announcement_broadcast_timestamp, }) } diff --git a/src/lib.rs b/src/lib.rs index 24b2123f5..b9508ad60 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -107,7 +107,7 @@ pub use error::Error as NodeError; use error::Error; pub use event::Event; -pub use types::ChannelConfig; +pub use types::{BestBlock, ChannelConfig}; pub use io::utils::generate_entropy_mnemonic; @@ -167,8 +167,9 @@ use rand::Rng; use std::default::Default; use std::net::ToSocketAddrs; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, RwLock}; -use std::time::{Duration, Instant, SystemTime}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; #[cfg(feature = "uniffi")] uniffi::include_scaffolding!("ldk_node"); @@ -199,6 +200,12 @@ pub struct Node { scorer: Arc>, peer_store: Arc>>, payment_store: Arc>>, + is_listening: Arc, + latest_wallet_sync_timestamp: Arc>>, + latest_onchain_wallet_sync_timestamp: Arc>>, + latest_fee_rate_cache_update_timestamp: Arc>>, + latest_rgs_snapshot_timestamp: Arc>>, + latest_node_announcement_broadcast_timestamp: Arc>>, } impl Node { @@ -222,6 +229,8 @@ impl Node { // Block to ensure we update our fee rate cache once on startup let fee_estimator = Arc::clone(&self.fee_estimator); let sync_logger = Arc::clone(&self.logger); + let sync_fee_rate_update_timestamp = + Arc::clone(&self.latest_fee_rate_cache_update_timestamp); let runtime_ref = &runtime; tokio::task::block_in_place(move || { runtime_ref.block_on(async move { @@ -233,6 +242,9 @@ impl Node { "Initial fee rate cache update finished in {}ms.", now.elapsed().as_millis() ); + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + *sync_fee_rate_update_timestamp.write().unwrap() = unix_time_secs_opt; Ok(()) }, Err(e) => { @@ -246,6 +258,7 @@ impl Node { // Setup wallet sync let wallet = Arc::clone(&self.wallet); let sync_logger = Arc::clone(&self.logger); + let sync_onchain_wallet_timestamp = Arc::clone(&self.latest_onchain_wallet_sync_timestamp); let mut stop_sync = self.stop_sender.subscribe(); let onchain_wallet_sync_interval_secs = self .config @@ -267,11 +280,16 @@ impl Node { _ = onchain_wallet_sync_interval.tick() => { let now = Instant::now(); match wallet.sync().await { - Ok(()) => log_trace!( + Ok(()) => { + log_trace!( sync_logger, "Background sync of on-chain wallet finished in {}ms.", now.elapsed().as_millis() - ), + ); + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + *sync_onchain_wallet_timestamp.write().unwrap() = unix_time_secs_opt; + } Err(err) => { log_error!( sync_logger, @@ -289,6 +307,7 @@ impl Node { let mut stop_fee_updates = self.stop_sender.subscribe(); let fee_update_logger = Arc::clone(&self.logger); + let fee_update_timestamp = Arc::clone(&self.latest_fee_rate_cache_update_timestamp); let fee_estimator = Arc::clone(&self.fee_estimator); let fee_rate_cache_update_interval_secs = self.config.fee_rate_cache_update_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS); @@ -307,11 +326,16 @@ impl Node { _ = fee_rate_update_interval.tick() => { let now = Instant::now(); match fee_estimator.update_fee_estimates().await { - Ok(()) => log_trace!( + Ok(()) => { + log_trace!( fee_update_logger, "Background update of fee rate cache finished in {}ms.", now.elapsed().as_millis() - ), + ); + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + *fee_update_timestamp.write().unwrap() = unix_time_secs_opt; + } Err(err) => { log_error!( fee_update_logger, @@ -330,6 +354,7 @@ impl Node { let sync_cmon = Arc::clone(&self.chain_monitor); let sync_sweeper = Arc::clone(&self.output_sweeper); let sync_logger = Arc::clone(&self.logger); + let sync_wallet_timestamp = Arc::clone(&self.latest_wallet_sync_timestamp); let mut stop_sync = self.stop_sender.subscribe(); let wallet_sync_interval_secs = self.config.wallet_sync_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS); @@ -350,11 +375,16 @@ impl Node { ]; let now = Instant::now(); match tx_sync.sync(confirmables).await { - Ok(()) => log_trace!( + Ok(()) => { + log_trace!( sync_logger, "Background sync of Lightning wallet finished in {}ms.", now.elapsed().as_millis() - ), + ); + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + *sync_wallet_timestamp.write().unwrap() = unix_time_secs_opt; + } Err(e) => { log_error!(sync_logger, "Background sync of Lightning wallet failed: {}", e) } @@ -368,6 +398,7 @@ impl Node { let gossip_source = Arc::clone(&self.gossip_source); let gossip_sync_store = Arc::clone(&self.kv_store); let gossip_sync_logger = Arc::clone(&self.logger); + let gossip_rgs_sync_timestamp = Arc::clone(&self.latest_rgs_snapshot_timestamp); let mut stop_gossip_sync = self.stop_sender.subscribe(); runtime.spawn(async move { let mut interval = tokio::time::interval(RGS_SYNC_INTERVAL); @@ -395,6 +426,7 @@ impl Node { log_error!(gossip_sync_logger, "Persistence failed: {}", e); panic!("Persistence failed"); }); + *gossip_rgs_sync_timestamp.write().unwrap() = Some(updated_timestamp as u64); } Err(e) => log_error!( gossip_sync_logger, @@ -413,6 +445,7 @@ impl Node { let peer_manager_connection_handler = Arc::clone(&self.peer_manager); let mut stop_listen = self.stop_sender.subscribe(); let listening_logger = Arc::clone(&self.logger); + let listening_indicator = Arc::clone(&self.is_listening); let mut bind_addrs = Vec::with_capacity(listening_addresses.len()); @@ -431,6 +464,7 @@ impl Node { } runtime.spawn(async move { + { let listener = tokio::net::TcpListener::bind(&*bind_addrs).await .unwrap_or_else(|e| { @@ -440,11 +474,13 @@ impl Node { ); }); + listening_indicator.store(true, Ordering::Release); + loop { let peer_mgr = Arc::clone(&peer_manager_connection_handler); tokio::select! { _ = stop_listen.changed() => { - return; + break; } res = listener.accept() => { let tcp_stream = res.unwrap().0; @@ -458,6 +494,9 @@ impl Node { } } } + } + + listening_indicator.store(false, Ordering::Release); }); } @@ -508,6 +547,7 @@ impl Node { let bcast_config = Arc::clone(&self.config); let bcast_store = Arc::clone(&self.kv_store); let bcast_logger = Arc::clone(&self.logger); + let bcast_ann_timestamp = Arc::clone(&self.latest_node_announcement_broadcast_timestamp); let mut stop_bcast = self.stop_sender.subscribe(); runtime.spawn(async move { // We check every 30 secs whether our last broadcast is NODE_ANN_BCAST_INTERVAL away. @@ -553,12 +593,17 @@ impl Node { bcast_pm.broadcast_node_announcement([0; 3], [0; 32], addresses); - let unix_time_secs = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); - io::utils::write_latest_node_ann_bcast_timestamp(unix_time_secs, Arc::clone(&bcast_store), Arc::clone(&bcast_logger)) - .unwrap_or_else(|e| { - log_error!(bcast_logger, "Persistence failed: {}", e); - panic!("Persistence failed"); - }); + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + *bcast_ann_timestamp.write().unwrap() = unix_time_secs_opt; + + if let Some(unix_time_secs) = unix_time_secs_opt { + io::utils::write_latest_node_ann_bcast_timestamp(unix_time_secs, Arc::clone(&bcast_store), Arc::clone(&bcast_logger)) + .unwrap_or_else(|e| { + log_error!(bcast_logger, "Persistence failed: {}", e); + panic!("Persistence failed"); + }); + } } } } @@ -662,11 +707,6 @@ impl Node { Ok(()) } - /// Returns whether the [`Node`] is running. - pub fn is_running(&self) -> bool { - self.runtime.read().unwrap().is_some() - } - /// Disconnects all peers, stops all running background tasks, and shuts down [`Node`]. /// /// After this returns most API methods will return [`Error::NotRunning`]. @@ -697,6 +737,37 @@ impl Node { Ok(()) } + /// Returns the status of the [`Node`]. + pub fn status(&self) -> NodeStatus { + let is_running = self.runtime.read().unwrap().is_some(); + let is_listening = self.is_listening.load(Ordering::Acquire); + let current_best_block = self.channel_manager.current_best_block().into(); + let latest_wallet_sync_timestamp = *self.latest_wallet_sync_timestamp.read().unwrap(); + let latest_onchain_wallet_sync_timestamp = + *self.latest_onchain_wallet_sync_timestamp.read().unwrap(); + let latest_fee_rate_cache_update_timestamp = + *self.latest_fee_rate_cache_update_timestamp.read().unwrap(); + let latest_rgs_snapshot_timestamp = *self.latest_rgs_snapshot_timestamp.read().unwrap(); + let latest_node_announcement_broadcast_timestamp = + *self.latest_node_announcement_broadcast_timestamp.read().unwrap(); + + NodeStatus { + is_running, + is_listening, + current_best_block, + latest_wallet_sync_timestamp, + latest_onchain_wallet_sync_timestamp, + latest_fee_rate_cache_update_timestamp, + latest_rgs_snapshot_timestamp, + latest_node_announcement_broadcast_timestamp, + } + } + + /// Returns the config with which the [`Node`] was initialized. + pub fn config(&self) -> Config { + self.config.as_ref().clone() + } + /// Returns the next event in the event queue, if currently available. /// /// Will return `Some(..)` if an event is available and `None` otherwise. @@ -1746,6 +1817,43 @@ impl Drop for Node { } } +/// Represents the status of the [`Node`]. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct NodeStatus { + /// Indicates whether the [`Node`] is running. + pub is_running: bool, + /// Indicates whether the [`Node`] is listening for incoming connections on the addresses + /// configured via [`Config::listening_addresses`]. + pub is_listening: bool, + /// The best block to which our Lightning wallet is currently synced. + pub current_best_block: BestBlock, + /// The timestamp, in seconds since start of the UNIX epoch, when we last successfully synced + /// our Lightning wallet to the chain tip. + /// + /// Will be `None` if the wallet hasn't been synced since the [`Node`] was initialized. + pub latest_wallet_sync_timestamp: Option, + /// The timestamp, in seconds since start of the UNIX epoch, when we last successfully synced + /// our on-chain wallet to the chain tip. + /// + /// Will be `None` if the wallet hasn't been synced since the [`Node`] was initialized. + pub latest_onchain_wallet_sync_timestamp: Option, + /// The timestamp, in seconds since start of the UNIX epoch, when we last successfully update + /// our fee rate cache. + /// + /// Will be `None` if the cache hasn't been updated since the [`Node`] was initialized. + pub latest_fee_rate_cache_update_timestamp: Option, + /// The timestamp, in seconds since start of the UNIX epoch, when the last rapid gossip sync + /// (RGS) snapshot we successfully applied was generated. + /// + /// Will be `None` if RGS isn't configured or the snapshot hasn't been updated since the [`Node`] was initialized. + pub latest_rgs_snapshot_timestamp: Option, + /// The timestamp, in seconds since start of the UNIX epoch, when we last broadcasted a node + /// announcement. + /// + /// Will be `None` if we have no public channels or we haven't broadcasted since the [`Node`] was initialized. + pub latest_node_announcement_broadcast_timestamp: Option, +} + async fn connect_peer_if_necessary( node_id: PublicKey, addr: SocketAddress, peer_manager: Arc>, logger: Arc, diff --git a/src/types.rs b/src/types.rs index 6269b3ddf..4e082498e 100644 --- a/src/types.rs +++ b/src/types.rs @@ -4,6 +4,7 @@ use crate::sweep::OutputSweeper; use lightning::blinded_path::BlindedPath; use lightning::chain::chainmonitor; +use lightning::chain::BestBlock as LdkBestBlock; use lightning::ln::channelmanager::ChannelDetails as LdkChannelDetails; use lightning::ln::msgs::RoutingMessageHandler; use lightning::ln::msgs::SocketAddress; @@ -20,7 +21,7 @@ use lightning_net_tokio::SocketDescriptor; use lightning_transaction_sync::EsploraSyncClient; use bitcoin::secp256k1::{self, PublicKey, Secp256k1}; -use bitcoin::OutPoint; +use bitcoin::{BlockHash, OutPoint}; use std::sync::{Arc, Mutex, RwLock}; @@ -456,3 +457,20 @@ impl Default for ChannelConfig { LdkChannelConfig::default().into() } } + +/// The best known block as identified by its hash and height. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BestBlock { + /// The block's hash + pub block_hash: BlockHash, + /// The height at which the block was confirmed. + pub height: u32, +} + +impl From for BestBlock { + fn from(value: LdkBestBlock) -> Self { + let block_hash = value.block_hash(); + let height = value.height(); + Self { block_hash, height } + } +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 474ac14a8..3be36869d 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -220,6 +220,8 @@ pub(crate) fn setup_node(electrsd: &ElectrsD, config: Config) -> TestNode