Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reply to Request::MempoolTransactionIds with mempool content #2720

Merged
merged 10 commits into from
Sep 2, 2021
4 changes: 4 additions & 0 deletions zebrad/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ impl StartCmd {
)
.await;

info!("initializing mempool");
let mempool = mempool::Mempool::new(config.network.network);

info!("initializing network");
// The service that our node uses to respond to requests by peers. The
// load_shed middleware ensures that we reduce the size of the peer set
Expand All @@ -77,6 +80,7 @@ impl StartCmd {
state.clone(),
chain_verifier.clone(),
tx_verifier.clone(),
mempool,
));

let (peer_set, address_book) =
Expand Down
20 changes: 17 additions & 3 deletions zebrad/src/components/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,15 @@ use super::mempool::downloads::{
Downloads as TxDownloads, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT,
};
// Re-use the syncer timeouts for consistency.
use super::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};
use super::{
mempool,
sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT},
};

mod downloads;
#[cfg(test)]
mod tests;

use downloads::Downloads as BlockDownloads;

type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
Expand Down Expand Up @@ -126,6 +132,9 @@ pub struct Inbound {

/// A service that manages cached blockchain state.
state: State,

/// A service that manages transactions in the memory pool.
mempool: mempool::Mempool,
}

impl Inbound {
Expand All @@ -134,6 +143,7 @@ impl Inbound {
state: State,
block_verifier: BlockVerifier,
tx_verifier: TxVerifier,
mempool: mempool::Mempool,
) -> Self {
Self {
network_setup: Setup::AwaitingNetwork {
Expand All @@ -142,6 +152,7 @@ impl Inbound {
tx_verifier,
},
state,
mempool,
}
}

Expand Down Expand Up @@ -372,8 +383,11 @@ impl Service<zn::Request> for Inbound {
async { Ok(zn::Response::Nil) }.boxed()
}
zn::Request::MempoolTransactionIds => {
debug!("ignoring unimplemented request");
async { Ok(zn::Response::Nil) }.boxed()
self.mempool.clone().oneshot(mempool::Request::TransactionIds).map_ok(|resp| match resp {
mempool::Response::TransactionIds(transaction_ids) => zn::Response::TransactionIds(transaction_ids),
_ => unreachable!("Mempool component should always respond to a `TransactionIds` request with a `TransactionIds` response"),
})
.boxed()
}
zn::Request::Ping(_) => {
unreachable!("ping requests are handled internally");
Expand Down
69 changes: 69 additions & 0 deletions zebrad/src/components/inbound/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use tower::ServiceExt;

use super::mempool::{unmined_transactions_in_blocks, Mempool};

use tokio::sync::oneshot;
use tower::builder::ServiceBuilder;

use zebra_chain::{
parameters::Network,
transaction::{UnminedTx, UnminedTxId},
};
use zebra_consensus::Config as ConsensusConfig;
use zebra_network::{Request, Response};
use zebra_state::Config as StateConfig;

#[tokio::test]
async fn mempool_requests_for_transaction_ids() {
let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();

let (state, _, _) = zebra_state::init(state_config, network);
let state_service = ServiceBuilder::new().buffer(1).service(state);
let mut mempool_service = Mempool::new(network);

let added_transaction_ids: Vec<UnminedTxId> =
add_some_stuff_to_mempool(&mut mempool_service, network)
.iter()
.map(|t| t.id)
.collect();

let (block_verifier, transaction_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;
let (_setup_tx, setup_rx) = oneshot::channel();

let inbound_service = ServiceBuilder::new()
.load_shed()
.buffer(1)
.service(super::Inbound::new(
setup_rx,
state_service,
block_verifier.clone(),
transaction_verifier.clone(),
mempool_service,
));

let request = inbound_service
.oneshot(Request::MempoolTransactionIds)
.await;
match request {
Ok(Response::TransactionIds(response)) => assert_eq!(response, added_transaction_ids),
_ => unreachable!(
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`"
),
};
}

fn add_some_stuff_to_mempool(mempool_service: &mut Mempool, network: Network) -> Vec<UnminedTx> {
// get the genesis block transactions from the Zcash blockchain.
let genesis_transactions = unmined_transactions_in_blocks(0, network);
// Insert the genesis block coinbase transaction into the mempool storage.
mempool_service
.storage()
.insert(genesis_transactions.1[0].clone())
.unwrap();

genesis_transactions.1
}
8 changes: 8 additions & 0 deletions zebrad/src/components/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ mod tests;

pub use self::crawler::Crawler;
pub use self::error::MempoolError;
#[cfg(test)]
pub use self::storage::tests::unmined_transactions_in_blocks;

#[derive(Debug)]
#[allow(dead_code)]
Expand Down Expand Up @@ -62,6 +64,12 @@ impl Mempool {
storage: Default::default(),
}
}

/// Get the storage field of the mempool for testing purposes.
#[cfg(test)]
pub fn storage(&mut self) -> &mut storage::Storage {
&mut self.storage
}
}

impl Service<Request> for Mempool {
Expand Down