Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Merge Notifier and TransactionsPoolNotifier #10591

Merged
merged 2 commits into from
Jun 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 23 additions & 14 deletions ethcore/light/src/transaction_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,23 +129,22 @@ pub enum ImportDestination {
Future,
}

type Listener = Box<Fn(&[H256]) + Send + Sync>;

/// Light transaction queue. See module docs for more details.
#[derive(Default)]
pub struct TransactionQueue {
by_account: HashMap<Address, AccountTransactions>,
by_hash: H256FastMap<PendingTransaction>,
listeners: Vec<Listener>,
tx_statuses_listeners: Vec<mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>>,
pending_listeners: Vec<mpsc::UnboundedSender<Arc<Vec<H256>>>>,
full_listeners: Vec<mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>>,
}

impl fmt::Debug for TransactionQueue {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("TransactionQueue")
.field("by_account", &self.by_account)
.field("by_hash", &self.by_hash)
.field("listeners", &self.listeners.len())
.field("pending_listeners", &self.pending_listeners.len())
.field("full_listeners", &self.pending_listeners.len())
.finish()
}
}
Expand Down Expand Up @@ -360,30 +359,40 @@ impl TransactionQueue {
}

/// Add a transaction queue listener.
pub fn add_listener(&mut self, f: Listener) {
self.listeners.push(f);
pub fn pending_transactions_receiver(&mut self) -> mpsc::UnboundedReceiver<Arc<Vec<H256>>> {
let (sender, receiver) = mpsc::unbounded();
self.pending_listeners.push(sender);
receiver
}

/// Add a transaction queue listener.
pub fn tx_statuses_receiver(&mut self) -> mpsc::UnboundedReceiver<Arc<Vec<(H256, TxStatus)>>> {
pub fn full_transactions_receiver(&mut self) -> mpsc::UnboundedReceiver<Arc<Vec<(H256, TxStatus)>>> {
let (sender, receiver) = mpsc::unbounded();
self.tx_statuses_listeners.push(sender);
self.full_listeners.push(sender);
receiver
}

/// Notifies all listeners about new pending transaction.
fn notify(&mut self, hashes: &[H256], status: TxStatus) {
for listener in &self.listeners {
listener(hashes)
if status == TxStatus::Added {
let to_pending_send: Arc<Vec<H256>> = Arc::new(
hashes
.into_iter()
.map(|hash| hash.clone())
.collect()
);
self.pending_listeners.retain(|listener| listener.unbounded_send(to_pending_send.clone()).is_ok());

}

let to_send: Arc<Vec<(H256, TxStatus)>> = Arc::new(
let to_full_send: Arc<Vec<(H256, TxStatus)>> = Arc::new(
hashes
.into_iter()
.map(|hash| (hash.clone(), status)).collect()
.map(|hash| (hash.clone(), status))
.collect()
);

self.tx_statuses_listeners.retain(| listener| listener.unbounded_send(to_send.clone()).is_ok());
self.full_listeners.retain(|listener| listener.unbounded_send(to_full_send.clone()).is_ok());
}
}

Expand Down
12 changes: 7 additions & 5 deletions ethcore/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,14 +260,16 @@ impl Miner {
}

/// Set a callback to be notified about imported transactions' hashes.
pub fn add_transactions_listener(&self, f: Box<Fn(&[H256]) + Send + Sync>) {
self.transaction_queue.add_listener(f);
pub fn pending_transactions_receiver(&self) -> mpsc::UnboundedReceiver<Arc<Vec<H256>>> {
let (sender, receiver) = mpsc::unbounded();
self.transaction_queue.add_pending_listener(sender);
receiver
}

/// Set a callback to be notified
pub fn tx_pool_receiver(&self) -> mpsc::UnboundedReceiver<Arc<Vec<(H256, TxStatus)>>> {
/// Set a callback to be notified about imported transactions' hashes.
pub fn full_transactions_receiver(&self) -> mpsc::UnboundedReceiver<Arc<Vec<(H256, TxStatus)>>> {
let (sender, receiver) = mpsc::unbounded();
self.transaction_queue.add_tx_pool_listener(sender);
self.transaction_queue.add_full_listener(sender);
receiver
}

Expand Down
100 changes: 37 additions & 63 deletions miner/src/pool/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,50 +26,6 @@ use txpool::{self, VerifiedTransaction};
use pool::VerifiedTransaction as Transaction;
use pool::TxStatus;

type Listener = Box<Fn(&[H256]) + Send + Sync>;

/// Manages notifications to pending transaction listeners.
#[derive(Default)]
pub struct Notifier {
listeners: Vec<Listener>,
pending: Vec<H256>,
}

impl fmt::Debug for Notifier {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Notifier")
.field("listeners", &self.listeners.len())
.field("pending", &self.pending)
.finish()
}
}

impl Notifier {
/// Add new listener to receive notifications.
pub fn add(&mut self, f: Listener) {
self.listeners.push(f)
}

/// Notify listeners about all currently pending transactions.
pub fn notify(&mut self) {
if self.pending.is_empty() {
return;
}

for l in &self.listeners {
(l)(&self.pending);
}

self.pending.clear();
}
}

impl txpool::Listener<Transaction> for Notifier {
fn added(&mut self, tx: &Arc<Transaction>, _old: Option<&Arc<Transaction>>) {
self.pending.push(*tx.hash());
}
}

/// Transaction pool logger.
#[derive(Default, Debug)]
pub struct Logger;
Expand Down Expand Up @@ -121,14 +77,20 @@ impl txpool::Listener<Transaction> for Logger {
/// Transactions pool notifier
#[derive(Default)]
pub struct TransactionsPoolNotifier {
listeners: Vec<mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>>,
full_listeners: Vec<mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>>,
pending_listeners: Vec<mpsc::UnboundedSender<Arc<Vec<H256>>>>,
tx_statuses: Vec<(H256, TxStatus)>,
}

impl TransactionsPoolNotifier {
/// Add new listener to receive notifications.
pub fn add(&mut self, f: mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>) {
self.listeners.push(f);
/// Add new full listener to receive notifications.
pub fn add_full_listener(&mut self, f: mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>) {
self.full_listeners.push(f);
}

/// Add new pending listener to receive notifications.
pub fn add_pending_listener(&mut self, f: mpsc::UnboundedSender<Arc<Vec<H256>>>) {
self.pending_listeners.push(f);
}

/// Notify listeners about all currently transactions.
Expand All @@ -137,16 +99,25 @@ impl TransactionsPoolNotifier {
return;
}

let to_send = Arc::new(std::mem::replace(&mut self.tx_statuses, Vec::new()));
self.listeners
.retain(|listener| listener.unbounded_send(to_send.clone()).is_ok());
let to_pending_send: Arc<Vec<H256>> = Arc::new(
self.tx_statuses.clone()
.into_iter()
.map(|(hash, _)| hash)
.collect()
);
self.pending_listeners.retain(|listener| listener.unbounded_send(to_pending_send.clone()).is_ok());

let to_full_send = Arc::new(std::mem::replace(&mut self.tx_statuses, Vec::new()));
self.full_listeners
.retain(|listener| listener.unbounded_send(to_full_send.clone()).is_ok());
}
}

impl fmt::Debug for TransactionsPoolNotifier {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("TransactionsPoolNotifier")
.field("listeners", &self.listeners.len())
.field("full_listeners", &self.full_listeners.len())
.field("pending_listeners", &self.pending_listeners.len())
.finish()
}
}
Expand Down Expand Up @@ -180,33 +151,36 @@ impl txpool::Listener<Transaction> for TransactionsPoolNotifier {
#[cfg(test)]
mod tests {
use super::*;
use parking_lot::Mutex;
use types::transaction;
use txpool::Listener;
use futures::{Stream, Future};
use ethereum_types::Address;

#[test]
fn should_notify_listeners() {
// given
let received = Arc::new(Mutex::new(vec![]));
let r = received.clone();
let listener = Box::new(move |hashes: &[H256]| {
*r.lock() = hashes.iter().map(|x| *x).collect();
});
let (full_sender, full_receiver) = mpsc::unbounded();
let (pending_sender, pending_receiver) = mpsc::unbounded();

let mut tx_listener = Notifier::default();
tx_listener.add(listener);
let mut tx_listener = TransactionsPoolNotifier::default();
tx_listener.add_full_listener(full_sender);
tx_listener.add_pending_listener(pending_sender);

// when
let tx = new_tx();
tx_listener.added(&tx, None);
assert_eq!(*received.lock(), vec![]);

// then
tx_listener.notify();
let (full_res , _full_receiver)= full_receiver.into_future().wait().unwrap();
let (pending_res , _pending_receiver)= pending_receiver.into_future().wait().unwrap();
assert_eq!(
full_res,
Some(Arc::new(vec![(serde_json::from_str::<H256>("\"0x13aff4201ac1dc49daf6a7cf07b558ed956511acbaabf9502bdacc353953766d\"").unwrap(), TxStatus::Added)]))
);
assert_eq!(
*received.lock(),
vec!["13aff4201ac1dc49daf6a7cf07b558ed956511acbaabf9502bdacc353953766d".parse().unwrap()]
pending_res,
Some(Arc::new(vec![serde_json::from_str::<H256>("\"0x13aff4201ac1dc49daf6a7cf07b558ed956511acbaabf9502bdacc353953766d\"").unwrap()]))
);
}

Expand Down
16 changes: 7 additions & 9 deletions miner/src/pool/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use pool::{
};
use pool::local_transactions::LocalTransactionsList;

type Listener = (LocalTransactionsList, (listener::Notifier, (listener::Logger, listener::TransactionsPoolNotifier)));
type Listener = (LocalTransactionsList, (listener::TransactionsPoolNotifier, listener::Logger));
type Pool = txpool::Pool<pool::VerifiedTransaction, scoring::NonceAndGasPrice, Listener>;

/// Max cache time in milliseconds for pending transactions.
Expand Down Expand Up @@ -305,8 +305,6 @@ impl TransactionQueue {
// Notify about imported transactions.
(self.pool.write().listener_mut().1).0.notify();

((self.pool.write().listener_mut().1).1).1.notify();

if results.iter().any(|r| r.is_ok()) {
self.cached_pending.write().clear();
}
Expand Down Expand Up @@ -499,7 +497,7 @@ impl TransactionQueue {
/// removes them from the pool.
/// That method should be used if invalid transactions are detected
/// or you want to cancel a transaction.
pub fn remove<'a, T: IntoIterator<Item = &'a H256>>(
pub fn remove<'a, T: IntoIterator<Item=&'a H256>>(
&self,
hashes: T,
is_invalid: bool,
Expand Down Expand Up @@ -571,16 +569,16 @@ impl TransactionQueue {
self.pool.read().listener().0.all_transactions().iter().map(|(a, b)| (*a, b.clone())).collect()
}

/// Add a callback to be notified about all transactions entering the pool.
pub fn add_listener(&self, f: Box<Fn(&[H256]) + Send + Sync>) {
/// Add a listener to be notified about all transactions the pool
pub fn add_pending_listener(&self, f: mpsc::UnboundedSender<Arc<Vec<H256>>>) {
let mut pool = self.pool.write();
(pool.listener_mut().1).0.add(f);
(pool.listener_mut().1).0.add_pending_listener(f);
}

/// Add a listener to be notified about all transactions the pool
pub fn add_tx_pool_listener(&self, f: mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>) {
pub fn add_full_listener(&self, f: mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>) {
let mut pool = self.pool.write();
((pool.listener_mut().1).1).1.add(f);
(pool.listener_mut().1).0.add_full_listener(f);
}

/// Check if pending set is cached.
Expand Down
26 changes: 7 additions & 19 deletions parity/rpc_apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,9 @@ impl FullDependencies {
}
Api::EthPubSub => {
if !for_generic_pubsub {
let pool_receiver = self.miner.pending_transactions_receiver();
let mut client =
EthPubSubClient::new(self.client.clone(), self.executor.clone());
EthPubSubClient::new(self.client.clone(), self.executor.clone(), pool_receiver);
let weak_client = Arc::downgrade(&self.client);

client.add_sync_notifier(self.sync.sync_notification(), move |state| {
Expand All @@ -345,14 +346,6 @@ impl FullDependencies {
})
});

let h = client.handler();
self.miner
.add_transactions_listener(Box::new(move |hashes| {
if let Some(h) = h.upgrade() {
h.notify_new_transactions(hashes);
}
}));

if let Some(h) = client.handler().upgrade() {
self.client.add_notify(h);
}
Expand All @@ -361,7 +354,7 @@ impl FullDependencies {
}
Api::ParityTransactionsPool => {
if !for_generic_pubsub {
let receiver = self.miner.tx_pool_receiver();
let receiver = self.miner.full_transactions_receiver();
let client = TransactionsPoolClient::new(self.executor.clone(), receiver);
handler.extend_with(TransactionsPoolClient::to_delegate(client));
}
Expand Down Expand Up @@ -583,13 +576,16 @@ impl<C: LightChainClient + 'static> LightDependencies<C> {
}
}
Api::EthPubSub => {
let receiver = self.transaction_queue.write().pending_transactions_receiver();

let mut client = EthPubSubClient::light(
self.client.clone(),
self.on_demand.clone(),
self.sync.clone(),
self.cache.clone(),
self.executor.clone(),
self.gas_price_percentile,
receiver
);

let weak_client = Arc::downgrade(&self.client);
Expand All @@ -607,19 +603,11 @@ impl<C: LightChainClient + 'static> LightDependencies<C> {
});

self.client.add_listener(client.handler() as Weak<_>);
let h = client.handler();
self.transaction_queue
.write()
.add_listener(Box::new(move |transactions| {
if let Some(h) = h.upgrade() {
h.notify_new_transactions(transactions);
}
}));
handler.extend_with(EthPubSub::to_delegate(client));
}
Api::ParityTransactionsPool => {
if !for_generic_pubsub {
let receiver = self.transaction_queue.write().tx_statuses_receiver();
let receiver = self.transaction_queue.write().full_transactions_receiver();
let client = TransactionsPoolClient::new(self.executor.clone(), receiver);
handler.extend_with(TransactionsPoolClient::to_delegate(client));
}
Expand Down
Loading