Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sync): Temporarily set full verification concurrency to 30 blocks #4726

Merged
merged 16 commits into from
Jul 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,18 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## Next Release (Draft)

(Draft notes for the next release can be added here.)
This release improves Zebra's sync and verification performance under heavy load.
(TODO - complete the summary.)

### Configuration Changes

- Split the checkpoint and full verification [`sync` concurrency options](https://doc.zebra.zfnd.org/zebrad/config/struct.SyncSection.html) (#4726):
- Add a new `full_verify_concurrency_limit`
- Rename `max_concurrent_block_requests` to `download_concurrency_limit`
- Rename `lookahead_limit` to `checkpoint_verify_concurrency_limit`
For backwards compatibility, the old names are still accepted as aliases.

(TODO - insert changelog here)

## [Zebra 1.0.0-beta.12](https://github.com/ZcashFoundation/zebra/releases/tag/v1.0.0-beta.12) - 2022-06-29

Expand Down
13 changes: 10 additions & 3 deletions zebra-consensus/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
use tracing::{instrument, Span};

use zebra_chain::{
block::{self, Block},
block::{self, Block, Height},
parameters::Network,
};

Expand Down Expand Up @@ -163,7 +163,8 @@ where
/// config parameter and if the download is not already started.
///
/// Returns a block verifier, transaction verifier,
/// and the Groth16 parameter download task [`JoinHandle`].
/// the Groth16 parameter download task [`JoinHandle`],
/// and the maximum configured checkpoint verification height.
///
/// The consensus configuration is specified by `config`, and the Zcash network
/// to verify blocks for is specified by `network`.
Expand Down Expand Up @@ -203,6 +204,7 @@ pub async fn init<S>(
transaction::Request,
>,
JoinHandle<()>,
Height,
)
where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
Expand Down Expand Up @@ -266,5 +268,10 @@ where

let chain = Buffer::new(BoxService::new(chain), VERIFIER_BUFFER_BOUND);

(chain, transaction, groth16_download_handle)
(
chain,
transaction,
groth16_download_handle,
max_checkpoint_height,
)
}
4 changes: 2 additions & 2 deletions zebra-consensus/src/chain/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async fn verifiers_from_network(
+ 'static,
) {
let state_service = zs::init_test(network);
let (chain_verifier, _transaction_verifier, _groth16_download_handle) =
let (chain_verifier, _transaction_verifier, _groth16_download_handle, _max_checkpoint_height) =
crate::chain::init(Config::default(), network, state_service.clone(), true).await;

// We can drop the download task handle here, because:
Expand Down Expand Up @@ -161,7 +161,7 @@ async fn verify_checkpoint(config: Config) -> Result<(), Report> {
// init_from_verifiers.
//
// Download task panics and timeouts are propagated to the tests that use Groth16 verifiers.
let (chain_verifier, _transaction_verifier, _groth16_download_handle) =
let (chain_verifier, _transaction_verifier, _groth16_download_handle, _max_checkpoint_height) =
super::init(config.clone(), network, zs::init_test(network), true).await;

// Add a timeout layer
Expand Down
28 changes: 17 additions & 11 deletions zebrad/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@
//!
//! Some of the diagnostic features are optional, and need to be enabled at compile-time.

use std::cmp::max;

use abscissa_core::{config, Command, FrameworkError, Options, Runnable};
use color_eyre::eyre::{eyre, Report};
use futures::FutureExt;
Expand Down Expand Up @@ -119,13 +117,16 @@ impl StartCmd {
let inbound = ServiceBuilder::new()
.load_shed()
.buffer(inbound::downloads::MAX_INBOUND_CONCURRENCY)
.service(Inbound::new(setup_rx));
.service(Inbound::new(
config.sync.full_verify_concurrency_limit,
setup_rx,
));

let (peer_set, address_book) =
zebra_network::init(config.network.clone(), inbound, latest_chain_tip.clone()).await;

info!("initializing verifiers");
let (chain_verifier, tx_verifier, mut groth16_download_handle) =
let (chain_verifier, tx_verifier, mut groth16_download_handle, max_checkpoint_height) =
zebra_consensus::chain::init(
config.consensus.clone(),
config.network.network,
Expand All @@ -137,6 +138,7 @@ impl StartCmd {
info!("initializing syncer");
let (syncer, sync_status) = ChainSync::new(
&config,
max_checkpoint_height,
peer_set.clone(),
chain_verifier.clone(),
state.clone(),
Expand Down Expand Up @@ -342,15 +344,19 @@ impl StartCmd {
fn state_buffer_bound() -> usize {
let config = app_config().clone();

// Ignore the checkpoint verify limit, because it is very large.
//
// TODO: do we also need to account for concurrent use across services?
// we could multiply the maximum by 3/2, or add a fixed constant
max(
config.sync.max_concurrent_block_requests,
max(
inbound::downloads::MAX_INBOUND_CONCURRENCY,
mempool::downloads::MAX_INBOUND_CONCURRENCY,
),
)
[
config.sync.download_concurrency_limit,
config.sync.full_verify_concurrency_limit,
inbound::downloads::MAX_INBOUND_CONCURRENCY,
mempool::downloads::MAX_INBOUND_CONCURRENCY,
]
.into_iter()
.max()
.unwrap()
}
}

Expand Down
34 changes: 29 additions & 5 deletions zebrad/src/components/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ use zebra_network::{
};
use zebra_node_services::mempool;

use crate::BoxError;

// Re-use the syncer timeouts for consistency.
use super::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};
use crate::BoxError;

use InventoryResponse::*;

Expand Down Expand Up @@ -85,6 +86,13 @@ pub enum Setup {
///
/// All requests are ignored.
Pending {
// Configuration
//
/// The configured full verification concurrency limit.
full_verify_concurrency_limit: usize,

// Services
//
/// A oneshot channel used to receive required services,
/// after they are set up.
setup: oneshot::Receiver<InboundSetupData>,
Expand All @@ -94,6 +102,8 @@ pub enum Setup {
///
/// All requests are answered.
Initialized {
// Services
//
/// A shared list of peer addresses.
address_book: Arc<std::sync::Mutex<zn::AddressBook>>,

Expand Down Expand Up @@ -169,9 +179,15 @@ impl Inbound {
/// Create a new inbound service.
///
/// Dependent services are sent via the `setup` channel after initialization.
pub fn new(setup: oneshot::Receiver<InboundSetupData>) -> Inbound {
pub fn new(
full_verify_concurrency_limit: usize,
setup: oneshot::Receiver<InboundSetupData>,
) -> Inbound {
Inbound {
setup: Setup::Pending { setup },
setup: Setup::Pending {
full_verify_concurrency_limit,
setup,
},
}
}

Expand Down Expand Up @@ -200,7 +216,10 @@ impl Service<zn::Request> for Inbound {
let result;

self.setup = match self.take_setup() {
Setup::Pending { mut setup } => match setup.try_recv() {
Setup::Pending {
full_verify_concurrency_limit,
mut setup,
} => match setup.try_recv() {
Ok(setup_data) => {
let InboundSetupData {
address_book,
Expand All @@ -212,6 +231,7 @@ impl Service<zn::Request> for Inbound {
} = setup_data;

let block_downloads = Box::pin(BlockDownloads::new(
full_verify_concurrency_limit,
Timeout::new(block_download_peer_set.clone(), BLOCK_DOWNLOAD_TIMEOUT),
Timeout::new(block_verifier, BLOCK_VERIFY_TIMEOUT),
state.clone(),
Expand All @@ -229,7 +249,10 @@ impl Service<zn::Request> for Inbound {
Err(TryRecvError::Empty) => {
// There's no setup data yet, so keep waiting for it
result = Ok(());
Setup::Pending { setup }
Setup::Pending {
full_verify_concurrency_limit,
setup,
}
}
Err(error @ TryRecvError::Closed) => {
// Mark the service as failed, because setup failed
Expand All @@ -256,6 +279,7 @@ impl Service<zn::Request> for Inbound {
while let Poll::Ready(Some(_)) = block_downloads.as_mut().poll_next(cx) {}

result = Ok(());

Setup::Initialized {
address_book,
block_downloads,
Expand Down
51 changes: 36 additions & 15 deletions zebrad/src/components/inbound/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use zebra_chain::{
use zebra_network as zn;
use zebra_state as zs;

use crate::components::sync::MIN_CONCURRENCY_LIMIT;

type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// The maximum number of concurrent inbound download and verify tasks.
Expand Down Expand Up @@ -64,7 +66,7 @@ pub enum DownloadAction {
/// The queue is at capacity, so this request was ignored.
///
/// The sync service should discover this block later, when we are closer
/// to the tip. The queue's capacity is [`MAX_INBOUND_CONCURRENCY`].
/// to the tip. The queue's capacity is [`Downloads.full_verify_concurrency_limit`].
FullQueue,
}

Expand All @@ -80,7 +82,13 @@ where
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
{
// Configuration
//
/// The configured full verification concurrency limit, after applying the minimum limit.
full_verify_concurrency_limit: usize,

// Services
//
/// A service that forwards requests to connected peers, and returns their
/// responses.
network: ZN,
Expand All @@ -95,6 +103,7 @@ where
latest_chain_tip: zs::LatestChainTip,

// Internal downloads state
//
/// A list of pending block download and verify tasks.
#[pin]
pending: FuturesUnordered<JoinHandle<Result<block::Hash, (BoxError, block::Hash)>>>,
Expand Down Expand Up @@ -162,8 +171,19 @@ where
/// The [`Downloads`] stream is agnostic to the network policy, so retry and
/// timeout limits should be applied to the `network` service passed into
/// this constructor.
pub fn new(network: ZN, verifier: ZV, state: ZS, latest_chain_tip: zs::LatestChainTip) -> Self {
pub fn new(
full_verify_concurrency_limit: usize,
network: ZN,
verifier: ZV,
state: ZS,
latest_chain_tip: zs::LatestChainTip,
) -> Self {
// The syncer already warns about the minimum.
let full_verify_concurrency_limit =
full_verify_concurrency_limit.clamp(MIN_CONCURRENCY_LIMIT, MAX_INBOUND_CONCURRENCY);

Self {
full_verify_concurrency_limit,
network,
verifier,
state,
Expand All @@ -182,8 +202,8 @@ where
debug!(
?hash,
queue_len = self.pending.len(),
?MAX_INBOUND_CONCURRENCY,
"block hash already queued for inbound download: ignored block"
concurrency_limit = self.full_verify_concurrency_limit,
"block hash already queued for inbound download: ignored block",
);

metrics::gauge!("gossip.queued.block.count", self.pending.len() as f64);
Expand All @@ -192,12 +212,12 @@ where
return DownloadAction::AlreadyQueued;
}

if self.pending.len() >= MAX_INBOUND_CONCURRENCY {
if self.pending.len() >= self.full_verify_concurrency_limit {
debug!(
?hash,
queue_len = self.pending.len(),
?MAX_INBOUND_CONCURRENCY,
"too many blocks queued for inbound download: ignored block"
concurrency_limit = self.full_verify_concurrency_limit,
"too many blocks queued for inbound download: ignored block",
);

metrics::gauge!("gossip.queued.block.count", self.pending.len() as f64);
Expand All @@ -213,6 +233,7 @@ where
let network = self.network.clone();
let verifier = self.verifier.clone();
let latest_chain_tip = self.latest_chain_tip.clone();
let full_verify_concurrency_limit = self.full_verify_concurrency_limit;

let fut = async move {
// Check if the block is already in the state.
Expand All @@ -232,7 +253,7 @@ where
assert_eq!(
blocks.len(),
1,
"wrong number of blocks in response to a single hash"
"wrong number of blocks in response to a single hash",
);

blocks
Expand All @@ -257,11 +278,11 @@ where
let tip_height = latest_chain_tip.best_tip_height();

let max_lookahead_height = if let Some(tip_height) = tip_height {
let lookahead = i32::try_from(MAX_INBOUND_CONCURRENCY).expect("fits in i32");
let lookahead = i32::try_from(full_verify_concurrency_limit).expect("fits in i32");
(tip_height + lookahead).expect("tip is much lower than Height::MAX")
} else {
let genesis_lookahead =
u32::try_from(MAX_INBOUND_CONCURRENCY - 1).expect("fits in u32");
u32::try_from(full_verify_concurrency_limit - 1).expect("fits in u32");
block::Height(genesis_lookahead)
};

Expand Down Expand Up @@ -296,8 +317,8 @@ where
?block_height,
?tip_height,
?max_lookahead_height,
lookahead_limit = ?MAX_INBOUND_CONCURRENCY,
"gossiped block height too far ahead of the tip: dropped downloaded block"
lookahead_limit = full_verify_concurrency_limit,
"gossiped block height too far ahead of the tip: dropped downloaded block",
);
metrics::counter!("gossip.max.height.limit.dropped.block.count", 1);

Expand All @@ -309,7 +330,7 @@ where
?tip_height,
?min_accepted_height,
behind_tip_limit = ?zs::MAX_BLOCK_REORG_HEIGHT,
"gossiped block height behind the finalized tip: dropped downloaded block"
"gossiped block height behind the finalized tip: dropped downloaded block",
);
metrics::counter!("gossip.min.height.limit.dropped.block.count", 1);

Expand Down Expand Up @@ -353,8 +374,8 @@ where
debug!(
?hash,
queue_len = self.pending.len(),
?MAX_INBOUND_CONCURRENCY,
"queued hash for download"
concurrency_limit = self.full_verify_concurrency_limit,
"queued hash for download",
);
metrics::gauge!("gossip.queued.block.count", self.pending.len() as f64);

Expand Down
Loading