From e67b0482de702363235ba4936fd7224666d1fb02 Mon Sep 17 00:00:00 2001 From: link2xt Date: Mon, 20 May 2024 06:46:33 +0000 Subject: [PATCH] fix: ignore event channel overflows async-broadcast returns Overflowed error once if channel overflow happened. Public APIs such as get_next_event JSON-RPC method are only expecting an error if the channel is closed, so we should not propagate overflow error outside. In particular, Delta Chat Desktop stop receiving events completely if an error is returned once. If overflow happens, we should ignore it and try again until we get an event or an error because the channel is closed (in case of recv()) or empty (in case of try_recv()). --- src/events.rs | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/src/events.rs b/src/events.rs index 40c24ebab8..296785a23f 100644 --- a/src/events.rs +++ b/src/events.rs @@ -71,7 +71,17 @@ impl EventEmitter { /// [`try_recv`]: Self::try_recv pub async fn recv(&self) -> Option { let mut lock = self.0.lock().await; - lock.recv().await.ok() + loop { + match lock.recv().await { + Err(async_broadcast::RecvError::Overflowed(_)) => { + // Some events have been lost, + // but the channel is not closed. + continue; + } + Err(async_broadcast::RecvError::Closed) => return None, + Ok(event) => return Some(event), + } + } } /// Tries to receive an event without blocking. @@ -86,8 +96,18 @@ impl EventEmitter { // to avoid blocking // in case there is a concurrent call to `recv`. let mut lock = self.0.try_lock()?; - let event = lock.try_recv()?; - Ok(event) + loop { + match lock.try_recv() { + Err(async_broadcast::TryRecvError::Overflowed(_)) => { + // Some events have been lost, + // but the channel is not closed. + continue; + } + res @ (Err(async_broadcast::TryRecvError::Empty) + | Err(async_broadcast::TryRecvError::Closed) + | Ok(_)) => return Ok(res?), + } + } } }