diff --git a/src/fiber/channel.rs b/src/fiber/channel.rs index 5b33892c..0f0244d4 100644 --- a/src/fiber/channel.rs +++ b/src/fiber/channel.rs @@ -2730,6 +2730,34 @@ impl PendingTlcs { .cloned() .collect() } + + pub fn get_oldest_failed_tlcs(&self) -> Vec { + let mut failed_tlcs = self + .tlcs + .iter() + .filter(|tlc| { + matches!(tlc.removed_reason, Some(RemoveTlcReason::RemoveTlcFail(_))) + && matches!( + tlc.status, + TlcStatus::Inbound(InboundTlcStatus::RemoveAckConfirmed) + | TlcStatus::Outbound(OutboundTlcStatus::RemoveAckConfirmed) + | TlcStatus::Outbound(OutboundTlcStatus::RemoveWaitAck) + ) + }) + .map(|tlc| (tlc.tlc_id, tlc.removed_confirmed_at.unwrap_or(u64::MAX))) + .collect::>(); + + let keep_failed_num = 2; + if failed_tlcs.len() >= keep_failed_num { + failed_tlcs.sort_by(|a, b| a.1.cmp(&b.1)); + failed_tlcs[0..failed_tlcs.len() - keep_failed_num] + .iter() + .map(|(tlc_id, _)| *tlc_id) + .collect() + } else { + return Vec::new(); + } + } } #[derive(Default, Clone, Debug, Serialize, Deserialize)] @@ -2949,7 +2977,7 @@ impl TlcState { self.need_another_commitment_signed() } - pub fn update_for_revoke_and_ack(&mut self, commitment_numbers: CommitmentNumbers) -> bool { + pub fn update_for_revoke_and_ack(&mut self, _commitment_numbers: CommitmentNumbers) -> bool { self.set_waiting_ack(false); for tlc in self.offered_tlcs.tlcs.iter_mut() { match tlc.outbound_status() { @@ -2961,7 +2989,7 @@ impl TlcState { } OutboundTlcStatus::RemoveWaitAck => { tlc.status = TlcStatus::Outbound(OutboundTlcStatus::RemoveAckConfirmed); - tlc.removed_confirmed_at = Some(commitment_numbers.get_local()); + tlc.removed_confirmed_at = Some(now_timestamp_as_millis_u64()); } _ => {} } @@ -2977,7 +3005,7 @@ impl TlcState { } InboundTlcStatus::LocalRemoved => { tlc.status = TlcStatus::Inbound(InboundTlcStatus::RemoveAckConfirmed); - tlc.removed_confirmed_at = Some(commitment_numbers.get_remote()); + tlc.removed_confirmed_at = Some(now_timestamp_as_millis_u64()); } _ => {} } @@ -4697,27 +4725,22 @@ impl ChannelActorState { } pub fn clean_up_failed_tlcs(&mut self) { - let mut failed_tlcs: Vec<_> = self - .tlc_state - .received_tlcs - .tlcs + // Remove the oldest failed tlcs from the channel state turns out to be very tricky + // Because the different parties may have different views on the failed tlcs, + // so we need to be very careful here. + + // The basic idea is to remove the oldest failed tlcs that are confirmed by both parties. + // And we need to calculate the oldest failed tlcs independently from two directions, + // Because we may have tlc operations from both directions at the same time, order matters. + // see #475 for more details. + let failed_offered_tlcs = self.tlc_state.offered_tlcs.get_oldest_failed_tlcs(); + let failed_received_tlcs = self.tlc_state.received_tlcs.get_oldest_failed_tlcs(); + + for tlc_id in failed_offered_tlcs .iter() - .chain(self.tlc_state.offered_tlcs.tlcs.iter()) - .filter(|tlc| { - matches!(tlc.removed_reason, Some(RemoveTlcReason::RemoveTlcFail(_))) - && matches!( - tlc.status, - TlcStatus::Inbound(InboundTlcStatus::RemoveAckConfirmed) - | TlcStatus::Outbound(OutboundTlcStatus::RemoveAckConfirmed) - | TlcStatus::Outbound(OutboundTlcStatus::RemoveWaitAck) - ) - }) - .map(|tlc| (tlc.tlc_id, tlc.removed_confirmed_at.unwrap_or(u64::MAX))) - .collect(); - - if failed_tlcs.len() >= 3 { - failed_tlcs.sort_by(|a, b| a.1.cmp(&b.1)); - self.tlc_state.apply_remove_tlc(failed_tlcs[0].0); + .chain(failed_received_tlcs.iter()) + { + self.tlc_state.apply_remove_tlc(*tlc_id); } } @@ -6879,8 +6902,8 @@ impl ChannelActorState { .pack(), ) .build(); - let to_remote_output_data = Bytes::default(); + if for_remote { ( [to_local_output, to_remote_output], diff --git a/src/fiber/tests/payment.rs b/src/fiber/tests/payment.rs index 308693d4..96e7d3ba 100644 --- a/src/fiber/tests/payment.rs +++ b/src/fiber/tests/payment.rs @@ -1958,7 +1958,7 @@ async fn test_send_payment_middle_hop_update_fee_should_recovery() { let [node_0, node_1, mut node_2, node_3] = nodes.try_into().expect("4 nodes"); let mut all_sent = HashSet::new(); - for _i in 0..5 { + for _i in 0..6 { let res = node_0 .send_payment_keysend(&node_3, 1000, false) .await @@ -2001,11 +2001,12 @@ async fn test_send_payment_middle_hop_update_fee_should_recovery() { } } -#[tokio::test] -async fn test_send_payment_complex_network_payself() { +async fn run_complex_network_with_params( + funding_amount: u128, + payment_amount_gen: impl Fn() -> u128, +) -> Vec<(Hash256, PaymentSessionStatus)> { init_tracing(); let _span = tracing::info_span!("node", node = "test").entered(); - let funding_amount = MIN_RESERVED_CKB + 100000000; let (nodes, _channels) = create_n_nodes_with_index_and_amounts_with_established_channel( &[ ((0, 1), (funding_amount, funding_amount)), @@ -2024,74 +2025,22 @@ async fn test_send_payment_complex_network_payself() { let mut all_sent = HashSet::new(); for _k in 0..3 { for i in 0..6 { + let payment_amount = payment_amount_gen(); let res = nodes[i] - .send_payment_keysend_to_self(1000, false) - .await - .unwrap(); - eprintln!("res: {:?}", res); - let payment_hash = res.payment_hash; - all_sent.insert((i, payment_hash)); - } - } - - loop { - for i in 0..6 { - assert!(nodes[i].get_triggered_unexpected_events().await.is_empty()); - } - - for (i, payment_hash) in all_sent.clone().into_iter() { - let status = nodes[i].get_payment_status(payment_hash).await; - if status == PaymentSessionStatus::Success { - eprintln!("payment_hash: {:?} got status : {:?}", payment_hash, status); - all_sent.remove(&(i, payment_hash)); + .send_payment_keysend_to_self(payment_amount, false) + .await; + if let Ok(res) = res { + eprintln!("res: {:?}", res); + let payment_hash = res.payment_hash; + all_sent.insert((i, payment_hash)); } - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - } - if all_sent.is_empty() { - break; - } - } -} - -#[tokio::test] -async fn test_send_payment_complex_network_payself_amount_exceeded() { - // variant from issue 475 - // the channel amount is not enough, so payments will be failed - init_tracing(); - let _span = tracing::info_span!("node", node = "test").entered(); - let ckb_unit = 100_000_000; - let funding_amount = MIN_RESERVED_CKB + 1000 * ckb_unit; - let (nodes, _channels) = create_n_nodes_with_index_and_amounts_with_established_channel( - &[ - ((0, 1), (funding_amount, funding_amount)), - ((1, 2), (funding_amount, funding_amount)), - ((3, 4), (funding_amount, funding_amount)), - ((4, 5), (funding_amount, funding_amount)), - ((0, 3), (funding_amount, funding_amount)), - ((1, 4), (funding_amount, funding_amount)), - ((2, 5), (funding_amount, funding_amount)), - ], - 6, - true, - ) - .await; - - let mut all_sent = HashSet::new(); - for _k in 0..2 { - for i in 0..6 { - let res = nodes[i] - .send_payment_keysend_to_self(500 * ckb_unit, false) - .await - .unwrap(); - eprintln!("res: {:?}", res); - let payment_hash = res.payment_hash; - all_sent.insert((i, payment_hash)); } } let mut result = vec![]; loop { for i in 0..6 { + eprintln!("assert node: {:?}", i); assert!(nodes[i].get_triggered_unexpected_events().await.is_empty()); } @@ -2102,7 +2051,7 @@ async fn test_send_payment_complex_network_payself_amount_exceeded() { PaymentSessionStatus::Success | PaymentSessionStatus::Failed ) { eprintln!("payment_hash: {:?} got status : {:?}", payment_hash, status); - result.push((i, payment_hash, status)); + result.push((payment_hash, status)); all_sent.remove(&(i, payment_hash)); } tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; @@ -2112,4 +2061,33 @@ async fn test_send_payment_complex_network_payself_amount_exceeded() { } } eprintln!("result:\n {:?}", result); + result +} + +#[tokio::test] +async fn test_send_payment_complex_network_payself() { + // from issue 475 + // channel amount is enough, so all payments should success + let res = run_complex_network_with_params(MIN_RESERVED_CKB + 100000000, || 1000).await; + let failed_count = res + .iter() + .filter(|(_, status)| *status == PaymentSessionStatus::Failed) + .count(); + assert_eq!(failed_count, 0); +} + +#[tokio::test] +async fn test_send_payment_complex_network_payself_amount_exceeded() { + // variant from issue 475 + // the channel amount is not enough, so payments maybe be failed + let ckb_unit = 100_000_000; + let res = run_complex_network_with_params(MIN_RESERVED_CKB + 1000 * ckb_unit, || { + (450 as u128 + (rand::random::() % 100) as u128) * ckb_unit + }) + .await; + let failed_count = res + .iter() + .filter(|(_, status)| *status == PaymentSessionStatus::Failed) + .count(); + assert!(failed_count > 0); } diff --git a/src/fiber/types.rs b/src/fiber/types.rs index 753d5115..48f058ce 100644 --- a/src/fiber/types.rs +++ b/src/fiber/types.rs @@ -1568,8 +1568,8 @@ impl Debug for RemoveTlcReason { RemoveTlcReason::RemoveTlcFulfill(_fulfill) => { write!(f, "RemoveTlcFulfill") } - RemoveTlcReason::RemoveTlcFail(fail) => { - write!(f, "RemoveTlcFail with packet ({:?})", fail) + RemoveTlcReason::RemoveTlcFail(_fail) => { + write!(f, "RemoveTlcFail") } } }