Skip to content

Commit

Permalink
Use debug_enable_at_height in the mempool crawler
Browse files Browse the repository at this point in the history
  • Loading branch information
teor2345 authored and conradoplg committed Oct 12, 2021
1 parent 21c1a5a commit ce51b29
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 22 deletions.
11 changes: 8 additions & 3 deletions zebrad/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,17 @@ impl StartCmd {

let sync_gossip_task_handle = tokio::spawn(sync::gossip_best_tip_block_hashes(
sync_status.clone(),
chain_tip_change,
chain_tip_change.clone(),
peer_set.clone(),
));

let mempool_crawler_task_handle =
mempool::Crawler::spawn(peer_set.clone(), mempool, sync_status);
let mempool_crawler_task_handle = mempool::Crawler::spawn(
&config.mempool,
peer_set.clone(),
mempool,
sync_status,
chain_tip_change,
);

let tx_gossip_task_handle = tokio::spawn(mempool::gossip_mempool_transaction_id(
mempool_transaction_receiver,
Expand Down
74 changes: 67 additions & 7 deletions zebrad/src/components/mempool/crawler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@
use std::time::Duration;

use futures::{stream::FuturesUnordered, StreamExt};
use tokio::{task::JoinHandle, time::sleep};
use futures::{future, pin_mut, stream::FuturesUnordered, StreamExt};
use tokio::{sync::watch, task::JoinHandle, time::sleep};
use tower::{timeout::Timeout, BoxError, Service, ServiceExt};

use zebra_chain::block::Height;
use zebra_network as zn;
use zebra_state::ChainTipChange;

use super::{
super::{mempool, sync::SyncStatus},
downloads::Gossip,
use crate::components::{
mempool::{self, downloads::Gossip, Config},
sync::SyncStatus,
};

#[cfg(test)]
Expand All @@ -35,9 +37,20 @@ const PEER_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6);

/// The mempool transaction crawler.
pub struct Crawler<PeerSet, Mempool> {
/// The network peer set to crawl.
peer_set: Timeout<PeerSet>,

/// The mempool service that receives crawled transaction IDs.
mempool: Mempool,

/// Allows checking if we are near the tip to enable/disable the mempool crawler.
sync_status: SyncStatus,

/// Notifies the crawler when the best chain tip height changes.
chain_tip_change: ChainTipChange,

/// If the state's best chain tip has reached this height, always enable the mempool crawler.
debug_enable_at_height: Option<Height>,
}

impl<PeerSet, Mempool> Crawler<PeerSet, Mempool>
Expand All @@ -51,19 +64,67 @@ where
{
/// Spawn an asynchronous task to run the mempool crawler.
pub fn spawn(
config: &Config,
peer_set: PeerSet,
mempool: Mempool,
sync_status: SyncStatus,
chain_tip_change: ChainTipChange,
) -> JoinHandle<Result<(), BoxError>> {
let crawler = Crawler {
peer_set: Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT),
mempool,
sync_status,
chain_tip_change,
debug_enable_at_height: config.debug_enable_at_height.map(Height),
};

tokio::spawn(crawler.run())
}

/// Waits until the mempool crawler is enabled by a debug config option.
///
/// Returns an error if communication with the state is lost.
async fn wait_until_enabled_by_debug(&mut self) -> Result<(), watch::error::RecvError> {
// optimise non-debug performance
if self.debug_enable_at_height.is_none() {
return future::pending().await;
}

let enable_at_height = self
.debug_enable_at_height
.expect("unexpected debug_enable_at_height: just checked for None");

loop {
let best_tip_height = self
.chain_tip_change
.wait_for_tip_change()
.await?
.best_tip_height();

if best_tip_height >= enable_at_height {
return Ok(());
}
}
}

/// Waits until the mempool crawler is enabled.
///
/// Returns an error if communication with the syncer or state is lost.
async fn wait_until_enabled(&mut self) -> Result<(), watch::error::RecvError> {
let mut sync_status = self.sync_status.clone();
let tip_future = sync_status.wait_until_close_to_tip();
let debug_future = self.wait_until_enabled_by_debug();

pin_mut!(tip_future);
pin_mut!(debug_future);

let (result, _unready_future) = future::select(tip_future, debug_future)
.await
.factor_first();

result
}

/// Periodically crawl peers for transactions to include in the mempool.
///
/// Runs until the [`SyncStatus`] loses its connection to the chain syncer, which happens when
Expand All @@ -72,8 +133,7 @@ where
info!("initializing mempool crawler task");

loop {
self.sync_status.wait_until_close_to_tip().await?;

self.wait_until_enabled().await?;
self.crawl_transactions().await?;
sleep(RATE_LIMIT_DELAY).await;
}
Expand Down
50 changes: 38 additions & 12 deletions zebrad/src/components/mempool/crawler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@ use std::time::Duration;
use proptest::{collection::vec, prelude::*};
use tokio::time;

use zebra_chain::transaction::UnminedTxId;
use zebra_chain::{parameters::Network, transaction::UnminedTxId};
use zebra_network as zn;
use zebra_state::ChainTipSender;
use zebra_test::mock_service::{MockService, PropTestAssertion};

use super::{
super::{
super::{mempool, sync::RecentSyncLengths},
use crate::components::{
mempool::{
self,
crawler::{Crawler, SyncStatus, FANOUT, RATE_LIMIT_DELAY},
downloads::Gossip,
error::MempoolError,
Config,
},
Crawler, SyncStatus, FANOUT, RATE_LIMIT_DELAY,
sync::RecentSyncLengths,
};

/// The number of iterations to crawl while testing.
Expand Down Expand Up @@ -54,7 +57,8 @@ proptest! {
sync_lengths.push(0);

runtime.block_on(async move {
let (mut peer_set, _mempool, sync_status, mut recent_sync_lengths) = setup_crawler();
let (mut peer_set, _mempool, sync_status, mut recent_sync_lengths, _chain_tip_sender,
) = setup_crawler();

time::pause();

Expand Down Expand Up @@ -103,7 +107,7 @@ proptest! {
let transaction_id_count = transaction_ids.len();

runtime.block_on(async move {
let (mut peer_set, mut mempool, _sync_status, mut recent_sync_lengths) =
let (mut peer_set, mut mempool, _sync_status, mut recent_sync_lengths, _chain_tip_sender) =
setup_crawler();

time::pause();
Expand Down Expand Up @@ -144,7 +148,7 @@ proptest! {
let _guard = runtime.enter();

runtime.block_on(async move {
let (mut peer_set, mut mempool, _sync_status, mut recent_sync_lengths) =
let (mut peer_set, mut mempool, _sync_status, mut recent_sync_lengths, _chain_tip_sender) =
setup_crawler();

time::pause();
Expand Down Expand Up @@ -218,14 +222,36 @@ proptest! {
}

/// Spawn a crawler instance using mock services.
fn setup_crawler() -> (MockPeerSet, MockMempool, SyncStatus, RecentSyncLengths) {
fn setup_crawler() -> (
MockPeerSet,
MockMempool,
SyncStatus,
RecentSyncLengths,
ChainTipSender,
) {
let peer_set = MockService::build().for_prop_tests();
let mempool = MockService::build().for_prop_tests();
let (sync_status, recent_sync_lengths) = SyncStatus::new();

Crawler::spawn(peer_set.clone(), mempool.clone(), sync_status.clone());

(peer_set, mempool, sync_status, recent_sync_lengths)
// the network should be irrelevant here
let (chain_tip_sender, _latest_chain_tip, chain_tip_change) =
ChainTipSender::new(None, Network::Mainnet);

Crawler::spawn(
&Config::default(),
peer_set.clone(),
mempool.clone(),
sync_status.clone(),
chain_tip_change,
);

(
peer_set,
mempool,
sync_status,
recent_sync_lengths,
chain_tip_sender,
)
}

/// Intercept a request for mempool transaction IDs and respond with the `transaction_ids` list.
Expand Down

0 comments on commit ce51b29

Please sign in to comment.