From cb86399f2e61afb21e935d4084bc7fc0a6bf5afa Mon Sep 17 00:00:00 2001
From: Gursharan Singh <3442979+G8XSU@users.noreply.github.com>
Date: Thu, 7 Mar 2024 11:00:41 -0800
Subject: [PATCH 1/5] Don't pause events for chainsync persistence

We used to wait on ChannelMonitor persistence to avoid duplicate payment
events. But duplicates can still happen in cases where the ChannelMonitor
handed the event to the ChannelManager and we did not persist the
ChannelManager after event handling. Duplicate payment events are
expected, and clients should handle them in an idempotent manner.

Removing this hold-up of events simplifies the logic and makes it easier
to avoid persisting ChannelMonitors on every block connect.
---
 lightning/src/chain/chainmonitor.rs | 118 ++++------------------------
 lightning/src/ln/payment_tests.rs   |  49 +++++-------
 2 files changed, 32 insertions(+), 135 deletions(-)

diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs
index 81ba30ffb83..278d882dd99 100644
--- a/lightning/src/chain/chainmonitor.rs
+++ b/lightning/src/chain/chainmonitor.rs
@@ -29,7 +29,7 @@ use bitcoin::hash_types::{Txid, BlockHash};
 use crate::chain;
 use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput};
 use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
-use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, WithChannelMonitor, LATENCY_GRACE_PERIOD_BLOCKS};
+use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, WithChannelMonitor};
 use crate::chain::transaction::{OutPoint, TransactionData};
 use crate::ln::ChannelId;
 use crate::sign::ecdsa::WriteableEcdsaChannelSigner;
@@ -209,17 +209,6 @@ struct MonitorHolder<ChannelSigner: WriteableEcdsaChannelSigner> {
 	/// update_persisted_channel, the user returns a
 	/// [`ChannelMonitorUpdateStatus::InProgress`], and then calls channel_monitor_updated
 	/// immediately, racing our insertion of the pending update into the contained Vec.
-	///
-	/// Beyond the synchronization of updates themselves, we cannot handle user events until after
-	/// any chain updates have been stored on disk. Thus, we scan this list when returning updates
-	/// to the ChannelManager, refusing to return any updates for a ChannelMonitor which is still
-	/// being persisted fully to disk after a chain update.
-	///
-	/// This avoids the possibility of handling, e.g. an on-chain claim, generating a claim monitor
-	/// event, resulting in the relevant ChannelManager generating a PaymentSent event and dropping
-	/// the pending payment entry, and then reloading before the monitor is persisted, resulting in
-	/// the ChannelManager re-adding the same payment entry, before the same block is replayed,
-	/// resulting in a duplicate PaymentSent event.
 	pending_monitor_updates: Mutex<Vec<MonitorUpdateId>>,
 	/// The last block height at which no [`UpdateOrigin::ChainSync`] monitor updates were present
 	/// in `pending_monitor_updates`.
 	/// If it's been more than [`LATENCY_GRACE_PERIOD_BLOCKS`] since we started waiting on a chain
 	/// sync event, we let monitor events return to `ChannelManager` because we cannot hold them up
 	/// forever or we'll end up with HTLC preimages waiting to feed back into an upstream channel
 	/// forever, risking funds loss.
+	///
+	/// [`LATENCY_GRACE_PERIOD_BLOCKS`]: crate::chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS
 	last_chain_persist_height: AtomicUsize,
 }
@@ -393,7 +384,7 @@ where C::Target: chain::Filter,
 					chain_sync_update_id
 				),
 				ChannelMonitorUpdateStatus::InProgress => {
-					log_debug!(logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
+					log_debug!(logger, "Channel Monitor sync for channel {} in progress.", log_funding_info!(monitor));
 					pending_monitor_updates.push(update_id);
 				},
 				ChannelMonitorUpdateStatus::UnrecoverableError => {
@@ -924,21 +915,12 @@ where C::Target: chain::Filter,
 	fn release_pending_monitor_events(&self) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)> {
 		let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
 		for monitor_state in self.monitors.read().unwrap().values() {
-			let logger = WithChannelMonitor::from(&self.logger, &monitor_state.monitor);
-			let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
-			if !is_pending_monitor_update || monitor_state.last_chain_persist_height.load(Ordering::Acquire) + LATENCY_GRACE_PERIOD_BLOCKS as usize <= self.highest_chain_height.load(Ordering::Acquire) {
-				if is_pending_monitor_update {
-					log_error!(logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS);
-					log_error!(logger, " To avoid funds-loss, we are allowing monitor updates to be released.");
-					log_error!(logger, " This may cause duplicate payment events to be generated.");
-				}
-				let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();
-				if monitor_events.len() > 0 {
-					let monitor_outpoint = monitor_state.monitor.get_funding_txo().0;
-					let monitor_channel_id = monitor_state.monitor.channel_id();
-					let counterparty_node_id = monitor_state.monitor.get_counterparty_node_id();
-					pending_monitor_events.push((monitor_outpoint, monitor_channel_id, monitor_events, counterparty_node_id));
-				}
+			let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();
+			if monitor_events.len() > 0 {
+				let monitor_outpoint = monitor_state.monitor.get_funding_txo().0;
+				let monitor_channel_id = monitor_state.monitor.channel_id();
+				let counterparty_node_id = monitor_state.monitor.get_counterparty_node_id();
+				pending_monitor_events.push((monitor_outpoint, monitor_channel_id, monitor_events, counterparty_node_id));
 			}
 		}
 		pending_monitor_events
diff --git a/lightning/src/ln/payment_tests.rs b/lightning/src/ln/payment_tests.rs
--- a/lightning/src/ln/payment_tests.rs
+++ b/lightning/src/ln/payment_tests.rs
@@ -975,15 +957,12 @@ fn do_test_dup_htlc_onchain_fails_on_reload(persist_manager_post_event: bool, co
-	let mon_updates: Vec<MonitorUpdateId> = chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap()
-		.get_mut(&funding_txo).unwrap().drain().collect();
-	// If we are using chain::Confirm instead of chain::Listen, we will get the same update twice.
-	// If we're testing connection idempotency we may get substantially more.
-	assert!(mon_updates.len() >= 1);
-	assert!(nodes[0].chain_monitor.release_pending_monitor_events().is_empty());
-	assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
+	// Note that we skip persisting ChannelMonitors. We should still be generating the payment sent
+	// event without ChannelMonitor persistence. If we reset to a previous state on reload, the block
+	// should be replayed and we'll regenerate the event.
 
 	// If we persist the ChannelManager here, we should get the PaymentSent event after
 	// deserialization.
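Since duplicate payment events are now expected across restarts, client event handlers need to be idempotent. A minimal sketch of one way to do that, keyed on the payment hash (the handler type and its in-memory store are illustrative assumptions, not part of this patch; a real application would persist the settled set):

    use std::collections::HashSet;

    use lightning::events::Event;
    use lightning::ln::PaymentHash;

    /// Tracks payments already recorded as settled so that a replayed
    /// `PaymentSent` (e.g. after a reload and block re-scan) becomes a no-op.
    struct IdempotentEventHandler {
        settled: HashSet<PaymentHash>,
    }

    impl IdempotentEventHandler {
        fn handle_event(&mut self, event: Event) {
            match event {
                Event::PaymentSent { payment_hash, .. } => {
                    // `insert` returns false when the hash is already present,
                    // so a duplicate delivery changes nothing.
                    if self.settled.insert(payment_hash) {
                        // First delivery: update application payment records here.
                    }
                },
                _ => { /* other events elided */ },
            }
        }
    }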
@@ -1128,13 +1121,7 @@ fn do_test_dup_htlc_onchain_fails_on_reload(persist_manager_post_event: bool, co
 		chan_manager_serialized = nodes[0].node.encode();
 	}
 
-	// Now persist the ChannelMonitor and inform the ChainMonitor that we're done, generating the
-	// payment sent event.
-	chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
 	let chan_0_monitor_serialized = get_monitor!(nodes[0], chan_id).encode();
-	for update in mon_updates {
-		nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(funding_txo, update).unwrap();
-	}
 	if payment_timeout {
 		expect_payment_failed!(nodes[0], payment_hash, false);
 	} else {
@@ -1168,13 +1155,13 @@ fn do_test_dup_htlc_onchain_fails_on_reload(persist_manager_post_event: bool, co
 }
 
 #[test]
-fn test_dup_htlc_onchain_fails_on_reload() {
-	do_test_dup_htlc_onchain_fails_on_reload(true, true, true);
-	do_test_dup_htlc_onchain_fails_on_reload(true, true, false);
-	do_test_dup_htlc_onchain_fails_on_reload(true, false, false);
-	do_test_dup_htlc_onchain_fails_on_reload(false, true, true);
-	do_test_dup_htlc_onchain_fails_on_reload(false, true, false);
-	do_test_dup_htlc_onchain_fails_on_reload(false, false, false);
+fn test_dup_htlc_onchain_doesnt_fail_on_reload() {
+	do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, true, true);
+	do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, true, false);
+	do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, false, false);
+	do_test_dup_htlc_onchain_doesnt_fail_on_reload(false, true, true);
+	do_test_dup_htlc_onchain_doesnt_fail_on_reload(false, true, false);
+	do_test_dup_htlc_onchain_doesnt_fail_on_reload(false, false, false);
 }
 
 #[test]

From e084ab26e14acb33f162dda91ed427dc40e0a4a1 Mon Sep 17 00:00:00 2001
From: Gursharan Singh <3442979+G8XSU@users.noreply.github.com>
Date: Wed, 27 Mar 2024 12:18:31 +0100
Subject: [PATCH 2/5] Stop storing last_chain_persist_height

We only used to store last_chain_persist_height to release events held
for more than LATENCY_GRACE_PERIOD_BLOCKS due to pending monitor updates
with UpdateOrigin::ChainSync. Since we no longer pause events for
ChainSync persistence, we no longer need to store
last_chain_persist_height.
---
 lightning/src/chain/chainmonitor.rs | 48 +++--------------------------
 1 file changed, 5 insertions(+), 43 deletions(-)

diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs
index 278d882dd99..1421bd5f8e2 100644
--- a/lightning/src/chain/chainmonitor.rs
+++ b/lightning/src/chain/chainmonitor.rs
@@ -210,15 +210,6 @@ struct MonitorHolder<ChannelSigner: WriteableEcdsaChannelSigner> {
 	/// [`ChannelMonitorUpdateStatus::InProgress`], and then calls channel_monitor_updated
 	/// immediately, racing our insertion of the pending update into the contained Vec.
 	pending_monitor_updates: Mutex<Vec<MonitorUpdateId>>,
-	/// The last block height at which no [`UpdateOrigin::ChainSync`] monitor updates were present
-	/// in `pending_monitor_updates`.
-	/// If it's been more than [`LATENCY_GRACE_PERIOD_BLOCKS`] since we started waiting on a chain
-	/// sync event, we let monitor events return to `ChannelManager` because we cannot hold them up
-	/// forever or we'll end up with HTLC preimages waiting to feed back into an upstream channel
-	/// forever, risking funds loss.
-	///
-	/// [`LATENCY_GRACE_PERIOD_BLOCKS`]: crate::chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS
-	last_chain_persist_height: AtomicUsize,
 }
 
 impl<ChannelSigner: WriteableEcdsaChannelSigner> MonitorHolder<ChannelSigner> {
 		pending_monitor_updates_lock.iter().any(|update_id|
 			if let UpdateOrigin::OffChain(_) = update_id.contents { true } else { false })
 	}
-	fn has_pending_chainsync_updates(&self, pending_monitor_updates_lock: &MutexGuard<Vec<MonitorUpdateId>>) -> bool {
-		pending_monitor_updates_lock.iter().any(|update_id|
-			if let UpdateOrigin::ChainSync(_) = update_id.contents { true } else { false })
-	}
 }
 
 /// A read-only reference to a current ChannelMonitor.
@@ -317,7 +304,7 @@ where C::Target: chain::Filter,
 		for funding_outpoint in funding_outpoints.iter() {
 			let monitor_lock = self.monitors.read().unwrap();
 			if let Some(monitor_state) = monitor_lock.get(funding_outpoint) {
-				if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state).is_err() {
+				if self.update_monitor_with_chain_data(header, txdata, &process, funding_outpoint, &monitor_state).is_err() {
 					// Take the monitors lock for writing so that we poison it and any future
 					// operations going forward fail immediately.
 					core::mem::drop(monitor_lock);
@@ -332,7 +319,7 @@ where C::Target: chain::Filter,
 		let monitor_states = self.monitors.write().unwrap();
 		for (funding_outpoint, monitor_state) in monitor_states.iter() {
 			if !funding_outpoints.contains(funding_outpoint) {
-				if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state).is_err() {
+				if self.update_monitor_with_chain_data(header, txdata, &process, funding_outpoint, &monitor_state).is_err() {
 					log_error!(self.logger, "{}", err_str);
 					panic!("{}", err_str);
 				}
@@ -351,8 +338,8 @@ where C::Target: chain::Filter,
 	}
 
 	fn update_monitor_with_chain_data(
-		&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData,
-		process: FN, funding_outpoint: &OutPoint, monitor_state: &MonitorHolder<ChannelSigner>
+		&self, header: &Header, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint,
+		monitor_state: &MonitorHolder<ChannelSigner>
 	) -> Result<(), ()> where FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs> {
 		let monitor = &monitor_state.monitor;
 		let logger = WithChannelMonitor::from(&self.logger, &monitor);
@@ -364,14 +351,6 @@ where C::Target: chain::Filter,
 			contents: UpdateOrigin::ChainSync(chain_sync_update_id),
 		};
 		let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
-		if let Some(height) = best_height {
-			if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
-				// If there are not ChainSync persists awaiting completion, go ahead and
-				// set last_chain_persist_height here - we wouldn't want the first
-				// InProgress to always immediately be considered "overly delayed".
-				monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
-			}
-		}
 
 		log_trace!(logger, "Syncing Channel Monitor for channel {} for block-data update_id {}",
 			log_funding_info!(monitor),
@@ -562,23 +541,7 @@ where C::Target: chain::Filter,
 					monitor_update_id: monitor_data.monitor.get_latest_update_id(),
 				}], monitor_data.monitor.get_counterparty_node_id()));
 			},
-			MonitorUpdateId { contents: UpdateOrigin::ChainSync(completed_update_id) } => {
-				let monitor_has_pending_updates =
-					monitor_data.has_pending_chainsync_updates(&pending_monitor_updates);
-				log_debug!(self.logger, "Completed chain sync monitor update {} for channel with funding outpoint {:?}, {}",
-					completed_update_id,
-					funding_txo,
-					if monitor_has_pending_updates {
-						"still have pending chain sync updates"
-					} else {
-						"all chain sync updates complete, releasing pending MonitorEvents"
-					});
-				if !monitor_has_pending_updates {
-					monitor_data.last_chain_persist_height.store(self.highest_chain_height.load(Ordering::Acquire), Ordering::Release);
-					// The next time release_pending_monitor_events is called, any events for this
-					// ChannelMonitor will be returned.
-				}
-			},
+			MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => {},
 		}
 		self.event_notifier.notify();
 		Ok(())
@@ -833,7 +796,6 @@ where C::Target: chain::Filter,
 				entry.insert(MonitorHolder {
 					monitor,
 					pending_monitor_updates: Mutex::new(pending_monitor_updates),
-					last_chain_persist_height: AtomicUsize::new(self.highest_chain_height.load(Ordering::Acquire)),
 				});
 				Ok(persist_res)
 	}

From 7de602a38ad68a5c75db641ee96f5a0c2693f1f9 Mon Sep 17 00:00:00 2001
From: Gursharan Singh <3442979+G8XSU@users.noreply.github.com>
Date: Wed, 27 Mar 2024 16:16:14 +0100
Subject: [PATCH 3/5] Stop tracking MonitorUpdates from ChainSync in
 pending_monitor_updates

We no longer need to track them, since we no longer hold events for
pending MonitorUpdates resulting from ChainSync.
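With chain-sync persistence untracked, the only `Persist` calls that still need a matching `ChainMonitor::channel_monitor_updated` call are those carrying a `ChannelMonitorUpdate`. A tiny illustrative helper capturing that rule (the function itself is an assumption for exposition, not an LDK API):

    use lightning::chain::channelmonitor::ChannelMonitorUpdate;

    /// Whether an `update_persisted_channel` call that returned `InProgress`
    /// must later be reported via `ChainMonitor::channel_monitor_updated`.
    fn needs_completion_call(update: Option<&ChannelMonitorUpdate>) -> bool {
        // Off-chain updates come with a `ChannelMonitorUpdate`; chain-sync
        // persistence passes `None` and no longer needs a completion call.
        update.is_some()
    }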
---
 lightning-persister/src/fs_store.rs |  2 +-
 lightning/src/chain/chainmonitor.rs |  2 --
 lightning/src/util/test_utils.rs    | 18 +++++++++---------
 3 files changed, 10 insertions(+), 12 deletions(-)

diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs
index 364a3ee706f..014dde3e037 100644
--- a/lightning-persister/src/fs_store.rs
+++ b/lightning-persister/src/fs_store.rs
@@ -500,7 +500,7 @@ mod tests {
 			txid: Txid::from_str("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(),
 			index: 0
 		};
-		match store.persist_new_channel(test_txo, &added_monitors[0].1, update_id.2) {
+		match store.persist_new_channel(test_txo, &added_monitors[0].1) {
 			ChannelMonitorUpdateStatus::UnrecoverableError => {},
 			_ => panic!("unexpected result from persisting new channel")
 		}
diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs
index 1421bd5f8e2..fb2bfc3fdc8 100644
--- a/lightning/src/chain/chainmonitor.rs
+++ b/lightning/src/chain/chainmonitor.rs
@@ -350,7 +350,6 @@ where C::Target: chain::Filter,
 		let update_id = MonitorUpdateId {
 			contents: UpdateOrigin::ChainSync(chain_sync_update_id),
 		};
-		let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
 
 		log_trace!(logger, "Syncing Channel Monitor for channel {} for block-data update_id {}",
 			log_funding_info!(monitor),
@@ -364,7 +363,6 @@ where C::Target: chain::Filter,
 			ChannelMonitorUpdateStatus::InProgress => {
 				log_debug!(logger, "Channel Monitor sync for channel {} in progress.", log_funding_info!(monitor));
-				pending_monitor_updates.push(update_id);
 			},
 			ChannelMonitorUpdateStatus::UnrecoverableError => {
 				return Err(());
diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs
index 95bc2a7c661..b9672dc405f 100644
--- a/lightning/src/util/test_utils.rs
+++ b/lightning/src/util/test_utils.rs
@@ -16,7 +16,7 @@ use crate::chain::chaininterface::ConfirmationTarget;
 #[cfg(test)]
 use crate::chain::chaininterface::FEERATE_FLOOR_SATS_PER_KW;
 use crate::chain::chainmonitor;
-use crate::chain::chainmonitor::{MonitorUpdateId, UpdateOrigin};
+use crate::chain::chainmonitor::{MonitorUpdateId};
 use crate::chain::channelmonitor;
 use crate::chain::channelmonitor::MonitorEvent;
 use crate::chain::transaction::OutPoint;
@@ -516,7 +516,7 @@ pub struct TestPersister {
 	pub update_rets: Mutex<VecDeque<chain::ChannelMonitorUpdateStatus>>,
 	/// When we get an update_persisted_channel call with no ChannelMonitorUpdate, we insert the
 	/// MonitorUpdateId here.
-	pub chain_sync_monitor_persistences: Mutex<HashMap<OutPoint, HashSet<MonitorUpdateId>>>,
+	pub chain_sync_monitor_persistences: Mutex<VecDeque<OutPoint>>,
 	/// When we get an update_persisted_channel call *with* a ChannelMonitorUpdate, we insert the
 	/// MonitorUpdateId here.
 	pub offchain_monitor_updates: Mutex<HashMap<OutPoint, HashSet<MonitorUpdateId>>>,
@@ -525,7 +525,7 @@ impl TestPersister {
 	pub fn new() -> Self {
 		Self {
 			update_rets: Mutex::new(VecDeque::new()),
-			chain_sync_monitor_persistences: Mutex::new(new_hash_map()),
+			chain_sync_monitor_persistences: Mutex::new(VecDeque::new()),
 			offchain_monitor_updates: Mutex::new(new_hash_map()),
 		}
 	}
@@ -543,16 +543,16 @@ impl chainmonitor::Persist<TestChannelSigner> for TestPersister {
-	fn update_persisted_channel(&self, funding_txo: OutPoint, _update: Option<&channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<TestChannelSigner>, update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
+	fn update_persisted_channel(&self, funding_txo: OutPoint, update: Option<&channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<TestChannelSigner>, update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
 		let mut ret = chain::ChannelMonitorUpdateStatus::Completed;
 		if let Some(update_ret) = self.update_rets.lock().unwrap().pop_front() {
 			ret = update_ret;
 		}
-		let is_chain_sync = if let UpdateOrigin::ChainSync(_) = update_id.contents { true } else { false };
-		if is_chain_sync {
-			self.chain_sync_monitor_persistences.lock().unwrap().entry(funding_txo).or_insert(new_hash_set()).insert(update_id);
-		} else {
+
+		if update.is_some() {
 			self.offchain_monitor_updates.lock().unwrap().entry(funding_txo).or_insert(new_hash_set()).insert(update_id);
+		} else {
+			self.chain_sync_monitor_persistences.lock().unwrap().push_back(funding_txo);
 		}
 		ret
 	}
@@ -564,7 +564,7 @@ impl chainmonitor::Persist<TestChannelSigner> for TestPersister {
 				// If the channel was not in the offchain_monitor_updates map, it should be in the
 				// chain_sync_monitor_persistences map.
-				assert!(self.chain_sync_monitor_persistences.lock().unwrap().remove(&funding_txo).is_some());
+				self.chain_sync_monitor_persistences.lock().unwrap().retain(|x| x != &funding_txo);
 			}
 		};
 	}

From c1125f00b392432fc29fae8555183b46220bba26 Mon Sep 17 00:00:00 2001
From: Gursharan Singh <3442979+G8XSU@users.noreply.github.com>
Date: Thu, 11 Apr 2024 23:31:30 -0700
Subject: [PATCH 4/5] Remove MonitorUpdateId from persist trait

MonitorUpdateId was an opaque abstraction for ids generated by
UpdateOrigin::OffChain and UpdateOrigin::ChainSync monitor updates. It
was mainly needed to match calls to ChainMonitor::channel_monitor_updated
with their originating persistence calls.

We no longer track UpdateOrigin::ChainSync MonitorUpdates and can
directly use ChannelMonitor::get_latest_update_id() for tracking
UpdateOrigin::OffChain monitor updates.
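Under the post-patch trait, an asynchronous `Persist` implementation tracks plain `u64` ids rather than `MonitorUpdateId`s. A minimal sketch, assuming the signatures introduced below (the struct, its bookkeeping, and the elided background writes are illustrative, not part of the patch):

    use std::collections::HashMap;
    use std::sync::Mutex;

    use lightning::chain::ChannelMonitorUpdateStatus;
    use lightning::chain::chainmonitor::Persist;
    use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate};
    use lightning::chain::transaction::OutPoint;
    use lightning::sign::ecdsa::WriteableEcdsaChannelSigner;

    /// Remembers which update ids still need a `channel_monitor_updated` call.
    #[derive(Default)]
    struct AsyncPersister {
        pending: Mutex<HashMap<OutPoint, Vec<u64>>>,
    }

    impl<ChannelSigner: WriteableEcdsaChannelSigner> Persist<ChannelSigner> for AsyncPersister {
        fn persist_new_channel(
            &self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>,
        ) -> ChannelMonitorUpdateStatus {
            // A real implementation would start an async write of `monitor` here
            // and report completion later with this id.
            self.pending.lock().unwrap().entry(funding_txo).or_default()
                .push(monitor.get_latest_update_id());
            ChannelMonitorUpdateStatus::InProgress
        }

        fn update_persisted_channel(
            &self, funding_txo: OutPoint, update: Option<&ChannelMonitorUpdate>,
            _monitor: &ChannelMonitor<ChannelSigner>,
        ) -> ChannelMonitorUpdateStatus {
            if let Some(update) = update {
                // Off-chain update: completion must be reported with
                // `update.update_id` once the write is durable.
                self.pending.lock().unwrap().entry(funding_txo).or_default()
                    .push(update.update_id);
            }
            // Chain-sync persistence (`update == None`) needs no completion
            // call, so nothing is tracked for it.
            ChannelMonitorUpdateStatus::InProgress
        }

        fn archive_persisted_channel(&self, funding_txo: OutPoint) {
            self.pending.lock().unwrap().remove(&funding_txo);
        }
    }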
---
 fuzz/src/utils/test_persister.rs    |   5 +-
 lightning-persister/src/fs_store.rs |   4 +-
 lightning/src/chain/chainmonitor.rs | 174 ++++++++++------------------
 lightning/src/util/persist.rs       |  23 ++--
 lightning/src/util/test_utils.rs    |  31 ++---
 5 files changed, 91 insertions(+), 146 deletions(-)

diff --git a/fuzz/src/utils/test_persister.rs b/fuzz/src/utils/test_persister.rs
index f9a03d16178..a99c397d0b2 100644
--- a/fuzz/src/utils/test_persister.rs
+++ b/fuzz/src/utils/test_persister.rs
@@ -1,6 +1,5 @@
 use lightning::chain;
 use lightning::chain::{chainmonitor, channelmonitor};
-use lightning::chain::chainmonitor::MonitorUpdateId;
 use lightning::chain::transaction::OutPoint;
 use lightning::util::test_channel_signer::TestChannelSigner;
@@ -10,11 +9,11 @@ pub struct TestPersister {
 	pub update_ret: Mutex<chain::ChannelMonitorUpdateStatus>,
 }
 impl chainmonitor::Persist<TestChannelSigner> for TestPersister {
-	fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<TestChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
+	fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<TestChannelSigner>) -> chain::ChannelMonitorUpdateStatus {
 		self.update_ret.lock().unwrap().clone()
 	}
 
-	fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: Option<&channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<TestChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
+	fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: Option<&channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<TestChannelSigner>) -> chain::ChannelMonitorUpdateStatus {
 		self.update_ret.lock().unwrap().clone()
 	}
diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs
index 014dde3e037..8a144f6196b 100644
--- a/lightning-persister/src/fs_store.rs
+++ b/lightning-persister/src/fs_store.rs
@@ -448,8 +448,6 @@ mod tests {
 		nodes[1].node.force_close_broadcasting_latest_txn(&chan.2, &nodes[0].node.get_our_node_id()).unwrap();
 		check_closed_event!(nodes[1], 1, ClosureReason::HolderForceClosed, [nodes[0].node.get_our_node_id()], 100000);
 		let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
-		let update_map = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap();
-		let update_id = update_map.get(&added_monitors[0].1.channel_id()).unwrap();
 
 		// Set the store's directory to read-only, which should result in
 		// returning an unrecoverable failure when we then attempt to persist a
@@ -463,7 +461,7 @@ mod tests {
 			txid: Txid::from_str("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(),
 			index: 0
 		};
-		match store.persist_new_channel(test_txo, &added_monitors[0].1, update_id.2) {
+		match store.persist_new_channel(test_txo, &added_monitors[0].1) {
 			ChannelMonitorUpdateStatus::UnrecoverableError => {},
 			_ => panic!("unexpected result from persisting new channel")
 		}
diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs
index fb2bfc3fdc8..06052dc84de 100644
--- a/lightning/src/chain/chainmonitor.rs
+++ b/lightning/src/chain/chainmonitor.rs
@@ -47,46 +47,6 @@ use core::ops::Deref;
 use core::sync::atomic::{AtomicUsize, Ordering};
 use bitcoin::secp256k1::PublicKey;
 
-mod update_origin {
-	#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
-	/// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents
-	/// entirely opaque.
-	pub(crate) enum UpdateOrigin {
-		/// An update that was generated by the `ChannelManager` (via our [`crate::chain::Watch`]
-		/// implementation). This corresponds to an actual [ChannelMonitorUpdate::update_id] field
-		/// and [ChannelMonitor::get_latest_update_id].
-		///
-		/// [ChannelMonitor::get_latest_update_id]: crate::chain::channelmonitor::ChannelMonitor::get_latest_update_id
-		/// [ChannelMonitorUpdate::update_id]: crate::chain::channelmonitor::ChannelMonitorUpdate::update_id
-		OffChain(u64),
-		/// An update that was generated during blockchain processing. The ID here is specific to the
-		/// generating [ChannelMonitor] and does *not* correspond to any on-disk IDs.
-		///
-		/// [ChannelMonitor]: crate::chain::channelmonitor::ChannelMonitor
-		ChainSync(u64),
-	}
-}
-
-#[cfg(any(feature = "_test_utils", test))]
-pub(crate) use update_origin::UpdateOrigin;
-#[cfg(not(any(feature = "_test_utils", test)))]
-use update_origin::UpdateOrigin;
-
-/// An opaque identifier describing a specific [`Persist`] method call.
-#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
-pub struct MonitorUpdateId {
-	pub(crate) contents: UpdateOrigin,
-}
-
-impl MonitorUpdateId {
-	pub(crate) fn from_monitor_update(update: &ChannelMonitorUpdate) -> Self {
-		Self { contents: UpdateOrigin::OffChain(update.update_id) }
-	}
-	pub(crate) fn from_new_monitor(monitor: &ChannelMonitor<ChannelSigner>) -> Self {
-		Self { contents: UpdateOrigin::OffChain(monitor.get_latest_update_id()) }
-	}
-}
-
 /// `Persist` defines behavior for persisting channel monitors: this could mean
 /// writing once to disk, and/or uploading to one or more backup services.
 ///
@@ -119,7 +79,7 @@ impl MonitorUpdateId {
 /// All calls should generally spawn a background task and immediately return
 /// [`ChannelMonitorUpdateStatus::InProgress`]. Once the update completes,
 /// [`ChainMonitor::channel_monitor_updated`] should be called with the corresponding
-/// [`MonitorUpdateId`].
+/// [`ChannelMonitor::get_latest_update_id`] or [`ChannelMonitorUpdate::update_id`].
 ///
 /// Note that unlike the direct [`chain::Watch`] interface,
 /// [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs.
@@ -150,15 +110,16 @@ pub trait Persist<ChannelSigner: WriteableEcdsaChannelSigner> {
 	/// channel's outpoint (and it is up to you to maintain a correct mapping between the outpoint
 	/// and the stored channel data). Note that you **must** persist every new monitor to disk.
 	///
-	/// The `update_id` is used to identify this call to [`ChainMonitor::channel_monitor_updated`],
-	/// if you return [`ChannelMonitorUpdateStatus::InProgress`].
+	/// The [`ChannelMonitor::get_latest_update_id`] uniquely links this call to [`ChainMonitor::channel_monitor_updated`].
+	/// For [`Persist::persist_new_channel`], it is only necessary to call [`ChainMonitor::channel_monitor_updated`]
+	/// when you return [`ChannelMonitorUpdateStatus::InProgress`].
 	///
 	/// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`
 	/// and [`ChannelMonitorUpdateStatus`] for requirements when returning errors.
 	///
 	/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
 	/// [`Writeable::write`]: crate::util::ser::Writeable::write
-	fn persist_new_channel(&self, channel_funding_outpoint: OutPoint, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
+	fn persist_new_channel(&self, channel_funding_outpoint: OutPoint, monitor: &ChannelMonitor<ChannelSigner>) -> ChannelMonitorUpdateStatus;
 
 	/// Update one channel's data. The provided [`ChannelMonitor`] has already applied the given
 	/// update.
@@ -185,15 +146,17 @@ pub trait Persist<ChannelSigner: WriteableEcdsaChannelSigner> {
 	/// them in batches. The size of each monitor grows `O(number of state updates)`
 	/// whereas updates are small and `O(1)`.
 	///
-	/// The `update_id` is used to identify this call to [`ChainMonitor::channel_monitor_updated`],
-	/// if you return [`ChannelMonitorUpdateStatus::InProgress`].
+	/// The [`ChannelMonitorUpdate::update_id`] or [`ChannelMonitor::get_latest_update_id`] uniquely
+	/// links this call to [`ChainMonitor::channel_monitor_updated`].
+	/// For [`Persist::update_persisted_channel`], it is only necessary to call [`ChainMonitor::channel_monitor_updated`]
+	/// when a [`ChannelMonitorUpdate`] is provided and when you return [`ChannelMonitorUpdateStatus::InProgress`].
 	///
 	/// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`,
 	/// [`Writeable::write`] on [`ChannelMonitorUpdate`] for writing out an update, and
 	/// [`ChannelMonitorUpdateStatus`] for requirements when returning errors.
 	///
 	/// [`Writeable::write`]: crate::util::ser::Writeable::write
-	fn update_persisted_channel(&self, channel_funding_outpoint: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
+	fn update_persisted_channel(&self, channel_funding_outpoint: OutPoint, monitor_update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>) -> ChannelMonitorUpdateStatus;
 	/// Prevents the channel monitor from being loaded on startup.
 	///
 	/// Archiving the data in a backup location (rather than deleting it fully) is useful for
@@ -209,13 +172,12 @@ struct MonitorHolder<ChannelSigner: WriteableEcdsaChannelSigner> {
 	/// update_persisted_channel, the user returns a
 	/// [`ChannelMonitorUpdateStatus::InProgress`], and then calls channel_monitor_updated
 	/// immediately, racing our insertion of the pending update into the contained Vec.
-	pending_monitor_updates: Mutex<Vec<MonitorUpdateId>>,
+	pending_monitor_updates: Mutex<Vec<u64>>,
 }
 
 impl<ChannelSigner: WriteableEcdsaChannelSigner> MonitorHolder<ChannelSigner> {
-	fn has_pending_offchain_updates(&self, pending_monitor_updates_lock: &MutexGuard<Vec<MonitorUpdateId>>) -> bool {
-		pending_monitor_updates_lock.iter().any(|update_id|
-			if let UpdateOrigin::OffChain(_) = update_id.contents { true } else { false })
+	fn has_pending_updates(&self, pending_monitor_updates_lock: &MutexGuard<Vec<u64>>) -> bool {
+		!pending_monitor_updates_lock.is_empty()
 	}
 }
 
 /// A read-only reference to a current ChannelMonitor.
@@ -259,7 +221,7 @@ pub struct ChainMonitor<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
 {
 	monitors: RwLock<HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
-	/// When we generate a [`MonitorUpdateId`] for a chain-event monitor persistence, we need a
+	/// When we generate a monitor update for a chain-event monitor persistence, we need a
 	/// unique ID, which we calculate by simply getting the next value from this counter. Note that
 	/// the ID is never persisted so it's ok that they reset on restart.
 	sync_persistence_id: AtomicCounter,
@@ -346,20 +308,11 @@ where C::Target: chain::Filter,
 		let mut txn_outputs;
 		{
 			txn_outputs = process(monitor, txdata);
-			let chain_sync_update_id = self.sync_persistence_id.get_increment();
-			let update_id = MonitorUpdateId {
-				contents: UpdateOrigin::ChainSync(chain_sync_update_id),
-			};
-
-			log_trace!(logger, "Syncing Channel Monitor for channel {} for block-data update_id {}",
-				log_funding_info!(monitor),
-				chain_sync_update_id
-			);
-			match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) {
+			log_trace!(logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
+			match self.persister.update_persisted_channel(*funding_outpoint, None, monitor) {
 				ChannelMonitorUpdateStatus::Completed =>
-					log_trace!(logger, "Finished syncing Channel Monitor for channel {} for block-data update_id {}",
-						log_funding_info!(monitor),
-						chain_sync_update_id
+					log_trace!(logger, "Finished syncing Channel Monitor for channel {} for block-data",
+						log_funding_info!(monitor)
 					),
 				ChannelMonitorUpdateStatus::InProgress => {
 					log_debug!(logger, "Channel Monitor sync for channel {} in progress.", log_funding_info!(monitor));
@@ -464,7 +417,10 @@ where C::Target: chain::Filter,
 
 	#[cfg(not(c_bindings))]
 	/// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored).
-	pub fn list_pending_monitor_updates(&self) -> HashMap<OutPoint, Vec<MonitorUpdateId>> {
+	/// Each `Vec<u64>` contains `update_id`s from [`ChannelMonitor::get_latest_update_id`] for updates
+	/// that have not yet been fully persisted. Note that if a full monitor is persisted all the pending
+	/// monitor updates must be individually marked completed by calling [`ChainMonitor::channel_monitor_updated`].
+	pub fn list_pending_monitor_updates(&self) -> HashMap<OutPoint, Vec<u64>> {
 		hash_map_from_iter(self.monitors.read().unwrap().iter().map(|(outpoint, holder)| {
 			(*outpoint, holder.pending_monitor_updates.lock().unwrap().clone())
 		}))
@@ -472,7 +428,10 @@ where C::Target: chain::Filter,
 
 	#[cfg(c_bindings)]
 	/// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored).
-	pub fn list_pending_monitor_updates(&self) -> Vec<(OutPoint, Vec<MonitorUpdateId>)> {
+	/// Each `Vec<u64>` contains `update_id`s from [`ChannelMonitor::get_latest_update_id`] for updates
+	/// that have not yet been fully persisted. Note that if a full monitor is persisted all the pending
+	/// monitor updates must be individually marked completed by calling [`ChainMonitor::channel_monitor_updated`].
+	pub fn list_pending_monitor_updates(&self) -> Vec<(OutPoint, Vec<u64>)> {
 		self.monitors.read().unwrap().iter().map(|(outpoint, holder)| {
 			(*outpoint, holder.pending_monitor_updates.lock().unwrap().clone())
 		}).collect()
@@ -491,16 +450,20 @@ where C::Target: chain::Filter,
 	/// 1) This [`ChainMonitor`] calls [`Persist::update_persisted_channel`] which stores the
 	///    update to disk and begins updating any remote (e.g. watchtower/backup) copies,
 	///    returning [`ChannelMonitorUpdateStatus::InProgress`],
-	/// 2) once all remote copies are updated, you call this function with the
-	///    `completed_update_id` that completed, and once all pending updates have completed the
-	///    channel will be re-enabled.
-	// Note that we re-enable only after `UpdateOrigin::OffChain` updates complete, we don't
-	// care about `UpdateOrigin::ChainSync` updates for the channel state being updated. We
-	// only care about `UpdateOrigin::ChainSync` for returning `MonitorEvent`s.
+	/// 2) once all remote copies are updated, you call this function with [`ChannelMonitor::get_latest_update_id`]
+	///    or [`ChannelMonitorUpdate::update_id`] as the `completed_update_id`, and once all pending
+	///    updates have completed the channel will be re-enabled.
+	///
+	/// It is only necessary to call [`ChainMonitor::channel_monitor_updated`] when you return [`ChannelMonitorUpdateStatus::InProgress`]
+	/// from [`Persist`] and either:
+	/// 1. A new [`ChannelMonitor`] was added in [`Persist::persist_new_channel`], or
+	/// 2. A [`ChannelMonitorUpdate`] was provided as part of [`Persist::update_persisted_channel`].
+	/// Note that we don't care about calls to [`Persist::update_persisted_channel`] where no
+	/// [`ChannelMonitorUpdate`] was provided.
 	///
 	/// Returns an [`APIError::APIMisuseError`] if `funding_txo` does not match any currently
 	/// registered [`ChannelMonitor`]s.
-	pub fn channel_monitor_updated(&self, funding_txo: OutPoint, completed_update_id: MonitorUpdateId) -> Result<(), APIError> {
+	pub fn channel_monitor_updated(&self, funding_txo: OutPoint, completed_update_id: u64) -> Result<(), APIError> {
 		let monitors = self.monitors.read().unwrap();
 		let monitor_data = if let Some(mon) = monitors.get(&funding_txo) { mon } else {
 			return Err(APIError::APIMisuseError { err: format!("No ChannelMonitor matching funding outpoint {:?} found", funding_txo) });
@@ -508,39 +471,28 @@ where C::Target: chain::Filter,
 		let mut pending_monitor_updates = monitor_data.pending_monitor_updates.lock().unwrap();
 		pending_monitor_updates.retain(|update_id| *update_id != completed_update_id);
 
-		match completed_update_id {
-			MonitorUpdateId { contents: UpdateOrigin::OffChain(completed_update_id) } => {
-				// Note that we only check for `UpdateOrigin::OffChain` failures here - if
-				// we're being told that a `UpdateOrigin::OffChain` monitor update completed,
-				// we only care about ensuring we don't tell the `ChannelManager` to restore
-				// the channel to normal operation until all `UpdateOrigin::OffChain` updates
-				// complete.
-				// If there's some `UpdateOrigin::ChainSync` update still pending that's okay
-				// - we can still update our channel state, just as long as we don't return
-				// `MonitorEvent`s from the monitor back to the `ChannelManager` until they
-				// complete.
-				let monitor_is_pending_updates = monitor_data.has_pending_offchain_updates(&pending_monitor_updates);
-				log_debug!(self.logger, "Completed off-chain monitor update {} for channel with funding outpoint {:?}, {}",
-					completed_update_id,
-					funding_txo,
-					if monitor_is_pending_updates {
-						"still have pending off-chain updates"
-					} else {
-						"all off-chain updates complete, returning a MonitorEvent"
-					});
-				if monitor_is_pending_updates {
-					// If there are still monitor updates pending, we cannot yet construct a
-					// Completed event.
-					return Ok(());
-				}
-				let channel_id = monitor_data.monitor.channel_id();
-				self.pending_monitor_events.lock().unwrap().push((funding_txo, channel_id, vec![MonitorEvent::Completed {
-					funding_txo, channel_id,
-					monitor_update_id: monitor_data.monitor.get_latest_update_id(),
-				}], monitor_data.monitor.get_counterparty_node_id()));
-			},
-			MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => {},
+		// Note that we only check for pending non-chainsync monitor updates and we don't track monitor
+		// updates resulting from chainsync in `pending_monitor_updates`.
+		let monitor_is_pending_updates = monitor_data.has_pending_updates(&pending_monitor_updates);
+		log_debug!(self.logger, "Completed off-chain monitor update {} for channel with funding outpoint {:?}, {}",
+			completed_update_id,
+			funding_txo,
+			if monitor_is_pending_updates {
+				"still have pending off-chain updates"
+			} else {
+				"all off-chain updates complete, returning a MonitorEvent"
+			});
+		if monitor_is_pending_updates {
+			// If there are still monitor updates pending, we cannot yet construct a
+			// Completed event.
+			return Ok(());
 		}
+		let channel_id = monitor_data.monitor.channel_id();
+		self.pending_monitor_events.lock().unwrap().push((funding_txo, channel_id, vec![MonitorEvent::Completed {
+			funding_txo, channel_id,
+			monitor_update_id: monitor_data.monitor.get_latest_update_id(),
+		}], monitor_data.monitor.get_counterparty_node_id()));
+
 		self.event_notifier.notify();
 		Ok(())
 	}
@@ -771,9 +723,9 @@ where C::Target: chain::Filter,
 			hash_map::Entry::Vacant(e) => e,
 		};
 		log_trace!(logger, "Got new ChannelMonitor for channel {}", log_funding_info!(monitor));
-		let update_id = MonitorUpdateId::from_new_monitor(&monitor);
+		let update_id = monitor.get_latest_update_id();
 		let mut pending_monitor_updates = Vec::new();
-		let persist_res = self.persister.persist_new_channel(funding_outpoint, &monitor, update_id);
+		let persist_res = self.persister.persist_new_channel(funding_outpoint, &monitor);
 		match persist_res {
 			ChannelMonitorUpdateStatus::InProgress => {
 				log_info!(logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor));
@@ -823,7 +775,7 @@ where C::Target: chain::Filter,
 			log_trace!(logger, "Updating ChannelMonitor to id {} for channel {}", update.update_id, log_funding_info!(monitor));
 			let update_res = monitor.update_monitor(update, &self.broadcaster, &self.fee_estimator, &self.logger);
 
-			let update_id = MonitorUpdateId::from_monitor_update(update);
+			let update_id = update.update_id;
 			let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
 			let persist_res = if update_res.is_err() {
 				// Even if updating the monitor returns an error, the monitor's state will
 				// while reading `channel_monitor` with updates from storage. Instead, we should persist
 				// the entire `channel_monitor` here.
 				log_warn!(logger, "Failed to update ChannelMonitor for channel {}. Going ahead and persisting the entire ChannelMonitor", log_funding_info!(monitor));
-				self.persister.update_persisted_channel(funding_txo, None, monitor, update_id)
+				self.persister.update_persisted_channel(funding_txo, None, monitor)
 			} else {
-				self.persister.update_persisted_channel(funding_txo, Some(update), monitor, update_id)
+				self.persister.update_persisted_channel(funding_txo, Some(update), monitor)
 			};
 			match persist_res {
 				ChannelMonitorUpdateStatus::InProgress => {
diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs
index 249a089cd48..46f82d2c113 100644
--- a/lightning/src/util/persist.rs
+++ b/lightning/src/util/persist.rs
@@ -20,7 +20,7 @@ use crate::prelude::*;
 use crate::chain;
 use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
-use crate::chain::chainmonitor::{Persist, MonitorUpdateId};
+use crate::chain::chainmonitor::Persist;
 use crate::sign::{EntropySource, ecdsa::WriteableEcdsaChannelSigner, SignerProvider};
 use crate::chain::transaction::OutPoint;
 use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, CLOSED_CHANNEL_UPDATE_ID};
@@ -208,7 +208,7 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore + ?Sized> Persist<ChannelSigner> for K {
-	fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
+	fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>) -> chain::ChannelMonitorUpdateStatus {
 		let key = format!("{}_{}", funding_txo.txid.to_string(), funding_txo.index);
 		match self.write(
 			CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -220,7 +220,7 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore + ?Sized> Persist<ChannelSigner> for K {
-	fn update_persisted_channel(&self, funding_txo: OutPoint, _update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
+	fn update_persisted_channel(&self, funding_txo: OutPoint, _update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>) -> chain::ChannelMonitorUpdateStatus {
 		let key = format!("{}_{}", funding_txo.txid.to_string(), funding_txo.index);
 		match self.write(
 			CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -648,8 +648,7 @@ where
 	/// Persists a new channel. This means writing the entire monitor to the
 	/// parametrized [`KVStore`].
 	fn persist_new_channel(
-		&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>,
-		_monitor_update_call_id: MonitorUpdateId,
+		&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>
 	) -> chain::ChannelMonitorUpdateStatus {
 		// Determine the proper key for this monitor
 		let monitor_name = MonitorName::from(funding_txo);
@@ -693,10 +692,8 @@ where
 	///   - The update is at [`CLOSED_CHANNEL_UPDATE_ID`]
 	fn update_persisted_channel(
 		&self, funding_txo: OutPoint, update: Option<&ChannelMonitorUpdate>,
-		monitor: &ChannelMonitor<ChannelSigner>, monitor_update_call_id: MonitorUpdateId,
+		monitor: &ChannelMonitor<ChannelSigner>
 	) -> chain::ChannelMonitorUpdateStatus {
-		// IMPORTANT: monitor_update_call_id: MonitorUpdateId is not to be confused with
-		// ChannelMonitorUpdate's update_id.
 		if let Some(update) = update {
 			if update.update_id != CLOSED_CHANNEL_UPDATE_ID
 				&& update.update_id % self.maximum_pending_updates != 0
@@ -732,7 +729,7 @@ where
 			};
 
 			// We could write this update, but it meets criteria of our design that calls for a full monitor write.
-			let monitor_update_status = self.persist_new_channel(funding_txo, monitor, monitor_update_call_id);
+			let monitor_update_status = self.persist_new_channel(funding_txo, monitor);
 
 			if let chain::ChannelMonitorUpdateStatus::Completed = monitor_update_status {
 				let cleanup_range = if monitor.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
@@ -761,7 +758,7 @@ where
 			}
 		} else {
 			// There is no update given, so we must persist a new monitor.
-			self.persist_new_channel(funding_txo, monitor, monitor_update_call_id)
+			self.persist_new_channel(funding_txo, monitor)
 		}
 	}
@@ -1117,8 +1114,6 @@ mod tests {
 		check_closed_event(&nodes[1], 1, ClosureReason::HolderForceClosed, false, &[nodes[0].node.get_our_node_id()], 100000);
 		{
 			let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
-			let update_map = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap();
-			let update_id = update_map.get(&added_monitors[0].1.channel_id()).unwrap();
 			let cmu_map = nodes[1].chain_monitor.monitor_updates.lock().unwrap();
 			let cmu = &cmu_map.get(&added_monitors[0].1.channel_id()).unwrap()[0];
 			let test_txo = OutPoint { txid: Txid::from_str("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(), index: 0 };
@@ -1130,7 +1125,7 @@ mod tests {
 				entropy_source: node_cfgs[0].keys_manager,
 				signer_provider: node_cfgs[0].keys_manager,
 			};
-			match ro_persister.persist_new_channel(test_txo, &added_monitors[0].1, update_id.2) {
+			match ro_persister.persist_new_channel(test_txo, &added_monitors[0].1) {
 				ChannelMonitorUpdateStatus::UnrecoverableError => {
 					// correct result
 				}
@@ -1141,7 +1136,7 @@ mod tests {
 				panic!("Returned InProgress when shouldn't have")
 			}
 		}
-			match ro_persister.update_persisted_channel(test_txo, Some(cmu), &added_monitors[0].1, update_id.2) {
+			match ro_persister.update_persisted_channel(test_txo, Some(cmu), &added_monitors[0].1) {
 				ChannelMonitorUpdateStatus::UnrecoverableError => {
 					// correct result
 				}
diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs
index b9672dc405f..642e548e606 100644
--- a/lightning/src/util/test_utils.rs
+++ b/lightning/src/util/test_utils.rs
@@ -16,7 +16,6 @@ use crate::chain::chaininterface::ConfirmationTarget;
 #[cfg(test)]
 use crate::chain::chaininterface::FEERATE_FLOOR_SATS_PER_KW;
 use crate::chain::chainmonitor;
-use crate::chain::chainmonitor::{MonitorUpdateId};
 use crate::chain::channelmonitor;
 use crate::chain::channelmonitor::MonitorEvent;
 use crate::chain::transaction::OutPoint;
@@ -311,7 +310,7 @@ impl SignerProvider for OnlyReadsKeysInterface {
 pub struct TestChainMonitor<'a> {
 	pub added_monitors: Mutex<Vec<(OutPoint, channelmonitor::ChannelMonitor<TestChannelSigner>)>>,
 	pub monitor_updates: Mutex<HashMap<ChannelId, Vec<channelmonitor::ChannelMonitorUpdate>>>,
-	pub latest_monitor_update_id: Mutex<HashMap<ChannelId, (OutPoint, u64, MonitorUpdateId)>>,
+	pub latest_monitor_update_id: Mutex<HashMap<ChannelId, (OutPoint, u64, u64)>>,
 	pub chain_monitor: chainmonitor::ChainMonitor<TestChannelSigner, &'a TestChainSource, &'a dyn chaininterface::BroadcasterInterface, &'a TestFeeEstimator, &'a TestLogger, &'a dyn chainmonitor::Persist<TestChannelSigner>>,
 	pub keys_manager: &'a TestKeysInterface,
 	/// If this is set to Some(), the next update_channel call (not watch_channel) must be a
@@ -350,7 +349,7 @@ impl<'a> chain::Watch<TestChannelSigner> for TestChainMonitor<'a> {
 			&mut io::Cursor::new(&w.0), (self.keys_manager, self.keys_manager)).unwrap().1;
 		assert!(new_monitor == monitor);
 		self.latest_monitor_update_id.lock().unwrap().insert(monitor.channel_id(),
-			(funding_txo, monitor.get_latest_update_id(), MonitorUpdateId::from_new_monitor(&monitor)));
+			(funding_txo, monitor.get_latest_update_id(), monitor.get_latest_update_id()));
 		self.added_monitors.lock().unwrap().push((funding_txo, monitor));
 		self.chain_monitor.watch_channel(funding_txo, new_monitor)
 	}
@@ -374,7 +373,7 @@ impl<'a> chain::Watch<TestChannelSigner> for TestChainMonitor<'a> {
 		}
 
 		self.latest_monitor_update_id.lock().unwrap().insert(channel_id,
-			(funding_txo, update.update_id, MonitorUpdateId::from_monitor_update(update)));
+			(funding_txo, update.update_id, update.update_id));
 		let update_res = self.chain_monitor.update_channel(funding_txo, update);
 		// At every point where we get a monitor update, we should be able to send a useful monitor
 		// to a watchtower and disk...
@@ -453,9 +452,9 @@ impl WatchtowerPersister {
 #[cfg(test)]
 impl chainmonitor::Persist<TestChannelSigner> for WatchtowerPersister {
 	fn persist_new_channel(&self, funding_txo: OutPoint,
-		data: &channelmonitor::ChannelMonitor<TestChannelSigner>, id: MonitorUpdateId
+		data: &channelmonitor::ChannelMonitor<TestChannelSigner>
 	) -> chain::ChannelMonitorUpdateStatus {
-		let res = self.persister.persist_new_channel(funding_txo, data, id);
+		let res = self.persister.persist_new_channel(funding_txo, data);
 
 		assert!(self.unsigned_justice_tx_data.lock().unwrap()
 			.insert(funding_txo, VecDeque::new()).is_none());
@@ -475,9 +474,9 @@ impl chainmonitor::Persist<TestChannelSigner> for WatchtowerPersister {
 	fn update_persisted_channel(&self, funding_txo: OutPoint, update: Option<&channelmonitor::ChannelMonitorUpdate>,
-		data: &channelmonitor::ChannelMonitor<TestChannelSigner>, update_id: MonitorUpdateId
+		data: &channelmonitor::ChannelMonitor<TestChannelSigner>
 	) -> chain::ChannelMonitorUpdateStatus {
-		let res = self.persister.update_persisted_channel(funding_txo, update, data, update_id);
+		let res = self.persister.update_persisted_channel(funding_txo, update, data);
 
 		if let Some(update) = update {
 			let commitment_txs = data.counterparty_commitment_txs_from_update(update);
@@ -515,11 +514,13 @@ pub struct TestPersister {
 	/// returned.
 	pub update_rets: Mutex<VecDeque<chain::ChannelMonitorUpdateStatus>>,
 	/// When we get an update_persisted_channel call with no ChannelMonitorUpdate, we insert the
-	/// MonitorUpdateId here.
+	/// MonitorId here.
 	pub chain_sync_monitor_persistences: Mutex<VecDeque<OutPoint>>,
 	/// When we get an update_persisted_channel call *with* a ChannelMonitorUpdate, we insert the
-	/// MonitorUpdateId here.
-	pub offchain_monitor_updates: Mutex<HashMap<OutPoint, HashSet<MonitorUpdateId>>>,
+	/// [`ChannelMonitor::get_latest_update_id`] here.
+	///
+	/// [`ChannelMonitor`]: channelmonitor::ChannelMonitor
+	pub offchain_monitor_updates: Mutex<HashMap<OutPoint, HashSet<u64>>>,
 }
 impl TestPersister {
 	pub fn new() -> Self {
@@ -536,21 +537,21 @@ impl chainmonitor::Persist<TestChannelSigner> for TestPersister {
-	fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<TestChannelSigner>, _id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
+	fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<TestChannelSigner>) -> chain::ChannelMonitorUpdateStatus {
 		if let Some(update_ret) = self.update_rets.lock().unwrap().pop_front() {
 			return update_ret
 		}
 		chain::ChannelMonitorUpdateStatus::Completed
 	}
 
-	fn update_persisted_channel(&self, funding_txo: OutPoint, update: Option<&channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<TestChannelSigner>, update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
+	fn update_persisted_channel(&self, funding_txo: OutPoint, update: Option<&channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<TestChannelSigner>) -> chain::ChannelMonitorUpdateStatus {
 		let mut ret = chain::ChannelMonitorUpdateStatus::Completed;
 		if let Some(update_ret) = self.update_rets.lock().unwrap().pop_front() {
 			ret = update_ret;
 		}
 
-		if update.is_some() {
-			self.offchain_monitor_updates.lock().unwrap().entry(funding_txo).or_insert(new_hash_set()).insert(update_id);
+		if let Some(update) = update {
+			self.offchain_monitor_updates.lock().unwrap().entry(funding_txo).or_insert(new_hash_set()).insert(update.update_id);
 		} else {
 			self.chain_sync_monitor_persistences.lock().unwrap().push_back(funding_txo);
 		}

From 4d5de1fe611ee16026a831d4f24a3801f75b59d6 Mon Sep 17 00:00:00 2001
From: Gursharan Singh <3442979+G8XSU@users.noreply.github.com>
Date: Thu, 11 Apr 2024 23:35:54 -0700
Subject: [PATCH 5/5] Remove ChainMonitor::sync_persistence_id

It was earlier used to generate unique MonitorUpdateIds for
UpdateOrigin::ChainSync monitor updates.
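Once the counter is gone, the only id plumbing left is the `u64` passed back to `ChainMonitor::channel_monitor_updated` when an off-chain write finishes. A thin caller-side sketch, assuming the post-series signatures (the helper name and the surrounding task structure are assumptions, not LDK APIs):

    use core::ops::Deref;

    use lightning::chain::{self, chainmonitor::{ChainMonitor, Persist}};
    use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
    use lightning::chain::transaction::OutPoint;
    use lightning::sign::ecdsa::WriteableEcdsaChannelSigner;
    use lightning::util::errors::APIError;
    use lightning::util::logger::Logger;

    /// Called by a background persistence task once an off-chain update that
    /// returned `InProgress` is durable on every backing store.
    fn report_durable<ChannelSigner, C, T, F, L, P>(
        chain_monitor: &ChainMonitor<ChannelSigner, C, T, F, L, P>,
        funding_txo: OutPoint, completed_update_id: u64,
    ) -> Result<(), APIError>
    where
        ChannelSigner: WriteableEcdsaChannelSigner,
        C: Deref, T: Deref, F: Deref, L: Deref, P: Deref,
        C::Target: chain::Filter,
        T::Target: BroadcasterInterface,
        F::Target: FeeEstimator,
        L::Target: Logger,
        P::Target: Persist<ChannelSigner>,
    {
        // The id is `ChannelMonitorUpdate::update_id`, or
        // `ChannelMonitor::get_latest_update_id()` for a new-channel persist.
        chain_monitor.channel_monitor_updated(funding_txo, completed_update_id)
    }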
---
 lightning/src/chain/chainmonitor.rs | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs
index 06052dc84de..c68e9a7169e 100644
--- a/lightning/src/chain/chainmonitor.rs
+++ b/lightning/src/chain/chainmonitor.rs
@@ -35,7 +35,6 @@ use crate::ln::ChannelId;
 use crate::sign::ecdsa::WriteableEcdsaChannelSigner;
 use crate::events;
 use crate::events::{Event, EventHandler};
-use crate::util::atomic_counter::AtomicCounter;
 use crate::util::logger::{Logger, WithContext};
 use crate::util::errors::APIError;
 use crate::util::wakers::{Future, Notifier};
@@ -221,10 +220,6 @@ pub struct ChainMonitor<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
 {
 	monitors: RwLock<HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
-	/// When we generate a monitor update for a chain-event monitor persistence, we need a
-	/// unique ID, which we calculate by simply getting the next value from this counter. Note that
-	/// the ID is never persisted so it's ok that they reset on restart.
-	sync_persistence_id: AtomicCounter,
 	chain_source: Option<C>,
 	broadcaster: T,
 	logger: L,
@@ -353,7 +348,6 @@ where C::Target: chain::Filter,
 	pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P) -> Self {
 		Self {
 			monitors: RwLock::new(new_hash_map()),
-			sync_persistence_id: AtomicCounter::new(),
 			chain_source,
 			broadcaster,
 			logger,