
fix(sync): Pause new downloads when Zebra reaches the lookahead limit #5561

Merged: 10 commits, Nov 9, 2022
3 changes: 2 additions & 1 deletion .github/workflows/continous-integration-docker.yml
@@ -377,7 +377,8 @@ jobs:
app_name: zebrad
test_id: full-sync-to-tip
test_description: Test a full sync up to the tip
test_variables: '-e TEST_FULL_SYNC=1 -e ZEBRA_FORCE_USE_COLOR=1 -e FULL_SYNC_MAINNET_TIMEOUT_MINUTES=600'
# The value of FULL_SYNC_MAINNET_TIMEOUT_MINUTES is currently ignored.
test_variables: '-e TEST_FULL_SYNC=1 -e ZEBRA_FORCE_USE_COLOR=1 -e FULL_SYNC_MAINNET_TIMEOUT_MINUTES=0'
# This test runs for longer than 6 hours, so it needs multiple jobs
is_long_test: true
needs_zebra_state: false
4 changes: 3 additions & 1 deletion zebra-state/src/lib.rs
@@ -35,7 +35,9 @@ pub use request::{FinalizedBlock, HashOrHeight, PreparedBlock, ReadRequest, Requ
pub use response::{ReadResponse, Response};
pub use service::{
chain_tip::{ChainTipChange, LatestChainTip, TipAction},
init, spawn_init, OutputIndex, OutputLocation, TransactionLocation,
init, spawn_init,
watch_receiver::WatchReceiver,
OutputIndex, OutputLocation, TransactionLocation,
};

#[cfg(any(test, feature = "proptest-impl"))]
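The new re-export above makes zebra-state's `WatchReceiver` wrapper usable from the syncer. As a reader aid, here is a minimal sketch of the assumed interface, based on the `cloned_watch_data()` calls later in this diff; the real implementation lives in zebra-state's service module:

use tokio::sync::watch;

/// A cloneable wrapper around a `tokio::sync::watch::Receiver`.
#[derive(Clone, Debug)]
pub struct WatchReceiver<T> {
    receiver: watch::Receiver<T>,
}

impl<T: Clone> WatchReceiver<T> {
    pub fn new(receiver: watch::Receiver<T>) -> Self {
        Self { receiver }
    }

    /// Clone the current value out of the channel, dropping the internal
    /// borrow before returning, so the sender is never blocked by readers.
    pub fn cloned_watch_data(&self) -> T {
        self.receiver.borrow().clone()
    }
}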
49 changes: 33 additions & 16 deletions zebrad/src/components/sync.rs
@@ -8,7 +8,7 @@ use color_eyre::eyre::{eyre, Report};
use futures::stream::{FuturesUnordered, StreamExt};
use indexmap::IndexSet;
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
use tokio::{sync::watch, time::sleep};
use tower::{
builder::ServiceBuilder, hedge::Hedge, limit::ConcurrencyLimit, retry::Retry, timeout::Timeout,
Service, ServiceExt,
@@ -83,8 +83,7 @@ pub const MIN_CHECKPOINT_CONCURRENCY_LIMIT: usize = zebra_consensus::MAX_CHECKPO
/// The default for the user-specified lookahead limit.
///
/// See [`MIN_CHECKPOINT_CONCURRENCY_LIMIT`] for details.
pub const DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT: usize =
zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP * 2;
pub const DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT: usize = MAX_TIPS_RESPONSE_HASH_COUNT * 2;

/// A lower bound on the user-specified concurrency limit.
///
@@ -359,6 +358,10 @@ where

/// The lengths of recent sync responses.
recent_syncs: RecentSyncLengths,

/// Receiver whose value is `true` when the downloader is past the lookahead limit.
/// This is based on the downloaded block height and the state tip height.
past_lookahead_limit_receiver: zs::WatchReceiver<bool>,
}

/// Polls the network to determine whether further blocks are available and
@@ -438,6 +441,7 @@ where
}

let tip_network = Timeout::new(peers.clone(), TIPS_RESPONSE_TIMEOUT);

// The Hedge middleware is the outermost layer, hedging requests
// between two retry-wrapped networks. The innermost timeout
// layer is relatively unimportant, because slow requests will
@@ -464,27 +468,33 @@

let (sync_status, recent_syncs) = SyncStatus::new();

let (past_lookahead_limit_sender, past_lookahead_limit_receiver) = watch::channel(false);
let past_lookahead_limit_receiver = zs::WatchReceiver::new(past_lookahead_limit_receiver);

let downloads = Box::pin(Downloads::new(
block_network,
verifier,
latest_chain_tip.clone(),
past_lookahead_limit_sender,
max(
checkpoint_verify_concurrency_limit,
full_verify_concurrency_limit,
),
max_checkpoint_height,
));

let new_syncer = Self {
genesis_hash: genesis_hash(config.network.network),
max_checkpoint_height,
checkpoint_verify_concurrency_limit,
full_verify_concurrency_limit,
tip_network,
downloads: Box::pin(Downloads::new(
block_network,
verifier,
latest_chain_tip.clone(),
// TODO: change the download lookahead for full verification?
max(
checkpoint_verify_concurrency_limit,
full_verify_concurrency_limit,
),
max_checkpoint_height,
)),
downloads,
state,
latest_chain_tip,
prospective_tips: HashSet::new(),
recent_syncs,
past_lookahead_limit_receiver,
};

(new_syncer, sync_status)
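The pause flag created above is a plain `tokio::sync::watch` channel. A self-contained sketch of the flow, with illustrative names (the downloader flips the flag, the syncer polls it):

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // Created once, alongside the downloader.
    let (pause_sender, pause_receiver) = watch::channel(false);

    // Downloader side: signal that downloads are past the lookahead limit.
    pause_sender.send(true).expect("receiver is still alive");

    // Syncer side: read the current value each time through the sync loop.
    let paused = *pause_receiver.borrow();
    assert!(paused);
}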
@@ -545,7 +555,14 @@ where
}
self.update_metrics();

while self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len()) {
// Pause new downloads while the syncer or downloader are past their lookahead limits.
//
// To avoid a deadlock or long waits for blocks to expire, we ignore the download
// lookahead limit when there are only a small number of blocks waiting.
while self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len())
|| (self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len()) / 2
&& self.past_lookahead_limit_receiver.cloned_watch_data())
{
trace!(
tips.len = self.prospective_tips.len(),
in_flight = self.downloads.in_flight(),
@@ -957,7 +974,7 @@ where
}

/// The configured lookahead limit, based on the currently verified height,
/// and the number of hashes we haven't queued yet..
/// and the number of hashes we haven't queued yet.
fn lookahead_limit(&self, new_hashes: usize) -> usize {
let max_checkpoint_height: usize = self
.max_checkpoint_height
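The new `while` condition above combines the syncer's own queue limit with the downloader's height-based pause flag. As a standalone sketch (illustrative names, not Zebra's API), the predicate is:

/// Pause new downloads when the syncer queue is full, or when the downloader
/// is past its height-based lookahead limit and the queue is still fairly full.
fn should_pause_downloads(
    in_flight: usize,
    lookahead_limit: usize,
    past_lookahead_limit: bool,
) -> bool {
    in_flight >= lookahead_limit
        // Once in-flight blocks drain below half the lookahead limit, ignore
        // the downloader's pause flag, avoiding a deadlock or a long wait for
        // queued blocks to expire.
        || (in_flight >= lookahead_limit / 2 && past_lookahead_limit)
}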
149 changes: 116 additions & 33 deletions zebrad/src/components/sync/downloads.rs
@@ -4,7 +4,7 @@ use std::{
collections::HashMap,
convert::{self, TryFrom},
pin::Pin,
sync::Arc,
sync::{Arc, TryLockError},
task::{Context, Poll},
};

@@ -15,7 +15,11 @@ use futures::{
};
use pin_project::pin_project;
use thiserror::Error;
use tokio::{sync::oneshot, task::JoinHandle, time::timeout};
use tokio::{
sync::{oneshot, watch},
task::JoinHandle,
time::timeout,
};
use tower::{hedge, Service, ServiceExt};
use tracing_futures::Instrument;

@@ -42,14 +46,17 @@ type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// to hold a few extra tips responses worth of blocks,
/// even if the syncer queue is full. Any unused capacity is shared between both queues.
///
/// If this capacity is exceeded, the downloader will start failing download blocks with
/// [`BlockDownloadVerifyError::AboveLookaheadHeightLimit`], and the syncer will reset.
/// If this capacity is exceeded, the downloader will tell the syncer to pause new downloads.
///
/// Since the syncer queue is limited to the `lookahead_limit`,
/// the rest of the capacity is reserved for the other queues.
/// There is no reserved capacity for the syncer queue:
/// if the other queues stay full, the syncer will eventually time out and reset.
pub const VERIFICATION_PIPELINE_SCALING_MULTIPLIER: usize = 5;
pub const VERIFICATION_PIPELINE_SCALING_MULTIPLIER: usize = 2;

/// The maximum height difference between Zebra's state tip and a downloaded block.
/// Blocks further above the state tip than this are dropped, and their downloads return an error.
pub const VERIFICATION_PIPELINE_DROP_LIMIT: i32 = 50_000;
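To see how the pause and drop limits interact, here is a worked example with a hypothetical `lookahead_limit` of 1,000; it mirrors the `lookahead_drop_height`/`lookahead_pause_height`/`lookahead_reset_height` computation later in this file:

fn main() {
    let tip_height: i32 = 1_800_000; // hypothetical state tip
    let lookahead_limit: i32 = 1_000; // hypothetical configured limit

    // The pause height scales with the lookahead limit and the multiplier.
    let pause_height = tip_height + lookahead_limit * (1 + 2 /* SCALING_MULTIPLIER */);
    // The reset height adds hysteresis: downloads resume at half the pause gap.
    let reset_height = tip_height + lookahead_limit * (1 + 2) / 2;
    // The drop height is a fixed cap, far above any sensible pause height.
    let drop_height = tip_height + 50_000 /* VERIFICATION_PIPELINE_DROP_LIMIT */;

    assert_eq!(
        (reset_height, pause_height, drop_height),
        (1_801_500, 1_803_000, 1_850_000),
    );
}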

#[derive(Copy, Clone, Debug)]
pub(super) struct AlwaysHedge;
@@ -89,6 +96,14 @@ pub enum BlockDownloadVerifyError {
hash: block::Hash,
},

/// A downloaded block was a long way ahead of the state chain tip.
/// This error should be very rare during normal operation.
///
/// We need to reset the syncer on this error, to allow the verifier and state to catch up,
/// or prevent it following a bad chain.
///
/// If we don't reset the syncer on this error, it will continue downloading blocks from a bad
/// chain, or blocks far ahead of the current state tip.
#[error("downloaded block was too far ahead of the chain tip: {height:?} {hash:?}")]
AboveLookaheadHeightLimit {
height: block::Height,
@@ -157,6 +172,7 @@ where
ZSTip: ChainTip + Clone + Send + 'static,
{
// Services
//
/// A service that forwards requests to connected peers, and returns their
/// responses.
network: ZN,
@@ -168,13 +184,24 @@
latest_chain_tip: ZSTip,

// Configuration
//
/// The configured lookahead limit, after applying the minimum limit.
lookahead_limit: usize,

/// The largest block height for the checkpoint verifier, based on the current config.
max_checkpoint_height: Height,

// Shared syncer state
//
/// Sender whose value is set to `true` when the downloader is past the lookahead limit.
/// This is based on the downloaded block height and the state tip height.
past_lookahead_limit_sender: Arc<std::sync::Mutex<watch::Sender<bool>>>,

/// Receiver for `past_lookahead_limit_sender`, which is used to avoid accessing the mutex.
past_lookahead_limit_receiver: zs::WatchReceiver<bool>,

// Internal downloads state
//
/// A list of pending block download and verify tasks.
#[pin]
pending: FuturesUnordered<
@@ -259,15 +286,23 @@
network: ZN,
verifier: ZV,
latest_chain_tip: ZSTip,
past_lookahead_limit_sender: watch::Sender<bool>,
lookahead_limit: usize,
max_checkpoint_height: Height,
) -> Self {
let past_lookahead_limit_receiver =
zs::WatchReceiver::new(past_lookahead_limit_sender.subscribe());

Self {
network,
verifier,
latest_chain_tip,
lookahead_limit,
max_checkpoint_height,
past_lookahead_limit_sender: Arc::new(std::sync::Mutex::new(
past_lookahead_limit_sender,
)),
past_lookahead_limit_receiver,
pending: FuturesUnordered::new(),
cancel_handles: HashMap::new(),
}
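The sender is stored in an `Arc<std::sync::Mutex<...>>` so each spawned download task can hold a clone. A sketch of the two locking styles those tasks use later in this file (illustrative helper names, not Zebra's exact code):

use std::sync::{Arc, Mutex, TryLockError};
use tokio::sync::watch;

/// Pause path: blocking on the mutex is acceptable, because new downloads
/// are about to pause anyway.
fn set_paused(sender: &Arc<Mutex<watch::Sender<bool>>>) {
    // If Zebra is shutting down, ignore the send error.
    let _ = sender.lock().expect("mutex poisoned").send(true);
}

/// Reset path: never block; a busy mutex just defers the reset until the
/// next downloaded block.
fn try_clear_paused(sender: &Arc<Mutex<watch::Sender<bool>>>) {
    match sender.try_lock() {
        Ok(guard) => {
            let _ = guard.send(false);
        }
        Err(TryLockError::Poisoned(_)) => panic!("mutex poisoned"),
        Err(TryLockError::WouldBlock) => {}
    }
}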
@@ -307,9 +342,13 @@

let mut verifier = self.verifier.clone();
let latest_chain_tip = self.latest_chain_tip.clone();

let lookahead_limit = self.lookahead_limit;
let max_checkpoint_height = self.max_checkpoint_height;

let past_lookahead_limit_sender = self.past_lookahead_limit_sender.clone();
let past_lookahead_limit_receiver = self.past_lookahead_limit_receiver.clone();

let task = tokio::spawn(
async move {
// Download the block.
@@ -346,19 +385,26 @@
// that will timeout before being verified.
let tip_height = latest_chain_tip.best_tip_height();

// TODO: don't use VERIFICATION_PIPELINE_SCALING_MULTIPLIER for full verification?
let max_lookahead_height = if let Some(tip_height) = tip_height {
let (lookahead_drop_height, lookahead_pause_height, lookahead_reset_height) = if let Some(tip_height) = tip_height {
// Scale the height limit with the lookahead limit,
// so users with low capacity or under DoS can reduce them both.
let lookahead = i32::try_from(
let lookahead_pause = i32::try_from(
lookahead_limit + lookahead_limit * VERIFICATION_PIPELINE_SCALING_MULTIPLIER,
)
.expect("fits in i32");
(tip_height + lookahead).expect("tip is much lower than Height::MAX")
.expect("fits in i32");


((tip_height + VERIFICATION_PIPELINE_DROP_LIMIT).expect("tip is much lower than Height::MAX"),
(tip_height + lookahead_pause).expect("tip is much lower than Height::MAX"),
(tip_height + lookahead_pause/2).expect("tip is much lower than Height::MAX"))
} else {
let genesis_drop = VERIFICATION_PIPELINE_DROP_LIMIT.try_into().expect("fits in u32");
let genesis_lookahead =
u32::try_from(lookahead_limit - 1).expect("fits in u32");
block::Height(genesis_lookahead)

(block::Height(genesis_drop),
block::Height(genesis_lookahead),
block::Height(genesis_lookahead/2))
};

// Get the finalized tip height, assuming we're using the non-finalized state.
@@ -388,28 +434,59 @@
return Err(BlockDownloadVerifyError::InvalidHeight { hash });
};

if block_height > max_lookahead_height {
info!(
?hash,
?block_height,
?tip_height,
?max_lookahead_height,
lookahead_limit = ?lookahead_limit,
"synced block height too far ahead of the tip: dropped downloaded block",
);
metrics::counter!("sync.max.height.limit.dropped.block.count", 1);

// This error should be very rare during normal operation.
//
// We need to reset the syncer on this error,
// to allow the verifier and state to catch up,
// or prevent it following a bad chain.
//
// If we don't reset the syncer on this error,
// it will continue downloading blocks from a bad chain,
// (or blocks far ahead of the current state tip).
if block_height > lookahead_drop_height {
Err(BlockDownloadVerifyError::AboveLookaheadHeightLimit { height: block_height, hash })?;
} else if block_height < min_accepted_height {
} else if block_height > lookahead_pause_height {
// This log can be very verbose: usually hundreds of blocks are paused at a time.
// So we only log at info level for the first block above the pause height.
if !past_lookahead_limit_receiver.cloned_watch_data() {
info!(
?hash,
?block_height,
?tip_height,
?lookahead_pause_height,
?lookahead_reset_height,
lookahead_limit = ?lookahead_limit,
"synced block height too far ahead of the tip: \
waiting for downloaded blocks to commit to the state",
);

// Set the watched value to true, since we're over the limit.
//
// It is ok to block here, because we're going to pause new downloads anyway.
// But if Zebra is shutting down, ignore the send error.
let _ = past_lookahead_limit_sender.lock().expect("thread panicked while holding the past_lookahead_limit_sender mutex guard").send(true);
} else {
debug!(
?hash,
?block_height,
?tip_height,
?lookahead_pause_height,
?lookahead_reset_height,
lookahead_limit = ?lookahead_limit,
"synced block height too far ahead of the tip: \
waiting for downloaded blocks to commit to the state",
);
}

metrics::counter!("sync.max.height.limit.paused.count", 1);
} else if block_height <= lookahead_reset_height && past_lookahead_limit_receiver.cloned_watch_data() {
// Try to reset the watched value to false, since we're well under the limit.
match past_lookahead_limit_sender.try_lock() {
Ok(watch_sender_guard) => {
// If Zebra is shutting down, ignore the send error.
let _ = watch_sender_guard.send(false);
metrics::counter!("sync.max.height.limit.reset.count", 1);
},
Err(TryLockError::Poisoned(_)) => panic!("thread panicked while holding the past_lookahead_limit_sender mutex guard"),
// We'll try allowing new downloads when we get the next block.
Err(TryLockError::WouldBlock) => {}
}

metrics::counter!("sync.max.height.limit.reset.attempt.count", 1);
}

if block_height < min_accepted_height {
debug!(
?hash,
?block_height,
@@ -504,8 +581,14 @@
assert!(self.cancel_handles.is_empty());
}

/// Get the number of currently in-flight download tasks.
/// Get the number of currently in-flight download and verify tasks.
pub fn in_flight(&mut self) -> usize {
self.pending.len()
}

/// Returns true if there are no in-flight download and verify tasks.
#[allow(dead_code)]
pub fn is_empty(&mut self) -> bool {
self.pending.is_empty()
}
}