diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9f82c4ec..8d4bd156 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,7 +36,7 @@ jobs: with: tool: nextest - run: | - RUST_BACKTRACE=full RUST_LOG=trace cargo nextest run --no-fail-fast + TEST_TEMP_RETAIN=1 RUST_BACKTRACE=full RUST_LOG=trace cargo nextest run --no-fail-fast fmt: name: Rustfmt diff --git a/Cargo.toml b/Cargo.toml index e6f6ad6d..1f4e184c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,6 +77,11 @@ overflow-checks = true [profile.dev] panic = "abort" +[profile.quick_test] +inherits = "test" +opt-level = 3 +debug = false + [dev-dependencies] tempfile = "3.10.1" ckb-testtool = "0.13.2" diff --git a/Makefile b/Makefile index 2613c462..41247bc8 100644 --- a/Makefile +++ b/Makefile @@ -34,6 +34,7 @@ coverage-run-unittests: RUSTFLAGS="${RUSTFLAGS} -Cinstrument-coverage" \ RUST_LOG=off \ LLVM_PROFILE_FILE="${COVERAGE_PROFRAW_DIR}/unittests-%p-%m.profraw" \ + TEST_TEMP_RETAIN=1 \ cargo test --all coverage-collect-data: diff --git a/migrate/Cargo.lock b/migrate/Cargo.lock index 94770780..da391216 100644 --- a/migrate/Cargo.lock +++ b/migrate/Cargo.lock @@ -1710,7 +1710,115 @@ dependencies = [ [[package]] name = "fnn" -version = "0.3.0" +version = "0.3.1" +dependencies = [ + "anyhow", + "arcode", + "bech32 0.8.1", + "bincode", + "bitcoin", + "bitflags 2.7.0", + "ckb-chain-spec", + "ckb-gen-types", + "ckb-hash 0.115.0", + "ckb-jsonrpc-types", + "ckb-resource", + "ckb-rocksdb", + "ckb-sdk", + "ckb-types", + "clap", + "clap-serde-derive", + "console", + "fiber-sphinx", + "futures", + "git-version", + "hex", + "home", + "indicatif", + "jsonrpsee", + "lightning-invoice", + "lnd-grpc-tonic-client", + "molecule", + "musig2", + "nom", + "num_enum", + "once_cell", + "ractor 0.14.2", + "rand 0.8.5", + "regex", + "secp256k1 0.28.2", + "serde", + "serde_json", + "serde_with", + "serde_yaml", + "socket2", + "strum", + "tentacle", + "thiserror 1.0.69", + "tokio", + "tokio-util", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "fnn" +version = "0.3.1" +source = "git+https://github.com/nervosnetwork/fiber.git?tag=v0.3.1#d0f0ebd74b5fda3c329b3b89530dcc368cdaf10a" +dependencies = [ + "anyhow", + "arcode", + "bech32 0.8.1", + "bincode", + "bitcoin", + "bitflags 2.7.0", + "ckb-chain-spec", + "ckb-gen-types", + "ckb-hash 0.115.0", + "ckb-jsonrpc-types", + "ckb-resource", + "ckb-rocksdb", + "ckb-sdk", + "ckb-types", + "clap", + "clap-serde-derive", + "console", + "fiber-sphinx", + "futures", + "git-version", + "hex", + "home", + "indicatif", + "jsonrpsee", + "lightning-invoice", + "lnd-grpc-tonic-client", + "molecule", + "musig2", + "nom", + "num_enum", + "once_cell", + "ractor 0.14.2", + "rand 0.8.5", + "regex", + "secp256k1 0.28.2", + "serde", + "serde_json", + "serde_with", + "serde_yaml", + "socket2", + "strum", + "tentacle", + "thiserror 1.0.69", + "tokio", + "tokio-util", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "fnn" +version = "0.3.1" +source = "git+https://github.com/chenyukang/fiber.git?branch=yukang-fix-480-keep-fail-tlc#07d054da4a2e9769eb12da35558183dcfb48bb8c" dependencies = [ "anyhow", "arcode", @@ -1772,7 +1880,9 @@ dependencies = [ "fnn 0.2.0", "fnn 0.2.1 (git+https://github.com/nervosnetwork/fiber.git?tag=v0.2.1)", "fnn 0.2.1 (git+https://github.com/nervosnetwork/fiber.git?tag=v0.3.0-rc1)", - "fnn 0.3.0", + "fnn 0.3.1", + "fnn 0.3.1 (git+https://github.com/nervosnetwork/fiber.git?tag=v0.3.1)", + "fnn 0.3.1 (git+https://github.com/chenyukang/fiber.git?branch=yukang-fix-480-keep-fail-tlc)", "hex", "indicatif", "serde", diff --git a/migrate/Cargo.toml b/migrate/Cargo.toml index badfb352..0fb00c4a 100644 --- a/migrate/Cargo.toml +++ b/migrate/Cargo.toml @@ -24,6 +24,8 @@ serde_json = "1.0.135" fiber_v020 = { package = "fnn", git = "https://github.com/nervosnetwork/fiber.git", tag = "v0.2.0" } fiber_v021 = { package = "fnn", git = "https://github.com/nervosnetwork/fiber.git", tag = "v0.2.1" } fiber_v030 = { package = "fnn", git = "https://github.com/nervosnetwork/fiber.git", tag = "v0.3.0-rc1" } +fiber_v031 = { package = "fnn", git = "https://github.com/nervosnetwork/fiber.git", tag = "v0.3.1" } +fiber_v032 = { package = "fnn", git = "https://github.com/chenyukang/fiber.git", branch = "yukang-fix-480-keep-fail-tlc"} [features] default = [] diff --git a/migrate/src/migrations/mig_20250123.rs b/migrate/src/migrations/mig_20250123.rs new file mode 100644 index 00000000..48d49180 --- /dev/null +++ b/migrate/src/migrations/mig_20250123.rs @@ -0,0 +1,151 @@ +use fiber::{store::migration::Migration, Error}; +use indicatif::ProgressBar; +use rocksdb::ops::Iterate; +use rocksdb::ops::Put; +use rocksdb::DB; +use std::{collections::HashSet, sync::Arc}; +use tracing::info; + +const MIGRATION_DB_VERSION: &str = "20250123051223"; + +pub use fiber_v031::fiber::channel::ChannelActorState as ChannelActorStateV031; +pub use fiber_v031::fiber::channel::{ + ChannelTlcInfo as ChannelTlcInfoV031, PendingTlcs as PendingTlcsV031, TlcInfo as TlcInfoV031, + TlcState as TlcStateV031, +}; + +use crate::util::convert; +pub use fiber_v032::fiber::channel::ChannelActorState as ChannelActorStateV032; +pub use fiber_v032::fiber::channel::{ + ChannelTlcInfo as ChannelTlcInfoV032, PendingTlcs as PendingTlcsV032, + PublicChannelInfo as PublicChannelInfoV032, TlcInfo as TlcInfoV032, TlcState as TlcStateV032, +}; + +pub struct MigrationObj { + version: String, +} + +impl MigrationObj { + pub fn new() -> Self { + Self { + version: MIGRATION_DB_VERSION.to_string(), + } + } +} + +fn convert_tlc_info(old: TlcInfoV031) -> TlcInfoV032 { + TlcInfoV032 { + channel_id: convert(old.channel_id), + status: convert(old.status), + tlc_id: convert(old.tlc_id), + amount: convert(old.amount), + payment_hash: convert(old.payment_hash), + expiry: convert(old.expiry), + hash_algorithm: convert(old.hash_algorithm), + onion_packet: convert(old.onion_packet), + shared_secret: convert(old.shared_secret), + created_at: convert(old.created_at), + removed_reason: convert(old.removed_reason), + previous_tlc: convert(old.previous_tlc), + // new field in v032 + removed_confirmed_at: None, + } +} + +fn convert_pending_tlcs(old: PendingTlcsV031) -> PendingTlcsV032 { + PendingTlcsV032 { + tlcs: old + .tlcs + .into_iter() + .map(|tlc| (convert_tlc_info(tlc))) + .collect(), + next_tlc_id: convert(old.next_tlc_id), + } +} + +impl Migration for MigrationObj { + fn migrate( + &self, + db: Arc, + _pb: Arc ProgressBar + Send + Sync>, + ) -> Result, Error> { + info!( + "MigrationObj::migrate to {} ...........", + MIGRATION_DB_VERSION + ); + + const CHANNEL_ACTOR_STATE_PREFIX: u8 = 0; + let prefix = vec![CHANNEL_ACTOR_STATE_PREFIX]; + + for (k, v) in db + .prefix_iterator(prefix.as_slice()) + .take_while(move |(col_key, _)| col_key.starts_with(prefix.as_slice())) + { + let old_channel_state: ChannelActorStateV031 = + bincode::deserialize(&v).expect("deserialize to old channel state"); + + let old_tlc_state = old_channel_state.tlc_state.clone(); + let new_tlc_state = TlcStateV032 { + offered_tlcs: convert_pending_tlcs(old_tlc_state.offered_tlcs), + received_tlcs: convert_pending_tlcs(old_tlc_state.received_tlcs), + retryable_tlc_operations: convert(old_tlc_state.retryable_tlc_operations), + applied_add_tlcs: convert(old_tlc_state.applied_add_tlcs), + // new field in v032 + applied_remove_tlcs: HashSet::new(), + waiting_ack: old_tlc_state.waiting_ack, + }; + + let new_channel_state = ChannelActorStateV032 { + state: convert(old_channel_state.state), + local_pubkey: convert(old_channel_state.local_pubkey), + remote_pubkey: convert(old_channel_state.remote_pubkey), + id: convert(old_channel_state.id), + funding_tx: old_channel_state.funding_tx, + funding_tx_confirmed_at: old_channel_state.funding_tx_confirmed_at, + funding_udt_type_script: old_channel_state.funding_udt_type_script, + is_acceptor: old_channel_state.is_acceptor, + to_local_amount: old_channel_state.to_local_amount, + to_remote_amount: old_channel_state.to_remote_amount, + local_reserved_ckb_amount: old_channel_state.local_reserved_ckb_amount, + remote_reserved_ckb_amount: old_channel_state.remote_reserved_ckb_amount, + commitment_fee_rate: old_channel_state.commitment_fee_rate, + commitment_delay_epoch: old_channel_state.commitment_delay_epoch, + funding_fee_rate: old_channel_state.funding_fee_rate, + signer: convert(old_channel_state.signer), + local_channel_public_keys: convert(old_channel_state.local_channel_public_keys), + commitment_numbers: convert(old_channel_state.commitment_numbers), + local_constraints: convert(old_channel_state.local_constraints), + remote_constraints: convert(old_channel_state.remote_constraints), + tlc_state: new_tlc_state, + remote_shutdown_script: old_channel_state.remote_shutdown_script, + local_shutdown_script: old_channel_state.local_shutdown_script, + last_commitment_signed_remote_nonce: old_channel_state + .last_commitment_signed_remote_nonce, + last_revoke_and_ack_remote_nonce: old_channel_state + .last_revoke_and_ack_remote_nonce, + last_committed_remote_nonce: old_channel_state.last_committed_remote_nonce, + latest_commitment_transaction: old_channel_state.latest_commitment_transaction, + remote_commitment_points: convert(old_channel_state.remote_commitment_points), + remote_channel_public_keys: convert(old_channel_state.remote_channel_public_keys), + local_shutdown_info: convert(old_channel_state.local_shutdown_info), + remote_shutdown_info: convert(old_channel_state.remote_shutdown_info), + reestablishing: old_channel_state.reestablishing, + created_at: old_channel_state.created_at, + public_channel_info: convert(old_channel_state.public_channel_info), + local_tlc_info: convert(old_channel_state.local_tlc_info), + remote_tlc_info: convert(old_channel_state.remote_tlc_info), + }; + + let new_channel_state_bytes = + bincode::serialize(&new_channel_state).expect("serialize to new channel state"); + + db.put(k, new_channel_state_bytes) + .expect("save new channel state"); + } + Ok(db) + } + + fn version(&self) -> &str { + &self.version + } +} diff --git a/migrate/src/migrations/mod.rs b/migrate/src/migrations/mod.rs index f6b0f1d5..8e4c259d 100644 --- a/migrate/src/migrations/mod.rs +++ b/migrate/src/migrations/mod.rs @@ -3,3 +3,4 @@ pub mod mig_20250114; pub mod mig_20250115; +pub mod mig_20250123; diff --git a/src/fiber/channel.rs b/src/fiber/channel.rs index 641f2cda..6d11d233 100644 --- a/src/fiber/channel.rs +++ b/src/fiber/channel.rs @@ -70,10 +70,10 @@ use tokio::sync::oneshot; use super::{graph::ChannelUpdateInfo, types::ForwardTlcResult}; use std::{ collections::HashSet, - fmt::{self, Debug}, + fmt::{self, Debug, Display}, sync::Arc, time::{SystemTime, UNIX_EPOCH}, - u128, + u128, u64, }; use super::types::{ChannelUpdateChannelFlags, ChannelUpdateMessageFlags, UpdateTlcInfo}; @@ -144,6 +144,22 @@ pub enum ChannelCommand { ReloadState(ReloadParams), } +impl Display for ChannelCommand { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ChannelCommand::TxCollaborationCommand(_) => write!(f, "TxCollaborationCommand"), + ChannelCommand::CommitmentSigned() => write!(f, "CommitmentSigned"), + ChannelCommand::AddTlc(_, _) => write!(f, "AddTlc"), + ChannelCommand::RemoveTlc(_, _) => write!(f, "RemoveTlc"), + ChannelCommand::Shutdown(_, _) => write!(f, "Shutdown"), + ChannelCommand::Update(_, _) => write!(f, "Update"), + ChannelCommand::ForwardTlcResult(_) => write!(f, "ForwardTlcResult"), + #[cfg(test)] + ChannelCommand::ReloadState(_) => write!(f, "ReloadState"), + } + } +} + #[cfg(test)] #[derive(Debug)] pub struct ReloadParams { @@ -462,10 +478,10 @@ where FiberChannelMessage::RevokeAndAck(revoke_and_ack) => { let need_commitment_signed = state.handle_revoke_and_ack_peer_message(&self.network, revoke_and_ack)?; + self.update_tlc_status_on_ack(myself, state).await; if need_commitment_signed { self.handle_commitment_signed_command(state)?; } - self.update_tlc_status_on_ack(myself, state).await; Ok(()) } FiberChannelMessage::ChannelReady(_channel_ready) => { @@ -699,12 +715,14 @@ where ); let need_commitment_signed = state.tlc_state.update_for_commitment_signed(); + + // flush remove tlc for received tlcs after replying ack for peer + self.apply_settled_remove_tlcs(myself, state, true).await; + if need_commitment_signed && !state.tlc_state.waiting_ack { self.handle_commitment_signed_command(state)?; } - // flush remove tlc for received tlcs after replying ack for peer - self.apply_settled_remove_tlcs(myself, state, true).await; Ok(()) } @@ -716,9 +734,9 @@ where ) { let previous_balance = state.get_local_balance(); let pending_tlcs = if inbound { - state.tlc_state.received_tlcs.tlcs.iter_mut() + state.tlc_state.received_tlcs.tlcs.iter() } else { - state.tlc_state.offered_tlcs.tlcs.iter_mut() + state.tlc_state.offered_tlcs.tlcs.iter() }; let settled_tlcs: Vec<_> = pending_tlcs .filter(|tlc| { @@ -728,6 +746,7 @@ where TlcStatus::Inbound(InboundTlcStatus::RemoveAckConfirmed) | TlcStatus::Outbound(OutboundTlcStatus::RemoveAckConfirmed) ) + && !state.tlc_state.applied_remove_tlcs.contains(&tlc.tlc_id) }) .map(|tlc| tlc.tlc_id) .collect(); @@ -737,6 +756,7 @@ where .await .expect("expect remove tlc success"); } + if state.get_local_balance() != previous_balance { state.update_graph_for_local_channel_change(&self.network); state.update_graph_for_remote_channel_change(&self.network); @@ -775,6 +795,7 @@ where // There's no shared secret stored in the received TLC, use the one found in the peeled onion packet. &error.shared_secret, ); + self.register_retryable_tlc_remove( myself, state, @@ -830,6 +851,7 @@ where assert!(previous_channel_id != state.get_id()); let remove_reason = remove_reason.clone().backward(&tlc_info.shared_secret); + self.register_retryable_relay_tlc_remove( myself, state, @@ -1098,6 +1120,9 @@ where tlc_id: TLCId, ) -> Result<(), ProcessingChannelError> { let channel_id = state.get_id(); + assert!(!state.tlc_state.applied_remove_tlcs.contains(&tlc_id)); + state.tlc_state.applied_remove_tlcs.insert(tlc_id); + let (tlc_info, remove_reason) = state.remove_tlc_with_reason(tlc_id)?; if matches!(remove_reason, RemoveTlcReason::RemoveTlcFulfill(_)) && self.store.get_invoice(&tlc_info.payment_hash).is_some() @@ -1188,6 +1213,7 @@ where ))); } }; + state.clean_up_failed_tlcs(); let (funding_tx_partial_signature, commitment_tx_partial_signature) = state.build_and_sign_commitment_tx()?; let commitment_signed = CommitmentSigned { @@ -2294,15 +2320,21 @@ where .await { error!( - "Error while processing channel message: {:?} with message: {:?}", - error, message + "{:?} Error while processing channel message: {:?} with message: {:?}", + state.get_local_peer_id(), + error, + message ); debug_event!(&self.network, &format!("{:?}", error)); } } ChannelActorMessage::Command(command) => { if let Err(err) = self.handle_command(&myself, state, command).await { - error!("Error while processing channel command: {:?}", err); + error!( + "{:?} Error while processing channel command: {:?}", + state.get_local_peer_id(), + err + ); } } ChannelActorMessage::Event(e) => { @@ -2516,6 +2548,7 @@ pub struct TlcInfo { /// ^^^^ ^^^^ /// pub previous_tlc: Option<(Hash256, TLCId)>, + pub removed_confirmed_at: Option, } // When we are forwarding a TLC, we need to know the previous TLC information. @@ -2545,6 +2578,8 @@ impl Debug for TlcInfo { .field("status", &self.status) .field("amount", &self.amount) .field("removed_reason", &self.removed_reason) + .field("payment_hash", &self.payment_hash) + .field("removed_confirmed_at", &self.removed_confirmed_at) .finish() } } @@ -2581,6 +2616,16 @@ impl TlcInfo { self.status.as_inbound_status() } + pub fn is_fail_remove_confirmed(&self) -> bool { + matches!(self.removed_reason, Some(RemoveTlcReason::RemoveTlcFail(_))) + && match self.status { + TlcStatus::Outbound(OutboundTlcStatus::RemoveAckConfirmed) => true, + TlcStatus::Outbound(OutboundTlcStatus::RemoveWaitAck) => true, + TlcStatus::Inbound(InboundTlcStatus::RemoveAckConfirmed) => true, + _ => false, + } + } + fn get_hash(&self) -> ShortHash { self.payment_hash.as_ref()[..20] .try_into() @@ -2683,6 +2728,31 @@ impl PendingTlcs { .cloned() .collect() } + + pub fn get_oldest_failed_tlcs(&self) -> Vec { + let mut failed_tlcs = self + .tlcs + .iter() + .filter_map(|tlc| { + if tlc.is_fail_remove_confirmed() { + Some((tlc.tlc_id, tlc.removed_confirmed_at.unwrap_or(u64::MAX))) + } else { + None + } + }) + .collect::>(); + + if failed_tlcs.len() > 1 { + failed_tlcs.sort_by_key(|a| a.1); + failed_tlcs + .iter() + .take(failed_tlcs.len() - 1) + .map(|(tlc_id, _)| *tlc_id) + .collect() + } else { + return Vec::new(); + } + } } #[derive(Default, Clone, Debug, Serialize, Deserialize)] @@ -2691,6 +2761,7 @@ pub struct TlcState { pub received_tlcs: PendingTlcs, pub retryable_tlc_operations: Vec, pub applied_add_tlcs: HashSet, + pub applied_remove_tlcs: HashSet, pub waiting_ack: bool, } @@ -2787,6 +2858,7 @@ impl TlcState { pub fn apply_remove_tlc(&mut self, tlc_id: TLCId) { self.applied_add_tlcs.remove(&tlc_id); + self.applied_remove_tlcs.remove(&tlc_id); if tlc_id.is_offered() { self.offered_tlcs.tlcs.retain(|tlc| tlc.tlc_id != tlc_id); } else { @@ -2900,7 +2972,7 @@ impl TlcState { self.need_another_commitment_signed() } - pub fn update_for_revoke_and_ack(&mut self) -> bool { + pub fn update_for_revoke_and_ack(&mut self, commitment_number: CommitmentNumbers) -> bool { self.set_waiting_ack(false); for tlc in self.offered_tlcs.tlcs.iter_mut() { match tlc.outbound_status() { @@ -2912,6 +2984,7 @@ impl TlcState { } OutboundTlcStatus::RemoveWaitAck => { tlc.status = TlcStatus::Outbound(OutboundTlcStatus::RemoveAckConfirmed); + tlc.removed_confirmed_at = Some(commitment_number.get_local()); } _ => {} } @@ -2927,6 +3000,7 @@ impl TlcState { } InboundTlcStatus::LocalRemoved => { tlc.status = TlcStatus::Inbound(InboundTlcStatus::RemoveAckConfirmed); + tlc.removed_confirmed_at = Some(commitment_number.get_remote()); } _ => {} } @@ -4126,14 +4200,16 @@ impl ChannelActorState { self.to_remote_amount } - pub fn get_offered_tlc_balance(&self) -> u128 { + pub fn get_offered_tlc_balance(&self, exclude_failed_tls: bool) -> u128 { self.get_all_offer_tlcs() + .filter(|tlc| !(exclude_failed_tls && tlc.is_fail_remove_confirmed())) .map(|tlc| tlc.amount) .sum::() } - pub fn get_received_tlc_balance(&self) -> u128 { + pub fn get_received_tlc_balance(&self, exclude_failed_tls: bool) -> u128 { self.get_all_received_tlcs() + .filter(|tlc| !(exclude_failed_tls && tlc.is_fail_remove_confirmed())) .map(|tlc| tlc.amount) .sum::() } @@ -4376,6 +4452,7 @@ impl ChannelActorState { self.get_remote_commitment_number().to_be_bytes().as_slice(), ] .concat(); + let message = blake2b_256( [ to_local_output.as_slice(), @@ -4386,7 +4463,6 @@ impl ChannelActorState { ] .concat(), ); - sign_ctx.sign(message.as_slice())? }; @@ -4555,24 +4631,31 @@ impl ChannelActorState { ))); } let payment_hash = tlc.payment_hash; - if let Some(tlc) = self + let mut tlc_infos = self .tlc_state .all_tlcs() - .find(|tlc| tlc.payment_hash == payment_hash) - { - return Err(ProcessingChannelError::RepeatedProcessing(format!( - "Trying to insert tlc with duplicate payment hash {:?} with tlc {:?}", - payment_hash, tlc - ))); + .filter(|tlc| tlc.payment_hash == payment_hash) + .peekable(); + + if tlc_infos.peek().is_some() { + if tlc_infos.all(|t| t.is_fail_remove_confirmed()) { + // If all the tlcs with the same payment hash are confirmed to be failed, + // then it's safe to insert the new tlc, the old tlcs will be removed later. + } else { + return Err(ProcessingChannelError::RepeatedProcessing(format!( + "Trying to insert tlc with duplicate payment hash {:?}", + payment_hash + ))); + } } if tlc.is_offered() { - let sent_tlc_value = self.get_offered_tlc_balance(); + let sent_tlc_value = self.get_offered_tlc_balance(false); debug_assert!(self.to_local_amount >= sent_tlc_value); if sent_tlc_value + tlc.amount > self.to_local_amount { return Err(ProcessingChannelError::TlcAmountExceedLimit); } } else { - let received_tlc_value = self.get_received_tlc_balance(); + let received_tlc_value = self.get_received_tlc_balance(false); debug_assert!(self.to_remote_amount >= received_tlc_value); if received_tlc_value + tlc.amount > self.to_remote_amount { debug!( @@ -4628,8 +4711,8 @@ impl ChannelActorState { debug!("Updated local balance to {} and remote balance to {} by removing tlc {:?} with reason {:?}", to_local_amount, to_remote_amount, tlc_id, reason); + self.tlc_state.apply_remove_tlc(tlc_id); } - self.tlc_state.apply_remove_tlc(tlc_id); debug!( "Removed tlc payment_hash {:?} with reason {:?}", current.payment_hash, reason @@ -4638,6 +4721,27 @@ impl ChannelActorState { Ok((current.clone(), reason)) } + pub fn clean_up_failed_tlcs(&mut self) { + // Remove the oldest failed tlcs from the channel state turns out to be very tricky + // Because the different parties may have different views on the failed tlcs, + // so we need to be very careful here. + + // The basic idea is to remove the oldest failed tlcs that are confirmed by both parties. + // And we need to calculate the oldest failed tlcs independently from two directions, + // Because we may have tlc operations from both directions at the same time, order matters. + // see #475 for more details. + let failed_offered_tlcs = self.tlc_state.offered_tlcs.get_oldest_failed_tlcs(); + let failed_received_tlcs = self.tlc_state.received_tlcs.get_oldest_failed_tlcs(); + + for tlc_id in failed_offered_tlcs + .iter() + .chain(failed_received_tlcs.iter()) + { + debug_assert!(self.tlc_state.applied_remove_tlcs.contains(&tlc_id)); + self.tlc_state.apply_remove_tlc(*tlc_id); + } + } + pub fn get_local_channel_public_keys(&self) -> &ChannelBasePublicKeys { &self.local_channel_public_keys } @@ -5136,6 +5240,7 @@ impl ChannelActorState { TLCId::Received(prev_tlc.prev_tlc_id), ) }), + removed_confirmed_at: None, } } @@ -5155,6 +5260,7 @@ impl ChannelActorState { created_at: self.get_current_commitment_numbers(), removed_reason: None, previous_tlc: None, + removed_confirmed_at: None, }; Ok(tlc_info) } @@ -5557,6 +5663,7 @@ impl ChannelActorState { } }; + self.clean_up_failed_tlcs(); let (commitment_tx, settlement_data) = self.verify_and_complete_tx( commitment_signed.funding_tx_partial_signature, commitment_signed.commitment_tx_partial_signature, @@ -5880,6 +5987,7 @@ impl ChannelActorState { ] .concat(), ); + let aggregated_signature = sign_ctx.sign_and_aggregate(message.as_slice(), revocation_partial_signature)?; RevocationData { @@ -5912,6 +6020,7 @@ impl ChannelActorState { ] .concat(), ); + let aggregated_signature = sign_ctx.sign_and_aggregate(message.as_slice(), commitment_tx_partial_signature)?; @@ -5928,7 +6037,9 @@ impl ChannelActorState { self.increment_local_commitment_number(); self.append_remote_commitment_point(next_per_commitment_point); - let need_commitment_signed = self.tlc_state.update_for_revoke_and_ack(); + let need_commitment_signed = self + .tlc_state + .update_for_revoke_and_ack(self.commitment_numbers); network .send_message(NetworkActorMessage::new_notification( NetworkServiceEvent::RevokeAndAckReceived( @@ -6736,6 +6847,7 @@ impl ChannelActorState { } } } + let to_local_value = self.to_local_amount + received_fullfilled - offered_pending; let to_remote_value = self.to_remote_amount + offered_fullfilled - received_pending; @@ -6789,6 +6901,7 @@ impl ChannelActorState { ) .build(); let to_remote_output_data = Bytes::default(); + if for_remote { ( [to_local_output, to_remote_output], diff --git a/src/fiber/history.rs b/src/fiber/history.rs index bc11c9aa..898667b3 100644 --- a/src/fiber/history.rs +++ b/src/fiber/history.rs @@ -181,7 +181,10 @@ impl InternalResult { let Some(index) = error_index else { error!("Error index not found in the route: {:?}", tlc_err); - return need_to_retry; + // if the error node is not in the route, + // and we can not penalize the source node (which is ourself) + // it's better to stop the payment session + return false; }; let len = nodes.len(); @@ -254,7 +257,14 @@ impl InternalResult { TlcErrorCode::PermanentChannelFailure => { self.fail_pair(nodes, index + 1); } - TlcErrorCode::FeeInsufficient | TlcErrorCode::IncorrectTlcExpiry => { + TlcErrorCode::FeeInsufficient => { + need_to_retry = true; + self.fail_pair_balanced(nodes, index + 1); + if index > 1 { + self.succeed_range_pairs(nodes, 0, index); + } + } + TlcErrorCode::IncorrectTlcExpiry => { need_to_retry = false; if index == 1 { self.fail_node(nodes, 1); diff --git a/src/fiber/network.rs b/src/fiber/network.rs index 8877d0cc..3220e1ad 100644 --- a/src/fiber/network.rs +++ b/src/fiber/network.rs @@ -1522,7 +1522,11 @@ where &payment_session.session_key, payment_session.hops_public_keys(), ) - .unwrap_or(TlcErr::new(TlcErrorCode::InvalidOnionError)); + .unwrap_or_else(|| { + debug_event!(myself, "InvalidOnionError"); + TlcErr::new(TlcErrorCode::InvalidOnionError) + }); + self.update_graph_with_tlc_fail(&state.network, &error_detail) .await; let need_to_retry = self @@ -1772,8 +1776,15 @@ where let Some(mut payment_session) = self.store.get_payment_session(payment_hash) else { return Err(Error::InvalidParameter(payment_hash.to_string())); }; + assert!(payment_session.status != PaymentSessionStatus::Failed); + debug!( + "try_payment_session: {:?} times: {:?}", + payment_session.payment_hash(), + payment_session.retried_times + ); + let payment_data = payment_session.request.clone(); if payment_session.can_retry() { if payment_session.last_error != Some("WaitingTlcAck".to_string()) { @@ -1783,6 +1794,7 @@ where let hops_info = self .build_payment_route(&mut payment_session, &payment_data) .await?; + match self .send_payment_onion_packet(state, &mut payment_session, &payment_data, hops_info) .await diff --git a/src/fiber/tests/channel.rs b/src/fiber/tests/channel.rs index f22ca4ef..94673e2b 100644 --- a/src/fiber/tests/channel.rs +++ b/src/fiber/tests/channel.rs @@ -4514,6 +4514,9 @@ async fn test_connect_to_peers_with_mutual_channel_on_restart_1() { ) .await; + // sleep for a while to make sure this test works both for release mode + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + node_a.restart().await; node_a.expect_event( @@ -4554,6 +4557,9 @@ async fn test_connect_to_peers_with_mutual_channel_on_restart_2() { ) .await; + // sleep for a while to make sure this test works both for release mode + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + node_a.stop().await; node_b.expect_event( @@ -5299,6 +5305,7 @@ async fn test_send_payment_with_all_failed_middle_hops() { // because there is only one path for the payment, the payment will fail in the second try // this assertion make sure we didn't do meaningless retry + assert!(node_0.get_triggered_unexpected_events().await.is_empty()); let payment_session = source_node.get_payment_session(payment_hash).unwrap(); assert_eq!(payment_session.retried_times, 3); } diff --git a/src/fiber/tests/payment.rs b/src/fiber/tests/payment.rs index 0e0a6929..5033c132 100644 --- a/src/fiber/tests/payment.rs +++ b/src/fiber/tests/payment.rs @@ -1,6 +1,5 @@ -use std::collections::HashSet; - use super::test_utils::init_tracing; +use crate::fiber::channel::UpdateCommand; use crate::fiber::graph::PaymentSessionStatus; use crate::fiber::network::HopHint; use crate::fiber::network::SendPaymentCommand; @@ -9,6 +8,7 @@ use crate::fiber::types::Hash256; use crate::fiber::NetworkActorCommand; use crate::fiber::NetworkActorMessage; use ractor::call; +use std::collections::HashSet; // This test will send two payments from node_0 to node_1, the first payment will run // with dry_run, the second payment will run without dry_run. Both payments will be successful. @@ -25,9 +25,9 @@ async fn test_send_payment_for_direct_channel_and_dry_run() { true, ) .await; - let [mut node_0, node_1] = nodes.try_into().expect("2 nodes"); + let [node_0, node_1] = nodes.try_into().expect("2 nodes"); let channel = channels[0]; - let source_node = &mut node_0; + let source_node = &node_0; let res = source_node .send_payment_keysend(&node_1, 10000000000, true) @@ -328,7 +328,7 @@ async fn test_send_payment_for_pay_self() { true, ) .await; - let [mut node_0, node_1, node_2] = nodes.try_into().expect("3 nodes"); + let [node_0, node_1, node_2] = nodes.try_into().expect("3 nodes"); let node_1_channel0_balance = node_1.get_local_balance_from_channel(channels[0]); let node_1_channel1_balance = node_1.get_local_balance_from_channel(channels[1]); @@ -402,7 +402,7 @@ async fn test_send_payment_for_pay_self_with_two_nodes() { true, ) .await; - let [mut node_0, node_1] = nodes.try_into().expect("2 nodes"); + let [node_0, node_1] = nodes.try_into().expect("2 nodes"); let node_1_channel0_balance = node_1.get_local_balance_from_channel(channels[0]); let node_1_channel1_balance = node_1.get_local_balance_from_channel(channels[1]); @@ -469,7 +469,7 @@ async fn test_send_payment_with_more_capacity_for_payself() { true, ) .await; - let [mut node_0, node_1, node_2] = nodes.try_into().expect("3 nodes"); + let [node_0, node_1, node_2] = nodes.try_into().expect("3 nodes"); let node_1_channel0_balance = node_1.get_local_balance_from_channel(channels[0]); let node_1_channel1_balance = node_1.get_local_balance_from_channel(channels[1]); @@ -564,7 +564,7 @@ async fn test_send_payment_with_route_to_self_with_hop_hints() { true, ) .await; - let [mut node_0, node_1, node_2] = nodes.try_into().expect("3 nodes"); + let [node_0, node_1, node_2] = nodes.try_into().expect("3 nodes"); eprintln!("node_0: {:?}", node_0.pubkey); eprintln!("node_1: {:?}", node_1.pubkey); eprintln!("node_2: {:?}", node_2.pubkey); @@ -676,7 +676,7 @@ async fn test_send_payment_with_route_to_self_with_outbound_hop_hints() { true, ) .await; - let [mut node_0, node_1, node_2] = nodes.try_into().expect("3 nodes"); + let [node_0, node_1, node_2] = nodes.try_into().expect("3 nodes"); eprintln!("node_0: {:?}", node_0.pubkey); eprintln!("node_1: {:?}", node_1.pubkey); eprintln!("node_2: {:?}", node_2.pubkey); @@ -773,7 +773,7 @@ async fn test_send_payment_select_channel_with_hop_hints() { true, ) .await; - let [mut node_0, node_1, node_2, node_3] = nodes.try_into().expect("4 nodes"); + let [node_0, node_1, node_2, node_3] = nodes.try_into().expect("4 nodes"); eprintln!("node_0: {:?}", node_0.pubkey); eprintln!("node_1: {:?}", node_1.pubkey); eprintln!("node_2: {:?}", node_2.pubkey); @@ -918,7 +918,7 @@ async fn test_send_payment_two_nodes_with_hop_hints_and_multiple_channels() { true, ) .await; - let [mut node_0, node_1] = nodes.try_into().expect("2 nodes"); + let [node_0, node_1] = nodes.try_into().expect("2 nodes"); eprintln!("node_0: {:?}", node_0.pubkey); eprintln!("node_1: {:?}", node_1.pubkey); @@ -1129,7 +1129,7 @@ async fn test_network_three_nodes_two_channels_send_each_other() { true, ) .await; - let [mut node_a, node_b, mut node_c] = nodes.try_into().expect("3 nodes"); + let [node_a, node_b, node_c] = nodes.try_into().expect("3 nodes"); // Wait for the channel announcement to be broadcasted tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; @@ -1301,7 +1301,7 @@ async fn test_send_payment_bench_test() { true, ) .await; - let [mut node_0, node_1, node_2] = nodes.try_into().expect("3 nodes"); + let [node_0, node_1, node_2] = nodes.try_into().expect("3 nodes"); tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; @@ -1355,7 +1355,7 @@ async fn test_send_payment_three_nodes_wait_succ_bench_test() { true, ) .await; - let [mut node_0, _node_1, node_2] = nodes.try_into().expect("3 nodes"); + let [node_0, _node_1, node_2] = nodes.try_into().expect("3 nodes"); tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; @@ -1391,7 +1391,7 @@ async fn test_send_payment_three_nodes_send_each_other_bench_test() { true, ) .await; - let [mut node_0, _node_1, mut node_2] = nodes.try_into().expect("3 nodes"); + let [node_0, _node_1, node_2] = nodes.try_into().expect("3 nodes"); tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; @@ -1418,6 +1418,87 @@ async fn test_send_payment_three_nodes_send_each_other_bench_test() { } } +#[tokio::test] +async fn test_send_payment_three_nodes_send_each_other_no_wait() { + init_tracing(); + let _span = tracing::info_span!("node", node = "test").entered(); + let (nodes, channels) = create_n_nodes_with_index_and_amounts_with_established_channel( + &[ + ((0, 1), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((1, 2), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ], + 3, + true, + ) + .await; + + let mut all_sent = vec![]; + let node_0_balance = nodes[0].get_local_balance_from_channel(channels[0]); + let node_2_balance = nodes[2].get_local_balance_from_channel(channels[1]); + + let amount = 100000; + let mut node_0_sent_fee = 0; + let mut node_0_sent_amount = 0; + let mut node_2_sent_fee = 0; + let mut node_2_sent_amount = 0; + for _i in 0..4 { + for _k in 0..3 { + let payment1 = nodes[0] + .send_payment_keysend(&nodes[2], amount, false) + .await + .unwrap(); + eprintln!( + "send: {} payment_hash: {:?} sent, fee: {:?}", + _i, payment1.payment_hash, payment1.fee + ); + node_0_sent_fee += payment1.fee; + node_0_sent_amount += amount; + } + + let payment2 = nodes[2] + .send_payment_keysend(&nodes[0], amount, false) + .await + .unwrap(); + all_sent.push((2, payment2.payment_hash)); + eprintln!( + "send: {} payment_hash: {:?} sent, fee: {:?}", + _i, payment2.payment_hash, payment2.fee + ); + node_2_sent_fee += payment2.fee; + node_2_sent_amount += amount; + } + + loop { + for (node_index, payment_hash) in all_sent.clone().iter() { + let node = &nodes[*node_index]; + node.wait_until_success(*payment_hash).await; + all_sent.retain(|x| x.1 != *payment_hash); + } + if all_sent.is_empty() { + break; + } + } + tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await; + let new_node_0_balance = nodes[0].get_local_balance_from_channel(channels[0]); + let new_node_2_balance = nodes[2].get_local_balance_from_channel(channels[1]); + eprintln!( + "node_0_balance: {}, new_node_0_balance: {}, node_0_sent_amount: {}, node_0_sent_fee: {}", + node_0_balance, new_node_0_balance, node_0_sent_amount, node_0_sent_fee, + ); + eprintln!( + "node_2_balance: {}, new_node_2_balance: {}, node_2_sent_amount: {}, node_2_sent_fee: {}", + node_2_balance, new_node_2_balance, node_2_sent_amount, node_2_sent_fee + ); + assert_eq!( + new_node_0_balance, + node_0_balance - node_0_sent_fee - 8 * amount + ); + assert_eq!( + new_node_2_balance, + node_2_balance - node_2_sent_fee + 8 * amount + ); +} + #[tokio::test] async fn test_send_payment_three_nodes_bench_test() { init_tracing(); @@ -1431,7 +1512,6 @@ async fn test_send_payment_three_nodes_bench_test() { true, ) .await; - let [mut node_1, mut node_2, mut node_3] = nodes.try_into().expect("3 nodes"); tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; @@ -1444,14 +1524,14 @@ async fn test_send_payment_three_nodes_bench_test() { let mut node_2_ch1_sent_amount = 0; let mut node_2_ch2_sent_amount = 0; - let old_node_1_amount = node_1.get_local_balance_from_channel(channels[0]); - let old_node_2_chnnale1_amount = node_2.get_local_balance_from_channel(channels[0]); - let old_node_2_chnnale2_amount = node_2.get_local_balance_from_channel(channels[1]); - let old_node_3_amount = node_3.get_local_balance_from_channel(channels[1]); + let old_node_1_amount = nodes[0].get_local_balance_from_channel(channels[0]); + let old_node_2_chnnale1_amount = nodes[1].get_local_balance_from_channel(channels[0]); + let old_node_2_chnnale2_amount = nodes[1].get_local_balance_from_channel(channels[1]); + let old_node_3_amount = nodes[2].get_local_balance_from_channel(channels[1]); for i in 1..=4 { - let payment1 = node_1 - .send_payment_keysend(&node_3, 1000, false) + let payment1 = nodes[0] + .send_payment_keysend(&nodes[2], 1000, false) .await .unwrap(); all_sent.insert((1, payment1.payment_hash, payment1.fee)); @@ -1459,8 +1539,8 @@ async fn test_send_payment_three_nodes_bench_test() { node_1_sent_fee += payment1.fee; node_2_got_fee += payment1.fee; - let payment2 = node_2 - .send_payment_keysend(&node_3, 1000, false) + let payment2 = nodes[1] + .send_payment_keysend(&nodes[2], 1000, false) .await .unwrap(); all_sent.insert((2, payment2.payment_hash, payment2.fee)); @@ -1468,8 +1548,8 @@ async fn test_send_payment_three_nodes_bench_test() { node_2_ch1_sent_amount += 1000; node1_got_amount += 1000; - let payment3 = node_2 - .send_payment_keysend(&node_1, 1000, false) + let payment3 = nodes[1] + .send_payment_keysend(&nodes[0], 1000, false) .await .unwrap(); all_sent.insert((2, payment3.payment_hash, payment3.fee)); @@ -1477,39 +1557,29 @@ async fn test_send_payment_three_nodes_bench_test() { node_2_ch2_sent_amount += 1000; node3_got_amount += 1000; - let payment4 = node_3 - .send_payment_keysend(&node_1, 1000, false) + let payment4 = nodes[2] + .send_payment_keysend(&nodes[0], 1000, false) .await .unwrap(); all_sent.insert((3, payment4.payment_hash, payment4.fee)); eprintln!("send: {} payment_hash: {:?} sent", i, payment4.payment_hash); + assert!(payment4.fee > 0); node_3_sent_fee += payment4.fee; node_2_got_fee += payment4.fee; } loop { - tokio::time::sleep(tokio::time::Duration::from_millis(3000)).await; - for (node_index, payment_hash, fee) in all_sent.clone().iter() { - let node = match node_index { - 1 => &mut node_1, - 2 => &mut node_2, - 3 => &mut node_3, - _ => unreachable!(), - }; - let status = node.get_payment_status(*payment_hash).await; - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - eprintln!("got payment: {:?} status: {:?}", payment_hash, status); - if status == PaymentSessionStatus::Success { - eprintln!("payment_hash: {:?} success", payment_hash); - all_sent.remove(&(*node_index, *payment_hash, *fee)); - } + nodes[*node_index - 1] + .wait_until_success(*payment_hash) + .await; + all_sent.remove(&(*node_index, *payment_hash, *fee)); } - let res = node_1.node_info().await; + let res = nodes[0].node_info().await; eprintln!("node1 node_info: {:?}", res); - let res = node_2.node_info().await; + let res = nodes[1].node_info().await; eprintln!("node2 node_info: {:?}", res); - let res = node_3.node_info().await; + let res = nodes[2].node_info().await; eprintln!("node3 node_info: {:?}", res); if all_sent.is_empty() { break; @@ -1524,10 +1594,10 @@ async fn test_send_payment_three_nodes_bench_test() { // node3: sent 4 fee to node2, got 4000 from node2 // node2: got 8 from node1 and node3, sent 8000 to node1 and node3 - let node_1_amount = node_1.get_local_balance_from_channel(channels[0]); - let node_2_chnnale1_amount = node_2.get_local_balance_from_channel(channels[0]); - let node_2_chnnale2_amount = node_2.get_local_balance_from_channel(channels[1]); - let node_3_amount = node_3.get_local_balance_from_channel(channels[1]); + let node_1_amount = nodes[0].get_local_balance_from_channel(channels[0]); + let node_2_chnnale1_amount = nodes[1].get_local_balance_from_channel(channels[0]); + let node_2_chnnale2_amount = nodes[1].get_local_balance_from_channel(channels[1]); + let node_3_amount = nodes[2].get_local_balance_from_channel(channels[1]); let node_1_amount_diff = node_1_amount - old_node_1_amount; let node_2_chnnale1_amount_diff = old_node_2_chnnale1_amount - node_2_chnnale1_amount; @@ -1569,7 +1639,7 @@ async fn test_send_payment_middle_hop_stopped() { true, ) .await; - let [mut node_0, _node_1, _node_2, node_3, mut node_4] = nodes.try_into().expect("5 nodes"); + let [node_0, _node_1, _node_2, node_3, mut node_4] = nodes.try_into().expect("5 nodes"); // dry run node_0 -> node_3 will select 0 -> 4 -> 3 let res = node_0 @@ -1613,7 +1683,7 @@ async fn test_send_payment_middle_hop_stopped_retry_longer_path() { true, ) .await; - let [mut node_0, _node_1, mut node_2, mut node_3, _node_4, _node_5, _node_6] = + let [node_0, _node_1, mut node_2, mut node_3, _node_4, _node_5, _node_6] = nodes.try_into().expect("7 nodes"); // dry run node_0 -> node_3 will select 0 -> 1 -> 2 -> 3 @@ -1770,7 +1840,7 @@ async fn test_send_payment_target_hop_stopped() { true, ) .await; - let [mut node_0, _node_1, _node_2, _node_3, mut node_4] = nodes.try_into().expect("5 nodes"); + let [node_0, _node_1, _node_2, _node_3, mut node_4] = nodes.try_into().expect("5 nodes"); // dry run node_0 -> node_4 will select 0 -> 1 -> 2 -> 3 -> 4 let res = node_0 @@ -1811,7 +1881,7 @@ async fn test_send_payment_middle_hop_balance_is_not_enough() { true, ) .await; - let [mut node_0, _node_1, _node_2, node_3] = nodes.try_into().expect("3 nodes"); + let [node_0, _node_1, _node_2, node_3] = nodes.try_into().expect("3 nodes"); let res = node_0 .send_payment_keysend(&node_3, 1000, false) @@ -1829,3 +1899,289 @@ async fn test_send_payment_middle_hop_balance_is_not_enough() { .expect("got error") .contains("Failed to build route")); } + +#[tokio::test] +async fn test_send_payment_middle_hop_update_fee_send_payment_failed() { + init_tracing(); + let _span = tracing::info_span!("node", node = "test").entered(); + let (nodes, channels) = create_n_nodes_with_index_and_amounts_with_established_channel( + &[ + ((0, 1), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((1, 2), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((2, 3), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ], + 4, + true, + ) + .await; + let [node_0, _node_1, mut node_2, node_3] = nodes.try_into().expect("4 nodes"); + + // node_2 update fee rate to a higher one, so the payment will fail + let res = node_0 + .send_payment_keysend(&node_3, 1000, false) + .await + .unwrap(); + eprintln!("res: {:?}", res); + let payment_hash = res.payment_hash; + + node_2 + .update_channel_with_command( + channels[2], + UpdateCommand { + enabled: None, + tlc_expiry_delta: None, + tlc_minimum_value: None, + tlc_fee_proportional_millionths: Some(100000), + }, + ) + .await; + + node_0.wait_until_failed(payment_hash).await; +} + +#[tokio::test] +async fn test_send_payment_middle_hop_update_fee_multiple_payments() { + // https://github.com/nervosnetwork/fiber/issues/480 + init_tracing(); + let _span = tracing::info_span!("node", node = "test").entered(); + let (mut nodes, channels) = create_n_nodes_with_index_and_amounts_with_established_channel( + &[ + ((0, 1), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((1, 2), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((2, 3), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ], + 4, + true, + ) + .await; + + let mut all_sent = HashSet::new(); + + for _i in 0..5 { + let res = nodes[0] + .send_payment_keysend(&nodes[3], 1000, false) + .await + .unwrap(); + all_sent.insert(res.payment_hash); + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + } + + nodes[2] + .update_channel_with_command( + channels[2], + UpdateCommand { + enabled: None, + tlc_expiry_delta: None, + tlc_minimum_value: None, + tlc_fee_proportional_millionths: Some(100000), + }, + ) + .await; + + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + + loop { + for i in 0..4 { + assert!(nodes[i].get_triggered_unexpected_events().await.is_empty()); + } + + for payment_hash in all_sent.clone().iter() { + let status = nodes[0].get_payment_status(*payment_hash).await; + //eprintln!("got payment: {:?} status: {:?}", payment_hash, status); + if status == PaymentSessionStatus::Failed || status == PaymentSessionStatus::Success { + eprintln!("payment_hash: {:?} got status : {:?}", payment_hash, status); + all_sent.remove(payment_hash); + } + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + if all_sent.is_empty() { + break; + } + } +} + +#[tokio::test] +async fn test_send_payment_middle_hop_update_fee_should_recovery() { + // a variant test from + // https://github.com/nervosnetwork/fiber/issues/480 + // in this test, we will make sure the payment should recovery after the fee is updated by the middle hop + // there are two channels between node_1 and node_2, they are with the same fee rate + // path finding will pick the channel with latest time, so channels[2] will be picked + // but we will update the fee rate of channels[2] to a higher one + // so the payment will fail, but after the payment failed, the path finding should pick the channels[1] in the next try + // in the end, all the payments should success + init_tracing(); + let _span = tracing::info_span!("node", node = "test").entered(); + let (mut nodes, channels) = create_n_nodes_with_index_and_amounts_with_established_channel( + &[ + ((0, 1), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((1, 2), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((1, 2), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ((2, 3), (HUGE_CKB_AMOUNT, HUGE_CKB_AMOUNT)), + ], + 4, + true, + ) + .await; + let mut all_sent = HashSet::new(); + + let tx_count = 6; + for _i in 0..tx_count { + let res = nodes[0] + .send_payment_keysend(&nodes[3], 1000, false) + .await + .unwrap(); + all_sent.insert(res.payment_hash); + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + } + + nodes[1] + .update_channel_with_command( + channels[2], + UpdateCommand { + enabled: None, + tlc_expiry_delta: None, + tlc_minimum_value: None, + tlc_fee_proportional_millionths: Some(100000), + }, + ) + .await; + + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + + let mut succ_count = 0; + loop { + for i in 0..4 { + assert!(nodes[i].get_triggered_unexpected_events().await.is_empty()); + } + + for payment_hash in all_sent.clone().iter() { + let status = nodes[0].get_payment_status(*payment_hash).await; + if status == PaymentSessionStatus::Success || status == PaymentSessionStatus::Failed { + eprintln!("payment_hash: {:?} got status : {:?}", payment_hash, status); + all_sent.remove(payment_hash); + if status == PaymentSessionStatus::Success { + succ_count += 1; + } + } + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + if all_sent.is_empty() { + break; + } + } + + assert_eq!(succ_count, tx_count); + let channel_state = nodes[0].get_channel_actor_state(channels[0]); + assert_eq!(channel_state.get_offered_tlc_balance(true), 0); + assert!(channel_state.get_offered_tlc_balance(false) > 0); +} + +async fn run_complex_network_with_params( + funding_amount: u128, + payment_amount_gen: impl Fn() -> u128, +) -> Vec<(Hash256, PaymentSessionStatus)> { + init_tracing(); + let _span = tracing::info_span!("node", node = "test").entered(); + let (nodes, _channels) = create_n_nodes_with_index_and_amounts_with_established_channel( + &[ + ((0, 1), (funding_amount, funding_amount)), + ((1, 2), (funding_amount, funding_amount)), + ((3, 4), (funding_amount, funding_amount)), + ((4, 5), (funding_amount, funding_amount)), + ((0, 3), (funding_amount, funding_amount)), + ((1, 4), (funding_amount, funding_amount)), + ((2, 5), (funding_amount, funding_amount)), + ], + 6, + true, + ) + .await; + + let mut all_sent = HashSet::new(); + for _k in 0..3 { + for i in 0..6 { + let payment_amount = payment_amount_gen(); + let res = nodes[i] + .send_payment_keysend_to_self(payment_amount, false) + .await; + if let Ok(res) = res { + let payment_hash = res.payment_hash; + all_sent.insert((i, payment_hash)); + } + } + } + + let mut result = vec![]; + loop { + for i in 0..6 { + let unexpected_events = nodes[i].get_triggered_unexpected_events().await; + if !unexpected_events.is_empty() { + eprintln!("node_{} got unexpected events: {:?}", i, unexpected_events); + unreachable!("unexpected events"); + } + } + + for (i, payment_hash) in all_sent.clone().into_iter() { + let status = nodes[i].get_payment_status(payment_hash).await; + eprintln!("payment_hash: {:?} got status : {:?}", payment_hash, status); + if matches!( + status, + PaymentSessionStatus::Success | PaymentSessionStatus::Failed + ) { + result.push((payment_hash, status)); + all_sent.remove(&(i, payment_hash)); + } + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + if all_sent.is_empty() { + break; + } + } + + // make sure all the channels are still workable with small accounts + for i in 0..6 { + if let Ok(res) = nodes[i].send_payment_keysend_to_self(500, false).await { + nodes[i].wait_until_success(res.payment_hash).await; + } + } + + result +} + +#[tokio::test] +async fn test_send_payment_complex_network_payself_all_succeed() { + // from issue 475 + // channel amount is enough, so all payments should success + let res = run_complex_network_with_params(MIN_RESERVED_CKB + 100000000, || 1000).await; + let failed_count = res + .iter() + .filter(|(_, status)| *status == PaymentSessionStatus::Failed) + .count(); + + assert_eq!(failed_count, 0); +} + +#[tokio::test] +async fn test_send_payment_complex_network_payself_amount_exceeded() { + // variant from issue 475 + // the channel amount is not enough, so payments maybe be failed + let ckb_unit = 100_000_000; + let res = run_complex_network_with_params(MIN_RESERVED_CKB + 1000 * ckb_unit, || { + (400 as u128 + (rand::random::() % 100) as u128) * ckb_unit + }) + .await; + + // some may failed and some may success + let failed_count = res + .iter() + .filter(|(_, status)| *status == PaymentSessionStatus::Failed) + .count(); + assert!(failed_count > 0); + let succ_count = res + .iter() + .filter(|(_, status)| *status == PaymentSessionStatus::Success) + .count(); + assert!(succ_count > 0); +} diff --git a/src/fiber/tests/test_utils.rs b/src/fiber/tests/test_utils.rs index 56b7379d..1ec6403b 100644 --- a/src/fiber/tests/test_utils.rs +++ b/src/fiber/tests/test_utils.rs @@ -3,6 +3,7 @@ use crate::fiber::channel::ChannelActorStateStore; use crate::fiber::channel::ChannelCommand; use crate::fiber::channel::ChannelCommandWithId; use crate::fiber::channel::ReloadParams; +use crate::fiber::channel::UpdateCommand; use crate::fiber::graph::NetworkGraphStateStore; use crate::fiber::graph::PaymentSession; use crate::fiber::graph::PaymentSessionStatus; @@ -21,6 +22,7 @@ use ractor::{call, Actor, ActorRef}; use rand::rngs::OsRng; use secp256k1::{Message, Secp256k1}; use std::collections::HashMap; +use std::collections::HashSet; use std::{ env, ffi::OsStr, @@ -186,6 +188,8 @@ pub struct NetworkNode { pub peer_id: PeerId, pub event_emitter: mpsc::Receiver, pub pubkey: Pubkey, + pub unexpected_events: Arc>>, + pub triggered_unexpected_events: Arc>>, } pub struct NetworkNodeConfig { @@ -549,7 +553,7 @@ impl NetworkNode { } pub async fn send_payment( - &mut self, + &self, command: SendPaymentCommand, ) -> std::result::Result { let message = |rpc_reply| -> NetworkActorMessage { @@ -562,7 +566,7 @@ impl NetworkNode { } pub async fn send_payment_keysend( - &mut self, + &self, recipient: &NetworkNode, amount: u128, dry_run: bool, @@ -587,7 +591,7 @@ impl NetworkNode { } pub async fn send_payment_keysend_to_self( - &mut self, + &self, amount: u128, dry_run: bool, ) -> std::result::Result { @@ -665,6 +669,7 @@ impl NetworkNode { pub async fn wait_until_success(&self, payment_hash: Hash256) { loop { + assert!(self.get_triggered_unexpected_events().await.is_empty()); let status = self.get_payment_status(payment_hash).await; if status == PaymentSessionStatus::Success { eprintln!("Payment success: {:?}\n\n", payment_hash); @@ -680,6 +685,7 @@ impl NetworkNode { pub async fn wait_until_failed(&self, payment_hash: Hash256) { loop { + assert!(self.get_triggered_unexpected_events().await.is_empty()); let status = self.get_payment_status(payment_hash).await; if status == PaymentSessionStatus::Failed { eprintln!("Payment failed: {:?}\n\n", payment_hash); @@ -762,6 +768,24 @@ impl NetworkNode { .await; } + pub async fn update_channel_with_command( + &mut self, + channel_id: Hash256, + command: UpdateCommand, + ) { + let message = |rpc_reply| -> NetworkActorMessage { + NetworkActorMessage::Command(NetworkActorCommand::ControlFiberChannel( + ChannelCommandWithId { + channel_id, + command: ChannelCommand::Update(command, rpc_reply), + }, + )) + }; + call!(self.network_actor, message) + .expect("node_a alive") + .expect("update channel success"); + } + pub fn get_payment_session(&self, payment_hash: Hash256) -> Option { self.store.get_payment_session(payment_hash) } @@ -840,6 +864,44 @@ impl NetworkNode { } }; + let mut unexpected_events: HashSet = HashSet::new(); + + // Some usual unexpected events that we want to not happended + // use `assert!(node.get_triggered_unexpected_events().await.is_empty())` to check it + let default_unexpected_events = vec![ + "Musig2VerifyError", + "Musig2RoundFinalizeError", + "InvalidOnionError", + ]; + for event in default_unexpected_events { + unexpected_events.insert(event.to_string()); + } + + let unexpected_events = Arc::new(TokioRwLock::new(unexpected_events)); + let triggered_unexpected_events = Arc::new(TokioRwLock::new(Vec::::new())); + let (self_event_sender, self_event_receiver) = mpsc::channel(10000); + let unexpected_events_clone = unexpected_events.clone(); + let triggered_unexpected_events_clone = triggered_unexpected_events.clone(); + // spwan a new thread to collect all the events from event_receiver + tokio::spawn(async move { + while let Some(event) = event_receiver.recv().await { + self_event_sender + .send(event.clone()) + .await + .expect("send event"); + let unexpected_events = unexpected_events_clone.read().await; + let event_content = format!("{:?}", event); + for unexpected_event in unexpected_events.iter() { + if event_content.contains(unexpected_event) { + triggered_unexpected_events_clone + .write() + .await + .push(unexpected_event.clone()); + } + } + } + }); + println!( "Network node started for peer_id {:?} in directory {:?}", &peer_id, @@ -858,8 +920,10 @@ impl NetworkNode { chain_actor, private_key: secret_key.into(), peer_id, - event_emitter: event_receiver, + event_emitter: self_event_receiver, pubkey: public_key.into(), + unexpected_events, + triggered_unexpected_events, } } @@ -872,6 +936,17 @@ impl NetworkNode { } } + pub async fn add_unexpected_events(&self, events: Vec) { + let mut unexpected_events = self.unexpected_events.write().await; + for event in events { + unexpected_events.insert(event); + } + } + + pub async fn get_triggered_unexpected_events(&self) -> Vec { + self.triggered_unexpected_events.read().await.clone() + } + pub async fn get_network_channels(&self) -> Vec { self.network_graph .read() diff --git a/src/fiber/tests/tlc_op.rs b/src/fiber/tests/tlc_op.rs index 4bfa9b7c..f321344b 100644 --- a/src/fiber/tests/tlc_op.rs +++ b/src/fiber/tests/tlc_op.rs @@ -213,6 +213,7 @@ impl Actor for TlcActor { shared_secret: command.shared_secret, previous_tlc: None, status: TlcStatus::Outbound(OutboundTlcStatus::LocalAnnounced), + removed_confirmed_at: None, }; state.tlc_state.add_offered_tlc(add_tlc.clone()); state.tlc_state.increment_offering(); @@ -327,7 +328,9 @@ impl Actor for TlcActor { let hash = sign_tlcs(tlcs); assert_eq!(hash, peer_hash); - state.tlc_state.update_for_revoke_and_ack(); + state + .tlc_state + .update_for_revoke_and_ack(CommitmentNumbers::default()); } } Ok(()) @@ -467,6 +470,7 @@ fn test_tlc_state_v2() { created_at: CommitmentNumbers::default(), removed_reason: None, previous_tlc: None, + removed_confirmed_at: None, }; let mut add_tlc2 = TlcInfo { amount: 20000, @@ -481,6 +485,7 @@ fn test_tlc_state_v2() { created_at: CommitmentNumbers::default(), removed_reason: None, previous_tlc: None, + removed_confirmed_at: None, }; tlc_state.add_offered_tlc(add_tlc1.clone()); tlc_state.add_offered_tlc(add_tlc2.clone()); diff --git a/src/fiber/types.rs b/src/fiber/types.rs index 010b8932..48f058ce 100644 --- a/src/fiber/types.rs +++ b/src/fiber/types.rs @@ -1568,7 +1568,9 @@ impl Debug for RemoveTlcReason { RemoveTlcReason::RemoveTlcFulfill(_fulfill) => { write!(f, "RemoveTlcFulfill") } - RemoveTlcReason::RemoveTlcFail(_fail) => write!(f, "RemoveTlcFail"), + RemoveTlcReason::RemoveTlcFail(_fail) => { + write!(f, "RemoveTlcFail") + } } } } @@ -2385,6 +2387,30 @@ pub enum FiberChannelMessage { AnnouncementSignatures(AnnouncementSignatures), } +impl Display for FiberChannelMessage { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + FiberChannelMessage::AcceptChannel(_) => write!(f, "AcceptChannel"), + FiberChannelMessage::CommitmentSigned(_) => write!(f, "CommitmentSigned"), + FiberChannelMessage::TxSignatures(_) => write!(f, "TxSignatures"), + FiberChannelMessage::ChannelReady(_) => write!(f, "ChannelReady"), + FiberChannelMessage::TxUpdate(_) => write!(f, "TxUpdate"), + FiberChannelMessage::TxComplete(_) => write!(f, "TxComplete"), + FiberChannelMessage::TxAbort(_) => write!(f, "TxAbort"), + FiberChannelMessage::TxInitRBF(_) => write!(f, "TxInitRBF"), + FiberChannelMessage::TxAckRBF(_) => write!(f, "TxAckRBF"), + FiberChannelMessage::Shutdown(_) => write!(f, "Shutdown"), + FiberChannelMessage::ClosingSigned(_) => write!(f, "ClosingSigned"), + FiberChannelMessage::UpdateTlcInfo(_) => write!(f, "UpdateTlcInfo"), + FiberChannelMessage::AddTlc(_) => write!(f, "AddTlc"), + FiberChannelMessage::RevokeAndAck(_) => write!(f, "RevokeAndAck"), + FiberChannelMessage::RemoveTlc(_) => write!(f, "RemoveTlc"), + FiberChannelMessage::ReestablishChannel(_) => write!(f, "ReestablishChannel"), + FiberChannelMessage::AnnouncementSignatures(_) => write!(f, "AnnouncementSignatures"), + } + } +} + impl FiberChannelMessage { pub fn get_channel_id(&self) -> Hash256 { match self { diff --git a/src/rpc/channel.rs b/src/rpc/channel.rs index 1a1024f5..42278169 100644 --- a/src/rpc/channel.rs +++ b/src/rpc/channel.rs @@ -421,8 +421,8 @@ where state: state.state.into(), local_balance: state.get_local_balance(), remote_balance: state.get_remote_balance(), - offered_tlc_balance: state.get_offered_tlc_balance(), - received_tlc_balance: state.get_received_tlc_balance(), + offered_tlc_balance: state.get_offered_tlc_balance(true), + received_tlc_balance: state.get_received_tlc_balance(true), latest_commitment_transaction_hash: state .latest_commitment_transaction .as_ref()