Skip to content

Start tracking ChannelMonitors by channel ID in ChainMonitor and ChannelManager #3554

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 48 additions & 44 deletions fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ struct TestChainMonitor {
Arc<TestPersister>,
>,
>,
pub latest_monitors: Mutex<HashMap<OutPoint, LatestMonitorState>>,
pub latest_monitors: Mutex<HashMap<ChannelId, LatestMonitorState>>,
}
impl TestChainMonitor {
pub fn new(
Expand All @@ -213,12 +213,12 @@ impl TestChainMonitor {
}
impl chain::Watch<TestChannelSigner> for TestChainMonitor {
fn watch_channel(
&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor<TestChannelSigner>,
&self, channel_id: ChannelId, monitor: channelmonitor::ChannelMonitor<TestChannelSigner>,
) -> Result<chain::ChannelMonitorUpdateStatus, ()> {
let mut ser = VecWriter(Vec::new());
monitor.write(&mut ser).unwrap();
let monitor_id = monitor.get_latest_update_id();
let res = self.chain_monitor.watch_channel(funding_txo, monitor);
let res = self.chain_monitor.watch_channel(channel_id, monitor);
let state = match res {
Ok(chain::ChannelMonitorUpdateStatus::Completed) => LatestMonitorState {
persisted_monitor_id: monitor_id,
Expand All @@ -231,17 +231,17 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
Ok(chain::ChannelMonitorUpdateStatus::UnrecoverableError) => panic!(),
Err(()) => panic!(),
};
if self.latest_monitors.lock().unwrap().insert(funding_txo, state).is_some() {
if self.latest_monitors.lock().unwrap().insert(channel_id, state).is_some() {
panic!("Already had monitor pre-watch_channel");
}
res
}

fn update_channel(
&self, funding_txo: OutPoint, update: &channelmonitor::ChannelMonitorUpdate,
&self, channel_id: ChannelId, update: &channelmonitor::ChannelMonitorUpdate,
) -> chain::ChannelMonitorUpdateStatus {
let mut map_lock = self.latest_monitors.lock().unwrap();
let map_entry = map_lock.get_mut(&funding_txo).expect("Didn't have monitor on update call");
let map_entry = map_lock.get_mut(&channel_id).expect("Didn't have monitor on update call");
let latest_monitor_data = map_entry
.pending_monitors
.last()
Expand All @@ -265,7 +265,7 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
.unwrap();
let mut ser = VecWriter(Vec::new());
deserialized_monitor.write(&mut ser).unwrap();
let res = self.chain_monitor.update_channel(funding_txo, update);
let res = self.chain_monitor.update_channel(channel_id, update);
match res {
chain::ChannelMonitorUpdateStatus::Completed => {
map_entry.persisted_monitor_id = update.update_id;
Expand Down Expand Up @@ -711,9 +711,9 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {

let mut monitors = new_hash_map();
let mut old_monitors = $old_monitors.latest_monitors.lock().unwrap();
for (outpoint, mut prev_state) in old_monitors.drain() {
for (channel_id, mut prev_state) in old_monitors.drain() {
monitors.insert(
outpoint,
channel_id,
<(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(
&mut Cursor::new(&prev_state.persisted_monitor),
(&*$keys_manager, &*$keys_manager),
Expand All @@ -725,11 +725,11 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
// considering them discarded. LDK should replay these for us as they're stored in
// the `ChannelManager`.
prev_state.pending_monitors.clear();
chain_monitor.latest_monitors.lock().unwrap().insert(outpoint, prev_state);
chain_monitor.latest_monitors.lock().unwrap().insert(channel_id, prev_state);
}
let mut monitor_refs = new_hash_map();
for (outpoint, monitor) in monitors.iter() {
monitor_refs.insert(*outpoint, monitor);
for (channel_id, monitor) in monitors.iter() {
monitor_refs.insert(*channel_id, monitor);
}

let read_args = ChannelManagerReadArgs {
Expand All @@ -752,9 +752,9 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
.1,
chain_monitor.clone(),
);
for (funding_txo, mon) in monitors.drain() {
for (channel_id, mon) in monitors.drain() {
assert_eq!(
chain_monitor.chain_monitor.watch_channel(funding_txo, mon),
chain_monitor.chain_monitor.watch_channel(channel_id, mon),
Ok(ChannelMonitorUpdateStatus::Completed)
);
}
Expand Down Expand Up @@ -825,7 +825,6 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
};

$source.handle_accept_channel($dest.get_our_node_id(), &accept_channel);
let funding_output;
{
let mut events = $source.get_and_clear_pending_events();
assert_eq!(events.len(), 1);
Expand All @@ -845,7 +844,6 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
script_pubkey: output_script,
}],
};
funding_output = OutPoint { txid: tx.compute_txid(), index: 0 };
$source
.funding_transaction_generated(
temporary_channel_id,
Expand Down Expand Up @@ -890,13 +888,19 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
$source.handle_funding_signed($dest.get_our_node_id(), &funding_signed);
let events = $source.get_and_clear_pending_events();
assert_eq!(events.len(), 1);
if let events::Event::ChannelPending { ref counterparty_node_id, .. } = events[0] {
let channel_id = if let events::Event::ChannelPending {
ref counterparty_node_id,
ref channel_id,
..
} = events[0]
{
assert_eq!(counterparty_node_id, &$dest.get_our_node_id());
channel_id.clone()
} else {
panic!("Wrong event type");
}
};

funding_output
channel_id
}};
}

Expand Down Expand Up @@ -963,8 +967,8 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {

let mut nodes = [node_a, node_b, node_c];

let chan_1_funding = make_channel!(nodes[0], nodes[1], keys_manager_b, 0);
let chan_2_funding = make_channel!(nodes[1], nodes[2], keys_manager_c, 1);
let chan_1_id = make_channel!(nodes[0], nodes[1], keys_manager_b, 0);
let chan_2_id = make_channel!(nodes[1], nodes[2], keys_manager_c, 1);

for node in nodes.iter() {
confirm_txn!(node);
Expand Down Expand Up @@ -1363,14 +1367,14 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
}
};

let complete_all_monitor_updates = |monitor: &Arc<TestChainMonitor>, chan_funding| {
if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
let complete_all_monitor_updates = |monitor: &Arc<TestChainMonitor>, chan_id| {
if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_id) {
assert!(
state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
"updates should be sorted by id"
);
for (id, data) in state.pending_monitors.drain(..) {
monitor.chain_monitor.channel_monitor_updated(*chan_funding, id).unwrap();
monitor.chain_monitor.channel_monitor_updated(*chan_id, id).unwrap();
if id > state.persisted_monitor_id {
state.persisted_monitor_id = id;
state.persisted_monitor = data;
Expand Down Expand Up @@ -1410,10 +1414,10 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
ChannelMonitorUpdateStatus::Completed
},

0x08 => complete_all_monitor_updates(&monitor_a, &chan_1_funding),
0x09 => complete_all_monitor_updates(&monitor_b, &chan_1_funding),
0x0a => complete_all_monitor_updates(&monitor_b, &chan_2_funding),
0x0b => complete_all_monitor_updates(&monitor_c, &chan_2_funding),
0x08 => complete_all_monitor_updates(&monitor_a, &chan_1_id),
0x09 => complete_all_monitor_updates(&monitor_b, &chan_1_id),
0x0a => complete_all_monitor_updates(&monitor_b, &chan_2_id),
0x0b => complete_all_monitor_updates(&monitor_c, &chan_2_id),

0x0c => {
if !chan_a_disconnected {
Expand Down Expand Up @@ -1683,21 +1687,21 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
nodes[2].maybe_update_chan_fees();
},

0xf0 => complete_monitor_update(&monitor_a, &chan_1_funding, &complete_first),
0xf1 => complete_monitor_update(&monitor_a, &chan_1_funding, &complete_second),
0xf2 => complete_monitor_update(&monitor_a, &chan_1_funding, &Vec::pop),
0xf0 => complete_monitor_update(&monitor_a, &chan_1_id, &complete_first),
0xf1 => complete_monitor_update(&monitor_a, &chan_1_id, &complete_second),
0xf2 => complete_monitor_update(&monitor_a, &chan_1_id, &Vec::pop),

0xf4 => complete_monitor_update(&monitor_b, &chan_1_funding, &complete_first),
0xf5 => complete_monitor_update(&monitor_b, &chan_1_funding, &complete_second),
0xf6 => complete_monitor_update(&monitor_b, &chan_1_funding, &Vec::pop),
0xf4 => complete_monitor_update(&monitor_b, &chan_1_id, &complete_first),
0xf5 => complete_monitor_update(&monitor_b, &chan_1_id, &complete_second),
0xf6 => complete_monitor_update(&monitor_b, &chan_1_id, &Vec::pop),

0xf8 => complete_monitor_update(&monitor_b, &chan_2_funding, &complete_first),
0xf9 => complete_monitor_update(&monitor_b, &chan_2_funding, &complete_second),
0xfa => complete_monitor_update(&monitor_b, &chan_2_funding, &Vec::pop),
0xf8 => complete_monitor_update(&monitor_b, &chan_2_id, &complete_first),
0xf9 => complete_monitor_update(&monitor_b, &chan_2_id, &complete_second),
0xfa => complete_monitor_update(&monitor_b, &chan_2_id, &Vec::pop),

0xfc => complete_monitor_update(&monitor_c, &chan_2_funding, &complete_first),
0xfd => complete_monitor_update(&monitor_c, &chan_2_funding, &complete_second),
0xfe => complete_monitor_update(&monitor_c, &chan_2_funding, &Vec::pop),
0xfc => complete_monitor_update(&monitor_c, &chan_2_id, &complete_first),
0xfd => complete_monitor_update(&monitor_c, &chan_2_id, &complete_second),
0xfe => complete_monitor_update(&monitor_c, &chan_2_id, &Vec::pop),

0xff => {
// Test that no channel is in a stuck state where neither party can send funds even
Expand All @@ -1711,10 +1715,10 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out, anchors: bool) {
*monitor_c.persister.update_ret.lock().unwrap() =
ChannelMonitorUpdateStatus::Completed;

complete_all_monitor_updates(&monitor_a, &chan_1_funding);
complete_all_monitor_updates(&monitor_b, &chan_1_funding);
complete_all_monitor_updates(&monitor_b, &chan_2_funding);
complete_all_monitor_updates(&monitor_c, &chan_2_funding);
complete_all_monitor_updates(&monitor_a, &chan_1_id);
complete_all_monitor_updates(&monitor_b, &chan_1_id);
complete_all_monitor_updates(&monitor_b, &chan_2_id);
complete_all_monitor_updates(&monitor_c, &chan_2_id);

// Next, make sure peers are all connected to each other
if chan_a_disconnected {
Expand Down
2 changes: 1 addition & 1 deletion lightning-block-sync/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ where
///
/// // Allow the chain monitor to watch any channels.
/// let monitor = monitor_listener.0;
/// chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
/// chain_monitor.watch_channel(monitor.channel_id(), monitor);
///
/// // Create an SPV client to notify the chain monitor and channel manager of block events.
/// let chain_poller = poll::ChainPoller::new(block_source, Network::Bitcoin);
Expand Down
Loading
Loading