Process monitor update events in block_[dis]connected asynchronously #808
Changes from all commits: 8902676, 23c3278, 280de80, ba6eee2, d481008, 5351f9f, 93a7572
```diff
@@ -206,7 +206,7 @@ pub struct PaymentPreimage(pub [u8;32]);
 #[derive(Hash, Copy, Clone, PartialEq, Eq, Debug)]
 pub struct PaymentSecret(pub [u8;32]);

-type ShutdownResult = (Option<OutPoint>, ChannelMonitorUpdate, Vec<(HTLCSource, PaymentHash)>);
+type ShutdownResult = (Option<(OutPoint, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>);

 /// Error type returned across the channel_state mutex boundary. When an Err is generated for a
 /// Channel, we generally end up with a ChannelError::Close for which we have to close the channel
```
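The reshaping matters because the old tuple allowed a funding outpoint to be returned without its matching monitor update (or vice versa); the new `Option<(OutPoint, ChannelMonitorUpdate)>` makes them inseparable. A minimal sketch of how a consumer destructures the new shape, using stub types in place of the real rust-lightning structs (the helper name is illustrative, not from the PR):

```rust
// Stub types standing in for the real rust-lightning structs; illustration only.
struct OutPoint;
struct ChannelMonitorUpdate;
struct HTLCSource;
struct PaymentHash(pub [u8; 32]);

type ShutdownResult = (Option<(OutPoint, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>);

// Hypothetical consumer mirroring finish_force_close_channel in the diff below.
fn consume_shutdown(shutdown_res: ShutdownResult) {
    let (monitor_update_option, mut failed_htlcs) = shutdown_res;
    for (_source, _hash) in failed_htlcs.drain(..) {
        // fail each HTLC backwards (elided)
    }
    // The outpoint and its closing update now arrive together or not at all.
    if let Some((_funding_txo, _monitor_update)) = monitor_update_option {
        // hand the update to the chain monitor (elided)
    }
}

fn main() {
    consume_shutdown((None, Vec::new()));
}
```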
```diff
@@ -333,6 +333,15 @@ pub(super) struct ChannelHolder<Signer: Sign> {
     pub(super) pending_msg_events: Vec<MessageSendEvent>,
 }

+/// Events which we process internally but cannot be processed immediately at the generation site
+/// for some reason. They are handled in timer_chan_freshness_every_min, so may be processed with
+/// quite some time lag.
+enum BackgroundEvent {
+    /// Handle a ChannelMonitorUpdate that closes a channel, broadcasting its current latest holder
+    /// commitment transaction.
+    ClosingMonitorUpdate((OutPoint, ChannelMonitorUpdate)),
+}
+
 /// State we hold per-peer. In the future we should put channels in here, but for now we only hold
 /// the latest Init features we heard from the peer.
 struct PeerState {
```
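The enum is paired with a `Mutex<Vec<BackgroundEvent>>` on the ChannelManager (added below) and is drained later with `mem::swap`, so the queue lock is released before any event is handled. A standalone sketch of that queue-then-drain pattern, with the monitor-update payload stubbed out:

```rust
use std::mem;
use std::sync::Mutex;

// Stub payload; the real variant carries (OutPoint, ChannelMonitorUpdate).
enum BackgroundEvent {
    ClosingMonitorUpdate(u64),
}

struct Manager {
    pending_background_events: Mutex<Vec<BackgroundEvent>>,
}

impl Manager {
    // Generation site: queue the event instead of acting on it immediately.
    fn queue(&self, ev: BackgroundEvent) {
        self.pending_background_events.lock().unwrap().push(ev);
    }

    // Later (e.g. from a timer): take the whole queue in one swap, then process
    // the events without holding the queue lock.
    fn process_background_events(&self) {
        let mut background_events = Vec::new();
        mem::swap(&mut *self.pending_background_events.lock().unwrap(), &mut background_events);
        for event in background_events.drain(..) {
            match event {
                BackgroundEvent::ClosingMonitorUpdate(id) => {
                    println!("applying deferred closing update {}", id);
                }
            }
        }
    }
}

fn main() {
    let m = Manager { pending_background_events: Mutex::new(Vec::new()) };
    m.queue(BackgroundEvent::ClosingMonitorUpdate(1));
    m.process_background_events();
}
```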
```diff
@@ -436,6 +445,7 @@ pub struct ChannelManager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref,
     per_peer_state: RwLock<HashMap<PublicKey, Mutex<PeerState>>>,

     pending_events: Mutex<Vec<events::Event>>,
+    pending_background_events: Mutex<Vec<BackgroundEvent>>,
     /// Used when we have to take a BIG lock to make sure everything is self-consistent.
     /// Essentially just when we're serializing ourselves out.
     /// Taken first everywhere where we are making changes before any other locks.
```
```diff
@@ -794,6 +804,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
         per_peer_state: RwLock::new(HashMap::new()),

         pending_events: Mutex::new(Vec::new()),
+        pending_background_events: Mutex::new(Vec::new()),
         total_consistency_lock: RwLock::new(()),
         persistence_notifier: PersistenceNotifier::new(),

```
```diff
@@ -942,12 +953,12 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana

     #[inline]
     fn finish_force_close_channel(&self, shutdown_res: ShutdownResult) {
-        let (funding_txo_option, monitor_update, mut failed_htlcs) = shutdown_res;
+        let (monitor_update_option, mut failed_htlcs) = shutdown_res;
         log_trace!(self.logger, "Finishing force-closure of channel {} HTLCs to fail", failed_htlcs.len());
         for htlc_source in failed_htlcs.drain(..) {
             self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
         }
-        if let Some(funding_txo) = funding_txo_option {
+        if let Some((funding_txo, monitor_update)) = monitor_update_option {
             // There isn't anything we can do if we get an update failure - we're already
             // force-closing. The monitor update on the required in-memory copy should broadcast
             // the latest local state, which is the best we can do anyway. Thus, it is safe to
```
```diff
@@ -1854,13 +1865,42 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
         events.append(&mut new_events);
     }

+    /// Free the background events, generally called from timer_chan_freshness_every_min.
+    ///
+    /// Exposed for testing to allow us to process events quickly without generating accidental
+    /// BroadcastChannelUpdate events in timer_chan_freshness_every_min.
+    ///
+    /// Expects the caller to have a total_consistency_lock read lock.
+    fn process_background_events(&self) {
+        let mut background_events = Vec::new();
+        mem::swap(&mut *self.pending_background_events.lock().unwrap(), &mut background_events);
+        for event in background_events.drain(..) {
+            match event {
+                BackgroundEvent::ClosingMonitorUpdate((funding_txo, update)) => {
+                    // The channel has already been closed, so no use bothering to care about the
+                    // monitor update completing.
+                    let _ = self.chain_monitor.update_channel(funding_txo, update);
+                },
+            }
+        }
+    }
+
+    #[cfg(any(test, feature = "_test_utils"))]
+    pub(crate) fn test_process_background_events(&self) {
+        self.process_background_events();
+    }
+
     /// If a peer is disconnected we mark any channels with that peer as 'disabled'.
     /// After some time, if channels are still disabled we need to broadcast a ChannelUpdate
     /// to inform the network about the uselessness of these channels.
     ///
     /// This method handles all the details, and must be called roughly once per minute.
+    ///
+    /// Note that in some rare cases this may generate a `chain::Watch::update_channel` call.
     pub fn timer_chan_freshness_every_min(&self) {
         let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+        self.process_background_events();
+
         let mut channel_state_lock = self.channel_state.lock().unwrap();
         let channel_state = &mut *channel_state_lock;
         for (_, chan) in channel_state.by_id.iter_mut() {
```

Review thread on `process_background_events` (lock requirement):

- "You can assert the lock consistency requirement"
- "Hmm, it only needs a read lock, not a write lock, and there's no way to assert that the current thread holds one - we could assert that no thread holds a write lock, but that's not quite sufficient."

Review thread on `let _ = self.chain_monitor.update_channel(funding_txo, update);`:

- "Hmm. Seems a bit weird to just let the update die silently. Should we log? Also, should we put it back in the event queue and give it 3 chances to succeed, or so? Seems like a loose end to trust the counterparty to get the commit tx confirmed."
- "Hmm, we don't really handle it anywhere else - in the case of a TemporaryFailure the API requires users to have stored it somewhere (as we never provide duplicate/old monitor updates). In the case of a permanent failure, indeed, we're a little hosed, but that isn't an issue specific to this case - in any permanent failure case, if the final force-closure monitor update fails to be delivered, the user will need to manually intervene and call the relevant method to get the latest commitment transaction."
- "Can't the update succeed but persistence fail? Would this be a problem to ignore?"
- "What do you mean \"the update succeed but persistence fail\"? Monitor update success includes persistence, but I'm not sure what exactly you mean."
- "Oh, what I meant was …"
- "Right, I don't think this is a unique case there - anything applies already; if anything, this callsite is much less error-prone because the only thing it does is broadcast our latest transaction."
- "I think we still have a pathological case when the funding_tx is disconnected and we try to force-close the channel with a holder commitment. It won't propagate or confirm. If the latest user balance is substantial, even manual intervention won't solve the issue. Ideally, as soon as we see a counterparty funding transaction we should cache it. If the funding is reorged out later, we should attach the funding_tx with our holder commitment, and why not a high-feerate CPFP. This could be implemented by either …"
- "Right, that's definitely a lot for this PR (also because it's not a regression - we have the same issue today). That said, I don't really think it's worth it - in theory the funding transaction that was just un-confirmed should be in the mempool, as nodes re-add disconnected transactions to their mempool. If we want to add a high-feerate CPFP on top of the commitment, that stands alone as a change to ChannelMonitor in handling the …"
- "I feel the same. Users playing with substantial amounts should scale their funding transaction confirmation lock-down high enough (…"

Review thread on `timer_chan_freshness_every_min`:

- "If you don't update …"
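Because the deferred closing updates are only applied from `timer_chan_freshness_every_min` (or the test-only wrapper), a node that never calls the timer method will not apply the queued force-close broadcasts until it does. A hedged sketch of the kind of driver loop a user is expected to run; the 60-second cadence reflects the "roughly once per minute" wording in the docs, not a value mandated by this PR:

```rust
use std::time::Duration;

// `tick` stands in for something like || channel_manager.timer_chan_freshness_every_min().
fn drive_freshness_timer<F: Fn()>(tick: F) -> ! {
    loop {
        std::thread::sleep(Duration::from_secs(60));
        // Processes queued BackgroundEvents and broadcasts ChannelUpdates for
        // channels that have stayed disabled.
        tick();
    }
}
```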
```diff
@@ -1953,6 +1993,10 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
         //identify whether we sent it or not based on the (I presume) very different runtime
         //between the branches here. We should make this async and move it into the forward HTLCs
         //timer handling.
+
+        // Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called
+        // from block_connected which may run during initialization prior to the chain_monitor
+        // being fully configured. See the docs for `ChannelManagerReadArgs` for more.
         match source {
             HTLCSource::OutboundRoute { ref path, .. } => {
                 log_trace!(self.logger, "Failing outbound payment HTLC with payment_hash {}", log_bytes!(payment_hash.0));
```
```diff
@@ -2418,7 +2462,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                 // We do not do a force-close here as that would generate a monitor update for
                 // a monitor that we didn't manage to store (and that we don't care about - we
                 // don't respond with the funding_signed so the channel can never go on chain).
-                let (_funding_txo_option, _monitor_update, failed_htlcs) = chan.force_shutdown(true);
+                let (_monitor_update, failed_htlcs) = chan.force_shutdown(true);
                 assert!(failed_htlcs.is_empty());
                 return Err(MsgHandleErrInternal::send_err_msg_no_close("ChannelMonitor storage failure".to_owned(), funding_msg.channel_id));
             },
```
```diff
@@ -3100,6 +3144,29 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
             self.finish_force_close_channel(failure);
         }
     }
+
+    /// Handle a list of channel failures during a block_connected or block_disconnected call,
+    /// pushing the channel monitor update (if any) to the background events queue and removing the
+    /// Channel object.
+    fn handle_init_event_channel_failures(&self, mut failed_channels: Vec<ShutdownResult>) {
+        for mut failure in failed_channels.drain(..) {
+            // Either a commitment transaction has been confirmed on-chain or
+            // Channel::block_disconnected detected that the funding transaction has been
+            // reorganized out of the main chain.
+            // We cannot broadcast our latest local state via monitor update (as
+            // Channel::force_shutdown tries to make us do) as we may still be in initialization,
+            // so we track the update internally and handle it when the user next calls
+            // timer_chan_freshness_every_min, guaranteeing we're running normally.
+            if let Some((funding_txo, update)) = failure.0.take() {
+                assert_eq!(update.updates.len(), 1);
+                if let ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast } = update.updates[0] {
+                    assert!(should_broadcast);
+                } else { unreachable!(); }
+                self.pending_background_events.lock().unwrap().push(BackgroundEvent::ClosingMonitorUpdate((funding_txo, update)));
+            }
+            self.finish_force_close_channel(failure);
+        }
+    }
 }

 impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSendEventsProvider for ChannelManager<Signer, M, T, K, F, L>
```
```diff
@@ -3167,6 +3234,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
 {
     /// Updates channel state based on transactions seen in a connected block.
     pub fn block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
+        // Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called
+        // during initialization prior to the chain_monitor being fully configured in some cases.
+        // See the docs for `ChannelManagerReadArgs` for more.
         let header_hash = header.block_hash();
         log_trace!(self.logger, "Block {} at height {} connected", header_hash, height);
         let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
```

Review thread on lines +3237 to +3239 (the new comment):

- "Hm, is there a way to put some asserts on …"
- "Yea, I thought about that, we would have to wrap the entire …"
- "This does seem to indicate a need to refactor the code such that …"
- "If we wanted to go there, we could have deserialization build some intermediate object which you can only connect/disconnect blocks on, then you can tell it you're done and get the full …"
- "No need to address now, but as part of #800 we may want to catalog all that …"
- "Added a comment describing this on 800."
- "Hm, could you retrieve the …"
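One reviewer suggestion above is to enforce the "no chain_monitor use during init" rule with asserts rather than a comment. Nothing like this landed in the PR; the following is purely a hypothetical sketch of what such a wrapper could look like, using a readiness flag checked by a debug assertion:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical: not part of this PR. Wraps a chain-monitor-like value and
// debug-asserts that it is not touched before initialization completes.
struct InitGuarded<M> {
    inner: M,
    ready: AtomicBool,
}

impl<M> InitGuarded<M> {
    fn new(inner: M) -> Self {
        InitGuarded { inner, ready: AtomicBool::new(false) }
    }
    // Called once deserialization and block replay are finished.
    fn mark_ready(&self) {
        self.ready.store(true, Ordering::Release);
    }
    // Any access before mark_ready() trips the assertion in debug builds.
    fn get(&self) -> &M {
        debug_assert!(self.ready.load(Ordering::Acquire),
            "chain_monitor used during ChannelManager initialization");
        &self.inner
    }
}

fn main() {
    let monitor = InitGuarded::new(());
    monitor.mark_ready();
    let _ = monitor.get();
}
```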
```diff
@@ -3218,9 +3288,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                 if let Some(short_id) = channel.get_short_channel_id() {
                     short_to_id.remove(&short_id);
                 }
-                // It looks like our counterparty went on-chain. We go ahead and
-                // broadcast our latest local state as well here, just in case its
-                // some kind of SPV attack, though we expect these to be dropped.
+                // It looks like our counterparty went on-chain. Close the channel.
                 failed_channels.push(channel.force_shutdown(true));
                 if let Ok(update) = self.get_channel_update(&channel) {
                     pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
```
```diff
@@ -3254,9 +3322,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                 !htlcs.is_empty() // Only retain this entry if htlcs has at least one entry.
             });
         }
-        for failure in failed_channels.drain(..) {
-            self.finish_force_close_channel(failure);
-        }
+
+        self.handle_init_event_channel_failures(failed_channels);

         for (source, payment_hash, reason) in timed_out_htlcs.drain(..) {
             self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), source, &payment_hash, reason);
```
```diff
@@ -3282,6 +3349,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
     /// If necessary, the channel may be force-closed without letting the counterparty participate
     /// in the shutdown.
     pub fn block_disconnected(&self, header: &BlockHeader) {
+        // Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called
+        // during initialization prior to the chain_monitor being fully configured in some cases.
+        // See the docs for `ChannelManagerReadArgs` for more.
         let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
         let mut failed_channels = Vec::new();
         {
```
```diff
@@ -3306,9 +3376,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
                 }
             });
         }
-        for failure in failed_channels.drain(..) {
-            self.finish_force_close_channel(failure);
-        }
+        self.handle_init_event_channel_failures(failed_channels);
         self.latest_block_height.fetch_sub(1, Ordering::AcqRel);
         *self.last_block_hash.try_lock().expect("block_(dis)connected must not be called in parallel") = header.block_hash();
     }
```
```diff
@@ -3914,6 +3982,18 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
             event.write(writer)?;
         }

+        let background_events = self.pending_background_events.lock().unwrap();
+        (background_events.len() as u64).write(writer)?;
+        for event in background_events.iter() {
+            match event {
+                BackgroundEvent::ClosingMonitorUpdate((funding_txo, monitor_update)) => {
+                    0u8.write(writer)?;
+                    funding_txo.write(writer)?;
+                    monitor_update.write(writer)?;
+                },
+            }
+        }
+
         (self.last_node_announcement_serial.load(Ordering::Acquire) as u32).write(writer)?;

         Ok(())
```
```diff
@@ -3929,11 +4009,22 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
 /// ChannelManager)>::read(reader, args).
 /// This may result in closing some Channels if the ChannelMonitor is newer than the stored
 /// ChannelManager state to ensure no loss of funds. Thus, transactions may be broadcasted.
-/// 3) Register all relevant ChannelMonitor outpoints with your chain watch mechanism using
-///    ChannelMonitor::get_outputs_to_watch() and ChannelMonitor::get_funding_txo().
+/// 3) If you are not fetching full blocks, register all relevant ChannelMonitor outpoints the same
+///    way you would handle a `chain::Filter` call using ChannelMonitor::get_outputs_to_watch() and
+///    ChannelMonitor::get_funding_txo().
 /// 4) Reconnect blocks on your ChannelMonitors.
-/// 5) Move the ChannelMonitors into your local chain::Watch.
-/// 6) Disconnect/connect blocks on the ChannelManager.
+/// 5) Disconnect/connect blocks on the ChannelManager.
+/// 6) Move the ChannelMonitors into your local chain::Watch.
+///
+/// Note that the ordering of #4-6 is not of importance, however all three must occur before you
+/// call any other methods on the newly-deserialized ChannelManager.
+///
+/// Note that because some channels may be closed during deserialization, it is critical that you
+/// always deserialize only the latest version of a ChannelManager and ChannelMonitors available to
+/// you. If you deserialize an old ChannelManager (during which force-closure transactions may be
+/// broadcast), and then later deserialize a newer version of the same ChannelManager (which will
+/// not force-close the same channels but consider them live), you may end up revoking a state for
+/// which you've already broadcasted the transaction.
 pub struct ChannelManagerReadArgs<'a, Signer: 'a + Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
     where M::Target: chain::Watch<Signer>,
           T::Target: BroadcasterInterface,
```

Review thread on lines -3935 to +4020 (the step ordering):

- "Is the note in (2) still accurate?"
- "Not sure what you mean, do you mean the part below? If so, yes, that behavior is unchanged - we call …"
- "Yeah, that part."

Review thread on the new "revoking a state" note:

- "I wonder if this is really possible. Let's say the 1st deserialized ChannelManager force-closes the channel due to some off-chain violation from our counterparty (e.g. an HTLC under minimum_msat). The force-close is dutifully sent to … Or do you envision a different scenario?"
- "The force-close doesn't generate a …"
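The swap of steps 5 and 6 works because, after this PR, ChannelManager::block_[dis]connected no longer calls into the chain monitor, so blocks can be replayed on the manager before the monitors are registered with chain::Watch. A comment-only skeleton of the documented restart sequence, with the concrete deserialization calls elided since their exact signatures are not part of this diff:

```rust
// Skeleton only: step numbers follow the doc comment above.
fn restart_sequence_sketch() {
    // 1) Read each ChannelMonitor: <(BlockHash, ChannelMonitor<Signer>)>::read(..)
    // 2) Read the ChannelManager: <(BlockHash, ChannelManager<..>)>::read(reader, args)
    //    - may force-close channels whose monitor is newer than the manager state
    // 3) If not fetching full blocks, register monitor outpoints as for a chain::Filter
    // 4) Reconnect blocks on the ChannelMonitors
    // 5) Disconnect/connect blocks on the ChannelManager
    // 6) Move the ChannelMonitors into the local chain::Watch
    //    (4-6 may be reordered, but all must happen before any other manager calls)
}

fn main() {
    restart_sequence_sketch();
}
```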
```diff
@@ -4064,7 +4155,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
                 channel.get_cur_counterparty_commitment_transaction_number() > monitor.get_cur_counterparty_commitment_number() ||
                     channel.get_latest_monitor_update_id() < monitor.get_latest_update_id() {
                 // But if the channel is behind of the monitor, close the channel:
-                let (_, _, mut new_failed_htlcs) = channel.force_shutdown(true);
+                let (_, mut new_failed_htlcs) = channel.force_shutdown(true);
                 failed_htlcs.append(&mut new_failed_htlcs);
                 monitor.broadcast_latest_holder_commitment_txn(&args.tx_broadcaster, &args.logger);
             } else {
```
```diff
@@ -4128,6 +4219,15 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
             }
         }

+        let background_event_count: u64 = Readable::read(reader)?;
+        let mut pending_background_events_read: Vec<BackgroundEvent> = Vec::with_capacity(cmp::min(background_event_count as usize, MAX_ALLOC_SIZE/mem::size_of::<BackgroundEvent>()));
+        for _ in 0..background_event_count {
+            match <u8 as Readable>::read(reader)? {
+                0 => pending_background_events_read.push(BackgroundEvent::ClosingMonitorUpdate((Readable::read(reader)?, Readable::read(reader)?))),
+                _ => return Err(DecodeError::InvalidValue),
+            }
+        }
+
         let last_node_announcement_serial: u32 = Readable::read(reader)?;

         let mut secp_ctx = Secp256k1::new();
```
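The `Vec::with_capacity(cmp::min(..))` in the read path is a defensive pattern: the element count comes from untrusted serialized bytes, so the pre-allocation is capped and the vector simply grows if validated elements really do arrive. A generic sketch of the same pattern; the `MAX_ALLOC_SIZE` value here is a placeholder, not necessarily the constant defined alongside ChannelManager:

```rust
use std::cmp;
use std::mem;

// Placeholder cap for the sketch; the real constant lives next to the reader code.
const MAX_ALLOC_SIZE: usize = 16 * 1024;

// Pre-allocate at most MAX_ALLOC_SIZE bytes worth of elements even if the
// serialized count claims far more; push() will grow the Vec as needed.
fn capped_capacity_vec<T>(claimed_count: u64) -> Vec<T> {
    let per_elem = cmp::max(mem::size_of::<T>(), 1);
    Vec::with_capacity(cmp::min(claimed_count as usize, MAX_ALLOC_SIZE / per_elem))
}

fn main() {
    // A bogus claimed count of u64::MAX still only reserves a bounded buffer.
    let v: Vec<u64> = capped_capacity_vec(u64::MAX);
    println!("reserved capacity for {} elements", v.capacity());
}
```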
```diff
@@ -4157,6 +4257,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
             per_peer_state: RwLock::new(per_peer_state),

             pending_events: Mutex::new(pending_events_read),
+            pending_background_events: Mutex::new(pending_background_events_read),
             total_consistency_lock: RwLock::new(()),
             persistence_notifier: PersistenceNotifier::new(),

```