Skip to content

Commit

Permalink
Switch to use NetAddress for peer addresses
Browse files Browse the repository at this point in the history
While we're still blocked on upstream changes, we now switch our peer info
to use a newtype around `NetAddress` so that we won't have to break
serialization compatibility when the upstream changes becom available
post-0.1.
  • Loading branch information
tnull committed May 9, 2023
1 parent 7c067a7 commit 38c5707
Show file tree
Hide file tree
Showing 7 changed files with 230 additions and 112 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ LDK Node is a non-custodial Lightning node in library form. Its central goal is
The primary abstraction of the library is the `Node`, which can be retrieved by setting up and configuring a `Builder` to your liking and calling `build()`. `Node` can then be controlled via commands such as `start`, `stop`, `connect_open_channel`, `send_payment`, etc.:

```rust
use ldk_node::Builder;
use ldk_node::{Builder, NetAddress};
use ldk_node::lightning_invoice::Invoice;
use ldk_node::bitcoin::secp256k1::PublicKey;
use std::str::FromStr;
Expand All @@ -28,7 +28,7 @@ fn main() {
node.sync_wallets().unwrap();

let node_id = PublicKey::from_str("NODE_ID").unwrap();
let node_addr = "IP_ADDR:PORT".parse().unwrap();
let node_addr = NetAddress::from_str("IP_ADDR:PORT").unwrap();
node.connect_open_channel(node_id, node_addr, 10000, None, false).unwrap();

let invoice = Invoice::from_str("INVOICE_STR").unwrap();
Expand Down
8 changes: 6 additions & 2 deletions bindings/ldk_node.udl
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ interface Node {
[Throws=NodeError]
u64 total_onchain_balance_sats();
[Throws=NodeError]
void connect(PublicKey node_id, SocketAddr address, boolean permanently);
void connect(PublicKey node_id, NetAddress address, boolean permanently);
[Throws=NodeError]
void disconnect(PublicKey node_id);
[Throws=NodeError]
void connect_open_channel(PublicKey node_id, SocketAddr address, u64 channel_amount_sats, u64? push_to_counterparty_msat, boolean announce_channel);
void connect_open_channel(PublicKey node_id, NetAddress address, u64 channel_amount_sats, u64? push_to_counterparty_msat, boolean announce_channel);
[Throws=NodeError]
void close_channel([ByRef]ChannelId channel_id, PublicKey counterparty_node_id);
[Throws=NodeError]
Expand Down Expand Up @@ -69,6 +69,7 @@ enum NodeError {
"InvoiceCreationFailed",
"PaymentFailed",
"PeerInfoParseFailed",
"PeerInfoNotFound",
"ChannelCreationFailed",
"ChannelClosingFailed",
"PersistenceFailed",
Expand Down Expand Up @@ -129,6 +130,9 @@ typedef string Txid;
[Custom]
typedef string SocketAddr;

[Custom]
typedef string NetAddress;

[Custom]
typedef string PublicKey;

Expand Down
3 changes: 3 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub enum Error {
PaymentFailed,
/// A given peer info could not be parsed.
PeerInfoParseFailed,
/// A given peer info could not be found.
PeerInfoNotFound,
/// A channel could not be opened.
ChannelCreationFailed,
/// A channel could not be closed.
Expand Down Expand Up @@ -65,6 +67,7 @@ impl fmt::Display for Error {
Self::InvoiceCreationFailed => write!(f, "Failed to create invoice."),
Self::PaymentFailed => write!(f, "Failed to send the given payment."),
Self::PeerInfoParseFailed => write!(f, "Failed to parse the given peer information."),
Self::PeerInfoNotFound => write!(f, "Failed to resolve the given peer information."),
Self::ChannelCreationFailed => write!(f, "Failed to create channel."),
Self::ChannelClosingFailed => write!(f, "Failed to close channel."),
Self::PersistenceFailed => write!(f, "Failed to persist data."),
Expand Down
66 changes: 37 additions & 29 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
//! [`send_payment`], etc.:
//!
//! ```no_run
//! use ldk_node::Builder;
//! use ldk_node::{Builder, NetAddress};
//! use ldk_node::lightning_invoice::Invoice;
//! use ldk_node::bitcoin::secp256k1::PublicKey;
//! use std::str::FromStr;
Expand All @@ -46,7 +46,7 @@
//! node.sync_wallets().unwrap();
//!
//! let node_id = PublicKey::from_str("NODE_ID").unwrap();
//! let node_addr = "IP_ADDR:PORT".parse().unwrap();
//! let node_addr = NetAddress::from_str("IP_ADDR:PORT").unwrap();
//! node.connect_open_channel(node_id, node_addr, 10000, None, false).unwrap();
//!
//! let invoice = Invoice::from_str("INVOICE_STR").unwrap();
Expand Down Expand Up @@ -95,6 +95,8 @@ pub use error::Error as NodeError;
use error::Error;

pub use event::Event;
pub use types::NetAddress;

use event::{EventHandler, EventQueue};
use io::fs_store::FilesystemStore;
use io::{KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_NAMESPACE};
Expand Down Expand Up @@ -150,7 +152,7 @@ use rand::Rng;
use std::convert::TryInto;
use std::default::Default;
use std::fs;
use std::net::SocketAddr;
use std::net::{SocketAddr, ToSocketAddrs};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
Expand Down Expand Up @@ -758,7 +760,7 @@ impl Node {
{
if let Some(peer_info) = connect_peer_store.get_peer(&node_id) {
let _ = do_connect_peer(
peer_info.pubkey,
peer_info.node_id,
peer_info.address,
Arc::clone(&connect_pm),
Arc::clone(&connect_logger),
Expand Down Expand Up @@ -910,18 +912,18 @@ impl Node {
///
/// If `permanently` is set to `true`, we'll remember the peer and reconnect to it on restart.
pub fn connect(
&self, node_id: PublicKey, address: SocketAddr, permanently: bool,
&self, node_id: PublicKey, address: NetAddress, permanently: bool,
) -> Result<(), Error> {
let rt_lock = self.runtime.read().unwrap();
if rt_lock.is_none() {
return Err(Error::NotRunning);
}
let runtime = rt_lock.as_ref().unwrap();

let peer_info = PeerInfo { pubkey: node_id, address };
let peer_info = PeerInfo { node_id, address };

let con_peer_pubkey = peer_info.pubkey;
let con_peer_addr = peer_info.address;
let con_node_id = peer_info.node_id;
let con_addr = peer_info.address.clone();
let con_success = Arc::new(AtomicBool::new(false));
let con_success_cloned = Arc::clone(&con_success);
let con_logger = Arc::clone(&self.logger);
Expand All @@ -930,8 +932,7 @@ impl Node {
tokio::task::block_in_place(move || {
runtime.block_on(async move {
let res =
connect_peer_if_necessary(con_peer_pubkey, con_peer_addr, con_pm, con_logger)
.await;
connect_peer_if_necessary(con_node_id, con_addr, con_pm, con_logger).await;
con_success_cloned.store(res.is_ok(), Ordering::Release);
})
});
Expand All @@ -940,7 +941,7 @@ impl Node {
return Err(Error::ConnectionFailed);
}

log_info!(self.logger, "Connected to peer {}@{}. ", peer_info.pubkey, peer_info.address,);
log_info!(self.logger, "Connected to peer {}@{}. ", peer_info.node_id, peer_info.address);

if permanently {
self.peer_store.add_peer(peer_info)?;
Expand Down Expand Up @@ -982,7 +983,7 @@ impl Node {
///
/// Returns a temporary channel id.
pub fn connect_open_channel(
&self, node_id: PublicKey, address: SocketAddr, channel_amount_sats: u64,
&self, node_id: PublicKey, address: NetAddress, channel_amount_sats: u64,
push_to_counterparty_msat: Option<u64>, announce_channel: bool,
) -> Result<(), Error> {
let rt_lock = self.runtime.read().unwrap();
Expand All @@ -997,10 +998,10 @@ impl Node {
return Err(Error::InsufficientFunds);
}

let peer_info = PeerInfo { pubkey: node_id, address };
let peer_info = PeerInfo { node_id, address };

let con_peer_pubkey = peer_info.pubkey;
let con_peer_addr = peer_info.address;
let con_node_id = peer_info.node_id;
let con_addr = peer_info.address.clone();
let con_success = Arc::new(AtomicBool::new(false));
let con_success_cloned = Arc::clone(&con_success);
let con_logger = Arc::clone(&self.logger);
Expand All @@ -1009,8 +1010,7 @@ impl Node {
tokio::task::block_in_place(move || {
runtime.block_on(async move {
let res =
connect_peer_if_necessary(con_peer_pubkey, con_peer_addr, con_pm, con_logger)
.await;
connect_peer_if_necessary(con_node_id, con_addr, con_pm, con_logger).await;
con_success_cloned.store(res.is_ok(), Ordering::Release);
})
});
Expand All @@ -1036,7 +1036,7 @@ impl Node {
let user_channel_id: u128 = rand::thread_rng().gen::<u128>();

match self.channel_manager.create_channel(
peer_info.pubkey,
peer_info.node_id,
channel_amount_sats,
push_msat,
user_channel_id,
Expand All @@ -1046,7 +1046,7 @@ impl Node {
log_info!(
self.logger,
"Initiated channel creation with peer {}. ",
peer_info.pubkey
peer_info.node_id
);
self.peer_store.add_peer(peer_info)?;
Ok(())
Expand Down Expand Up @@ -1447,44 +1447,52 @@ impl Drop for Node {
}

async fn connect_peer_if_necessary(
pubkey: PublicKey, peer_addr: SocketAddr, peer_manager: Arc<PeerManager>,
node_id: PublicKey, addr: NetAddress, peer_manager: Arc<PeerManager>,
logger: Arc<FilesystemLogger>,
) -> Result<(), Error> {
for (node_pubkey, _addr) in peer_manager.get_peer_node_ids() {
if node_pubkey == pubkey {
for (pman_node_id, _pman_addr) in peer_manager.get_peer_node_ids() {
if node_id == pman_node_id {
return Ok(());
}
}

do_connect_peer(pubkey, peer_addr, peer_manager, logger).await
do_connect_peer(node_id, addr, peer_manager, logger).await
}

async fn do_connect_peer(
pubkey: PublicKey, peer_addr: SocketAddr, peer_manager: Arc<PeerManager>,
node_id: PublicKey, addr: NetAddress, peer_manager: Arc<PeerManager>,
logger: Arc<FilesystemLogger>,
) -> Result<(), Error> {
log_info!(logger, "Connecting to peer: {}@{}", pubkey, peer_addr);
match lightning_net_tokio::connect_outbound(Arc::clone(&peer_manager), pubkey, peer_addr).await
log_info!(logger, "Connecting to peer: {}@{}", node_id, addr);

let socket_addr = addr
.to_socket_addrs()
.map_err(|_| Error::PeerInfoNotFound)?
.next()
.ok_or(Error::ConnectionFailed)?;

match lightning_net_tokio::connect_outbound(Arc::clone(&peer_manager), node_id, socket_addr)
.await
{
Some(connection_closed_future) => {
let mut connection_closed_future = Box::pin(connection_closed_future);
loop {
match futures::poll!(&mut connection_closed_future) {
std::task::Poll::Ready(_) => {
log_info!(logger, "Peer connection closed: {}@{}", pubkey, peer_addr);
log_info!(logger, "Peer connection closed: {}@{}", node_id, addr);
return Err(Error::ConnectionFailed);
}
std::task::Poll::Pending => {}
}
// Avoid blocking the tokio context by sleeping a bit
match peer_manager.get_peer_node_ids().iter().find(|(id, _addr)| *id == pubkey) {
match peer_manager.get_peer_node_ids().iter().find(|(id, _addr)| *id == node_id) {
Some(_) => return Ok(()),
None => tokio::time::sleep(Duration::from_millis(10)).await,
}
}
}
None => {
log_error!(logger, "Failed to connect to peer: {}@{}", pubkey, peer_addr);
log_error!(logger, "Failed to connect to peer: {}@{}", node_id, addr);
Err(Error::ConnectionFailed)
}
}
Expand Down
Loading

0 comments on commit 38c5707

Please sign in to comment.