Skip to content

Use AChannelManager in BackgroundProcessor #2963

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 72 additions & 71 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,17 @@ extern crate lightning_rapid_gossip_sync;
use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
use lightning::sign::{EntropySource, NodeSigner, SignerProvider};
use lightning::events::{Event, PathFailure};
#[cfg(feature = "std")]
use lightning::events::EventHandler;
#[cfg(any(feature = "std", feature = "futures"))]
use lightning::events::EventsProvider;

use lightning::ln::channelmanager::ChannelManager;
use lightning::ln::channelmanager::AChannelManager;
use lightning::ln::msgs::OnionMessageHandler;
use lightning::ln::peer_handler::APeerManager;
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::routing::utxo::UtxoLookup;
use lightning::routing::router::Router;
use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
use lightning::util::logger::Logger;
use lightning::util::persist::Persister;
Expand Down Expand Up @@ -81,6 +79,8 @@ use alloc::vec::Vec;
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
/// [`ChannelManager::timer_tick_occurred`]: lightning::ln::channelmanager::ChannelManager::timer_tick_occurred
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::events::Event
/// [`PeerManager::timer_tick_occurred`]: lightning::ln::peer_handler::PeerManager::timer_tick_occurred
Expand Down Expand Up @@ -286,7 +286,7 @@ macro_rules! define_run_body {
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
) => { {
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
$channel_manager.timer_tick_occurred();
$channel_manager.get_cm().timer_tick_occurred();
log_trace!($logger, "Rebroadcasting monitor's pending claims on startup");
$chain_monitor.rebroadcast_pending_claims();

Expand Down Expand Up @@ -336,14 +336,14 @@ macro_rules! define_run_body {
break;
}

if $channel_manager.get_and_clear_needs_persistence() {
if $channel_manager.get_cm().get_and_clear_needs_persistence() {
log_trace!($logger, "Persisting ChannelManager...");
$persister.persist_manager(&*$channel_manager)?;
$persister.persist_manager(&$channel_manager)?;
log_trace!($logger, "Done persisting ChannelManager.");
}
if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
$channel_manager.timer_tick_occurred();
$channel_manager.get_cm().timer_tick_occurred();
last_freshness_call = $get_timer(FRESHNESS_TIMER);
}
if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) {
Expand Down Expand Up @@ -440,7 +440,7 @@ macro_rules! define_run_body {
// After we exit, ensure we persist the ChannelManager one final time - this avoids
// some races where users quit while channel updates were in-flight, with
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
$persister.persist_manager(&*$channel_manager)?;
$persister.persist_manager(&$channel_manager)?;

// Persist Scorer on exit
if let Some(ref scorer) = $scorer {
Expand Down Expand Up @@ -539,45 +539,64 @@ use core::task;
/// # use std::sync::atomic::{AtomicBool, Ordering};
/// # use std::time::SystemTime;
/// # use lightning_background_processor::{process_events_async, GossipSync};
/// # struct MyStore {}
/// # impl lightning::util::persist::KVStore for MyStore {
/// # struct Logger {}
/// # impl lightning::util::logger::Logger for Logger {
/// # fn log(&self, _record: lightning::util::logger::Record) {}
/// # }
/// # struct Store {}
/// # impl lightning::util::persist::KVStore for Store {
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
/// # }
/// # struct MyEventHandler {}
/// # impl MyEventHandler {
/// # struct EventHandler {}
/// # impl EventHandler {
/// # async fn handle_event(&self, _: lightning::events::Event) {}
/// # }
/// # #[derive(Eq, PartialEq, Clone, Hash)]
/// # struct MySocketDescriptor {}
/// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor {
/// # struct SocketDescriptor {}
/// # impl lightning::ln::peer_handler::SocketDescriptor for SocketDescriptor {
/// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
/// # fn disconnect_socket(&mut self) {}
/// # }
/// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync;
/// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
/// # type MyNodeSigner = dyn lightning::sign::NodeSigner + Send + Sync;
/// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
/// # type MyFilter = dyn lightning::chain::Filter + Send + Sync;
/// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync;
/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyStore>>;
/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, Arc<MyUtxoLookup>, MyLogger>;
/// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
/// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
/// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
/// # type MyScorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
///
/// # async fn setup_background_processing(my_persister: Arc<MyStore>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_gossip_sync: Arc<MyGossipSync>, my_logger: Arc<MyLogger>, my_scorer: Arc<MyScorer>, my_peer_manager: Arc<MyPeerManager>) {
/// let background_persister = Arc::clone(&my_persister);
/// let background_event_handler = Arc::clone(&my_event_handler);
/// let background_chain_mon = Arc::clone(&my_chain_monitor);
/// let background_chan_man = Arc::clone(&my_channel_manager);
/// let background_gossip_sync = GossipSync::p2p(Arc::clone(&my_gossip_sync));
/// let background_peer_man = Arc::clone(&my_peer_manager);
/// let background_logger = Arc::clone(&my_logger);
/// let background_scorer = Arc::clone(&my_scorer);
/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<Store>>;
/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger>;
/// #
/// # struct Node<
/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
/// # F: lightning::chain::Filter + Send + Sync + 'static,
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
/// # > {
/// # peer_manager: Arc<PeerManager<B, F, FE, UL>>,
/// # event_handler: Arc<EventHandler>,
/// # channel_manager: Arc<ChannelManager<B, F, FE>>,
/// # chain_monitor: Arc<ChainMonitor<B, F, FE>>,
/// # gossip_sync: Arc<P2PGossipSync<UL>>,
/// # persister: Arc<Store>,
/// # logger: Arc<Logger>,
/// # scorer: Arc<Scorer>,
/// # }
/// #
/// # async fn setup_background_processing<
/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
/// # F: lightning::chain::Filter + Send + Sync + 'static,
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
/// # >(node: Node<B, F, FE, UL>) {
/// let background_persister = Arc::clone(&node.persister);
/// let background_event_handler = Arc::clone(&node.event_handler);
/// let background_chain_mon = Arc::clone(&node.chain_monitor);
/// let background_chan_man = Arc::clone(&node.channel_manager);
/// let background_gossip_sync = GossipSync::p2p(Arc::clone(&node.gossip_sync));
/// let background_peer_man = Arc::clone(&node.peer_manager);
/// let background_logger = Arc::clone(&node.logger);
/// let background_scorer = Arc::clone(&node.scorer);
///
/// // Setup the sleeper.
/// let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
Expand Down Expand Up @@ -607,9 +626,9 @@ use core::task;
/// sleeper,
/// mobile_interruptable_platform,
/// || Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap())
/// )
/// .await
/// .expect("Failed to process events");
/// )
/// .await
/// .expect("Failed to process events");
/// });
///
/// // Stop the background processing.
Expand All @@ -622,21 +641,16 @@ pub async fn process_events_async<
'a,
UL: 'static + Deref + Send + Sync,
CF: 'static + Deref + Send + Sync,
CW: 'static + Deref + Send + Sync,
T: 'static + Deref + Send + Sync,
ES: 'static + Deref + Send + Sync,
NS: 'static + Deref + Send + Sync,
SP: 'static + Deref + Send + Sync,
F: 'static + Deref + Send + Sync,
R: 'static + Deref + Send + Sync,
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
L: 'static + Deref + Send + Sync,
P: 'static + Deref + Send + Sync,
EventHandlerFuture: core::future::Future<Output = ()>,
EventHandler: Fn(Event) -> EventHandlerFuture,
PS: 'static + Deref + Send,
M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::EcdsaSigner, CF, T, F, L, P>> + Send + Sync,
CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
M: 'static + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>> + Send + Sync,
CM: 'static + Deref + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
PM: 'static + Deref + Send + Sync,
Expand All @@ -653,16 +667,12 @@ pub async fn process_events_async<
where
UL::Target: 'static + UtxoLookup,
CF::Target: 'static + chain::Filter,
CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
T::Target: 'static + BroadcasterInterface,
ES::Target: 'static + EntropySource,
NS::Target: 'static + NodeSigner,
SP::Target: 'static + SignerProvider,
F::Target: 'static + FeeEstimator,
R::Target: 'static + Router,
L::Target: 'static + Logger,
P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>,
PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
PS::Target: 'static + Persister<'a, CM, L, SC>,
CM::Target: AChannelManager + Send + Sync,
PM::Target: APeerManager + Send + Sync,
{
let mut should_break = false;
Expand Down Expand Up @@ -693,11 +703,11 @@ where
define_run_body!(
persister, chain_monitor,
chain_monitor.process_pending_events_async(async_event_handler).await,
channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
channel_manager, channel_manager.get_cm().process_pending_events_async(async_event_handler).await,
peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await,
gossip_sync, logger, scorer, should_break, {
let fut = Selector {
a: channel_manager.get_event_or_persistence_needed_future(),
a: channel_manager.get_cm().get_event_or_persistence_needed_future(),
b: chain_monitor.get_update_future(),
c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
};
Expand Down Expand Up @@ -788,20 +798,15 @@ impl BackgroundProcessor {
'a,
UL: 'static + Deref + Send + Sync,
CF: 'static + Deref + Send + Sync,
CW: 'static + Deref + Send + Sync,
T: 'static + Deref + Send + Sync,
ES: 'static + Deref + Send + Sync,
NS: 'static + Deref + Send + Sync,
SP: 'static + Deref + Send + Sync,
F: 'static + Deref + Send + Sync,
R: 'static + Deref + Send + Sync,
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
L: 'static + Deref + Send + Sync,
P: 'static + Deref + Send + Sync,
EH: 'static + EventHandler + Send,
PS: 'static + Deref + Send,
M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::EcdsaSigner, CF, T, F, L, P>> + Send + Sync,
CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
M: 'static + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>> + Send + Sync,
CM: 'static + Deref + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
PM: 'static + Deref + Send + Sync,
Expand All @@ -814,16 +819,12 @@ impl BackgroundProcessor {
where
UL::Target: 'static + UtxoLookup,
CF::Target: 'static + chain::Filter,
CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
T::Target: 'static + BroadcasterInterface,
ES::Target: 'static + EntropySource,
NS::Target: 'static + NodeSigner,
SP::Target: 'static + SignerProvider,
F::Target: 'static + FeeEstimator,
R::Target: 'static + Router,
L::Target: 'static + Logger,
P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>,
PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
PS::Target: 'static + Persister<'a, CM, L, SC>,
CM::Target: AChannelManager + Send + Sync,
PM::Target: APeerManager + Send + Sync,
{
let stop_thread = Arc::new(AtomicBool::new(false));
Expand All @@ -849,12 +850,12 @@ impl BackgroundProcessor {
};
define_run_body!(
persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
channel_manager, channel_manager.process_pending_events(&event_handler),
channel_manager, channel_manager.get_cm().process_pending_events(&event_handler),
peer_manager,
peer_manager.onion_message_handler().process_pending_events(&event_handler),
gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
{ Sleeper::from_two_futures(
&channel_manager.get_event_or_persistence_needed_future(),
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
&chain_monitor.get_update_future()
).wait_timeout(Duration::from_millis(100)); },
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false,
Expand Down
Loading
Loading