fix(rebuild): don't clone receiver channel
When creating a rebuild job we were reusing the same comms type that is used for bidirectional
communication between the rebuild frontend and backend.
In some situations this could cause stats to block, because the receiver channel was kept
alive by the frontend even after the frontend terminates.

Force-closing the channel should help in theory, but it seems the queue is not drained on close.
See: smol-rs/async-channel#23

Instead, let's split the channels into two types, so that only the backend has the receiver side.

Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>
tiagolobocastro committed Mar 20, 2023
1 parent aea67e1 commit 4063f73
Showing 2 changed files with 37 additions and 19 deletions.
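
The blocking described in the commit message can be sketched with the standard library alone. This is only an illustration under stated assumptions: `std::sync::mpsc::sync_channel` stands in for `async_channel`, the held `receiver` stands in for the clone kept by the frontend (a std `Receiver` cannot be cloned), and `observe_sender` is a hypothetical helper that is not part of the commit.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Reports what a sender observes (a) while a stray receiver handle is
/// still alive and (b) after that handle has been dropped.
/// Hypothetical helper for illustration only.
fn observe_sender() -> (bool, bool) {
    // Bounded channel with room for a single queued request.
    let (sender, receiver) = sync_channel::<&'static str>(1);

    // Nobody is receiving any more, but `receiver` is still held
    // elsewhere, so the queued request is never drained.
    sender
        .try_send("stats")
        .expect("first request fits in the queue");

    // The queue is full and the channel still looks open: a blocking
    // (or awaiting) send would wait here indefinitely.
    let full_while_held =
        matches!(sender.try_send("stats"), Err(TrySendError::Full(_)));

    // Once the last receiver is gone, the sender gets an error instead.
    drop(receiver);
    let disconnected_after_drop = matches!(
        sender.try_send("stats"),
        Err(TrySendError::Disconnected(_))
    );

    (full_while_held, disconnected_after_drop)
}

fn main() {
    let (full, disconnected) = observe_sender();
    println!("full while receiver held: {full}");
    println!("disconnected after drop: {disconnected}");
}
```

As long as any receiver handle stays alive, the sender cannot tell that nobody is listening, which is why the change below leaves the receiver with the backend only.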
29 changes: 27 additions & 2 deletions io-engine/src/rebuild/rebuild_job.rs
@@ -53,7 +53,7 @@ pub struct RebuildJob {
     src_uri: String,
     /// Target URI of the out of sync child in need of a rebuild.
     pub(crate) dst_uri: String,
-    comms: super::RebuildFBendChan,
+    comms: RebuildFBendChan,
     /// Current state of the rebuild job.
     states: Arc<parking_lot::RwLock<RebuildStates>>,
     /// Channel used to Notify rebuild updates when the state changes.
@@ -86,7 +86,7 @@ impl RebuildJob {
             src_uri: backend.src_uri.clone(),
             dst_uri: backend.dst_uri.clone(),
             states: backend.states.clone(),
-            comms: backend.info_chan.clone(),
+            comms: RebuildFBendChan::from(&backend.info_chan),
             complete_chan: Arc::downgrade(&backend.complete_chan),
             notify_chan: backend.notify_chan.1.clone(),
             start_time: Utc::now(),
@@ -293,3 +293,28 @@ impl RebuildJob {
 
 /// List of rebuild jobs indexed by the destination's replica uri.
 type RebuildJobInstances = HashMap<String, Arc<RebuildJob>>;
+
+#[derive(Debug, Clone)]
+struct RebuildFBendChan {
+    sender: async_channel::Sender<RebuildJobRequest>,
+}
+impl RebuildFBendChan {
+    /// Forward the given request to the backend job.
+    async fn send(&self, req: RebuildJobRequest) -> Result<(), RebuildError> {
+        self.sender
+            .send(req)
+            .await
+            .map_err(|_| RebuildError::BackendGone)
+    }
+    /// Get a clone of the sender channel.
+    fn send_clone(&self) -> async_channel::Sender<RebuildJobRequest> {
+        self.sender.clone()
+    }
+}
+impl From<&super::RebuildFBendChan> for RebuildFBendChan {
+    fn from(value: &super::RebuildFBendChan) -> Self {
+        Self {
+            sender: value.sender_clone(),
+        }
+    }
+}
27 changes: 10 additions & 17 deletions io-engine/src/rebuild/rebuild_job_backend.rs
@@ -50,32 +50,25 @@ impl RebuildFBendChan {
             receiver,
         }
     }
-    /// Forward the given request to the backend job.
-    pub(super) async fn send(
-        &self,
-        req: RebuildJobRequest,
-    ) -> Result<(), RebuildError> {
-        self.sender
-            .send(req)
-            .await
-            .map_err(|_| RebuildError::BackendGone)
-    }
-    /// Get a clone of the sender channel.
-    pub(super) fn send_clone(
-        &self,
-    ) -> async_channel::Sender<RebuildJobRequest> {
-        self.sender.clone()
-    }
     async fn recv(&mut self) -> Result<RebuildJobRequest, RebuildError> {
         self.receiver
             .recv()
             .await
             .map_err(|_| RebuildError::FrontendGone {})
     }
+
     /// Get a clone of the receive channel.
-    fn recv_clone(&self) -> async_channel::Receiver<RebuildJobRequest> {
+    pub(super) fn recv_clone(
+        &self,
+    ) -> async_channel::Receiver<RebuildJobRequest> {
         self.receiver.clone()
     }
+    /// Get a clone of the send channel.
+    pub(super) fn sender_clone(
+        &self,
+    ) -> async_channel::Sender<RebuildJobRequest> {
+        self.sender.clone()
+    }
 }
 
 /// A rebuild job is responsible for managing a rebuild (copy) which reads
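
The split made by this commit can be sketched in a self-contained form. This is a minimal sketch and not the project's code: `BackendChan` and `FrontendChan` are hypothetical names for the two channel types, `u32` stands in for `RebuildJobRequest`, and the synchronous `std::sync::mpsc` channel replaces `async_channel` so the example runs without extra crates. Only `RebuildError::BackendGone` and the `From<&...>` conversion mirror the diff.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

#[derive(Debug, PartialEq)]
enum RebuildError {
    BackendGone,
}

/// Backend side: owns both ends of the channel (hypothetical name).
struct BackendChan {
    sender: Sender<u32>,
    receiver: Receiver<u32>,
}

impl BackendChan {
    fn new() -> Self {
        let (sender, receiver) = channel();
        Self { sender, receiver }
    }

    /// Get a clone of the send channel.
    fn sender_clone(&self) -> Sender<u32> {
        self.sender.clone()
    }
}

/// Frontend side: holds a sender only, so it can never keep the
/// receiver alive (hypothetical name).
#[derive(Clone)]
struct FrontendChan {
    sender: Sender<u32>,
}

impl FrontendChan {
    /// Forward the given request to the backend.
    fn send(&self, req: u32) -> Result<(), RebuildError> {
        self.sender.send(req).map_err(|_| RebuildError::BackendGone)
    }
}

impl From<&BackendChan> for FrontendChan {
    fn from(value: &BackendChan) -> Self {
        Self {
            sender: value.sender_clone(),
        }
    }
}

/// Sends one request while the backend is alive and one after it is gone.
fn demo() -> (Option<u32>, Result<(), RebuildError>) {
    let backend = BackendChan::new();
    let frontend = FrontendChan::from(&backend);

    frontend.send(7).expect("backend is alive");
    let received = backend.receiver.try_recv().ok();

    // Dropping the backend drops the only receiver, so the next send
    // fails immediately instead of waiting on a queue nobody reads.
    drop(backend);
    let after_exit = frontend.send(8);

    (received, after_exit)
}

fn main() {
    println!("{:?}", demo());
}
```

Because the frontend type has no receiver field at all, the compiler rules out the situation the commit message describes, rather than relying on every caller to close the channel correctly.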