Skip to content

Commit

Permalink
Merge pull request #124 from contrun/keep-closed-channels
Browse files Browse the repository at this point in the history
Add option to keep closed channels
  • Loading branch information
quake authored Jul 9, 2024
2 parents 3cdb357 + 6e18ed6 commit 5676a32
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 142 deletions.
59 changes: 51 additions & 8 deletions src/ckb/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ pub struct ChannelActor<S> {
store: S,
}

impl<S> ChannelActor<S> {
impl<S: ChannelActorStateStore> ChannelActor<S> {
pub fn new(peer_id: PeerId, network: ActorRef<NetworkActorMessage>, store: S) -> Self {
Self {
peer_id,
Expand Down Expand Up @@ -647,16 +647,21 @@ impl<S> ChannelActor<S> {
) -> ProcessingChannelResult {
debug!("Handling shutdown command: {:?}", &command);
let flags = match state.state {
ChannelState::Closed => {
debug!("Channel already closed, ignoring shutdown command");
return Ok(());
}
ChannelState::ChannelReady() => {
debug!("Handling shutdown command in ChannelReady state");
ShuttingDownFlags::empty()
}
ChannelState::ShuttingDown(flags) => flags,
_ => {
debug!("Handling shutdown command in state {:?}", &state.state);
return Err(ProcessingChannelError::InvalidState(
"Trying to send shutdown message while in invalid state".to_string(),
));
return Err(ProcessingChannelError::InvalidState(format!(
"Trying to send shutdown message while in invalid state {:?}",
&state.state
)));
}
};

Expand Down Expand Up @@ -825,10 +830,12 @@ impl<S> ChannelActor<S> {
ChannelCommand::Shutdown(command, reply) => {
match self.handle_shutdown_command(state, command) {
Ok(_) => {
debug!("Shutdown command processed successfully");
let _ = reply.send(Ok(()));
Ok(())
}
Err(err) => {
debug!("Error processing shutdown command: {:?}", &err);
let _ = reply.send(Err(err.to_string()));
Err(err)
}
Expand Down Expand Up @@ -875,6 +882,9 @@ impl<S> ChannelActor<S> {
ChannelEvent::PeerDisconnected => {
myself.stop(Some("PeerDisconnected".to_string()));
}
ChannelEvent::ClosingTransactionConfirmed => {
myself.stop(Some("ChannelClosed".to_string()));
}
}
Ok(())
}
Expand Down Expand Up @@ -1228,8 +1238,9 @@ where
}
match state.state {
ChannelState::Closed => {
myself.stop(Some("ChannelClosed".to_string()));
self.store.delete_channel_actor_state(&state.get_id());
debug!(
"The channel is closed, waiting for the closing transaction to be confirmed."
);
}
_ => {
self.store.insert_channel_actor_state(state.clone());
Expand Down Expand Up @@ -1464,8 +1475,9 @@ pub struct ClosedChannel {}

#[derive(Debug)]
pub enum ChannelEvent {
FundingTransactionConfirmed,
PeerDisconnected,
FundingTransactionConfirmed,
ClosingTransactionConfirmed,
}

pub type ProcessingChannelResult = Result<(), ProcessingChannelError>;
Expand Down Expand Up @@ -1583,6 +1595,12 @@ pub enum ChannelState {
Closed,
}

impl ChannelState {
fn is_closed(&self) -> bool {
matches!(self, ChannelState::Closed)
}
}

pub fn new_channel_id_from_seed(seed: &[u8]) -> Hash256 {
blake2b_256(seed).into()
}
Expand Down Expand Up @@ -1875,6 +1893,10 @@ impl ChannelActorState {
.as_micros() as u64
}

pub fn is_closed(&self) -> bool {
self.state.is_closed()
}

fn update_state(&mut self, new_state: ChannelState) {
debug!(
"Updating channel state from {:?} to {:?}",
Expand Down Expand Up @@ -2800,7 +2822,11 @@ impl ChannelActorState {

network
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::ChannelClosed(self.get_id(), self.peer_id.clone(), tx),
NetworkActorEvent::ClosingTransactionPending(
self.get_id(),
self.peer_id.clone(),
tx,
),
))
.expect(ASSUME_NETWORK_ACTOR_ALIVE);
}
Expand Down Expand Up @@ -3977,7 +4003,24 @@ pub trait ChannelActorStateStore {
fn insert_channel_actor_state(&self, state: ChannelActorState);
fn delete_channel_actor_state(&self, id: &Hash256);
fn get_channel_ids_by_peer(&self, peer_id: &PeerId) -> Vec<Hash256>;
fn get_active_channel_ids_by_peer(&self, peer_id: &PeerId) -> Vec<Hash256> {
self.get_channel_ids_by_peer(peer_id)
.into_iter()
.filter(
|id| matches!(self.get_channel_actor_state(id), Some(state) if !state.is_closed()),
)
.collect()
}
fn get_channel_states(&self, peer_id: Option<PeerId>) -> Vec<(PeerId, Hash256, ChannelState)>;
fn get_active_channel_states(
&self,
peer_id: Option<PeerId>,
) -> Vec<(PeerId, Hash256, ChannelState)> {
self.get_channel_states(peer_id)
.into_iter()
.filter(|(_, _, state)| !state.is_closed())
.collect()
}
}

/// A wrapper on CommitmentTransaction that has a partial signature along with
Expand Down
Loading

0 comments on commit 5676a32

Please sign in to comment.