Skip to content

Commit 36dbada

Browse files
committed
Add ChainMonitor::archive_fully_resolved_monitor_channels
Archives fully resolved channel monitors by adding them to a backup location and removing them from the primary storage & the monitor set. This is useful for pruning fully resolved monitors from the monitor set and primary storage so they are not reloaded on every new new block connection. We also add a new function, `archive_persisted_channel` to the `Persist` trait that writes the monitor to an archive storage and removes it from the primary storage.
1 parent a44f951 commit 36dbada

File tree

8 files changed

+162
-6
lines changed

8 files changed

+162
-6
lines changed

fuzz/src/chanmon_consistency.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,7 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
459459
let out = SearchingOutput::new(underlying_out);
460460
let broadcast = Arc::new(TestBroadcaster{});
461461
let router = FuzzRouter {};
462+
use std::collections::HashSet;
462463

463464
macro_rules! make_node {
464465
($node_id: expr, $fee_estimator: expr) => { {
@@ -467,7 +468,8 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
467468
let keys_manager = Arc::new(KeyProvider { node_secret, rand_bytes_id: atomic::AtomicU32::new(0), enforcement_states: Mutex::new(new_hash_map()) });
468469
let monitor = Arc::new(TestChainMonitor::new(broadcast.clone(), logger.clone(), $fee_estimator.clone(),
469470
Arc::new(TestPersister {
470-
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed)
471+
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
472+
archived_channels: Mutex::new(HashSet::new()),
471473
}), Arc::clone(&keys_manager)));
472474

473475
let mut config = UserConfig::default();
@@ -494,7 +496,8 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
494496
let logger: Arc<dyn Logger> = Arc::new(test_logger::TestLogger::new($node_id.to_string(), out.clone()));
495497
let chain_monitor = Arc::new(TestChainMonitor::new(broadcast.clone(), logger.clone(), $fee_estimator.clone(),
496498
Arc::new(TestPersister {
497-
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed)
499+
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
500+
archived_channels: Mutex::new(HashSet::new()),
498501
}), Arc::clone(& $keys_manager)));
499502

500503
let mut config = UserConfig::default();

fuzz/src/full_stack.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -480,7 +480,10 @@ pub fn do_test(mut data: &[u8], logger: &Arc<dyn Logger>) {
480480

481481
let broadcast = Arc::new(TestBroadcaster{ txn_broadcasted: Mutex::new(Vec::new()) });
482482
let monitor = Arc::new(chainmonitor::ChainMonitor::new(None, broadcast.clone(), Arc::clone(&logger), fee_est.clone(),
483-
Arc::new(TestPersister { update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed) })));
483+
Arc::new(TestPersister {
484+
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed) ,
485+
archived_channels: Mutex::new(std::collections::HashSet::new()),
486+
})));
484487

485488
let keys_manager = Arc::new(KeyProvider {
486489
node_secret: our_network_key.clone(),

fuzz/src/utils/test_persister.rs

+7
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ use lightning::chain::chainmonitor::MonitorUpdateId;
44
use lightning::chain::transaction::OutPoint;
55
use lightning::util::test_channel_signer::TestChannelSigner;
66

7+
use std::collections::HashSet;
78
use std::sync::Mutex;
89

910
pub struct TestPersister {
1011
pub update_ret: Mutex<chain::ChannelMonitorUpdateStatus>,
12+
pub archived_channels: Mutex<HashSet<OutPoint>>,
1113
}
1214
impl chainmonitor::Persist<TestChannelSigner> for TestPersister {
1315
fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<TestChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
@@ -17,4 +19,9 @@ impl chainmonitor::Persist<TestChannelSigner> for TestPersister {
1719
fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: Option<&channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<TestChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
1820
self.update_ret.lock().unwrap().clone()
1921
}
22+
23+
fn archive_persisted_channel(&self, funding_txo: OutPoint) -> chain::ChannelMonitorUpdateStatus {
24+
self.archived_channels.lock().unwrap().insert(funding_txo);
25+
chain::ChannelMonitorUpdateStatus::Completed
26+
}
2027
}

lightning/src/chain/chainmonitor.rs

+33
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,17 @@ pub trait Persist<ChannelSigner: WriteableEcdsaChannelSigner> {
194194
///
195195
/// [`Writeable::write`]: crate::util::ser::Writeable::write
196196
fn update_persisted_channel(&self, channel_funding_outpoint: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
197+
/// Archive a channel's data to a backup location.
198+
///
199+
/// This function can be used to prune a stale channel's monitor. It is reccommended
200+
/// to move the data first to an archive location, and only then remove from the primary
201+
/// storage.
202+
///
203+
/// A stale channel is a channel that has been closed and settled on-chain, and no funds
204+
/// can be claimed with its data.
205+
///
206+
/// Archiving the data is useful for hedging against data loss in case of an unexpected failure/bug.
207+
fn archive_persisted_channel(&self, channel_funding_outpoint: OutPoint) -> ChannelMonitorUpdateStatus;
197208
}
198209

199210
struct MonitorHolder<ChannelSigner: WriteableEcdsaChannelSigner> {
@@ -656,6 +667,28 @@ where C::Target: chain::Filter,
656667
}
657668
}
658669
}
670+
671+
/// Archives fully resolved channel monitors by adding them to a backup location and
672+
/// removing them from the primary storage & the monitor set.
673+
///
674+
/// This is useful for pruning fully resolved monitors from the monitor set and primary
675+
/// storage so they are not reloaded on every new new block connection.
676+
///
677+
/// The monitor data is archived to an archive namespace so we can still access it in case
678+
/// of an unexpected failure/bug.
679+
pub fn archive_fully_resolved_channel_monitors(&self, to_archive: Vec<OutPoint>) {
680+
let mut monitors = self.monitors.write().unwrap();
681+
for funding_txo in to_archive {
682+
let channel_monitor = monitors.get(&funding_txo);
683+
if let Some(channel_monitor) = channel_monitor {
684+
if channel_monitor.monitor.is_fully_resolved()
685+
&& self.persister.archive_persisted_channel(funding_txo) == ChannelMonitorUpdateStatus::Completed {
686+
monitors.remove(&funding_txo);
687+
};
688+
};
689+
}
690+
}
691+
659692
}
660693

661694
impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>

lightning/src/chain/channelmonitor.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -1868,7 +1868,6 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
18681868
let is_all_funds_claimed = self.get_claimable_balances().is_empty();
18691869
let current_height = self.current_best_block().height;
18701870
let mut inner = self.inner.lock().unwrap();
1871-
18721871
match (inner.balances_empty_height, is_all_funds_claimed) {
18731872
(Some(h), true) => {
18741873
// Claimed all funds, Check if reached the threshold.
@@ -1878,7 +1877,7 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
18781877
(Some(_), false) => {
18791878
// previously assumed we claimed all funds, but we have new funds to claim.
18801879
// Should not happen in practice.
1881-
debug_assert!(!is_all_funds_claimed, "Trying to check if monitor is fully resolved before all funds are claimed. Aborting.");
1880+
debug_assert!(is_all_funds_claimed, "Trying to check if monitor is fully resolved before all funds are claimed.");
18821881
inner.balances_empty_height = None;
18831882
return false;
18841883
},

lightning/src/ln/monitor_tests.rs

+13-1
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ fn do_chanmon_claim_value_coop_close(anchors: bool) {
171171
user_config.manually_accept_inbound_channels = true;
172172
}
173173
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[Some(user_config), Some(user_config)]);
174-
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
174+
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
175175

176176
let (_, _, chan_id, funding_tx) =
177177
create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 1_000_000);
@@ -262,6 +262,18 @@ fn do_chanmon_claim_value_coop_close(anchors: bool) {
262262

263263
assert!(nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty());
264264
assert!(nodes[1].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty());
265+
266+
// Test we can archived fully resolved channel monitor.
267+
assert_eq!(nodes[0].chain_monitor.chain_monitor.list_monitors().len(), 1); // one monitor
268+
// first archive should set balances_empty_height to current block height
269+
nodes[0].chain_monitor.chain_monitor.archive_fully_resolved_channel_monitors(vec![funding_outpoint]);
270+
connect_blocks(&nodes[0], 2016);
271+
// Second call after 2016 blocks, should archive the monitor
272+
nodes[0].chain_monitor.chain_monitor.archive_fully_resolved_channel_monitors(vec![funding_outpoint]);
273+
assert_eq!(nodes[0].chain_monitor.chain_monitor.list_monitors().len(), 0); // no monitor
274+
// cleanup
275+
nodes.get_mut(0).unwrap().chain_source.remove_watched_txn_and_outputs(funding_outpoint,
276+
funding_tx.txid(), funding_tx.output[0].script_pubkey.clone());
265277
}
266278

267279
#[test]

lightning/src/util/persist.rs

+67
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ pub const CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
5858
/// The primary namespace under which [`ChannelMonitorUpdate`]s will be persisted.
5959
pub const CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE: &str = "monitor_updates";
6060

61+
/// The primary namespace under which archived [`ChannelMonitor`]s will be persisted.
62+
pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE: &str = "archived_monitors";
63+
/// The secondary namespace under which archived [`ChannelMonitor`]s will be persisted.
64+
pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
65+
6166
/// The primary namespace under which the [`NetworkGraph`] will be persisted.
6267
pub const NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
6368
/// The secondary namespace under which the [`NetworkGraph`] will be persisted.
@@ -214,6 +219,40 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore + ?Sized> Persist<Ch
214219
Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError
215220
}
216221
}
222+
223+
fn archive_persisted_channel(&self, funding_txo: OutPoint) -> chain::ChannelMonitorUpdateStatus {
224+
let monitor_name = MonitorName::from(funding_txo);
225+
let monitor = match self.read(
226+
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
227+
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
228+
monitor_name.as_str(),
229+
) {
230+
Ok(monitor) => monitor,
231+
Err(_) => {
232+
return chain::ChannelMonitorUpdateStatus::UnrecoverableError;
233+
}
234+
235+
};
236+
match self.write(
237+
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
238+
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
239+
monitor_name.as_str(),
240+
&monitor
241+
) {
242+
Ok(()) => {}
243+
Err(_e) => return chain::ChannelMonitorUpdateStatus::UnrecoverableError
244+
};
245+
let key = format!("{}_{}", funding_txo.txid.to_string(), funding_txo.index);
246+
match self.remove(
247+
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
248+
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
249+
&key,
250+
false,
251+
) {
252+
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
253+
Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError
254+
}
255+
}
217256
}
218257

219258
/// Read previously persisted [`ChannelMonitor`]s from the store.
@@ -720,6 +759,34 @@ where
720759
self.persist_new_channel(funding_txo, monitor, monitor_update_call_id)
721760
}
722761
}
762+
763+
fn archive_persisted_channel(&self, funding_txo: OutPoint) -> chain::ChannelMonitorUpdateStatus {
764+
let monitor_name = MonitorName::from(funding_txo);
765+
let monitor = match self.read_monitor(&monitor_name) {
766+
Ok((_block_hash, monitor)) => monitor,
767+
Err(_) => return chain::ChannelMonitorUpdateStatus::UnrecoverableError
768+
};
769+
match self.kv_store.write(
770+
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
771+
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
772+
monitor_name.as_str(),
773+
&monitor.encode()
774+
) {
775+
Ok(()) => {},
776+
Err(_e) => {
777+
return chain::ChannelMonitorUpdateStatus::UnrecoverableError;
778+
} // TODO: Should we return UnrecoverableError here
779+
};
780+
match self.kv_store.remove(
781+
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
782+
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
783+
monitor_name.as_str(),
784+
false,
785+
) {
786+
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
787+
Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError
788+
}
789+
}
723790
}
724791

725792
impl<K: Deref, L: Deref, ES: Deref, SP: Deref> MonitorUpdatingPersister<K, L, ES, SP>

lightning/src/util/test_utils.rs

+32
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
use crate::blinded_path::BlindedPath;
1111
use crate::blinded_path::payment::ReceiveTlvs;
1212
use crate::chain;
13+
use crate::chain::chainmonitor::Persist;
1314
use crate::chain::WatchedOutput;
1415
use crate::chain::chaininterface;
1516
use crate::chain::chaininterface::ConfirmationTarget;
@@ -501,6 +502,12 @@ impl<Signer: sign::ecdsa::WriteableEcdsaChannelSigner> chainmonitor::Persist<Sig
501502
}
502503
res
503504
}
505+
506+
fn archive_persisted_channel(&self, funding_txo: OutPoint) -> chain::ChannelMonitorUpdateStatus {
507+
let ret = <TestPersister as Persist<TestChannelSigner>>::archive_persisted_channel(&self.persister, funding_txo);
508+
509+
ret
510+
}
504511
}
505512

506513
pub struct TestPersister {
@@ -513,20 +520,27 @@ pub struct TestPersister {
513520
/// When we get an update_persisted_channel call *with* a ChannelMonitorUpdate, we insert the
514521
/// MonitorUpdateId here.
515522
pub offchain_monitor_updates: Mutex<HashMap<OutPoint, HashSet<MonitorUpdateId>>>,
523+
/// When we get an archive_persisted_channel call, we insert the OutPoint here.
524+
pub archived_channels: Mutex<HashSet<OutPoint>>,
516525
}
517526
impl TestPersister {
518527
pub fn new() -> Self {
519528
Self {
520529
update_rets: Mutex::new(VecDeque::new()),
521530
chain_sync_monitor_persistences: Mutex::new(new_hash_map()),
522531
offchain_monitor_updates: Mutex::new(new_hash_map()),
532+
archived_channels: Mutex::new(new_hash_set()),
523533
}
524534
}
525535

526536
/// Queue an update status to return.
527537
pub fn set_update_ret(&self, next_ret: chain::ChannelMonitorUpdateStatus) {
528538
self.update_rets.lock().unwrap().push_back(next_ret);
529539
}
540+
// Check if the given OutPoint has been archived.
541+
pub fn is_archived(&self, funding_txo: OutPoint) -> bool {
542+
self.archived_channels.lock().unwrap().contains(&funding_txo)
543+
}
530544
}
531545
impl<Signer: sign::ecdsa::WriteableEcdsaChannelSigner> chainmonitor::Persist<Signer> for TestPersister {
532546
fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<Signer>, _id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
@@ -549,6 +563,20 @@ impl<Signer: sign::ecdsa::WriteableEcdsaChannelSigner> chainmonitor::Persist<Sig
549563
}
550564
ret
551565
}
566+
567+
fn archive_persisted_channel(&self, funding_txo: OutPoint) -> chain::ChannelMonitorUpdateStatus {
568+
self.archived_channels.lock().unwrap().insert(funding_txo);
569+
// remove the channel from the offchain_monitor_updates map
570+
match self.offchain_monitor_updates.lock().unwrap().remove(&funding_txo) {
571+
Some(_) => {},
572+
None => {
573+
// If the channel was not in the offchain_monitor_updates map, it should be in the
574+
// chain_sync_monitor_persistences map.
575+
assert!(self.chain_sync_monitor_persistences.lock().unwrap().remove(&funding_txo).is_some());
576+
}
577+
};
578+
chain::ChannelMonitorUpdateStatus::Completed
579+
}
552580
}
553581

554582
pub struct TestStore {
@@ -1360,6 +1388,10 @@ impl TestChainSource {
13601388
watched_outputs: Mutex::new(new_hash_set()),
13611389
}
13621390
}
1391+
pub fn remove_watched_txn_and_outputs(&self, outpoint: OutPoint, txid: Txid, script_pubkey: ScriptBuf) {
1392+
self.watched_outputs.lock().unwrap().remove(&(outpoint, script_pubkey.clone()));
1393+
self.watched_txn.lock().unwrap().remove(&(txid, script_pubkey));
1394+
}
13631395
}
13641396

13651397
impl UtxoLookup for TestChainSource {

0 commit comments

Comments
 (0)