Skip to content

Commit

Permalink
offers: wait for onion messenger ready signal before sending request
Browse files Browse the repository at this point in the history
Before we can send an invoice request, LNDK's onion messenger needs to be fully
started and running. We use a channel close to signal when the messenger is
ready.
  • Loading branch information
orbitalturtle committed Feb 1, 2024
1 parent 603de3d commit c6ab86e
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 4 deletions.
9 changes: 8 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ use log4rs::append::console::ConsoleAppender;
use log4rs::append::file::FileAppender;
use log4rs::config::{Appender, Config as LogConfig, Root};
use log4rs::encode::pattern::PatternEncoder;
use std::cell::RefCell;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::{Mutex, Once};
use tokio::sync::mpsc::{Receiver, Sender};
use tonic_lnd::lnrpc::GetInfoRequest;
use triggered::{Listener, Trigger};

Expand All @@ -39,6 +41,8 @@ pub struct Signals {
pub shutdown: Trigger,
// Used to listen for the signal to shutdown.
pub listener: Listener,
// Used to signal when the onion messenger has started up.
pub started: Sender<u32>,
}

pub fn init_logger(config: LogConfig) {
Expand Down Expand Up @@ -173,13 +177,16 @@ enum OfferState {
pub struct OfferHandler {
_active_offers: Mutex<HashMap<String, OfferState>>,
pending_messages: Mutex<Vec<PendingOnionMessage<OffersMessage>>>,
// This channel will close once the onion messenger has successfully started up.
_started: RefCell<Receiver<u32>>,
}

impl OfferHandler {
pub fn new() -> Self {
pub fn new(started: Receiver<u32>) -> Self {
OfferHandler {
_active_offers: Mutex::new(HashMap::new()),
pending_messages: Mutex::new(Vec::new()),
_started: RefCell::new(started),
}
}
}
Expand Down
12 changes: 10 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ mod internal {
use internal::*;
use lndk::lnd::LndCfg;
use lndk::{Cfg, LndkOnionMessenger, OfferHandler, Signals};
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Receiver, Sender};

#[macro_use]
extern crate configure_me;
Expand All @@ -24,14 +26,20 @@ async fn main() -> Result<(), ()> {

let lnd_args = LndCfg::new(config.address, config.cert, config.macaroon);
let (shutdown, listener) = triggered::trigger();
let signals = Signals { shutdown, listener };
// Create the channel which will tell us when the onion messenger has finished starting up.
let (tx, rx): (Sender<u32>, Receiver<u32>) = mpsc::channel(1);
let signals = Signals {
shutdown,
listener,
started: tx,
};
let args = Cfg {
lnd: lnd_args,
log_dir: config.log_dir,
signals,
};

let handler = OfferHandler::new();
let handler = OfferHandler::new(rx);
let messenger = LndkOnionMessenger::new(handler);
messenger.run(args).await
}
3 changes: 3 additions & 0 deletions src/onion_messenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ impl LndkOnionMessenger {
}
});

// By dropping the sender, we signal to the receiver that the onion messenger has successfully started up.
drop(signals.started);

// Consume events is our main controlling loop, so we run it inline here. We use a RefCell in onion_messenger to
// allow interior mutability (see LndNodeSigner) so this function can't safely be passed off to another thread.
// This function is expected to finish if any producing thread exits (because we're no longer receiving the
Expand Down
6 changes: 5 additions & 1 deletion tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use lndk::Signals;
use std::path::PathBuf;
use std::str::FromStr;
use tokio::select;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::time::{sleep, timeout, Duration};

async fn wait_to_receive_onion_message(
Expand Down Expand Up @@ -68,9 +70,11 @@ async fn test_lndk_forwards_onion_message() {
);
let now_timestamp = Utc::now();
let timestamp = now_timestamp.format("%d-%m-%Y-%H%M");
let (tx, rx): (Sender<u32>, Receiver<u32>) = mpsc::channel(1);
let signals = Signals {
shutdown: shutdown.clone(),
listener,
started: tx,
};
let lndk_cfg = lndk::Cfg {
lnd: lnd_cfg,
Expand All @@ -84,7 +88,7 @@ async fn test_lndk_forwards_onion_message() {
signals,
};

let handler = lndk::OfferHandler::new();
let handler = lndk::OfferHandler::new(rx);
let messenger = lndk::LndkOnionMessenger::new(handler);
select! {
val = messenger.run(lndk_cfg) => {
Expand Down

0 comments on commit c6ab86e

Please sign in to comment.