Skip to content

Commit

Permalink
Apply the full verify concurrency limit to the inbound service
Browse files Browse the repository at this point in the history
  • Loading branch information
teor2345 committed Jul 1, 2022
1 parent de4f2c3 commit 1c6958d
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 25 deletions.
5 changes: 4 additions & 1 deletion zebrad/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ impl StartCmd {
let inbound = ServiceBuilder::new()
.load_shed()
.buffer(inbound::downloads::MAX_INBOUND_CONCURRENCY)
.service(Inbound::new(setup_rx));
.service(Inbound::new(
config.sync.full_verify_concurrency_limit,
setup_rx,
));

let (peer_set, address_book) =
zebra_network::init(config.network.clone(), inbound, latest_chain_tip.clone()).await;
Expand Down
34 changes: 29 additions & 5 deletions zebrad/src/components/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ use zebra_network::{
};
use zebra_node_services::mempool;

use crate::BoxError;

// Re-use the syncer timeouts for consistency.
use super::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};
use crate::BoxError;

use InventoryResponse::*;

Expand Down Expand Up @@ -85,6 +86,13 @@ pub enum Setup {
///
/// All requests are ignored.
Pending {
// Configuration
//
/// The configured full verification concurrency limit.
full_verify_concurrency_limit: usize,

// Services
//
/// A oneshot channel used to receive required services,
/// after they are set up.
setup: oneshot::Receiver<InboundSetupData>,
Expand All @@ -94,6 +102,8 @@ pub enum Setup {
///
/// All requests are answered.
Initialized {
// Services
//
/// A shared list of peer addresses.
address_book: Arc<std::sync::Mutex<zn::AddressBook>>,

Expand Down Expand Up @@ -169,9 +179,15 @@ impl Inbound {
/// Create a new inbound service.
///
/// Dependent services are sent via the `setup` channel after initialization.
pub fn new(setup: oneshot::Receiver<InboundSetupData>) -> Inbound {
pub fn new(
full_verify_concurrency_limit: usize,
setup: oneshot::Receiver<InboundSetupData>,
) -> Inbound {
Inbound {
setup: Setup::Pending { setup },
setup: Setup::Pending {
full_verify_concurrency_limit,
setup,
},
}
}

Expand Down Expand Up @@ -200,7 +216,10 @@ impl Service<zn::Request> for Inbound {
let result;

self.setup = match self.take_setup() {
Setup::Pending { mut setup } => match setup.try_recv() {
Setup::Pending {
full_verify_concurrency_limit,
mut setup,
} => match setup.try_recv() {
Ok(setup_data) => {
let InboundSetupData {
address_book,
Expand All @@ -212,6 +231,7 @@ impl Service<zn::Request> for Inbound {
} = setup_data;

let block_downloads = Box::pin(BlockDownloads::new(
full_verify_concurrency_limit,
Timeout::new(block_download_peer_set.clone(), BLOCK_DOWNLOAD_TIMEOUT),
Timeout::new(block_verifier, BLOCK_VERIFY_TIMEOUT),
state.clone(),
Expand All @@ -229,7 +249,10 @@ impl Service<zn::Request> for Inbound {
Err(TryRecvError::Empty) => {
// There's no setup data yet, so keep waiting for it
result = Ok(());
Setup::Pending { setup }
Setup::Pending {
full_verify_concurrency_limit,
setup,
}
}
Err(error @ TryRecvError::Closed) => {
// Mark the service as failed, because setup failed
Expand All @@ -256,6 +279,7 @@ impl Service<zn::Request> for Inbound {
while let Poll::Ready(Some(_)) = block_downloads.as_mut().poll_next(cx) {}

result = Ok(());

Setup::Initialized {
address_book,
block_downloads,
Expand Down
51 changes: 36 additions & 15 deletions zebrad/src/components/inbound/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use zebra_chain::{
use zebra_network as zn;
use zebra_state as zs;

use crate::components::sync::MIN_CONCURRENCY_LIMIT;

type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// The maximum number of concurrent inbound download and verify tasks.
Expand Down Expand Up @@ -64,7 +66,7 @@ pub enum DownloadAction {
/// The queue is at capacity, so this request was ignored.
///
/// The sync service should discover this block later, when we are closer
/// to the tip. The queue's capacity is [`MAX_INBOUND_CONCURRENCY`].
/// to the tip. The queue's capacity is [`Downloads.full_verify_concurrency_limit`].
FullQueue,
}

Expand All @@ -80,7 +82,13 @@ where
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
{
// Configuration
//
/// The configured full verification concurrency limit, after applying the minimum limit.
full_verify_concurrency_limit: usize,

// Services
//
/// A service that forwards requests to connected peers, and returns their
/// responses.
network: ZN,
Expand All @@ -95,6 +103,7 @@ where
latest_chain_tip: zs::LatestChainTip,

// Internal downloads state
//
/// A list of pending block download and verify tasks.
#[pin]
pending: FuturesUnordered<JoinHandle<Result<block::Hash, (BoxError, block::Hash)>>>,
Expand Down Expand Up @@ -162,8 +171,19 @@ where
/// The [`Downloads`] stream is agnostic to the network policy, so retry and
/// timeout limits should be applied to the `network` service passed into
/// this constructor.
pub fn new(network: ZN, verifier: ZV, state: ZS, latest_chain_tip: zs::LatestChainTip) -> Self {
pub fn new(
full_verify_concurrency_limit: usize,
network: ZN,
verifier: ZV,
state: ZS,
latest_chain_tip: zs::LatestChainTip,
) -> Self {
// The syncer already warns about the minimum.
let full_verify_concurrency_limit =
full_verify_concurrency_limit.clamp(MIN_CONCURRENCY_LIMIT, MAX_INBOUND_CONCURRENCY);

Self {
full_verify_concurrency_limit,
network,
verifier,
state,
Expand All @@ -182,8 +202,8 @@ where
debug!(
?hash,
queue_len = self.pending.len(),
?MAX_INBOUND_CONCURRENCY,
"block hash already queued for inbound download: ignored block"
concurrency_limit = self.full_verify_concurrency_limit,
"block hash already queued for inbound download: ignored block",
);

metrics::gauge!("gossip.queued.block.count", self.pending.len() as f64);
Expand All @@ -192,12 +212,12 @@ where
return DownloadAction::AlreadyQueued;
}

if self.pending.len() >= MAX_INBOUND_CONCURRENCY {
if self.pending.len() >= self.full_verify_concurrency_limit {
debug!(
?hash,
queue_len = self.pending.len(),
?MAX_INBOUND_CONCURRENCY,
"too many blocks queued for inbound download: ignored block"
concurrency_limit = self.full_verify_concurrency_limit,
"too many blocks queued for inbound download: ignored block",
);

metrics::gauge!("gossip.queued.block.count", self.pending.len() as f64);
Expand All @@ -213,6 +233,7 @@ where
let network = self.network.clone();
let verifier = self.verifier.clone();
let latest_chain_tip = self.latest_chain_tip.clone();
let full_verify_concurrency_limit = self.full_verify_concurrency_limit;

let fut = async move {
// Check if the block is already in the state.
Expand All @@ -232,7 +253,7 @@ where
assert_eq!(
blocks.len(),
1,
"wrong number of blocks in response to a single hash"
"wrong number of blocks in response to a single hash",
);

blocks
Expand All @@ -257,11 +278,11 @@ where
let tip_height = latest_chain_tip.best_tip_height();

let max_lookahead_height = if let Some(tip_height) = tip_height {
let lookahead = i32::try_from(MAX_INBOUND_CONCURRENCY).expect("fits in i32");
let lookahead = i32::try_from(full_verify_concurrency_limit).expect("fits in i32");
(tip_height + lookahead).expect("tip is much lower than Height::MAX")
} else {
let genesis_lookahead =
u32::try_from(MAX_INBOUND_CONCURRENCY - 1).expect("fits in u32");
u32::try_from(full_verify_concurrency_limit - 1).expect("fits in u32");
block::Height(genesis_lookahead)
};

Expand Down Expand Up @@ -296,8 +317,8 @@ where
?block_height,
?tip_height,
?max_lookahead_height,
lookahead_limit = ?MAX_INBOUND_CONCURRENCY,
"gossiped block height too far ahead of the tip: dropped downloaded block"
lookahead_limit = full_verify_concurrency_limit,
"gossiped block height too far ahead of the tip: dropped downloaded block",
);
metrics::counter!("gossip.max.height.limit.dropped.block.count", 1);

Expand All @@ -309,7 +330,7 @@ where
?tip_height,
?min_accepted_height,
behind_tip_limit = ?zs::MAX_BLOCK_REORG_HEIGHT,
"gossiped block height behind the finalized tip: dropped downloaded block"
"gossiped block height behind the finalized tip: dropped downloaded block",
);
metrics::counter!("gossip.min.height.limit.dropped.block.count", 1);

Expand Down Expand Up @@ -353,8 +374,8 @@ where
debug!(
?hash,
queue_len = self.pending.len(),
?MAX_INBOUND_CONCURRENCY,
"queued hash for download"
concurrency_limit = self.full_verify_concurrency_limit,
"queued hash for download",
);
metrics::gauge!("gossip.queued.block.count", self.pending.len() as f64);

Expand Down
4 changes: 2 additions & 2 deletions zebrad/src/components/inbound/tests/fake_peer_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use zebra_test::mock_service::{MockService, PanicAssertion};

use crate::{
components::{
inbound::{Inbound, InboundSetupData},
inbound::{downloads::MAX_INBOUND_CONCURRENCY, Inbound, InboundSetupData},
mempool::{
gossip_mempool_transaction_id, unmined_transactions_in_blocks, Config as MempoolConfig,
Mempool, MempoolError, SameEffectsChainRejectionError, UnboxMempoolError,
Expand Down Expand Up @@ -785,7 +785,7 @@ async fn setup(

let inbound_service = ServiceBuilder::new()
.load_shed()
.service(Inbound::new(setup_rx));
.service(Inbound::new(MAX_INBOUND_CONCURRENCY, setup_rx));
let inbound_service = BoxService::new(inbound_service);
let inbound_service = ServiceBuilder::new().buffer(1).service(inbound_service);

Expand Down
4 changes: 2 additions & 2 deletions zebrad/src/components/inbound/tests/real_peer_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use zebra_test::mock_service::{MockService, PanicAssertion};

use crate::{
components::{
inbound::{Inbound, InboundSetupData},
inbound::{downloads::MAX_INBOUND_CONCURRENCY, Inbound, InboundSetupData},
mempool::{gossip_mempool_transaction_id, Config as MempoolConfig, Mempool},
sync::{self, BlockGossipError, SyncStatus},
},
Expand Down Expand Up @@ -637,7 +637,7 @@ async fn setup(

// Inbound
let (setup_tx, setup_rx) = oneshot::channel();
let inbound_service = Inbound::new(setup_rx);
let inbound_service = Inbound::new(MAX_INBOUND_CONCURRENCY, setup_rx);
let inbound_service = ServiceBuilder::new()
.boxed_clone()
.load_shed()
Expand Down

0 comments on commit 1c6958d

Please sign in to comment.