Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lazily instantiate receiver for p2p txs #861

Merged
merged 1 commit into from
Dec 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions crates/fuel-core/src/service/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub struct P2PAdapter {
#[cfg(feature = "p2p")]
p2p_service: Arc<P2PService>,
#[cfg(feature = "p2p")]
tx_receiver: Receiver<fuel_core_types::services::p2p::TransactionGossipData>,
tx_receiver: Option<Receiver<fuel_core_types::services::p2p::TransactionGossipData>>,
}

#[cfg(feature = "p2p")]
Expand All @@ -67,10 +67,12 @@ impl Clone for P2PAdapter {
#[cfg(feature = "p2p")]
impl P2PAdapter {
pub fn new(p2p_service: Arc<P2PService>) -> Self {
let tx_receiver = p2p_service.subscribe_tx();
Self {
p2p_service,
tx_receiver,
// don't autogenerate a fresh receiver unless it is actually used
// otherwise we may encounter "lagged" errors on recv
// https://docs.rs/tokio/latest/tokio/sync/broadcast/index.html#lagging
tx_receiver: None,
}
}

Expand Down
12 changes: 11 additions & 1 deletion crates/fuel-core/src/service/adapters/txpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,18 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
}

async fn next_gossiped_transaction(&mut self) -> Self::GossipedTransaction {
// lazily instantiate a long-lived tx receiver only when there
// is consumer for the messages to avoid lagging the channel
if self.tx_receiver.is_none() {
self.tx_receiver = Some(self.p2p_service.subscribe_tx());
}
// todo: handle unwrap
self.tx_receiver.recv().await.unwrap()
self.tx_receiver
.as_mut()
.expect("Should always be some")
.recv()
.await
.unwrap()
}

async fn notify_gossip_transaction_validity(
Expand Down