Release monitor write lock in between update iterations #2528

Merged
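The core of the change: rather than holding the monitors map lock across the entire chain-sync loop, the funding outpoints are snapshotted up front, the read lock is re-acquired for each monitor, and a final pass picks up any monitors that were added while the lock was released. Below is a minimal sketch of that pattern, assuming a plain RwLock<HashMap> and an illustrative sync_one helper rather than the actual ChainMonitor types and persistence calls:

```rust
use std::collections::{HashMap, HashSet};
use std::sync::RwLock;

struct Monitor;

// Per-monitor work; in ChainMonitor this is the chain-data update and persister call.
fn sync_one(_key: u64, _monitor: &Monitor) {}

fn sync_all(monitors: &RwLock<HashMap<u64, Monitor>>) {
    // Snapshot the keys so the map lock is not held across the whole loop.
    let keys: HashSet<u64> = monitors.read().unwrap().keys().cloned().collect();

    // Re-acquire the read lock for each iteration; writers (e.g. a new channel
    // being watched) can make progress in between iterations.
    for key in keys.iter() {
        let map = monitors.read().unwrap();
        if let Some(monitor) = map.get(key) {
            sync_one(*key, monitor);
        }
    }

    // Final pass: handle any entries that were added while the lock was released.
    let map = monitors.read().unwrap();
    for (key, monitor) in map.iter() {
        if !keys.contains(key) {
            sync_one(*key, monitor);
        }
    }
}

fn main() {
    let monitors = RwLock::new(HashMap::from([(1u64, Monitor), (2u64, Monitor)]));
    sync_all(&monitors);
}
```

Releasing the lock between iterations lets writers make progress while a slow per-monitor persist is in flight; the final pass keeps the update from silently skipping entries added mid-loop.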
104 changes: 60 additions & 44 deletions lightning/src/chain/chainmonitor.rs
@@ -42,6 +42,7 @@ use crate::ln::channelmanager::ChannelDetails;

use crate::prelude::*;
use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
use core::iter::FromIterator;
use core::ops::Deref;
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use bitcoin::secp256k1::PublicKey;
@@ -285,7 +286,22 @@ where C::Target: chain::Filter,
where
FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
{
let funding_outpoints: HashSet<OutPoint> = HashSet::from_iter(self.monitors.read().unwrap().keys().cloned());
for funding_outpoint in funding_outpoints.iter() {
let monitor_lock = self.monitors.read().unwrap();
Contributor (review comment): nit: rename monitor_lock -> monitors

if let Some(monitor_state) = monitor_lock.get(funding_outpoint) {
Contributor (review comment): nit: rename monitor_state -> monitor

self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state);
}
}

// do some followup cleanup if any funding outpoints were added in between iterations
Contributor (review comment): "cleanup" doesn't sound right in this case. Suggest: "Re-iterate if any funding outpoints were added in between write locked iterations above."

let monitor_states = self.monitors.write().unwrap();
for (funding_outpoint, monitor_state) in monitor_states.iter() {
if !funding_outpoints.contains(funding_outpoint) {
self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state);
}
}

if let Some(height) = best_height {
// If the best block height is being updated, update highest_chain_height under the
// monitors write lock.
@@ -295,55 +311,55 @@
self.highest_chain_height.store(new_height, Ordering::Release);
}
}
}

for (funding_outpoint, monitor_state) in monitor_states.iter() {
let monitor = &monitor_state.monitor;
let mut txn_outputs;
{
txn_outputs = process(monitor, txdata);
let update_id = MonitorUpdateId {
contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
};
let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
if let Some(height) = best_height {
if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
// If there are no ChainSync persists awaiting completion, go ahead and
// set last_chain_persist_height here - we wouldn't want the first
// InProgress to always immediately be considered "overly delayed".
monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
}
fn update_monitor_with_chain_data<FN>(
	&self, header: &BlockHeader, best_height: Option<u32>, txdata: &TransactionData, process: FN,
	funding_outpoint: &OutPoint, monitor_state: &MonitorHolder<ChannelSigner>
) where FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs> {
let monitor = &monitor_state.monitor;
let mut txn_outputs;
{
txn_outputs = process(monitor, txdata);
let update_id = MonitorUpdateId {
contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
};
let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
if let Some(height) = best_height {
if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
// If there are no ChainSync persists awaiting completion, go ahead and
// set last_chain_persist_height here - we wouldn't want the first
// InProgress to always immediately be considered "overly delayed".
monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
}
}

log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) {
ChannelMonitorUpdateStatus::Completed =>
log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
ChannelMonitorUpdateStatus::PermanentFailure => {
monitor_state.channel_perm_failed.store(true, Ordering::Release);
self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
self.event_notifier.notify();
},
ChannelMonitorUpdateStatus::InProgress => {
log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
pending_monitor_updates.push(update_id);
},
log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) {
ChannelMonitorUpdateStatus::Completed =>
log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
ChannelMonitorUpdateStatus::PermanentFailure => {
monitor_state.channel_perm_failed.store(true, Ordering::Release);
self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
self.event_notifier.notify();
}
ChannelMonitorUpdateStatus::InProgress => {
log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
pending_monitor_updates.push(update_id);
}
}
}

// Register any new outputs with the chain source for filtering, storing any dependent
// transactions from within the block that previously had not been included in txdata.
if let Some(ref chain_source) = self.chain_source {
let block_hash = header.block_hash();
for (txid, mut outputs) in txn_outputs.drain(..) {
for (idx, output) in outputs.drain(..) {
// Register any new outputs with the chain source for filtering
let output = WatchedOutput {
block_hash: Some(block_hash),
outpoint: OutPoint { txid, index: idx as u16 },
script_pubkey: output.script_pubkey,
};
chain_source.register_output(output)
}
// Register any new outputs with the chain source for filtering, storing any dependent
// transactions from within the block that previously had not been included in txdata.
if let Some(ref chain_source) = self.chain_source {
let block_hash = header.block_hash();
for (txid, mut outputs) in txn_outputs.drain(..) {
for (idx, output) in outputs.drain(..) {
// Register any new outputs with the chain source for filtering
let output = WatchedOutput {
block_hash: Some(block_hash),
outpoint: OutPoint { txid, index: idx as u16 },
script_pubkey: output.script_pubkey,
};
chain_source.register_output(output)
}
}
}
@@ -976,7 +992,7 @@ mod tests {
assert!(err.contains("ChannelMonitor storage failure")));
check_added_monitors!(nodes[0], 2); // After the failure we generate a close-channel monitor update
check_closed_broadcast!(nodes[0], true);
check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() },
[nodes[1].node.get_our_node_id()], 100000);

// However, as the ChainMonitor is still waiting for the original persistence to complete,
13 changes: 9 additions & 4 deletions lightning/src/ln/monitor_tests.rs
@@ -2191,7 +2191,7 @@ fn test_anchors_aggregated_revoked_htlc_tx() {

// Alice should see that Bob is trying to claim the HTLCs, so she should now try to claim them at
// the second level instead.
let revoked_claims = {
let revoked_claim_transactions = {
let txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
assert_eq!(txn.len(), 2);

@@ -2205,10 +2205,14 @@
check_spends!(revoked_htlc_claim, htlc_tx);
}

txn
let mut revoked_claim_transaction_map = HashMap::new();
for current_tx in txn.into_iter() {
revoked_claim_transaction_map.insert(current_tx.txid(), current_tx);
}
revoked_claim_transaction_map
};
for node in &nodes {
mine_transactions(node, &revoked_claims.iter().collect::<Vec<_>>());
mine_transactions(node, &revoked_claim_transactions.values().collect::<Vec<_>>());
}


@@ -2234,7 +2238,8 @@
let spend_tx = nodes[0].keys_manager.backing.spend_spendable_outputs(
&[&outputs[0]], Vec::new(), Script::new_op_return(&[]), 253, None, &Secp256k1::new(),
).unwrap();
check_spends!(spend_tx, revoked_claims[idx]);

check_spends!(spend_tx, revoked_claim_transactions.get(&spend_tx.input[0].previous_output.txid).unwrap());
} else {
panic!("unexpected event");
}
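The monitor_tests change above stops indexing the broadcast revoked-claim transactions by position and instead keys them by txid, presumably because the per-outpoint iteration introduced in chainmonitor.rs no longer guarantees a fixed broadcast order; check_spends! then resolves the parent through spend_tx.input[0].previous_output.txid. A small standalone sketch of that lookup shape, using placeholder types in place of the real bitcoin Transaction and Txid:

```rust
use std::collections::HashMap;

// Placeholder stand-ins for bitcoin::Txid / bitcoin::Transaction.
type Txid = u64;

#[derive(Clone)]
struct Tx {
    txid: Txid,
    prev_txid: Txid, // txid referenced by this transaction's first input
}

fn main() {
    // Transactions as they happened to be broadcast, in no particular order.
    let broadcast = vec![
        Tx { txid: 101, prev_txid: 1 },
        Tx { txid: 102, prev_txid: 2 },
    ];

    // Key the claims by txid rather than indexing by broadcast position.
    let claims_by_txid: HashMap<Txid, Tx> =
        broadcast.into_iter().map(|tx| (tx.txid, tx)).collect();

    // A later spend is matched to its parent through the input's previous txid,
    // mirroring `revoked_claim_transactions.get(&spend_tx.input[0].previous_output.txid)`.
    let spend = Tx { txid: 200, prev_txid: 102 };
    let parent = claims_by_txid.get(&spend.prev_txid).expect("parent claim should be present");
    assert_eq!(parent.txid, 102);
}
```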