diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index a6d867e5a..bd1e4fc43 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -325,7 +325,6 @@ enum NodeError { dictionary NodeStatus { boolean is_running; - boolean is_listening; BestBlock current_best_block; u64? latest_lightning_wallet_sync_timestamp; u64? latest_onchain_wallet_sync_timestamp; diff --git a/src/builder.rs b/src/builder.rs index cf414ec57..0b3ea3101 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -9,7 +9,6 @@ use std::collections::HashMap; use std::convert::TryInto; use std::default::Default; use std::path::PathBuf; -use std::sync::atomic::AtomicBool; use std::sync::{Arc, Mutex, Once, RwLock}; use std::time::SystemTime; use std::{fmt, fs}; @@ -1133,7 +1132,6 @@ fn build_with_store_internal( } // Initialize the status fields. - let is_listening = Arc::new(AtomicBool::new(false)); let node_metrics = match read_node_metrics(Arc::clone(&kv_store), Arc::clone(&logger)) { Ok(metrics) => Arc::new(RwLock::new(metrics)), Err(e) => { @@ -1734,7 +1732,6 @@ fn build_with_store_internal( peer_store, payment_store, is_running, - is_listening, node_metrics, om_mailbox, async_payments_role, diff --git a/src/lib.rs b/src/lib.rs index 0f547ce1d..a075cfac5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -101,7 +101,6 @@ mod wallet; use std::default::Default; use std::net::ToSocketAddrs; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; @@ -189,7 +188,6 @@ pub struct Node { peer_store: Arc>>, payment_store: Arc, is_running: Arc>, - is_listening: Arc, node_metrics: Arc>, om_mailbox: Option>, async_payments_role: Option, @@ -293,9 +291,7 @@ impl Node { if let Some(listening_addresses) = &self.config.listening_addresses { // Setup networking let peer_manager_connection_handler = Arc::clone(&self.peer_manager); - let mut stop_listen = self.stop_sender.subscribe(); let listening_logger = Arc::clone(&self.logger); - let listening_indicator = Arc::clone(&self.is_listening); let mut bind_addrs = Vec::with_capacity(listening_addresses.len()); @@ -313,45 +309,62 @@ impl Node { bind_addrs.extend(resolved_address); } - self.runtime.spawn_cancellable_background_task(async move { - { - let listener = - tokio::net::TcpListener::bind(&*bind_addrs).await - .unwrap_or_else(|e| { - log_error!(listening_logger, "Failed to bind to listen addresses/ports - is something else already listening on it?: {}", e); - panic!( - "Failed to bind to listen address/port - is something else already listening on it?", - ); - }); - - listening_indicator.store(true, Ordering::Release); - - loop { - let peer_mgr = Arc::clone(&peer_manager_connection_handler); - tokio::select! { - _ = stop_listen.changed() => { - log_debug!( - listening_logger, - "Stopping listening to inbound connections." + let logger = Arc::clone(&listening_logger); + let listeners = self.runtime.block_on(async move { + let mut listeners = Vec::new(); + + // Try to bind to all addresses + for addr in &*bind_addrs { + match tokio::net::TcpListener::bind(addr).await { + Ok(listener) => { + log_trace!(logger, "Listener bound to {}", addr); + listeners.push(listener); + }, + Err(e) => { + log_error!( + logger, + "Failed to bind to {}: {} - is something else already listening?", + addr, + e ); - break; - } - res = listener.accept() => { - let tcp_stream = res.unwrap().0; - tokio::spawn(async move { - lightning_net_tokio::setup_inbound( - Arc::clone(&peer_mgr), - tcp_stream.into_std().unwrap(), - ) - .await; - }); - } + return Err(Error::InvalidSocketAddress); + }, } } - } - listening_indicator.store(false, Ordering::Release); - }); + Ok(listeners) + })?; + + for listener in listeners { + let logger = Arc::clone(&listening_logger); + let peer_mgr = Arc::clone(&peer_manager_connection_handler); + let mut stop_listen = self.stop_sender.subscribe(); + let runtime = Arc::clone(&self.runtime); + self.runtime.spawn_cancellable_background_task(async move { + loop { + tokio::select! { + _ = stop_listen.changed() => { + log_debug!( + logger, + "Stopping listening to inbound connections." + ); + break; + } + res = listener.accept() => { + let tcp_stream = res.unwrap().0; + let peer_mgr = Arc::clone(&peer_mgr); + runtime.spawn_cancellable_background_task(async move { + lightning_net_tokio::setup_inbound( + Arc::clone(&peer_mgr), + tcp_stream.into_std().unwrap(), + ) + .await; + }); + } + } + } + }); + } } // Regularly reconnect to persisted peers. @@ -666,7 +679,6 @@ impl Node { /// Returns the status of the [`Node`]. pub fn status(&self) -> NodeStatus { let is_running = *self.is_running.read().unwrap(); - let is_listening = self.is_listening.load(Ordering::Acquire); let current_best_block = self.channel_manager.current_best_block().into(); let locked_node_metrics = self.node_metrics.read().unwrap(); let latest_lightning_wallet_sync_timestamp = @@ -684,7 +696,6 @@ impl Node { NodeStatus { is_running, - is_listening, current_best_block, latest_lightning_wallet_sync_timestamp, latest_onchain_wallet_sync_timestamp, @@ -1495,9 +1506,6 @@ impl Drop for Node { pub struct NodeStatus { /// Indicates whether the [`Node`] is running. pub is_running: bool, - /// Indicates whether the [`Node`] is listening for incoming connections on the addresses - /// configured via [`Config::listening_addresses`]. - pub is_listening: bool, /// The best block to which our Lightning wallet is currently synced. pub current_best_block: BestBlock, /// The timestamp, in seconds since start of the UNIX epoch, when we last successfully synced diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 98c96e307..1331fc047 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -195,7 +195,7 @@ pub(crate) fn random_storage_path() -> PathBuf { pub(crate) fn random_port() -> u16 { let mut rng = thread_rng(); - rng.gen_range(5000..65535) + rng.gen_range(5000..32768) } pub(crate) fn random_listening_addresses() -> Vec { diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 0db30ea1c..cca52ae2d 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -817,6 +817,21 @@ fn sign_verify_msg() { assert!(node.verify_signature(msg, sig.as_str(), &pkey)); } +#[test] +fn connection_multi_listen() { + let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Esplora(&electrsd); + let (node_a, node_b) = setup_two_nodes(&chain_source, false, false, false); + + let node_id_b = node_b.node_id(); + + let node_addrs_b = node_b.listening_addresses().unwrap(); + for node_addr_b in &node_addrs_b { + node_a.connect(node_id_b, node_addr_b.clone(), false).unwrap(); + node_a.disconnect(node_id_b).unwrap(); + } +} + #[test] fn connection_restart_behavior() { do_connection_restart_behavior(true); @@ -832,11 +847,6 @@ fn do_connection_restart_behavior(persist: bool) { let node_id_b = node_b.node_id(); let node_addr_b = node_b.listening_addresses().unwrap().first().unwrap().clone(); - - while !node_b.status().is_listening { - std::thread::sleep(std::time::Duration::from_millis(10)); - } - node_a.connect(node_id_b, node_addr_b, persist).unwrap(); let peer_details_a = node_a.list_peers().first().unwrap().clone(); @@ -886,10 +896,6 @@ fn concurrent_connections_succeed() { let node_id_b = node_b.node_id(); let node_addr_b = node_b.listening_addresses().unwrap().first().unwrap().clone(); - while !node_b.status().is_listening { - std::thread::sleep(std::time::Duration::from_millis(10)); - } - let mut handles = Vec::new(); for _ in 0..10 { let thread_node = Arc::clone(&node_a);