From a4ad1ccb22357fa2b064bef7c9463b7891585c66 Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Wed, 20 Dec 2023 09:58:37 -0500 Subject: [PATCH 01/24] multi: propagate shutdown signal from caller to lndk To allow external callers to shutdown lndk, we provide a shutdown trigger and listener to the onion messenger, replacing the single use channels that we previously used. This is useful for itests, so that we can cleanly shut down and for callers of the library to manage execution. --- Cargo.lock | 7 +++ Cargo.toml | 1 + src/lib.rs | 10 ++++ src/main.rs | 5 +- src/onion_messenger.rs | 95 +++++++++++++++++++------------------- tests/integration_tests.rs | 8 ++++ 6 files changed, 78 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f1fc2c90..f6b37e0d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1108,6 +1108,7 @@ dependencies = [ "tokio", "tonic 0.8.3", "tonic_lnd", + "triggered", ] [[package]] @@ -2303,6 +2304,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "triggered" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce148eae0d1a376c1b94ae651fc3261d9cb8294788b962b7382066376503a2d1" + [[package]] name = "try-lock" version = "0.2.4" diff --git a/Cargo.toml b/Cargo.toml index a584456f..e3a0946a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ tonic_lnd = { git = "https://github.com/orbitalturtle/tonic_lnd", branch = "upda hex = "0.4.3" configure_me = "0.4.0" bytes = "1.4.0" +triggered = "0.1.2" [dev-dependencies] bitcoincore-rpc = { package="core-rpc", version = "0.17.0" } diff --git a/src/lib.rs b/src/lib.rs index 7e3e5734..2831efa1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,12 +23,21 @@ use std::collections::HashMap; use std::str::FromStr; use std::sync::Once; use tonic_lnd::lnrpc::GetInfoRequest; +use triggered::{Listener, Trigger}; static INIT: Once = Once::new(); pub struct Cfg { pub lnd: LndCfg, pub log_dir: Option, + pub signals: LifecycleSignals, +} + +pub struct LifecycleSignals { + // Use to externally trigger shutdown. + pub shutdown: Trigger, + // Used to listen for the signal to shutdown. + pub listener: Listener, } pub fn init_logger(config: LogConfig) { @@ -136,6 +145,7 @@ pub async fn run(args: Cfg) -> Result<(), ()> { &mut peers_client, onion_messenger, network.unwrap(), + args.signals, ) .await } diff --git a/src/main.rs b/src/main.rs index 746308c8..700d74ab 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,7 +10,7 @@ mod internal { use internal::*; use lndk::lnd::LndCfg; -use lndk::Cfg; +use lndk::{Cfg, LifecycleSignals}; #[macro_use] extern crate configure_me; @@ -22,9 +22,12 @@ async fn main() -> Result<(), ()> { .0; let lnd_args = LndCfg::new(config.address, config.cert, config.macaroon); + let (shutdown, listener) = triggered::trigger(); + let signals = LifecycleSignals { shutdown, listener }; let args = Cfg { lnd: lnd_args, log_dir: config.log_dir, + signals, }; lndk::run(args).await diff --git a/src/onion_messenger.rs b/src/onion_messenger.rs index 6a8b9df0..e3b6712b 100644 --- a/src/onion_messenger.rs +++ b/src/onion_messenger.rs @@ -1,6 +1,7 @@ use crate::clock::TokioClock; use crate::lnd::{features_support_onion_messages, ONION_MESSAGES_OPTIONAL}; use crate::rate_limit::{RateLimiter, TokenLimiter}; +use crate::LifecycleSignals; use async_trait::async_trait; use bitcoin::blockdata::constants::ChainHash; use bitcoin::network::constants::Network; @@ -32,6 +33,7 @@ use tonic_lnd::{ lnrpc::CustomMessage, lnrpc::PeerEvent, lnrpc::SendCustomMessageRequest, lnrpc::SendCustomMessageResponse, tonic::Status, LightningClient, }; +use triggered::Listener; /// ONION_MESSAGE_TYPE is the message type number used in BOLT1 message types for onion messages. const ONION_MESSAGE_TYPE: u32 = 513; @@ -111,6 +113,7 @@ pub(crate) async fn run_onion_messenger< ln_client: &mut tonic_lnd::LightningClient, onion_messenger: OnionMessenger, network: Network, + signals: LifecycleSignals, ) -> Result<(), ()> where ES::Target: EntropySource, @@ -134,12 +137,6 @@ where })? } - // Setup channels that we'll use to signal to spawned producers that an exit has occurred elsewhere so they should - // exit, and a tokio task set to track all our spawned tasks. - // TODO: Combine these channels into a single channel. - let (peers_exit_sender, peers_exit_receiver) = channel(1); - let (in_messages_exit_sender, in_messages_exit_receiver) = channel(1); - let (out_messages_exit_sender, out_messages_exit_receiver) = channel(1); let mut set = tokio::task::JoinSet::new(); // Subscribe to peer events from LND first thing so that we don't miss any online/offline events while we are @@ -148,6 +145,7 @@ where // could take very long), so we get the subscription itself inside of our producer thread. let mut peers_client = ln_client.clone(); let peers_sender = sender.clone(); + let (peers_shutdown, peers_listener) = (signals.shutdown.clone(), signals.listener.clone()); set.spawn(async move { let peer_subscription = peers_client .subscribe_peer_events(tonic_lnd::lnrpc::PeerEventSubscription {}) @@ -160,15 +158,20 @@ where client: peers_client, }; - match produce_peer_events(peer_stream, peers_sender, peers_exit_receiver).await { + match produce_peer_events(peer_stream, peers_sender, peers_listener).await { Ok(_) => debug!("Peer events producer exited."), - Err(e) => error!("Peer events producer exited: {e}."), + Err(e) => { + peers_shutdown.trigger(); + error!("Peer events producer exited: {e}."); + } }; }); // Subscribe to custom messaging events from LND so that we can receive incoming messages. let mut messages_client = ln_client.clone(); let in_msg_sender = sender.clone(); + let (messages_shutdown, messages_listener) = + (signals.shutdown.clone(), signals.listener.clone()); set.spawn(async move { let message_subscription = messages_client .subscribe_custom_messages(tonic_lnd::lnrpc::SubscribeCustomMessagesRequest {}) @@ -180,25 +183,28 @@ where message_subscription, }; - match produce_incoming_message_events( - message_stream, - in_msg_sender, - in_messages_exit_receiver, - ) - .await + match produce_incoming_message_events(message_stream, in_msg_sender, messages_listener) + .await { Ok(_) => debug!("Message events producer exited."), - Err(e) => error!("Message events producer exited: {e}."), + Err(e) => { + messages_shutdown.trigger(); + error!("Message events producer exited: {e}."); + } } }); // Spin up a ticker that polls at an interval for any outgoing messages so that we can pass on outgoing messages to // LND. let interval = time::interval(MSG_POLL_INTERVAL); + let (events_shutdown, events_listener) = (signals.shutdown.clone(), signals.listener.clone()); set.spawn(async move { - match produce_outgoing_message_events(sender, out_messages_exit_receiver, interval).await { + match produce_outgoing_message_events(sender, events_listener, interval).await { Ok(_) => debug!("Outgoing message events producer exited."), - Err(e) => error!("Outgoing message events producer exited: {e}."), + Err(e) => { + events_shutdown.trigger(); + error!("Outgoing message events producer exited: {e}."); + } } }); @@ -225,15 +231,12 @@ where .await; match consume_result { Ok(_) => info!("Consume messenger events exited."), - Err(e) => error!("Consume messenger events exited: {e}."), + Err(e) => { + signals.shutdown.trigger(); + error!("Consume messenger events exited: {e}."); + } } - // Once the consumer has exited, we drop our exit signal channel's sender so that the receiving channels will close. - // This signals to all producers that it's time to exit, so we can await their exit once we've done this. - drop(peers_exit_sender); - drop(in_messages_exit_sender); - drop(out_messages_exit_sender); - // Tasks will independently exit, so we can assert that they do so in any order. let mut task_err = false; while let Some(res) = set.join_next().await { @@ -338,7 +341,7 @@ impl PeerEventProducer for PeerStream { async fn produce_peer_events( mut source: impl PeerEventProducer, events: Sender, - mut exit: Receiver<()>, + listener: Listener, ) -> Result<(), ProducerError> { loop { select! ( @@ -347,7 +350,7 @@ async fn produce_peer_events( // of events that can't be consumed, possibly blocking if the channel buffer is small). biased; - _ = exit.recv() => { + _ = listener.clone() => { info!("Peer events received signal to quit."); return Ok(()) } @@ -421,7 +424,7 @@ impl IncomingMessageProducer for MessageStream { async fn produce_incoming_message_events( mut source: impl IncomingMessageProducer, events: Sender, - mut exit: Receiver<()>, + listener: Listener, ) -> Result<(), ProducerError> { loop { select! ( @@ -430,7 +433,7 @@ async fn produce_incoming_message_events( // of events that can't be consumed, possibly blocking if the channel buffer is small). biased; - _ = exit.recv() => { + _ = listener.clone() => { info!("Peer events received signal to quit."); return Ok(()) } @@ -641,7 +644,7 @@ impl SendCustomMessage for CustomMessenger { /// events anyway. async fn produce_outgoing_message_events( events: Sender, - mut exit: Receiver<()>, + listener: Listener, mut interval: Interval, ) -> Result<(), ProducerError> { loop { @@ -651,7 +654,7 @@ async fn produce_outgoing_message_events( // of events that can't be consumed, possibly blocking if the channel buffer is small). biased; - _ = exit.recv() => { + _ = listener.clone() => { info!("Outgoing messenger events received signal to quit."); return Ok(()); } @@ -942,7 +945,7 @@ mod tests { #[tokio::test] async fn test_produce_peer_events() { let (sender, mut receiver) = channel(4); - let (_exit_sender, exit) = channel(1); + let (_shutdown, listener) = triggered::trigger(); let mut mock = MockPeerProducer::new(); @@ -977,7 +980,7 @@ mod tests { .returning(|| Err(Status::unknown("mock stream err"))); matches!( - produce_peer_events(mock, sender, exit) + produce_peer_events(mock, sender, listener) .await .expect_err("producer should error"), ProducerError::StreamError(_) @@ -1007,11 +1010,11 @@ mod tests { #[tokio::test] async fn test_produce_peer_events_exit() { let (sender, _receiver) = channel(1); - let (exit_sender, exit) = channel(1); + let (shutdown, listener) = triggered::trigger(); let mock = MockPeerProducer::new(); - drop(exit_sender); - assert!(produce_peer_events(mock, sender, exit).await.is_ok()); + shutdown.trigger(); + assert!(produce_peer_events(mock, sender, listener).await.is_ok()); } mock! { @@ -1026,7 +1029,7 @@ mod tests { #[tokio::test] async fn test_produce_incoming_message_events() { let (sender, mut receiver) = channel(2); - let (_exit_sender, exit) = channel(1); + let (_shutdown, listener) = triggered::trigger(); let mut mock = MockMessageProducer::new(); @@ -1056,7 +1059,7 @@ mod tests { .returning(|| Err(Status::unknown("mock stream err"))); matches!( - produce_incoming_message_events(mock, sender, exit) + produce_incoming_message_events(mock, sender, listener) .await .expect_err("producer should error"), ProducerError::StreamError(_), @@ -1076,11 +1079,11 @@ mod tests { #[tokio::test] async fn test_produce_incoming_message_exit() { let (sender, _receiver) = channel(2); - let (exit_sender, exit) = channel(1); + let (shutdown, listener) = triggered::trigger(); let mock = MockMessageProducer::new(); - drop(exit_sender); - assert!(produce_incoming_message_events(mock, sender, exit) + shutdown.trigger(); + assert!(produce_incoming_message_events(mock, sender, listener) .await .is_ok()); } @@ -1088,16 +1091,14 @@ mod tests { #[tokio::test] async fn test_produce_outgoing_message_events_exit() { let (sender, _) = channel(1); - let (exit_sender, exit_receiver) = channel(1); + let (shutdown, listener) = triggered::trigger(); let interval = time::interval(MSG_POLL_INTERVAL); // Let's test that produce_outgoing_message_events successfully exits when it receives the signal, rather than // loop infinitely. - drop(exit_sender); - assert!( - produce_outgoing_message_events(sender, exit_receiver, interval) - .await - .is_ok() - ); + shutdown.trigger(); + assert!(produce_outgoing_message_events(sender, listener, interval) + .await + .is_ok()); } } diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 2bfa72d1..e660fa85 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -4,6 +4,7 @@ use lndk; use bitcoin::secp256k1::PublicKey; use chrono::Utc; use ldk_sample::node_api::Node as LdkNode; +use lndk::LifecycleSignals; use std::path::PathBuf; use std::str::FromStr; use tokio::select; @@ -59,6 +60,7 @@ async fn test_lndk_forwards_onion_message() { // Now we'll spin up lndk. Even though ldk1 and ldk2 are not directly connected, we'll show that lndk // successfully helps lnd forward the onion message from ldk1 to ldk2. + let (shutdown, listener) = triggered::trigger(); let lnd_cfg = lndk::lnd::LndCfg::new( lnd.address, PathBuf::from_str(&lnd.cert_path).unwrap(), @@ -66,6 +68,10 @@ async fn test_lndk_forwards_onion_message() { ); let now_timestamp = Utc::now(); let timestamp = now_timestamp.format("%d-%m-%Y-%H%M"); + let signals = LifecycleSignals { + shutdown: shutdown.clone(), + listener, + }; let lndk_cfg = lndk::Cfg { lnd: lnd_cfg, log_dir: Some( @@ -75,6 +81,7 @@ async fn test_lndk_forwards_onion_message() { .unwrap() .to_string(), ), + signals, }; select! { val = lndk::run(lndk_cfg) => { @@ -82,6 +89,7 @@ async fn test_lndk_forwards_onion_message() { }, // We wait for ldk2 to receive the onion message. (ldk1, ldk2) = wait_to_receive_onion_message(ldk1, ldk2, PublicKey::from_str(&lnd_info.identity_pubkey).unwrap()) => { + shutdown.trigger(); ldk1.stop().await; ldk2.stop().await; } From b104164bf2e73e52f2782037bab4931896669cb3 Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Wed, 20 Dec 2023 10:21:01 -0500 Subject: [PATCH 02/24] multi: refactor to create OfferHandler and LndkOnionMessenger Provides the end user with something to interact with when they want to pay offers. --- src/lib.rs | 206 ++++++++++++++----------- src/main.rs | 6 +- src/onion_messenger.rs | 308 +++++++++++++++++++------------------ tests/integration_tests.rs | 5 +- 4 files changed, 279 insertions(+), 246 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 2831efa1..7cd5433f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,7 +9,7 @@ use crate::lnd::{ features_support_onion_messages, get_lnd_client, string_to_network, LndCfg, LndNodeSigner, }; -use crate::onion_messenger::{run_onion_messenger, MessengerUtilities}; +use crate::onion_messenger::MessengerUtilities; use bitcoin::secp256k1::PublicKey; use home::home_dir; use lightning::ln::peer_handler::IgnoringMessageHandler; @@ -46,108 +46,132 @@ pub fn init_logger(config: LogConfig) { }); } -pub async fn run(args: Cfg) -> Result<(), ()> { - let log_dir = args.log_dir.unwrap_or_else(|| { - home_dir() - .unwrap() - .join(".lndk") - .join("lndk.log") - .as_path() - .to_str() - .unwrap() - .to_string() - }); +pub struct LndkOnionMessenger { + pub offer_handler: OfferHandler, +} - // Log both to stdout and a log file. - let stdout = ConsoleAppender::builder().build(); - let lndk_logs = FileAppender::builder() - .encoder(Box::new(PatternEncoder::new("{d} - {m}{n}"))) - .build(log_dir) - .unwrap(); - - let config = LogConfig::builder() - .appender(Appender::builder().build("stdout", Box::new(stdout))) - .appender(Appender::builder().build("lndk_logs", Box::new(lndk_logs))) - .build( - Root::builder() - .appender("stdout") - .appender("lndk_logs") - .build(LevelFilter::Info), - ) - .unwrap(); +impl LndkOnionMessenger { + pub fn new(offer_handler: OfferHandler) -> Self { + LndkOnionMessenger { offer_handler } + } - init_logger(config); + pub async fn run(&self, args: Cfg) -> Result<(), ()> { + let log_dir = args.log_dir.unwrap_or_else(|| { + home_dir() + .unwrap() + .join(".lndk") + .join("lndk.log") + .as_path() + .to_str() + .unwrap() + .to_string() + }); + + // Log both to stdout and a log file. + let stdout = ConsoleAppender::builder().build(); + let lndk_logs = FileAppender::builder() + .encoder(Box::new(PatternEncoder::new("{d} - {m}{n}"))) + .build(log_dir) + .unwrap(); + + let config = LogConfig::builder() + .appender(Appender::builder().build("stdout", Box::new(stdout))) + .appender(Appender::builder().build("lndk_logs", Box::new(lndk_logs))) + .build( + Root::builder() + .appender("stdout") + .appender("lndk_logs") + .build(LevelFilter::Info), + ) + .unwrap(); + + init_logger(config); + + let mut client = get_lnd_client(args.lnd).expect("failed to connect"); + + let info = client + .lightning() + .get_info(GetInfoRequest {}) + .await + .expect("failed to get info") + .into_inner(); + + let mut network_str = None; + for chain in info.chains { + if chain.chain == "bitcoin" { + network_str = Some(chain.network.clone()) + } + } + if network_str.is_none() { + error!("lnd node is not connected to bitcoin network as expected"); + return Err(()); + } + let network = string_to_network(&network_str.unwrap()); - let mut client = get_lnd_client(args.lnd).expect("failed to connect"); + let pubkey = PublicKey::from_str(&info.identity_pubkey).unwrap(); + info!("Starting lndk for node: {pubkey}."); - let info = client - .lightning() - .get_info(GetInfoRequest {}) - .await - .expect("failed to get info") - .into_inner(); + if !features_support_onion_messages(&info.features) { + error!("LND must support onion messaging to run LNDK."); + return Err(()); + } - let mut network_str = None; - for chain in info.chains { - if chain.chain == "bitcoin" { - network_str = Some(chain.network.clone()) + // On startup, we want to get a list of our currently online peers to notify the onion messenger that they are + // connected. This sets up our "start state" for the messenger correctly. + let current_peers = client + .lightning() + .list_peers(tonic_lnd::lnrpc::ListPeersRequest { + latest_error: false, + }) + .await + .map_err(|e| { + error!("Could not lookup current peers: {e}."); + })?; + + let mut peer_support = HashMap::new(); + for peer in current_peers.into_inner().peers { + let pubkey = PublicKey::from_str(&peer.pub_key).unwrap(); + let onion_support = features_support_onion_messages(&peer.features); + peer_support.insert(pubkey, onion_support); } + + // Create an onion messenger that depends on LND's signer client and consume related events. + let mut node_client = client.signer().clone(); + let node_signer = LndNodeSigner::new(pubkey, &mut node_client); + let messenger_utils = MessengerUtilities::new(); + let onion_messenger = OnionMessenger::new( + &messenger_utils, + &node_signer, + &messenger_utils, + &DefaultMessageRouter {}, + IgnoringMessageHandler {}, + IgnoringMessageHandler {}, + ); + + let mut peers_client = client.lightning().clone(); + self.run_onion_messenger( + peer_support, + &mut peers_client, + onion_messenger, + network.unwrap(), + args.signals, + ) + .await } - if network_str.is_none() { - error!("lnd node is not connected to bitcoin network as expected"); - return Err(()); - } - let network = string_to_network(&network_str.unwrap()); +} - let pubkey = PublicKey::from_str(&info.identity_pubkey).unwrap(); - info!("Starting lndk for node: {pubkey}."); +pub struct OfferHandler {} - if !features_support_onion_messages(&info.features) { - error!("LND must support onion messaging to run LNDK."); - return Err(()); +impl OfferHandler { + pub fn new() -> Self { + OfferHandler {} } +} - // On startup, we want to get a list of our currently online peers to notify the onion messenger that they are - // connected. This sets up our "start state" for the messenger correctly. - let current_peers = client - .lightning() - .list_peers(tonic_lnd::lnrpc::ListPeersRequest { - latest_error: false, - }) - .await - .map_err(|e| { - error!("Could not lookup current peers: {e}."); - })?; - - let mut peer_support = HashMap::new(); - for peer in current_peers.into_inner().peers { - let pubkey = PublicKey::from_str(&peer.pub_key).unwrap(); - let onion_support = features_support_onion_messages(&peer.features); - peer_support.insert(pubkey, onion_support); +impl Default for OfferHandler { + fn default() -> Self { + Self::new() } - - // Create an onion messenger that depends on LND's signer client and consume related events. - let mut node_client = client.signer().clone(); - let node_signer = LndNodeSigner::new(pubkey, &mut node_client); - let messenger_utils = MessengerUtilities::new(); - let onion_messenger = OnionMessenger::new( - &messenger_utils, - &node_signer, - &messenger_utils, - &DefaultMessageRouter {}, - IgnoringMessageHandler {}, - IgnoringMessageHandler {}, - ); - - let mut peers_client = client.lightning().clone(); - run_onion_messenger( - peer_support, - &mut peers_client, - onion_messenger, - network.unwrap(), - args.signals, - ) - .await } #[cfg(test)] diff --git a/src/main.rs b/src/main.rs index 700d74ab..fd7f50a0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,7 +10,7 @@ mod internal { use internal::*; use lndk::lnd::LndCfg; -use lndk::{Cfg, LifecycleSignals}; +use lndk::{Cfg, LifecycleSignals, LndkOnionMessenger, OfferHandler}; #[macro_use] extern crate configure_me; @@ -30,5 +30,7 @@ async fn main() -> Result<(), ()> { signals, }; - lndk::run(args).await + let handler = OfferHandler::new(); + let messenger = LndkOnionMessenger::new(handler); + messenger.run(args).await } diff --git a/src/onion_messenger.rs b/src/onion_messenger.rs index e3b6712b..d83605cb 100644 --- a/src/onion_messenger.rs +++ b/src/onion_messenger.rs @@ -1,7 +1,7 @@ use crate::clock::TokioClock; use crate::lnd::{features_support_onion_messages, ONION_MESSAGES_OPTIONAL}; use crate::rate_limit::{RateLimiter, TokenLimiter}; -use crate::LifecycleSignals; +use crate::{LifecycleSignals, LndkOnionMessenger}; use async_trait::async_trait; use bitcoin::blockdata::constants::ChainHash; use bitcoin::network::constants::Network; @@ -90,170 +90,174 @@ impl Logger for MessengerUtilities { } } -/// run_onion_messenger is the main event loop for connecting an OnionMessenger to LND's various APIs to handle -/// onion messages externally to LND. It follows a producer / consumer pattern, with many producers -/// creating MessengerEvents that are handled by a single consumer that drives the OnionMessenger accordingly. This -/// function will block until consumer errors or one of the producers exits. -/// -/// Producers: -/// 1. Peer Events: Sourced from LND's PeerEventSubscription API, produces peer online and offline events. -/// 2. Incoming Messages: Sourced from LND's SubscribeCustomMessages API, produces incoming onion message events. -/// 3. Outgoing Poll: Using a simple ticker, produces polling events to check for outgoing onion messages. -/// -/// The main consumer processes one MessengerEvent at a time, applying basic rate limiting to each peer to prevent spam. -pub(crate) async fn run_onion_messenger< - ES: Deref, - NS: Deref, - L: Deref, - MR: Deref, - OMH: Deref, - CMH: Deref, ->( - current_peers: HashMap, - ln_client: &mut tonic_lnd::LightningClient, - onion_messenger: OnionMessenger, - network: Network, - signals: LifecycleSignals, -) -> Result<(), ()> -where - ES::Target: EntropySource, - NS::Target: NodeSigner, - L::Target: Logger, - MR::Target: MessageRouter, - OMH::Target: OffersMessageHandler, - CMH::Target: CustomOnionMessageHandler + Sized, -{ - // Setup channels that we'll use to communicate onion messenger events. We buffer our channels by the number of - // peers (+1 because we require a non-zero buffer) that the node currently has so that we can send all of our - // startup online events in one go (before we boot up the consumer). The number of peers that we have is also - // related to the number of events we can expect to process, so it's a sensible enough buffer size. - let (sender, receiver) = channel(current_peers.len() + 1); - for (peer, onion_support) in current_peers.clone() { - sender - .send(MessengerEvents::PeerConnected(peer, onion_support)) - .await - .map_err(|e| { - error!("Notify peer connected: {e}."); - })? - } +impl LndkOnionMessenger { + /// run_onion_messenger is the main event loop for connecting an OnionMessenger to LND's various APIs to handle + /// onion messages externally to LND. It follows a producer / consumer pattern, with many producers + /// creating MessengerEvents that are handled by a single consumer that drives the OnionMessenger accordingly. This + /// function will block until consumer errors or one of the producers exits. + /// + /// Producers: + /// 1. Peer Events: Sourced from LND's PeerEventSubscription API, produces peer online and offline events. + /// 2. Incoming Messages: Sourced from LND's SubscribeCustomMessages API, produces incoming onion message events. + /// 3. Outgoing Poll: Using a simple ticker, produces polling events to check for outgoing onion messages. + /// + /// The main consumer processes one MessengerEvent at a time, applying basic rate limiting to each peer to prevent spam. + pub(crate) async fn run_onion_messenger< + ES: Deref, + NS: Deref, + L: Deref, + MR: Deref, + OMH: Deref, + CMH: Deref, + >( + &self, + current_peers: HashMap, + ln_client: &mut tonic_lnd::LightningClient, + onion_messenger: OnionMessenger, + network: Network, + signals: LifecycleSignals, + ) -> Result<(), ()> + where + ES::Target: EntropySource, + NS::Target: NodeSigner, + L::Target: Logger, + MR::Target: MessageRouter, + OMH::Target: OffersMessageHandler, + CMH::Target: CustomOnionMessageHandler + Sized, + { + // Setup channels that we'll use to communicate onion messenger events. We buffer our channels by the number of + // peers (+1 because we require a non-zero buffer) that the node currently has so that we can send all of our + // startup online events in one go (before we boot up the consumer). The number of peers that we have is also + // related to the number of events we can expect to process, so it's a sensible enough buffer size. + let (sender, receiver) = channel(current_peers.len() + 1); + for (peer, onion_support) in current_peers.clone() { + sender + .send(MessengerEvents::PeerConnected(peer, onion_support)) + .await + .map_err(|e| { + error!("Notify peer connected: {e}."); + })? + } - let mut set = tokio::task::JoinSet::new(); - - // Subscribe to peer events from LND first thing so that we don't miss any online/offline events while we are - // starting up. The onion messenger can handle superfluous online/offline reports, so it's okay if this ends - // up creating some duplicate events. The event subscription from LND blocks until it gets its first event (which - // could take very long), so we get the subscription itself inside of our producer thread. - let mut peers_client = ln_client.clone(); - let peers_sender = sender.clone(); - let (peers_shutdown, peers_listener) = (signals.shutdown.clone(), signals.listener.clone()); - set.spawn(async move { - let peer_subscription = peers_client - .subscribe_peer_events(tonic_lnd::lnrpc::PeerEventSubscription {}) - .await - .expect("peer subscription failed") - .into_inner(); + let mut set = tokio::task::JoinSet::new(); + + // Subscribe to peer events from LND first thing so that we don't miss any online/offline events while we are + // starting up. The onion messenger can handle superfluous online/offline reports, so it's okay if this ends + // up creating some duplicate events. The event subscription from LND blocks until it gets its first event (which + // could take very long), so we get the subscription itself inside of our producer thread. + let mut peers_client = ln_client.clone(); + let peers_sender = sender.clone(); + let (peers_shutdown, peers_listener) = (signals.shutdown.clone(), signals.listener.clone()); + set.spawn(async move { + let peer_subscription = peers_client + .subscribe_peer_events(tonic_lnd::lnrpc::PeerEventSubscription {}) + .await + .expect("peer subscription failed") + .into_inner(); - let peer_stream = PeerStream { - peer_subscription, - client: peers_client, - }; + let peer_stream = PeerStream { + peer_subscription, + client: peers_client, + }; - match produce_peer_events(peer_stream, peers_sender, peers_listener).await { - Ok(_) => debug!("Peer events producer exited."), - Err(e) => { - peers_shutdown.trigger(); - error!("Peer events producer exited: {e}."); + match produce_peer_events(peer_stream, peers_sender, peers_listener).await { + Ok(_) => debug!("Peer events producer exited."), + Err(e) => { + peers_shutdown.trigger(); + error!("Peer events producer exited: {e}."); + } + }; + }); + + // Subscribe to custom messaging events from LND so that we can receive incoming messages. + let mut messages_client = ln_client.clone(); + let in_msg_sender = sender.clone(); + let (messages_shutdown, messages_listener) = + (signals.shutdown.clone(), signals.listener.clone()); + set.spawn(async move { + let message_subscription = messages_client + .subscribe_custom_messages(tonic_lnd::lnrpc::SubscribeCustomMessagesRequest {}) + .await + .expect("message subscription failed") + .into_inner(); + + let message_stream = MessageStream { + message_subscription, + }; + + match produce_incoming_message_events(message_stream, in_msg_sender, messages_listener) + .await + { + Ok(_) => debug!("Message events producer exited."), + Err(e) => { + messages_shutdown.trigger(); + error!("Message events producer exited: {e}."); + } } - }; - }); - - // Subscribe to custom messaging events from LND so that we can receive incoming messages. - let mut messages_client = ln_client.clone(); - let in_msg_sender = sender.clone(); - let (messages_shutdown, messages_listener) = - (signals.shutdown.clone(), signals.listener.clone()); - set.spawn(async move { - let message_subscription = messages_client - .subscribe_custom_messages(tonic_lnd::lnrpc::SubscribeCustomMessagesRequest {}) - .await - .expect("message subscription failed") - .into_inner(); + }); - let message_stream = MessageStream { - message_subscription, - }; + // Spin up a ticker that polls at an interval for any outgoing messages so that we can pass on outgoing messages to + // LND. + let interval = time::interval(MSG_POLL_INTERVAL); + let (events_shutdown, events_listener) = + (signals.shutdown.clone(), signals.listener.clone()); + set.spawn(async move { + match produce_outgoing_message_events(sender, events_listener, interval).await { + Ok(_) => debug!("Outgoing message events producer exited."), + Err(e) => { + events_shutdown.trigger(); + error!("Outgoing message events producer exited: {e}."); + } + } + }); - match produce_incoming_message_events(message_stream, in_msg_sender, messages_listener) - .await - { - Ok(_) => debug!("Message events producer exited."), + // Consume events is our main controlling loop, so we run it inline here. We use a RefCell in onion_messenger to + // allow interior mutability (see LndNodeSigner) so this function can't safely be passed off to another thread. + // This function is expected to finish if any producing thread exits (because we're no longer receiving the + // events we need). + let rate_limiter = &mut TokenLimiter::new( + current_peers.keys().copied(), + DEFAULT_CALL_COUNT, + DEFAULT_CALL_FREQUENCY, + TokioClock::new(), + ); + let mut message_sender = CustomMessenger { + client: ln_client.clone(), + }; + let consume_result = consume_messenger_events( + onion_messenger, + receiver, + &mut message_sender, + rate_limiter, + network, + ) + .await; + match consume_result { + Ok(_) => info!("Consume messenger events exited."), Err(e) => { - messages_shutdown.trigger(); - error!("Message events producer exited: {e}."); + signals.shutdown.trigger(); + error!("Consume messenger events exited: {e}."); } } - }); - - // Spin up a ticker that polls at an interval for any outgoing messages so that we can pass on outgoing messages to - // LND. - let interval = time::interval(MSG_POLL_INTERVAL); - let (events_shutdown, events_listener) = (signals.shutdown.clone(), signals.listener.clone()); - set.spawn(async move { - match produce_outgoing_message_events(sender, events_listener, interval).await { - Ok(_) => debug!("Outgoing message events producer exited."), - Err(e) => { - events_shutdown.trigger(); - error!("Outgoing message events producer exited: {e}."); - } + + // Tasks will independently exit, so we can assert that they do so in any order. + let mut task_err = false; + while let Some(res) = set.join_next().await { + match res { + Ok(_) => info!("Producer exited."), + Err(_) => { + task_err = true; + error!("Producer exited with an error."); + } + }; } - }); - - // Consume events is our main controlling loop, so we run it inline here. We use a RefCell in onion_messenger to - // allow interior mutability (see LndNodeSigner) so this function can't safely be passed off to another thread. - // This function is expected to finish if any producing thread exits (because we're no longer receiving the - // events we need). - let rate_limiter = &mut TokenLimiter::new( - current_peers.keys().copied(), - DEFAULT_CALL_COUNT, - DEFAULT_CALL_FREQUENCY, - TokioClock::new(), - ); - let mut message_sender = CustomMessenger { - client: ln_client.clone(), - }; - let consume_result = consume_messenger_events( - onion_messenger, - receiver, - &mut message_sender, - rate_limiter, - network, - ) - .await; - match consume_result { - Ok(_) => info!("Consume messenger events exited."), - Err(e) => { - signals.shutdown.trigger(); - error!("Consume messenger events exited: {e}."); + // Exit with an error if any task did not exit cleanly. + if consume_result.is_err() || task_err { + return Err(()); } - } - // Tasks will independently exit, so we can assert that they do so in any order. - let mut task_err = false; - while let Some(res) = set.join_next().await { - match res { - Ok(_) => info!("Producer exited."), - Err(_) => { - task_err = true; - error!("Producer exited with an error."); - } - }; + Ok(()) } - // Exit with an error if any task did not exit cleanly. - if consume_result.is_err() || task_err { - return Err(()); - } - - Ok(()) } /// lookup_onion_support performs a best-effort lookup in the node's list of current peers to determine whether it diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index e660fa85..0bbe2ba0 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -83,8 +83,11 @@ async fn test_lndk_forwards_onion_message() { ), signals, }; + + let handler = lndk::OfferHandler::new(); + let messenger = lndk::LndkOnionMessenger::new(handler); select! { - val = lndk::run(lndk_cfg) => { + val = messenger.run(lndk_cfg) => { panic!("lndk should not have completed first {:?}", val); }, // We wait for ldk2 to receive the onion message. From 8241ca1166c23f15e1bb7d05639d3e2ba81375ef Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Wed, 20 Dec 2023 10:24:12 -0500 Subject: [PATCH 03/24] lib: implement and use OfferMessageHandler on OfferHandler --- src/lib.rs | 44 +++++++++++++++++++++++++++++++++++++++----- 1 file changed, 39 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 7cd5433f..44916fbc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,7 +13,9 @@ use crate::onion_messenger::MessengerUtilities; use bitcoin::secp256k1::PublicKey; use home::home_dir; use lightning::ln::peer_handler::IgnoringMessageHandler; -use lightning::onion_message::{DefaultMessageRouter, OnionMessenger}; +use lightning::onion_message::{ + DefaultMessageRouter, OffersMessage, OffersMessageHandler, OnionMessenger, PendingOnionMessage, +}; use log::{error, info, LevelFilter}; use log4rs::append::console::ConsoleAppender; use log4rs::append::file::FileAppender; @@ -21,7 +23,7 @@ use log4rs::config::{Appender, Config as LogConfig, Root}; use log4rs::encode::pattern::PatternEncoder; use std::collections::HashMap; use std::str::FromStr; -use std::sync::Once; +use std::sync::{Mutex, Once}; use tonic_lnd::lnrpc::GetInfoRequest; use triggered::{Listener, Trigger}; @@ -144,7 +146,7 @@ impl LndkOnionMessenger { &node_signer, &messenger_utils, &DefaultMessageRouter {}, - IgnoringMessageHandler {}, + &self.offer_handler, IgnoringMessageHandler {}, ); @@ -160,11 +162,26 @@ impl LndkOnionMessenger { } } -pub struct OfferHandler {} +#[allow(dead_code)] +enum OfferState { + OfferAdded, + InvoiceRequestSent, + InvoiceReceived, + InvoicePaymentDispatched, + InvoicePaid, +} + +pub struct OfferHandler { + _active_offers: Mutex>, + pending_messages: Mutex>>, +} impl OfferHandler { pub fn new() -> Self { - OfferHandler {} + OfferHandler { + _active_offers: Mutex::new(HashMap::new()), + pending_messages: Mutex::new(Vec::new()), + } } } @@ -174,6 +191,23 @@ impl Default for OfferHandler { } } +impl OffersMessageHandler for OfferHandler { + fn handle_message(&self, message: OffersMessage) -> Option { + match message { + OffersMessage::InvoiceRequest(_) => { + log::error!("Invoice request received, payment not yet supported."); + None + } + OffersMessage::Invoice(_invoice) => None, + OffersMessage::InvoiceError(_error) => None, + } + } + + fn release_pending_messages(&self) -> Vec> { + core::mem::take(&mut self.pending_messages.lock().unwrap()) + } +} + #[cfg(test)] mod tests { pub mod test_utils; From 277e53e7b63151ddae8e874e587978393b0014bb Mon Sep 17 00:00:00 2001 From: Orbital Date: Tue, 30 Jan 2024 21:00:03 -0600 Subject: [PATCH 04/24] lib: refactor create_invoice_request to be a method of OfferHandler --- src/lndk_offers.rs | 139 +++++++++++++++++++++++---------------------- 1 file changed, 71 insertions(+), 68 deletions(-) diff --git a/src/lndk_offers.rs b/src/lndk_offers.rs index 26aae8fd..bac4cd5a 100644 --- a/src/lndk_offers.rs +++ b/src/lndk_offers.rs @@ -1,4 +1,5 @@ use crate::lnd::MessageSigner; +use crate::OfferHandler; use async_trait::async_trait; use bitcoin::hashes::sha256::Hash; use bitcoin::network::constants::Network; @@ -44,58 +45,63 @@ pub fn decode(offer_str: String) -> Result { offer_str.parse::() } -#[allow(dead_code)] -// create_request_invoice builds and signs an invoice request, the first step in the BOLT 12 process of paying an offer. -pub(crate) async fn create_request_invoice( - mut signer: impl MessageSigner + std::marker::Send + 'static, - offer: Offer, - metadata: Vec, - network: Network, - msats: u64, -) -> Result> { - // We use KeyFamily KeyFamilyNodeKey (6) to derive a key to represent our node id. See: - // https://github.com/lightningnetwork/lnd/blob/a3f8011ed695f6204ec6a13ad5c2a67ac542b109/keychain/derivation.go#L103 - let key_loc = KeyLocator { - key_family: 6, - key_index: 1, - }; - - let pubkey_bytes = signer - .derive_key(key_loc.clone()) - .await - .map_err(OfferError::DeriveKeyFailure)?; - let pubkey = PublicKey::from_slice(&pubkey_bytes).expect("failed to deserialize public key"); - - let unsigned_invoice_req = offer - .request_invoice(metadata, pubkey) - .unwrap() - .chain(network) - .unwrap() - .amount_msats(msats) - .unwrap() - .build() - .map_err(OfferError::BuildUIRFailure)?; - - // To create a valid invoice request, we also need to sign it. This is spawned in a blocking - // task because we need to call block_on on sign_message so that sign_closure can be a - // synchronous closure. - task::spawn_blocking(move || { - let sign_closure = |msg: &UnsignedInvoiceRequest| { - let tagged_hash = msg.as_ref(); - let tag = tagged_hash.tag().to_string(); - - let signature = block_on(signer.sign_message(key_loc, tagged_hash.merkle_root(), tag)) - .map_err(|_| Secp256k1Error::InvalidSignature)?; - - Signature::from_slice(&signature) +impl OfferHandler { + #[allow(dead_code)] + // create_request_invoice builds and signs an invoice request, the first step in the BOLT 12 process of paying an offer. + pub(crate) async fn create_request_invoice( + &self, + mut signer: impl MessageSigner + std::marker::Send + 'static, + offer: Offer, + metadata: Vec, + network: Network, + msats: u64, + ) -> Result> { + // We use KeyFamily KeyFamilyNodeKey (6) to derive a key to represent our node id. See: + // https://github.com/lightningnetwork/lnd/blob/a3f8011ed695f6204ec6a13ad5c2a67ac542b109/keychain/derivation.go#L103 + let key_loc = KeyLocator { + key_family: 6, + key_index: 1, }; - unsigned_invoice_req - .sign(sign_closure) - .map_err(OfferError::SignError) - }) - .await - .unwrap() + let pubkey_bytes = signer + .derive_key(key_loc.clone()) + .await + .map_err(OfferError::DeriveKeyFailure)?; + let pubkey = + PublicKey::from_slice(&pubkey_bytes).expect("failed to deserialize public key"); + + let unsigned_invoice_req = offer + .request_invoice(metadata, pubkey) + .unwrap() + .chain(network) + .unwrap() + .amount_msats(msats) + .unwrap() + .build() + .map_err(OfferError::BuildUIRFailure)?; + + // To create a valid invoice request, we also need to sign it. This is spawned in a blocking + // task because we need to call block_on on sign_message so that sign_closure can be a + // synchronous closure. + task::spawn_blocking(move || { + let sign_closure = |msg: &UnsignedInvoiceRequest| { + let tagged_hash = msg.as_ref(); + let tag = tagged_hash.tag().to_string(); + + let signature = + block_on(signer.sign_message(key_loc, tagged_hash.merkle_root(), tag)) + .map_err(|_| Secp256k1Error::InvalidSignature)?; + + Signature::from_slice(&signature) + }; + + unsigned_invoice_req + .sign(sign_closure) + .map_err(OfferError::SignError) + }) + .await + .unwrap() + } } #[async_trait] @@ -176,12 +182,11 @@ mod tests { }); let offer = decode(get_offer()).unwrap(); - - assert!( - create_request_invoice(signer_mock, offer, vec![], Network::Regtest, 10000) - .await - .is_ok() - ) + let handler = OfferHandler::new(); + assert!(handler + .create_request_invoice(signer_mock, offer, vec![], Network::Regtest, 10000) + .await + .is_ok()) } #[tokio::test] @@ -200,12 +205,11 @@ mod tests { }); let offer = decode(get_offer()).unwrap(); - - assert!( - create_request_invoice(signer_mock, offer, vec![], Network::Regtest, 10000) - .await - .is_err() - ) + let handler = OfferHandler::new(); + assert!(handler + .create_request_invoice(signer_mock, offer, vec![], Network::Regtest, 10000) + .await + .is_err()) } #[tokio::test] @@ -224,11 +228,10 @@ mod tests { .returning(|_, _, _| Err(Status::unknown("error testing"))); let offer = decode(get_offer()).unwrap(); - - assert!( - create_request_invoice(signer_mock, offer, vec![], Network::Regtest, 10000) - .await - .is_err() - ) + let handler = OfferHandler::new(); + assert!(handler + .create_request_invoice(signer_mock, offer, vec![], Network::Regtest, 10000) + .await + .is_err()) } } From 6f4ae687fd10c27c38974e76b73b991dde47f08e Mon Sep 17 00:00:00 2001 From: Orbital Date: Tue, 30 Jan 2024 21:35:27 -0600 Subject: [PATCH 05/24] offers: rename create_invoice_request --- src/lndk_offers.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/lndk_offers.rs b/src/lndk_offers.rs index bac4cd5a..02017887 100644 --- a/src/lndk_offers.rs +++ b/src/lndk_offers.rs @@ -47,8 +47,8 @@ pub fn decode(offer_str: String) -> Result { impl OfferHandler { #[allow(dead_code)] - // create_request_invoice builds and signs an invoice request, the first step in the BOLT 12 process of paying an offer. - pub(crate) async fn create_request_invoice( + // create_invoice_request builds and signs an invoice request, the first step in the BOLT 12 process of paying an offer. + pub(crate) async fn create_invoice_request( &self, mut signer: impl MessageSigner + std::marker::Send + 'static, offer: Offer, @@ -184,7 +184,7 @@ mod tests { let offer = decode(get_offer()).unwrap(); let handler = OfferHandler::new(); assert!(handler - .create_request_invoice(signer_mock, offer, vec![], Network::Regtest, 10000) + .create_invoice_request(signer_mock, offer, vec![], Network::Regtest, 10000) .await .is_ok()) } @@ -207,7 +207,7 @@ mod tests { let offer = decode(get_offer()).unwrap(); let handler = OfferHandler::new(); assert!(handler - .create_request_invoice(signer_mock, offer, vec![], Network::Regtest, 10000) + .create_invoice_request(signer_mock, offer, vec![], Network::Regtest, 10000) .await .is_err()) } @@ -230,7 +230,7 @@ mod tests { let offer = decode(get_offer()).unwrap(); let handler = OfferHandler::new(); assert!(handler - .create_request_invoice(signer_mock, offer, vec![], Network::Regtest, 10000) + .create_invoice_request(signer_mock, offer, vec![], Network::Regtest, 10000) .await .is_err()) } From 9bfed001874fdc917296fe365fe7fc18975e642b Mon Sep 17 00:00:00 2001 From: Orbital Date: Mon, 13 Nov 2023 18:35:50 -0600 Subject: [PATCH 06/24] itests: update lnd submodule to tagged hash change For our integration tests to work properly we need to use a custom version of lnd's RPC that allows signing the tagged hash (BIP 340) of a message. This was introduced in LND at commit dacb86f. --- lnd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lnd b/lnd index 2fb150c8..dacb86fd 160000 --- a/lnd +++ b/lnd @@ -1 +1 @@ -Subproject commit 2fb150c8fe827df9df0520ef9916b3afb7b03a8d +Subproject commit dacb86fdbc2d15c81e780217414fcbc6e0d08186 From 8b00be5dd4e8a511e484b27a0b9dbe36d77e5e13 Mon Sep 17 00:00:00 2001 From: Orbital Date: Mon, 13 Nov 2023 18:36:50 -0600 Subject: [PATCH 07/24] itests: add walletrpc subserver to lnd Makefile/README For the integration tests to work we need to add the walletrpc subservice to the lnd make process so we can use the DeriveKey RPC call. --- Makefile | 2 +- README.md | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index 11ca02b3..51d4d7db 100644 --- a/Makefile +++ b/Makefile @@ -13,6 +13,6 @@ endif itest: @$(call print, "Building lnd for itests.") git submodule update --init --recursive - cd lnd/cmd/lnd; $(GO_BUILD) -tags="peersrpc signrpc dev" -o $(TMP_DIR)/lndk-tests/bin/lnd-itest$(EXEC_SUFFIX) + cd lnd/cmd/lnd; $(GO_BUILD) -tags="peersrpc signrpc walletrpc dev" -o $(TMP_DIR)/lndk-tests/bin/lnd-itest$(EXEC_SUFFIX) $(CARGO_TEST) -- -- test '*' --test-threads=1 --nocapture diff --git a/README.md b/README.md index d4d9f732..3b4d33f2 100644 --- a/README.md +++ b/README.md @@ -21,9 +21,9 @@ When you encounter a problem with `LNDK`, Feel free to file issues or start [a d To run `LNDK`, you require access to a `LND` node running _at least_ [LND v0.17.0](https://github.com/lightningnetwork/lnd/releases/tag/v0.17.0-beta). -You will need to compile `LND` in `dev` mode (to enable protocol-level feature handling externally) and enable the `peersrpc` and `signerrpc` sub-servers: +You will need to compile `LND` in `dev` mode (to enable protocol-level feature handling externally) and enable the `peersrpc`, `signerrpc`, and `walletrpc` sub-servers: -`make install tags="peersrpc signrpc dev"` +`make install tags="peersrpc signrpc walletrpc dev"` Note that this guide assumes some familiarity with setting up `LND`. If you're looking to get up to speed, try [this guide](https://docs.lightning.engineering/lightning-network-tools/lnd/run-lnd). From 53640aa657eb7af4ea1aae0f3101181b891def26 Mon Sep 17 00:00:00 2001 From: Orbital Date: Thu, 16 Nov 2023 17:36:27 -0600 Subject: [PATCH 08/24] Pin rust-lightning to a custom version Bumps rust-lightning to a custom version that allows us to sign the invoice request's tagged hash. --- Cargo.lock | 67 +++++++++++++++++++++++------------------------------- Cargo.toml | 4 ++-- 2 files changed, 30 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f6b37e0d..c4ff5fc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -773,12 +773,6 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" -[[package]] -name = "hex" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "805026a5d0141ffc30abb3be3173848ad46a1b1664fe632428479619a3644d77" - [[package]] name = "hex" version = "0.4.3" @@ -953,22 +947,22 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "ldk-sample" version = "0.1.0" -source = "git+https://github.com/lndk-org/ldk-sample?branch=onion-handler#65758f45c9caa6befda1c5c139c049b3081ffdfb" +source = "git+https://github.com/lndk-org/ldk-sample?branch=offers#848e562f0ef57c95e015454cc2c00bca1e315e29" dependencies = [ "base64 0.13.1", "bech32 0.8.1", "bitcoin 0.29.2", "bitcoin-bech32", "chrono", - "hex 0.3.2", "libc", - "lightning 0.0.116", + "lightning 0.0.118", "lightning-background-processor", "lightning-block-sync", "lightning-invoice", "lightning-net-tokio", "lightning-persister", "lightning-rapid-gossip-sync", + "log", "rand 0.4.6", "serde_json", "tempfile", @@ -983,9 +977,9 @@ checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" [[package]] name = "lightning" -version = "0.0.116" +version = "0.0.117" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90a0f2155316f1570446a0447c993480673f840748c8ed25bbc59dfc442ac770" +checksum = "cb0cb32c79c42444453bfabed7bd4f4ef64dfdd2e35e191fa8f3b9099ad2a186" dependencies = [ "bitcoin 0.29.2", ] @@ -993,7 +987,7 @@ dependencies = [ [[package]] name = "lightning" version = "0.0.118" -source = "git+https://github.com/lightningdevkit/rust-lightning?rev=caafcedf3fc40fc6253261218c25b254dd955a82#caafcedf3fc40fc6253261218c25b254dd955a82" +source = "git+https://github.com/orbitalturtle/rust-lightning?branch=v0.0.118-custom#07cb4d3ec5a9b7d9b338721061f69e80ca3d5381" dependencies = [ "bitcoin 0.29.2", "musig2", @@ -1001,72 +995,67 @@ dependencies = [ [[package]] name = "lightning-background-processor" -version = "0.0.116" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "398b68a96cceb3c1227504bd5faeb74f26c3233447bc10cc1cb2c67e01b51556" +version = "0.0.118" +source = "git+https://github.com/orbitalturtle/rust-lightning?branch=v0.0.118-custom#07cb4d3ec5a9b7d9b338721061f69e80ca3d5381" dependencies = [ "bitcoin 0.29.2", - "lightning 0.0.116", + "lightning 0.0.118", "lightning-rapid-gossip-sync", ] [[package]] name = "lightning-block-sync" -version = "0.0.116" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d94c276dbe2a777d58ed6ececca96006247a4717c00ac4cdfff62d76852be783" +version = "0.0.118" +source = "git+https://github.com/orbitalturtle/rust-lightning?branch=v0.0.118-custom#07cb4d3ec5a9b7d9b338721061f69e80ca3d5381" dependencies = [ "bitcoin 0.29.2", "chunked_transfer", - "lightning 0.0.116", + "lightning 0.0.118", "serde_json", + "tokio", ] [[package]] name = "lightning-invoice" -version = "0.24.0" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1788c0158526ec27a502043c2911ea6ea58fdc656bdf8749484942c07b790d23" +checksum = "6d2380d053673c467c5f0be992cd4685e40d141d075647d74b64aba1b9c0b1ae" dependencies = [ "bech32 0.9.1", "bitcoin 0.29.2", "bitcoin_hashes 0.11.0", - "lightning 0.0.116", + "lightning 0.0.117", "num-traits", "secp256k1 0.24.3", ] [[package]] name = "lightning-net-tokio" -version = "0.0.116" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "366c0ae225736cbc03555bd5fb4b44b2e8fe2ca3c868ec53a4b325c38b2ab2bd" +version = "0.0.118" +source = "git+https://github.com/orbitalturtle/rust-lightning?branch=v0.0.118-custom#07cb4d3ec5a9b7d9b338721061f69e80ca3d5381" dependencies = [ "bitcoin 0.29.2", - "lightning 0.0.116", + "lightning 0.0.118", "tokio", ] [[package]] name = "lightning-persister" -version = "0.0.116" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93caaafeb42115b70119619c2420e362cce776670427fc4ced3e6df77b41c0b6" +version = "0.0.118" +source = "git+https://github.com/orbitalturtle/rust-lightning?branch=v0.0.118-custom#07cb4d3ec5a9b7d9b338721061f69e80ca3d5381" dependencies = [ "bitcoin 0.29.2", - "libc", - "lightning 0.0.116", - "winapi", + "lightning 0.0.118", + "windows-sys", ] [[package]] name = "lightning-rapid-gossip-sync" -version = "0.0.116" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a07af5814234924e623bca499e003fca1864024d5bd984e752230f73a131584" +version = "0.0.118" +source = "git+https://github.com/orbitalturtle/rust-lightning?branch=v0.0.118-custom#07cb4d3ec5a9b7d9b338721061f69e80ca3d5381" dependencies = [ "bitcoin 0.29.2", - "lightning 0.0.116", + "lightning 0.0.118", ] [[package]] @@ -1095,7 +1084,7 @@ dependencies = [ "configure_me_codegen", "core-rpc", "futures", - "hex 0.4.3", + "hex", "home", "ldk-sample", "lightning 0.0.118", @@ -2220,7 +2209,7 @@ name = "tonic_lnd" version = "0.5.1" source = "git+https://github.com/orbitalturtle/tonic_lnd?branch=update-signer-client#de989089fdb23f87d3e6bc4796c504bda9b9be9b" dependencies = [ - "hex 0.4.3", + "hex", "prost 0.9.0", "rustls 0.19.1", "rustls-pemfile", diff --git a/Cargo.toml b/Cargo.toml index e3a0946a..4ab98ba7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ bitcoin = { version = "0.29.2", features = ["rand"] } clap = { version = "4.4.6", features = ["derive"] } futures = "0.3.26" home = "0.5.5" -lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "caafcedf3fc40fc6253261218c25b254dd955a82" } +lightning = { git = "https://github.com/orbitalturtle/rust-lightning", branch = "v0.0.118-custom", features = ["max_level_trace"] } rand_chacha = "0.3.1" rand_core = "0.6.4" log = "0.4.17" @@ -35,7 +35,7 @@ triggered = "0.1.2" bitcoincore-rpc = { package="core-rpc", version = "0.17.0" } bitcoind = { version = "0.30.0", features = [ "22_0" ] } chrono = { version = "0.4.26" } -ldk-sample = { git = "https://github.com/lndk-org/ldk-sample", branch = "onion-handler" } +ldk-sample = { git = "https://github.com/lndk-org/ldk-sample", branch = "offers" } mockall = "0.11.3" tempfile = "3.5.0" From 3f9982fe64a14f28364c8bbfb977b6380d20039b Mon Sep 17 00:00:00 2001 From: Orbital Date: Fri, 17 Nov 2023 13:15:06 -0600 Subject: [PATCH 09/24] Add tokio clock default to satisfy clippy --- src/clock.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/clock.rs b/src/clock.rs index 9f27c4f9..173c3db1 100644 --- a/src/clock.rs +++ b/src/clock.rs @@ -18,3 +18,9 @@ impl Clock for TokioClock { Instant::now() } } + +impl Default for TokioClock { + fn default() -> Self { + Self::new() + } +} From 26b039fd37d5fa63e2ed27dafc32e4ff581039d6 Mon Sep 17 00:00:00 2001 From: Orbital Date: Fri, 19 Jan 2024 16:57:55 -0600 Subject: [PATCH 10/24] utils: add Default for MessengerUtilities to satisfy clippy --- src/onion_messenger.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/onion_messenger.rs b/src/onion_messenger.rs index d83605cb..e5572e96 100644 --- a/src/onion_messenger.rs +++ b/src/onion_messenger.rs @@ -65,6 +65,12 @@ impl MessengerUtilities { } } +impl Default for MessengerUtilities { + fn default() -> Self { + Self::new() + } +} + impl EntropySource for MessengerUtilities { // TODO: surface LDK's EntropySource and use instead. fn get_secure_random_bytes(&self) -> [u8; 32] { From fbd5ae1614910a2819e1926fa7ee7e32d55d3731 Mon Sep 17 00:00:00 2001 From: Orbital Date: Thu, 25 Jan 2024 17:06:25 -0600 Subject: [PATCH 11/24] main: ignore unused imports from configure_me With Rust 1.75.0, we get an unused import error from configure_me, which we can't change on our end. We ignore this warning so the CI runs properly. --- src/main.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main.rs b/src/main.rs index fd7f50a0..c9eaff50 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +#[allow(unused_imports)] mod internal { #![allow(clippy::enum_variant_names)] #![allow(clippy::unnecessary_lazy_evaluations)] From 6de424f2a258bda0c11780fdcb9db98d1703ef56 Mon Sep 17 00:00:00 2001 From: Orbital Date: Sun, 19 Nov 2023 12:25:46 -0600 Subject: [PATCH 12/24] tests: specify log level in ldk nodes --- tests/common/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 5d46d977..29224a43 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -7,6 +7,7 @@ use bitcoind::{get_available_port, BitcoinD, Conf, ConnectParams}; use chrono::Utc; use ldk_sample::config::LdkUserInfo; use ldk_sample::node_api::Node as LdkNode; +use lightning::util::logger::Level; use std::net::SocketAddr; use std::path::PathBuf; use std::process::{Child, Command, Stdio}; @@ -44,6 +45,8 @@ pub async fn setup_test_infrastructure( ldk_peer_listening_port: get_available_port().unwrap(), ldk_announced_node_name: [0; 32], network: Network::Regtest, + log_level: Level::Trace, + node_num: 1, }; let ldk2_config = LdkUserInfo { @@ -56,6 +59,8 @@ pub async fn setup_test_infrastructure( ldk_peer_listening_port: get_available_port().unwrap(), ldk_announced_node_name: [0; 32], network: Network::Regtest, + log_level: Level::Trace, + node_num: 2, }; let ldk1 = ldk_sample::start_ldk(ldk1_config, test_name).await; From be673e6a9c45abe19d15f4c9e99ed4018df89154 Mon Sep 17 00:00:00 2001 From: Orbital Date: Tue, 21 Nov 2023 22:14:52 -0600 Subject: [PATCH 13/24] README: Fix bakemacaroon typo --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 3b4d33f2..8313067b 100644 --- a/README.md +++ b/README.md @@ -82,7 +82,7 @@ Or in a more concrete example: Rather than use the admin.macaroon with unrestricted permission to an `LND` node, we can bake a macaroon using lncli with much more specific permissions for better security. With this command, generate a macaroon which will give `LNDK` only the specific grpc endpoints it's designed to hit: ``` -lncli --save_to=/lndk.macaroon uri:/lnrpc.Lightning/GetInfo uri:/lnrpc.Lightning/ListPeers uri:/lnrpc.Lightning/SubscribePeerEvents uri:/lnrpc.Lightning/SendCustomMessage uri:/lnrpc.Lightning/SubscribeCustomMessages uri:/peersrpc.Peers/UpdateNodeAnnouncement uri:/signrpc.Signer/DeriveSharedKey +lncli bakemacaroon --save_to=/lndk.macaroon uri:/lnrpc.Lightning/GetInfo uri:/lnrpc.Lightning/ListPeers uri:/lnrpc.Lightning/SubscribePeerEvents uri:/lnrpc.Lightning/SendCustomMessage uri:/lnrpc.Lightning/SubscribeCustomMessages uri:/peersrpc.Peers/UpdateNodeAnnouncement uri:/signrpc.Signer/DeriveSharedKey ``` ## Security From a469d710c4aab79df16097c72e1cd67ed5197a28 Mon Sep 17 00:00:00 2001 From: Orbital Date: Mon, 1 Jan 2024 17:19:13 -0600 Subject: [PATCH 14/24] Fix Makefile test typo --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 51d4d7db..aa669da7 100644 --- a/Makefile +++ b/Makefile @@ -14,5 +14,5 @@ itest: @$(call print, "Building lnd for itests.") git submodule update --init --recursive cd lnd/cmd/lnd; $(GO_BUILD) -tags="peersrpc signrpc walletrpc dev" -o $(TMP_DIR)/lndk-tests/bin/lnd-itest$(EXEC_SUFFIX) - $(CARGO_TEST) -- -- test '*' --test-threads=1 --nocapture + $(CARGO_TEST) --test '*' -- --test-threads=1 --nocapture From 8263b9e90cf99c83817e2d696f288b866f28524c Mon Sep 17 00:00:00 2001 From: Orbital Date: Thu, 4 Jan 2024 21:44:09 -0600 Subject: [PATCH 15/24] Export MessengerUtilities for integration tests --- src/lib.rs | 2 +- src/onion_messenger.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 44916fbc..9139594d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,7 @@ mod clock; #[allow(dead_code)] pub mod lnd; pub mod lndk_offers; -mod onion_messenger; +pub mod onion_messenger; mod rate_limit; use crate::lnd::{ diff --git a/src/onion_messenger.rs b/src/onion_messenger.rs index e5572e96..6867eda8 100644 --- a/src/onion_messenger.rs +++ b/src/onion_messenger.rs @@ -53,12 +53,12 @@ const DEFAULT_CALL_FREQUENCY: Duration = Duration::from_secs(1); /// A refcell is used for entropy_source to provide interior mutability for ChaCha20Rng. We need a mutable reference /// to be able to use the chacha library’s fill_bytes method, but the EntropySource interface in LDK is for an /// immutable reference. -pub(crate) struct MessengerUtilities { +pub struct MessengerUtilities { entropy_source: RefCell, } impl MessengerUtilities { - pub(crate) fn new() -> Self { + pub fn new() -> Self { MessengerUtilities { entropy_source: RefCell::new(ChaCha20Rng::from_entropy()), } From 780d0fbf349aaf953d374760dfbca7d8f71e25c2 Mon Sep 17 00:00:00 2001 From: Orbital Date: Mon, 1 Jan 2024 19:10:54 -0600 Subject: [PATCH 16/24] offers: add logic for connecting to the introduction node peer --- src/lnd.rs | 9 ++ src/lndk_offers.rs | 198 +++++++++++++++++++++++++++++++++++++++++++- tests/common/mod.rs | 28 +++++++ 3 files changed, 233 insertions(+), 2 deletions(-) diff --git a/src/lnd.rs b/src/lnd.rs index 277b2161..58a96d7d 100644 --- a/src/lnd.rs +++ b/src/lnd.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use std::error::Error; use std::fmt; use std::path::PathBuf; +use tonic_lnd::lnrpc::{LightningNode, ListPeersResponse}; use tonic_lnd::signrpc::KeyLocator; use tonic_lnd::tonic::Status; use tonic_lnd::{Client, ConnectError}; @@ -194,3 +195,11 @@ pub(crate) trait MessageSigner { tag: String, ) -> Result, Status>; } + +/// PeerConnector provides a layer of abstraction over the LND API for connecting to a peer. +#[async_trait] +pub trait PeerConnector { + async fn list_peers(&mut self) -> Result; + async fn connect_peer(&mut self, node_id: String, addr: String) -> Result<(), Status>; + async fn get_node_info(&mut self, pub_key: String) -> Result, Status>; +} diff --git a/src/lndk_offers.rs b/src/lndk_offers.rs index 02017887..3dc29c30 100644 --- a/src/lndk_offers.rs +++ b/src/lndk_offers.rs @@ -1,4 +1,4 @@ -use crate::lnd::MessageSigner; +use crate::lnd::{MessageSigner, PeerConnector}; use crate::OfferHandler; use async_trait::async_trait; use bitcoin::hashes::sha256::Hash; @@ -13,19 +13,24 @@ use lightning::offers::parse::{Bolt12ParseError, Bolt12SemanticError}; use std::error::Error; use std::fmt::Display; use tokio::task; +use tonic_lnd::lnrpc::{LightningNode, ListPeersRequest, ListPeersResponse}; use tonic_lnd::signrpc::{KeyLocator, SignMessageReq}; use tonic_lnd::tonic::Status; use tonic_lnd::Client; #[derive(Debug)] /// OfferError is an error that occurs during the process of paying an offer. -pub(crate) enum OfferError { +pub enum OfferError { /// BuildUIRFailure indicates a failure to build the unsigned invoice request. BuildUIRFailure(Bolt12SemanticError), /// SignError indicates a failure to sign the invoice request. SignError(SignError), /// DeriveKeyFailure indicates a failure to derive key for signing the invoice request. DeriveKeyFailure(Status), + /// Unable to connect to peer. + PeerConnectError(Status), + /// No node address. + NodeAddressNotFound, } impl Display for OfferError { @@ -34,6 +39,8 @@ impl Display for OfferError { OfferError::BuildUIRFailure(e) => write!(f, "Error building invoice request: {e:?}"), OfferError::SignError(e) => write!(f, "Error signing invoice request: {e:?}"), OfferError::DeriveKeyFailure(e) => write!(f, "Error signing invoice request: {e:?}"), + OfferError::PeerConnectError(e) => write!(f, "Error connecting to peer: {e:?}"), + OfferError::NodeAddressNotFound => write!(f, "Couldn't get node address"), } } } @@ -104,6 +111,89 @@ impl OfferHandler { } } +// connect_to_peer connects to the provided node if we're not already connected. +pub async fn connect_to_peer( + mut connector: impl PeerConnector, + node_id: PublicKey, +) -> Result<(), OfferError> { + let resp = connector + .list_peers() + .await + .map_err(OfferError::PeerConnectError)?; + + let node_id_str = node_id.to_string(); + for peer in resp.peers.iter() { + if peer.pub_key == node_id_str { + return Ok(()); + } + } + + let node = connector + .get_node_info(node_id_str.clone()) + .await + .map_err(OfferError::PeerConnectError)?; + + let node = match node { + Some(node) => node, + None => return Err(OfferError::NodeAddressNotFound), + }; + + if node.addresses.is_empty() { + return Err(OfferError::NodeAddressNotFound); + } + + connector + .connect_peer(node_id_str, node.addresses[0].clone().addr) + .await + .map_err(OfferError::PeerConnectError)?; + + Ok(()) +} + +#[async_trait] +impl PeerConnector for Client { + async fn list_peers(&mut self) -> Result { + let list_req = ListPeersRequest { + ..Default::default() + }; + + self.lightning() + .list_peers(list_req) + .await + .map(|resp| resp.into_inner()) + } + + async fn connect_peer(&mut self, node_id: String, addr: String) -> Result<(), Status> { + let ln_addr = tonic_lnd::lnrpc::LightningAddress { + pubkey: node_id, + host: addr, + }; + + let connect_req = tonic_lnd::lnrpc::ConnectPeerRequest { + addr: Some(ln_addr), + timeout: 20, + ..Default::default() + }; + + self.lightning() + .connect_peer(connect_req.clone()) + .await + .map(|_| ()) + } + + async fn get_node_info(&mut self, pub_key: String) -> Result, Status> { + let req = tonic_lnd::lnrpc::NodeInfoRequest { + pub_key, + include_channels: false, + }; + + self.lightning() + .get_node_info(req) + .await + .map(|resp| resp.into_inner().node) + } +} + #[async_trait] impl MessageSigner for Client { async fn derive_key(&mut self, key_loc: KeyLocator) -> Result, Status> { @@ -140,6 +230,7 @@ mod tests { use super::*; use mockall::mock; use std::str::FromStr; + use tonic_lnd::lnrpc::NodeAddress; fn get_offer() -> String { "lno1qgsqvgnwgcg35z6ee2h3yczraddm72xrfua9uve2rlrm9deu7xyfzrcgqgn3qzsyvfkx26qkyypvr5hfx60h9w9k934lt8s2n6zc0wwtgqlulw7dythr83dqx8tzumg".to_string() @@ -163,6 +254,17 @@ mod tests { } } + mock! { + TestPeerConnector{} + + #[async_trait] + impl PeerConnector for TestPeerConnector { + async fn list_peers(&mut self) -> Result; + async fn get_node_info(&mut self, pub_key: String) -> Result, Status>; + async fn connect_peer(&mut self, node_id: String, addr: String) -> Result<(), Status>; + } + } + #[tokio::test] async fn test_request_invoice() { let mut signer_mock = MockTestBolt12Signer::new(); @@ -234,4 +336,96 @@ mod tests { .await .is_err()) } + + #[tokio::test] + async fn test_connect_peer() { + let mut connector_mock = MockTestPeerConnector::new(); + connector_mock.expect_list_peers().returning(|| { + Ok(ListPeersResponse { + ..Default::default() + }) + }); + + connector_mock.expect_get_node_info().returning(|_| { + let node_addr = NodeAddress { + network: String::from("regtest"), + addr: String::from("127.0.0.1"), + }; + let node = LightningNode { + addresses: vec![node_addr], + ..Default::default() + }; + + Ok(Some(node)) + }); + + connector_mock + .expect_connect_peer() + .returning(|_, _| Ok(())); + + let pubkey = PublicKey::from_str(&get_pubkey()).unwrap(); + assert!(connect_to_peer(connector_mock, pubkey).await.is_ok()); + } + + #[tokio::test] + async fn test_connect_peer_already_connected() { + let mut connector_mock = MockTestPeerConnector::new(); + connector_mock.expect_list_peers().returning(|| { + let peer = tonic_lnd::lnrpc::Peer { + pub_key: get_pubkey(), + ..Default::default() + }; + + Ok(ListPeersResponse { + peers: vec![peer], + ..Default::default() + }) + }); + + connector_mock.expect_get_node_info().returning(|_| { + let node_addr = NodeAddress { + network: String::from("regtest"), + addr: String::from("127.0.0.1"), + }; + let node = LightningNode { + addresses: vec![node_addr], + ..Default::default() + }; + + Ok(Some(node)) + }); + + let pubkey = PublicKey::from_str(&get_pubkey()).unwrap(); + assert!(connect_to_peer(connector_mock, pubkey).await.is_ok()); + } + + #[tokio::test] + async fn test_connect_peer_connect_error() { + let mut connector_mock = MockTestPeerConnector::new(); + connector_mock.expect_list_peers().returning(|| { + Ok(ListPeersResponse { + ..Default::default() + }) + }); + + connector_mock.expect_get_node_info().returning(|_| { + let node_addr = NodeAddress { + network: String::from("regtest"), + addr: String::from("127.0.0.1"), + }; + let node = LightningNode { + addresses: vec![node_addr], + ..Default::default() + }; + + Ok(Some(node)) + }); + + connector_mock + .expect_connect_peer() + .returning(|_, _| Err(Status::unknown(""))); + + let pubkey = PublicKey::from_str(&get_pubkey()).unwrap(); + assert!(connect_to_peer(connector_mock, pubkey).await.is_err()); + } } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 29224a43..bafdd82e 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -326,4 +326,32 @@ impl LndNode { resp } + + // disconnect_peer disconnects the specified peer. + #[allow(dead_code)] + pub async fn disconnect_peer( + &mut self, + node_id: PublicKey, + ) -> tonic_lnd::lnrpc::DisconnectPeerResponse { + let disconnect_req = tonic_lnd::lnrpc::DisconnectPeerRequest { + pub_key: node_id.to_string(), + ..Default::default() + }; + + let resp = if let Some(client) = self.client.clone() { + let make_request = || async { + client + .clone() + .lightning() + .disconnect_peer(disconnect_req.clone()) + .await + }; + let resp = test_utils::retry_async(make_request, String::from("disconnect_peer")); + resp.await.unwrap() + } else { + panic!("No client") + }; + + resp + } } From ac957e40a98f1a55161fc7f45e67c1af9fd1587e Mon Sep 17 00:00:00 2001 From: Orbital Date: Mon, 1 Jan 2024 18:37:45 -0600 Subject: [PATCH 17/24] offers: validate offer amount user input --- src/lndk_offers.rs | 98 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 97 insertions(+), 1 deletion(-) diff --git a/src/lndk_offers.rs b/src/lndk_offers.rs index 3dc29c30..8cf0ebd8 100644 --- a/src/lndk_offers.rs +++ b/src/lndk_offers.rs @@ -8,7 +8,7 @@ use bitcoin::secp256k1::{Error as Secp256k1Error, PublicKey}; use futures::executor::block_on; use lightning::offers::invoice_request::{InvoiceRequest, UnsignedInvoiceRequest}; use lightning::offers::merkle::SignError; -use lightning::offers::offer::Offer; +use lightning::offers::offer::{Amount, Offer}; use lightning::offers::parse::{Bolt12ParseError, Bolt12SemanticError}; use std::error::Error; use std::fmt::Display; @@ -27,6 +27,10 @@ pub enum OfferError { SignError(SignError), /// DeriveKeyFailure indicates a failure to derive key for signing the invoice request. DeriveKeyFailure(Status), + /// User provided an invalid amount. + InvalidAmount(String), + /// Invalid currency contained in the offer. + InvalidCurrency, /// Unable to connect to peer. PeerConnectError(Status), /// No node address. @@ -39,6 +43,11 @@ impl Display for OfferError { OfferError::BuildUIRFailure(e) => write!(f, "Error building invoice request: {e:?}"), OfferError::SignError(e) => write!(f, "Error signing invoice request: {e:?}"), OfferError::DeriveKeyFailure(e) => write!(f, "Error signing invoice request: {e:?}"), + OfferError::InvalidAmount(e) => write!(f, "User provided an invalid amount: {e:?}"), + OfferError::InvalidCurrency => write!( + f, + "LNDK doesn't yet support offer currencies other than bitcoin" + ), OfferError::PeerConnectError(e) => write!(f, "Error connecting to peer: {e:?}"), OfferError::NodeAddressNotFound => write!(f, "Couldn't get node address"), } @@ -111,6 +120,53 @@ impl OfferHandler { } } +// Checks that the user-provided amount matches the offer. +pub async fn validate_amount( + offer: &Offer, + amount_msats: Option, +) -> Result> { + let validated_amount = match offer.amount() { + Some(offer_amount) => { + match *offer_amount { + Amount::Bitcoin { + amount_msats: bitcoin_amt, + } => { + if let Some(msats) = amount_msats { + if msats < bitcoin_amt { + return Err(OfferError::InvalidAmount(format!( + "{msats} is less than offer amount {}", + bitcoin_amt + ))); + } + msats + } else { + // If user didn't set amount, set it to the offer amount. + if bitcoin_amt == 0 { + return Err(OfferError::InvalidAmount( + "Offer doesn't set an amount, so user must specify one".to_string(), + )); + } + bitcoin_amt + } + } + _ => { + return Err(OfferError::InvalidCurrency); + } + } + } + None => { + if let Some(msats) = amount_msats { + msats + } else { + return Err(OfferError::InvalidAmount( + "Offer doesn't set an amount, so user must specify one".to_string(), + )); + } + } + }; + Ok(validated_amount) +} + // connect_to_peer connects to the provided node if we're not already connected. pub async fn connect_to_peer( mut connector: impl PeerConnector, @@ -228,14 +284,32 @@ impl MessageSigner for Client { #[cfg(test)] mod tests { use super::*; + use bitcoin::secp256k1::{KeyPair, Secp256k1, SecretKey}; + use lightning::offers::offer::{OfferBuilder, Quantity}; use mockall::mock; use std::str::FromStr; + use std::time::{Duration, SystemTime}; use tonic_lnd::lnrpc::NodeAddress; fn get_offer() -> String { "lno1qgsqvgnwgcg35z6ee2h3yczraddm72xrfua9uve2rlrm9deu7xyfzrcgqgn3qzsyvfkx26qkyypvr5hfx60h9w9k934lt8s2n6zc0wwtgqlulw7dythr83dqx8tzumg".to_string() } + fn build_custom_offer(amount_msats: u64) -> Offer { + let secp_ctx = Secp256k1::new(); + let keys = KeyPair::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap()); + let pubkey = PublicKey::from(keys); + + let expiration = SystemTime::now() + Duration::from_secs(24 * 60 * 60); + OfferBuilder::new("coffee".to_string(), pubkey) + .amount_msats(amount_msats) + .supported_quantity(Quantity::Unbounded) + .absolute_expiry(expiration.duration_since(SystemTime::UNIX_EPOCH).unwrap()) + .issuer("Foo Bar".to_string()) + .build() + .unwrap() + } + fn get_pubkey() -> String { "0313ba7ccbd754c117962b9afab6c2870eb3ef43f364a9f6c43d0fabb4553776ba".to_string() } @@ -337,6 +411,28 @@ mod tests { .is_err()) } + #[tokio::test] + async fn test_validate_amount() { + // If the amount the user provided is greater than the offer-provided amount, then + // we should be good. + let offer = build_custom_offer(20000); + assert!(validate_amount(&offer, Some(20000)).await.is_ok()); + + let offer = build_custom_offer(0); + assert!(validate_amount(&offer, Some(20000)).await.is_ok()); + } + + #[tokio::test] + async fn test_validate_invalid_amount() { + // If the amount the user provided is lower than the offer amount, we error. + let offer = build_custom_offer(20000); + assert!(validate_amount(&offer, Some(1000)).await.is_err()); + + // Both user amount and offer amount can't be 0. + let offer = build_custom_offer(0); + assert!(validate_amount(&offer, None).await.is_err()); + } + #[tokio::test] async fn test_connect_peer() { let mut connector_mock = MockTestPeerConnector::new(); From f97bd3d000615a94e5d8559c997b1248b24258bc Mon Sep 17 00:00:00 2001 From: Orbital Date: Sun, 21 Jan 2024 21:17:17 -0600 Subject: [PATCH 18/24] offers: wait for onion messenger ready signal before sending request Before we can send an invoice request, LNDK's onion messenger needs to be fully started and running. We use a channel close to signal when the messenger is ready. --- src/lib.rs | 3 +++ src/main.rs | 10 +++++++++- src/onion_messenger.rs | 3 +++ tests/integration_tests.rs | 4 ++++ 4 files changed, 19 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 9139594d..f685283f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,6 +24,7 @@ use log4rs::encode::pattern::PatternEncoder; use std::collections::HashMap; use std::str::FromStr; use std::sync::{Mutex, Once}; +use tokio::sync::mpsc::Sender; use tonic_lnd::lnrpc::GetInfoRequest; use triggered::{Listener, Trigger}; @@ -40,6 +41,8 @@ pub struct LifecycleSignals { pub shutdown: Trigger, // Used to listen for the signal to shutdown. pub listener: Listener, + // Used to signal when the onion messenger has started up. + pub started: Sender, } pub fn init_logger(config: LogConfig) { diff --git a/src/main.rs b/src/main.rs index c9eaff50..a98fb46e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,6 +12,8 @@ mod internal { use internal::*; use lndk::lnd::LndCfg; use lndk::{Cfg, LifecycleSignals, LndkOnionMessenger, OfferHandler}; +use tokio::sync::mpsc; +use tokio::sync::mpsc::{Receiver, Sender}; #[macro_use] extern crate configure_me; @@ -24,7 +26,13 @@ async fn main() -> Result<(), ()> { let lnd_args = LndCfg::new(config.address, config.cert, config.macaroon); let (shutdown, listener) = triggered::trigger(); - let signals = LifecycleSignals { shutdown, listener }; + // Create the channel which will tell us when the onion messenger has finished starting up. + let (tx, _): (Sender, Receiver) = mpsc::channel(1); + let signals = LifecycleSignals { + shutdown, + listener, + started: tx, + }; let args = Cfg { lnd: lnd_args, log_dir: config.log_dir, diff --git a/src/onion_messenger.rs b/src/onion_messenger.rs index 6867eda8..283bde66 100644 --- a/src/onion_messenger.rs +++ b/src/onion_messenger.rs @@ -217,6 +217,9 @@ impl LndkOnionMessenger { } }); + // By dropping the sender, we signal to the receiver that the onion messenger has successfully started up. + drop(signals.started); + // Consume events is our main controlling loop, so we run it inline here. We use a RefCell in onion_messenger to // allow interior mutability (see LndNodeSigner) so this function can't safely be passed off to another thread. // This function is expected to finish if any producing thread exits (because we're no longer receiving the diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 0bbe2ba0..1af8b77c 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -8,6 +8,8 @@ use lndk::LifecycleSignals; use std::path::PathBuf; use std::str::FromStr; use tokio::select; +use tokio::sync::mpsc; +use tokio::sync::mpsc::{Receiver, Sender}; use tokio::time::{sleep, timeout, Duration}; async fn wait_to_receive_onion_message( @@ -68,9 +70,11 @@ async fn test_lndk_forwards_onion_message() { ); let now_timestamp = Utc::now(); let timestamp = now_timestamp.format("%d-%m-%Y-%H%M"); + let (tx, _): (Sender, Receiver) = mpsc::channel(1); let signals = LifecycleSignals { shutdown: shutdown.clone(), listener, + started: tx, }; let lndk_cfg = lndk::Cfg { lnd: lnd_cfg, From 3b09ec2bc7fb3696a71d38ecc9df5ff79cb8deb9 Mon Sep 17 00:00:00 2001 From: Orbital Date: Sun, 28 Jan 2024 22:49:10 -0600 Subject: [PATCH 19/24] offers: Build a reply path for invoice request --- src/lib.rs | 2 + src/lndk_offers.rs | 200 ++++++++++++++++++++++++++------------------- 2 files changed, 120 insertions(+), 82 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index f685283f..ad2250cb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -177,6 +177,7 @@ enum OfferState { pub struct OfferHandler { _active_offers: Mutex>, pending_messages: Mutex>>, + messenger_utils: MessengerUtilities, } impl OfferHandler { @@ -184,6 +185,7 @@ impl OfferHandler { OfferHandler { _active_offers: Mutex::new(HashMap::new()), pending_messages: Mutex::new(Vec::new()), + messenger_utils: MessengerUtilities::new(), } } } diff --git a/src/lndk_offers.rs b/src/lndk_offers.rs index 8cf0ebd8..5da9f490 100644 --- a/src/lndk_offers.rs +++ b/src/lndk_offers.rs @@ -1,17 +1,20 @@ -use crate::lnd::{MessageSigner, PeerConnector}; +use crate::lnd::{features_support_onion_messages, MessageSigner, PeerConnector}; use crate::OfferHandler; use async_trait::async_trait; use bitcoin::hashes::sha256::Hash; use bitcoin::network::constants::Network; use bitcoin::secp256k1::schnorr::Signature; -use bitcoin::secp256k1::{Error as Secp256k1Error, PublicKey}; +use bitcoin::secp256k1::{Error as Secp256k1Error, PublicKey, Secp256k1}; use futures::executor::block_on; +use lightning::blinded_path::BlindedPath; use lightning::offers::invoice_request::{InvoiceRequest, UnsignedInvoiceRequest}; use lightning::offers::merkle::SignError; use lightning::offers::offer::{Amount, Offer}; use lightning::offers::parse::{Bolt12ParseError, Bolt12SemanticError}; +use log::error; use std::error::Error; use std::fmt::Display; +use std::str::FromStr; use tokio::task; use tonic_lnd::lnrpc::{LightningNode, ListPeersRequest, ListPeersResponse}; use tonic_lnd::signrpc::{KeyLocator, SignMessageReq}; @@ -35,6 +38,10 @@ pub enum OfferError { PeerConnectError(Status), /// No node address. NodeAddressNotFound, + /// Cannot list peers. + ListPeersFailure(Status), + /// Failure to build a reply path. + BuildBlindedPathFailure, } impl Display for OfferError { @@ -50,6 +57,8 @@ impl Display for OfferError { ), OfferError::PeerConnectError(e) => write!(f, "Error connecting to peer: {e:?}"), OfferError::NodeAddressNotFound => write!(f, "Couldn't get node address"), + OfferError::ListPeersFailure(e) => write!(f, "Error listing peers: {e:?}"), + OfferError::BuildBlindedPathFailure => write!(f, "Error building blinded path"), } } } @@ -118,6 +127,52 @@ impl OfferHandler { .await .unwrap() } + + /// create_reply_path creates a blinded path to provide to the offer maker when requesting an + /// invoice so they know where to send the invoice back to. We try to find a peer that we're + /// connected to with onion messaging support that we can use to form a blinded path, + /// otherwise we creae a blinded path directly to ourselves. + pub async fn create_reply_path( + &self, + mut connector: impl PeerConnector + std::marker::Send + 'static, + node_id: PublicKey, + ) -> Result> { + // Find an introduction node for our blinded path. + let current_peers = connector.list_peers().await.map_err(|e| { + error!("Could not lookup current peers: {e}."); + OfferError::ListPeersFailure(e) + })?; + + let mut intro_node = None; + for peer in current_peers.peers { + let pubkey = PublicKey::from_str(&peer.pub_key).unwrap(); + let onion_support = features_support_onion_messages(&peer.features); + if onion_support { + intro_node = Some(pubkey); + } + } + + let secp_ctx = Secp256k1::new(); + if intro_node.is_none() { + Ok( + BlindedPath::one_hop_for_message(node_id, &self.messenger_utils, &secp_ctx) + .map_err(|_| { + error!("Could not create blinded path."); + OfferError::BuildBlindedPathFailure + })?, + ) + } else { + Ok(BlindedPath::new_for_message( + &[intro_node.unwrap()], + &self.messenger_utils, + &secp_ctx, + ) + .map_err(|_| { + error!("Could not create blinded path."); + OfferError::BuildBlindedPathFailure + }))? + } + } } // Checks that the user-provided amount matches the offer. @@ -287,14 +342,11 @@ mod tests { use bitcoin::secp256k1::{KeyPair, Secp256k1, SecretKey}; use lightning::offers::offer::{OfferBuilder, Quantity}; use mockall::mock; + use std::collections::HashMap; use std::str::FromStr; use std::time::{Duration, SystemTime}; use tonic_lnd::lnrpc::NodeAddress; - fn get_offer() -> String { - "lno1qgsqvgnwgcg35z6ee2h3yczraddm72xrfua9uve2rlrm9deu7xyfzrcgqgn3qzsyvfkx26qkyypvr5hfx60h9w9k934lt8s2n6zc0wwtgqlulw7dythr83dqx8tzumg".to_string() - } - fn build_custom_offer(amount_msats: u64) -> Offer { let secp_ctx = Secp256k1::new(); let keys = KeyPair::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap()); @@ -314,10 +366,6 @@ mod tests { "0313ba7ccbd754c117962b9afab6c2870eb3ef43f364a9f6c43d0fabb4553776ba".to_string() } - fn get_signature() -> String { - "28b937976a29c15827433086440b36c2bec6ca5bd977557972dca8641cd59ffba50daafb8ee99a19c950976b46f47d9e7aa716652e5657dfc555b82eff467f18".to_string() - } - mock! { TestBolt12Signer{} @@ -339,78 +387,6 @@ mod tests { } } - #[tokio::test] - async fn test_request_invoice() { - let mut signer_mock = MockTestBolt12Signer::new(); - - signer_mock.expect_derive_key().returning(|_| { - Ok(PublicKey::from_str(&get_pubkey()) - .unwrap() - .serialize() - .to_vec()) - }); - - signer_mock.expect_sign_message().returning(|_, _, _| { - Ok(Signature::from_str(&get_signature()) - .unwrap() - .as_ref() - .to_vec()) - }); - - let offer = decode(get_offer()).unwrap(); - let handler = OfferHandler::new(); - assert!(handler - .create_invoice_request(signer_mock, offer, vec![], Network::Regtest, 10000) - .await - .is_ok()) - } - - #[tokio::test] - async fn test_request_invoice_derive_key_error() { - let mut signer_mock = MockTestBolt12Signer::new(); - - signer_mock - .expect_derive_key() - .returning(|_| Err(Status::unknown("error testing"))); - - signer_mock.expect_sign_message().returning(|_, _, _| { - Ok(Signature::from_str(&get_signature()) - .unwrap() - .as_ref() - .to_vec()) - }); - - let offer = decode(get_offer()).unwrap(); - let handler = OfferHandler::new(); - assert!(handler - .create_invoice_request(signer_mock, offer, vec![], Network::Regtest, 10000) - .await - .is_err()) - } - - #[tokio::test] - async fn test_request_invoice_signer_error() { - let mut signer_mock = MockTestBolt12Signer::new(); - - signer_mock.expect_derive_key().returning(|_| { - Ok(PublicKey::from_str(&get_pubkey()) - .unwrap() - .serialize() - .to_vec()) - }); - - signer_mock - .expect_sign_message() - .returning(|_, _, _| Err(Status::unknown("error testing"))); - - let offer = decode(get_offer()).unwrap(); - let handler = OfferHandler::new(); - assert!(handler - .create_invoice_request(signer_mock, offer, vec![], Network::Regtest, 10000) - .await - .is_err()) - } - #[tokio::test] async fn test_validate_amount() { // If the amount the user provided is greater than the offer-provided amount, then @@ -524,4 +500,64 @@ mod tests { let pubkey = PublicKey::from_str(&get_pubkey()).unwrap(); assert!(connect_to_peer(connector_mock, pubkey).await.is_err()); } + + #[tokio::test] + async fn test_create_reply_path() { + let mut connector_mock = MockTestPeerConnector::new(); + + connector_mock.expect_list_peers().returning(|| { + let feature = tonic_lnd::lnrpc::Feature { + ..Default::default() + }; + let mut feature_entry = HashMap::new(); + feature_entry.insert(38, feature); + + let peer = tonic_lnd::lnrpc::Peer { + pub_key: get_pubkey(), + features: feature_entry, + ..Default::default() + }; + Ok(ListPeersResponse { peers: vec![peer] }) + }); + + let receiver_node_id = PublicKey::from_str(&get_pubkey()).unwrap(); + let handler = OfferHandler::new(); + assert!(handler + .create_reply_path(connector_mock, receiver_node_id) + .await + .is_ok()) + } + + #[tokio::test] + // Test that create_reply_path works fine when no suitable introduction node peer is found. + async fn test_create_reply_path_no_intro_node() { + let mut connector_mock = MockTestPeerConnector::new(); + + connector_mock + .expect_list_peers() + .returning(|| Ok(ListPeersResponse { peers: vec![] })); + + let receiver_node_id = PublicKey::from_str(&get_pubkey()).unwrap(); + let handler = OfferHandler::new(); + assert!(handler + .create_reply_path(connector_mock, receiver_node_id) + .await + .is_ok()) + } + + #[tokio::test] + async fn test_create_reply_path_list_peers_error() { + let mut connector_mock = MockTestPeerConnector::new(); + + connector_mock + .expect_list_peers() + .returning(|| Err(Status::unknown("unknown error"))); + + let receiver_node_id = PublicKey::from_str(&get_pubkey()).unwrap(); + let handler = OfferHandler::new(); + assert!(handler + .create_reply_path(connector_mock, receiver_node_id) + .await + .is_err()) + } } From df50a92166b54c6a2f8952eda8f626284b25f18f Mon Sep 17 00:00:00 2001 From: Orbital Date: Wed, 31 Jan 2024 17:05:50 -0600 Subject: [PATCH 20/24] itests: add lnd API calls needed to set up channels --- tests/common/mod.rs | 64 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index bafdd82e..3db7e1df 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -14,7 +14,7 @@ use std::process::{Child, Command, Stdio}; use std::thread; use std::{env, fs}; use tempfile::{tempdir, Builder, TempDir}; -use tokio::time::Duration; +use tokio::time::{sleep, timeout, Duration}; use tonic_lnd::lnrpc::GetInfoRequest; use tonic_lnd::Client; @@ -354,4 +354,66 @@ impl LndNode { resp } + + // Create an on-chain bitcoin address to fund our LND node. + pub async fn new_address(&mut self) -> tonic_lnd::lnrpc::NewAddressResponse { + let addr_req = tonic_lnd::lnrpc::NewAddressRequest { + r#type: 4, // 4 is the TAPROOT_PUBKEY type. + ..Default::default() + }; + + let resp = if let Some(client) = self.client.clone() { + let make_request = || async { + client + .clone() + .lightning() + .new_address(addr_req.clone()) + .await + }; + let resp = test_utils::retry_async(make_request, String::from("new_address")); + resp.await.unwrap() + } else { + panic!("No client") + }; + + resp + } + + // wait_for_chain_sync waits until we're synced to chain according to the get_info response. + // We'll timeout if it takes too long. + pub async fn wait_for_chain_sync(&mut self) { + match timeout(Duration::from_secs(100), self.check_chain_sync()).await { + Err(_) => panic!("timeout before lnd synced to chain"), + _ => {} + }; + } + + pub async fn check_chain_sync(&mut self) { + loop { + let resp = self.get_info().await; + if resp.synced_to_chain { + return; + } + sleep(Duration::from_secs(2)).await; + } + } + + // wait_for_lnd_sync waits until we're synced to graph according to the get_info response. + // We'll timeout if it takes too long. + pub async fn wait_for_graph_sync(&mut self) { + match timeout(Duration::from_secs(100), self.check_graph_sync()).await { + Err(_) => panic!("timeout before lnd synced to graph"), + _ => {} + }; + } + + pub async fn check_graph_sync(&mut self) { + loop { + let resp = self.get_info().await; + if resp.synced_to_graph { + return; + } + sleep(Duration::from_secs(2)).await; + } + } } From 491e42eaf82d8992dfee5809f9f36def87238266 Mon Sep 17 00:00:00 2001 From: Orbital Date: Thu, 11 Jan 2024 21:24:46 -0600 Subject: [PATCH 21/24] itests: export bitcoind for tests --- tests/common/mod.rs | 26 +------------------------- 1 file changed, 1 insertion(+), 25 deletions(-) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 3db7e1df..91f3912c 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -109,7 +109,7 @@ fn setup_test_dirs(test_name: &str) -> (PathBuf, PathBuf, PathBuf) { // BitcoindNode holds the tools we need to interact with a Bitcoind node. pub struct BitcoindNode { - node: BitcoinD, + pub node: BitcoinD, _data_dir: TempDir, zmq_block_port: u16, zmq_tx_port: u16, @@ -355,30 +355,6 @@ impl LndNode { resp } - // Create an on-chain bitcoin address to fund our LND node. - pub async fn new_address(&mut self) -> tonic_lnd::lnrpc::NewAddressResponse { - let addr_req = tonic_lnd::lnrpc::NewAddressRequest { - r#type: 4, // 4 is the TAPROOT_PUBKEY type. - ..Default::default() - }; - - let resp = if let Some(client) = self.client.clone() { - let make_request = || async { - client - .clone() - .lightning() - .new_address(addr_req.clone()) - .await - }; - let resp = test_utils::retry_async(make_request, String::from("new_address")); - resp.await.unwrap() - } else { - panic!("No client") - }; - - resp - } - // wait_for_chain_sync waits until we're synced to chain according to the get_info response. // We'll timeout if it takes too long. pub async fn wait_for_chain_sync(&mut self) { From 8bfb006d20fe472ec8f63421c8c2c42cae9d787e Mon Sep 17 00:00:00 2001 From: Orbital Date: Wed, 31 Jan 2024 17:08:20 -0600 Subject: [PATCH 22/24] itests: bump ldk-sample to newer version We bump ldk-sample to a version that can open channels and broadcasts the node announcement at a more frequent interval so we can pull ldk's address from the node graph in our integration tests. --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c4ff5fc9..1ed7b67f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -947,7 +947,7 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "ldk-sample" version = "0.1.0" -source = "git+https://github.com/lndk-org/ldk-sample?branch=offers#848e562f0ef57c95e015454cc2c00bca1e315e29" +source = "git+https://github.com/lndk-org/ldk-sample?branch=change-node-announcement-interval#d24839740b44048568341ca37e289557eff52fd9" dependencies = [ "base64 0.13.1", "bech32 0.8.1", diff --git a/Cargo.toml b/Cargo.toml index 4ab98ba7..beb2e402 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,7 @@ triggered = "0.1.2" bitcoincore-rpc = { package="core-rpc", version = "0.17.0" } bitcoind = { version = "0.30.0", features = [ "22_0" ] } chrono = { version = "0.4.26" } -ldk-sample = { git = "https://github.com/lndk-org/ldk-sample", branch = "offers" } +ldk-sample = { git = "https://github.com/lndk-org/ldk-sample", branch = "change-node-announcement-interval" } mockall = "0.11.3" tempfile = "3.5.0" From 8af07f48b6ac2e1edf1259a51dfff0c7af5f5358 Mon Sep 17 00:00:00 2001 From: Orbital Date: Tue, 6 Feb 2024 01:52:41 -0600 Subject: [PATCH 23/24] itests: advertise ldk node address --- tests/common/mod.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 91f3912c..7b65abf1 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -8,7 +8,7 @@ use chrono::Utc; use ldk_sample::config::LdkUserInfo; use ldk_sample::node_api::Node as LdkNode; use lightning::util::logger::Level; -use std::net::SocketAddr; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::path::PathBuf; use std::process::{Child, Command, Stdio}; use std::thread; @@ -35,28 +35,32 @@ pub async fn setup_test_infrastructure( let connect_params = bitcoind.node.params.get_cookie_values().unwrap(); + let port = get_available_port().unwrap(); + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port); let ldk1_config = LdkUserInfo { bitcoind_rpc_username: connect_params.0.clone().unwrap(), bitcoind_rpc_password: connect_params.1.clone().unwrap(), bitcoind_rpc_host: String::from("localhost"), bitcoind_rpc_port: bitcoind.node.params.rpc_socket.port(), ldk_data_dir: ldk_test_dir.clone(), - ldk_announced_listen_addr: Vec::new(), - ldk_peer_listening_port: get_available_port().unwrap(), + ldk_announced_listen_addr: vec![addr.into()], + ldk_peer_listening_port: port, ldk_announced_node_name: [0; 32], network: Network::Regtest, log_level: Level::Trace, node_num: 1, }; + let port = get_available_port().unwrap(); + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port); let ldk2_config = LdkUserInfo { bitcoind_rpc_username: connect_params.0.unwrap(), bitcoind_rpc_password: connect_params.1.unwrap(), bitcoind_rpc_host: String::from("localhost"), bitcoind_rpc_port: bitcoind.node.params.rpc_socket.port(), ldk_data_dir: ldk_test_dir, - ldk_announced_listen_addr: Vec::new(), - ldk_peer_listening_port: get_available_port().unwrap(), + ldk_announced_listen_addr: vec![addr.into()], + ldk_peer_listening_port: port, ldk_announced_node_name: [0; 32], network: Network::Regtest, log_level: Level::Trace, From b85cad4d8cadf80282dd87dc2a84bffc8e91ebdb Mon Sep 17 00:00:00 2001 From: Orbital Date: Mon, 1 Jan 2024 18:40:40 -0600 Subject: [PATCH 24/24] offers: send invoice request --- src/lib.rs | 39 ++++++-- src/lnd.rs | 3 +- src/lndk_offers.rs | 172 ++++++++++++++++++++++++++++++- tests/common/mod.rs | 2 +- tests/integration_tests.rs | 200 ++++++++++++++++++++++++++++++++++++- 5 files changed, 400 insertions(+), 16 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ad2250cb..4286b634 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,13 +8,17 @@ mod rate_limit; use crate::lnd::{ features_support_onion_messages, get_lnd_client, string_to_network, LndCfg, LndNodeSigner, }; - +use crate::lndk_offers::OfferError; use crate::onion_messenger::MessengerUtilities; -use bitcoin::secp256k1::PublicKey; +use bitcoin::network::constants::Network; +use bitcoin::secp256k1::{Error as Secp256k1Error, PublicKey}; use home::home_dir; +use lightning::blinded_path::BlindedPath; use lightning::ln::peer_handler::IgnoringMessageHandler; +use lightning::offers::offer::Offer; use lightning::onion_message::{ - DefaultMessageRouter, OffersMessage, OffersMessageHandler, OnionMessenger, PendingOnionMessage, + DefaultMessageRouter, Destination, OffersMessage, OffersMessageHandler, OnionMessenger, + PendingOnionMessage, }; use log::{error, info, LevelFilter}; use log4rs::append::console::ConsoleAppender; @@ -24,8 +28,9 @@ use log4rs::encode::pattern::PatternEncoder; use std::collections::HashMap; use std::str::FromStr; use std::sync::{Mutex, Once}; -use tokio::sync::mpsc::Sender; +use tokio::sync::mpsc::{Receiver, Sender}; use tonic_lnd::lnrpc::GetInfoRequest; +use tonic_lnd::Client; use triggered::{Listener, Trigger}; static INIT: Once = Once::new(); @@ -175,19 +180,41 @@ enum OfferState { } pub struct OfferHandler { - _active_offers: Mutex>, + active_offers: Mutex>, pending_messages: Mutex>>, messenger_utils: MessengerUtilities, } +#[derive(Clone)] +pub struct PayOfferParams { + pub offer: Offer, + pub amount: Option, + pub network: Network, + pub client: Client, + /// The destination the offer creator provided, which we will use to send the invoice request. + pub destination: Destination, + /// The path we will send back to the offer creator, so it knows where to send back the invoice. + pub reply_path: Option, +} + impl OfferHandler { pub fn new() -> Self { OfferHandler { - _active_offers: Mutex::new(HashMap::new()), + active_offers: Mutex::new(HashMap::new()), pending_messages: Mutex::new(Vec::new()), messenger_utils: MessengerUtilities::new(), } } + + /// Adds an offer to be paid with the amount specified. May only be called once for a single offer. + pub async fn pay_offer( + &self, + cfg: PayOfferParams, + started: Receiver, + ) -> Result<(), OfferError> { + self.send_invoice_request(cfg, started).await?; + Ok(()) + } } impl Default for OfferHandler { diff --git a/src/lnd.rs b/src/lnd.rs index 58a96d7d..1eb6161b 100644 --- a/src/lnd.rs +++ b/src/lnd.rs @@ -29,6 +29,7 @@ pub(crate) fn get_lnd_client(cfg: LndCfg) -> Result { } /// LndCfg specifies the configuration required to connect to LND's grpc client. +#[derive(Clone)] pub struct LndCfg { address: String, cert: PathBuf, @@ -186,7 +187,7 @@ pub(crate) fn string_to_network(network_str: &str) -> Result Result, Status>; async fn sign_message( &mut self, diff --git a/src/lndk_offers.rs b/src/lndk_offers.rs index 5da9f490..3fa25eb3 100644 --- a/src/lndk_offers.rs +++ b/src/lndk_offers.rs @@ -1,5 +1,5 @@ use crate::lnd::{features_support_onion_messages, MessageSigner, PeerConnector}; -use crate::OfferHandler; +use crate::{OfferHandler, OfferState, PayOfferParams}; use async_trait::async_trait; use bitcoin::hashes::sha256::Hash; use bitcoin::network::constants::Network; @@ -11,12 +11,14 @@ use lightning::offers::invoice_request::{InvoiceRequest, UnsignedInvoiceRequest} use lightning::offers::merkle::SignError; use lightning::offers::offer::{Amount, Offer}; use lightning::offers::parse::{Bolt12ParseError, Bolt12SemanticError}; +use lightning::onion_message::{Destination, OffersMessage, PendingOnionMessage}; use log::error; use std::error::Error; use std::fmt::Display; use std::str::FromStr; +use tokio::sync::mpsc::Receiver; use tokio::task; -use tonic_lnd::lnrpc::{LightningNode, ListPeersRequest, ListPeersResponse}; +use tonic_lnd::lnrpc::{GetInfoRequest, LightningNode, ListPeersRequest, ListPeersResponse}; use tonic_lnd::signrpc::{KeyLocator, SignMessageReq}; use tonic_lnd::tonic::Status; use tonic_lnd::Client; @@ -24,6 +26,8 @@ use tonic_lnd::Client; #[derive(Debug)] /// OfferError is an error that occurs during the process of paying an offer. pub enum OfferError { + /// AlreadyProcessing indicates that we're already in the process of paying an offer. + AlreadyProcessing, /// BuildUIRFailure indicates a failure to build the unsigned invoice request. BuildUIRFailure(Bolt12SemanticError), /// SignError indicates a failure to sign the invoice request. @@ -47,6 +51,9 @@ pub enum OfferError { impl Display for OfferError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { + OfferError::AlreadyProcessing => { + write!(f, "LNDK is already trying to pay for provided offer") + } OfferError::BuildUIRFailure(e) => write!(f, "Error building invoice request: {e:?}"), OfferError::SignError(e) => write!(f, "Error signing invoice request: {e:?}"), OfferError::DeriveKeyFailure(e) => write!(f, "Error signing invoice request: {e:?}"), @@ -71,9 +78,76 @@ pub fn decode(offer_str: String) -> Result { } impl OfferHandler { - #[allow(dead_code)] + pub async fn send_invoice_request( + &self, + mut cfg: PayOfferParams, + mut started: Receiver, + ) -> Result<(), OfferError> { + // Wait for onion messenger to give us the signal that it's ready. Once the onion messenger drops + // the channel sender, recv will return None and we'll stop blocking here. + if started.recv().await.is_some() { + error!("Error: we shouldn't receive any messages on this channel"); + } + + let validated_amount = validate_amount(&cfg.offer, cfg.amount).await?; + + // For now we connect directly to the introduction node of the blinded path so we don't need any + // intermediate nodes here. In the future we'll query for a full path to the introduction node for + // better sender privacy. + match cfg.destination { + Destination::Node(pubkey) => connect_to_peer(cfg.client.clone(), pubkey).await?, + Destination::BlindedPath(ref path) => { + connect_to_peer(cfg.client.clone(), path.introduction_node_id).await? + } + }; + + let offer_id = cfg.offer.clone().to_string(); + { + let mut active_offers = self.active_offers.lock().unwrap(); + if active_offers.contains_key(&offer_id.clone()) { + return Err(OfferError::AlreadyProcessing); + } + active_offers.insert(cfg.offer.to_string().clone(), OfferState::OfferAdded); + } + + let invoice_request = self + .create_invoice_request( + cfg.client.clone(), + cfg.offer, + vec![], + cfg.network, + validated_amount, + ) + .await?; + + if cfg.reply_path.is_none() { + let info = cfg + .client + .lightning() + .get_info(GetInfoRequest {}) + .await + .expect("failed to get info") + .into_inner(); + + let pubkey = PublicKey::from_str(&info.identity_pubkey).unwrap(); + cfg.reply_path = Some(self.create_reply_path(cfg.client.clone(), pubkey).await?) + }; + let contents = OffersMessage::InvoiceRequest(invoice_request); + let pending_message = PendingOnionMessage { + contents, + destination: cfg.destination, + reply_path: cfg.reply_path, + }; + + let mut pending_messages = self.pending_messages.lock().unwrap(); + pending_messages.push(pending_message); + std::mem::drop(pending_messages); + + Ok(()) + } + // create_invoice_request builds and signs an invoice request, the first step in the BOLT 12 process of paying an offer. - pub(crate) async fn create_invoice_request( + pub async fn create_invoice_request( &self, mut signer: impl MessageSigner + std::marker::Send + 'static, offer: Offer, @@ -163,7 +237,7 @@ impl OfferHandler { ) } else { Ok(BlindedPath::new_for_message( - &[intro_node.unwrap()], + &[intro_node.unwrap(), node_id], &self.messenger_utils, &secp_ctx, ) @@ -222,6 +296,14 @@ pub async fn validate_amount( Ok(validated_amount) } +pub async fn get_destination(offer: &Offer) -> Destination { + if offer.paths().is_empty() { + Destination::Node(offer.signing_pubkey()) + } else { + Destination::BlindedPath(offer.paths()[0].clone()) + } +} + // connect_to_peer connects to the provided node if we're not already connected. pub async fn connect_to_peer( mut connector: impl PeerConnector, @@ -347,6 +429,10 @@ mod tests { use std::time::{Duration, SystemTime}; use tonic_lnd::lnrpc::NodeAddress; + fn get_offer() -> String { + "lno1qgsqvgnwgcg35z6ee2h3yczraddm72xrfua9uve2rlrm9deu7xyfzrcgqgn3qzsyvfkx26qkyypvr5hfx60h9w9k934lt8s2n6zc0wwtgqlulw7dythr83dqx8tzumg".to_string() + } + fn build_custom_offer(amount_msats: u64) -> Offer { let secp_ctx = Secp256k1::new(); let keys = KeyPair::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap()); @@ -366,6 +452,10 @@ mod tests { "0313ba7ccbd754c117962b9afab6c2870eb3ef43f364a9f6c43d0fabb4553776ba".to_string() } + fn get_signature() -> String { + "28b937976a29c15827433086440b36c2bec6ca5bd977557972dca8641cd59ffba50daafb8ee99a19c950976b46f47d9e7aa716652e5657dfc555b82eff467f18".to_string() + } + mock! { TestBolt12Signer{} @@ -387,6 +477,78 @@ mod tests { } } + #[tokio::test] + async fn test_request_invoice() { + let mut signer_mock = MockTestBolt12Signer::new(); + + signer_mock.expect_derive_key().returning(|_| { + Ok(PublicKey::from_str(&get_pubkey()) + .unwrap() + .serialize() + .to_vec()) + }); + + signer_mock.expect_sign_message().returning(|_, _, _| { + Ok(Signature::from_str(&get_signature()) + .unwrap() + .as_ref() + .to_vec()) + }); + + let offer = decode(get_offer()).unwrap(); + let handler = OfferHandler::new(); + assert!(handler + .create_invoice_request(signer_mock, offer, vec![], Network::Regtest, 10000) + .await + .is_ok()) + } + + #[tokio::test] + async fn test_request_invoice_derive_key_error() { + let mut signer_mock = MockTestBolt12Signer::new(); + + signer_mock + .expect_derive_key() + .returning(|_| Err(Status::unknown("error testing"))); + + signer_mock.expect_sign_message().returning(|_, _, _| { + Ok(Signature::from_str(&get_signature()) + .unwrap() + .as_ref() + .to_vec()) + }); + + let offer = decode(get_offer()).unwrap(); + let handler = OfferHandler::new(); + assert!(handler + .create_invoice_request(signer_mock, offer, vec![], Network::Regtest, 10000) + .await + .is_err()) + } + + #[tokio::test] + async fn test_request_invoice_signer_error() { + let mut signer_mock = MockTestBolt12Signer::new(); + + signer_mock.expect_derive_key().returning(|_| { + Ok(PublicKey::from_str(&get_pubkey()) + .unwrap() + .serialize() + .to_vec()) + }); + + signer_mock + .expect_sign_message() + .returning(|_, _, _| Err(Status::unknown("error testing"))); + + let offer = decode(get_offer()).unwrap(); + let handler = OfferHandler::new(); + assert!(handler + .create_invoice_request(signer_mock, offer, vec![], Network::Regtest, 10000) + .await + .is_err()) + } + #[tokio::test] async fn test_validate_amount() { // If the amount the user provided is greater than the offer-provided amount, then diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 7b65abf1..af7e505e 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -155,7 +155,7 @@ pub struct LndNode { pub cert_path: String, pub macaroon_path: String, _handle: Child, - client: Option, + pub client: Option, } impl LndNode { diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 1af8b77c..7e21ed60 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -1,12 +1,20 @@ mod common; use lndk; -use bitcoin::secp256k1::PublicKey; +use bitcoin::secp256k1::{PublicKey, Secp256k1}; +use bitcoin::Network; +use bitcoincore_rpc::bitcoin::Network as RpcNetwork; +use bitcoincore_rpc::RpcApi; use chrono::Utc; use ldk_sample::node_api::Node as LdkNode; -use lndk::LifecycleSignals; +use lightning::blinded_path::BlindedPath; +use lightning::offers::offer::Quantity; +use lightning::onion_message::Destination; +use lndk::onion_messenger::MessengerUtilities; +use lndk::{LifecycleSignals, PayOfferParams}; use std::path::PathBuf; use std::str::FromStr; +use std::time::SystemTime; use tokio::select; use tokio::sync::mpsc; use tokio::sync::mpsc::{Receiver, Sender}; @@ -36,7 +44,6 @@ async fn wait_to_receive_onion_message( async fn check_for_message(ldk: LdkNode) -> LdkNode { loop { if ldk.onion_message_handler.messages.lock().unwrap().len() == 1 { - println!("MESSAGE: {:?}", ldk.onion_message_handler.messages); return ldk; } sleep(Duration::from_secs(2)).await; @@ -102,3 +109,190 @@ async fn test_lndk_forwards_onion_message() { } } } + +#[tokio::test(flavor = "multi_thread")] +// Here we test the beginning of the BOLT 12 offers flow. We show that lndk successfully builds an +// invoice_request and sends it. +async fn test_lndk_send_invoice_request() { + let test_name = "lndk_send_invoice_request"; + let (bitcoind, mut lnd, ldk1, ldk2, lndk_dir) = + common::setup_test_infrastructure(test_name).await; + + // Here we'll produce a little network. ldk1 will be the offer creator in this scenario. We'll + // connect ldk1 and ldk2 with a channel so ldk1 can create an offer and ldk2 can be the + // introduction node for the blinded path. + // + // Later on we'll disconnect lnd to ldk2 to make sure lnd can still auto-connect to the + // introduction node. + // + // ldk1 <--- channel ---> ldk2 <--- peer connection ---> lnd + // + // ldk1 will be the offer creator, which will build a blinded route from ldk2 to ldk1. + let (pubkey, addr) = ldk1.get_node_info(); + let (pubkey_2, addr_2) = ldk2.get_node_info(); + let lnd_info = lnd.get_info().await; + let lnd_pubkey = PublicKey::from_str(&lnd_info.identity_pubkey).unwrap(); + + ldk1.connect_to_peer(pubkey_2, addr_2).await.unwrap(); + lnd.connect_to_peer(pubkey_2, addr_2).await; + + let ldk2_fund_addr = ldk2.bitcoind_client.get_new_address().await; + + // We need to convert funding addresses to the form that the bitcoincore_rpc library recognizes. + let ldk2_addr_string = ldk2_fund_addr.to_string(); + let ldk2_addr = bitcoind::bitcoincore_rpc::bitcoin::Address::from_str(&ldk2_addr_string) + .unwrap() + .require_network(RpcNetwork::Regtest) + .unwrap(); + + // Fund both of these nodes, open the channels, and synchronize the network. + bitcoind + .node + .client + .generate_to_address(6, &ldk2_addr) + .unwrap(); + + lnd.wait_for_chain_sync().await; + + ldk2.open_channel(pubkey, addr, 200000, 0, true) + .await + .unwrap(); + + lnd.wait_for_graph_sync().await; + + bitcoind + .node + .client + .generate_to_address(20, &ldk2_addr) + .unwrap(); + + lnd.wait_for_chain_sync().await; + + let path_pubkeys = vec![pubkey_2, pubkey]; + let expiration = SystemTime::now() + Duration::from_secs(24 * 60 * 60); + let offer = ldk1 + .create_offer( + &path_pubkeys, + Network::Regtest, + 20_000, + Quantity::One, + expiration, + ) + .await + .expect("should create offer"); + + // Now we'll spin up lndk, which should forward the invoice request to ldk2. + let (shutdown, listener) = triggered::trigger(); + let lnd_cfg = lndk::lnd::LndCfg::new( + lnd.address.clone(), + PathBuf::from_str(&lnd.cert_path).unwrap(), + PathBuf::from_str(&lnd.macaroon_path).unwrap(), + ); + let (tx, rx): (Sender, Receiver) = mpsc::channel(1); + let signals = LifecycleSignals { + shutdown: shutdown.clone(), + listener, + started: tx, + }; + + let lndk_cfg = lndk::Cfg { + lnd: lnd_cfg.clone(), + log_dir: Some( + lndk_dir + .join(format!("lndk-logs.txt")) + .to_str() + .unwrap() + .to_string(), + ), + signals, + }; + + let mut client = lnd.client.clone().unwrap(); + let blinded_path = offer.paths()[0].clone(); + + let messenger_utils = MessengerUtilities::new(); + let secp_ctx = Secp256k1::new(); + let reply_path = + BlindedPath::new_for_message(&[pubkey_2, lnd_pubkey], &messenger_utils, &secp_ctx).unwrap(); + + let mut stream = client + .lightning() + .subscribe_channel_graph(tonic_lnd::lnrpc::GraphTopologySubscription {}) + .await + .unwrap() + .into_inner(); + + // Wait for ldk2's graph update to come through, otherwise when we try to auto-connect to + // the introduction node later on, the address won't be available when we call the + // describe_graph API method. + 'outer: while let Some(update) = stream.message().await.unwrap() { + for node in update.node_updates.iter() { + for node_addr in node.node_addresses.iter() { + if node_addr.addr == addr_2.to_string() { + break 'outer; + } + } + } + } + + // Make sure lndk successfully sends the invoice_request. + let handler = lndk::OfferHandler::new(); + let messenger = lndk::LndkOnionMessenger::new(handler); + let pay_cfg = PayOfferParams { + offer: offer.clone(), + amount: Some(20_000), + network: Network::Regtest, + client: client.clone(), + destination: Destination::BlindedPath(blinded_path.clone()), + reply_path: Some(reply_path.clone()), + }; + select! { + val = messenger.run(lndk_cfg) => { + panic!("lndk should not have completed first {:?}", val); + }, + // We wait for ldk2 to receive the onion message. + res = messenger.offer_handler.send_invoice_request(pay_cfg.clone(), rx) => { + assert!(res.is_ok()); + } + } + + // Let's try again, but, make sure we can request the invoice when the LND node is not already connected + // to the introduction node (LDK2). + lnd.disconnect_peer(pubkey_2).await; + lnd.wait_for_chain_sync().await; + + let (shutdown, listener) = triggered::trigger(); + let (tx, rx): (Sender, Receiver) = mpsc::channel(1); + let signals = LifecycleSignals { + shutdown: shutdown.clone(), + listener, + started: tx, + }; + + let lndk_cfg = lndk::Cfg { + lnd: lnd_cfg, + log_dir: Some( + lndk_dir + .join(format!("lndk-logs.txt")) + .to_str() + .unwrap() + .to_string(), + ), + signals, + }; + + let handler = lndk::OfferHandler::new(); + let messenger = lndk::LndkOnionMessenger::new(handler); + select! { + val = messenger.run(lndk_cfg) => { + panic!("lndk should not have completed first {:?}", val); + }, + // We wait for ldk2 to receive the onion message. + res = messenger.offer_handler.send_invoice_request(pay_cfg, rx) => { + assert!(res.is_ok()); + shutdown.trigger(); + ldk1.stop().await; + ldk2.stop().await; + } + } +}