Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip download and verification if the transaction is already in the mempool or state #2718

Merged
merged 13 commits into from
Sep 8, 2021
Merged
15 changes: 10 additions & 5 deletions zebrad/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,15 @@ use color_eyre::eyre::{eyre, Report};
use futures::{select, FutureExt};
use tokio::sync::oneshot;
use tower::builder::ServiceBuilder;
use tower::util::BoxService;

use crate::components::{tokio::RuntimeRun, Inbound};
use crate::config::ZebradConfig;
use crate::{
components::{mempool, tokio::TokioComponent, ChainSync},
components::{
mempool::{self, Mempool},
tokio::{RuntimeRun, TokioComponent},
ChainSync, Inbound,
},
config::ZebradConfig,
prelude::*,
};

Expand Down Expand Up @@ -65,7 +69,8 @@ impl StartCmd {
.await;

info!("initializing mempool");
let mempool = mempool::Mempool::new(config.network.network);
let mempool_service = BoxService::new(Mempool::new(config.network.network));
let mempool = ServiceBuilder::new().buffer(20).service(mempool_service);
oxarbitrage marked this conversation as resolved.
Show resolved Hide resolved

info!("initializing network");
// The service that our node uses to respond to requests by peers. The
Expand All @@ -80,7 +85,7 @@ impl StartCmd {
state.clone(),
chain_verifier.clone(),
tx_verifier.clone(),
mempool,
mempool.clone(),
));

let (peer_set, address_book) =
Expand Down
16 changes: 11 additions & 5 deletions zebrad/src/components/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ use zebra_consensus::transaction;
use zebra_consensus::{chain::VerifyChainError, error::TransactionError};
use zebra_network::AddressBook;

use super::mempool::downloads::{
Downloads as TxDownloads, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT,
use super::mempool::{
self as mp,
downloads::{
Downloads as TxDownloads, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT,
},
};
// Re-use the syncer timeouts for consistency.
use super::{
Expand All @@ -38,13 +41,14 @@ use downloads::Downloads as BlockDownloads;

type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
type Mempool = Buffer<BoxService<mp::Request, mp::Response, mp::BoxError>, mp::Request>;
type BlockVerifier = Buffer<BoxService<Arc<Block>, block::Hash, VerifyChainError>, Arc<Block>>;
type TxVerifier = Buffer<
BoxService<transaction::Request, transaction::Response, TransactionError>,
transaction::Request,
>;
type InboundBlockDownloads = BlockDownloads<Timeout<Outbound>, Timeout<BlockVerifier>, State>;
type InboundTxDownloads = TxDownloads<Timeout<Outbound>, Timeout<TxVerifier>, State>;
type InboundTxDownloads = TxDownloads<Timeout<Outbound>, Timeout<TxVerifier>, State, Mempool>;

pub type NetworkSetupData = (Outbound, Arc<std::sync::Mutex<AddressBook>>);

Expand Down Expand Up @@ -134,7 +138,7 @@ pub struct Inbound {
state: State,

/// A service that manages transactions in the memory pool.
mempool: mempool::Mempool,
mempool: Mempool,
}

impl Inbound {
Expand All @@ -143,7 +147,7 @@ impl Inbound {
state: State,
block_verifier: BlockVerifier,
tx_verifier: TxVerifier,
mempool: mempool::Mempool,
mempool: Mempool,
) -> Self {
Self {
network_setup: Setup::AwaitingNetwork {
Expand Down Expand Up @@ -195,6 +199,7 @@ impl Service<zn::Request> for Inbound {
Timeout::new(outbound, TRANSACTION_DOWNLOAD_TIMEOUT),
Timeout::new(tx_verifier, TRANSACTION_VERIFY_TIMEOUT),
self.state.clone(),
self.mempool.clone(),
));
result = Ok(());
Setup::Initialized {
Expand Down Expand Up @@ -350,6 +355,7 @@ impl Service<zn::Request> for Inbound {
zn::Request::PushTransaction(_transaction) => {
debug!("ignoring unimplemented request");
// TODO: send to Tx Download & Verify Stream
// https://github.com/ZcashFoundation/zebra/issues/2692
async { Ok(zn::Response::Nil) }.boxed()
}
zn::Request::AdvertiseTransactionIds(transactions) => {
Expand Down
7 changes: 5 additions & 2 deletions zebrad/src/components/inbound/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::HashSet;
use super::mempool::{unmined_transactions_in_blocks, Mempool};

use tokio::sync::oneshot;
use tower::{builder::ServiceBuilder, ServiceExt};
use tower::{builder::ServiceBuilder, util::BoxService, ServiceExt};

use zebra_chain::{
parameters::Network,
Expand All @@ -26,6 +26,9 @@ async fn mempool_requests_for_transactions() {
let added_transactions = add_some_stuff_to_mempool(&mut mempool_service, network);
let added_transaction_ids: Vec<UnminedTxId> = added_transactions.iter().map(|t| t.id).collect();

let mempool_service = BoxService::new(mempool_service);
let mempool = ServiceBuilder::new().buffer(1).service(mempool_service);

let (block_verifier, transaction_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;
Expand All @@ -39,7 +42,7 @@ async fn mempool_requests_for_transactions() {
state_service,
block_verifier.clone(),
transaction_verifier.clone(),
mempool_service,
mempool,
));

// Test `Request::MempoolTransactionIds`
Expand Down
9 changes: 8 additions & 1 deletion zebrad/src/components/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use zebra_chain::{
transaction::{UnminedTx, UnminedTxId},
};

use crate::BoxError;
pub use crate::BoxError;

mod crawler;
pub mod downloads;
Expand All @@ -35,12 +35,14 @@ pub use self::storage::tests::unmined_transactions_in_blocks;
pub enum Request {
TransactionIds,
TransactionsById(HashSet<UnminedTxId>),
RejectedTransactionIds(HashSet<UnminedTxId>),
oxarbitrage marked this conversation as resolved.
Show resolved Hide resolved
}

#[derive(Debug)]
pub enum Response {
Transactions(Vec<UnminedTx>),
TransactionIds(Vec<UnminedTxId>),
RejectedTransactionIds(Vec<UnminedTxId>),
}

/// Mempool async management and query service.
Expand Down Expand Up @@ -93,6 +95,11 @@ impl Service<Request> for Mempool {
let rsp = Ok(self.storage.clone().transactions(ids)).map(Response::Transactions);
async move { rsp }.boxed()
}
Request::RejectedTransactionIds(ids) => {
let rsp = Ok(self.storage.clone().rejected_transactions(ids))
.map(Response::RejectedTransactionIds);
async move { rsp }.boxed()
}
}
}
}
96 changes: 81 additions & 15 deletions zebrad/src/components/mempool/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use zebra_consensus::transaction as tx;
use zebra_network as zn;
use zebra_state as zs;

use crate::components::mempool as mp;
use crate::components::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};

type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
Expand Down Expand Up @@ -85,14 +86,16 @@ pub enum DownloadAction {
/// Represents a [`Stream`] of download and verification tasks.
#[pin_project]
#[derive(Debug)]
pub struct Downloads<ZN, ZV, ZS>
pub struct Downloads<ZN, ZV, ZS, ZM>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + 'static,
ZN::Future: Send,
ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
ZV::Future: Send,
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
ZM: Service<mp::Request, Response = mp::Response, Error = BoxError> + Send + Clone + 'static,
ZM::Future: Send,
{
// Services
/// A service that forwards requests to connected peers, and returns their
Expand All @@ -105,6 +108,9 @@ where
/// A service that manages cached blockchain state.
state: ZS,

/// A service that manages the mempool.
mempool: ZM,

// Internal downloads state
/// A list of pending transaction download and verify tasks.
#[pin]
Expand All @@ -115,14 +121,16 @@ where
cancel_handles: HashMap<UnminedTxId, oneshot::Sender<()>>,
}

impl<ZN, ZV, ZS> Stream for Downloads<ZN, ZV, ZS>
impl<ZN, ZV, ZS, ZM> Stream for Downloads<ZN, ZV, ZS, ZM>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
ZN::Future: Send,
ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
ZV::Future: Send,
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
ZM: Service<mp::Request, Response = mp::Response, Error = BoxError> + Send + Clone + 'static,
ZM::Future: Send,
{
type Item = Result<UnminedTxId, BoxError>;

Expand Down Expand Up @@ -158,26 +166,29 @@ where
}
}

impl<ZN, ZV, ZS> Downloads<ZN, ZV, ZS>
impl<ZN, ZV, ZS, ZM> Downloads<ZN, ZV, ZS, ZM>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
ZN::Future: Send,
ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
ZV::Future: Send,
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
ZM: Service<mp::Request, Response = mp::Response, Error = BoxError> + Send + Clone + 'static,
ZM::Future: Send,
{
/// Initialize a new download stream with the provided `network` and
/// `verifier` services.
///
/// The [`Downloads`] stream is agnostic to the network policy, so retry and
/// timeout limits should be applied to the `network` service passed into
/// this constructor.
pub fn new(network: ZN, verifier: ZV, state: ZS) -> Self {
pub fn new(network: ZN, verifier: ZV, state: ZS, mempool: ZM) -> Self {
Self {
network,
verifier,
state,
mempool,
pending: FuturesUnordered::new(),
cancel_handles: HashMap::new(),
}
Expand Down Expand Up @@ -213,19 +224,11 @@ where

let network = self.network.clone();
let verifier = self.verifier.clone();
let state = self.state.clone();
let mut state = self.state.clone();
let mut mempool = self.mempool.clone();

let fut = async move {
// TODO: adapt this for transaction / mempool
// // Check if the block is already in the state.
// // BUG: check if the hash is in any chain (#862).
// // Depth only checks the main chain.
// match state.oneshot(zs::Request::Depth(hash)).await {
// Ok(zs::Response::Depth(None)) => Ok(()),
// Ok(zs::Response::Depth(Some(_))) => Err("already present".into()),
// Ok(_) => unreachable!("wrong response"),
// Err(e) => Err(e),
// }?;
Self::should_download(&mut state, &mut mempool, txid).await?;

let height = match state.oneshot(zs::Request::Tip).await {
Ok(zs::Response::Tip(None)) => Err("no block at the tip".into()),
Expand Down Expand Up @@ -298,4 +301,67 @@ where

DownloadAction::AddedToQueue
}

/// Check if transaction should be downloaded and verified.
///
/// If it is already in the mempool (or in its rejected list)
/// or in state, then it shouldn't be downloaded (and an error is returned).
async fn should_download(
state: &mut ZS,
mempool: &mut ZM,
txid: UnminedTxId,
) -> Result<(), BoxError> {
// Check if the transaction is already in the mempool.
match mempool
.ready_and()
.await?
.call(mp::Request::TransactionsById(
[txid].iter().cloned().collect(),
))
.await
{
Ok(mp::Response::Transactions(txs)) => {
if txs.is_empty() {
Ok(())
} else {
Err("already present in mempool".into())
}
}
Ok(_) => unreachable!("wrong response"),
Err(e) => Err(e),
}?;

// Check if the transaction is in the mempool rejected list.
match mempool
.oneshot(mp::Request::RejectedTransactionIds(
[txid].iter().cloned().collect(),
))
.await
dconnolly marked this conversation as resolved.
Show resolved Hide resolved
{
Ok(mp::Response::RejectedTransactionIds(txs)) => {
if txs.is_empty() {
Ok(())
} else {
Err("in mempool rejected list".into())
}
}
Ok(_) => unreachable!("wrong response"),
Err(e) => Err(e),
}?;

// Check if the transaction is already in the state.
match state
.ready_and()
.await?
.call(zs::Request::Transaction(txid.mined_id()))
.await
{
Ok(zs::Response::Transaction(None)) => Ok(()),
Ok(zs::Response::Transaction(Some(_))) => Err("already present in state".into()),
Ok(_) => unreachable!("wrong response"),
Err(e) => Err(e),
}?;

Ok(())
}
}
8 changes: 8 additions & 0 deletions zebrad/src/components/mempool/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,12 @@ impl Storage {
.filter(|tx| tx_ids.contains(&tx.id))
.collect()
}

/// Returns the set of [`UnminedTxId`]s matching ids in the rejected list.
pub fn rejected_transactions(self, tx_ids: HashSet<UnminedTxId>) -> Vec<UnminedTxId> {
oxarbitrage marked this conversation as resolved.
Show resolved Hide resolved
tx_ids
.into_iter()
.filter(|tx| self.rejected.contains_key(tx))
.collect()
}
}
13 changes: 13 additions & 0 deletions zebrad/src/components/mempool/storage/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,19 @@ fn mempool_storage_basic_for_network(network: Network) -> Result<()> {
assert!(!storage.clone().contains(&tx.id));
}

// Query all the ids we have for rejected, get back `total - MEMPOOL_SIZE`
let all_ids: HashSet<UnminedTxId> = unmined_transactions.iter().map(|tx| tx.id).collect();
let rejected_ids: HashSet<UnminedTxId> = unmined_transactions
.iter()
.take(total_transactions - MEMPOOL_SIZE)
.map(|tx| tx.id)
.collect();
// Convert response to a `HashSet` as we need a fixed order to compare.
let rejected_response: HashSet<UnminedTxId> =
storage.rejected_transactions(all_ids).into_iter().collect();

assert_eq!(rejected_response, rejected_ids);

Ok(())
}

Expand Down
Loading