diff --git a/src/ckb/channel.rs b/src/ckb/channel.rs index a6d325ff4..ed027c206 100644 --- a/src/ckb/channel.rs +++ b/src/ckb/channel.rs @@ -189,7 +189,7 @@ pub struct ChannelActor { store: S, } -impl ChannelActor { +impl ChannelActor { pub fn new(peer_id: PeerId, network: ActorRef, store: S) -> Self { Self { peer_id, @@ -647,6 +647,10 @@ impl ChannelActor { ) -> ProcessingChannelResult { debug!("Handling shutdown command: {:?}", &command); let flags = match state.state { + ChannelState::Closed => { + debug!("Channel already closed, ignoring shutdown command"); + return Ok(()); + } ChannelState::ChannelReady() => { debug!("Handling shutdown command in ChannelReady state"); ShuttingDownFlags::empty() @@ -654,9 +658,10 @@ impl ChannelActor { ChannelState::ShuttingDown(flags) => flags, _ => { debug!("Handling shutdown command in state {:?}", &state.state); - return Err(ProcessingChannelError::InvalidState( - "Trying to send shutdown message while in invalid state".to_string(), - )); + return Err(ProcessingChannelError::InvalidState(format!( + "Trying to send shutdown message while in invalid state {:?}", + &state.state + ))); } }; @@ -825,10 +830,12 @@ impl ChannelActor { ChannelCommand::Shutdown(command, reply) => { match self.handle_shutdown_command(state, command) { Ok(_) => { + debug!("Shutdown command processed successfully"); let _ = reply.send(Ok(())); Ok(()) } Err(err) => { + debug!("Error processing shutdown command: {:?}", &err); let _ = reply.send(Err(err.to_string())); Err(err) } @@ -875,6 +882,9 @@ impl ChannelActor { ChannelEvent::PeerDisconnected => { myself.stop(Some("PeerDisconnected".to_string())); } + ChannelEvent::ClosingTransactionConfirmed => { + myself.stop(Some("ChannelClosed".to_string())); + } } Ok(()) } @@ -1228,8 +1238,9 @@ where } match state.state { ChannelState::Closed => { - myself.stop(Some("ChannelClosed".to_string())); - self.store.delete_channel_actor_state(&state.get_id()); + debug!( + "The channel is closed, waiting for the closing transaction to be confirmed." + ); } _ => { self.store.insert_channel_actor_state(state.clone()); @@ -1464,8 +1475,9 @@ pub struct ClosedChannel {} #[derive(Debug)] pub enum ChannelEvent { - FundingTransactionConfirmed, PeerDisconnected, + FundingTransactionConfirmed, + ClosingTransactionConfirmed, } pub type ProcessingChannelResult = Result<(), ProcessingChannelError>; @@ -1583,6 +1595,12 @@ pub enum ChannelState { Closed, } +impl ChannelState { + fn is_closed(&self) -> bool { + matches!(self, ChannelState::Closed) + } +} + pub fn new_channel_id_from_seed(seed: &[u8]) -> Hash256 { blake2b_256(seed).into() } @@ -1875,6 +1893,10 @@ impl ChannelActorState { .as_micros() as u64 } + pub fn is_closed(&self) -> bool { + self.state.is_closed() + } + fn update_state(&mut self, new_state: ChannelState) { debug!( "Updating channel state from {:?} to {:?}", @@ -2800,7 +2822,11 @@ impl ChannelActorState { network .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::ChannelClosed(self.get_id(), self.peer_id.clone(), tx), + NetworkActorEvent::ClosingTransactionPending( + self.get_id(), + self.peer_id.clone(), + tx, + ), )) .expect(ASSUME_NETWORK_ACTOR_ALIVE); } @@ -3977,7 +4003,24 @@ pub trait ChannelActorStateStore { fn insert_channel_actor_state(&self, state: ChannelActorState); fn delete_channel_actor_state(&self, id: &Hash256); fn get_channel_ids_by_peer(&self, peer_id: &PeerId) -> Vec; + fn get_active_channel_ids_by_peer(&self, peer_id: &PeerId) -> Vec { + self.get_channel_ids_by_peer(peer_id) + .into_iter() + .filter( + |id| matches!(self.get_channel_actor_state(id), Some(state) if !state.is_closed()), + ) + .collect() + } fn get_channel_states(&self, peer_id: Option) -> Vec<(PeerId, Hash256, ChannelState)>; + fn get_active_channel_states( + &self, + peer_id: Option, + ) -> Vec<(PeerId, Hash256, ChannelState)> { + self.get_channel_states(peer_id) + .into_iter() + .filter(|(_, _, state)| !state.is_closed()) + .collect() + } } /// A wrapper on CommitmentTransaction that has a partial signature along with diff --git a/src/ckb/network.rs b/src/ckb/network.rs index 64627b77e..fab52a9bf 100644 --- a/src/ckb/network.rs +++ b/src/ckb/network.rs @@ -1,9 +1,9 @@ use ckb_jsonrpc_types::Status; use ckb_types::core::TransactionView; -use ckb_types::packed::{OutPoint, Script, Transaction}; +use ckb_types::packed::{Byte32, OutPoint, Script, Transaction}; use ckb_types::prelude::{IntoTransactionView, Pack, Unpack}; use ractor::{ - async_trait as rasync_trait, call_t, Actor, ActorCell, ActorProcessingErr, ActorRef, + async_trait as rasync_trait, call_t, Actor, ActorCell, ActorProcessingErr, ActorRef, RactorErr, RpcReplyPort, SupervisionEvent, }; use std::collections::{HashMap, HashSet}; @@ -137,11 +137,15 @@ pub enum NetworkServiceEvent { NetworkStarted(PeerId, Multiaddr), PeerConnected(PeerId, Multiaddr), PeerDisConnected(PeerId, Multiaddr), + // An incoming/outgoing channel is created. ChannelCreated(PeerId, Hash256), + // A outgoing channel is pending to be accepted. ChannelPendingToBeAccepted(PeerId, Hash256), + // The channel is ready to use (with funding transaction confirmed + // and both parties sent ChannelReady messages). ChannelReady(PeerId, Hash256), - ChannelShutDown(PeerId, Hash256), - ChannelClosed(PeerId, Hash256, TransactionView), + // The channel is closed (closing transaction is confirmed). + ChannelClosed(PeerId, Hash256), // We should sign a commitment transaction and send it to the other party. CommitmentSignaturePending(PeerId, Hash256, u64), // We have signed a commitment transaction and sent it to the other party. @@ -186,10 +190,8 @@ pub enum NetworkActorEvent { ), /// A channel is ready to use. ChannelReady(Hash256, PeerId), - /// A channel is being shutting down. - ChannelShutdown(Hash256, PeerId), /// A channel is already closed. - ChannelClosed(Hash256, PeerId, TransactionView), + ClosingTransactionPending(Hash256, PeerId, TransactionView), /// Both parties are now able to broadcast a valid funding transaction. FundingTransactionPending(Transaction, OutPoint, Hash256), @@ -203,6 +205,12 @@ pub enum NetworkActorEvent { /// A commitment transaction is signed by us and has sent to the other party. LocalCommitmentSigned(PeerId, Hash256, u64, TransactionView, Vec), + /// A closing transaction has been confirmed. + ClosingTransactionConfirmed(PeerId, Hash256, Byte32), + + /// A closing transaction has failed (either because of invalid transaction or timeout) + ClosingTransactionFailed(PeerId, Hash256, Byte32), + /// Network service events to be sent to outside observers. /// These events may be both present at `NetworkActorEvent` and /// this branch of `NetworkActorEvent`. This is because some events @@ -431,51 +439,6 @@ where )) .expect(ASSUME_NETWORK_MYSELF_ALIVE); } - NetworkActorEvent::ChannelShutdown(channel_id, peer_id) => { - info!( - "Channel ({:?}) to peer {:?} is being shutdown.", - channel_id, peer_id - ); - // Notify outside observers. - myself - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::NetworkServiceEvent( - NetworkServiceEvent::ChannelShutDown(peer_id, channel_id), - ), - )) - .expect(ASSUME_NETWORK_MYSELF_ALIVE); - } - NetworkActorEvent::ChannelClosed(channel_id, peer_id, tx) => { - state.on_channel_closed(&channel_id, &peer_id); - info!( - "Channel ({:?}) to peer {:?} is already closed. Closing transaction {:?} can be broacasted now.", - channel_id, peer_id, tx - ); - match call_t!( - self.chain_actor, - CkbChainMessage::SendTx, - DEFAULT_CHAIN_ACTOR_TIMEOUT, - tx.clone() - ) - .expect(ASSUME_CHAIN_ACTOR_ALWAYS_ALIVE_FOR_NOW) - { - Ok(_) => { - info!("Closing transaction sent to the network: {:x}", tx.hash()); - } - Err(err) => { - error!("Failed to send closing transaction to the network: {}", err); - } - } - - // Notify outside observers. - myself - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::ChannelClosed( - peer_id, channel_id, tx, - )), - )) - .expect(ASSUME_NETWORK_MYSELF_ALIVE); - } NetworkActorEvent::PeerMessage(peer_id, message) => { self.handle_peer_message(state, peer_id, message).await? } @@ -487,8 +450,24 @@ where NetworkActorEvent::FundingTransactionConfirmed(outpoint) => { state.on_funding_transaction_confirmed(outpoint).await; } - NetworkActorEvent::FundingTransactionFailed(_outpoint) => { - unimplemented!("handling funding transaction failed"); + NetworkActorEvent::FundingTransactionFailed(outpoint) => { + error!("Funding transaction failed: {:?}", outpoint); + } + NetworkActorEvent::ClosingTransactionPending(channel_id, peer_id, tx) => { + state + .on_closing_transaction_pending(channel_id, peer_id.clone(), tx.clone()) + .await; + } + NetworkActorEvent::ClosingTransactionConfirmed(peer_id, channel_id, _tx_hash) => { + state + .on_closing_transaction_confirmed(&peer_id, &channel_id) + .await; + } + NetworkActorEvent::ClosingTransactionFailed(peer_id, tx_hash, channel_id) => { + error!( + "Closing transaction failed for channel {:?}, tx hash: {:?}, peer id: {:?}", + &channel_id, &tx_hash, &peer_id + ); } NetworkActorEvent::LocalCommitmentSigned( peer_id, @@ -873,6 +852,44 @@ impl NetworkActorState { Ok((channel, temp_channel_id, new_id)) } + async fn broadcast_tx_with_callback(&self, transaction: TransactionView, callback: F) + where + F: Send + 'static + FnOnce(Result>) -> R, + { + debug!("Trying to broadcast transaction {:?}", &transaction); + let chain = self.chain_actor.clone(); + call_t!( + &chain, + CkbChainMessage::SendTx, + DEFAULT_CHAIN_ACTOR_TIMEOUT, + transaction.clone() + ) + .expect(ASSUME_CHAIN_ACTOR_ALWAYS_ALIVE_FOR_NOW) + .expect("valid tx to broadcast"); + + let tx_hash = transaction.hash(); + info!("Transactoin sent to the network: {}", tx_hash); + + // TODO: make number of confirmation to transaction configurable. + const NUM_CONFIRMATIONS: u64 = 4; + let request = TraceTxRequest { + tx_hash: tx_hash.clone(), + confirmations: NUM_CONFIRMATIONS, + }; + + // Spawn a new task to avoid blocking current actor message processing. + ractor::concurrency::tokio_primatives::spawn(async move { + debug!("Tracing transaction status {:?}", &request.tx_hash); + let result = call_t!( + chain, + CkbChainMessage::TraceTx, + DEFAULT_CHAIN_ACTOR_TIMEOUT, + request.clone() + ); + callback(result); + }); + } + fn get_peer_session(&self, peer_id: &PeerId) -> Option { self.peer_session_map.get(peer_id).cloned() } @@ -1017,7 +1034,7 @@ impl NetworkActorState { ) { self.peer_session_map.insert(peer_id.clone(), session.id); - for channel_id in store.get_channel_ids_by_peer(peer_id) { + for channel_id in store.get_active_channel_ids_by_peer(peer_id) { debug!("Reestablishing channel {:x}", &channel_id); if let Ok((channel, _)) = Actor::spawn_linked( Some(generate_channel_actor_name(&self.peer_id, peer_id)), @@ -1061,13 +1078,63 @@ impl NetworkActorState { } } - fn on_channel_closed(&mut self, id: &Hash256, peer_id: &PeerId) { - self.channels.remove(id); - if let Some(session) = self.get_peer_session(peer_id) { - if let Some(set) = self.session_channels_map.get_mut(&session) { - set.remove(id); + async fn on_closing_transaction_pending( + &mut self, + channel_id: Hash256, + peer_id: PeerId, + transaction: TransactionView, + ) { + let tx_hash: Byte32 = transaction.hash(); + info!( + "Channel ({:?}) to peer {:?} is closed. Broadcasting closing transaction ({:?}) now.", + &channel_id, &peer_id, &tx_hash + ); + let network: ActorRef = self.network.clone(); + self.broadcast_tx_with_callback(transaction, move |result| { + let message = match result { + Ok(Status::Committed) => { + info!("Cloisng transaction {:?} confirmed", &tx_hash); + NetworkActorEvent::ClosingTransactionConfirmed(peer_id, channel_id, tx_hash) + } + Ok(status) => { + error!( + "Closing transaction {:?} failed to be confirmed with final status {:?}", + &tx_hash, &status + ); + NetworkActorEvent::ClosingTransactionFailed(peer_id, channel_id, tx_hash) + } + Err(err) => { + error!("Failed to trace transaction {:?}: {:?}", &tx_hash, &err); + NetworkActorEvent::ClosingTransactionFailed(peer_id, channel_id, tx_hash) + } }; + network + .send_message(NetworkActorMessage::new_event(message)) + .expect(ASSUME_NETWORK_MYSELF_ALIVE); + }) + .await; + } + + async fn on_closing_transaction_confirmed(&mut self, peer_id: &PeerId, channel_id: &Hash256) { + self.channels.remove(&channel_id); + if let Some(session) = self.get_peer_session(&peer_id) { + if let Some(set) = self.session_channels_map.get_mut(&session) { + set.remove(&channel_id); + } } + self.send_message_to_channel_actor( + *channel_id, + ChannelActorMessage::Event(ChannelEvent::ClosingTransactionConfirmed), + ); + // Notify outside observers. + self.network + .send_message(NetworkActorMessage::new_event( + NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::ChannelClosed( + peer_id.clone(), + *channel_id, + )), + )) + .expect(ASSUME_NETWORK_MYSELF_ALIVE); } pub async fn on_open_channel_msg( @@ -1129,61 +1196,28 @@ impl NetworkActorState { } self.pending_channels.insert(outpoint.clone(), channel_id); // TODO: try to broadcast the transaction to the network. + let transaction = transaction.into_view(); + let tx_hash = transaction.hash(); debug!( - "Funding transaction (outpoint {:?}) for channel {:?} is now ready. We can broadcast transaction {:?} now.", - &outpoint, &channel_id, &transaction + "Funding transaction (outpoint {:?}) for channel {:?} is now ready. Broadcast it {:?} now.", + &outpoint, &channel_id, &tx_hash ); - let transaction = transaction.into_view(); - debug!("Trying to broadcast funding transaction {:?}", &transaction); - - call_t!( - self.chain_actor, - CkbChainMessage::SendTx, - DEFAULT_CHAIN_ACTOR_TIMEOUT, - transaction.clone() - ) - .expect(ASSUME_CHAIN_ACTOR_ALWAYS_ALIVE_FOR_NOW) - .expect("valid funding tx"); - - let hash = transaction.hash(); - - info!("Funding transactoin sent to the network: {}", hash); - - // Trace the transaction status. - - // TODO: make number of confirmation to transaction configurable. - const NUM_CONFIRMATIONS: u64 = 4; - let request = TraceTxRequest { - tx_hash: hash, - confirmations: NUM_CONFIRMATIONS, - }; - let chain = self.chain_actor.clone(); let network = self.network.clone(); - // Spawn a new task to avoid blocking current actor message processing. - ractor::concurrency::tokio_primatives::spawn(async move { - debug!("Tracing transaction status {:?}", &request.tx_hash); - let message = match call_t!( - chain, - CkbChainMessage::TraceTx, - DEFAULT_CHAIN_ACTOR_TIMEOUT, - request.clone() - ) { + self.broadcast_tx_with_callback(transaction, move |result| { + let message = match result { Ok(Status::Committed) => { - info!("Funding transaction {:?} confirmed", &request.tx_hash,); + info!("Funding transaction {:?} confirmed", &tx_hash); NetworkActorEvent::FundingTransactionConfirmed(outpoint) } Ok(status) => { error!( "Funding transaction {:?} failed to be confirmed with final status {:?}", - &request.tx_hash, &status + &tx_hash, &status ); NetworkActorEvent::FundingTransactionFailed(outpoint) } Err(err) => { - error!( - "Failed to trace transaction {:?}: {:?}", - &request.tx_hash, &err - ); + error!("Failed to trace transaction {:?}: {:?}", &tx_hash, &err); NetworkActorEvent::FundingTransactionFailed(outpoint) } }; @@ -1192,7 +1226,8 @@ impl NetworkActorState { network .send_message(NetworkActorMessage::new_event(message)) .expect(ASSUME_NETWORK_MYSELF_ALIVE); - }); + }) + .await; } async fn on_funding_transaction_confirmed(&mut self, outpoint: OutPoint) { diff --git a/src/rpc/channel.rs b/src/rpc/channel.rs index fb498f6a6..5912d3d7f 100644 --- a/src/rpc/channel.rs +++ b/src/rpc/channel.rs @@ -244,7 +244,7 @@ where ) -> Result { let mut channels: Vec<_> = self .store - .get_channel_states(params.peer_id) + .get_active_channel_states(params.peer_id) .into_iter() .filter_map(|(peer_id, channel_id, _state)| { self.store diff --git a/tests/bruno/e2e/3-nodes-transfer/22-node2-send-shutdown-channel-1.bru b/tests/bruno/e2e/3-nodes-transfer/22-node2-send-shutdown-channel-1.bru index 022658c31..976927e75 100644 --- a/tests/bruno/e2e/3-nodes-transfer/22-node2-send-shutdown-channel-1.bru +++ b/tests/bruno/e2e/3-nodes-transfer/22-node2-send-shutdown-channel-1.bru @@ -35,15 +35,11 @@ body:json { } assert { - res.body.error: isDefined - res.body.result: isUndefined + res.body.error: isUndefined + res.body.result: isNull } script:post-response { // Sleep for sometime to make sure current operation finishes before next request starts. - // will get error message since channel is closed in previous step await new Promise(r => setTimeout(r, 100)); - if (!(res.body.error.message === "Messaging failed because channel is closed")) { - throw new Error("Assertion failed: error message is not right"); - } } diff --git a/tests/bruno/e2e/3-nodes-transfer/24-node3-send-shutdown-channel-2.bru b/tests/bruno/e2e/3-nodes-transfer/24-node3-send-shutdown-channel-2.bru index f0d3d0bd7..4431e746d 100644 --- a/tests/bruno/e2e/3-nodes-transfer/24-node3-send-shutdown-channel-2.bru +++ b/tests/bruno/e2e/3-nodes-transfer/24-node3-send-shutdown-channel-2.bru @@ -35,15 +35,6 @@ body:json { } assert { - res.body.error: isDefined - res.body.result: isUndefined -} - -script:post-response { - // Sleep for sometime to make sure current operation finishes before next request starts. - // will get error message since channel is closed in previous step - await new Promise(r => setTimeout(r, 100)); - if (!(res.body.error.message === "Messaging failed because channel is closed")) { - throw new Error("Assertion failed: error message is not right"); - } -} + res.body.error: isUndefined + res.body.result: isNull +} \ No newline at end of file diff --git a/tests/bruno/e2e/udt/12-node2-send-shutdown-channel-error.bru b/tests/bruno/e2e/udt/12-node2-send-shutdown-channel-error.bru index 9470f5b20..729b7c5f6 100644 --- a/tests/bruno/e2e/udt/12-node2-send-shutdown-channel-error.bru +++ b/tests/bruno/e2e/udt/12-node2-send-shutdown-channel-error.bru @@ -35,15 +35,6 @@ body:json { } assert { - res.body.error: isDefined - res.body.result: isUndefined -} - -script:post-response { - // Sleep for sometime to make sure current operation finishes before next request starts. - // will get error message since channel is closed in previous step - await new Promise(r => setTimeout(r, 100)); - if (!(res.body.error.message === "Messaging failed because channel is closed")) { - throw new Error("Assertion failed: error message is not right"); - } -} + res.body.error: isUndefined + res.body.result: isNull +} \ No newline at end of file