Skip to content

Commit

Permalink
fix #475, musig error caused by failed tlcs
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Jan 22, 2025
1 parent 682eb38 commit 6f436fb
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 91 deletions.
71 changes: 47 additions & 24 deletions src/fiber/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2730,6 +2730,34 @@ impl PendingTlcs {
.cloned()
.collect()
}

pub fn get_oldest_failed_tlcs(&self) -> Vec<TLCId> {
let mut failed_tlcs = self
.tlcs
.iter()
.filter(|tlc| {
matches!(tlc.removed_reason, Some(RemoveTlcReason::RemoveTlcFail(_)))
&& matches!(
tlc.status,
TlcStatus::Inbound(InboundTlcStatus::RemoveAckConfirmed)
| TlcStatus::Outbound(OutboundTlcStatus::RemoveAckConfirmed)
| TlcStatus::Outbound(OutboundTlcStatus::RemoveWaitAck)
)
})
.map(|tlc| (tlc.tlc_id, tlc.removed_confirmed_at.unwrap_or(u64::MAX)))
.collect::<Vec<_>>();

let keep_failed_num = 2;
if failed_tlcs.len() >= keep_failed_num {
failed_tlcs.sort_by(|a, b| a.1.cmp(&b.1));
failed_tlcs[0..failed_tlcs.len() - keep_failed_num]
.iter()
.map(|(tlc_id, _)| *tlc_id)
.collect()
} else {
return Vec::new();
}
}
}

#[derive(Default, Clone, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -2949,7 +2977,7 @@ impl TlcState {
self.need_another_commitment_signed()
}

pub fn update_for_revoke_and_ack(&mut self, commitment_numbers: CommitmentNumbers) -> bool {
pub fn update_for_revoke_and_ack(&mut self, _commitment_numbers: CommitmentNumbers) -> bool {
self.set_waiting_ack(false);
for tlc in self.offered_tlcs.tlcs.iter_mut() {
match tlc.outbound_status() {
Expand All @@ -2961,7 +2989,7 @@ impl TlcState {
}
OutboundTlcStatus::RemoveWaitAck => {
tlc.status = TlcStatus::Outbound(OutboundTlcStatus::RemoveAckConfirmed);
tlc.removed_confirmed_at = Some(commitment_numbers.get_local());
tlc.removed_confirmed_at = Some(now_timestamp_as_millis_u64());
}
_ => {}
}
Expand All @@ -2977,7 +3005,7 @@ impl TlcState {
}
InboundTlcStatus::LocalRemoved => {
tlc.status = TlcStatus::Inbound(InboundTlcStatus::RemoveAckConfirmed);
tlc.removed_confirmed_at = Some(commitment_numbers.get_remote());
tlc.removed_confirmed_at = Some(now_timestamp_as_millis_u64());
}
_ => {}
}
Expand Down Expand Up @@ -4697,27 +4725,22 @@ impl ChannelActorState {
}

pub fn clean_up_failed_tlcs(&mut self) {
let mut failed_tlcs: Vec<_> = self
.tlc_state
.received_tlcs
.tlcs
// Remove the oldest failed tlcs from the channel state turns out to be very tricky
// Because the different parties may have different views on the failed tlcs,
// so we need to be very careful here.

// The basic idea is to remove the oldest failed tlcs that are confirmed by both parties.
// And we need to calculate the oldest failed tlcs independently from two directions,
// Because we may have tlc operations from both directions at the same time, order matters.
// see #475 for more details.
let failed_offered_tlcs = self.tlc_state.offered_tlcs.get_oldest_failed_tlcs();
let failed_received_tlcs = self.tlc_state.received_tlcs.get_oldest_failed_tlcs();

for tlc_id in failed_offered_tlcs
.iter()
.chain(self.tlc_state.offered_tlcs.tlcs.iter())
.filter(|tlc| {
matches!(tlc.removed_reason, Some(RemoveTlcReason::RemoveTlcFail(_)))
&& matches!(
tlc.status,
TlcStatus::Inbound(InboundTlcStatus::RemoveAckConfirmed)
| TlcStatus::Outbound(OutboundTlcStatus::RemoveAckConfirmed)
| TlcStatus::Outbound(OutboundTlcStatus::RemoveWaitAck)
)
})
.map(|tlc| (tlc.tlc_id, tlc.removed_confirmed_at.unwrap_or(u64::MAX)))
.collect();

if failed_tlcs.len() >= 3 {
failed_tlcs.sort_by(|a, b| a.1.cmp(&b.1));
self.tlc_state.apply_remove_tlc(failed_tlcs[0].0);
.chain(failed_received_tlcs.iter())
{
self.tlc_state.apply_remove_tlc(*tlc_id);
}
}

Expand Down Expand Up @@ -6879,8 +6902,8 @@ impl ChannelActorState {
.pack(),
)
.build();

let to_remote_output_data = Bytes::default();

if for_remote {
(
[to_local_output, to_remote_output],
Expand Down
108 changes: 43 additions & 65 deletions src/fiber/tests/payment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1958,7 +1958,7 @@ async fn test_send_payment_middle_hop_update_fee_should_recovery() {
let [node_0, node_1, mut node_2, node_3] = nodes.try_into().expect("4 nodes");
let mut all_sent = HashSet::new();

for _i in 0..5 {
for _i in 0..6 {
let res = node_0
.send_payment_keysend(&node_3, 1000, false)
.await
Expand Down Expand Up @@ -2001,11 +2001,12 @@ async fn test_send_payment_middle_hop_update_fee_should_recovery() {
}
}

#[tokio::test]
async fn test_send_payment_complex_network_payself() {
async fn run_complex_network_with_params(
funding_amount: u128,
payment_amount_gen: impl Fn() -> u128,
) -> Vec<(Hash256, PaymentSessionStatus)> {
init_tracing();
let _span = tracing::info_span!("node", node = "test").entered();
let funding_amount = MIN_RESERVED_CKB + 100000000;
let (nodes, _channels) = create_n_nodes_with_index_and_amounts_with_established_channel(
&[
((0, 1), (funding_amount, funding_amount)),
Expand All @@ -2024,74 +2025,22 @@ async fn test_send_payment_complex_network_payself() {
let mut all_sent = HashSet::new();
for _k in 0..3 {
for i in 0..6 {
let payment_amount = payment_amount_gen();
let res = nodes[i]
.send_payment_keysend_to_self(1000, false)
.await
.unwrap();
eprintln!("res: {:?}", res);
let payment_hash = res.payment_hash;
all_sent.insert((i, payment_hash));
}
}

loop {
for i in 0..6 {
assert!(nodes[i].get_triggered_unexpected_events().await.is_empty());
}

for (i, payment_hash) in all_sent.clone().into_iter() {
let status = nodes[i].get_payment_status(payment_hash).await;
if status == PaymentSessionStatus::Success {
eprintln!("payment_hash: {:?} got status : {:?}", payment_hash, status);
all_sent.remove(&(i, payment_hash));
.send_payment_keysend_to_self(payment_amount, false)
.await;
if let Ok(res) = res {
eprintln!("res: {:?}", res);
let payment_hash = res.payment_hash;
all_sent.insert((i, payment_hash));
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
if all_sent.is_empty() {
break;
}
}
}

#[tokio::test]
async fn test_send_payment_complex_network_payself_amount_exceeded() {
// variant from issue 475
// the channel amount is not enough, so payments will be failed
init_tracing();
let _span = tracing::info_span!("node", node = "test").entered();
let ckb_unit = 100_000_000;
let funding_amount = MIN_RESERVED_CKB + 1000 * ckb_unit;
let (nodes, _channels) = create_n_nodes_with_index_and_amounts_with_established_channel(
&[
((0, 1), (funding_amount, funding_amount)),
((1, 2), (funding_amount, funding_amount)),
((3, 4), (funding_amount, funding_amount)),
((4, 5), (funding_amount, funding_amount)),
((0, 3), (funding_amount, funding_amount)),
((1, 4), (funding_amount, funding_amount)),
((2, 5), (funding_amount, funding_amount)),
],
6,
true,
)
.await;

let mut all_sent = HashSet::new();
for _k in 0..2 {
for i in 0..6 {
let res = nodes[i]
.send_payment_keysend_to_self(500 * ckb_unit, false)
.await
.unwrap();
eprintln!("res: {:?}", res);
let payment_hash = res.payment_hash;
all_sent.insert((i, payment_hash));
}
}

let mut result = vec![];
loop {
for i in 0..6 {
eprintln!("assert node: {:?}", i);
assert!(nodes[i].get_triggered_unexpected_events().await.is_empty());
}

Expand All @@ -2102,7 +2051,7 @@ async fn test_send_payment_complex_network_payself_amount_exceeded() {
PaymentSessionStatus::Success | PaymentSessionStatus::Failed
) {
eprintln!("payment_hash: {:?} got status : {:?}", payment_hash, status);
result.push((i, payment_hash, status));
result.push((payment_hash, status));
all_sent.remove(&(i, payment_hash));
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
Expand All @@ -2112,4 +2061,33 @@ async fn test_send_payment_complex_network_payself_amount_exceeded() {
}
}
eprintln!("result:\n {:?}", result);
result
}

#[tokio::test]
async fn test_send_payment_complex_network_payself() {
// from issue 475
// channel amount is enough, so all payments should success
let res = run_complex_network_with_params(MIN_RESERVED_CKB + 100000000, || 1000).await;
let failed_count = res
.iter()
.filter(|(_, status)| *status == PaymentSessionStatus::Failed)
.count();
assert_eq!(failed_count, 0);
}

#[tokio::test]
async fn test_send_payment_complex_network_payself_amount_exceeded() {
// variant from issue 475
// the channel amount is not enough, so payments maybe be failed
let ckb_unit = 100_000_000;
let res = run_complex_network_with_params(MIN_RESERVED_CKB + 1000 * ckb_unit, || {
(450 as u128 + (rand::random::<u64>() % 100) as u128) * ckb_unit
})
.await;
let failed_count = res
.iter()
.filter(|(_, status)| *status == PaymentSessionStatus::Failed)
.count();
assert!(failed_count > 0);
}
4 changes: 2 additions & 2 deletions src/fiber/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1568,8 +1568,8 @@ impl Debug for RemoveTlcReason {
RemoveTlcReason::RemoveTlcFulfill(_fulfill) => {
write!(f, "RemoveTlcFulfill")
}
RemoveTlcReason::RemoveTlcFail(fail) => {
write!(f, "RemoveTlcFail with packet ({:?})", fail)
RemoveTlcReason::RemoveTlcFail(_fail) => {
write!(f, "RemoveTlcFail")
}
}
}
Expand Down

0 comments on commit 6f436fb

Please sign in to comment.