diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index c34d4b28da96..44c8f1ee8c81 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -139,8 +139,8 @@ where config: PoolConfig, /// Manages listeners for transaction state change events. event_listener: RwLock>, - /// Listeners for new pending transactions. - pending_transaction_listener: Mutex>, + /// Listeners for new _full_ pending transactions. + pending_transaction_listener: Mutex>, /// Listeners for new transactions added to the pool. transaction_listener: Mutex>>, /// Listener for new blob transaction sidecars added to the pool. @@ -232,7 +232,7 @@ where /// transaction inserted into the pool pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver { let (sender, rx) = mpsc::channel(PENDING_TX_LISTENER_BUFFER_SIZE); - let listener = PendingTransactionListener { sender, kind }; + let listener = PendingTransactionHashListener { sender, kind }; self.pending_transaction_listener.lock().push(listener); rx } @@ -520,24 +520,7 @@ where } // broadcast all pending transactions to the listener - for tx_hash in pending.pending_transactions(listener.kind) { - match listener.sender.try_send(tx_hash) { - Ok(()) => {} - Err(err) => { - return if matches!(err, mpsc::error::TrySendError::Full(_)) { - debug!( - target: "txpool", - "[{:?}] failed to send pending tx; channel full", - tx_hash, - ); - true - } else { - false - } - } - } - } - true + listener.send_all(pending.pending_transactions(listener.kind)) }); } @@ -551,20 +534,7 @@ where return !listener.sender.is_closed() } - match listener.sender.try_send(event.clone()) { - Ok(()) => true, - Err(err) => { - if matches!(err, mpsc::error::TrySendError::Full(_)) { - debug!( - target: "txpool", - "skipping transaction on full transaction listener", - ); - true - } else { - false - } - } - } + listener.send(event.clone()) }); } @@ -597,25 +567,7 @@ where { let mut transaction_listeners = self.pending_transaction_listener.lock(); transaction_listeners.retain_mut(|listener| { - // broadcast all pending transactions to the listener - for tx_hash in outcome.pending_transactions(listener.kind) { - match listener.sender.try_send(tx_hash) { - Ok(()) => {} - Err(err) => { - return if matches!(err, mpsc::error::TrySendError::Full(_)) { - debug!( - target: "txpool", - "[{:?}] failed to send pending tx; channel full", - tx_hash, - ); - true - } else { - false - } - } - } - } - true + listener.send_all(outcome.pending_transactions(listener.kind)) }); } @@ -825,12 +777,38 @@ impl fmt::Debug for PoolInner { /// An active listener for new pending transactions. #[derive(Debug)] -struct PendingTransactionListener { +struct PendingTransactionHashListener { sender: mpsc::Sender, /// Whether to include transactions that should not be propagated over the network. kind: TransactionListenerKind, } +impl PendingTransactionHashListener { + /// Attempts to send all hashes to the listener. + /// + /// Returns false if the channel is closed (receiver dropped) + fn send_all(&self, hashes: impl IntoIterator) -> bool { + for tx_hash in hashes.into_iter() { + match self.sender.try_send(tx_hash) { + Ok(()) => {} + Err(err) => { + return if matches!(err, mpsc::error::TrySendError::Full(_)) { + debug!( + target: "txpool", + "[{:?}] failed to send pending tx; channel full", + tx_hash, + ); + true + } else { + false + } + } + } + } + true + } +} + /// An active listener for new pending transactions. #[derive(Debug)] struct TransactionListener { @@ -839,6 +817,39 @@ struct TransactionListener { kind: TransactionListenerKind, } +impl TransactionListener { + /// Attempts to send the event to the listener. + /// + /// Returns false if the channel is closed (receiver dropped) + fn send(&self, event: NewTransactionEvent) -> bool { + self.send_all(std::iter::once(event)) + } + + /// Attempts to send all events to the listener. + /// + /// Returns false if the channel is closed (receiver dropped) + fn send_all(&self, events: impl IntoIterator>) -> bool { + for event in events.into_iter() { + match self.sender.try_send(event) { + Ok(()) => {} + Err(err) => { + return if let mpsc::error::TrySendError::Full(event) = err { + debug!( + target: "txpool", + "[{:?}] failed to send pending tx; channel full", + event.transaction.hash(), + ); + true + } else { + false + } + } + } + } + true + } +} + /// An active listener for new blobs #[derive(Debug)] struct BlobTransactionSidecarListener {