From 36be11f16dc592543e6d3d13c0083811b5af0376 Mon Sep 17 00:00:00 2001 From: bennyhodl Date: Tue, 17 Dec 2024 19:43:24 -0500 Subject: [PATCH] Lightning Transport v2 --- .github/workflows/rust.yml | 13 ++ ddk/Cargo.toml | 7 +- ddk/examples/{ddk.rs => lightning.rs} | 0 ddk/src/transport/lightning/mod.rs | 72 ++----- ddk/src/transport/lightning/peer_manager.rs | 223 +++++++++++++++++++- ddk/tests/nostr.rs | 105 +++++++++ 6 files changed, 367 insertions(+), 53 deletions(-) rename ddk/examples/{ddk.rs => lightning.rs} (100%) create mode 100644 ddk/tests/nostr.rs diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index eef90de..82db501 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -35,6 +35,19 @@ jobs: override: true - name: Check format run: cargo fmt --check + check: + name: check + runs-on: ubuntu-latest + steps: + - name: Install Protoc + run: sudo apt-get update && sudo apt-get install -y protobuf-compiler + - uses: actions/checkout@v2 + - uses: actions-rs/toolchain@v1 + with: + toolchain: nightly + override: true + - name: cargo check + run: cargo check --all-features integration_tests_prepare: runs-on: ubuntu-latest timeout-minutes: 30 diff --git a/ddk/Cargo.toml b/ddk/Cargo.toml index 5725af4..9df155d 100644 --- a/ddk/Cargo.toml +++ b/ddk/Cargo.toml @@ -76,5 +76,10 @@ bitcoincore-rpc = "0.19.0" [[example]] name = "lighnting" -path = "examples/ddk.rs" +path = "examples/lightning.rs" required-features = ["lightning", "kormir", "sled"] + +[[example]] +name = "nostr" +path = "examples/nostr.rs" +required-features = ["nostr"] \ No newline at end of file diff --git a/ddk/examples/ddk.rs b/ddk/examples/lightning.rs similarity index 100% rename from ddk/examples/ddk.rs rename to ddk/examples/lightning.rs diff --git a/ddk/src/transport/lightning/mod.rs b/ddk/src/transport/lightning/mod.rs index d5cd829..ccc0be3 100644 --- a/ddk/src/transport/lightning/mod.rs +++ b/ddk/src/transport/lightning/mod.rs @@ -1,9 +1,9 @@ use crate::{DlcDevKitDlcManager, Oracle, Storage, Transport}; use async_trait::async_trait; use bitcoin::secp256k1::PublicKey; -use lightning_net_tokio::{connect_outbound, setup_inbound}; -use std::{sync::Arc, time::Duration}; -use tokio::net::TcpListener; +use lightning_net_tokio::connect_outbound; +use std::sync::Arc; +use tokio::sync::watch; pub(crate) mod peer_manager; pub use peer_manager::LightningTransport; @@ -18,63 +18,35 @@ impl Transport for LightningTransport { self.node_id } - /// Creates a TCP listener and accepts incoming connection spawning a tokio thread. - async fn listen(&self) { - let listener = TcpListener::bind(format!("0.0.0.0:{}", self.listening_port)) - .await - .expect("Coldn't get port."); - - loop { - let peer_mgr = self.peer_manager.clone(); - let (tcp_stream, socket) = listener.accept().await.unwrap(); - tokio::spawn(async move { - tracing::info!(connection = socket.to_string(), "Received connection."); - setup_inbound(peer_mgr.clone(), tcp_stream.into_std().unwrap()).await; - }); - } - } - /// Sends a message to a peer. - /// - /// TODO: Assert that we are connected to the peer before sending. fn send_message(&self, counterparty: PublicKey, message: dlc_messages::Message) { - self.message_handler.send_message(counterparty, message) + tracing::info!(message=?message, "Sending message to {}", counterparty.to_string()); + if self.peer_manager.peer_by_node_id(&counterparty).is_some() { + self.message_handler.send_message(counterparty, message) + } else { + tracing::warn!( + pubkey = counterparty.to_string(), + "Not connected to counterparty. Message not sent" + ) + } } /// Gets and clears the message queue with messages to be processed. /// Takes the manager to process the DLC messages that are received. - async fn receive_messages( + async fn start( &self, + mut stop_signal: watch::Receiver, manager: Arc>, - ) { - let mut timer = tokio::time::interval(Duration::from_secs(5)); - loop { - timer.tick().await; - let messages = self.message_handler.get_and_clear_received_messages(); - - for (counter_party, message) in messages { - tracing::info!( - counter_party = counter_party.to_string(), - "Processing DLC message" - ); + ) -> Result<(), anyhow::Error> { + let listen_handle = self.listen(stop_signal.clone()); - match manager.on_dlc_message(&message, counter_party).await { - Err(e) => { - tracing::error!(error =? e, "On message error.") - } - Ok(contract) => { - if let Some(msg) = contract { - tracing::info!("Responding to message received."); - tracing::debug!(message=?msg); - self.message_handler.send_message(counter_party, msg); - } - } - }; - } + let process_handle = self.process_messages(stop_signal.clone(), manager.clone()); - if self.message_handler.has_pending_messages() { - self.peer_manager.process_events() - } + // Wait for either task to complete or stop signal + tokio::select! { + _ = stop_signal.changed() => Ok(()), + res = listen_handle => res?, + res = process_handle => res?, } } diff --git a/ddk/src/transport/lightning/peer_manager.rs b/ddk/src/transport/lightning/peer_manager.rs index db25ae6..f6d1771 100644 --- a/ddk/src/transport/lightning/peer_manager.rs +++ b/ddk/src/transport/lightning/peer_manager.rs @@ -9,8 +9,14 @@ use lightning::{ sign::{KeysManager, NodeSigner}, util::logger::{Logger, Record}, }; -use lightning_net_tokio::SocketDescriptor; -use std::{sync::Arc, time::SystemTime}; +use lightning_net_tokio::{setup_inbound, SocketDescriptor}; +use std::{ + sync::Arc, + time::{Duration, SystemTime}, +}; +use tokio::{net::TcpListener, sync::watch, task::JoinHandle, time::interval}; + +use crate::{ddk::DlcDevKitDlcManager, Oracle, Storage}; pub struct DlcDevKitLogger; @@ -77,4 +83,217 @@ impl LightningTransport { listening_port, }) } + + pub fn listen( + &self, + stop_signal: watch::Receiver, + ) -> JoinHandle> { + let listening_port = self.listening_port; + let mut listen_stop = stop_signal.clone(); + let peer_manager = Arc::clone(&self.peer_manager); + tokio::spawn(async move { + let listener = TcpListener::bind(format!("0.0.0.0:{}", listening_port)) + .await + .expect("Coldn't get port."); + + tracing::info!( + addr =? listener.local_addr().unwrap(), + "Starting lightning peer manager listener." + ); + loop { + tokio::select! { + _ = listen_stop.changed() => { + if *listen_stop.borrow() { + tracing::warn!("Stop signal for lightning connection manager."); + break; + } + }, + accept_result = listener.accept() => { + match accept_result { + Ok((tcp_stream, socket)) => { + let peer_mgr = Arc::clone(&peer_manager); + tokio::spawn(async move { + tracing::info!( + connection = socket.to_string(), + "Received connection." + ); + setup_inbound(peer_mgr, tcp_stream.into_std().unwrap()).await; + }); + } + Err(e) => { + tracing::error!("Error accepting connection: {}", e); + } + } + } + } + } + Ok::<_, anyhow::Error>(()) + }) + } + + pub fn process_messages( + &self, + stop_signal: watch::Receiver, + manager: Arc>, + ) -> JoinHandle> { + let mut message_stop = stop_signal.clone(); + let message_manager = Arc::clone(&manager); + // let peer_manager = Arc::clone(&self.peer_manager); + let message_handler = Arc::clone(&self.message_handler); + tokio::spawn(async move { + let mut message_interval = interval(Duration::from_secs(5)); + // let mut event_interval = interval(Duration::from_secs(2)); + loop { + tokio::select! { + _ = message_stop.changed() => { + if *message_stop.borrow() { + tracing::warn!("Stop signal for lightning message processor."); + break; + } + }, + _ = message_interval.tick() => { + let messages = message_handler.get_and_clear_received_messages(); + for (counter_party, message) in messages { + tracing::info!( + counter_party = counter_party.to_string(), + "Processing DLC message" + ); + match message_manager.on_dlc_message(&message, counter_party).await { + Ok(Some(response)) => { + message_handler.send_message(counter_party, response); + } + Ok(None) => (), + Err(e) => { + tracing::error!( + error=e.to_string(), + counterparty=counter_party.to_string(), + message=?message, + "Could not process dlc message." + ); + } + } + } + } + // _ = event_interval.tick() => { + // peer_manager.process_events() + // } + } + } + Ok::<_, anyhow::Error>(()) + }) + } +} + +#[cfg(test)] +mod tests { + use ddk_manager::Storage; + use dlc_messages::{Message, OfferDlc}; + + use crate::{ + builder::Builder, oracle::memory::MemoryOracle, storage::memory::MemoryStorage, DlcDevKit, + Transport, + }; + + use super::*; + + fn get_offer() -> OfferDlc { + let offer_string = include_str!("../../../../ddk-manager/test_inputs/offer_contract.json"); + let offer: OfferDlc = serde_json::from_str(&offer_string).unwrap(); + offer + } + + fn create_peer_manager(listening_port: u16) -> (LightningTransport, PublicKey) { + let mut seed = [0u8; 32]; + seed.try_fill(&mut bitcoin::key::rand::thread_rng()) + .unwrap(); + let peer_manager = LightningTransport::new(&seed, listening_port).unwrap(); + let pubkey = peer_manager.node_id.clone(); + (peer_manager, pubkey) + } + + async fn manager( + listening_port: u16, + ) -> DlcDevKit { + let mut seed_bytes = [0u8; 32]; + seed_bytes + .try_fill(&mut bitcoin::key::rand::thread_rng()) + .unwrap(); + + let transport = Arc::new(LightningTransport::new(&seed_bytes, listening_port).unwrap()); + let storage = Arc::new(MemoryStorage::new()); + let oracle_client = Arc::new(MemoryOracle::default()); + + let mut builder = Builder::new(); + builder.set_seed_bytes(seed_bytes); + builder.set_transport(transport.clone()); + builder.set_storage(storage.clone()); + builder.set_oracle(oracle_client.clone()); + builder.finish().await.unwrap() + } + + #[test_log::test(tokio::test)] + async fn send_offer() { + let alice = manager(1776).await; + let alice_pk = alice.transport.public_key(); + let bob = manager(1777).await; + let _bob_pk = bob.transport.public_key(); + + bob.start().unwrap(); + alice.start().unwrap(); + + bob.transport + .connect_outbound(alice_pk, "127.0.0.1:1776") + .await; + + let mut connected = false; + let mut retries = 0; + + while !connected { + if retries > 10 { + bob.stop().unwrap(); + alice.stop().unwrap(); + panic!("Bob could not connect to alice.") + } + if bob + .transport + .peer_manager + .peer_by_node_id(&alice_pk) + .is_some() + { + connected = true + } + retries += 1; + tokio::time::sleep(Duration::from_millis(100)).await + } + + let offer = get_offer(); + bob.transport + .send_message(alice_pk, Message::Offer(offer.clone())); + + let mut connected = false; + let mut retries = 0; + + while !connected { + if retries > 10 { + bob.stop().unwrap(); + alice.stop().unwrap(); + panic!("Contract was not offered to alice") + } + if bob + .storage + .get_contract(&offer.temporary_contract_id) + .unwrap() + .is_some() + { + connected = true + } + retries += 1; + tokio::time::sleep(Duration::from_millis(500)).await + } + + bob.stop().unwrap(); + alice.stop().unwrap(); + assert!(true) + // alice.0.send_message(bob.1, Message::Offer(offer)); + } } diff --git a/ddk/tests/nostr.rs b/ddk/tests/nostr.rs new file mode 100644 index 0000000..00436fa --- /dev/null +++ b/ddk/tests/nostr.rs @@ -0,0 +1,105 @@ +use std::sync::Arc; + +use bitcoin::{key::rand::Fill, secp256k1::PublicKey, Network}; +use chrono::{Local, TimeDelta}; +use ddk::oracle::memory::MemoryOracle; +use ddk::storage::memory::MemoryStorage; +use ddk::transport::nostr::NostrDlc; +use ddk::DlcDevKit; +use ddk::{builder::Builder, Transport}; +use dlc::{EnumerationPayout, Payout}; + +mod test_util; + +type NostrDlcDevKit = DlcDevKit; + +async fn nostr_ddk(name: &str, oracle: Arc) -> NostrDlcDevKit { + let mut seed = [0u8; 32]; + seed.try_fill(&mut bitcoin::key::rand::thread_rng()) + .unwrap(); + let esplora_host = "http://127.0.0.1:30000".to_string(); + + let transport = Arc::new( + NostrDlc::new(&seed, "wss://nostr.dlcdevkit.com", Network::Regtest) + .await + .unwrap(), + ); + let storage = Arc::new(MemoryStorage::new()); + + let ddk: NostrDlcDevKit = Builder::new() + .set_network(Network::Regtest) + .set_seed_bytes(seed) + .set_esplora_host(esplora_host) + .set_name(name) + .set_oracle(oracle) + .set_transport(transport) + .set_storage(storage) + .finish() + .await + .unwrap(); + ddk +} + +const EVENT_ID: &str = "nostr-event"; + +#[test_log::test(tokio::test)] +async fn nostr_contract() { + let oracle = Arc::new(MemoryOracle::default()); + let alice = nostr_ddk("alice-nostr", oracle.clone()).await; + let bob = nostr_ddk("bob-nostr", oracle.clone()).await; + + alice.start(); + bob.start(); + + let alice_address = alice.wallet.new_external_address().unwrap().address; + let bob_address = bob.wallet.new_external_address().unwrap().address; + test_util::fund_addresses(&alice_address, &bob_address); + + let expiry = TimeDelta::seconds(15); + let timestamp: u32 = Local::now() + .checked_add_signed(expiry) + .unwrap() + .timestamp() + .try_into() + .unwrap(); + + let announcement = oracle + .oracle + .create_enum_event( + "nostr-event".to_string(), + vec!["cat".to_string(), "ctv".to_string()], + timestamp, + ) + .await + .unwrap(); + + let contract_input = ddk_payouts::enumeration::create_contract_input( + vec![ + EnumerationPayout { + outcome: "cat".to_string(), + payout: Payout { + offer: 100_000_000, + accept: 0, + }, + }, + EnumerationPayout { + outcome: "ctv".to_string(), + payout: Payout { + offer: 0, + accept: 100_000_000, + }, + }, + ], + 100_000_000, + 100_000_000, + 1, + oracle.oracle.public_key().to_string(), + EVENT_ID.to_string(), + ); + let alice_pubkey = + PublicKey::from_slice(&alice.transport.public_key().serialize()).expect("cant convert"); + let offer = bob + .send_dlc_offer(&contract_input, alice_pubkey, vec![announcement]) + .unwrap(); + loop {} +}