diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 331aecf89874..9df6d666fc49 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -419,6 +419,33 @@ impl EnabledHandler { } } + // Drop the next message in queue if it's stale. + let mut peakable = self.send_queue.clone().peekable(); + if let Poll::Ready(Some(mut message)) = peakable.poll_next_unpin(cx) { + match message { + RpcOut::Publish { + message: _, + ref mut timeout, + } + | RpcOut::Forward { + message: _, + ref mut timeout, + } => { + if Pin::new(timeout).poll(cx).is_ready() { + // Drop the message. + // TODO: Should we also inform the network behaviour about the dropped message? + let dropped = futures::ready!(self.send_queue.poll_next_unpin(cx)) + .expect("There should be a message"); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::MessageDropped(dropped), + )); + } + } + // the next message in queue is not time bound. + _ => {} + } + } + Poll::Pending } }