Skip to content

Commit

Permalink
Lightning Transport v2
Browse files Browse the repository at this point in the history
  • Loading branch information
bennyhodl committed Dec 18, 2024
1 parent f5b0fec commit 36be11f
Show file tree
Hide file tree
Showing 6 changed files with 367 additions and 53 deletions.
13 changes: 13 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,19 @@ jobs:
override: true
- name: Check format
run: cargo fmt --check
check:
name: check
runs-on: ubuntu-latest
steps:
- name: Install Protoc
run: sudo apt-get update && sudo apt-get install -y protobuf-compiler
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
toolchain: nightly
override: true
- name: cargo check
run: cargo check --all-features
integration_tests_prepare:
runs-on: ubuntu-latest
timeout-minutes: 30
Expand Down
7 changes: 6 additions & 1 deletion ddk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,10 @@ bitcoincore-rpc = "0.19.0"

[[example]]
name = "lighnting"
path = "examples/ddk.rs"
path = "examples/lightning.rs"
required-features = ["lightning", "kormir", "sled"]

[[example]]
name = "nostr"
path = "examples/nostr.rs"
required-features = ["nostr"]
File renamed without changes.
72 changes: 22 additions & 50 deletions ddk/src/transport/lightning/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::{DlcDevKitDlcManager, Oracle, Storage, Transport};
use async_trait::async_trait;
use bitcoin::secp256k1::PublicKey;
use lightning_net_tokio::{connect_outbound, setup_inbound};
use std::{sync::Arc, time::Duration};
use tokio::net::TcpListener;
use lightning_net_tokio::connect_outbound;
use std::sync::Arc;
use tokio::sync::watch;

pub(crate) mod peer_manager;
pub use peer_manager::LightningTransport;
Expand All @@ -18,63 +18,35 @@ impl Transport for LightningTransport {
self.node_id
}

/// Creates a TCP listener and accepts incoming connection spawning a tokio thread.
async fn listen(&self) {
let listener = TcpListener::bind(format!("0.0.0.0:{}", self.listening_port))
.await
.expect("Coldn't get port.");

loop {
let peer_mgr = self.peer_manager.clone();
let (tcp_stream, socket) = listener.accept().await.unwrap();
tokio::spawn(async move {
tracing::info!(connection = socket.to_string(), "Received connection.");
setup_inbound(peer_mgr.clone(), tcp_stream.into_std().unwrap()).await;
});
}
}

/// Sends a message to a peer.
///
/// TODO: Assert that we are connected to the peer before sending.
fn send_message(&self, counterparty: PublicKey, message: dlc_messages::Message) {
self.message_handler.send_message(counterparty, message)
tracing::info!(message=?message, "Sending message to {}", counterparty.to_string());
if self.peer_manager.peer_by_node_id(&counterparty).is_some() {
self.message_handler.send_message(counterparty, message)
} else {
tracing::warn!(
pubkey = counterparty.to_string(),
"Not connected to counterparty. Message not sent"
)
}
}

/// Gets and clears the message queue with messages to be processed.
/// Takes the manager to process the DLC messages that are received.
async fn receive_messages<S: Storage, O: Oracle>(
async fn start<S: Storage, O: Oracle>(
&self,
mut stop_signal: watch::Receiver<bool>,
manager: Arc<DlcDevKitDlcManager<S, O>>,
) {
let mut timer = tokio::time::interval(Duration::from_secs(5));
loop {
timer.tick().await;
let messages = self.message_handler.get_and_clear_received_messages();

for (counter_party, message) in messages {
tracing::info!(
counter_party = counter_party.to_string(),
"Processing DLC message"
);
) -> Result<(), anyhow::Error> {
let listen_handle = self.listen(stop_signal.clone());

match manager.on_dlc_message(&message, counter_party).await {
Err(e) => {
tracing::error!(error =? e, "On message error.")
}
Ok(contract) => {
if let Some(msg) = contract {
tracing::info!("Responding to message received.");
tracing::debug!(message=?msg);
self.message_handler.send_message(counter_party, msg);
}
}
};
}
let process_handle = self.process_messages(stop_signal.clone(), manager.clone());

if self.message_handler.has_pending_messages() {
self.peer_manager.process_events()
}
// Wait for either task to complete or stop signal
tokio::select! {
_ = stop_signal.changed() => Ok(()),
res = listen_handle => res?,
res = process_handle => res?,
}
}

Expand Down
223 changes: 221 additions & 2 deletions ddk/src/transport/lightning/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,14 @@ use lightning::{
sign::{KeysManager, NodeSigner},
util::logger::{Logger, Record},
};
use lightning_net_tokio::SocketDescriptor;
use std::{sync::Arc, time::SystemTime};
use lightning_net_tokio::{setup_inbound, SocketDescriptor};
use std::{
sync::Arc,
time::{Duration, SystemTime},
};
use tokio::{net::TcpListener, sync::watch, task::JoinHandle, time::interval};

use crate::{ddk::DlcDevKitDlcManager, Oracle, Storage};

pub struct DlcDevKitLogger;

Expand Down Expand Up @@ -77,4 +83,217 @@ impl LightningTransport {
listening_port,
})
}

pub fn listen(
&self,
stop_signal: watch::Receiver<bool>,
) -> JoinHandle<Result<(), anyhow::Error>> {
let listening_port = self.listening_port;
let mut listen_stop = stop_signal.clone();
let peer_manager = Arc::clone(&self.peer_manager);
tokio::spawn(async move {
let listener = TcpListener::bind(format!("0.0.0.0:{}", listening_port))
.await
.expect("Coldn't get port.");

tracing::info!(
addr =? listener.local_addr().unwrap(),
"Starting lightning peer manager listener."
);
loop {
tokio::select! {
_ = listen_stop.changed() => {
if *listen_stop.borrow() {
tracing::warn!("Stop signal for lightning connection manager.");
break;
}
},
accept_result = listener.accept() => {
match accept_result {
Ok((tcp_stream, socket)) => {
let peer_mgr = Arc::clone(&peer_manager);
tokio::spawn(async move {
tracing::info!(
connection = socket.to_string(),
"Received connection."
);
setup_inbound(peer_mgr, tcp_stream.into_std().unwrap()).await;
});
}
Err(e) => {
tracing::error!("Error accepting connection: {}", e);
}
}
}
}
}
Ok::<_, anyhow::Error>(())
})
}

pub fn process_messages<S: Storage, O: Oracle>(
&self,
stop_signal: watch::Receiver<bool>,
manager: Arc<DlcDevKitDlcManager<S, O>>,
) -> JoinHandle<Result<(), anyhow::Error>> {
let mut message_stop = stop_signal.clone();
let message_manager = Arc::clone(&manager);
// let peer_manager = Arc::clone(&self.peer_manager);
let message_handler = Arc::clone(&self.message_handler);
tokio::spawn(async move {
let mut message_interval = interval(Duration::from_secs(5));
// let mut event_interval = interval(Duration::from_secs(2));
loop {
tokio::select! {
_ = message_stop.changed() => {
if *message_stop.borrow() {
tracing::warn!("Stop signal for lightning message processor.");
break;
}
},
_ = message_interval.tick() => {
let messages = message_handler.get_and_clear_received_messages();
for (counter_party, message) in messages {
tracing::info!(
counter_party = counter_party.to_string(),
"Processing DLC message"
);
match message_manager.on_dlc_message(&message, counter_party).await {
Ok(Some(response)) => {
message_handler.send_message(counter_party, response);
}
Ok(None) => (),
Err(e) => {
tracing::error!(
error=e.to_string(),
counterparty=counter_party.to_string(),
message=?message,
"Could not process dlc message."
);
}
}
}
}
// _ = event_interval.tick() => {
// peer_manager.process_events()
// }
}
}
Ok::<_, anyhow::Error>(())
})
}
}

#[cfg(test)]
mod tests {
use ddk_manager::Storage;
use dlc_messages::{Message, OfferDlc};

use crate::{
builder::Builder, oracle::memory::MemoryOracle, storage::memory::MemoryStorage, DlcDevKit,
Transport,
};

use super::*;

fn get_offer() -> OfferDlc {
let offer_string = include_str!("../../../../ddk-manager/test_inputs/offer_contract.json");
let offer: OfferDlc = serde_json::from_str(&offer_string).unwrap();
offer
}

fn create_peer_manager(listening_port: u16) -> (LightningTransport, PublicKey) {
let mut seed = [0u8; 32];
seed.try_fill(&mut bitcoin::key::rand::thread_rng())
.unwrap();
let peer_manager = LightningTransport::new(&seed, listening_port).unwrap();
let pubkey = peer_manager.node_id.clone();
(peer_manager, pubkey)
}

async fn manager(
listening_port: u16,
) -> DlcDevKit<LightningTransport, MemoryStorage, MemoryOracle> {
let mut seed_bytes = [0u8; 32];
seed_bytes
.try_fill(&mut bitcoin::key::rand::thread_rng())
.unwrap();

let transport = Arc::new(LightningTransport::new(&seed_bytes, listening_port).unwrap());
let storage = Arc::new(MemoryStorage::new());
let oracle_client = Arc::new(MemoryOracle::default());

let mut builder = Builder::new();
builder.set_seed_bytes(seed_bytes);
builder.set_transport(transport.clone());
builder.set_storage(storage.clone());
builder.set_oracle(oracle_client.clone());
builder.finish().await.unwrap()
}

#[test_log::test(tokio::test)]
async fn send_offer() {
let alice = manager(1776).await;
let alice_pk = alice.transport.public_key();
let bob = manager(1777).await;
let _bob_pk = bob.transport.public_key();

bob.start().unwrap();
alice.start().unwrap();

bob.transport
.connect_outbound(alice_pk, "127.0.0.1:1776")
.await;

let mut connected = false;
let mut retries = 0;

while !connected {
if retries > 10 {
bob.stop().unwrap();
alice.stop().unwrap();
panic!("Bob could not connect to alice.")
}
if bob
.transport
.peer_manager
.peer_by_node_id(&alice_pk)
.is_some()
{
connected = true
}
retries += 1;
tokio::time::sleep(Duration::from_millis(100)).await
}

let offer = get_offer();
bob.transport
.send_message(alice_pk, Message::Offer(offer.clone()));

let mut connected = false;
let mut retries = 0;

while !connected {
if retries > 10 {
bob.stop().unwrap();
alice.stop().unwrap();
panic!("Contract was not offered to alice")
}
if bob
.storage
.get_contract(&offer.temporary_contract_id)
.unwrap()
.is_some()
{
connected = true
}
retries += 1;
tokio::time::sleep(Duration::from_millis(500)).await
}

bob.stop().unwrap();
alice.stop().unwrap();
assert!(true)
// alice.0.send_message(bob.1, Message::Offer(offer));
}
}
Loading

0 comments on commit 36be11f

Please sign in to comment.