Skip to content

Commit 1670e90

Browse files
committed
Add NetworkGraph persistence
Instead of creating a separate trait for persisting NetworkGraph, use and rename the existing ChannelManagerPersister to handle them both. persist_graph is then called on removal of stale channels and on exit.
1 parent c244c78 commit 1670e90

File tree

2 files changed

+151
-29
lines changed
  • lightning-background-processor/src
  • lightning-persister/src

2 files changed

+151
-29
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 137 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,16 @@ const PING_TIMER: u64 = 1;
7575
/// Prune the network graph of stale entries hourly.
7676
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
7777

78-
/// Trait which handles persisting a [`ChannelManager`] to disk.
78+
#[cfg(not(test))]
79+
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
80+
#[cfg(test)]
81+
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
82+
83+
/// Trait which handles persisting a [`ChannelManager`] and [`NetworkGraph`] to disk.
7984
///
8085
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
81-
pub trait ChannelManagerPersister<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
86+
/// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph
87+
pub trait Persister<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
8288
where
8389
M::Target: 'static + chain::Watch<Signer>,
8490
T::Target: 'static + BroadcasterInterface,
@@ -87,24 +93,15 @@ where
8793
L::Target: 'static + Logger,
8894
{
8995
/// Persist the given [`ChannelManager`] to disk, returning an error if persistence failed
90-
/// (which will cause the [`BackgroundProcessor`] which called this method to exit.
96+
/// (which will cause the [`BackgroundProcessor`] which called this method to exit.)
9197
///
9298
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
9399
fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error>;
94-
}
95100

96-
impl<Fun, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
97-
ChannelManagerPersister<Signer, M, T, K, F, L> for Fun where
98-
M::Target: 'static + chain::Watch<Signer>,
99-
T::Target: 'static + BroadcasterInterface,
100-
K::Target: 'static + KeysInterface<Signer = Signer>,
101-
F::Target: 'static + FeeEstimator,
102-
L::Target: 'static + Logger,
103-
Fun: Fn(&ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error>,
104-
{
105-
fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> {
106-
self(channel_manager)
107-
}
101+
/// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
102+
///
103+
/// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph
104+
fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error>;
108105
}
109106

110107
/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
@@ -151,7 +148,12 @@ impl BackgroundProcessor {
151148
/// [`ChannelManager`]. See [`FilesystemPersister::persist_manager`] for Rust-Lightning's
152149
/// provided implementation.
153150
///
154-
/// Typically, users should either implement [`ChannelManagerPersister`] to never return an
151+
/// `persist_graph` is responsible for writing out the [`NetworkGraph`] to disk. See
152+
/// [`NetworkGraph::write`] for writing out a [`NetworkGraph`]. See
153+
/// [`FilesystemPersister::persist_network_graph`] for Rust-Lightning's
154+
/// provided implementation.
155+
///
156+
/// Typically, users should either implement [`Persister::persist_manager`] to never return an
155157
/// error or call [`join`] and handle any error that may arise. For the latter case,
156158
/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
157159
///
@@ -168,7 +170,9 @@ impl BackgroundProcessor {
168170
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
169171
/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
170172
/// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister::persist_manager
173+
/// [`FilesystemPersister::persist_network_graph`]: lightning_persister::FilesystemPersister::persist_network_graph
171174
/// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph
175+
/// [`NetworkGraph::write`]: lightning::routing::network_graph::NetworkGraph#impl-Writeable
172176
pub fn start<
173177
Signer: 'static + Sign,
174178
CA: 'static + Deref + Send + Sync,
@@ -184,14 +188,14 @@ impl BackgroundProcessor {
184188
CMH: 'static + Deref + Send + Sync,
185189
RMH: 'static + Deref + Send + Sync,
186190
EH: 'static + EventHandler + Send,
187-
CMP: 'static + Send + ChannelManagerPersister<Signer, CW, T, K, F, L>,
191+
PS: 'static + Send + Persister<Signer, CW, T, K, F, L>,
188192
M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
189193
CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
190194
NG: 'static + Deref<Target = NetGraphMsgHandler<G, CA, L>> + Send + Sync,
191195
UMH: 'static + Deref + Send + Sync,
192196
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
193197
>(
194-
persister: CMP, event_handler: EH, chain_monitor: M, channel_manager: CM,
198+
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
195199
net_graph_msg_handler: Option<NG>, peer_manager: PM, logger: L
196200
) -> Self
197201
where
@@ -273,19 +277,29 @@ impl BackgroundProcessor {
273277
// falling back to our usual hourly prunes. This avoids short-lived clients never
274278
// pruning their network graph. We run once 60 seconds after startup before
275279
// continuing our normal cadence.
276-
if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { 60 } {
280+
if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
277281
if let Some(ref handler) = net_graph_msg_handler {
278282
log_trace!(logger, "Pruning network graph of stale entries");
279283
handler.network_graph().remove_stale_channels();
284+
if persister.persist_graph(handler.network_graph()).is_err() {
285+
log_error!(logger, "Warning: Failed to persist network graph, check your disk and permissions");
286+
}
280287
last_prune_call = Instant::now();
281288
have_pruned = true;
282289
}
283290
}
284291
}
292+
285293
// After we exit, ensure we persist the ChannelManager one final time - this avoids
286294
// some races where users quit while channel updates were in-flight, with
287295
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
288-
persister.persist_manager(&*channel_manager)
296+
persister.persist_manager(&*channel_manager)?;
297+
298+
// Persist NetworkGraph on exit
299+
if let Some(ref handler) = net_graph_msg_handler {
300+
persister.persist_graph(handler.network_graph())?;
301+
}
302+
Ok(())
289303
});
290304
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
291305
}
@@ -343,9 +357,10 @@ mod tests {
343357
use bitcoin::blockdata::constants::genesis_block;
344358
use bitcoin::blockdata::transaction::{Transaction, TxOut};
345359
use bitcoin::network::constants::Network;
346-
use lightning::chain::{BestBlock, Confirm, chainmonitor};
360+
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
361+
use lightning::chain::{BestBlock, Confirm, chainmonitor, self};
347362
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
348-
use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
363+
use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager, Sign};
349364
use lightning::chain::transaction::OutPoint;
350365
use lightning::get_event_msg;
351366
use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
@@ -355,12 +370,14 @@ mod tests {
355370
use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
356371
use lightning::util::config::UserConfig;
357372
use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
373+
use lightning::util::logger::Logger;
358374
use lightning::util::ser::Writeable;
359375
use lightning::util::test_utils;
360376
use lightning_invoice::payment::{InvoicePayer, RetryAttempts};
361377
use lightning_invoice::utils::DefaultRouter;
362378
use lightning_persister::FilesystemPersister;
363379
use std::fs;
380+
use std::ops::Deref;
364381
use std::path::PathBuf;
365382
use std::sync::{Arc, Mutex};
366383
use std::time::Duration;
@@ -402,6 +419,27 @@ mod tests {
402419
}
403420
}
404421

422+
#[derive(Clone)]
423+
struct Persister {
424+
data_dir: String,
425+
}
426+
427+
impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L:Deref> super::Persister<Signer, M, T, K, F, L> for Persister where
428+
M::Target: 'static + chain::Watch<Signer>,
429+
T::Target: 'static + BroadcasterInterface,
430+
K::Target: 'static + KeysInterface<Signer = Signer>,
431+
F::Target: 'static + FeeEstimator,
432+
L::Target: 'static + Logger,
433+
{
434+
fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> {
435+
FilesystemPersister::persist_manager(self.data_dir.clone(), channel_manager)
436+
}
437+
438+
fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
439+
FilesystemPersister::persist_network_graph(self.data_dir.clone(), network_graph)
440+
}
441+
}
442+
405443
fn get_full_filepath(filepath: String, filename: String) -> String {
406444
let mut path = PathBuf::from(filepath);
407445
path.push(filename);
@@ -525,7 +563,7 @@ mod tests {
525563

526564
// Initiate the background processors to watch each node.
527565
let data_dir = nodes[0].persister.get_data_dir();
528-
let persister = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
566+
let persister = Persister { data_dir };
529567
let event_handler = |_: &_| {};
530568
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
531569

@@ -556,6 +594,7 @@ mod tests {
556594
let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
557595
let mut expected_bytes = Vec::new();
558596
check_persisted_data!(nodes[0].node, filepath.clone(), expected_bytes);
597+
559598
loop {
560599
if !nodes[0].node.get_persistence_condvar_value() { break }
561600
}
@@ -570,6 +609,14 @@ mod tests {
570609
if !nodes[0].node.get_persistence_condvar_value() { break }
571610
}
572611

612+
// Check network graph is persisted
613+
let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
614+
let mut expected_bytes = Vec::new();
615+
if let Some(handler) = nodes[0].net_graph_msg_handler.clone() {
616+
let network_graph = handler.network_graph();
617+
check_persisted_data!(network_graph, filepath.clone(), expected_bytes);
618+
}
619+
573620
assert!(bg_processor.stop().is_ok());
574621
}
575622

@@ -579,7 +626,7 @@ mod tests {
579626
// `FRESHNESS_TIMER`.
580627
let nodes = create_nodes(1, "test_timer_tick_called".to_string());
581628
let data_dir = nodes[0].persister.get_data_dir();
582-
let persister = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
629+
let persister = Persister { data_dir };
583630
let event_handler = |_: &_| {};
584631
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
585632
loop {
@@ -596,12 +643,33 @@ mod tests {
596643
}
597644

598645
#[test]
599-
fn test_persist_error() {
646+
fn test_channel_manager_persist_error() {
600647
// Test that if we encounter an error during manager persistence, the thread panics.
601648
let nodes = create_nodes(2, "test_persist_error".to_string());
602649
open_channel!(nodes[0], nodes[1], 100000);
603650

604-
let persister = |_: &_| Err(std::io::Error::new(std::io::ErrorKind::Other, "test"));
651+
struct ChannelManagerErrorPersister {
652+
data_dir: String,
653+
}
654+
655+
impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L:Deref> super::Persister<Signer, M, T, K, F, L> for ChannelManagerErrorPersister where
656+
M::Target: 'static + chain::Watch<Signer>,
657+
T::Target: 'static + BroadcasterInterface,
658+
K::Target: 'static + KeysInterface<Signer = Signer>,
659+
F::Target: 'static + FeeEstimator,
660+
L::Target: 'static + Logger,
661+
{
662+
fn persist_manager(&self, _channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> {
663+
Err(std::io::Error::new(std::io::ErrorKind::Other, "test"))
664+
}
665+
666+
fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
667+
FilesystemPersister::persist_network_graph(self.data_dir.clone(), network_graph)
668+
}
669+
}
670+
671+
let data_dir = nodes[0].persister.get_data_dir();
672+
let persister = ChannelManagerErrorPersister{ data_dir };
605673
let event_handler = |_: &_| {};
606674
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
607675
match bg_processor.join() {
@@ -613,12 +681,52 @@ mod tests {
613681
}
614682
}
615683

684+
#[test]
685+
fn test_network_graph_persist_error() {
686+
// Test that if we encounter an error during network graph persistence, an error gets returned.
687+
let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
688+
open_channel!(nodes[0], nodes[1], 100000);
689+
690+
struct NetworkGraphErrorPersister {
691+
data_dir: String,
692+
}
693+
694+
impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L:Deref> super::Persister<Signer, M, T, K, F, L> for NetworkGraphErrorPersister where
695+
M::Target: 'static + chain::Watch<Signer>,
696+
T::Target: 'static + BroadcasterInterface,
697+
K::Target: 'static + KeysInterface<Signer = Signer>,
698+
F::Target: 'static + FeeEstimator,
699+
L::Target: 'static + Logger,
700+
{
701+
fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> {
702+
FilesystemPersister::persist_manager(self.data_dir.clone(), channel_manager)
703+
}
704+
705+
fn persist_graph(&self, _network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
706+
Err(std::io::Error::new(std::io::ErrorKind::Other, "test"))
707+
}
708+
}
709+
710+
let data_dir = nodes[0].persister.get_data_dir();
711+
let persister = NetworkGraphErrorPersister { data_dir };
712+
let event_handler = |_: &_| {};
713+
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
714+
715+
match bg_processor.stop() {
716+
Ok(_) => panic!("Expected error persisting network graph"),
717+
Err(e) => {
718+
assert_eq!(e.kind(), std::io::ErrorKind::Other);
719+
assert_eq!(e.get_ref().unwrap().to_string(), "test");
720+
},
721+
}
722+
}
723+
616724
#[test]
617725
fn test_background_event_handling() {
618726
let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
619727
let channel_value = 100000;
620728
let data_dir = nodes[0].persister.get_data_dir();
621-
let persister = move |node: &_| FilesystemPersister::persist_manager(data_dir.clone(), node);
729+
let persister = Persister { data_dir };
622730

623731
// Set up a background event handler for FundingGenerationReady events.
624732
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
@@ -675,7 +783,7 @@ mod tests {
675783

676784
// Initiate the background processors to watch each node.
677785
let data_dir = nodes[0].persister.get_data_dir();
678-
let persister = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
786+
let persister = Persister { data_dir };
679787
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
680788
let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
681789
let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, scorer, Arc::clone(&nodes[0].logger), |_: &_| {}, RetryAttempts(2)));

lightning-persister/src/lib.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ extern crate libc;
1616

1717
use bitcoin::hash_types::{BlockHash, Txid};
1818
use bitcoin::hashes::hex::{FromHex, ToHex};
19+
use lightning::routing::network_graph::NetworkGraph;
1920
use crate::util::DiskWriteable;
2021
use lightning::chain;
2122
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
@@ -66,6 +67,12 @@ where
6667
}
6768
}
6869

70+
impl DiskWriteable for NetworkGraph {
71+
fn write_to_file(&self, writer: &mut fs::File) -> Result<(), std::io::Error> {
72+
self.write(writer)
73+
}
74+
}
75+
6976
impl FilesystemPersister {
7077
/// Initialize a new FilesystemPersister and set the path to the individual channels'
7178
/// files.
@@ -103,6 +110,13 @@ impl FilesystemPersister {
103110
util::write_to_file(path, "manager".to_string(), manager)
104111
}
105112

113+
/// Write the provided `NetworkGraph` to the path provided at `FilesystemPersister`
114+
/// initialization, within a file called "network_graph"
115+
pub fn persist_network_graph(data_dir: String, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
116+
let path = PathBuf::from(data_dir);
117+
util::write_to_file(path, "network_graph".to_string(), network_graph)
118+
}
119+
106120
/// Read `ChannelMonitor`s from disk.
107121
pub fn read_channelmonitors<Signer: Sign, K: Deref> (
108122
&self, keys_manager: K

0 commit comments

Comments
 (0)