From 64570d2e5be4fc2eedf87d748b75d095ea59f2cc Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 19 Dec 2023 14:19:07 +0100 Subject: [PATCH] Drop `APeerManager` dependency and replace with `process_msgs_callback` Previously, `LiquidityManager` (or at least its internal message queue) was holding a reference to `PeerManager`/`impl APeerManager` in order to be able to trigger message processing after new messages were enqueued. However, this had two major drawbacks: firstly, it introduced an ugly (and hard to resolve) cycle in the type definitions as `PeerManager` depends on `CustomMessageHandler`, which in this case is `LiquidityManager`, itself depending on said `PeerManager` reference. Secondly, it introduced the complex `PeerManager` LDK object to the LSPS implementation which is otherwise not that dependent on LDK's internal types. To resolve these issues, we here replace the dependency on `PeerManager`/`impl APeerManager` with a simple generic callback that will be called every time new messages are enqueued. --- src/manager.rs | 143 ++++++++++++++++++++++++++----------------- src/message_queue.rs | 41 +++++-------- 2 files changed, 100 insertions(+), 84 deletions(-) diff --git a/src/manager.rs b/src/manager.rs index 355e88f..3de44f4 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -23,7 +23,7 @@ use lightning::chain::{self, BestBlock, Confirm, Filter, Listen}; use lightning::ln::channelmanager::{AChannelManager, ChainParameters}; use lightning::ln::features::{InitFeatures, NodeFeatures}; use lightning::ln::msgs::{ErrorAction, LightningError}; -use lightning::ln::peer_handler::{APeerManager, CustomMessageHandler}; +use lightning::ln::peer_handler::CustomMessageHandler; use lightning::ln::wire::CustomMessageReader; use lightning::sign::EntropySource; use lightning::util::logger::Level; @@ -61,12 +61,11 @@ pub struct LiquidityClientConfig { /// The main interface into LSP functionality. 
/// -/// Should be used as a [`CustomMessageHandler`] for your -/// [`PeerManager`]'s [`MessageHandler`]. +/// Should be used as a [`CustomMessageHandler`] for your [`PeerManager`]'s [`MessageHandler`]. /// -/// Should provide a reference to your [`PeerManager`] by calling -/// [`LiquidityManager::set_peer_manager`] post construction. This allows the [`LiquidityManager`] to -/// wake the [`PeerManager`] when there are pending messages to be sent. +/// Users should provide a callback to process queued messages via +/// [`LiquidityManager::set_process_msgs_callback`] post construction. This allows the +/// [`LiquidityManager`] to wake the [`PeerManager`] when there are pending messages to be sent. /// /// Users need to continually poll [`LiquidityManager::get_and_clear_pending_events`] in order to surface /// [`Event`]'s that likely need to be handled. @@ -78,40 +77,33 @@ pub struct LiquidityClientConfig { /// [`MessageHandler`]: lightning::ln::peer_handler::MessageHandler /// [`Event::HTLCIntercepted`]: lightning::events::Event::HTLCIntercepted /// [`Event::ChannelReady`]: lightning::events::Event::ChannelReady -pub struct LiquidityManager< - ES: Deref + Clone, - CM: Deref + Clone, - PM: Deref + Clone, - C: Deref + Clone, -> where +pub struct LiquidityManager +where ES::Target: EntropySource, CM::Target: AChannelManager, - PM::Target: APeerManager, C::Target: Filter, { - pending_messages: Arc>, + pending_messages: Arc, pending_events: Arc, request_id_to_method_map: Mutex>, - lsps0_client_handler: LSPS0ClientHandler>>, - lsps0_service_handler: Option>>>, + lsps0_client_handler: LSPS0ClientHandler>, + lsps0_service_handler: Option>>, #[cfg(lsps1)] - lsps1_service_handler: Option>, C>>, + lsps1_service_handler: Option, C>>, #[cfg(lsps1)] - lsps1_client_handler: Option>, C>>, - lsps2_service_handler: Option>>>, - lsps2_client_handler: Option>>>, + lsps1_client_handler: Option, C>>, + lsps2_service_handler: Option>>, + lsps2_client_handler: Option>>, service_config: 
Option, _client_config: Option, best_block: Option>, _chain_source: Option, } -impl - LiquidityManager +impl LiquidityManager where ES::Target: EntropySource, CM::Target: AChannelManager, - PM::Target: APeerManager, C::Target: Filter, { /// Constructor for the [`LiquidityManager`]. @@ -208,14 +200,12 @@ where { } /// Returns a reference to the LSPS0 client-side handler. - pub fn lsps0_client_handler(&self) -> &LSPS0ClientHandler>> { + pub fn lsps0_client_handler(&self) -> &LSPS0ClientHandler> { &self.lsps0_client_handler } /// Returns a reference to the LSPS0 server-side handler. - pub fn lsps0_service_handler( - &self, - ) -> Option<&LSPS0ServiceHandler>>> { + pub fn lsps0_service_handler(&self) -> Option<&LSPS0ServiceHandler>> { self.lsps0_service_handler.as_ref() } @@ -223,7 +213,7 @@ where { #[cfg(lsps1)] pub fn lsps1_client_handler( &self, - ) -> Option<&LSPS1ClientHandler>, C>> { + ) -> Option<&LSPS1ClientHandler, C>> { self.lsps1_client_handler.as_ref() } @@ -231,24 +221,83 @@ where { #[cfg(lsps1)] pub fn lsps1_service_handler( &self, - ) -> Option<&LSPS1ServiceHandler>, C>> { + ) -> Option<&LSPS1ServiceHandler, C>> { self.lsps1_service_handler.as_ref() } /// Returns a reference to the LSPS2 client-side handler. pub fn lsps2_client_handler( &self, - ) -> Option<&LSPS2ClientHandler>>> { + ) -> Option<&LSPS2ClientHandler>> { self.lsps2_client_handler.as_ref() } /// Returns a reference to the LSPS2 server-side handler. pub fn lsps2_service_handler( &self, - ) -> Option<&LSPS2ServiceHandler>>> { + ) -> Option<&LSPS2ServiceHandler>> { self.lsps2_service_handler.as_ref() } + /// Allows to set a callback that will be called after new messages are pushed to the message + /// queue. + /// + /// Usually, you'll want to use this to call [`PeerManager::process_events`] to clear the + /// message queue. 
For example: + /// + /// ``` + /// # use lightning::io; + /// # use lightning_liquidity::LiquidityManager; + /// # use std::sync::{Arc, RwLock}; + /// # use std::sync::atomic::{AtomicBool, Ordering}; + /// # use std::time::SystemTime; + /// # struct MyStore {} + /// # impl lightning::util::persist::KVStore for MyStore { + /// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result> { Ok(Vec::new()) } + /// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) } + /// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) } + /// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { Ok(Vec::new()) } + /// # } + /// # struct MyEntropySource {} + /// # impl lightning::sign::EntropySource for MyEntropySource { + /// # fn get_secure_random_bytes(&self) -> [u8; 32] { [0u8; 32] } + /// # } + /// # struct MyEventHandler {} + /// # impl MyEventHandler { + /// # async fn handle_event(&self, _: lightning::events::Event) {} + /// # } + /// # #[derive(Eq, PartialEq, Clone, Hash)] + /// # struct MySocketDescriptor {} + /// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor { + /// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 } + /// # fn disconnect_socket(&mut self) {} + /// # } + /// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync; + /// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync; + /// # type MyNodeSigner = dyn lightning::sign::NodeSigner + Send + Sync; + /// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync; + /// # type MyFilter = dyn lightning::chain::Filter + Send + Sync; + /// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync; + /// # type MyChainMonitor = 
lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; + /// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager, MyLogger>; + /// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph>; + /// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync, Arc, Arc>; + /// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager; + /// # type MyScorer = RwLock, Arc>>; + /// # type MyLiquidityManager = LiquidityManager, Arc, Arc>; + /// # fn setup_background_processing(my_persister: Arc, my_event_handler: Arc, my_chain_monitor: Arc, my_channel_manager: Arc, my_logger: Arc, my_peer_manager: Arc, my_liquidity_manager: Arc) { + /// let process_msgs_pm = Arc::clone(&my_peer_manager); + /// let process_msgs_callback = move || process_msgs_pm.process_events(); + /// + /// my_liquidity_manager.set_process_msgs_callback(process_msgs_callback); + /// # } + /// ``` + /// + /// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events + pub fn set_process_msgs_callback(&self, callback: impl Fn() + Send + Sync + 'static) { + self.pending_messages.set_process_msgs_callback(callback) + } + /// Blocks the current thread until next event is ready and returns it. /// /// Typically you would spawn a thread or task that calls this in a loop. @@ -271,20 +320,6 @@ where { self.pending_events.get_and_clear_pending_events() } - /// Set a [`PeerManager`] reference for all configured message handlers. - /// - /// This allows the message handlers to wake the [`PeerManager`] by calling - /// [`PeerManager::process_events`] after enqueing messages to be sent. - /// - /// Without this the messages will be sent based on whatever polling interval - /// your background processor uses. 
- /// - /// [`PeerManager`]: lightning::ln::peer_handler::PeerManager - /// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events - pub fn set_peer_manager(&self, peer_manager: PM) { - self.pending_messages.set_peer_manager(peer_manager); - } - fn handle_lsps_message( &self, msg: LSPSMessage, sender_node_id: &PublicKey, ) -> Result<(), lightning::ln::msgs::LightningError> { @@ -348,12 +383,11 @@ where { } } -impl - CustomMessageReader for LiquidityManager +impl CustomMessageReader + for LiquidityManager where ES::Target: EntropySource, CM::Target: AChannelManager, - PM::Target: APeerManager, C::Target: Filter, { type CustomMessage = RawLSPSMessage; @@ -368,12 +402,11 @@ where } } -impl CustomMessageHandler - for LiquidityManager +impl CustomMessageHandler + for LiquidityManager where ES::Target: EntropySource, CM::Target: AChannelManager, - PM::Target: APeerManager, C::Target: Filter, { fn handle_custom_message( @@ -431,12 +464,10 @@ where } } -impl Listen - for LiquidityManager +impl Listen for LiquidityManager where ES::Target: EntropySource, CM::Target: AChannelManager, - PM::Target: APeerManager, C::Target: Filter, { fn filtered_block_connected( @@ -472,12 +503,10 @@ where } } -impl Confirm - for LiquidityManager +impl Confirm for LiquidityManager where ES::Target: EntropySource, CM::Target: AChannelManager, - PM::Target: APeerManager, C::Target: Filter, { fn transactions_confirmed( diff --git a/src/message_queue.rs b/src/message_queue.rs index 9cc420e..6ddcb81 100644 --- a/src/message_queue.rs +++ b/src/message_queue.rs @@ -1,15 +1,11 @@ //! Holds types and traits used to implement message queues for [`LSPSMessage`]s. 
use crate::lsps0::msgs::LSPSMessage; -use crate::prelude::{Vec, VecDeque}; -use crate::sync::Mutex; - -use lightning::ln::peer_handler::APeerManager; +use crate::prelude::{Box, Vec, VecDeque}; +use crate::sync::{Mutex, RwLock}; use bitcoin::secp256k1::PublicKey; -use core::ops::Deref; - /// Represents a simple message queue that the LSPS message handlers use to send messages to a given counterparty. pub trait MessageQueue { /// Enqueues an LSPS message to be sent to the counterparty with the given node id. @@ -22,45 +18,36 @@ pub trait MessageQueue { /// The default [`MessageQueue`] Implementation used by [`LiquidityManager`]. /// /// [`LiquidityManager`]: crate::LiquidityManager -pub struct DefaultMessageQueue -where - PM::Target: APeerManager, -{ +pub struct DefaultMessageQueue { queue: Mutex>, - peer_manager: Mutex>, + process_msgs_callback: RwLock>>, } -impl DefaultMessageQueue -where - PM::Target: APeerManager, -{ +impl DefaultMessageQueue { pub(crate) fn new() -> Self { let queue = Mutex::new(VecDeque::new()); - let peer_manager = Mutex::new(None); - Self { queue, peer_manager } + let process_msgs_callback = RwLock::new(None); + Self { queue, process_msgs_callback } } - pub(crate) fn get_and_clear_pending_msgs(&self) -> Vec<(PublicKey, LSPSMessage)> { - self.queue.lock().unwrap().drain(..).collect() + pub(crate) fn set_process_msgs_callback(&self, callback: impl Fn() + Send + Sync + 'static) { + *self.process_msgs_callback.write().unwrap() = Some(Box::new(callback)); } - pub(crate) fn set_peer_manager(&self, peer_manager: PM) { - *self.peer_manager.lock().unwrap() = Some(peer_manager); + pub(crate) fn get_and_clear_pending_msgs(&self) -> Vec<(PublicKey, LSPSMessage)> { + self.queue.lock().unwrap().drain(..).collect() } } -impl MessageQueue for DefaultMessageQueue -where - PM::Target: APeerManager, -{ +impl MessageQueue for DefaultMessageQueue { fn enqueue(&self, counterparty_node_id: &PublicKey, msg: LSPSMessage) { { let mut queue = 
self.queue.lock().unwrap(); queue.push_back((*counterparty_node_id, msg)); } - if let Some(peer_manager) = self.peer_manager.lock().unwrap().as_ref() { - peer_manager.as_ref().process_events(); + if let Some(process_msgs_callback) = self.process_msgs_callback.read().unwrap().as_ref() { + (process_msgs_callback)() } } }