From 9a51c658a56df8c49b7d2fb336fdc644aaa244f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 30 Jan 2024 22:26:28 +0000 Subject: [PATCH] drop the next message in queue if it's stale --- protocols/gossipsub/src/handler.rs | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 331aecf89874..9df6d666fc49 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -419,6 +419,33 @@ impl EnabledHandler { } } + // Drop the next message in queue if it's stale. + let mut peakable = self.send_queue.clone().peekable(); + if let Poll::Ready(Some(mut message)) = peakable.poll_next_unpin(cx) { + match message { + RpcOut::Publish { + message: _, + ref mut timeout, + } + | RpcOut::Forward { + message: _, + ref mut timeout, + } => { + if Pin::new(timeout).poll(cx).is_ready() { + // Drop the message. + // TODO: Should we also inform the network behaviour about the dropped message? + let dropped = futures::ready!(self.send_queue.poll_next_unpin(cx)) + .expect("There should be a message"); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::MessageDropped(dropped), + )); + } + } + // the next message in queue is not time bound. + _ => {} + } + } + Poll::Pending } }