diff --git a/Cargo.lock b/Cargo.lock index f1fc2c90..1ed7b67f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -773,12 +773,6 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" -[[package]] -name = "hex" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "805026a5d0141ffc30abb3be3173848ad46a1b1664fe632428479619a3644d77" - [[package]] name = "hex" version = "0.4.3" @@ -953,22 +947,22 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "ldk-sample" version = "0.1.0" -source = "git+https://github.com/lndk-org/ldk-sample?branch=onion-handler#65758f45c9caa6befda1c5c139c049b3081ffdfb" +source = "git+https://github.com/lndk-org/ldk-sample?branch=change-node-announcement-interval#d24839740b44048568341ca37e289557eff52fd9" dependencies = [ "base64 0.13.1", "bech32 0.8.1", "bitcoin 0.29.2", "bitcoin-bech32", "chrono", - "hex 0.3.2", "libc", - "lightning 0.0.116", + "lightning 0.0.118", "lightning-background-processor", "lightning-block-sync", "lightning-invoice", "lightning-net-tokio", "lightning-persister", "lightning-rapid-gossip-sync", + "log", "rand 0.4.6", "serde_json", "tempfile", @@ -983,9 +977,9 @@ checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" [[package]] name = "lightning" -version = "0.0.116" +version = "0.0.117" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90a0f2155316f1570446a0447c993480673f840748c8ed25bbc59dfc442ac770" +checksum = "cb0cb32c79c42444453bfabed7bd4f4ef64dfdd2e35e191fa8f3b9099ad2a186" dependencies = [ "bitcoin 0.29.2", ] @@ -993,7 +987,7 @@ dependencies = [ [[package]] name = "lightning" version = "0.0.118" -source = "git+https://github.com/lightningdevkit/rust-lightning?rev=caafcedf3fc40fc6253261218c25b254dd955a82#caafcedf3fc40fc6253261218c25b254dd955a82" +source = "git+https://github.com/orbitalturtle/rust-lightning?branch=v0.0.118-custom#07cb4d3ec5a9b7d9b338721061f69e80ca3d5381" dependencies = [ "bitcoin 0.29.2", "musig2", @@ -1001,72 +995,67 @@ dependencies = [ [[package]] name = "lightning-background-processor" -version = "0.0.116" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "398b68a96cceb3c1227504bd5faeb74f26c3233447bc10cc1cb2c67e01b51556" +version = "0.0.118" +source = "git+https://github.com/orbitalturtle/rust-lightning?branch=v0.0.118-custom#07cb4d3ec5a9b7d9b338721061f69e80ca3d5381" dependencies = [ "bitcoin 0.29.2", - "lightning 0.0.116", + "lightning 0.0.118", "lightning-rapid-gossip-sync", ] [[package]] name = "lightning-block-sync" -version = "0.0.116" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d94c276dbe2a777d58ed6ececca96006247a4717c00ac4cdfff62d76852be783" +version = "0.0.118" +source = "git+https://github.com/orbitalturtle/rust-lightning?branch=v0.0.118-custom#07cb4d3ec5a9b7d9b338721061f69e80ca3d5381" dependencies = [ "bitcoin 0.29.2", "chunked_transfer", - "lightning 0.0.116", + "lightning 0.0.118", "serde_json", + "tokio", ] [[package]] name = "lightning-invoice" -version = "0.24.0" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1788c0158526ec27a502043c2911ea6ea58fdc656bdf8749484942c07b790d23" +checksum = "6d2380d053673c467c5f0be992cd4685e40d141d075647d74b64aba1b9c0b1ae" dependencies = [ "bech32 0.9.1", "bitcoin 0.29.2", "bitcoin_hashes 0.11.0", - "lightning 0.0.116", + 
"lightning 0.0.117", "num-traits", "secp256k1 0.24.3", ] [[package]] name = "lightning-net-tokio" -version = "0.0.116" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "366c0ae225736cbc03555bd5fb4b44b2e8fe2ca3c868ec53a4b325c38b2ab2bd" +version = "0.0.118" +source = "git+https://github.com/orbitalturtle/rust-lightning?branch=v0.0.118-custom#07cb4d3ec5a9b7d9b338721061f69e80ca3d5381" dependencies = [ "bitcoin 0.29.2", - "lightning 0.0.116", + "lightning 0.0.118", "tokio", ] [[package]] name = "lightning-persister" -version = "0.0.116" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93caaafeb42115b70119619c2420e362cce776670427fc4ced3e6df77b41c0b6" +version = "0.0.118" +source = "git+https://github.com/orbitalturtle/rust-lightning?branch=v0.0.118-custom#07cb4d3ec5a9b7d9b338721061f69e80ca3d5381" dependencies = [ "bitcoin 0.29.2", - "libc", - "lightning 0.0.116", - "winapi", + "lightning 0.0.118", + "windows-sys", ] [[package]] name = "lightning-rapid-gossip-sync" -version = "0.0.116" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a07af5814234924e623bca499e003fca1864024d5bd984e752230f73a131584" +version = "0.0.118" +source = "git+https://github.com/orbitalturtle/rust-lightning?branch=v0.0.118-custom#07cb4d3ec5a9b7d9b338721061f69e80ca3d5381" dependencies = [ "bitcoin 0.29.2", - "lightning 0.0.116", + "lightning 0.0.118", ] [[package]] @@ -1095,7 +1084,7 @@ dependencies = [ "configure_me_codegen", "core-rpc", "futures", - "hex 0.4.3", + "hex", "home", "ldk-sample", "lightning 0.0.118", @@ -1108,6 +1097,7 @@ dependencies = [ "tokio", "tonic 0.8.3", "tonic_lnd", + "triggered", ] [[package]] @@ -2219,7 +2209,7 @@ name = "tonic_lnd" version = "0.5.1" source = "git+https://github.com/orbitalturtle/tonic_lnd?branch=update-signer-client#de989089fdb23f87d3e6bc4796c504bda9b9be9b" dependencies = [ - "hex 0.4.3", + "hex", "prost 0.9.0", "rustls 0.19.1", "rustls-pemfile", @@ -2303,6 +2293,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "triggered" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce148eae0d1a376c1b94ae651fc3261d9cb8294788b962b7382066376503a2d1" + [[package]] name = "try-lock" version = "0.2.4" diff --git a/Cargo.toml b/Cargo.toml index a584456f..beb2e402 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ bitcoin = { version = "0.29.2", features = ["rand"] } clap = { version = "4.4.6", features = ["derive"] } futures = "0.3.26" home = "0.5.5" -lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "caafcedf3fc40fc6253261218c25b254dd955a82" } +lightning = { git = "https://github.com/orbitalturtle/rust-lightning", branch = "v0.0.118-custom", features = ["max_level_trace"] } rand_chacha = "0.3.1" rand_core = "0.6.4" log = "0.4.17" @@ -29,12 +29,13 @@ tonic_lnd = { git = "https://github.com/orbitalturtle/tonic_lnd", branch = "upda hex = "0.4.3" configure_me = "0.4.0" bytes = "1.4.0" +triggered = "0.1.2" [dev-dependencies] bitcoincore-rpc = { package="core-rpc", version = "0.17.0" } bitcoind = { version = "0.30.0", features = [ "22_0" ] } chrono = { version = "0.4.26" } -ldk-sample = { git = "https://github.com/lndk-org/ldk-sample", branch = "onion-handler" } +ldk-sample = { git = "https://github.com/lndk-org/ldk-sample", branch = "change-node-announcement-interval" } mockall = "0.11.3" tempfile = "3.5.0" diff --git a/Makefile b/Makefile index 11ca02b3..aa669da7 100644 --- a/Makefile +++ b/Makefile @@ -13,6 +13,6 @@ 
endif itest: @$(call print, "Building lnd for itests.") git submodule update --init --recursive - cd lnd/cmd/lnd; $(GO_BUILD) -tags="peersrpc signrpc dev" -o $(TMP_DIR)/lndk-tests/bin/lnd-itest$(EXEC_SUFFIX) - $(CARGO_TEST) -- -- test '*' --test-threads=1 --nocapture + cd lnd/cmd/lnd; $(GO_BUILD) -tags="peersrpc signrpc walletrpc dev" -o $(TMP_DIR)/lndk-tests/bin/lnd-itest$(EXEC_SUFFIX) + $(CARGO_TEST) --test '*' -- --test-threads=1 --nocapture diff --git a/README.md b/README.md index d4d9f732..8313067b 100644 --- a/README.md +++ b/README.md @@ -21,9 +21,9 @@ When you encounter a problem with `LNDK`, Feel free to file issues or start [a d To run `LNDK`, you require access to a `LND` node running _at least_ [LND v0.17.0](https://github.com/lightningnetwork/lnd/releases/tag/v0.17.0-beta). -You will need to compile `LND` in `dev` mode (to enable protocol-level feature handling externally) and enable the `peersrpc` and `signerrpc` sub-servers: +You will need to compile `LND` in `dev` mode (to enable protocol-level feature handling externally) and enable the `peersrpc`, `signerrpc`, and `walletrpc` sub-servers: -`make install tags="peersrpc signrpc dev"` +`make install tags="peersrpc signrpc walletrpc dev"` Note that this guide assumes some familiarity with setting up `LND`. If you're looking to get up to speed, try [this guide](https://docs.lightning.engineering/lightning-network-tools/lnd/run-lnd). @@ -82,7 +82,7 @@ Or in a more concrete example: Rather than use the admin.macaroon with unrestricted permission to an `LND` node, we can bake a macaroon using lncli with much more specific permissions for better security. With this command, generate a macaroon which will give `LNDK` only the specific grpc endpoints it's designed to hit: ``` -lncli --save_to=/lndk.macaroon uri:/lnrpc.Lightning/GetInfo uri:/lnrpc.Lightning/ListPeers uri:/lnrpc.Lightning/SubscribePeerEvents uri:/lnrpc.Lightning/SendCustomMessage uri:/lnrpc.Lightning/SubscribeCustomMessages uri:/peersrpc.Peers/UpdateNodeAnnouncement uri:/signrpc.Signer/DeriveSharedKey +lncli bakemacaroon --save_to=/lndk.macaroon uri:/lnrpc.Lightning/GetInfo uri:/lnrpc.Lightning/ListPeers uri:/lnrpc.Lightning/SubscribePeerEvents uri:/lnrpc.Lightning/SendCustomMessage uri:/lnrpc.Lightning/SubscribeCustomMessages uri:/peersrpc.Peers/UpdateNodeAnnouncement uri:/signrpc.Signer/DeriveSharedKey ``` ## Security diff --git a/lnd b/lnd index 2fb150c8..dacb86fd 160000 --- a/lnd +++ b/lnd @@ -1 +1 @@ -Subproject commit 2fb150c8fe827df9df0520ef9916b3afb7b03a8d +Subproject commit dacb86fdbc2d15c81e780217414fcbc6e0d08186 diff --git a/src/clock.rs b/src/clock.rs index 9f27c4f9..173c3db1 100644 --- a/src/clock.rs +++ b/src/clock.rs @@ -18,3 +18,9 @@ impl Clock for TokioClock { Instant::now() } } + +impl Default for TokioClock { + fn default() -> Self { + Self::new() + } +} diff --git a/src/lib.rs b/src/lib.rs index 7e3e5734..4286b634 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,18 +2,24 @@ mod clock; #[allow(dead_code)] pub mod lnd; pub mod lndk_offers; -mod onion_messenger; +pub mod onion_messenger; mod rate_limit; use crate::lnd::{ features_support_onion_messages, get_lnd_client, string_to_network, LndCfg, LndNodeSigner, }; - -use crate::onion_messenger::{run_onion_messenger, MessengerUtilities}; -use bitcoin::secp256k1::PublicKey; +use crate::lndk_offers::OfferError; +use crate::onion_messenger::MessengerUtilities; +use bitcoin::network::constants::Network; +use bitcoin::secp256k1::{Error as Secp256k1Error, PublicKey}; use home::home_dir; +use 
lightning::blinded_path::BlindedPath; use lightning::ln::peer_handler::IgnoringMessageHandler; -use lightning::onion_message::{DefaultMessageRouter, OnionMessenger}; +use lightning::offers::offer::Offer; +use lightning::onion_message::{ + DefaultMessageRouter, Destination, OffersMessage, OffersMessageHandler, OnionMessenger, + PendingOnionMessage, +}; use log::{error, info, LevelFilter}; use log4rs::append::console::ConsoleAppender; use log4rs::append::file::FileAppender; @@ -21,14 +27,27 @@ use log4rs::config::{Appender, Config as LogConfig, Root}; use log4rs::encode::pattern::PatternEncoder; use std::collections::HashMap; use std::str::FromStr; -use std::sync::Once; +use std::sync::{Mutex, Once}; +use tokio::sync::mpsc::{Receiver, Sender}; use tonic_lnd::lnrpc::GetInfoRequest; +use tonic_lnd::Client; +use triggered::{Listener, Trigger}; static INIT: Once = Once::new(); pub struct Cfg { pub lnd: LndCfg, pub log_dir: Option, + pub signals: LifecycleSignals, +} + +pub struct LifecycleSignals { + // Use to externally trigger shutdown. + pub shutdown: Trigger, + // Used to listen for the signal to shutdown. + pub listener: Listener, + // Used to signal when the onion messenger has started up. + pub started: Sender, } pub fn init_logger(config: LogConfig) { @@ -37,107 +56,188 @@ pub fn init_logger(config: LogConfig) { }); } -pub async fn run(args: Cfg) -> Result<(), ()> { - let log_dir = args.log_dir.unwrap_or_else(|| { - home_dir() - .unwrap() - .join(".lndk") - .join("lndk.log") - .as_path() - .to_str() - .unwrap() - .to_string() - }); +pub struct LndkOnionMessenger { + pub offer_handler: OfferHandler, +} - // Log both to stdout and a log file. - let stdout = ConsoleAppender::builder().build(); - let lndk_logs = FileAppender::builder() - .encoder(Box::new(PatternEncoder::new("{d} - {m}{n}"))) - .build(log_dir) - .unwrap(); - - let config = LogConfig::builder() - .appender(Appender::builder().build("stdout", Box::new(stdout))) - .appender(Appender::builder().build("lndk_logs", Box::new(lndk_logs))) - .build( - Root::builder() - .appender("stdout") - .appender("lndk_logs") - .build(LevelFilter::Info), - ) - .unwrap(); +impl LndkOnionMessenger { + pub fn new(offer_handler: OfferHandler) -> Self { + LndkOnionMessenger { offer_handler } + } - init_logger(config); + pub async fn run(&self, args: Cfg) -> Result<(), ()> { + let log_dir = args.log_dir.unwrap_or_else(|| { + home_dir() + .unwrap() + .join(".lndk") + .join("lndk.log") + .as_path() + .to_str() + .unwrap() + .to_string() + }); + + // Log both to stdout and a log file. 
+ let stdout = ConsoleAppender::builder().build(); + let lndk_logs = FileAppender::builder() + .encoder(Box::new(PatternEncoder::new("{d} - {m}{n}"))) + .build(log_dir) + .unwrap(); + + let config = LogConfig::builder() + .appender(Appender::builder().build("stdout", Box::new(stdout))) + .appender(Appender::builder().build("lndk_logs", Box::new(lndk_logs))) + .build( + Root::builder() + .appender("stdout") + .appender("lndk_logs") + .build(LevelFilter::Info), + ) + .unwrap(); + + init_logger(config); + + let mut client = get_lnd_client(args.lnd).expect("failed to connect"); + + let info = client + .lightning() + .get_info(GetInfoRequest {}) + .await + .expect("failed to get info") + .into_inner(); + + let mut network_str = None; + for chain in info.chains { + if chain.chain == "bitcoin" { + network_str = Some(chain.network.clone()) + } + } + if network_str.is_none() { + error!("lnd node is not connected to bitcoin network as expected"); + return Err(()); + } + let network = string_to_network(&network_str.unwrap()); - let mut client = get_lnd_client(args.lnd).expect("failed to connect"); + let pubkey = PublicKey::from_str(&info.identity_pubkey).unwrap(); + info!("Starting lndk for node: {pubkey}."); - let info = client - .lightning() - .get_info(GetInfoRequest {}) - .await - .expect("failed to get info") - .into_inner(); + if !features_support_onion_messages(&info.features) { + error!("LND must support onion messaging to run LNDK."); + return Err(()); + } - let mut network_str = None; - for chain in info.chains { - if chain.chain == "bitcoin" { - network_str = Some(chain.network.clone()) + // On startup, we want to get a list of our currently online peers to notify the onion messenger that they are + // connected. This sets up our "start state" for the messenger correctly. + let current_peers = client + .lightning() + .list_peers(tonic_lnd::lnrpc::ListPeersRequest { + latest_error: false, + }) + .await + .map_err(|e| { + error!("Could not lookup current peers: {e}."); + })?; + + let mut peer_support = HashMap::new(); + for peer in current_peers.into_inner().peers { + let pubkey = PublicKey::from_str(&peer.pub_key).unwrap(); + let onion_support = features_support_onion_messages(&peer.features); + peer_support.insert(pubkey, onion_support); } + + // Create an onion messenger that depends on LND's signer client and consume related events. + let mut node_client = client.signer().clone(); + let node_signer = LndNodeSigner::new(pubkey, &mut node_client); + let messenger_utils = MessengerUtilities::new(); + let onion_messenger = OnionMessenger::new( + &messenger_utils, + &node_signer, + &messenger_utils, + &DefaultMessageRouter {}, + &self.offer_handler, + IgnoringMessageHandler {}, + ); + + let mut peers_client = client.lightning().clone(); + self.run_onion_messenger( + peer_support, + &mut peers_client, + onion_messenger, + network.unwrap(), + args.signals, + ) + .await } - if network_str.is_none() { - error!("lnd node is not connected to bitcoin network as expected"); - return Err(()); +} + +#[allow(dead_code)] +enum OfferState { + OfferAdded, + InvoiceRequestSent, + InvoiceReceived, + InvoicePaymentDispatched, + InvoicePaid, +} + +pub struct OfferHandler { + active_offers: Mutex>, + pending_messages: Mutex>>, + messenger_utils: MessengerUtilities, +} + +#[derive(Clone)] +pub struct PayOfferParams { + pub offer: Offer, + pub amount: Option, + pub network: Network, + pub client: Client, + /// The destination the offer creator provided, which we will use to send the invoice request. 
+ pub destination: Destination, + /// The path we will send back to the offer creator, so it knows where to send back the invoice. + pub reply_path: Option, +} + +impl OfferHandler { + pub fn new() -> Self { + OfferHandler { + active_offers: Mutex::new(HashMap::new()), + pending_messages: Mutex::new(Vec::new()), + messenger_utils: MessengerUtilities::new(), + } } - let network = string_to_network(&network_str.unwrap()); - let pubkey = PublicKey::from_str(&info.identity_pubkey).unwrap(); - info!("Starting lndk for node: {pubkey}."); + /// Adds an offer to be paid with the amount specified. May only be called once for a single offer. + pub async fn pay_offer( + &self, + cfg: PayOfferParams, + started: Receiver, + ) -> Result<(), OfferError> { + self.send_invoice_request(cfg, started).await?; + Ok(()) + } +} - if !features_support_onion_messages(&info.features) { - error!("LND must support onion messaging to run LNDK."); - return Err(()); +impl Default for OfferHandler { + fn default() -> Self { + Self::new() } +} - // On startup, we want to get a list of our currently online peers to notify the onion messenger that they are - // connected. This sets up our "start state" for the messenger correctly. - let current_peers = client - .lightning() - .list_peers(tonic_lnd::lnrpc::ListPeersRequest { - latest_error: false, - }) - .await - .map_err(|e| { - error!("Could not lookup current peers: {e}."); - })?; - - let mut peer_support = HashMap::new(); - for peer in current_peers.into_inner().peers { - let pubkey = PublicKey::from_str(&peer.pub_key).unwrap(); - let onion_support = features_support_onion_messages(&peer.features); - peer_support.insert(pubkey, onion_support); +impl OffersMessageHandler for OfferHandler { + fn handle_message(&self, message: OffersMessage) -> Option { + match message { + OffersMessage::InvoiceRequest(_) => { + log::error!("Invoice request received, payment not yet supported."); + None + } + OffersMessage::Invoice(_invoice) => None, + OffersMessage::InvoiceError(_error) => None, + } } - // Create an onion messenger that depends on LND's signer client and consume related events. - let mut node_client = client.signer().clone(); - let node_signer = LndNodeSigner::new(pubkey, &mut node_client); - let messenger_utils = MessengerUtilities::new(); - let onion_messenger = OnionMessenger::new( - &messenger_utils, - &node_signer, - &messenger_utils, - &DefaultMessageRouter {}, - IgnoringMessageHandler {}, - IgnoringMessageHandler {}, - ); - - let mut peers_client = client.lightning().clone(); - run_onion_messenger( - peer_support, - &mut peers_client, - onion_messenger, - network.unwrap(), - ) - .await + fn release_pending_messages(&self) -> Vec> { + core::mem::take(&mut self.pending_messages.lock().unwrap()) + } } #[cfg(test)] diff --git a/src/lnd.rs b/src/lnd.rs index 277b2161..1eb6161b 100644 --- a/src/lnd.rs +++ b/src/lnd.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use std::error::Error; use std::fmt; use std::path::PathBuf; +use tonic_lnd::lnrpc::{LightningNode, ListPeersResponse}; use tonic_lnd::signrpc::KeyLocator; use tonic_lnd::tonic::Status; use tonic_lnd::{Client, ConnectError}; @@ -28,6 +29,7 @@ pub(crate) fn get_lnd_client(cfg: LndCfg) -> Result { } /// LndCfg specifies the configuration required to connect to LND's grpc client. 
+#[derive(Clone)] pub struct LndCfg { address: String, cert: PathBuf, @@ -185,7 +187,7 @@ pub(crate) fn string_to_network(network_str: &str) -> Result Result, Status>; async fn sign_message( &mut self, @@ -194,3 +196,11 @@ pub(crate) trait MessageSigner { tag: String, ) -> Result, Status>; } + +/// PeerConnector provides a layer of abstraction over the LND API for connecting to a peer. +#[async_trait] +pub trait PeerConnector { + async fn list_peers(&mut self) -> Result; + async fn connect_peer(&mut self, node_id: String, addr: String) -> Result<(), Status>; + async fn get_node_info(&mut self, pub_key: String) -> Result, Status>; +} diff --git a/src/lndk_offers.rs b/src/lndk_offers.rs index 26aae8fd..3fa25eb3 100644 --- a/src/lndk_offers.rs +++ b/src/lndk_offers.rs @@ -1,38 +1,71 @@ -use crate::lnd::MessageSigner; +use crate::lnd::{features_support_onion_messages, MessageSigner, PeerConnector}; +use crate::{OfferHandler, OfferState, PayOfferParams}; use async_trait::async_trait; use bitcoin::hashes::sha256::Hash; use bitcoin::network::constants::Network; use bitcoin::secp256k1::schnorr::Signature; -use bitcoin::secp256k1::{Error as Secp256k1Error, PublicKey}; +use bitcoin::secp256k1::{Error as Secp256k1Error, PublicKey, Secp256k1}; use futures::executor::block_on; +use lightning::blinded_path::BlindedPath; use lightning::offers::invoice_request::{InvoiceRequest, UnsignedInvoiceRequest}; use lightning::offers::merkle::SignError; -use lightning::offers::offer::Offer; +use lightning::offers::offer::{Amount, Offer}; use lightning::offers::parse::{Bolt12ParseError, Bolt12SemanticError}; +use lightning::onion_message::{Destination, OffersMessage, PendingOnionMessage}; +use log::error; use std::error::Error; use std::fmt::Display; +use std::str::FromStr; +use tokio::sync::mpsc::Receiver; use tokio::task; +use tonic_lnd::lnrpc::{GetInfoRequest, LightningNode, ListPeersRequest, ListPeersResponse}; use tonic_lnd::signrpc::{KeyLocator, SignMessageReq}; use tonic_lnd::tonic::Status; use tonic_lnd::Client; #[derive(Debug)] /// OfferError is an error that occurs during the process of paying an offer. -pub(crate) enum OfferError { +pub enum OfferError { + /// AlreadyProcessing indicates that we're already in the process of paying an offer. + AlreadyProcessing, /// BuildUIRFailure indicates a failure to build the unsigned invoice request. BuildUIRFailure(Bolt12SemanticError), /// SignError indicates a failure to sign the invoice request. SignError(SignError), /// DeriveKeyFailure indicates a failure to derive key for signing the invoice request. DeriveKeyFailure(Status), + /// User provided an invalid amount. + InvalidAmount(String), + /// Invalid currency contained in the offer. + InvalidCurrency, + /// Unable to connect to peer. + PeerConnectError(Status), + /// No node address. + NodeAddressNotFound, + /// Cannot list peers. + ListPeersFailure(Status), + /// Failure to build a reply path. 
+ BuildBlindedPathFailure, } impl Display for OfferError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { + OfferError::AlreadyProcessing => { + write!(f, "LNDK is already trying to pay for provided offer") + } OfferError::BuildUIRFailure(e) => write!(f, "Error building invoice request: {e:?}"), OfferError::SignError(e) => write!(f, "Error signing invoice request: {e:?}"), OfferError::DeriveKeyFailure(e) => write!(f, "Error signing invoice request: {e:?}"), + OfferError::InvalidAmount(e) => write!(f, "User provided an invalid amount: {e:?}"), + OfferError::InvalidCurrency => write!( + f, + "LNDK doesn't yet support offer currencies other than bitcoin" + ), + OfferError::PeerConnectError(e) => write!(f, "Error connecting to peer: {e:?}"), + OfferError::NodeAddressNotFound => write!(f, "Couldn't get node address"), + OfferError::ListPeersFailure(e) => write!(f, "Error listing peers: {e:?}"), + OfferError::BuildBlindedPathFailure => write!(f, "Error building blinded path"), } } } @@ -44,58 +77,314 @@ pub fn decode(offer_str: String) -> Result { offer_str.parse::() } -#[allow(dead_code)] -// create_request_invoice builds and signs an invoice request, the first step in the BOLT 12 process of paying an offer. -pub(crate) async fn create_request_invoice( - mut signer: impl MessageSigner + std::marker::Send + 'static, - offer: Offer, - metadata: Vec, - network: Network, - msats: u64, -) -> Result> { - // We use KeyFamily KeyFamilyNodeKey (6) to derive a key to represent our node id. See: - // https://github.com/lightningnetwork/lnd/blob/a3f8011ed695f6204ec6a13ad5c2a67ac542b109/keychain/derivation.go#L103 - let key_loc = KeyLocator { - key_family: 6, - key_index: 1, +impl OfferHandler { + pub async fn send_invoice_request( + &self, + mut cfg: PayOfferParams, + mut started: Receiver, + ) -> Result<(), OfferError> { + // Wait for onion messenger to give us the signal that it's ready. Once the onion messenger drops + // the channel sender, recv will return None and we'll stop blocking here. + if started.recv().await.is_some() { + error!("Error: we shouldn't receive any messages on this channel"); + } + + let validated_amount = validate_amount(&cfg.offer, cfg.amount).await?; + + // For now we connect directly to the introduction node of the blinded path so we don't need any + // intermediate nodes here. In the future we'll query for a full path to the introduction node for + // better sender privacy. + match cfg.destination { + Destination::Node(pubkey) => connect_to_peer(cfg.client.clone(), pubkey).await?, + Destination::BlindedPath(ref path) => { + connect_to_peer(cfg.client.clone(), path.introduction_node_id).await? + } + }; + + let offer_id = cfg.offer.clone().to_string(); + { + let mut active_offers = self.active_offers.lock().unwrap(); + if active_offers.contains_key(&offer_id.clone()) { + return Err(OfferError::AlreadyProcessing); + } + active_offers.insert(cfg.offer.to_string().clone(), OfferState::OfferAdded); + } + + let invoice_request = self + .create_invoice_request( + cfg.client.clone(), + cfg.offer, + vec![], + cfg.network, + validated_amount, + ) + .await?; + + if cfg.reply_path.is_none() { + let info = cfg + .client + .lightning() + .get_info(GetInfoRequest {}) + .await + .expect("failed to get info") + .into_inner(); + + let pubkey = PublicKey::from_str(&info.identity_pubkey).unwrap(); + cfg.reply_path = Some(self.create_reply_path(cfg.client.clone(), pubkey).await?) 
+ }; + let contents = OffersMessage::InvoiceRequest(invoice_request); + let pending_message = PendingOnionMessage { + contents, + destination: cfg.destination, + reply_path: cfg.reply_path, + }; + + let mut pending_messages = self.pending_messages.lock().unwrap(); + pending_messages.push(pending_message); + std::mem::drop(pending_messages); + + Ok(()) + } + + // create_invoice_request builds and signs an invoice request, the first step in the BOLT 12 process of paying an offer. + pub async fn create_invoice_request( + &self, + mut signer: impl MessageSigner + std::marker::Send + 'static, + offer: Offer, + metadata: Vec, + network: Network, + msats: u64, + ) -> Result> { + // We use KeyFamily KeyFamilyNodeKey (6) to derive a key to represent our node id. See: + // https://github.com/lightningnetwork/lnd/blob/a3f8011ed695f6204ec6a13ad5c2a67ac542b109/keychain/derivation.go#L103 + let key_loc = KeyLocator { + key_family: 6, + key_index: 1, + }; + + let pubkey_bytes = signer + .derive_key(key_loc.clone()) + .await + .map_err(OfferError::DeriveKeyFailure)?; + let pubkey = + PublicKey::from_slice(&pubkey_bytes).expect("failed to deserialize public key"); + + let unsigned_invoice_req = offer + .request_invoice(metadata, pubkey) + .unwrap() + .chain(network) + .unwrap() + .amount_msats(msats) + .unwrap() + .build() + .map_err(OfferError::BuildUIRFailure)?; + + // To create a valid invoice request, we also need to sign it. This is spawned in a blocking + // task because we need to call block_on on sign_message so that sign_closure can be a + // synchronous closure. + task::spawn_blocking(move || { + let sign_closure = |msg: &UnsignedInvoiceRequest| { + let tagged_hash = msg.as_ref(); + let tag = tagged_hash.tag().to_string(); + + let signature = + block_on(signer.sign_message(key_loc, tagged_hash.merkle_root(), tag)) + .map_err(|_| Secp256k1Error::InvalidSignature)?; + + Signature::from_slice(&signature) + }; + + unsigned_invoice_req + .sign(sign_closure) + .map_err(OfferError::SignError) + }) + .await + .unwrap() + } + + /// create_reply_path creates a blinded path to provide to the offer maker when requesting an + /// invoice so they know where to send the invoice back to. We try to find a peer that we're + /// connected to with onion messaging support that we can use to form a blinded path, + /// otherwise we creae a blinded path directly to ourselves. + pub async fn create_reply_path( + &self, + mut connector: impl PeerConnector + std::marker::Send + 'static, + node_id: PublicKey, + ) -> Result> { + // Find an introduction node for our blinded path. + let current_peers = connector.list_peers().await.map_err(|e| { + error!("Could not lookup current peers: {e}."); + OfferError::ListPeersFailure(e) + })?; + + let mut intro_node = None; + for peer in current_peers.peers { + let pubkey = PublicKey::from_str(&peer.pub_key).unwrap(); + let onion_support = features_support_onion_messages(&peer.features); + if onion_support { + intro_node = Some(pubkey); + } + } + + let secp_ctx = Secp256k1::new(); + if intro_node.is_none() { + Ok( + BlindedPath::one_hop_for_message(node_id, &self.messenger_utils, &secp_ctx) + .map_err(|_| { + error!("Could not create blinded path."); + OfferError::BuildBlindedPathFailure + })?, + ) + } else { + Ok(BlindedPath::new_for_message( + &[intro_node.unwrap(), node_id], + &self.messenger_utils, + &secp_ctx, + ) + .map_err(|_| { + error!("Could not create blinded path."); + OfferError::BuildBlindedPathFailure + }))? 
+ } + } +} + +// Checks that the user-provided amount matches the offer. +pub async fn validate_amount( + offer: &Offer, + amount_msats: Option, +) -> Result> { + let validated_amount = match offer.amount() { + Some(offer_amount) => { + match *offer_amount { + Amount::Bitcoin { + amount_msats: bitcoin_amt, + } => { + if let Some(msats) = amount_msats { + if msats < bitcoin_amt { + return Err(OfferError::InvalidAmount(format!( + "{msats} is less than offer amount {}", + bitcoin_amt + ))); + } + msats + } else { + // If user didn't set amount, set it to the offer amount. + if bitcoin_amt == 0 { + return Err(OfferError::InvalidAmount( + "Offer doesn't set an amount, so user must specify one".to_string(), + )); + } + bitcoin_amt + } + } + _ => { + return Err(OfferError::InvalidCurrency); + } + } + } + None => { + if let Some(msats) = amount_msats { + msats + } else { + return Err(OfferError::InvalidAmount( + "Offer doesn't set an amount, so user must specify one".to_string(), + )); + } + } }; + Ok(validated_amount) +} - let pubkey_bytes = signer - .derive_key(key_loc.clone()) +pub async fn get_destination(offer: &Offer) -> Destination { + if offer.paths().is_empty() { + Destination::Node(offer.signing_pubkey()) + } else { + Destination::BlindedPath(offer.paths()[0].clone()) + } +} + +// connect_to_peer connects to the provided node if we're not already connected. +pub async fn connect_to_peer( + mut connector: impl PeerConnector, + node_id: PublicKey, +) -> Result<(), OfferError> { + let resp = connector + .list_peers() .await - .map_err(OfferError::DeriveKeyFailure)?; - let pubkey = PublicKey::from_slice(&pubkey_bytes).expect("failed to deserialize public key"); + .map_err(OfferError::PeerConnectError)?; - let unsigned_invoice_req = offer - .request_invoice(metadata, pubkey) - .unwrap() - .chain(network) - .unwrap() - .amount_msats(msats) - .unwrap() - .build() - .map_err(OfferError::BuildUIRFailure)?; + let node_id_str = node_id.to_string(); + for peer in resp.peers.iter() { + if peer.pub_key == node_id_str { + return Ok(()); + } + } + + let node = connector + .get_node_info(node_id_str.clone()) + .await + .map_err(OfferError::PeerConnectError)?; + + let node = match node { + Some(node) => node, + None => return Err(OfferError::NodeAddressNotFound), + }; + + if node.addresses.is_empty() { + return Err(OfferError::NodeAddressNotFound); + } + + connector + .connect_peer(node_id_str, node.addresses[0].clone().addr) + .await + .map_err(OfferError::PeerConnectError)?; + + Ok(()) +} + +#[async_trait] +impl PeerConnector for Client { + async fn list_peers(&mut self) -> Result { + let list_req = ListPeersRequest { + ..Default::default() + }; + + self.lightning() + .list_peers(list_req) + .await + .map(|resp| resp.into_inner()) + } + + async fn connect_peer(&mut self, node_id: String, addr: String) -> Result<(), Status> { + let ln_addr = tonic_lnd::lnrpc::LightningAddress { + pubkey: node_id, + host: addr, + }; - // To create a valid invoice request, we also need to sign it. This is spawned in a blocking - // task because we need to call block_on on sign_message so that sign_closure can be a - // synchronous closure. 
- task::spawn_blocking(move || { - let sign_closure = |msg: &UnsignedInvoiceRequest| { - let tagged_hash = msg.as_ref(); - let tag = tagged_hash.tag().to_string(); + let connect_req = tonic_lnd::lnrpc::ConnectPeerRequest { + addr: Some(ln_addr), + timeout: 20, + ..Default::default() + }; - let signature = block_on(signer.sign_message(key_loc, tagged_hash.merkle_root(), tag)) - .map_err(|_| Secp256k1Error::InvalidSignature)?; + self.lightning() + .connect_peer(connect_req.clone()) + .await + .map(|_| ()) + } - Signature::from_slice(&signature) + async fn get_node_info(&mut self, pub_key: String) -> Result, Status> { + let req = tonic_lnd::lnrpc::NodeInfoRequest { + pub_key, + include_channels: false, }; - unsigned_invoice_req - .sign(sign_closure) - .map_err(OfferError::SignError) - }) - .await - .unwrap() + self.lightning() + .get_node_info(req) + .await + .map(|resp| resp.into_inner().node) + } } #[async_trait] @@ -132,13 +421,33 @@ impl MessageSigner for Client { #[cfg(test)] mod tests { use super::*; + use bitcoin::secp256k1::{KeyPair, Secp256k1, SecretKey}; + use lightning::offers::offer::{OfferBuilder, Quantity}; use mockall::mock; + use std::collections::HashMap; use std::str::FromStr; + use std::time::{Duration, SystemTime}; + use tonic_lnd::lnrpc::NodeAddress; fn get_offer() -> String { "lno1qgsqvgnwgcg35z6ee2h3yczraddm72xrfua9uve2rlrm9deu7xyfzrcgqgn3qzsyvfkx26qkyypvr5hfx60h9w9k934lt8s2n6zc0wwtgqlulw7dythr83dqx8tzumg".to_string() } + fn build_custom_offer(amount_msats: u64) -> Offer { + let secp_ctx = Secp256k1::new(); + let keys = KeyPair::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap()); + let pubkey = PublicKey::from(keys); + + let expiration = SystemTime::now() + Duration::from_secs(24 * 60 * 60); + OfferBuilder::new("coffee".to_string(), pubkey) + .amount_msats(amount_msats) + .supported_quantity(Quantity::Unbounded) + .absolute_expiry(expiration.duration_since(SystemTime::UNIX_EPOCH).unwrap()) + .issuer("Foo Bar".to_string()) + .build() + .unwrap() + } + fn get_pubkey() -> String { "0313ba7ccbd754c117962b9afab6c2870eb3ef43f364a9f6c43d0fabb4553776ba".to_string() } @@ -157,6 +466,17 @@ mod tests { } } + mock! 
{ + TestPeerConnector{} + + #[async_trait] + impl PeerConnector for TestPeerConnector { + async fn list_peers(&mut self) -> Result; + async fn get_node_info(&mut self, pub_key: String) -> Result, Status>; + async fn connect_peer(&mut self, node_id: String, addr: String) -> Result<(), Status>; + } + } + #[tokio::test] async fn test_request_invoice() { let mut signer_mock = MockTestBolt12Signer::new(); @@ -176,12 +496,11 @@ mod tests { }); let offer = decode(get_offer()).unwrap(); - - assert!( - create_request_invoice(signer_mock, offer, vec![], Network::Regtest, 10000) - .await - .is_ok() - ) + let handler = OfferHandler::new(); + assert!(handler + .create_invoice_request(signer_mock, offer, vec![], Network::Regtest, 10000) + .await + .is_ok()) } #[tokio::test] @@ -200,12 +519,11 @@ mod tests { }); let offer = decode(get_offer()).unwrap(); - - assert!( - create_request_invoice(signer_mock, offer, vec![], Network::Regtest, 10000) - .await - .is_err() - ) + let handler = OfferHandler::new(); + assert!(handler + .create_invoice_request(signer_mock, offer, vec![], Network::Regtest, 10000) + .await + .is_err()) } #[tokio::test] @@ -224,11 +542,184 @@ mod tests { .returning(|_, _, _| Err(Status::unknown("error testing"))); let offer = decode(get_offer()).unwrap(); + let handler = OfferHandler::new(); + assert!(handler + .create_invoice_request(signer_mock, offer, vec![], Network::Regtest, 10000) + .await + .is_err()) + } - assert!( - create_request_invoice(signer_mock, offer, vec![], Network::Regtest, 10000) - .await - .is_err() - ) + #[tokio::test] + async fn test_validate_amount() { + // If the amount the user provided is greater than the offer-provided amount, then + // we should be good. + let offer = build_custom_offer(20000); + assert!(validate_amount(&offer, Some(20000)).await.is_ok()); + + let offer = build_custom_offer(0); + assert!(validate_amount(&offer, Some(20000)).await.is_ok()); + } + + #[tokio::test] + async fn test_validate_invalid_amount() { + // If the amount the user provided is lower than the offer amount, we error. + let offer = build_custom_offer(20000); + assert!(validate_amount(&offer, Some(1000)).await.is_err()); + + // Both user amount and offer amount can't be 0. 
+ let offer = build_custom_offer(0); + assert!(validate_amount(&offer, None).await.is_err()); + } + + #[tokio::test] + async fn test_connect_peer() { + let mut connector_mock = MockTestPeerConnector::new(); + connector_mock.expect_list_peers().returning(|| { + Ok(ListPeersResponse { + ..Default::default() + }) + }); + + connector_mock.expect_get_node_info().returning(|_| { + let node_addr = NodeAddress { + network: String::from("regtest"), + addr: String::from("127.0.0.1"), + }; + let node = LightningNode { + addresses: vec![node_addr], + ..Default::default() + }; + + Ok(Some(node)) + }); + + connector_mock + .expect_connect_peer() + .returning(|_, _| Ok(())); + + let pubkey = PublicKey::from_str(&get_pubkey()).unwrap(); + assert!(connect_to_peer(connector_mock, pubkey).await.is_ok()); + } + + #[tokio::test] + async fn test_connect_peer_already_connected() { + let mut connector_mock = MockTestPeerConnector::new(); + connector_mock.expect_list_peers().returning(|| { + let peer = tonic_lnd::lnrpc::Peer { + pub_key: get_pubkey(), + ..Default::default() + }; + + Ok(ListPeersResponse { + peers: vec![peer], + ..Default::default() + }) + }); + + connector_mock.expect_get_node_info().returning(|_| { + let node_addr = NodeAddress { + network: String::from("regtest"), + addr: String::from("127.0.0.1"), + }; + let node = LightningNode { + addresses: vec![node_addr], + ..Default::default() + }; + + Ok(Some(node)) + }); + + let pubkey = PublicKey::from_str(&get_pubkey()).unwrap(); + assert!(connect_to_peer(connector_mock, pubkey).await.is_ok()); + } + + #[tokio::test] + async fn test_connect_peer_connect_error() { + let mut connector_mock = MockTestPeerConnector::new(); + connector_mock.expect_list_peers().returning(|| { + Ok(ListPeersResponse { + ..Default::default() + }) + }); + + connector_mock.expect_get_node_info().returning(|_| { + let node_addr = NodeAddress { + network: String::from("regtest"), + addr: String::from("127.0.0.1"), + }; + let node = LightningNode { + addresses: vec![node_addr], + ..Default::default() + }; + + Ok(Some(node)) + }); + + connector_mock + .expect_connect_peer() + .returning(|_, _| Err(Status::unknown(""))); + + let pubkey = PublicKey::from_str(&get_pubkey()).unwrap(); + assert!(connect_to_peer(connector_mock, pubkey).await.is_err()); + } + + #[tokio::test] + async fn test_create_reply_path() { + let mut connector_mock = MockTestPeerConnector::new(); + + connector_mock.expect_list_peers().returning(|| { + let feature = tonic_lnd::lnrpc::Feature { + ..Default::default() + }; + let mut feature_entry = HashMap::new(); + feature_entry.insert(38, feature); + + let peer = tonic_lnd::lnrpc::Peer { + pub_key: get_pubkey(), + features: feature_entry, + ..Default::default() + }; + Ok(ListPeersResponse { peers: vec![peer] }) + }); + + let receiver_node_id = PublicKey::from_str(&get_pubkey()).unwrap(); + let handler = OfferHandler::new(); + assert!(handler + .create_reply_path(connector_mock, receiver_node_id) + .await + .is_ok()) + } + + #[tokio::test] + // Test that create_reply_path works fine when no suitable introduction node peer is found. 
+ async fn test_create_reply_path_no_intro_node() { + let mut connector_mock = MockTestPeerConnector::new(); + + connector_mock + .expect_list_peers() + .returning(|| Ok(ListPeersResponse { peers: vec![] })); + + let receiver_node_id = PublicKey::from_str(&get_pubkey()).unwrap(); + let handler = OfferHandler::new(); + assert!(handler + .create_reply_path(connector_mock, receiver_node_id) + .await + .is_ok()) + } + + #[tokio::test] + async fn test_create_reply_path_list_peers_error() { + let mut connector_mock = MockTestPeerConnector::new(); + + connector_mock + .expect_list_peers() + .returning(|| Err(Status::unknown("unknown error"))); + + let receiver_node_id = PublicKey::from_str(&get_pubkey()).unwrap(); + let handler = OfferHandler::new(); + assert!(handler + .create_reply_path(connector_mock, receiver_node_id) + .await + .is_err()) } } diff --git a/src/main.rs b/src/main.rs index 746308c8..a98fb46e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +#[allow(unused_imports)] mod internal { #![allow(clippy::enum_variant_names)] #![allow(clippy::unnecessary_lazy_evaluations)] @@ -10,7 +11,9 @@ mod internal { use internal::*; use lndk::lnd::LndCfg; -use lndk::Cfg; +use lndk::{Cfg, LifecycleSignals, LndkOnionMessenger, OfferHandler}; +use tokio::sync::mpsc; +use tokio::sync::mpsc::{Receiver, Sender}; #[macro_use] extern crate configure_me; @@ -22,10 +25,21 @@ async fn main() -> Result<(), ()> { .0; let lnd_args = LndCfg::new(config.address, config.cert, config.macaroon); + let (shutdown, listener) = triggered::trigger(); + // Create the channel which will tell us when the onion messenger has finished starting up. + let (tx, _): (Sender, Receiver) = mpsc::channel(1); + let signals = LifecycleSignals { + shutdown, + listener, + started: tx, + }; let args = Cfg { lnd: lnd_args, log_dir: config.log_dir, + signals, }; - lndk::run(args).await + let handler = OfferHandler::new(); + let messenger = LndkOnionMessenger::new(handler); + messenger.run(args).await } diff --git a/src/onion_messenger.rs b/src/onion_messenger.rs index 6a8b9df0..283bde66 100644 --- a/src/onion_messenger.rs +++ b/src/onion_messenger.rs @@ -1,6 +1,7 @@ use crate::clock::TokioClock; use crate::lnd::{features_support_onion_messages, ONION_MESSAGES_OPTIONAL}; use crate::rate_limit::{RateLimiter, TokenLimiter}; +use crate::{LifecycleSignals, LndkOnionMessenger}; use async_trait::async_trait; use bitcoin::blockdata::constants::ChainHash; use bitcoin::network::constants::Network; @@ -32,6 +33,7 @@ use tonic_lnd::{ lnrpc::CustomMessage, lnrpc::PeerEvent, lnrpc::SendCustomMessageRequest, lnrpc::SendCustomMessageResponse, tonic::Status, LightningClient, }; +use triggered::Listener; /// ONION_MESSAGE_TYPE is the message type number used in BOLT1 message types for onion messages. const ONION_MESSAGE_TYPE: u32 = 513; @@ -51,18 +53,24 @@ const DEFAULT_CALL_FREQUENCY: Duration = Duration::from_secs(1); /// A refcell is used for entropy_source to provide interior mutability for ChaCha20Rng. We need a mutable reference /// to be able to use the chacha library’s fill_bytes method, but the EntropySource interface in LDK is for an /// immutable reference. 
-pub(crate) struct MessengerUtilities { +pub struct MessengerUtilities { entropy_source: RefCell, } impl MessengerUtilities { - pub(crate) fn new() -> Self { + pub fn new() -> Self { MessengerUtilities { entropy_source: RefCell::new(ChaCha20Rng::from_entropy()), } } } +impl Default for MessengerUtilities { + fn default() -> Self { + Self::new() + } +} + impl EntropySource for MessengerUtilities { // TODO: surface LDK's EntropySource and use instead. fn get_secure_random_bytes(&self) -> [u8; 32] { @@ -88,169 +96,177 @@ impl Logger for MessengerUtilities { } } -/// run_onion_messenger is the main event loop for connecting an OnionMessenger to LND's various APIs to handle -/// onion messages externally to LND. It follows a producer / consumer pattern, with many producers -/// creating MessengerEvents that are handled by a single consumer that drives the OnionMessenger accordingly. This -/// function will block until consumer errors or one of the producers exits. -/// -/// Producers: -/// 1. Peer Events: Sourced from LND's PeerEventSubscription API, produces peer online and offline events. -/// 2. Incoming Messages: Sourced from LND's SubscribeCustomMessages API, produces incoming onion message events. -/// 3. Outgoing Poll: Using a simple ticker, produces polling events to check for outgoing onion messages. -/// -/// The main consumer processes one MessengerEvent at a time, applying basic rate limiting to each peer to prevent spam. -pub(crate) async fn run_onion_messenger< - ES: Deref, - NS: Deref, - L: Deref, - MR: Deref, - OMH: Deref, - CMH: Deref, ->( - current_peers: HashMap, - ln_client: &mut tonic_lnd::LightningClient, - onion_messenger: OnionMessenger, - network: Network, -) -> Result<(), ()> -where - ES::Target: EntropySource, - NS::Target: NodeSigner, - L::Target: Logger, - MR::Target: MessageRouter, - OMH::Target: OffersMessageHandler, - CMH::Target: CustomOnionMessageHandler + Sized, -{ - // Setup channels that we'll use to communicate onion messenger events. We buffer our channels by the number of - // peers (+1 because we require a non-zero buffer) that the node currently has so that we can send all of our - // startup online events in one go (before we boot up the consumer). The number of peers that we have is also - // related to the number of events we can expect to process, so it's a sensible enough buffer size. - let (sender, receiver) = channel(current_peers.len() + 1); - for (peer, onion_support) in current_peers.clone() { - sender - .send(MessengerEvents::PeerConnected(peer, onion_support)) - .await - .map_err(|e| { - error!("Notify peer connected: {e}."); - })? - } +impl LndkOnionMessenger { + /// run_onion_messenger is the main event loop for connecting an OnionMessenger to LND's various APIs to handle + /// onion messages externally to LND. It follows a producer / consumer pattern, with many producers + /// creating MessengerEvents that are handled by a single consumer that drives the OnionMessenger accordingly. This + /// function will block until consumer errors or one of the producers exits. + /// + /// Producers: + /// 1. Peer Events: Sourced from LND's PeerEventSubscription API, produces peer online and offline events. + /// 2. Incoming Messages: Sourced from LND's SubscribeCustomMessages API, produces incoming onion message events. + /// 3. Outgoing Poll: Using a simple ticker, produces polling events to check for outgoing onion messages. 
+ /// + /// The main consumer processes one MessengerEvent at a time, applying basic rate limiting to each peer to prevent spam. + pub(crate) async fn run_onion_messenger< + ES: Deref, + NS: Deref, + L: Deref, + MR: Deref, + OMH: Deref, + CMH: Deref, + >( + &self, + current_peers: HashMap, + ln_client: &mut tonic_lnd::LightningClient, + onion_messenger: OnionMessenger, + network: Network, + signals: LifecycleSignals, + ) -> Result<(), ()> + where + ES::Target: EntropySource, + NS::Target: NodeSigner, + L::Target: Logger, + MR::Target: MessageRouter, + OMH::Target: OffersMessageHandler, + CMH::Target: CustomOnionMessageHandler + Sized, + { + // Setup channels that we'll use to communicate onion messenger events. We buffer our channels by the number of + // peers (+1 because we require a non-zero buffer) that the node currently has so that we can send all of our + // startup online events in one go (before we boot up the consumer). The number of peers that we have is also + // related to the number of events we can expect to process, so it's a sensible enough buffer size. + let (sender, receiver) = channel(current_peers.len() + 1); + for (peer, onion_support) in current_peers.clone() { + sender + .send(MessengerEvents::PeerConnected(peer, onion_support)) + .await + .map_err(|e| { + error!("Notify peer connected: {e}."); + })? + } - // Setup channels that we'll use to signal to spawned producers that an exit has occurred elsewhere so they should - // exit, and a tokio task set to track all our spawned tasks. - // TODO: Combine these channels into a single channel. - let (peers_exit_sender, peers_exit_receiver) = channel(1); - let (in_messages_exit_sender, in_messages_exit_receiver) = channel(1); - let (out_messages_exit_sender, out_messages_exit_receiver) = channel(1); - let mut set = tokio::task::JoinSet::new(); - - // Subscribe to peer events from LND first thing so that we don't miss any online/offline events while we are - // starting up. The onion messenger can handle superfluous online/offline reports, so it's okay if this ends - // up creating some duplicate events. The event subscription from LND blocks until it gets its first event (which - // could take very long), so we get the subscription itself inside of our producer thread. - let mut peers_client = ln_client.clone(); - let peers_sender = sender.clone(); - set.spawn(async move { - let peer_subscription = peers_client - .subscribe_peer_events(tonic_lnd::lnrpc::PeerEventSubscription {}) - .await - .expect("peer subscription failed") - .into_inner(); + let mut set = tokio::task::JoinSet::new(); + + // Subscribe to peer events from LND first thing so that we don't miss any online/offline events while we are + // starting up. The onion messenger can handle superfluous online/offline reports, so it's okay if this ends + // up creating some duplicate events. The event subscription from LND blocks until it gets its first event (which + // could take very long), so we get the subscription itself inside of our producer thread. 
+ let mut peers_client = ln_client.clone(); + let peers_sender = sender.clone(); + let (peers_shutdown, peers_listener) = (signals.shutdown.clone(), signals.listener.clone()); + set.spawn(async move { + let peer_subscription = peers_client + .subscribe_peer_events(tonic_lnd::lnrpc::PeerEventSubscription {}) + .await + .expect("peer subscription failed") + .into_inner(); - let peer_stream = PeerStream { - peer_subscription, - client: peers_client, - }; + let peer_stream = PeerStream { + peer_subscription, + client: peers_client, + }; - match produce_peer_events(peer_stream, peers_sender, peers_exit_receiver).await { - Ok(_) => debug!("Peer events producer exited."), - Err(e) => error!("Peer events producer exited: {e}."), - }; - }); - - // Subscribe to custom messaging events from LND so that we can receive incoming messages. - let mut messages_client = ln_client.clone(); - let in_msg_sender = sender.clone(); - set.spawn(async move { - let message_subscription = messages_client - .subscribe_custom_messages(tonic_lnd::lnrpc::SubscribeCustomMessagesRequest {}) - .await - .expect("message subscription failed") - .into_inner(); + match produce_peer_events(peer_stream, peers_sender, peers_listener).await { + Ok(_) => debug!("Peer events producer exited."), + Err(e) => { + peers_shutdown.trigger(); + error!("Peer events producer exited: {e}."); + } + }; + }); - let message_stream = MessageStream { - message_subscription, - }; + // Subscribe to custom messaging events from LND so that we can receive incoming messages. + let mut messages_client = ln_client.clone(); + let in_msg_sender = sender.clone(); + let (messages_shutdown, messages_listener) = + (signals.shutdown.clone(), signals.listener.clone()); + set.spawn(async move { + let message_subscription = messages_client + .subscribe_custom_messages(tonic_lnd::lnrpc::SubscribeCustomMessagesRequest {}) + .await + .expect("message subscription failed") + .into_inner(); + + let message_stream = MessageStream { + message_subscription, + }; - match produce_incoming_message_events( - message_stream, - in_msg_sender, - in_messages_exit_receiver, + match produce_incoming_message_events(message_stream, in_msg_sender, messages_listener) + .await + { + Ok(_) => debug!("Message events producer exited."), + Err(e) => { + messages_shutdown.trigger(); + error!("Message events producer exited: {e}."); + } + } + }); + + // Spin up a ticker that polls at an interval for any outgoing messages so that we can pass on outgoing messages to + // LND. + let interval = time::interval(MSG_POLL_INTERVAL); + let (events_shutdown, events_listener) = + (signals.shutdown.clone(), signals.listener.clone()); + set.spawn(async move { + match produce_outgoing_message_events(sender, events_listener, interval).await { + Ok(_) => debug!("Outgoing message events producer exited."), + Err(e) => { + events_shutdown.trigger(); + error!("Outgoing message events producer exited: {e}."); + } + } + }); + + // By dropping the sender, we signal to the receiver that the onion messenger has successfully started up. + drop(signals.started); + + // Consume events is our main controlling loop, so we run it inline here. We use a RefCell in onion_messenger to + // allow interior mutability (see LndNodeSigner) so this function can't safely be passed off to another thread. + // This function is expected to finish if any producing thread exits (because we're no longer receiving the + // events we need). 
+ let rate_limiter = &mut TokenLimiter::new( + current_peers.keys().copied(), + DEFAULT_CALL_COUNT, + DEFAULT_CALL_FREQUENCY, + TokioClock::new(), + ); + let mut message_sender = CustomMessenger { + client: ln_client.clone(), + }; + let consume_result = consume_messenger_events( + onion_messenger, + receiver, + &mut message_sender, + rate_limiter, + network, ) - .await - { - Ok(_) => debug!("Message events producer exited."), - Err(e) => error!("Message events producer exited: {e}."), + .await; + match consume_result { + Ok(_) => info!("Consume messenger events exited."), + Err(e) => { + signals.shutdown.trigger(); + error!("Consume messenger events exited: {e}."); + } } - }); - - // Spin up a ticker that polls at an interval for any outgoing messages so that we can pass on outgoing messages to - // LND. - let interval = time::interval(MSG_POLL_INTERVAL); - set.spawn(async move { - match produce_outgoing_message_events(sender, out_messages_exit_receiver, interval).await { - Ok(_) => debug!("Outgoing message events producer exited."), - Err(e) => error!("Outgoing message events producer exited: {e}."), + + // Tasks will independently exit, so we can assert that they do so in any order. + let mut task_err = false; + while let Some(res) = set.join_next().await { + match res { + Ok(_) => info!("Producer exited."), + Err(_) => { + task_err = true; + error!("Producer exited with an error."); + } + }; + } + // Exit with an error if any task did not exit cleanly. + if consume_result.is_err() || task_err { + return Err(()); } - }); - - // Consume events is our main controlling loop, so we run it inline here. We use a RefCell in onion_messenger to - // allow interior mutability (see LndNodeSigner) so this function can't safely be passed off to another thread. - // This function is expected to finish if any producing thread exits (because we're no longer receiving the - // events we need). - let rate_limiter = &mut TokenLimiter::new( - current_peers.keys().copied(), - DEFAULT_CALL_COUNT, - DEFAULT_CALL_FREQUENCY, - TokioClock::new(), - ); - let mut message_sender = CustomMessenger { - client: ln_client.clone(), - }; - let consume_result = consume_messenger_events( - onion_messenger, - receiver, - &mut message_sender, - rate_limiter, - network, - ) - .await; - match consume_result { - Ok(_) => info!("Consume messenger events exited."), - Err(e) => error!("Consume messenger events exited: {e}."), - } - // Once the consumer has exited, we drop our exit signal channel's sender so that the receiving channels will close. - // This signals to all producers that it's time to exit, so we can await their exit once we've done this. - drop(peers_exit_sender); - drop(in_messages_exit_sender); - drop(out_messages_exit_sender); - - // Tasks will independently exit, so we can assert that they do so in any order. - let mut task_err = false; - while let Some(res) = set.join_next().await { - match res { - Ok(_) => info!("Producer exited."), - Err(_) => { - task_err = true; - error!("Producer exited with an error."); - } - }; - } - // Exit with an error if any task did not exit cleanly. - if consume_result.is_err() || task_err { - return Err(()); + Ok(()) } - - Ok(()) } /// lookup_onion_support performs a best-effort lookup in the node's list of current peers to determine whether it @@ -338,7 +354,7 @@ impl PeerEventProducer for PeerStream { async fn produce_peer_events( mut source: impl PeerEventProducer, events: Sender, - mut exit: Receiver<()>, + listener: Listener, ) -> Result<(), ProducerError> { loop { select! 
@@ -347,7 +363,7 @@ async fn produce_peer_events(
             // of events that can't be consumed, possibly blocking if the channel buffer is small).
             biased;

-            _ = exit.recv() => {
+            _ = listener.clone() => {
                 info!("Peer events received signal to quit.");
                 return Ok(())
             }
@@ -421,7 +437,7 @@ impl IncomingMessageProducer for MessageStream {
 async fn produce_incoming_message_events(
     mut source: impl IncomingMessageProducer,
     events: Sender,
-    mut exit: Receiver<()>,
+    listener: Listener,
 ) -> Result<(), ProducerError> {
     loop {
         select!(
@@ -430,7 +446,7 @@ async fn produce_incoming_message_events(
             // of events that can't be consumed, possibly blocking if the channel buffer is small).
            biased;

-            _ = exit.recv() => {
+            _ = listener.clone() => {
                 info!("Peer events received signal to quit.");
                 return Ok(())
             }
@@ -641,7 +657,7 @@ impl SendCustomMessage for CustomMessenger {
 /// events anyway.
 async fn produce_outgoing_message_events(
     events: Sender,
-    mut exit: Receiver<()>,
+    listener: Listener,
     mut interval: Interval,
 ) -> Result<(), ProducerError> {
     loop {
@@ -651,7 +667,7 @@
             // of events that can't be consumed, possibly blocking if the channel buffer is small).
             biased;

-            _ = exit.recv() => {
+            _ = listener.clone() => {
                 info!("Outgoing messenger events received signal to quit.");
                 return Ok(());
             }
@@ -942,7 +958,7 @@ mod tests {
     #[tokio::test]
     async fn test_produce_peer_events() {
         let (sender, mut receiver) = channel(4);
-        let (_exit_sender, exit) = channel(1);
+        let (_shutdown, listener) = triggered::trigger();

         let mut mock = MockPeerProducer::new();

@@ -977,7 +993,7 @@ mod tests {
             .returning(|| Err(Status::unknown("mock stream err")));

         matches!(
-            produce_peer_events(mock, sender, exit)
+            produce_peer_events(mock, sender, listener)
                 .await
                 .expect_err("producer should error"),
             ProducerError::StreamError(_)
@@ -1007,11 +1023,11 @@ mod tests {
     #[tokio::test]
     async fn test_produce_peer_events_exit() {
         let (sender, _receiver) = channel(1);
-        let (exit_sender, exit) = channel(1);
+        let (shutdown, listener) = triggered::trigger();

         let mock = MockPeerProducer::new();

-        drop(exit_sender);
-        assert!(produce_peer_events(mock, sender, exit).await.is_ok());
+        shutdown.trigger();
+        assert!(produce_peer_events(mock, sender, listener).await.is_ok());
     }

 mock! {
@@ -1026,7 +1042,7 @@ mod tests {
     #[tokio::test]
     async fn test_produce_incoming_message_events() {
         let (sender, mut receiver) = channel(2);
-        let (_exit_sender, exit) = channel(1);
+        let (_shutdown, listener) = triggered::trigger();

         let mut mock = MockMessageProducer::new();

@@ -1056,7 +1072,7 @@ mod tests {
             .returning(|| Err(Status::unknown("mock stream err")));

         matches!(
-            produce_incoming_message_events(mock, sender, exit)
+            produce_incoming_message_events(mock, sender, listener)
                 .await
                 .expect_err("producer should error"),
             ProducerError::StreamError(_),
@@ -1076,11 +1092,11 @@ mod tests {
     #[tokio::test]
     async fn test_produce_incoming_message_exit() {
         let (sender, _receiver) = channel(2);
-        let (exit_sender, exit) = channel(1);
+        let (shutdown, listener) = triggered::trigger();

         let mock = MockMessageProducer::new();

-        drop(exit_sender);
-        assert!(produce_incoming_message_events(mock, sender, exit)
+        shutdown.trigger();
+        assert!(produce_incoming_message_events(mock, sender, listener)
             .await
             .is_ok());
     }
@@ -1088,16 +1104,14 @@ mod tests {
     #[tokio::test]
     async fn test_produce_outgoing_message_events_exit() {
         let (sender, _) = channel(1);
-        let (exit_sender, exit_receiver) = channel(1);
+        let (shutdown, listener) = triggered::trigger();
         let interval = time::interval(MSG_POLL_INTERVAL);

         // Let's test that produce_outgoing_message_events successfully exits when it receives the signal, rather than
         // loop infinitely.
-        drop(exit_sender);
-        assert!(
-            produce_outgoing_message_events(sender, exit_receiver, interval)
-                .await
-                .is_ok()
-        );
+        shutdown.trigger();
+        assert!(produce_outgoing_message_events(sender, listener, interval)
+            .await
+            .is_ok());
     }
 }
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
index 5d46d977..af7e505e 100644
--- a/tests/common/mod.rs
+++ b/tests/common/mod.rs
@@ -7,13 +7,14 @@ use bitcoind::{get_available_port, BitcoinD, Conf, ConnectParams};
 use chrono::Utc;
 use ldk_sample::config::LdkUserInfo;
 use ldk_sample::node_api::Node as LdkNode;
-use std::net::SocketAddr;
+use lightning::util::logger::Level;
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
 use std::path::PathBuf;
 use std::process::{Child, Command, Stdio};
 use std::thread;
 use std::{env, fs};
 use tempfile::{tempdir, Builder, TempDir};
-use tokio::time::Duration;
+use tokio::time::{sleep, timeout, Duration};
 use tonic_lnd::lnrpc::GetInfoRequest;
 use tonic_lnd::Client;

@@ -34,28 +35,36 @@ pub async fn setup_test_infrastructure(
     let connect_params = bitcoind.node.params.get_cookie_values().unwrap();

+    let port = get_available_port().unwrap();
+    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
     let ldk1_config = LdkUserInfo {
         bitcoind_rpc_username: connect_params.0.clone().unwrap(),
         bitcoind_rpc_password: connect_params.1.clone().unwrap(),
         bitcoind_rpc_host: String::from("localhost"),
         bitcoind_rpc_port: bitcoind.node.params.rpc_socket.port(),
         ldk_data_dir: ldk_test_dir.clone(),
-        ldk_announced_listen_addr: Vec::new(),
-        ldk_peer_listening_port: get_available_port().unwrap(),
+        ldk_announced_listen_addr: vec![addr.into()],
+        ldk_peer_listening_port: port,
         ldk_announced_node_name: [0; 32],
         network: Network::Regtest,
+        log_level: Level::Trace,
+        node_num: 1,
     };

+    let port = get_available_port().unwrap();
+    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
     let ldk2_config = LdkUserInfo {
         bitcoind_rpc_username: connect_params.0.unwrap(),
         bitcoind_rpc_password: connect_params.1.unwrap(),
         bitcoind_rpc_host: String::from("localhost"),
         bitcoind_rpc_port: bitcoind.node.params.rpc_socket.port(),
         ldk_data_dir: ldk_test_dir,
-        ldk_announced_listen_addr: Vec::new(),
-        ldk_peer_listening_port: get_available_port().unwrap(),
+        ldk_announced_listen_addr: vec![addr.into()],
+        ldk_peer_listening_port: port,
         ldk_announced_node_name: [0; 32],
         network: Network::Regtest,
+        log_level: Level::Trace,
+        node_num: 2,
     };

     let ldk1 = ldk_sample::start_ldk(ldk1_config, test_name).await;
@@ -104,7 +113,7 @@ fn setup_test_dirs(test_name: &str) -> (PathBuf, PathBuf, PathBuf) {

 // BitcoindNode holds the tools we need to interact with a Bitcoind node.
 pub struct BitcoindNode {
-    node: BitcoinD,
+    pub node: BitcoinD,
     _data_dir: TempDir,
     zmq_block_port: u16,
     zmq_tx_port: u16,
@@ -146,7 +155,7 @@ pub struct LndNode {
     pub cert_path: String,
     pub macaroon_path: String,
     _handle: Child,
-    client: Option<Client>,
+    pub client: Option<Client>,
 }

 impl LndNode {
@@ -321,4 +330,70 @@ impl LndNode {

         resp
     }
+
+    // disconnect_peer disconnects the specified peer.
+    #[allow(dead_code)]
+    pub async fn disconnect_peer(
+        &mut self,
+        node_id: PublicKey,
+    ) -> tonic_lnd::lnrpc::DisconnectPeerResponse {
+        let disconnect_req = tonic_lnd::lnrpc::DisconnectPeerRequest {
+            pub_key: node_id.to_string(),
+            ..Default::default()
+        };
+
+        let resp = if let Some(client) = self.client.clone() {
+            let make_request = || async {
+                client
+                    .clone()
+                    .lightning()
+                    .disconnect_peer(disconnect_req.clone())
+                    .await
+            };
+            let resp = test_utils::retry_async(make_request, String::from("disconnect_peer"));
+            resp.await.unwrap()
+        } else {
+            panic!("No client")
+        };
+
+        resp
+    }
+
+    // wait_for_chain_sync waits until we're synced to chain according to the get_info response.
+    // We'll timeout if it takes too long.
+    pub async fn wait_for_chain_sync(&mut self) {
+        match timeout(Duration::from_secs(100), self.check_chain_sync()).await {
+            Err(_) => panic!("timeout before lnd synced to chain"),
+            _ => {}
+        };
+    }
+
+    pub async fn check_chain_sync(&mut self) {
+        loop {
+            let resp = self.get_info().await;
+            if resp.synced_to_chain {
+                return;
+            }
+            sleep(Duration::from_secs(2)).await;
+        }
+    }
+
+    // wait_for_graph_sync waits until we're synced to graph according to the get_info response.
+    // We'll timeout if it takes too long.
+    pub async fn wait_for_graph_sync(&mut self) {
+        match timeout(Duration::from_secs(100), self.check_graph_sync()).await {
+            Err(_) => panic!("timeout before lnd synced to graph"),
+            _ => {}
+        };
+    }
+
+    pub async fn check_graph_sync(&mut self) {
+        loop {
+            let resp = self.get_info().await;
+            if resp.synced_to_graph {
+                return;
+            }
+            sleep(Duration::from_secs(2)).await;
+        }
+    }
 }
diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs
index 2bfa72d1..7e21ed60 100644
--- a/tests/integration_tests.rs
+++ b/tests/integration_tests.rs
@@ -1,12 +1,23 @@
 mod common;

 use lndk;

-use bitcoin::secp256k1::PublicKey;
+use bitcoin::secp256k1::{PublicKey, Secp256k1};
+use bitcoin::Network;
+use bitcoincore_rpc::bitcoin::Network as RpcNetwork;
+use bitcoincore_rpc::RpcApi;
 use chrono::Utc;
 use ldk_sample::node_api::Node as LdkNode;
+use lightning::blinded_path::BlindedPath;
+use lightning::offers::offer::Quantity;
+use lightning::onion_message::Destination;
+use lndk::onion_messenger::MessengerUtilities;
+use lndk::{LifecycleSignals, PayOfferParams};
 use std::path::PathBuf;
 use std::str::FromStr;
+use std::time::SystemTime;
 use tokio::select;
+use tokio::sync::mpsc;
+use tokio::sync::mpsc::{Receiver, Sender};
 use tokio::time::{sleep, timeout, Duration};

 async fn wait_to_receive_onion_message(
@@ -33,7 +44,6 @@ async fn wait_to_receive_onion_message(
 async fn check_for_message(ldk: LdkNode) -> LdkNode {
     loop {
         if ldk.onion_message_handler.messages.lock().unwrap().len() == 1 {
-            println!("MESSAGE: {:?}", ldk.onion_message_handler.messages);
             return ldk;
         }
         sleep(Duration::from_secs(2)).await;
@@ -59,6 +69,7 @@ async fn test_lndk_forwards_onion_message() {
     // Now we'll spin up lndk. Even though ldk1 and ldk2 are not directly connected, we'll show that lndk
     // successfully helps lnd forward the onion message from ldk1 to ldk2.
+    let (shutdown, listener) = triggered::trigger();
     let lnd_cfg = lndk::lnd::LndCfg::new(
         lnd.address,
         PathBuf::from_str(&lnd.cert_path).unwrap(),
     );
     let now_timestamp = Utc::now();
     let timestamp = now_timestamp.format("%d-%m-%Y-%H%M");
+    let (tx, _): (Sender, Receiver) = mpsc::channel(1);
+    let signals = LifecycleSignals {
+        shutdown: shutdown.clone(),
+        listener,
+        started: tx,
+    };
     let lndk_cfg = lndk::Cfg {
         lnd: lnd_cfg,
         log_dir: Some(
@@ -75,13 +92,205 @@ async fn test_lndk_forwards_onion_message() {
             .unwrap()
             .to_string(),
         ),
+        signals,
     };
+
+    let handler = lndk::OfferHandler::new();
+    let messenger = lndk::LndkOnionMessenger::new(handler);
     select! {
-        val = lndk::run(lndk_cfg) => {
+        val = messenger.run(lndk_cfg) => {
             panic!("lndk should not have completed first {:?}", val);
         },
         // We wait for ldk2 to receive the onion message.
         (ldk1, ldk2) = wait_to_receive_onion_message(ldk1, ldk2, PublicKey::from_str(&lnd_info.identity_pubkey).unwrap()) => {
+            shutdown.trigger();
+            ldk1.stop().await;
+            ldk2.stop().await;
+        }
+    }
+}
+
+#[tokio::test(flavor = "multi_thread")]
+// Here we test the beginning of the BOLT 12 offers flow. We show that lndk successfully builds an
+// invoice_request and sends it.
+async fn test_lndk_send_invoice_request() {
+    let test_name = "lndk_send_invoice_request";
+    let (bitcoind, mut lnd, ldk1, ldk2, lndk_dir) =
+        common::setup_test_infrastructure(test_name).await;
+
+    // Here we'll produce a little network. ldk1 will be the offer creator in this scenario. We'll
We'll + // connect ldk1 and ldk2 with a channel so ldk1 can create an offer and ldk2 can be the + // introduction node for the blinded path. + // + // Later on we'll disconnect lnd to ldk2 to make sure lnd can still auto-connect to the + // introduction node. + // + // ldk1 <--- channel ---> ldk2 <--- peer connection ---> lnd + // + // ldk1 will be the offer creator, which will build a blinded route from ldk2 to ldk1. + let (pubkey, addr) = ldk1.get_node_info(); + let (pubkey_2, addr_2) = ldk2.get_node_info(); + let lnd_info = lnd.get_info().await; + let lnd_pubkey = PublicKey::from_str(&lnd_info.identity_pubkey).unwrap(); + + ldk1.connect_to_peer(pubkey_2, addr_2).await.unwrap(); + lnd.connect_to_peer(pubkey_2, addr_2).await; + + let ldk2_fund_addr = ldk2.bitcoind_client.get_new_address().await; + + // We need to convert funding addresses to the form that the bitcoincore_rpc library recognizes. + let ldk2_addr_string = ldk2_fund_addr.to_string(); + let ldk2_addr = bitcoind::bitcoincore_rpc::bitcoin::Address::from_str(&ldk2_addr_string) + .unwrap() + .require_network(RpcNetwork::Regtest) + .unwrap(); + + // Fund both of these nodes, open the channels, and synchronize the network. + bitcoind + .node + .client + .generate_to_address(6, &ldk2_addr) + .unwrap(); + + lnd.wait_for_chain_sync().await; + + ldk2.open_channel(pubkey, addr, 200000, 0, true) + .await + .unwrap(); + + lnd.wait_for_graph_sync().await; + + bitcoind + .node + .client + .generate_to_address(20, &ldk2_addr) + .unwrap(); + + lnd.wait_for_chain_sync().await; + + let path_pubkeys = vec![pubkey_2, pubkey]; + let expiration = SystemTime::now() + Duration::from_secs(24 * 60 * 60); + let offer = ldk1 + .create_offer( + &path_pubkeys, + Network::Regtest, + 20_000, + Quantity::One, + expiration, + ) + .await + .expect("should create offer"); + + // Now we'll spin up lndk, which should forward the invoice request to ldk2. + let (shutdown, listener) = triggered::trigger(); + let lnd_cfg = lndk::lnd::LndCfg::new( + lnd.address.clone(), + PathBuf::from_str(&lnd.cert_path).unwrap(), + PathBuf::from_str(&lnd.macaroon_path).unwrap(), + ); + let (tx, rx): (Sender, Receiver) = mpsc::channel(1); + let signals = LifecycleSignals { + shutdown: shutdown.clone(), + listener, + started: tx, + }; + + let lndk_cfg = lndk::Cfg { + lnd: lnd_cfg.clone(), + log_dir: Some( + lndk_dir + .join(format!("lndk-logs.txt")) + .to_str() + .unwrap() + .to_string(), + ), + signals, + }; + + let mut client = lnd.client.clone().unwrap(); + let blinded_path = offer.paths()[0].clone(); + + let messenger_utils = MessengerUtilities::new(); + let secp_ctx = Secp256k1::new(); + let reply_path = + BlindedPath::new_for_message(&[pubkey_2, lnd_pubkey], &messenger_utils, &secp_ctx).unwrap(); + + let mut stream = client + .lightning() + .subscribe_channel_graph(tonic_lnd::lnrpc::GraphTopologySubscription {}) + .await + .unwrap() + .into_inner(); + + // Wait for ldk2's graph update to come through, otherwise when we try to auto-connect to + // the introduction node later on, the address won't be available when we call the + // describe_graph API method. + 'outer: while let Some(update) = stream.message().await.unwrap() { + for node in update.node_updates.iter() { + for node_addr in node.node_addresses.iter() { + if node_addr.addr == addr_2.to_string() { + break 'outer; + } + } + } + } + + // Make sure lndk successfully sends the invoice_request. 
+    let handler = lndk::OfferHandler::new();
+    let messenger = lndk::LndkOnionMessenger::new(handler);
+    let pay_cfg = PayOfferParams {
+        offer: offer.clone(),
+        amount: Some(20_000),
+        network: Network::Regtest,
+        client: client.clone(),
+        destination: Destination::BlindedPath(blinded_path.clone()),
+        reply_path: Some(reply_path.clone()),
+    };
+    select! {
+        val = messenger.run(lndk_cfg) => {
+            panic!("lndk should not have completed first {:?}", val);
+        },
+        // We wait for lndk to send the invoice request.
+        res = messenger.offer_handler.send_invoice_request(pay_cfg.clone(), rx) => {
+            assert!(res.is_ok());
+        }
+    }
+
+    // Let's try again, but this time make sure we can request the invoice when the LND node is not already connected
+    // to the introduction node (LDK2).
+    lnd.disconnect_peer(pubkey_2).await;
+    lnd.wait_for_chain_sync().await;
+
+    let (shutdown, listener) = triggered::trigger();
+    let (tx, rx): (Sender, Receiver) = mpsc::channel(1);
+    let signals = LifecycleSignals {
+        shutdown: shutdown.clone(),
+        listener,
+        started: tx,
+    };
+
+    let lndk_cfg = lndk::Cfg {
+        lnd: lnd_cfg,
+        log_dir: Some(
+            lndk_dir
+                .join(format!("lndk-logs.txt"))
+                .to_str()
+                .unwrap()
+                .to_string(),
+        ),
+        signals,
+    };
+
+    let handler = lndk::OfferHandler::new();
+    let messenger = lndk::LndkOnionMessenger::new(handler);
+    select! {
+        val = messenger.run(lndk_cfg) => {
+            panic!("lndk should not have completed first {:?}", val);
+        },
+        // We wait for lndk to send the invoice request.
+        res = messenger.offer_handler.send_invoice_request(pay_cfg, rx) => {
+            assert!(res.is_ok());
+            shutdown.trigger();
             ldk1.stop().await;
             ldk2.stop().await;
         }