@@ -32,6 +32,7 @@ use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
 use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs};
 use chain::transaction::{OutPoint, TransactionData};
 use chain::keysinterface::Sign;
+use util::atomic_counter::AtomicCounter;
 use util::logger::Logger;
 use util::errors::APIError;
 use util::events;
@@ -41,10 +42,19 @@ use ln::channelmanager::ChannelDetails;
 use prelude::*;
 use sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
 use core::ops::Deref;
+use core::sync::atomic::{AtomicBool, Ordering};
 
 #[derive(Clone, Copy, Hash, PartialEq, Eq)]
+/// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents
+/// entirely opaque.
 enum UpdateOrigin {
+	/// An update that was generated by the [`ChannelManager`] (via our `chain::Watch`
+	/// implementation). This corresponds to an actual [`ChannelMonitorUpdate::update_id`] field
+	/// and [`ChannelMonitor::get_latest_update_id`].
 	OffChain(u64),
+	/// An update that was generated during blockchain processing. The ID here is specific to the
+	/// generating [`ChainMonitor`] and does *not* correspond to any on-disk IDs.
+	ChainSync(u64),
 }
 
 /// An opaque identifier describing a specific [`Persist`] method call.
@@ -103,6 +113,12 @@ pub trait Persist<ChannelSigner: Sign> {
 	/// updated monitor itself to disk/backups. See the [`Persist`] trait documentation for more
 	/// details.
 	///
+	/// During blockchain synchronization operations, this may be called with no
+	/// [`ChannelMonitorUpdate`], in which case the full [`ChannelMonitor`] needs to be persisted.
+	/// Note that after the full [`ChannelMonitor`] is persisted any previous
+	/// [`ChannelMonitorUpdate`]s which were persisted should be discarded - they can no longer be
+	/// applied to the persisted [`ChannelMonitor`] as they were already applied.
+	///
 	/// If an implementer chooses to persist the updates only, they need to make
 	/// sure that all the updates are applied to the `ChannelMonitors` *before*
 	/// the set of channel monitors is given to the `ChannelManager`
@@ -123,7 +139,7 @@ pub trait Persist<ChannelSigner: Sign> {
 	/// [`ChannelMonitorUpdateErr`] for requirements when returning errors.
 	///
 	/// [`Writeable::write`]: crate::util::ser::Writeable::write
-	fn update_persisted_channel(&self, channel_id: OutPoint, update: &ChannelMonitorUpdate, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>;
+	fn update_persisted_channel(&self, channel_id: OutPoint, update: &Option<ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>;
 }
 
 struct MonitorHolder<ChannelSigner: Sign> {
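For `Persist` implementers, the new `Option` means two persistence paths: with `Some(update)` the delta may still be stored incrementally, while `None` (only passed during chain sync) requires writing the full `ChannelMonitor` and discarding any stored deltas. A minimal sketch of the shape this takes against the signatures in this revision, assuming a hypothetical `MyPersister` over a `KVStore` with `write`/`delete_prefix` methods (none of these are crate APIs):

```rust
use lightning::chain::chainmonitor::{MonitorUpdateId, Persist};
use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr};
use lightning::chain::keysinterface::Sign;
use lightning::chain::transaction::OutPoint;
use lightning::util::ser::Writeable;

// `KVStore` is an assumed storage backend, not part of this crate.
struct MyPersister { store: KVStore }

impl<ChannelSigner: Sign> Persist<ChannelSigner> for MyPersister {
	fn persist_new_channel(&self, channel_id: OutPoint, data: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr> {
		let key = format!("monitors/{}_{}", channel_id.txid, channel_id.index);
		self.store.write(&key, &data.encode())
			.map_err(|_| ChannelMonitorUpdateErr::TemporaryFailure)
	}

	fn update_persisted_channel(&self, channel_id: OutPoint, update: &Option<ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr> {
		let key = format!("monitors/{}_{}", channel_id.txid, channel_id.index);
		match update {
			// Off-chain update: persisting just the delta remains allowed.
			Some(update) => {
				self.store.write(&format!("{}/updates/{}", key, update.update_id), &update.encode())
					.map_err(|_| ChannelMonitorUpdateErr::TemporaryFailure)
			},
			// Chain sync: write the full monitor, then drop the stored deltas -
			// they are already applied to `data` and must not be replayed.
			None => {
				self.store.write(&key, &data.encode())
					.map_err(|_| ChannelMonitorUpdateErr::TemporaryFailure)?;
				self.store.delete_prefix(&format!("{}/updates/", key))
					.map_err(|_| ChannelMonitorUpdateErr::TemporaryFailure)
			},
		}
	}
}
```

The ordering in the `None` arm matters: the full monitor must be durably written before the old updates are dropped, otherwise a crash in between could leave only a stale monitor with no deltas to roll it forward.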
@@ -134,14 +150,35 @@ struct MonitorHolder<ChannelSigner: Sign> {
 	/// update_persisted_channel, the user returns a TemporaryFailure, and then calls
 	/// channel_monitor_updated immediately, racing our insertion of the pending update into the
 	/// contained Vec.
+	///
+	/// Beyond the synchronization of updates themselves, we cannot handle user events until after
+	/// any chain updates have been stored on disk. Thus, we scan this list when returning updates
+	/// to the ChannelManager, refusing to return any updates for a ChannelMonitor which is still
+	/// being persisted fully to disk after a chain update.
+	///
+	/// This avoids the possibility of handling, e.g. an on-chain claim, generating a claim monitor
+	/// event, resulting in the relevant ChannelManager generating a PaymentSent event and dropping
+	/// the pending payment entry, and then reloading before the monitor is persisted, resulting in
+	/// the ChannelManager re-adding the same payment entry, before the same block is replayed,
+	/// resulting in a duplicate PaymentSent event.
 	pending_monitor_updates: Mutex<Vec<MonitorUpdateId>>,
+	/// When the user returns a PermanentFailure error from an update_persisted_channel call during
+	/// block processing, we inform the ChannelManager that the channel should be closed
+	/// asynchronously. In order to ensure no further changes happen before the ChannelManager has
+	/// processed the closure event, we set this to true and return PermanentFailure for any other
+	/// chain::Watch events.
+	channel_perm_failed: AtomicBool,
 }
 
 impl<ChannelSigner: Sign> MonitorHolder<ChannelSigner> {
 	fn has_pending_offchain_updates(&self, pending_monitor_updates_lock: &MutexGuard<Vec<MonitorUpdateId>>) -> bool {
 		pending_monitor_updates_lock.iter().any(|update_id|
 			if let UpdateOrigin::OffChain(_) = update_id.contents { true } else { false })
 	}
+	fn has_pending_chainsync_updates(&self, pending_monitor_updates_lock: &MutexGuard<Vec<MonitorUpdateId>>) -> bool {
+		pending_monitor_updates_lock.iter().any(|update_id|
+			if let UpdateOrigin::ChainSync(_) = update_id.contents { true } else { false })
+	}
 }
 
 /// A read-only reference to a current ChannelMonitor.
@@ -177,11 +214,17 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
 	P::Target: Persist<ChannelSigner>,
 {
 	monitors: RwLock<HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
+	/// When we generate a [`MonitorUpdateId`] for a chain-event monitor persistence, we need a
+	/// unique ID, which we calculate by simply getting the next value from this counter. Note that
+	/// the ID is never persisted so it's ok that they reset on restart.
+	sync_persistence_id: AtomicCounter,
 	chain_source: Option<C>,
 	broadcaster: T,
 	logger: L,
 	fee_estimator: F,
 	persister: P,
+	/// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
+	/// from the user and not from a [`ChannelMonitor`].
 	pending_monitor_events: Mutex<Vec<MonitorEvent>>,
 }
 
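`AtomicCounter` is a small crate-internal utility; all the `ChainMonitor` needs from it is process-unique values. A rough sketch of such a counter, not necessarily the crate's actual implementation (which may differ, e.g. to handle targets without native 64-bit atomics):

```rust
use core::sync::atomic::{AtomicU64, Ordering};

/// Hands out process-unique u64 values. Uniqueness is all callers need, so
/// `Relaxed` ordering suffices; values restart at zero after a reload, which
/// is fine because `ChainSync` IDs are never written to disk.
pub struct AtomicCounter(AtomicU64);

impl AtomicCounter {
	pub fn new() -> Self { Self(AtomicU64::new(0)) }
	pub fn get_increment(&self) -> u64 {
		// Returns the pre-increment value, so consecutive calls yield 0, 1, 2, ...
		self.0.fetch_add(1, Ordering::Relaxed)
	}
}
```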
@@ -206,26 +249,50 @@ where C::Target: chain::Filter,
 		FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
 	{
 		let mut dependent_txdata = Vec::new();
-		let monitor_states = self.monitors.read().unwrap();
-		for monitor_state in monitor_states.values() {
-			let mut txn_outputs = process(&monitor_state.monitor, txdata);
+		{
+			let monitor_states = self.monitors.write().unwrap();
+			for (funding_outpoint, monitor_state) in monitor_states.iter() {
+				let monitor = &monitor_state.monitor;
+				let mut txn_outputs;
+				{
+					txn_outputs = process(monitor, txdata);
+					let update_id = MonitorUpdateId {
+						contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
+					};
+					let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
+
+					log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
+					match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
+						Ok(()) =>
+							log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
+						Err(ChannelMonitorUpdateErr::PermanentFailure) => {
+							monitor_state.channel_perm_failed.store(true, Ordering::Release);
+							self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateFailed(*funding_outpoint));
+						},
+						Err(ChannelMonitorUpdateErr::TemporaryFailure) => {
+							log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
+							pending_monitor_updates.push(update_id);
+						},
+					}
+				}
 
-			// Register any new outputs with the chain source for filtering, storing any dependent
-			// transactions from within the block that previously had not been included in txdata.
-			if let Some(ref chain_source) = self.chain_source {
-				let block_hash = header.block_hash();
-				for (txid, mut outputs) in txn_outputs.drain(..) {
-					for (idx, output) in outputs.drain(..) {
-						// Register any new outputs with the chain source for filtering and recurse
-						// if it indicates that there are dependent transactions within the block
-						// that had not been previously included in txdata.
-						let output = WatchedOutput {
-							block_hash: Some(block_hash),
-							outpoint: OutPoint { txid, index: idx as u16 },
-							script_pubkey: output.script_pubkey,
-						};
-						if let Some(tx) = chain_source.register_output(output) {
-							dependent_txdata.push(tx);
+				// Register any new outputs with the chain source for filtering, storing any dependent
+				// transactions from within the block that previously had not been included in txdata.
+				if let Some(ref chain_source) = self.chain_source {
+					let block_hash = header.block_hash();
+					for (txid, mut outputs) in txn_outputs.drain(..) {
+						for (idx, output) in outputs.drain(..) {
+							// Register any new outputs with the chain source for filtering and recurse
+							// if it indicates that there are dependent transactions within the block
+							// that had not been previously included in txdata.
+							let output = WatchedOutput {
+								block_hash: Some(block_hash),
+								outpoint: OutPoint { txid, index: idx as u16 },
+								script_pubkey: output.script_pubkey,
+							};
+							if let Some(tx) = chain_source.register_output(output) {
+								dependent_txdata.push(tx);
+							}
 						}
 					}
 				}
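From the user's side nothing new is required to drive this path: connecting a block now also triggers one `update_persisted_channel(.., &None, ..)` call per monitor, in addition to the usual transaction processing. A sketch via the `chain::Listen` implementation (`on_new_block` is hypothetical glue code; the `Confirm`-based flow behaves analogously):

```rust
use bitcoin::blockdata::block::Block;
use lightning::chain::Listen;

// `listener` would typically be the ChainMonitor, which implements
// `chain::Listen`. Connecting a block runs process_chain_data above, so each
// monitor both processes the new transactions and is re-persisted in full.
fn on_new_block<L: Listen>(listener: &L, block: &Block, height: u32) {
	listener.block_connected(block, height);
}
```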
@@ -251,6 +318,7 @@ where C::Target: chain::Filter,
 	pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P) -> Self {
 		Self {
 			monitors: RwLock::new(HashMap::new()),
+			sync_persistence_id: AtomicCounter::new(),
 			chain_source,
 			broadcaster,
 			logger,
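Construction is unchanged for callers, since the counter is seeded internally. A sketch, assuming `filter`, `tx_broadcaster`, `logger`, `fee_estimator`, and `persister` of suitable types are already in scope:

```rust
use lightning::chain::chainmonitor::ChainMonitor;

let chain_monitor = ChainMonitor::new(
	Some(&filter), &tx_broadcaster, &logger, &fee_estimator, &persister,
);
// ChainSync MonitorUpdateIds drawn from the internal counter restart at zero
// on reload; that is harmless as they are never persisted.
```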
@@ -337,7 +405,7 @@ where C::Target: chain::Filter,
 		pending_monitor_updates.retain(|update_id| *update_id != completed_update_id);
 
 		match completed_update_id {
-			MonitorUpdateId { .. } => {
+			MonitorUpdateId { contents: UpdateOrigin::OffChain(_) } => {
 				// Note that we only check for `UpdateOrigin::OffChain` failures here - if
 				// we're being told that a `UpdateOrigin::OffChain` monitor update completed,
 				// we only care about ensuring we don't tell the `ChannelManager` to restore
@@ -348,16 +416,22 @@ where C::Target: chain::Filter,
 				// `MonitorEvent`s from the monitor back to the `ChannelManager` until they
 				// complete.
 				let monitor_is_pending_updates = monitor_data.has_pending_offchain_updates(&pending_monitor_updates);
-				if monitor_is_pending_updates {
-					// If there are still monitor updates pending, we cannot yet construct an
+				if monitor_is_pending_updates || monitor_data.channel_perm_failed.load(Ordering::Acquire) {
+					// If there are still monitor updates pending (or an old monitor update
+					// finished after a later one perm-failed), we cannot yet construct an
 					// UpdateCompleted event.
 					return Ok(());
 				}
 				self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted {
 					funding_txo,
 					monitor_update_id: monitor_data.monitor.get_latest_update_id(),
 				});
-			}
+			},
+			MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => {
+				// We've already done everything we need to - the next time
+				// release_pending_monitor_events is called, any events for this ChannelMonitor
+				// will be returned if there are no more `UpdateOrigin::ChainSync` updates left.
+			},
 		}
 		Ok(())
 	}
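The asymmetry between the two arms matters for asynchronous persisters: completing an `OffChain` update may surface an `UpdateCompleted` event to the `ChannelManager`, while completing a `ChainSync` update merely stops `release_pending_monitor_events` from withholding that monitor's events. Either way, completion is signalled identically; a sketch (hypothetical glue code, with `chain_monitor`, `funding_txo`, and the stored `update_id` assumed to be at hand):

```rust
// Called by your storage backend once a write that earlier returned
// ChannelMonitorUpdateErr::TemporaryFailure has durably completed. The
// `update_id` must be the exact MonitorUpdateId handed to the Persist call.
chain_monitor.channel_monitor_updated(funding_txo, update_id)
	.expect("completed an update for an unknown channel");
```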
@@ -502,7 +576,11 @@ where C::Target: chain::Filter,
 			monitor.load_outputs_to_watch(chain_source);
 		}
 		}
-		entry.insert(MonitorHolder { monitor, pending_monitor_updates: Mutex::new(pending_monitor_updates) });
+		entry.insert(MonitorHolder {
+			monitor,
+			pending_monitor_updates: Mutex::new(pending_monitor_updates),
+			channel_perm_failed: AtomicBool::new(false),
+		});
 		persist_res
 	}
 
@@ -534,15 +612,19 @@ where C::Target: chain::Filter,
 				// still be changed. So, persist the updated monitor despite the error.
 				let update_id = MonitorUpdateId::from_monitor_update(&update);
 				let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
-				let persist_res = self.persister.update_persisted_channel(funding_txo, &update, monitor, update_id);
+				let persist_res = self.persister.update_persisted_channel(funding_txo, &Some(update), monitor, update_id);
 				if let Err(e) = persist_res {
 					if e == ChannelMonitorUpdateErr::TemporaryFailure {
 						pending_monitor_updates.push(update_id);
+					} else {
+						monitor_state.channel_perm_failed.store(true, Ordering::Release);
 					}
 					log_error!(self.logger, "Failed to persist channel monitor update: {:?}", e);
 				}
 				if update_res.is_err() {
 					Err(ChannelMonitorUpdateErr::PermanentFailure)
+				} else if monitor_state.channel_perm_failed.load(Ordering::Acquire) {
+					Err(ChannelMonitorUpdateErr::PermanentFailure)
 				} else {
 					persist_res
 				}
@@ -553,7 +635,23 @@ where C::Target: chain::Filter,
 	fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
 		let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
 		for monitor_state in self.monitors.read().unwrap().values() {
-			pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
+			let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
+			if is_pending_monitor_update {
+				log_info!(self.logger, "A Channel Monitor sync is still in progress, refusing to provide monitor events!");
+			} else {
+				if monitor_state.channel_perm_failed.load(Ordering::Acquire) {
+					// If a `UpdateOrigin::ChainSync` persistence failed with `PermanentFailure`,
+					// we don't really know if the latest `ChannelMonitor` state is on disk or not.
+					// We're supposed to hold monitor updates until the latest state is on disk to
+					// avoid duplicate events, but the user told us persistence is screw-y and may
+					// not complete. We can't hold events forever because we may learn some payment
+					// preimage, so instead we just log and hope the user complied with the
+					// `PermanentFailure` requirements of having at least the local-disk copy
+					// updated.
+					log_info!(self.logger, "A Channel Monitor sync returned PermanentFailure. Returning monitor events but duplicate events may appear after reload!");
+				}
+				pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
+			}
 		}
 		pending_monitor_events
 	}