@@ -32,6 +32,7 @@ use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
3232use chain:: channelmonitor:: { ChannelMonitor , ChannelMonitorUpdate , Balance , MonitorEvent , MonitorUpdated , TransactionOutputs } ;
3333use chain:: transaction:: { OutPoint , TransactionData } ;
3434use chain:: keysinterface:: Sign ;
35+ use util:: atomic_counter:: AtomicCounter ;
3536use util:: logger:: Logger ;
3637use util:: events;
3738use util:: events:: EventHandler ;
@@ -40,10 +41,12 @@ use ln::channelmanager::ChannelDetails;
4041use prelude:: * ;
4142use sync:: { RwLock , RwLockReadGuard , Mutex } ;
4243use core:: ops:: Deref ;
44+ use core:: sync:: atomic:: { AtomicBool , Ordering } ;
4345
4446#[ derive( Clone , Copy , Hash , PartialEq ) ]
4547pub ( crate ) enum MonitorUpdate {
4648 MonitorUpdateId ( u64 ) ,
49+ SyncPersistId ( u64 ) ,
4750}
4851
4952/// An opaque identifier describing a specific [`Persist`] method call.
@@ -90,6 +93,9 @@ pub trait Persist<ChannelSigner: Sign> {
9093 /// updated monitor itself to disk/backups. See the `Persist` trait documentation for more
9194 /// details.
9295 ///
96+ /// During blockchain synchronization operations, this may be called with no
97+ /// [`ChannelMonitorUpdate`], in which case the full [`ChannelMonitor`] needs to be persisted.
98+ ///
9399 /// If an implementer chooses to persist the updates only, they need to make
94100 /// sure that all the updates are applied to the `ChannelMonitors` *before*
95101 /// the set of channel monitors is given to the `ChannelManager`
@@ -107,7 +113,7 @@ pub trait Persist<ChannelSigner: Sign> {
107113 /// [`ChannelMonitorUpdateErr`] for requirements when returning errors.
108114 ///
109115 /// [`Writeable::write`]: crate::util::ser::Writeable::write
110- fn update_persisted_channel ( & self , id : OutPoint , update : & ChannelMonitorUpdate , data : & ChannelMonitor < ChannelSigner > , update_id : MonitorUpdateId ) -> Result < ( ) , ChannelMonitorUpdateErr > ;
116+ fn update_persisted_channel ( & self , id : OutPoint , update : & Option < ChannelMonitorUpdate > , data : & ChannelMonitor < ChannelSigner > , update_id : MonitorUpdateId ) -> Result < ( ) , ChannelMonitorUpdateErr > ;
111117}
112118
113119struct MonitorHolder < ChannelSigner : Sign > {
@@ -118,7 +124,24 @@ struct MonitorHolder<ChannelSigner: Sign> {
118124 /// update_persisted_channel, the user returns a TemporaryFailure, and then calls
119125 /// channel_monitor_updated immediately, racing our insertion of the pending update into the
120126 /// contained Vec.
127+ ///
128+ /// Beyond the synchronization of updates themselves, we cannot handle user events until after
129+ /// any chain updates have been stored on disk. Thus, we scan this list when returning updates
130+ /// to the ChannelManager, refusing to return any updates for a ChannelMonitor which is still
131+ /// being persisted fully ro disk after a chain update.
132+ ///
133+ /// This avoids the possibility of handling, e.g. an on-chain claim, generating a claim monitor
134+ /// event, resulting in the relevant ChannelManager generating a PaymentSent event and dropping
135+ /// the pending payment entry, and then reloading before the monitor is persisted, resulting in
136+ /// the ChannelManager re-adding the same payment entry, before the same block is replayed,
137+ /// resulting in a duplicate PaymentSent event.
121138 pending_monitor_updates : Mutex < Vec < MonitorUpdateId > > ,
139+ /// When the user returns a PermanentFailure error from an update_persisted_channel call during
140+ /// block processing, we inform the ChannelManager that the channel should be closed
141+ /// asynchronously. In order to ensure no further changes happen before the ChannelManager has
142+ /// processed the closure event, we set this to true and return PermanentFailure for any other
143+ /// chain::Watch events.
144+ channel_closed : AtomicBool ,
122145}
123146
124147/// A read-only reference to a current ChannelMonitor.
@@ -154,6 +177,7 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
154177 P :: Target : Persist < ChannelSigner > ,
155178{
156179 monitors : RwLock < HashMap < OutPoint , MonitorHolder < ChannelSigner > > > ,
180+ sync_persistence_id : AtomicCounter ,
157181 chain_source : Option < C > ,
158182 broadcaster : T ,
159183 logger : L ,
@@ -183,26 +207,50 @@ where C::Target: chain::Filter,
183207 FN : Fn ( & ChannelMonitor < ChannelSigner > , & TransactionData ) -> Vec < TransactionOutputs >
184208 {
185209 let mut dependent_txdata = Vec :: new ( ) ;
186- let monitors = self . monitors . read ( ) . unwrap ( ) ;
187- for monitor_state in monitors. values ( ) {
188- let mut txn_outputs = process ( & monitor_state. monitor , txdata) ;
210+ {
211+ let monitors = self . monitors . write ( ) . unwrap ( ) ;
212+ for ( funding_outpoint, monitor_state) in monitors. iter ( ) {
213+ let monitor = & monitor_state. monitor ;
214+ let mut txn_outputs;
215+ {
216+ txn_outputs = process ( monitor, txdata) ;
217+ let update_id = MonitorUpdateId {
218+ contents : MonitorUpdate :: SyncPersistId ( self . sync_persistence_id . get_increment ( ) ) ,
219+ } ;
220+ let mut pending_monitor_updates = monitor_state. pending_monitor_updates . lock ( ) . unwrap ( ) ;
221+
222+ log_trace ! ( self . logger, "Syncing Channel Monitor for channel {}" , log_funding_info!( monitor) ) ;
223+ match self . persister . update_persisted_channel ( * funding_outpoint, & None , monitor, update_id) {
224+ Ok ( ( ) ) =>
225+ log_trace ! ( self . logger, "Finished syncing Channel Monitor for channel {}" , log_funding_info!( monitor) ) ,
226+ Err ( ChannelMonitorUpdateErr :: PermanentFailure ) => {
227+ monitor_state. channel_closed . store ( true , Ordering :: Release ) ;
228+ self . user_provided_events . lock ( ) . unwrap ( ) . push ( MonitorEvent :: UpdateFailed ( * funding_outpoint) ) ;
229+ } ,
230+ Err ( ChannelMonitorUpdateErr :: TemporaryFailure ) => {
231+ log_debug ! ( self . logger, "Channel Monitor sync for channel {} in progress, holding events until completion!" , log_funding_info!( monitor) ) ;
232+ pending_monitor_updates. push ( update_id) ;
233+ } ,
234+ }
235+ }
189236
190- // Register any new outputs with the chain source for filtering, storing any dependent
191- // transactions from within the block that previously had not been included in txdata.
192- if let Some ( ref chain_source) = self . chain_source {
193- let block_hash = header. block_hash ( ) ;
194- for ( txid, mut outputs) in txn_outputs. drain ( ..) {
195- for ( idx, output) in outputs. drain ( ..) {
196- // Register any new outputs with the chain source for filtering and recurse
197- // if it indicates that there are dependent transactions within the block
198- // that had not been previously included in txdata.
199- let output = WatchedOutput {
200- block_hash : Some ( block_hash) ,
201- outpoint : OutPoint { txid, index : idx as u16 } ,
202- script_pubkey : output. script_pubkey ,
203- } ;
204- if let Some ( tx) = chain_source. register_output ( output) {
205- dependent_txdata. push ( tx) ;
237+ // Register any new outputs with the chain source for filtering, storing any dependent
238+ // transactions from within the block that previously had not been included in txdata.
239+ if let Some ( ref chain_source) = self . chain_source {
240+ let block_hash = header. block_hash ( ) ;
241+ for ( txid, mut outputs) in txn_outputs. drain ( ..) {
242+ for ( idx, output) in outputs. drain ( ..) {
243+ // Register any new outputs with the chain source for filtering and recurse
244+ // if it indicates that there are dependent transactions within the block
245+ // that had not been previously included in txdata.
246+ let output = WatchedOutput {
247+ block_hash : Some ( block_hash) ,
248+ outpoint : OutPoint { txid, index : idx as u16 } ,
249+ script_pubkey : output. script_pubkey ,
250+ } ;
251+ if let Some ( tx) = chain_source. register_output ( output) {
252+ dependent_txdata. push ( tx) ;
253+ }
206254 }
207255 }
208256 }
@@ -228,6 +276,7 @@ where C::Target: chain::Filter,
228276 pub fn new ( chain_source : Option < C > , broadcaster : T , logger : L , feeest : F , persister : P ) -> Self {
229277 Self {
230278 monitors : RwLock :: new ( HashMap :: new ( ) ) ,
279+ sync_persistence_id : AtomicCounter :: new ( ) ,
231280 chain_source,
232281 broadcaster,
233282 logger,
@@ -300,7 +349,7 @@ where C::Target: chain::Filter,
300349 pending_monitor_updates. retain ( |update_id| * update_id != completed_update_id) ;
301350
302351 match completed_update_id {
303- MonitorUpdateId { .. } => {
352+ MonitorUpdateId { contents : MonitorUpdate :: MonitorUpdateId ( _ ) } => {
304353 let monitor_update_pending_updates = pending_monitor_updates. iter ( ) . filter ( |update_id|
305354 if let MonitorUpdate :: MonitorUpdateId ( _) = update_id. contents { true } else { false } ) . count ( ) ;
306355 if monitor_update_pending_updates != 0 {
@@ -312,7 +361,12 @@ where C::Target: chain::Filter,
312361 funding_txo,
313362 monitor_update_id : monitor_data. monitor . get_latest_update_id ( ) ,
314363 } ) ) ;
315- }
364+ } ,
365+ MonitorUpdateId { contents : MonitorUpdate :: SyncPersistId ( _) } => {
366+ // We've already done everything we need to, the next time release_monitor_events
367+ // is called, any events for this ChannelMonitor will be returned if there's no
368+ // more SyncPersistId events left.
369+ } ,
316370 }
317371 }
318372
@@ -458,7 +512,11 @@ where C::Target: chain::Filter,
458512 monitor. load_outputs_to_watch ( chain_source) ;
459513 }
460514 }
461- entry. insert ( MonitorHolder { monitor, pending_monitor_updates : Mutex :: new ( pending_monitor_updates) } ) ;
515+ entry. insert ( MonitorHolder {
516+ monitor,
517+ pending_monitor_updates : Mutex :: new ( pending_monitor_updates) ,
518+ channel_closed : AtomicBool :: new ( false ) ,
519+ } ) ;
462520 update_res
463521 }
464522
@@ -492,7 +550,7 @@ where C::Target: chain::Filter,
492550 contents : MonitorUpdate :: MonitorUpdateId ( update. update_id ) ,
493551 } ;
494552 let mut pending_monitor_updates = monitor_state. pending_monitor_updates . lock ( ) . unwrap ( ) ;
495- let persist_res = self . persister . update_persisted_channel ( funding_txo, & update, monitor, update_id) ;
553+ let persist_res = self . persister . update_persisted_channel ( funding_txo, & Some ( update) , monitor, update_id) ;
496554 if let Err ( e) = persist_res {
497555 if e == ChannelMonitorUpdateErr :: TemporaryFailure {
498556 pending_monitor_updates. push ( update_id) ;
@@ -501,6 +559,8 @@ where C::Target: chain::Filter,
501559 }
502560 if update_res. is_err ( ) {
503561 Err ( ChannelMonitorUpdateErr :: PermanentFailure )
562+ } else if monitor_state. channel_closed . load ( Ordering :: Acquire ) {
563+ Err ( ChannelMonitorUpdateErr :: PermanentFailure )
504564 } else {
505565 persist_res
506566 }
@@ -511,7 +571,17 @@ where C::Target: chain::Filter,
511571 fn release_pending_monitor_events ( & self ) -> Vec < MonitorEvent > {
512572 let mut pending_monitor_events = self . user_provided_events . lock ( ) . unwrap ( ) . split_off ( 0 ) ;
513573 for monitor_state in self . monitors . read ( ) . unwrap ( ) . values ( ) {
514- pending_monitor_events. append ( & mut monitor_state. monitor . get_and_clear_pending_monitor_events ( ) ) ;
574+ let pending_monitor_update_count = monitor_state. pending_monitor_updates . lock ( ) . unwrap ( )
575+ . iter ( ) . filter ( |update_id|
576+ if let MonitorUpdate :: SyncPersistId ( _) = update_id. contents { true } else { false } )
577+ . count ( ) ;
578+ if pending_monitor_update_count > 0 {
579+ log_info ! ( self . logger, "A Channel Monitor sync is still in progress, refusing to provide monitor events!" ) ;
580+ } else if monitor_state. channel_closed . load ( Ordering :: Acquire ) {
581+ log_info ! ( self . logger, "A Channel Monitor sync failed, refusing to provide monitor events!" ) ;
582+ } else {
583+ pending_monitor_events. append ( & mut monitor_state. monitor . get_and_clear_pending_monitor_events ( ) ) ;
584+ }
515585 }
516586 pending_monitor_events
517587 }
0 commit comments