diff --git a/crates/fuel-core/src/service/adapters.rs b/crates/fuel-core/src/service/adapters.rs index 41c41a28fd9..d8489dbe6b5 100644 --- a/crates/fuel-core/src/service/adapters.rs +++ b/crates/fuel-core/src/service/adapters.rs @@ -54,7 +54,7 @@ pub struct P2PAdapter { #[cfg(feature = "p2p")] p2p_service: Arc, #[cfg(feature = "p2p")] - tx_receiver: Receiver, + tx_receiver: Option>, } #[cfg(feature = "p2p")] @@ -67,10 +67,12 @@ impl Clone for P2PAdapter { #[cfg(feature = "p2p")] impl P2PAdapter { pub fn new(p2p_service: Arc) -> Self { - let tx_receiver = p2p_service.subscribe_tx(); Self { p2p_service, - tx_receiver, + // don't autogenerate a fresh receiver unless it is actually used + // otherwise we may encounter "lagged" errors on recv + // https://docs.rs/tokio/latest/tokio/sync/broadcast/index.html#lagging + tx_receiver: None, } } diff --git a/crates/fuel-core/src/service/adapters/txpool.rs b/crates/fuel-core/src/service/adapters/txpool.rs index 62f4584cf37..fbb8d06f4f4 100644 --- a/crates/fuel-core/src/service/adapters/txpool.rs +++ b/crates/fuel-core/src/service/adapters/txpool.rs @@ -46,8 +46,18 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter { } async fn next_gossiped_transaction(&mut self) -> Self::GossipedTransaction { + // lazily instantiate a long-lived tx receiver only when there + // is consumer for the messages to avoid lagging the channel + if self.tx_receiver.is_none() { + self.tx_receiver = Some(self.p2p_service.subscribe_tx()); + } // todo: handle unwrap - self.tx_receiver.recv().await.unwrap() + self.tx_receiver + .as_mut() + .expect("Should always be some") + .recv() + .await + .unwrap() } async fn notify_gossip_transaction_validity(