diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 1d87eccfe66..5ff1fcb0171 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -3271,8 +3271,8 @@ macro_rules! locked_close_channel { }}; ($self: ident, $peer_state: expr, $funded_chan: expr, $shutdown_res_mut: expr, FUNDED) => {{ if let Some((_, funding_txo, _, update)) = $shutdown_res_mut.monitor_update.take() { - handle_new_monitor_update!($self, funding_txo, update, $peer_state, - $funded_chan.context, REMAIN_LOCKED_UPDATE_ACTIONS_PROCESSED_LATER); + handle_new_monitor_update_actions_deferred!($self, funding_txo, update, $peer_state, + $funded_chan.context); } // If there's a possibility that we need to generate further monitor updates for this // channel, we need to store the last update_id of it. However, we don't want to insert @@ -3628,57 +3628,152 @@ macro_rules! handle_monitor_update_completion { } } } -macro_rules! handle_new_monitor_update { - ($self: ident, $update_res: expr, $logger: expr, $channel_id: expr, _internal, $completed: expr) => { { - debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire)); - match $update_res { - ChannelMonitorUpdateStatus::UnrecoverableError => { - let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; - log_error!($logger, "{}", err_str); - panic!("{}", err_str); - }, - ChannelMonitorUpdateStatus::InProgress => { - #[cfg(not(any(test, feature = "_externalize_tests")))] - if $self.monitor_update_type.swap(1, Ordering::Relaxed) == 2 { - panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart"); - } - log_debug!($logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.", - $channel_id); - false - }, - ChannelMonitorUpdateStatus::Completed => { - #[cfg(not(any(test, feature = "_externalize_tests")))] - if $self.monitor_update_type.swap(2, Ordering::Relaxed) == 1 { - panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart"); - } - $completed; - true - }, - } - } }; - ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, INITIAL_MONITOR) => { +/// Returns whether the monitor update is completed, `false` if the update is in-progress. +fn handle_monitor_update_res( + cm: &CM, update_res: ChannelMonitorUpdateStatus, channel_id: ChannelId, logger: LG, +) -> bool { + debug_assert!(cm.get_cm().background_events_processed_since_startup.load(Ordering::Acquire)); + match update_res { + ChannelMonitorUpdateStatus::UnrecoverableError => { + let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down."; + log_error!(logger, "{}", err_str); + panic!("{}", err_str); + }, + ChannelMonitorUpdateStatus::InProgress => { + #[cfg(not(any(test, feature = "_externalize_tests")))] + if cm.get_cm().monitor_update_type.swap(1, Ordering::Relaxed) == 2 { + panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart"); + } + log_debug!(logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.", + channel_id); + false + }, + ChannelMonitorUpdateStatus::Completed => { + #[cfg(not(any(test, feature = "_externalize_tests")))] + if cm.get_cm().monitor_update_type.swap(2, Ordering::Relaxed) == 1 { + panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart"); + } + true + }, + } +} + +macro_rules! handle_initial_monitor { + ($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { let logger = WithChannelContext::from(&$self.logger, &$chan.context, None); - handle_new_monitor_update!($self, $update_res, logger, $chan.context.channel_id(), _internal, - handle_monitor_update_completion!($self, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan)) + let update_completed = + handle_monitor_update_res($self, $update_res, $chan.context.channel_id(), logger); + if update_completed { + handle_monitor_update_completion!( + $self, + $peer_state_lock, + $peer_state, + $per_peer_state_lock, + $chan + ); + } }; +} + +macro_rules! handle_post_close_monitor_update { + ( + $self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, + $per_peer_state_lock: expr, $counterparty_node_id: expr, $channel_id: expr + ) => {{ + let logger = + WithContext::from(&$self.logger, Some($counterparty_node_id), Some($channel_id), None); + let in_flight_updates; + let idx; + handle_new_monitor_update_internal!( + $self, + $funding_txo, + $update, + $peer_state, + logger, + $channel_id, + $counterparty_node_id, + in_flight_updates, + idx, + { + let _ = in_flight_updates.remove(idx); + if in_flight_updates.is_empty() { + let update_actions = $peer_state + .monitor_update_blocked_actions + .remove(&$channel_id) + .unwrap_or(Vec::new()); + + mem::drop($peer_state_lock); + mem::drop($per_peer_state_lock); + + $self.handle_monitor_update_completion_actions(update_actions); + } + } + ) + }}; +} + +/// Handles a new monitor update without dropping peer_state locks and calling +/// [`ChannelManager::handle_monitor_update_completion_actions`] if the monitor update completed +/// synchronously. +/// +/// Useful because monitor updates need to be handled in the same mutex where the channel generated +/// them (otherwise they can end up getting applied out-of-order) but it's not always possible to +/// drop the aforementioned peer state locks at a given callsite. In this situation, use this macro +/// to apply the monitor update immediately and handle the monitor update completion actions at a +/// later time. +macro_rules! handle_new_monitor_update_actions_deferred { + ( + $self: ident, $funding_txo: expr, $update: expr, $peer_state: expr, $chan_context: expr + ) => {{ + let logger = WithChannelContext::from(&$self.logger, &$chan_context, None); + let chan_id = $chan_context.channel_id(); + let counterparty_node_id = $chan_context.get_counterparty_node_id(); + let in_flight_updates; + let idx; + handle_new_monitor_update_internal!( + $self, + $funding_txo, + $update, + $peer_state, + logger, + chan_id, + counterparty_node_id, + in_flight_updates, + idx, + { + let _ = in_flight_updates.remove(idx); + } + ) + }}; +} + +macro_rules! handle_new_monitor_update_internal { ( $self: ident, $funding_txo: expr, $update: expr, $peer_state: expr, $logger: expr, $chan_id: expr, $counterparty_node_id: expr, $in_flight_updates: ident, $update_idx: ident, - _internal_outer, $completed: expr - ) => { { - $in_flight_updates = &mut $peer_state.in_flight_monitor_updates.entry($chan_id) - .or_insert_with(|| ($funding_txo, Vec::new())).1; + $completed: expr + ) => {{ + $in_flight_updates = &mut $peer_state + .in_flight_monitor_updates + .entry($chan_id) + .or_insert_with(|| ($funding_txo, Vec::new())) + .1; // During startup, we push monitor updates as background events through to here in // order to replay updates that were in-flight when we shut down. Thus, we have to // filter for uniqueness here. - $update_idx = $in_flight_updates.iter().position(|upd| upd == &$update) - .unwrap_or_else(|| { + $update_idx = + $in_flight_updates.iter().position(|upd| upd == &$update).unwrap_or_else(|| { $in_flight_updates.push($update); $in_flight_updates.len() - 1 }); if $self.background_events_processed_since_startup.load(Ordering::Acquire) { - let update_res = $self.chain_monitor.update_channel($chan_id, &$in_flight_updates[$update_idx]); - handle_new_monitor_update!($self, update_res, $logger, $chan_id, _internal, $completed) + let update_res = + $self.chain_monitor.update_channel($chan_id, &$in_flight_updates[$update_idx]); + let update_completed = handle_monitor_update_res($self, update_res, $chan_id, $logger); + if update_completed { + $completed; + } + update_completed } else { // We blindly assume that the ChannelMonitorUpdate will be regenerated on startup if we // fail to persist it. This is a fairly safe assumption, however, since anything we do @@ -3700,62 +3795,43 @@ macro_rules! handle_new_monitor_update { $self.pending_background_events.lock().unwrap().push(event); false } - } }; - ( - $self: ident, $funding_txo: expr, $update: expr, $peer_state: expr, $chan_context: expr, - REMAIN_LOCKED_UPDATE_ACTIONS_PROCESSED_LATER - ) => { { - let logger = WithChannelContext::from(&$self.logger, &$chan_context, None); - let chan_id = $chan_context.channel_id(); - let counterparty_node_id = $chan_context.get_counterparty_node_id(); - let in_flight_updates; - let idx; - handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, logger, chan_id, - counterparty_node_id, in_flight_updates, idx, _internal_outer, - { - let _ = in_flight_updates.remove(idx); - }) - } }; - ( - $self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, - $per_peer_state_lock: expr, $counterparty_node_id: expr, $channel_id: expr, POST_CHANNEL_CLOSE - ) => { { - let logger = WithContext::from(&$self.logger, Some($counterparty_node_id), Some($channel_id), None); - let in_flight_updates; - let idx; - handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, logger, - $channel_id, $counterparty_node_id, in_flight_updates, idx, _internal_outer, - { - let _ = in_flight_updates.remove(idx); - if in_flight_updates.is_empty() { - let update_actions = $peer_state.monitor_update_blocked_actions - .remove(&$channel_id).unwrap_or(Vec::new()); - - mem::drop($peer_state_lock); - mem::drop($per_peer_state_lock); + }}; +} - $self.handle_monitor_update_completion_actions(update_actions); - } - }) - } }; +macro_rules! handle_new_monitor_update { ( $self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr - ) => { { + ) => {{ let logger = WithChannelContext::from(&$self.logger, &$chan.context, None); let chan_id = $chan.context.channel_id(); let counterparty_node_id = $chan.context.get_counterparty_node_id(); let in_flight_updates; let idx; - handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, logger, chan_id, - counterparty_node_id, in_flight_updates, idx, _internal_outer, + handle_new_monitor_update_internal!( + $self, + $funding_txo, + $update, + $peer_state, + logger, + chan_id, + counterparty_node_id, + in_flight_updates, + idx, { let _ = in_flight_updates.remove(idx); if in_flight_updates.is_empty() && $chan.blocked_monitor_updates_pending() == 0 { - handle_monitor_update_completion!($self, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan); + handle_monitor_update_completion!( + $self, + $peer_state_lock, + $peer_state, + $per_peer_state_lock, + $chan + ); } - }) - } }; + } + ) + }}; } #[rustfmt::skip] @@ -4432,9 +4508,9 @@ where hash_map::Entry::Vacant(_) => {}, } - handle_new_monitor_update!( + handle_post_close_monitor_update!( self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state, - counterparty_node_id, channel_id, POST_CHANNEL_CLOSE + counterparty_node_id, channel_id ); } @@ -8893,7 +8969,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ .push(action); } - handle_new_monitor_update!( + handle_post_close_monitor_update!( self, prev_hop.funding_txo, preimage_update, @@ -8901,8 +8977,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ peer_state, per_peer_state, prev_hop.counterparty_node_id, - chan_id, - POST_CHANNEL_CLOSE + chan_id ); } @@ -10056,8 +10131,8 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ } if let Some(funded_chan) = e.insert(Channel::from(chan)).as_funded_mut() { - handle_new_monitor_update!(self, persist_state, peer_state_lock, peer_state, - per_peer_state, funded_chan, INITIAL_MONITOR); + handle_initial_monitor!(self, persist_state, peer_state_lock, peer_state, + per_peer_state, funded_chan); } else { unreachable!("This must be a funded channel as we just inserted it."); } @@ -10220,7 +10295,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ }) { Ok((funded_chan, persist_status)) => { - handle_new_monitor_update!(self, persist_status, peer_state_lock, peer_state, per_peer_state, funded_chan, INITIAL_MONITOR); + handle_initial_monitor!(self, persist_status, peer_state_lock, peer_state, per_peer_state, funded_chan); Ok(()) }, Err(e) => try_channel_entry!(self, peer_state, Err(e), chan_entry), @@ -10845,8 +10920,8 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ if let Some(monitor) = monitor_opt { let monitor_res = self.chain_monitor.watch_channel(monitor.channel_id(), monitor); if let Ok(persist_state) = monitor_res { - handle_new_monitor_update!(self, persist_state, peer_state_lock, peer_state, - per_peer_state, chan, INITIAL_MONITOR); + handle_initial_monitor!(self, persist_state, peer_state_lock, peer_state, + per_peer_state, chan); } else { let logger = WithChannelContext::from(&self.logger, &chan.context, None); log_error!(logger, "Persisting initial ChannelMonitor failed, implying the channel ID was duplicated"); @@ -13283,7 +13358,7 @@ where }; self.pending_background_events.lock().unwrap().push(event); } else { - handle_new_monitor_update!( + handle_post_close_monitor_update!( self, channel_funding_outpoint, update, @@ -13291,8 +13366,7 @@ where peer_state, per_peer_state, counterparty_node_id, - channel_id, - POST_CHANNEL_CLOSE + channel_id ); } }, @@ -13976,13 +14050,12 @@ where insert_short_channel_id!(short_to_chan_info, funded_channel); if let Some(monitor_update) = monitor_update_opt { - handle_new_monitor_update!( + handle_new_monitor_update_actions_deferred!( self, funding_txo, monitor_update, peer_state, - funded_channel.context, - REMAIN_LOCKED_UPDATE_ACTIONS_PROCESSED_LATER + funded_channel.context ); to_process_monitor_update_actions.push(( counterparty_node_id, channel_id