@@ -41,36 +41,54 @@ use prelude::*;
4141use sync:: { RwLock , RwLockReadGuard , Mutex } ;
4242use core:: ops:: Deref ;
4343
44+ #[ derive( Clone , Copy , Hash , PartialEq ) ]
45+ pub ( crate ) enum MonitorUpdate {
46+ MonitorUpdateId ( u64 ) ,
47+ }
48+
49+ /// An opaque identifier describing a specific [`Persist`] method call.
50+ #[ derive( Clone , Copy , Hash , PartialEq ) ]
51+ pub struct MonitorUpdateId {
52+ pub ( crate ) contents : MonitorUpdate ,
53+ }
54+
4455/// `Persist` defines behavior for persisting channel monitors: this could mean
4556/// writing once to disk, and/or uploading to one or more backup services.
4657///
47- /// Note that for every new monitor, you **must** persist the new `ChannelMonitor`
48- /// to disk/backups. And, on every update, you **must** persist either the
49- /// `ChannelMonitorUpdate` or the updated monitor itself. Otherwise, there is risk
50- /// of situations such as revoking a transaction, then crashing before this
51- /// revocation can be persisted, then unintentionally broadcasting a revoked
52- /// transaction and losing money. This is a risk because previous channel states
53- /// are toxic, so it's important that whatever channel state is persisted is
54- /// kept up-to-date.
58+ /// Each method can return three possible values:
59+ /// * If persistence (including any relevant `fsync()` calls) happens immediately, the
60+ /// implementation should return `Ok(())`, indicating normal channel operation should continue.
61+ /// * If persistence happens asynchronously, implementations should first ensure the
62+ /// [`ChannelMonitor`] or [`ChannelMonitorUpdate`] are written durably to disk, and then return
63+ /// `Err(ChannelMonitorUpdateErr::TemporaryFailure)` while the update continues in the
64+ /// background. Once the update completes, [`ChainMonitor::channel_monitor_updated`] should be
65+ /// called with the corresponding [`MonitorUpdateId`].
66+ ///
67+ /// Note that unlike the direct [`chain::Watch`] interface,
68+ /// [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs.
69+ ///
70+ /// * If persistence fails for some reason, implementations should return
71+ /// `Err(ChannelMonitorUpdateErr::PermanentFailure)`, in which case the channel will likely be
72+ /// closed without broadcasting the latest state. See
73+ /// [`ChannelMonitorUpdateErr::PermanentFailure`] for more details.
5574pub trait Persist < ChannelSigner : Sign > {
56- /// Persist a new channel's data. The data can be stored any way you want, but
57- /// the identifier provided by Rust-Lightning is the channel's outpoint (and
58- /// it is up to you to maintain a correct mapping between the outpoint and the
59- /// stored channel data). Note that you **must** persist every new monitor to
60- /// disk. See the `Persist` trait documentation for more details.
75+ /// Persist a new channel's data. The data can be stored any way you want, but the identifier
76+ /// provided by LDK is the channel's outpoint (and it is up to you to maintain a correct
77+ /// mapping between the outpoint and the stored channel data). Note that you **must** persist
78+ /// every new monitor to disk.
6179 ///
6280 /// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`
6381 /// and [`ChannelMonitorUpdateErr`] for requirements when returning errors.
6482 ///
6583 /// [`Writeable::write`]: crate::util::ser::Writeable::write
66- fn persist_new_channel ( & self , id : OutPoint , data : & ChannelMonitor < ChannelSigner > ) -> Result < ( ) , ChannelMonitorUpdateErr > ;
84+ fn persist_new_channel ( & self , id : OutPoint , data : & ChannelMonitor < ChannelSigner > , update_id : MonitorUpdateId ) -> Result < ( ) , ChannelMonitorUpdateErr > ;
6785
68- /// Update one channel's data. The provided `ChannelMonitor` has already
69- /// applied the given update.
86+ /// Update one channel's data. The provided [ `ChannelMonitor`] has already applied the given
87+ /// update.
7088 ///
71- /// Note that on every update, you **must** persist either the
72- /// `ChannelMonitorUpdate` or the updated monitor itself to disk/backups. See
73- /// the `Persist` trait documentation for more details.
89+ /// Note that on every update, you **must** persist either the [`ChannelMonitorUpdate`] or the
90+ /// updated monitor itself to disk/backups. See the `Persist` trait documentation for more
91+ /// details.
7492 ///
7593 /// If an implementer chooses to persist the updates only, they need to make
7694 /// sure that all the updates are applied to the `ChannelMonitors` *before*
@@ -89,11 +107,18 @@ pub trait Persist<ChannelSigner: Sign> {
89107 /// [`ChannelMonitorUpdateErr`] for requirements when returning errors.
90108 ///
91109 /// [`Writeable::write`]: crate::util::ser::Writeable::write
92- fn update_persisted_channel ( & self , id : OutPoint , update : & ChannelMonitorUpdate , data : & ChannelMonitor < ChannelSigner > ) -> Result < ( ) , ChannelMonitorUpdateErr > ;
110+ fn update_persisted_channel ( & self , id : OutPoint , update : & ChannelMonitorUpdate , data : & ChannelMonitor < ChannelSigner > , update_id : MonitorUpdateId ) -> Result < ( ) , ChannelMonitorUpdateErr > ;
93111}
94112
95113struct MonitorHolder < ChannelSigner : Sign > {
96114 monitor : ChannelMonitor < ChannelSigner > ,
115+ /// The full set of pending monitor updates for this Channel.
116+ ///
117+ /// Note that this lock must be held during updates to prevent a race where we call
118+ /// update_persisted_channel, the user returns a TemporaryFailure, and then calls
119+ /// channel_monitor_updated immediately, racing our insertion of the pending update into the
120+ /// contained Vec.
121+ pending_monitor_updates : Mutex < Vec < MonitorUpdateId > > ,
97122}
98123
99124/// A read-only reference to a current ChannelMonitor.
@@ -262,23 +287,43 @@ where C::Target: chain::Filter,
262287 /// Indicates the persistence of a [`ChannelMonitor`] has completed after
263288 /// [`ChannelMonitorUpdateErr::TemporaryFailure`] was returned from an update operation.
264289 ///
265- /// All ChannelMonitor updates up to and including highest_applied_update_id must have been
266- /// fully committed in every copy of the given channels' ChannelMonitors.
267- ///
268- /// Note that there is no effect to calling with a highest_applied_update_id other than the
269- /// current latest ChannelMonitorUpdate and one call to this function after multiple
270- /// ChannelMonitorUpdateErr::TemporaryFailures is fine. The highest_applied_update_id field
271- /// exists largely only to prevent races between this and concurrent update_monitor calls.
272- ///
273290 /// Thus, the anticipated use is, at a high level:
274291 /// 1) This [`ChainMonitor`] calls [`Persist::update_persisted_channel`] which stores the
275292 /// update to disk and begins updating any remote (e.g. watchtower/backup) copies,
276293 /// returning [`ChannelMonitorUpdateErr::TemporaryFailure`],
277294 /// 2) once all remote copies are updated, you call this function with the update_id that
278- /// completed, and once it is the latest the Channel will be re-enabled.
279- pub fn channel_monitor_updated ( & self , funding_txo : OutPoint , highest_applied_update_id : u64 ) {
295+ /// completed, and once all pending updates have completed the Channel will be re-enabled.
296+ pub fn channel_monitor_updated ( & self , funding_txo : OutPoint , completed_update_id : MonitorUpdateId ) {
297+ let monitors = self . monitors . read ( ) . unwrap ( ) ;
298+ let monitor_data = if let Some ( mon) = monitors. get ( & funding_txo) { mon } else { return ; } ;
299+ let mut pending_monitor_updates = monitor_data. pending_monitor_updates . lock ( ) . unwrap ( ) ;
300+ pending_monitor_updates. retain ( |update_id| * update_id != completed_update_id) ;
301+
302+ match completed_update_id {
303+ MonitorUpdateId { .. } => {
304+ let monitor_update_pending_updates = pending_monitor_updates. iter ( ) . filter ( |update_id|
305+ if let MonitorUpdate :: MonitorUpdateId ( _) = update_id. contents { true } else { false } ) . count ( ) ;
306+ if monitor_update_pending_updates != 0 {
307+ // If there are still monitor updates pending, we cannot yet construct an
308+ // UpdateCompleted event.
309+ return ;
310+ }
311+ self . user_provided_events . lock ( ) . unwrap ( ) . push ( MonitorEvent :: UpdateCompleted ( MonitorUpdated {
312+ funding_txo,
313+ monitor_update_id : monitor_data. monitor . get_latest_update_id ( ) ,
314+ } ) ) ;
315+ }
316+ }
317+ }
318+
319+ /// This wrapper avoids having to update some of our tests for now as they assume the direct
320+ /// chain::Watch API wherein we mark a monitor fully-updated by just calling
321+ /// channel_monitor_updated once with the higest ID.
322+ #[ cfg( test) ]
323+ pub fn force_channel_monitor_updated ( & self , funding_txo : OutPoint , monitor_update_id : u64 ) {
280324 self . user_provided_events . lock ( ) . unwrap ( ) . push ( MonitorEvent :: UpdateCompleted ( MonitorUpdated {
281- funding_txo, monitor_update_id : highest_applied_update_id
325+ funding_txo,
326+ monitor_update_id,
282327 } ) ) ;
283328 }
284329
@@ -392,12 +437,18 @@ where C::Target: chain::Filter,
392437 return Err ( ChannelMonitorUpdateErr :: PermanentFailure ) } ,
393438 hash_map:: Entry :: Vacant ( e) => e,
394439 } ;
395- let update_res = self . persister . persist_new_channel ( funding_outpoint, & monitor) ;
440+ let update_id = MonitorUpdateId {
441+ contents : MonitorUpdate :: MonitorUpdateId ( monitor. get_latest_update_id ( ) ) ,
442+ } ;
443+ let mut pending_monitor_updates = Vec :: new ( ) ;
444+ let update_res = self . persister . persist_new_channel ( funding_outpoint, & monitor, update_id) ;
396445 if update_res. is_err ( ) {
397446 log_error ! ( self . logger, "Failed to persist new channel data: {:?}" , update_res) ;
398447 }
399448 if update_res == Err ( ChannelMonitorUpdateErr :: PermanentFailure ) {
400449 return update_res;
450+ } else if update_res. is_err ( ) {
451+ pending_monitor_updates. push ( update_id) ;
401452 }
402453 {
403454 let funding_txo = monitor. get_funding_txo ( ) ;
@@ -407,7 +458,7 @@ where C::Target: chain::Filter,
407458 monitor. load_outputs_to_watch ( chain_source) ;
408459 }
409460 }
410- entry. insert ( MonitorHolder { monitor } ) ;
461+ entry. insert ( MonitorHolder { monitor, pending_monitor_updates : Mutex :: new ( pending_monitor_updates ) } ) ;
411462 update_res
412463 }
413464
@@ -437,8 +488,15 @@ where C::Target: chain::Filter,
437488 }
438489 // Even if updating the monitor returns an error, the monitor's state will
439490 // still be changed. So, persist the updated monitor despite the error.
440- let persist_res = self . persister . update_persisted_channel ( funding_txo, & update, monitor) ;
441- if let Err ( ref e) = persist_res {
491+ let update_id = MonitorUpdateId {
492+ contents : MonitorUpdate :: MonitorUpdateId ( update. update_id ) ,
493+ } ;
494+ let mut pending_monitor_updates = monitor_state. pending_monitor_updates . lock ( ) . unwrap ( ) ;
495+ let persist_res = self . persister . update_persisted_channel ( funding_txo, & update, monitor, update_id) ;
496+ if let Err ( e) = persist_res {
497+ if e == ChannelMonitorUpdateErr :: TemporaryFailure {
498+ pending_monitor_updates. push ( update_id) ;
499+ }
442500 log_error ! ( self . logger, "Failed to persist channel monitor update: {:?}" , e) ;
443501 }
444502 if update_res. is_err ( ) {
0 commit comments