Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add dedicated send functions #4903

Merged
merged 1 commit into from
Oct 4, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 66 additions & 55 deletions crates/transaction-pool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ where
config: PoolConfig,
/// Manages listeners for transaction state change events.
event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
/// Listeners for new pending transactions.
pending_transaction_listener: Mutex<Vec<PendingTransactionListener>>,
/// Listeners for new _full_ pending transactions.
pending_transaction_listener: Mutex<Vec<PendingTransactionHashListener>>,
/// Listeners for new transactions added to the pool.
transaction_listener: Mutex<Vec<TransactionListener<T::Transaction>>>,
/// Listener for new blob transaction sidecars added to the pool.
Expand Down Expand Up @@ -232,7 +232,7 @@ where
/// transaction inserted into the pool
pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
let (sender, rx) = mpsc::channel(PENDING_TX_LISTENER_BUFFER_SIZE);
let listener = PendingTransactionListener { sender, kind };
let listener = PendingTransactionHashListener { sender, kind };
self.pending_transaction_listener.lock().push(listener);
rx
}
Expand Down Expand Up @@ -520,24 +520,7 @@ where
}

// broadcast all pending transactions to the listener
for tx_hash in pending.pending_transactions(listener.kind) {
match listener.sender.try_send(tx_hash) {
Ok(()) => {}
Err(err) => {
return if matches!(err, mpsc::error::TrySendError::Full(_)) {
debug!(
target: "txpool",
"[{:?}] failed to send pending tx; channel full",
tx_hash,
);
true
} else {
false
}
}
}
}
true
listener.send_all(pending.pending_transactions(listener.kind))
});
}

Expand All @@ -551,20 +534,7 @@ where
return !listener.sender.is_closed()
}

match listener.sender.try_send(event.clone()) {
Ok(()) => true,
Err(err) => {
if matches!(err, mpsc::error::TrySendError::Full(_)) {
debug!(
target: "txpool",
"skipping transaction on full transaction listener",
);
true
} else {
false
}
}
}
listener.send(event.clone())
});
}

Expand Down Expand Up @@ -597,25 +567,7 @@ where
{
let mut transaction_listeners = self.pending_transaction_listener.lock();
transaction_listeners.retain_mut(|listener| {
// broadcast all pending transactions to the listener
for tx_hash in outcome.pending_transactions(listener.kind) {
match listener.sender.try_send(tx_hash) {
Ok(()) => {}
Err(err) => {
return if matches!(err, mpsc::error::TrySendError::Full(_)) {
debug!(
target: "txpool",
"[{:?}] failed to send pending tx; channel full",
tx_hash,
);
true
} else {
false
}
}
}
}
true
listener.send_all(outcome.pending_transactions(listener.kind))
});
}

Expand Down Expand Up @@ -825,12 +777,38 @@ impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {

/// An active listener for new pending transactions.
#[derive(Debug)]
struct PendingTransactionListener {
struct PendingTransactionHashListener {
sender: mpsc::Sender<TxHash>,
/// Whether to include transactions that should not be propagated over the network.
kind: TransactionListenerKind,
}

impl PendingTransactionHashListener {
/// Attempts to send all hashes to the listener.
///
/// Returns false if the channel is closed (receiver dropped)
fn send_all(&self, hashes: impl IntoIterator<Item = TxHash>) -> bool {
for tx_hash in hashes.into_iter() {
match self.sender.try_send(tx_hash) {
Ok(()) => {}
Err(err) => {
return if matches!(err, mpsc::error::TrySendError::Full(_)) {
debug!(
target: "txpool",
"[{:?}] failed to send pending tx; channel full",
tx_hash,
);
true
} else {
false
}
}
}
}
true
}
}

/// An active listener for new pending transactions.
#[derive(Debug)]
struct TransactionListener<T: PoolTransaction> {
Expand All @@ -839,6 +817,39 @@ struct TransactionListener<T: PoolTransaction> {
kind: TransactionListenerKind,
}

impl<T: PoolTransaction> TransactionListener<T> {
/// Attempts to send the event to the listener.
///
/// Returns false if the channel is closed (receiver dropped)
fn send(&self, event: NewTransactionEvent<T>) -> bool {
self.send_all(std::iter::once(event))
}

/// Attempts to send all events to the listener.
///
/// Returns false if the channel is closed (receiver dropped)
fn send_all(&self, events: impl IntoIterator<Item = NewTransactionEvent<T>>) -> bool {
for event in events.into_iter() {
match self.sender.try_send(event) {
Ok(()) => {}
Err(err) => {
return if let mpsc::error::TrySendError::Full(event) = err {
debug!(
target: "txpool",
"[{:?}] failed to send pending tx; channel full",
event.transaction.hash(),
);
true
} else {
false
}
}
}
}
true
}
}

/// An active listener for new blobs
#[derive(Debug)]
struct BlobTransactionSidecarListener {
Expand Down
Loading