Skip to content
This repository has been archived by the owner on Jan 6, 2025. It is now read-only.

Drop APeerManager dependency and replace with process_msgs_callback #73

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 86 additions & 57 deletions src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use lightning::chain::{self, BestBlock, Confirm, Filter, Listen};
use lightning::ln::channelmanager::{AChannelManager, ChainParameters};
use lightning::ln::features::{InitFeatures, NodeFeatures};
use lightning::ln::msgs::{ErrorAction, LightningError};
use lightning::ln::peer_handler::{APeerManager, CustomMessageHandler};
use lightning::ln::peer_handler::CustomMessageHandler;
use lightning::ln::wire::CustomMessageReader;
use lightning::sign::EntropySource;
use lightning::util::logger::Level;
Expand Down Expand Up @@ -61,12 +61,11 @@ pub struct LiquidityClientConfig {

/// The main interface into LSP functionality.
///
/// Should be used as a [`CustomMessageHandler`] for your
/// [`PeerManager`]'s [`MessageHandler`].
/// Should be used as a [`CustomMessageHandler`] for your [`PeerManager`]'s [`MessageHandler`].
///
/// Should provide a reference to your [`PeerManager`] by calling
/// [`LiquidityManager::set_peer_manager`] post construction. This allows the [`LiquidityManager`] to
/// wake the [`PeerManager`] when there are pending messages to be sent.
/// Users should provide a callback to process queued messages via
/// [`LiquidityManager::set_process_msgs_callback`] post construction. This allows the
/// [`LiquidityManager`] to wake the [`PeerManager`] when there are pending messages to be sent.
///
/// Users need to continually poll [`LiquidityManager::get_and_clear_pending_events`] in order to surface
/// [`Event`]'s that likely need to be handled.
Expand All @@ -78,40 +77,33 @@ pub struct LiquidityClientConfig {
/// [`MessageHandler`]: lightning::ln::peer_handler::MessageHandler
/// [`Event::HTLCIntercepted`]: lightning::events::Event::HTLCIntercepted
/// [`Event::ChannelReady`]: lightning::events::Event::ChannelReady
pub struct LiquidityManager<
ES: Deref + Clone,
CM: Deref + Clone,
PM: Deref + Clone,
C: Deref + Clone,
> where
pub struct LiquidityManager<ES: Deref + Clone, CM: Deref + Clone, C: Deref + Clone>
where
ES::Target: EntropySource,
CM::Target: AChannelManager,
PM::Target: APeerManager,
C::Target: Filter,
{
pending_messages: Arc<DefaultMessageQueue<PM>>,
pending_messages: Arc<DefaultMessageQueue>,
pending_events: Arc<EventQueue>,
request_id_to_method_map: Mutex<HashMap<String, String>>,
lsps0_client_handler: LSPS0ClientHandler<ES, Arc<DefaultMessageQueue<PM>>>,
lsps0_service_handler: Option<LSPS0ServiceHandler<Arc<DefaultMessageQueue<PM>>>>,
lsps0_client_handler: LSPS0ClientHandler<ES, Arc<DefaultMessageQueue>>,
lsps0_service_handler: Option<LSPS0ServiceHandler<Arc<DefaultMessageQueue>>>,
#[cfg(lsps1)]
lsps1_service_handler: Option<LSPS1ServiceHandler<ES, CM, Arc<DefaultMessageQueue<PM>>, C>>,
lsps1_service_handler: Option<LSPS1ServiceHandler<ES, CM, Arc<DefaultMessageQueue>, C>>,
#[cfg(lsps1)]
lsps1_client_handler: Option<LSPS1ClientHandler<ES, CM, Arc<DefaultMessageQueue<PM>>, C>>,
lsps2_service_handler: Option<LSPS2ServiceHandler<CM, Arc<DefaultMessageQueue<PM>>>>,
lsps2_client_handler: Option<LSPS2ClientHandler<ES, Arc<DefaultMessageQueue<PM>>>>,
lsps1_client_handler: Option<LSPS1ClientHandler<ES, CM, Arc<DefaultMessageQueue>, C>>,
lsps2_service_handler: Option<LSPS2ServiceHandler<CM, Arc<DefaultMessageQueue>>>,
lsps2_client_handler: Option<LSPS2ClientHandler<ES, Arc<DefaultMessageQueue>>>,
service_config: Option<LiquidityServiceConfig>,
_client_config: Option<LiquidityClientConfig>,
best_block: Option<RwLock<BestBlock>>,
_chain_source: Option<C>,
}

impl<ES: Deref + Clone, CM: Deref + Clone, PM: Deref + Clone, C: Deref + Clone>
LiquidityManager<ES, CM, PM, C>
impl<ES: Deref + Clone, CM: Deref + Clone, C: Deref + Clone> LiquidityManager<ES, CM, C>
where
ES::Target: EntropySource,
CM::Target: AChannelManager,
PM::Target: APeerManager,
C::Target: Filter,
{
/// Constructor for the [`LiquidityManager`].
Expand Down Expand Up @@ -208,47 +200,104 @@ where {
}

/// Returns a reference to the LSPS0 client-side handler.
pub fn lsps0_client_handler(&self) -> &LSPS0ClientHandler<ES, Arc<DefaultMessageQueue<PM>>> {
pub fn lsps0_client_handler(&self) -> &LSPS0ClientHandler<ES, Arc<DefaultMessageQueue>> {
&self.lsps0_client_handler
}

/// Returns a reference to the LSPS0 server-side handler.
pub fn lsps0_service_handler(
&self,
) -> Option<&LSPS0ServiceHandler<Arc<DefaultMessageQueue<PM>>>> {
pub fn lsps0_service_handler(&self) -> Option<&LSPS0ServiceHandler<Arc<DefaultMessageQueue>>> {
self.lsps0_service_handler.as_ref()
}

/// Returns a reference to the LSPS1 client-side handler.
#[cfg(lsps1)]
pub fn lsps1_client_handler(
&self,
) -> Option<&LSPS1ClientHandler<ES, CM, Arc<DefaultMessageQueue<PM>>, C>> {
) -> Option<&LSPS1ClientHandler<ES, CM, Arc<DefaultMessageQueue>, C>> {
self.lsps1_client_handler.as_ref()
}

/// Returns a reference to the LSPS1 server-side handler.
#[cfg(lsps1)]
pub fn lsps1_service_handler(
&self,
) -> Option<&LSPS1ServiceHandler<ES, CM, Arc<DefaultMessageQueue<PM>>, C>> {
) -> Option<&LSPS1ServiceHandler<ES, CM, Arc<DefaultMessageQueue>, C>> {
self.lsps1_service_handler.as_ref()
}

/// Returns a reference to the LSPS2 client-side handler.
pub fn lsps2_client_handler(
&self,
) -> Option<&LSPS2ClientHandler<ES, Arc<DefaultMessageQueue<PM>>>> {
) -> Option<&LSPS2ClientHandler<ES, Arc<DefaultMessageQueue>>> {
self.lsps2_client_handler.as_ref()
}

/// Returns a reference to the LSPS2 server-side handler.
pub fn lsps2_service_handler(
&self,
) -> Option<&LSPS2ServiceHandler<CM, Arc<DefaultMessageQueue<PM>>>> {
) -> Option<&LSPS2ServiceHandler<CM, Arc<DefaultMessageQueue>>> {
self.lsps2_service_handler.as_ref()
}

/// Allows to set a callback that will be called after new messages are pushed to the message
/// queue.
///
/// Usually, you'll want to use this to call [`PeerManager::process_events`] to clear the
/// message queue. For example:
///
/// ```
/// # use lightning::io;
/// # use lightning_liquidity::LiquidityManager;
/// # use std::sync::{Arc, RwLock};
/// # use std::sync::atomic::{AtomicBool, Ordering};
/// # use std::time::SystemTime;
/// # struct MyStore {}
/// # impl lightning::util::persist::KVStore for MyStore {
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
/// # }
/// # struct MyEntropySource {}
/// # impl lightning::sign::EntropySource for MyEntropySource {
/// # fn get_secure_random_bytes(&self) -> [u8; 32] { [0u8; 32] }
/// # }
/// # struct MyEventHandler {}
/// # impl MyEventHandler {
/// # async fn handle_event(&self, _: lightning::events::Event) {}
/// # }
/// # #[derive(Eq, PartialEq, Clone, Hash)]
/// # struct MySocketDescriptor {}
/// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor {
/// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
/// # fn disconnect_socket(&mut self) {}
/// # }
/// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync;
/// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
/// # type MyNodeSigner = dyn lightning::sign::NodeSigner + Send + Sync;
/// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
/// # type MyFilter = dyn lightning::chain::Filter + Send + Sync;
/// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync;
/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyStore>>;
/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, Arc<MyUtxoLookup>, MyLogger>;
/// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
/// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
/// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
/// # type MyScorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
/// # type MyLiquidityManager = LiquidityManager<Arc<MyEntropySource>, Arc<MyChannelManager>, Arc<MyFilter>>;
/// # fn setup_background_processing(my_persister: Arc<MyStore>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_logger: Arc<MyLogger>, my_peer_manager: Arc<MyPeerManager>, my_liquidity_manager: Arc<MyLiquidityManager>) {
/// let process_msgs_pm = Arc::clone(&my_peer_manager);
/// let process_msgs_callback = move || process_msgs_pm.process_events();
///
/// my_liquidity_manager.set_process_msgs_callback(process_msgs_callback);
/// # }
/// ```
///
/// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events
pub fn set_process_msgs_callback(&self, callback: impl Fn() + Send + Sync + 'static) {
self.pending_messages.set_process_msgs_callback(callback)
}
tnull marked this conversation as resolved.
Show resolved Hide resolved

/// Blocks the current thread until next event is ready and returns it.
///
/// Typically you would spawn a thread or task that calls this in a loop.
Expand All @@ -271,20 +320,6 @@ where {
self.pending_events.get_and_clear_pending_events()
}

/// Set a [`PeerManager`] reference for all configured message handlers.
///
/// This allows the message handlers to wake the [`PeerManager`] by calling
/// [`PeerManager::process_events`] after enqueing messages to be sent.
///
/// Without this the messages will be sent based on whatever polling interval
/// your background processor uses.
///
/// [`PeerManager`]: lightning::ln::peer_handler::PeerManager
/// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events
pub fn set_peer_manager(&self, peer_manager: PM) {
self.pending_messages.set_peer_manager(peer_manager);
}

fn handle_lsps_message(
&self, msg: LSPSMessage, sender_node_id: &PublicKey,
) -> Result<(), lightning::ln::msgs::LightningError> {
Expand Down Expand Up @@ -348,12 +383,11 @@ where {
}
}

impl<ES: Deref + Clone + Clone, CM: Deref + Clone, PM: Deref + Clone, C: Deref + Clone>
CustomMessageReader for LiquidityManager<ES, CM, PM, C>
impl<ES: Deref + Clone + Clone, CM: Deref + Clone, C: Deref + Clone> CustomMessageReader
for LiquidityManager<ES, CM, C>
where
ES::Target: EntropySource,
CM::Target: AChannelManager,
PM::Target: APeerManager,
C::Target: Filter,
{
type CustomMessage = RawLSPSMessage;
Expand All @@ -368,12 +402,11 @@ where
}
}

impl<ES: Deref + Clone, CM: Deref + Clone, PM: Deref + Clone, C: Deref + Clone> CustomMessageHandler
for LiquidityManager<ES, CM, PM, C>
impl<ES: Deref + Clone, CM: Deref + Clone, C: Deref + Clone> CustomMessageHandler
for LiquidityManager<ES, CM, C>
where
ES::Target: EntropySource,
CM::Target: AChannelManager,
PM::Target: APeerManager,
C::Target: Filter,
{
fn handle_custom_message(
Expand Down Expand Up @@ -431,12 +464,10 @@ where
}
}

impl<ES: Deref + Clone, CM: Deref + Clone, PM: Deref + Clone, C: Deref + Clone> Listen
for LiquidityManager<ES, CM, PM, C>
impl<ES: Deref + Clone, CM: Deref + Clone, C: Deref + Clone> Listen for LiquidityManager<ES, CM, C>
where
ES::Target: EntropySource,
CM::Target: AChannelManager,
PM::Target: APeerManager,
C::Target: Filter,
{
fn filtered_block_connected(
Expand Down Expand Up @@ -472,12 +503,10 @@ where
}
}

impl<ES: Deref + Clone, CM: Deref + Clone, PM: Deref + Clone, C: Deref + Clone> Confirm
for LiquidityManager<ES, CM, PM, C>
impl<ES: Deref + Clone, CM: Deref + Clone, C: Deref + Clone> Confirm for LiquidityManager<ES, CM, C>
where
ES::Target: EntropySource,
CM::Target: AChannelManager,
PM::Target: APeerManager,
C::Target: Filter,
{
fn transactions_confirmed(
Expand Down
41 changes: 14 additions & 27 deletions src/message_queue.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
//! Holds types and traits used to implement message queues for [`LSPSMessage`]s.

use crate::lsps0::msgs::LSPSMessage;
use crate::prelude::{Vec, VecDeque};
use crate::sync::Mutex;

use lightning::ln::peer_handler::APeerManager;
use crate::prelude::{Box, Vec, VecDeque};
use crate::sync::{Mutex, RwLock};

use bitcoin::secp256k1::PublicKey;

use core::ops::Deref;

/// Represents a simple message queue that the LSPS message handlers use to send messages to a given counterparty.
pub trait MessageQueue {
/// Enqueues an LSPS message to be sent to the counterparty with the given node id.
Expand All @@ -22,45 +18,36 @@ pub trait MessageQueue {
/// The default [`MessageQueue`] Implementation used by [`LiquidityManager`].
///
/// [`LiquidityManager`]: crate::LiquidityManager
pub struct DefaultMessageQueue<PM: Deref>
where
PM::Target: APeerManager,
{
pub struct DefaultMessageQueue {
queue: Mutex<VecDeque<(PublicKey, LSPSMessage)>>,
peer_manager: Mutex<Option<PM>>,
process_msgs_callback: RwLock<Option<Box<dyn Fn() + Send + Sync + 'static>>>,
}

impl<PM: Deref> DefaultMessageQueue<PM>
where
PM::Target: APeerManager,
{
impl DefaultMessageQueue {
pub(crate) fn new() -> Self {
let queue = Mutex::new(VecDeque::new());
let peer_manager = Mutex::new(None);
Self { queue, peer_manager }
let process_msgs_callback = RwLock::new(None);
Self { queue, process_msgs_callback }
}

pub(crate) fn get_and_clear_pending_msgs(&self) -> Vec<(PublicKey, LSPSMessage)> {
self.queue.lock().unwrap().drain(..).collect()
pub(crate) fn set_process_msgs_callback(&self, callback: impl Fn() + Send + Sync + 'static) {
*self.process_msgs_callback.write().unwrap() = Some(Box::new(callback));
}

pub(crate) fn set_peer_manager(&self, peer_manager: PM) {
*self.peer_manager.lock().unwrap() = Some(peer_manager);
pub(crate) fn get_and_clear_pending_msgs(&self) -> Vec<(PublicKey, LSPSMessage)> {
self.queue.lock().unwrap().drain(..).collect()
}
}

impl<PM: Deref> MessageQueue for DefaultMessageQueue<PM>
where
PM::Target: APeerManager,
{
impl MessageQueue for DefaultMessageQueue {
fn enqueue(&self, counterparty_node_id: &PublicKey, msg: LSPSMessage) {
{
let mut queue = self.queue.lock().unwrap();
queue.push_back((*counterparty_node_id, msg));
}

if let Some(peer_manager) = self.peer_manager.lock().unwrap().as_ref() {
peer_manager.as_ref().process_events();
if let Some(process_msgs_callback) = self.process_msgs_callback.read().unwrap().as_ref() {
(process_msgs_callback)()
}
}
}
Loading