diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 732ccd27e88..1b255791f7c 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -44,7 +44,7 @@ use lightning::util::errors::APIError; use lightning::util::events; use lightning::util::logger::Logger; use lightning::util::config::UserConfig; -use lightning::util::events::{EventsProvider, MessageSendEventsProvider}; +use lightning::util::events::MessageSendEventsProvider; use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer}; use lightning::routing::router::{Route, RouteHop}; diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index f31ec7f8969..52b37a10f23 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -39,7 +39,7 @@ use lightning::ln::msgs::DecodeError; use lightning::routing::router::get_route; use lightning::routing::network_graph::NetGraphMsgHandler; use lightning::util::config::UserConfig; -use lightning::util::events::{EventsProvider,Event}; +use lightning::util::events::Event; use lightning::util::enforcing_trait_impls::EnforcingSigner; use lightning::util::logger::Logger; use lightning::util::ser::Readable; diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index de9aa286a62..69aa2ea0047 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -10,10 +10,13 @@ use lightning::chain; use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; +use lightning::chain::chainmonitor::ChainMonitor; +use lightning::chain::channelmonitor; use lightning::chain::keysinterface::{Sign, KeysInterface}; use lightning::ln::channelmanager::ChannelManager; use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler}; use lightning::ln::peer_handler::{PeerManager, SocketDescriptor}; +use lightning::util::events::{EventHandler, EventsProvider}; use lightning::util::logger::Logger; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; @@ -101,25 +104,31 @@ impl BackgroundProcessor { /// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister::persist_manager pub fn start< Signer: 'static + Sign, - M: 'static + Deref + Send + Sync, + CF: 'static + Deref + Send + Sync, + CW: 'static + Deref + Send + Sync, T: 'static + Deref + Send + Sync, K: 'static + Deref + Send + Sync, F: 'static + Deref + Send + Sync, L: 'static + Deref + Send + Sync, + P: 'static + Deref + Send + Sync, Descriptor: 'static + SocketDescriptor + Send + Sync, CMH: 'static + Deref + Send + Sync, RMH: 'static + Deref + Send + Sync, - CMP: 'static + Send + ChannelManagerPersister<Signer, M, T, K, F, L>, - CM: 'static + Deref<Target = ChannelManager<Signer, M, T, K, F, L>> + Send + Sync, + EH: 'static + EventHandler + Send + Sync, + CMP: 'static + Send + ChannelManagerPersister<Signer, CW, T, K, F, L>, + M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync, + CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync, PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L>> + Send + Sync, > - (handler: CMP, channel_manager: CM, peer_manager: PM, logger: L) -> Self + (persister: CMP, event_handler: EH, chain_monitor: M, channel_manager: CM, peer_manager: PM, logger: L) -> Self where - M::Target: 'static + chain::Watch<Signer>, + CF::Target: 'static + chain::Filter, + CW::Target: 'static + chain::Watch<Signer>, T::Target: 'static + BroadcasterInterface, K::Target: 'static + KeysInterface<Signer = Signer>, F::Target: 'static + FeeEstimator, L::Target: 'static + Logger, + P::Target: 'static + channelmonitor::Persist<Signer>, CMH::Target: 'static + ChannelMessageHandler, RMH::Target: 'static + RoutingMessageHandler, { @@ -129,10 +138,12
@@ impl BackgroundProcessor { let mut current_time = Instant::now(); loop { peer_manager.process_events(); + channel_manager.process_pending_events(&event_handler); + chain_monitor.process_pending_events(&event_handler); let updates_available = channel_manager.await_persistable_update_timeout(Duration::from_millis(100)); if updates_available { - handler.persist_manager(&*channel_manager)?; + persister.persist_manager(&*channel_manager)?; } // Exit the loop if the background processor was requested to stop. if stop_thread.load(Ordering::Acquire) == true { @@ -159,13 +170,14 @@ impl BackgroundProcessor { #[cfg(test)] mod tests { + use bitcoin::blockdata::block::BlockHeader; use bitcoin::blockdata::constants::genesis_block; use bitcoin::blockdata::transaction::{Transaction, TxOut}; use bitcoin::network::constants::Network; - use lightning::chain; - use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; + use lightning::chain::Confirm; use lightning::chain::chainmonitor; - use lightning::chain::keysinterface::{Sign, InMemorySigner, KeysInterface, KeysManager}; + use lightning::chain::channelmonitor::ANTI_REORG_DELAY; + use lightning::chain::keysinterface::{InMemorySigner, KeysInterface, KeysManager}; use lightning::chain::transaction::OutPoint; use lightning::get_event_msg; use lightning::ln::channelmanager::{BestBlock, ChainParameters, ChannelManager, SimpleArcChannelManager}; @@ -173,8 +185,7 @@ mod tests { use lightning::ln::msgs::ChannelMessageHandler; use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor}; use lightning::util::config::UserConfig; - use lightning::util::events::{Event, EventsProvider, MessageSendEventsProvider, MessageSendEvent}; - use lightning::util::logger::Logger; + use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent}; use lightning::util::ser::Writeable; use lightning::util::test_utils; use lightning_persister::FilesystemPersister; @@ -182,7 +193,9 @@ mod tests { use std::path::PathBuf; use std::sync::{Arc, Mutex}; use std::time::Duration; - use super::BackgroundProcessor; + use super::{BackgroundProcessor, FRESHNESS_TIMER}; + + const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER; #[derive(Clone, Eq, Hash, PartialEq)] struct TestDescriptor{} @@ -199,8 +212,11 @@ mod tests { struct Node { node: Arc>, peer_manager: Arc, Arc, Arc>>, + chain_monitor: Arc, persister: Arc, + tx_broadcaster: Arc, logger: Arc, + best_block: BestBlock, } impl Drop for Node { @@ -232,27 +248,39 @@ mod tests { let now = Duration::from_secs(genesis_block(network).header.time as u64); let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos())); let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone())); - let params = ChainParameters { - network, - best_block: BestBlock::from_genesis(network), - }; - let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster, logger.clone(), keys_manager.clone(), UserConfig::default(), params)); + let best_block = BestBlock::from_genesis(network); + let params = ChainParameters { network, best_block }; + let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params)); let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: 
Arc::new(test_utils::TestRoutingMessageHandler::new() )}; let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(), &seed, logger.clone())); - let node = Node { node: manager, peer_manager, persister, logger }; + let node = Node { node: manager, peer_manager, chain_monitor, persister, tx_broadcaster, logger, best_block }; nodes.push(node); } nodes } macro_rules! open_channel { + ($node_a: expr, $node_b: expr, $channel_value: expr) => {{ + begin_open_channel!($node_a, $node_b, $channel_value); + let events = $node_a.node.get_and_clear_pending_events(); + assert_eq!(events.len(), 1); + let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value); + end_open_channel!($node_a, $node_b, temporary_channel_id, tx); + tx + }} + } + + macro_rules! begin_open_channel { ($node_a: expr, $node_b: expr, $channel_value: expr) => {{ $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap(); $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id())); $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id())); - let events = $node_a.node.get_and_clear_pending_events(); - assert_eq!(events.len(), 1); - let (temporary_channel_id, tx) = match events[0] { + }} + } + + macro_rules! handle_funding_generation_ready { + ($event: expr, $channel_value: expr) => {{ + match $event { Event::FundingGenerationReady { ref temporary_channel_id, ref channel_value_satoshis, ref output_script, user_channel_id } => { assert_eq!(*channel_value_satoshis, $channel_value); assert_eq!(user_channel_id, 42); @@ -263,15 +291,39 @@ mod tests { (*temporary_channel_id, tx) }, _ => panic!("Unexpected event"), - }; + } + }} + } - $node_a.node.funding_transaction_generated(&temporary_channel_id, tx.clone()).unwrap(); + macro_rules! 
end_open_channel { + ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{ + $node_a.node.funding_transaction_generated(&$temporary_channel_id, $tx.clone()).unwrap(); $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id())); $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id())); - tx }} } + fn confirm_transaction(node: &mut Node, tx: &Transaction) { + for i in 1..=ANTI_REORG_DELAY { + let prev_blockhash = node.best_block.block_hash(); + let height = node.best_block.height() + 1; + let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: Default::default(), time: height, bits: 42, nonce: 42 }; + let txdata = vec![(0, tx)]; + node.best_block = BestBlock::new(header.block_hash(), height); + match i { + 1 => { + node.node.transactions_confirmed(&header, &txdata, height); + node.chain_monitor.transactions_confirmed(&header, &txdata, height); + }, + ANTI_REORG_DELAY => { + node.node.best_block_updated(&header, height); + node.chain_monitor.best_block_updated(&header, height); + }, + _ => {}, + } + } + } + #[test] fn test_background_processor() { // Test that when a new channel is created, the ChannelManager needs to be re-persisted with @@ -279,13 +331,16 @@ mod tests { // re-persistence and is successfully re-persisted. let nodes = create_nodes(2, "test_background_processor".to_string()); + // Go through the channel creation process so that each node has something to persist. Since + // open_channel consumes events, it must complete before starting BackgroundProcessor to + // avoid a race with processing events. + let tx = open_channel!(nodes[0], nodes[1], 100000); + // Initiate the background processors to watch each node. let data_dir = nodes[0].persister.get_data_dir(); - let callback = move |node: &ChannelManager, Arc, Arc, Arc, Arc>| FilesystemPersister::persist_manager(data_dir.clone(), node); - let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); - - // Go through the channel creation process until each node should have something persisted. - let tx = open_channel!(nodes[0], nodes[1], 100000); + let persister = move |node: &ChannelManager, Arc, Arc, Arc, Arc>| FilesystemPersister::persist_manager(data_dir.clone(), node); + let event_handler = |_| {}; + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); macro_rules! check_persisted_data { ($node: expr, $filepath: expr, $expected_bytes: expr) => { @@ -336,8 +391,9 @@ mod tests { // `FRESHNESS_TIMER`. 
let nodes = create_nodes(1, "test_timer_tick_called".to_string()); let data_dir = nodes[0].persister.get_data_dir(); - let callback = move |node: &ChannelManager, Arc, Arc, Arc, Arc>| FilesystemPersister::persist_manager(data_dir.clone(), node); - let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + let persister = move |node: &ChannelManager, Arc, Arc, Arc, Arc>| FilesystemPersister::persist_manager(data_dir.clone(), node); + let event_handler = |_| {}; + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); let desired_log = "Calling ChannelManager's and PeerManager's timer_tick_occurred".to_string(); @@ -352,21 +408,61 @@ mod tests { #[test] fn test_persist_error() { // Test that if we encounter an error during manager persistence, the thread panics. - fn persist_manager(_data: &ChannelManager, Arc, Arc, Arc, Arc>) -> Result<(), std::io::Error> - where Signer: 'static + Sign, - M: 'static + chain::Watch, - T: 'static + BroadcasterInterface, - K: 'static + KeysInterface, - F: 'static + FeeEstimator, - L: 'static + Logger, - { - Err(std::io::Error::new(std::io::ErrorKind::Other, "test")) - } - let nodes = create_nodes(2, "test_persist_error".to_string()); - let bg_processor = BackgroundProcessor::start(persist_manager, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); open_channel!(nodes[0], nodes[1], 100000); + let persister = |_: &_| Err(std::io::Error::new(std::io::ErrorKind::Other, "test")); + let event_handler = |_| {}; + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); let _ = bg_processor.thread_handle.join().unwrap().expect_err("Errored persisting manager: test"); } + + #[test] + fn test_background_event_handling() { + let mut nodes = create_nodes(2, "test_background_event_handling".to_string()); + let channel_value = 100000; + let data_dir = nodes[0].persister.get_data_dir(); + let persister = move |node: &_| FilesystemPersister::persist_manager(data_dir.clone(), node); + + // Set up a background event handler for FundingGenerationReady events. + let (sender, receiver) = std::sync::mpsc::sync_channel(1); + let event_handler = move |event| { + sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(); + }; + let bg_processor = BackgroundProcessor::start(persister.clone(), event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + + // Open a channel and check that the FundingGenerationReady event was handled. + begin_open_channel!(nodes[0], nodes[1], channel_value); + let (temporary_channel_id, funding_tx) = receiver + .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) + .expect("FundingGenerationReady not handled within deadline"); + end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx); + + // Confirm the funding transaction. 
+ confirm_transaction(&mut nodes[0], &funding_tx); + confirm_transaction(&mut nodes[1], &funding_tx); + nodes[0].node.handle_funding_locked(&nodes[1].node.get_our_node_id(), &get_event_msg!(nodes[1], MessageSendEvent::SendFundingLocked, nodes[0].node.get_our_node_id())); + nodes[1].node.handle_funding_locked(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendFundingLocked, nodes[1].node.get_our_node_id())); + + assert!(bg_processor.stop().is_ok()); + + // Set up a background event handler for SpendableOutputs events. + let (sender, receiver) = std::sync::mpsc::sync_channel(1); + let event_handler = move |event| sender.send(event).unwrap(); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + + // Force close the channel and check that the SpendableOutputs event was handled. + nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id).unwrap(); + let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap(); + confirm_transaction(&mut nodes[0], &commitment_tx); + let event = receiver + .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) + .expect("SpendableOutputs not handled within deadline"); + match event { + Event::SpendableOutputs { .. } => {}, + _ => panic!("Unexpected event: {:?}", event), + } + + assert!(bg_processor.stop().is_ok()); + } } diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 6e2a0c22eef..d102778a460 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -12,21 +12,19 @@ //! //! Designed to be as simple as possible, the high-level usage is almost as simple as "hand over a //! TcpStream and a reference to a PeerManager and the rest is handled", except for the -//! [Event](../lightning/util/events/enum.Event.html) handlng mechanism, see below. +//! [Event](../lightning/util/events/enum.Event.html) handling mechanism; see example below. //! //! The PeerHandler, due to the fire-and-forget nature of this logic, must be an Arc, and must use //! the SocketDescriptor provided here as the PeerHandler's SocketDescriptor. //! -//! Three methods are exposed to register a new connection for handling in tokio::spawn calls, see -//! their individual docs for more. All three take a -//! [mpsc::Sender<()>](../tokio/sync/mpsc/struct.Sender.html) which is sent into every time -//! something occurs which may result in lightning [Events](../lightning/util/events/enum.Event.html). -//! The call site should, thus, look something like this: +//! Three methods are exposed to register a new connection for handling in tokio::spawn calls; see +//! their individual docs for details. +//! +//! # Example //! ``` -//! use tokio::sync::mpsc; //! use std::net::TcpStream; //! use bitcoin::secp256k1::key::PublicKey; -//! use lightning::util::events::EventsProvider; +//! use lightning::util::events::{Event, EventHandler, EventsProvider}; //! use std::net::SocketAddr; //! use std::sync::Arc; //! @@ -43,32 +41,30 @@ //! //! // Connect to node with pubkey their_node_id at addr: //! async fn connect_to_node(peer_manager: PeerManager, chain_monitor: Arc, channel_manager: ChannelManager, their_node_id: PublicKey, addr: SocketAddr) { -//! let (sender, mut receiver) = mpsc::channel(2); -//! lightning_net_tokio::connect_outbound(peer_manager, sender, their_node_id, addr).await; -//! loop { -//! receiver.recv().await; -//! 
for _event in channel_manager.get_and_clear_pending_events().drain(..) { -//! // Handle the event! -//! } -//! for _event in chain_monitor.get_and_clear_pending_events().drain(..) { -//! // Handle the event! -//! } -//! } +//! lightning_net_tokio::connect_outbound(peer_manager, their_node_id, addr).await; +//! loop { +//! channel_manager.await_persistable_update(); +//! channel_manager.process_pending_events(&|event| { +//! // Handle the event! +//! }); +//! chain_monitor.process_pending_events(&|event| { +//! // Handle the event! +//! }); +//! } //! } //! //! // Begin reading from a newly accepted socket and talk to the peer: //! async fn accept_socket(peer_manager: PeerManager, chain_monitor: Arc, channel_manager: ChannelManager, socket: TcpStream) { -//! let (sender, mut receiver) = mpsc::channel(2); -//! lightning_net_tokio::setup_inbound(peer_manager, sender, socket); -//! loop { -//! receiver.recv().await; -//! for _event in channel_manager.get_and_clear_pending_events().drain(..) { -//! // Handle the event! -//! } -//! for _event in chain_monitor.get_and_clear_pending_events().drain(..) { -//! // Handle the event! -//! } -//! } +//! lightning_net_tokio::setup_inbound(peer_manager, socket); +//! loop { +//! channel_manager.await_persistable_update(); +//! channel_manager.process_pending_events(&|event| { +//! // Handle the event! +//! }); +//! chain_monitor.process_pending_events(&|event| { +//! // Handle the event! +//! }); +//! } //! } //! ``` @@ -90,7 +86,7 @@ use lightning::util::logger::Logger; use std::{task, thread}; use std::net::SocketAddr; use std::net::TcpStream as StdTcpStream; -use std::sync::{Arc, Mutex, MutexGuard}; +use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Duration; use std::hash::Hash; @@ -102,7 +98,6 @@ static ID_COUNTER: AtomicU64 = AtomicU64::new(0); /// read future (which is returned by schedule_read). struct Connection { writer: Option>, - event_notify: mpsc::Sender<()>, // Because our PeerManager is templated by user-provided types, and we can't (as far as I can // tell) have a const RawWakerVTable built out of templated functions, we need some indirection // between being woken up with write-ready and calling PeerManager::write_buffer_space_avail. @@ -129,21 +124,10 @@ struct Connection { id: u64, } impl Connection { - fn event_trigger(us: &mut MutexGuard) { - match us.event_notify.try_send(()) { - Ok(_) => {}, - Err(mpsc::error::TrySendError::Full(_)) => { - // Ignore full errors as we just need the user to poll after this point, so if they - // haven't received the last send yet, it doesn't matter. - }, - _ => panic!() - } - } async fn schedule_read(peer_manager: Arc, Arc, Arc>>, us: Arc>, mut reader: io::ReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where CMH: ChannelMessageHandler + 'static, RMH: RoutingMessageHandler + 'static, L: Logger + 'static + ?Sized { - let peer_manager_ref = peer_manager.clone(); // 8KB is nice and big but also should never cause any issues with stack overflowing. 
let mut buf = [0; 8192]; @@ -201,7 +185,6 @@ impl Connection { if pause_read { us_lock.read_paused = true; } - Self::event_trigger(&mut us_lock); }, Err(e) => shutdown_socket!(e, Disconnect::CloseConnection), } @@ -210,6 +193,7 @@ impl Connection { Err(e) => shutdown_socket!(e, Disconnect::PeerDisconnected), }, } + peer_manager.process_events(); }; let writer_option = us.lock().unwrap().writer.take(); if let Some(mut writer) = writer_option { @@ -217,12 +201,12 @@ impl Connection { let _ = writer.shutdown().await; } if let Disconnect::PeerDisconnected = disconnect_type { - peer_manager_ref.socket_disconnected(&our_descriptor); - Self::event_trigger(&mut us.lock().unwrap()); + peer_manager.socket_disconnected(&our_descriptor); + peer_manager.process_events(); } } - fn new(event_notify: mpsc::Sender<()>, stream: StdTcpStream) -> (io::ReadHalf, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc>) { + fn new(stream: StdTcpStream) -> (io::ReadHalf, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc>) { // We only ever need a channel of depth 1 here: if we returned a non-full write to the // PeerManager, we will eventually get notified that there is room in the socket to write // new bytes, which will generate an event. That event will be popped off the queue before @@ -238,7 +222,7 @@ impl Connection { (reader, write_receiver, read_receiver, Arc::new(Mutex::new(Self { - writer: Some(writer), event_notify, write_avail, read_waker, read_paused: false, + writer: Some(writer), write_avail, read_waker, read_paused: false, block_disconnect_socket: false, rl_requested_disconnect: false, id: ID_COUNTER.fetch_add(1, Ordering::AcqRel) }))) @@ -251,13 +235,11 @@ impl Connection { /// The returned future will complete when the peer is disconnected and associated handling /// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do /// not need to poll the provided future in order to make progress. -/// -/// See the module-level documentation for how to handle the event_notify mpsc::Sender. 
-pub fn setup_inbound(peer_manager: Arc, Arc, Arc>>, event_notify: mpsc::Sender<()>, stream: StdTcpStream) -> impl std::future::Future where +pub fn setup_inbound(peer_manager: Arc, Arc, Arc>>, stream: StdTcpStream) -> impl std::future::Future where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, L: Logger + 'static + ?Sized + Send + Sync { - let (reader, write_receiver, read_receiver, us) = Connection::new(event_notify, stream); + let (reader, write_receiver, read_receiver, us) = Connection::new(stream); #[cfg(debug_assertions)] let last_us = Arc::clone(&us); @@ -293,13 +275,11 @@ pub fn setup_inbound(peer_manager: Arc(peer_manager: Arc, Arc, Arc>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future where +pub fn setup_outbound(peer_manager: Arc, Arc, Arc>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, L: Logger + 'static + ?Sized + Send + Sync { - let (reader, mut write_receiver, read_receiver, us) = Connection::new(event_notify, stream); + let (reader, mut write_receiver, read_receiver, us) = Connection::new(stream); #[cfg(debug_assertions)] let last_us = Arc::clone(&us); @@ -365,14 +345,12 @@ pub fn setup_outbound(peer_manager: Arc(peer_manager: Arc, Arc, Arc>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) -> Option> where +pub async fn connect_outbound(peer_manager: Arc, Arc, Arc>>, their_node_id: PublicKey, addr: SocketAddr) -> Option> where CMH: ChannelMessageHandler + 'static + Send + Sync, RMH: RoutingMessageHandler + 'static + Send + Sync, L: Logger + 'static + ?Sized + Send + Sync { if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await { - Some(setup_outbound(peer_manager, event_notify, their_node_id, stream)) + Some(setup_outbound(peer_manager, their_node_id, stream)) } else { None } } @@ -634,9 +612,8 @@ mod tests { (std::net::TcpStream::connect("127.0.0.1:46926").unwrap(), listener.accept().unwrap().0) } else { panic!("Failed to bind to v4 localhost on common ports"); }; - let (sender, _receiver) = mpsc::channel(2); - let fut_a = super::setup_outbound(Arc::clone(&a_manager), sender.clone(), b_pub, conn_a); - let fut_b = super::setup_inbound(b_manager, sender, conn_b); + let fut_a = super::setup_outbound(Arc::clone(&a_manager), b_pub, conn_a); + let fut_b = super::setup_inbound(b_manager, conn_b); tokio::time::timeout(Duration::from_secs(10), a_connected.recv()).await.unwrap(); tokio::time::timeout(Duration::from_secs(1), b_connected.recv()).await.unwrap(); diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index a9b570d523d..8e6ddc76c49 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -35,7 +35,7 @@ use chain::transaction::{OutPoint, TransactionData}; use chain::keysinterface::Sign; use util::logger::Logger; use util::events; -use util::events::Event; +use util::events::EventHandler; use std::collections::{HashMap, hash_map}; use std::sync::RwLock; @@ -139,6 +139,15 @@ where C::Target: chain::Filter, persister, } } + + #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))] + pub fn get_and_clear_pending_events(&self) -> Vec { + use util::events::EventsProvider; + let events = std::cell::RefCell::new(Vec::new()); + let 
event_handler = |event| events.borrow_mut().push(event); + self.process_pending_events(&event_handler); + events.into_inner() + } } impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> @@ -306,12 +315,20 @@ impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> events::EventsProvider for ChainMonitor<ChannelSigner, C, T, F, L, P> L::Target: Logger, P::Target: channelmonitor::Persist<ChannelSigner>, { - fn get_and_clear_pending_events(&self) -> Vec<Event> { + /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity. + /// + /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in + /// order to handle these events. + /// + /// [`SpendableOutputs`]: events::Event::SpendableOutputs + fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler { let mut pending_events = Vec::new(); for monitor in self.monitors.read().unwrap().values() { pending_events.append(&mut monitor.get_and_clear_pending_events()); } - pending_events + for event in pending_events.drain(..) { + handler.handle_event(event); + } } } @@ -320,7 +337,6 @@ mod tests { use ::{check_added_monitors, get_local_commitment_txn}; use ln::features::InitFeatures; use ln::functional_test_utils::*; - use util::events::EventsProvider; use util::events::MessageSendEventsProvider; use util::test_utils::{OnRegisterOutput, TxOutReference}; diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 8526ce51722..819811156ac 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -222,11 +222,11 @@ pub(crate) const CLTV_CLAIM_BUFFER: u32 = 18; pub(crate) const LATENCY_GRACE_PERIOD_BLOCKS: u32 = 3; /// Number of blocks we wait on seeing a HTLC output being solved before we fail corresponding inbound /// HTLCs. This prevents us from failing backwards and then getting a reorg resulting in us losing money. -/// We use also this delay to be sure we can remove our in-flight claim txn from bump candidates buffer. -/// It may cause spurrious generation of bumped claim txn but that's allright given the outpoint is already -/// solved by a previous claim tx. What we want to avoid is reorg evicting our claim tx and us not -/// keeping bumping another claim tx to solve the outpoint. -pub(crate) const ANTI_REORG_DELAY: u32 = 6; +// We also use this delay to be sure we can remove our in-flight claim txn from bump candidates buffer. +// It may cause spurious generation of bumped claim txn but that's alright given the outpoint is already +// solved by a previous claim tx. What we want to avoid is a reorg evicting our claim tx and us failing +// to keep bumping another claim tx to solve the outpoint. +pub const ANTI_REORG_DELAY: u32 = 6; /// Number of blocks before confirmation at which we fail back an un-relayed HTLC or at which we refuse to accept a new HTLC.
/// diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 79bbfadc768..2e758aa48c8 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -27,7 +27,7 @@ use ln::msgs::{ChannelMessageHandler, ErrorAction, RoutingMessageHandler}; use routing::router::get_route; use util::config::UserConfig; use util::enforcing_trait_impls::EnforcingSigner; -use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; +use util::events::{Event, MessageSendEvent, MessageSendEventsProvider}; use util::errors::APIError; use util::ser::{ReadableArgs, Writeable}; diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 190fb2bc041..0662264e684 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -54,7 +54,7 @@ use ln::onion_utils; use ln::msgs::{ChannelMessageHandler, DecodeError, LightningError, OptionalField}; use chain::keysinterface::{Sign, KeysInterface, KeysManager, InMemorySigner}; use util::config::UserConfig; -use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; +use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; use util::{byte_utils, events}; use util::ser::{Readable, ReadableArgs, MaybeReadable, Writeable, Writer}; use util::chacha20::{ChaCha20, ChaChaReader}; @@ -62,6 +62,7 @@ use util::logger::Logger; use util::errors::APIError; use core::{cmp, mem}; +use std::cell::RefCell; use std::collections::{HashMap, hash_map, HashSet}; use std::io::{Cursor, Read}; use std::sync::{Arc, Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard}; @@ -1860,6 +1861,8 @@ impl ChannelMana /// Note that this includes RBF or similar transaction replacement strategies - lightning does /// not currently support replacing a funding transaction on an existing channel. Instead, /// create a new channel with a conflicting funding transaction. + /// + /// [`Event::FundingGenerationReady`]: crate::util::events::Event::FundingGenerationReady pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], funding_transaction: Transaction) -> Result<(), APIError> { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); @@ -3449,60 +3452,66 @@ impl ChannelMana } } - /// Process pending events from the `chain::Watch`. - fn process_pending_monitor_events(&self) { + /// Process pending events from the `chain::Watch`, returning whether any events were processed. 
+ fn process_pending_monitor_events(&self) -> bool { let mut failed_channels = Vec::new(); - { - for monitor_event in self.chain_monitor.release_pending_monitor_events() { - match monitor_event { - MonitorEvent::HTLCEvent(htlc_update) => { - if let Some(preimage) = htlc_update.payment_preimage { - log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0)); - self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage); - } else { - log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0)); - self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() }); + let pending_monitor_events = self.chain_monitor.release_pending_monitor_events(); + let has_pending_monitor_events = !pending_monitor_events.is_empty(); + for monitor_event in pending_monitor_events { + match monitor_event { + MonitorEvent::HTLCEvent(htlc_update) => { + if let Some(preimage) = htlc_update.payment_preimage { + log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0)); + self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage); + } else { + log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0)); + self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() }); + } + }, + MonitorEvent::CommitmentTxBroadcasted(funding_outpoint) => { + let mut channel_lock = self.channel_state.lock().unwrap(); + let channel_state = &mut *channel_lock; + let by_id = &mut channel_state.by_id; + let short_to_id = &mut channel_state.short_to_id; + let pending_msg_events = &mut channel_state.pending_msg_events; + if let Some(mut chan) = by_id.remove(&funding_outpoint.to_channel_id()) { + if let Some(short_id) = chan.get_short_channel_id() { + short_to_id.remove(&short_id); } - }, - MonitorEvent::CommitmentTxBroadcasted(funding_outpoint) => { - let mut channel_lock = self.channel_state.lock().unwrap(); - let channel_state = &mut *channel_lock; - let by_id = &mut channel_state.by_id; - let short_to_id = &mut channel_state.short_to_id; - let pending_msg_events = &mut channel_state.pending_msg_events; - if let Some(mut chan) = by_id.remove(&funding_outpoint.to_channel_id()) { - if let Some(short_id) = chan.get_short_channel_id() { - short_to_id.remove(&short_id); - } - failed_channels.push(chan.force_shutdown(false)); - if let Ok(update) = self.get_channel_update(&chan) { - pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { - msg: update - }); - } - pending_msg_events.push(events::MessageSendEvent::HandleError { - node_id: chan.get_counterparty_node_id(), - action: msgs::ErrorAction::SendErrorMessage { - msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: "Channel force-closed".to_owned() } - }, + failed_channels.push(chan.force_shutdown(false)); + if let Ok(update) = self.get_channel_update(&chan) { + pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { + msg: update }); } - }, - } + pending_msg_events.push(events::MessageSendEvent::HandleError { + node_id: chan.get_counterparty_node_id(), + action: msgs::ErrorAction::SendErrorMessage { + msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: 
"Channel force-closed".to_owned() } + }, + }); + } + }, } } for failure in failed_channels.drain(..) { self.finish_force_close_channel(failure); } + + has_pending_monitor_events } /// Check the holding cell in each channel and free any pending HTLCs in them if possible. + /// Returns whether there were any updates such as if pending HTLCs were freed or a monitor + /// update was applied. + /// /// This should only apply to HTLCs which were added to the holding cell because we were /// waiting on a monitor update to finish. In that case, we don't want to free the holding cell /// directly in `channel_monitor_updated` as it may introduce deadlocks calling back into user /// code to inform them of a channel monitor update. - fn check_free_holding_cells(&self) { + fn check_free_holding_cells(&self) -> bool { + let mut has_monitor_update = false; let mut failed_htlcs = Vec::new(); let mut handle_errors = Vec::new(); { @@ -3514,11 +3523,13 @@ impl ChannelMana by_id.retain(|channel_id, chan| { match chan.maybe_free_holding_cell_htlcs(&self.logger) { - Ok((None, ref htlcs)) if htlcs.is_empty() => true, Ok((commitment_opt, holding_cell_failed_htlcs)) => { - failed_htlcs.push((holding_cell_failed_htlcs, *channel_id)); + if !holding_cell_failed_htlcs.is_empty() { + failed_htlcs.push((holding_cell_failed_htlcs, *channel_id)); + } if let Some((commitment_update, monitor_update)) = commitment_opt { if let Err(e) = self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), monitor_update) { + has_monitor_update = true; let (res, close_channel) = handle_monitor_err!(self, e, short_to_id, chan, RAACommitmentOrder::CommitmentFirst, false, true, Vec::new(), Vec::new(), channel_id); handle_errors.push((chan.get_counterparty_node_id(), res)); if close_channel { return false; } @@ -3539,6 +3550,8 @@ impl ChannelMana } }); } + + let has_update = has_monitor_update || !failed_htlcs.is_empty(); for (failures, channel_id) in failed_htlcs.drain(..) { self.fail_holding_cell_htlcs(failures, channel_id); } @@ -3546,6 +3559,8 @@ impl ChannelMana for (counterparty_node_id, err) in handle_errors.drain(..) { let _ = handle_error!(self, err, counterparty_node_id); } + + has_update } /// Handle a list of channel failures during a block_connected or block_disconnected call, @@ -3670,6 +3685,14 @@ impl ChannelMana pub fn create_inbound_payment_for_hash(&self, payment_hash: PaymentHash, min_value_msat: Option, invoice_expiry_delta_secs: u32, user_payment_id: u64) -> Result { self.set_payment_hash_secret_map(payment_hash, None, min_value_msat, invoice_expiry_delta_secs, user_payment_id) } + + #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))] + pub fn get_and_clear_pending_events(&self) -> Vec { + let events = std::cell::RefCell::new(Vec::new()); + let event_handler = |event| events.borrow_mut().push(event); + self.process_pending_events(&event_handler); + events.into_inner() + } } impl MessageSendEventsProvider for ChannelManager @@ -3680,35 +3703,71 @@ impl MessageSend L::Target: Logger, { fn get_and_clear_pending_msg_events(&self) -> Vec { - //TODO: This behavior should be documented. It's non-intuitive that we query - // ChannelMonitors when clearing other events. - self.process_pending_monitor_events(); + let events = RefCell::new(Vec::new()); + PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { + let mut result = NotifyOption::SkipPersist; - self.check_free_holding_cells(); + // TODO: This behavior should be documented. 
It's unintuitive that we query + ChannelMonitors when clearing other events. + if self.process_pending_monitor_events() { + result = NotifyOption::DoPersist; + } - let mut ret = Vec::new(); - let mut channel_state = self.channel_state.lock().unwrap(); - mem::swap(&mut ret, &mut channel_state.pending_msg_events); - ret + if self.check_free_holding_cells() { + result = NotifyOption::DoPersist; + } + + let mut pending_events = Vec::new(); + let mut channel_state = self.channel_state.lock().unwrap(); + mem::swap(&mut pending_events, &mut channel_state.pending_msg_events); + + if !pending_events.is_empty() { + events.replace(pending_events); + } + + result + }); + events.into_inner() } } impl<Signer: Sign, M: Deref + Sync + Send, T: Deref + Sync + Send, K: Deref + Sync + Send, F: Deref + Sync + Send, L: Deref + Sync + Send> EventsProvider for ChannelManager<Signer, M, T, K, F, L> - where M::Target: chain::Watch<Signer>, - T::Target: BroadcasterInterface, - K::Target: KeysInterface<Signer = Signer>, - F::Target: FeeEstimator, - L::Target: Logger, +where + M::Target: chain::Watch<Signer>, + T::Target: BroadcasterInterface, + K::Target: KeysInterface<Signer = Signer>, + F::Target: FeeEstimator, + L::Target: Logger, { - fn get_and_clear_pending_events(&self) -> Vec<Event> { - //TODO: This behavior should be documented. It's non-intuitive that we query - // ChannelMonitors when clearing other events. - self.process_pending_monitor_events(); + /// Processes events that must be periodically handled. + /// + /// An [`EventHandler`] may safely call back to the provider in order to handle an event. + /// However, it must not call [`Writeable::write`] as doing so would result in a deadlock. + /// + /// Pending events are persisted as part of [`ChannelManager`]. While these events are cleared + /// when processed, an [`EventHandler`] must be able to handle previously seen events when + /// restarting from an old state. + fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler { + PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { + let mut result = NotifyOption::SkipPersist; - let mut ret = Vec::new(); - let mut pending_events = self.pending_events.lock().unwrap(); - mem::swap(&mut ret, &mut *pending_events); - ret + // TODO: This behavior should be documented. It's unintuitive that we query + // ChannelMonitors when clearing other events. + if self.process_pending_monitor_events() { + result = NotifyOption::DoPersist; + } + + let mut pending_events = std::mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]); + if !pending_events.is_empty() { + result = NotifyOption::DoPersist; + } + + for event in pending_events.drain(..)
{ + handler.handle_event(event); + } + + result + }); } } @@ -4956,7 +5015,7 @@ pub mod bench { use routing::router::get_route; use util::test_utils; use util::config::UserConfig; - use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; + use util::events::{Event, MessageSendEvent, MessageSendEventsProvider}; use bitcoin::hashes::Hash; use bitcoin::hashes::sha256::Hash as Sha256; diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index c15005089e5..bf1a24d3685 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -23,7 +23,7 @@ use ln::msgs::{ChannelMessageHandler,RoutingMessageHandler}; use util::enforcing_trait_impls::EnforcingSigner; use util::test_utils; use util::test_utils::TestChainMonitor; -use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; +use util::events::{Event, MessageSendEvent, MessageSendEventsProvider}; use util::errors::APIError; use util::config::UserConfig; use util::ser::{ReadableArgs, Writeable, Readable}; diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index 9f1769ff513..5fa93e18ae5 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -29,7 +29,7 @@ use ln::msgs; use ln::msgs::{ChannelMessageHandler,RoutingMessageHandler,HTLCFailChannelUpdate, ErrorAction}; use util::enforcing_trait_impls::EnforcingSigner; use util::{byte_utils, test_utils}; -use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; +use util::events::{Event, MessageSendEvent, MessageSendEventsProvider}; use util::errors::APIError; use util::ser::{Writeable, ReadableArgs}; use util::config::UserConfig; diff --git a/lightning/src/ln/onion_route_tests.rs b/lightning/src/ln/onion_route_tests.rs index e3184fc65bb..bab78e7a1c7 100644 --- a/lightning/src/ln/onion_route_tests.rs +++ b/lightning/src/ln/onion_route_tests.rs @@ -20,7 +20,7 @@ use ln::features::{InitFeatures, InvoiceFeatures}; use ln::msgs; use ln::msgs::{ChannelMessageHandler, HTLCFailChannelUpdate, OptionalField}; use util::test_utils; -use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; +use util::events::{Event, MessageSendEvent, MessageSendEventsProvider}; use util::ser::{Writeable, Writer}; use util::config::UserConfig; diff --git a/lightning/src/ln/reorg_tests.rs b/lightning/src/ln/reorg_tests.rs index 6906e724d26..f81c42a36f7 100644 --- a/lightning/src/ln/reorg_tests.rs +++ b/lightning/src/ln/reorg_tests.rs @@ -15,7 +15,7 @@ use ln::channelmanager::{ChannelManager, ChannelManagerReadArgs}; use ln::features::InitFeatures; use ln::msgs::{ChannelMessageHandler, ErrorAction, HTLCFailChannelUpdate}; use util::enforcing_trait_impls::EnforcingSigner; -use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider}; +use util::events::{Event, MessageSendEvent, MessageSendEventsProvider}; use util::test_utils; use util::ser::{ReadableArgs, Writeable}; diff --git a/lightning/src/util/events.rs b/lightning/src/util/events.rs index b7b8bf3137b..013e3c4e611 100644 --- a/lightning/src/util/events.rs +++ b/lightning/src/util/events.rs @@ -24,6 +24,7 @@ use bitcoin::blockdata::script::Script; use bitcoin::secp256k1::key::PublicKey; use core::time::Duration; +use std::ops::Deref; /// An Event which you should probably take some action in response to. 
/// @@ -376,9 +377,46 @@ pub trait MessageSendEventsProvider { fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent>; } -/// A trait indicating an object may generate events +/// A trait indicating an object may generate events. +/// +/// Events are processed by passing an [`EventHandler`] to [`process_pending_events`]. +/// +/// # Requirements +/// +/// See [`process_pending_events`] for requirements around event processing. +/// +/// When using this trait, [`process_pending_events`] will call [`handle_event`] for each pending +/// event since the last invocation. The handler must either act upon the event immediately +/// or preserve it for later handling. +/// +/// Note that handlers may call back into the provider, so deadlocks must be avoided. Be sure to +/// consult the provider's documentation on the implications of processing events and on how a +/// handler may safely use the provider (e.g., see [`ChannelManager::process_pending_events`] and +/// [`ChainMonitor::process_pending_events`]). +/// +/// [`process_pending_events`]: Self::process_pending_events +/// [`handle_event`]: EventHandler::handle_event +/// [`ChannelManager::process_pending_events`]: crate::ln::channelmanager::ChannelManager#method.process_pending_events +/// [`ChainMonitor::process_pending_events`]: crate::chain::chainmonitor::ChainMonitor#method.process_pending_events pub trait EventsProvider { - /// Gets the list of pending events which were generated by previous actions, clearing the list - /// in the process. - fn get_and_clear_pending_events(&self) -> Vec<Event>; + /// Processes any events generated since the last call using the given event handler. + /// + /// Subsequent calls must only process new events. However, handlers must be capable of handling + /// duplicate events across process restarts. This may occur if the provider was recovered from + /// an old state (i.e., it hadn't been successfully persisted after processing pending events). + fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler; +} + +/// A trait implemented for objects handling events from [`EventsProvider`]. +pub trait EventHandler { + /// Handles the given [`Event`]. + /// + /// See [`EventsProvider`] for details that must be considered when implementing this method. + fn handle_event(&self, event: Event); +} + +impl<F> EventHandler for F where F: Fn(Event) { + fn handle_event(&self, event: Event) { + self(event) + } }
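
For downstream consumers, migration from `get_and_clear_pending_events` to the new API looks roughly like the sketch below. The `Handler` type and the match arms are illustrative, not part of this patch; note that the blanket `impl<F> EventHandler for F where F: Fn(Event)` means a plain closure works just as well.

```rust
use lightning::util::events::{Event, EventHandler};

// An illustrative handler type; any `Fn(Event)` closure also satisfies
// `EventHandler` via the blanket impl added in this patch.
struct Handler;

impl EventHandler for Handler {
	fn handle_event(&self, event: Event) {
		// Events are no longer returned to the caller, so they must be acted
		// upon (or stashed for later) before this method returns.
		match event {
			Event::FundingGenerationReady { .. } => { /* build and hand back the funding tx */ },
			Event::SpendableOutputs { .. } => { /* sweep the outputs */ },
			_ => {},
		}
	}
}

// `process_pending_events` accepts any `Deref` to an `EventHandler`, so a
// shared reference suffices:
//     channel_manager.process_pending_events(&Handler);
//     chain_monitor.process_pending_events(&|event: Event| println!("{:?}", event));
```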
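Similarly, a sketch of the new six-argument `BackgroundProcessor::start`, mirroring this patch's tests; the `chain_monitor`, `channel_manager`, `peer_manager`, and `logger` bindings (Arc-wrapped, as in the test harness) and the data directory are assumed to exist.

```rust
use lightning::util::events::Event;
use lightning_background_processor::BackgroundProcessor;
use lightning_persister::FilesystemPersister;

// Assumed in scope: chain_monitor, channel_manager, peer_manager, logger.
let data_dir = "/path/to/ldk/data".to_string();
let persister = move |manager: &_| FilesystemPersister::persist_manager(data_dir.clone(), manager);

// The handler runs on the background thread, on each loop iteration before
// the persistence check.
let event_handler = |_event: Event| { /* handle FundingGenerationReady, SpendableOutputs, ... */ };

let bg_processor = BackgroundProcessor::start(
	persister, event_handler, chain_monitor, channel_manager, peer_manager, logger);

// On shutdown, join the background thread and surface any persistence error.
assert!(bg_processor.stop().is_ok());
```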