Skip to content

Commit

Permalink
Reply to Request::MempoolTransactionIds with mempool content (#2720)
Browse files Browse the repository at this point in the history
* reply to `Request::MempoolTransactionIds`

* remove boilerplate

* get storage from mempool with a method

* change panic message

* try fix for mac

* use normal init instead of init_tests for state service

* newline

* rustfmt

* fix test build
  • Loading branch information
oxarbitrage authored Sep 2, 2021
1 parent 44ac067 commit 9c220af
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 3 deletions.
4 changes: 4 additions & 0 deletions zebrad/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ impl StartCmd {
)
.await;

info!("initializing mempool");
let mempool = mempool::Mempool::new(config.network.network);

info!("initializing network");
// The service that our node uses to respond to requests by peers. The
// load_shed middleware ensures that we reduce the size of the peer set
Expand All @@ -77,6 +80,7 @@ impl StartCmd {
state.clone(),
chain_verifier.clone(),
tx_verifier.clone(),
mempool,
));

let (peer_set, address_book) =
Expand Down
20 changes: 17 additions & 3 deletions zebrad/src/components/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,15 @@ use super::mempool::downloads::{
Downloads as TxDownloads, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT,
};
// Re-use the syncer timeouts for consistency.
use super::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};
use super::{
mempool,
sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT},
};

mod downloads;
#[cfg(test)]
mod tests;

use downloads::Downloads as BlockDownloads;

type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
Expand Down Expand Up @@ -126,6 +132,9 @@ pub struct Inbound {

/// A service that manages cached blockchain state.
state: State,

/// A service that manages transactions in the memory pool.
mempool: mempool::Mempool,
}

impl Inbound {
Expand All @@ -134,6 +143,7 @@ impl Inbound {
state: State,
block_verifier: BlockVerifier,
tx_verifier: TxVerifier,
mempool: mempool::Mempool,
) -> Self {
Self {
network_setup: Setup::AwaitingNetwork {
Expand All @@ -142,6 +152,7 @@ impl Inbound {
tx_verifier,
},
state,
mempool,
}
}

Expand Down Expand Up @@ -372,8 +383,11 @@ impl Service<zn::Request> for Inbound {
async { Ok(zn::Response::Nil) }.boxed()
}
zn::Request::MempoolTransactionIds => {
debug!("ignoring unimplemented request");
async { Ok(zn::Response::Nil) }.boxed()
self.mempool.clone().oneshot(mempool::Request::TransactionIds).map_ok(|resp| match resp {
mempool::Response::TransactionIds(transaction_ids) => zn::Response::TransactionIds(transaction_ids),
_ => unreachable!("Mempool component should always respond to a `TransactionIds` request with a `TransactionIds` response"),
})
.boxed()
}
zn::Request::Ping(_) => {
unreachable!("ping requests are handled internally");
Expand Down
69 changes: 69 additions & 0 deletions zebrad/src/components/inbound/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use tower::ServiceExt;

use super::mempool::{unmined_transactions_in_blocks, Mempool};

use tokio::sync::oneshot;
use tower::builder::ServiceBuilder;

use zebra_chain::{
parameters::Network,
transaction::{UnminedTx, UnminedTxId},
};
use zebra_consensus::Config as ConsensusConfig;
use zebra_network::{Request, Response};
use zebra_state::Config as StateConfig;

#[tokio::test]
async fn mempool_requests_for_transaction_ids() {
let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();

let (state, _, _) = zebra_state::init(state_config, network);
let state_service = ServiceBuilder::new().buffer(1).service(state);
let mut mempool_service = Mempool::new(network);

let added_transaction_ids: Vec<UnminedTxId> =
add_some_stuff_to_mempool(&mut mempool_service, network)
.iter()
.map(|t| t.id)
.collect();

let (block_verifier, transaction_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;
let (_setup_tx, setup_rx) = oneshot::channel();

let inbound_service = ServiceBuilder::new()
.load_shed()
.buffer(1)
.service(super::Inbound::new(
setup_rx,
state_service,
block_verifier.clone(),
transaction_verifier.clone(),
mempool_service,
));

let request = inbound_service
.oneshot(Request::MempoolTransactionIds)
.await;
match request {
Ok(Response::TransactionIds(response)) => assert_eq!(response, added_transaction_ids),
_ => unreachable!(
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`"
),
};
}

fn add_some_stuff_to_mempool(mempool_service: &mut Mempool, network: Network) -> Vec<UnminedTx> {
// get the genesis block transactions from the Zcash blockchain.
let genesis_transactions = unmined_transactions_in_blocks(0, network);
// Insert the genesis block coinbase transaction into the mempool storage.
mempool_service
.storage()
.insert(genesis_transactions.1[0].clone())
.unwrap();

genesis_transactions.1
}
8 changes: 8 additions & 0 deletions zebrad/src/components/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ mod tests;

pub use self::crawler::Crawler;
pub use self::error::MempoolError;
#[cfg(test)]
pub use self::storage::tests::unmined_transactions_in_blocks;

#[derive(Debug)]
#[allow(dead_code)]
Expand Down Expand Up @@ -62,6 +64,12 @@ impl Mempool {
storage: Default::default(),
}
}

/// Get the storage field of the mempool for testing purposes.
#[cfg(test)]
pub fn storage(&mut self) -> &mut storage::Storage {
&mut self.storage
}
}

impl Service<Request> for Mempool {
Expand Down

0 comments on commit 9c220af

Please sign in to comment.