[Consensus] use dynamic timeouts in commit sync #19705

Open · wants to merge 1 commit into base: main
80 changes: 62 additions & 18 deletions consensus/core/src/commit_syncer.rs
@@ -373,6 +373,15 @@ impl<C: NetworkClient> CommitSyncer<C> {
inner: Arc<Inner<C>>,
commit_range: CommitRange,
) -> (CommitIndex, Vec<TrustedCommit>, Vec<VerifiedBlock>) {
+ // Individual request base timeout.
+ const TIMEOUT: Duration = Duration::from_secs(10);
+ // Max per-request timeout will be the base timeout times a multiplier.
+ // At the extreme, this means a 120s timeout to fetch max_blocks_per_fetch blocks.
+ const MAX_TIMEOUT_MULTIPLIER: u32 = 12;
+ // timeout * max number of targets should be reasonably small, so the
+ // system can adjust to slow networks or large data sizes quickly.
+ const MAX_NUM_TARGETS: usize = 24;
+ let mut timeout_multiplier = 0;

[Contributor] should commit_sync_parallel_fetches be considered here as well?
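For reference, the escalation these constants imply can be sketched as a standalone snippet (illustrative only; the function name here is not part of the patch, which computes the values inline in the retry loop below):

```rust
use std::time::Duration;

const TIMEOUT: Duration = Duration::from_secs(10);
const MAX_TIMEOUT_MULTIPLIER: u32 = 12;

// Per-request and overall fetch timeouts for a given retry-loop
// iteration (1-based), mirroring the escalation in this patch.
fn timeouts_for_attempt(attempt: u32) -> (Duration, Duration) {
    let multiplier = attempt.min(MAX_TIMEOUT_MULTIPLIER);
    let request_timeout = TIMEOUT * multiplier;
    // fetch_once() is given 4x the per-request timeout: one commits fetch,
    // the referenced-blocks fetches, pipelining delay, and extra headroom.
    (request_timeout, request_timeout * 4)
}

fn main() {
    // First attempt: 10s per request, 40s overall.
    assert_eq!(
        timeouts_for_attempt(1),
        (Duration::from_secs(10), Duration::from_secs(40))
    );
    // From the 12th attempt on, capped at 120s per request, 480s overall.
    assert_eq!(
        timeouts_for_attempt(20),
        (Duration::from_secs(120), Duration::from_secs(480))
    );
}
```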
let _timer = inner
.context
.metrics
@@ -381,7 +390,8 @@ impl<C: NetworkClient> CommitSyncer<C> {
.start_timer();
info!("Starting to fetch commits in {commit_range:?} ...",);
loop {
- let mut all_authorities = inner
+ // Attempt to fetch commits and blocks through min(committee size, MAX_NUM_TARGETS) peers.
+ let mut target_authorities = inner
.context
.committee
.authorities()
@@ -393,21 +403,42 @@ impl<C: NetworkClient> CommitSyncer<C> {
}
})
.collect_vec();
- all_authorities.shuffle(&mut ThreadRng::default());
- for authority in all_authorities {
- match Self::fetch_once(inner.clone(), authority, commit_range.clone()).await {
- Ok((commits, blocks)) => {
+ target_authorities.shuffle(&mut ThreadRng::default());
+ target_authorities.truncate(MAX_NUM_TARGETS);
+ // Increase the timeout multiplier on each loop iteration, up to MAX_TIMEOUT_MULTIPLIER.
+ timeout_multiplier = (timeout_multiplier + 1).min(MAX_TIMEOUT_MULTIPLIER);
+ let request_timeout = TIMEOUT * timeout_multiplier;
+ // Give enough overall timeout for fetching commits and blocks:
+ // - timeout for fetching commits and commit certifying blocks,
+ // - timeout for fetching blocks referenced by the commits,
+ // - time spent on pipelining requests to fetch blocks,
+ // - extra headroom to allow fetch_once() to time out gracefully if possible.
+ let fetch_timeout = request_timeout * 4;
+ // Try fetching from each selected target authority in turn.
+ for authority in target_authorities {
+ match tokio::time::timeout(
+ fetch_timeout,
+ Self::fetch_once(
+ inner.clone(),
+ authority,
+ commit_range.clone(),
+ request_timeout,
+ ),
+ )
+ .await
+ {
+ Ok(Ok((commits, blocks))) => {
info!("Finished fetching commits in {commit_range:?}",);
return (commit_range.end(), commits, blocks);
}
- Err(e) => {
+ Ok(Err(e)) => {
let hostname = inner
.context
.committee
.authority(authority)
.hostname
.clone();
warn!("Failed to fetch from {hostname}: {}", e);
warn!("Failed to fetch {commit_range:?} from {hostname}: {}", e);
let error: &'static str = e.into();
inner
.context
@@ -417,6 +448,22 @@ impl<C: NetworkClient> CommitSyncer<C> {
.with_label_values(&[&hostname, error])
.inc();
}
+ Err(_) => {
+ let hostname = inner
+ .context
+ .committee
+ .authority(authority)
+ .hostname
+ .clone();
+ warn!("Timed out fetching {commit_range:?} from {authority}",);
+ inner
+ .context
+ .metrics
+ .node_metrics
+ .commit_sync_fetch_once_errors
+ .with_label_values(&[&hostname, "FetchTimeout"])
+ .inc();
+ }
}
}
}
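The three-way match above comes from wrapping fetch_once() in tokio::time::timeout, which layers its own Result over the inner ConsensusResult. A minimal sketch of the pattern, with simplified illustrative types rather than the PR's own:

```rust
use std::time::Duration;

// Stand-in for fetch_once(): an async operation that can fail.
async fn fetch_once() -> Result<u32, &'static str> {
    Ok(42)
}

#[tokio::main]
async fn main() {
    match tokio::time::timeout(Duration::from_secs(1), fetch_once()).await {
        // The inner future completed in time and succeeded.
        Ok(Ok(value)) => println!("fetched {value}"),
        // The inner future completed in time but returned an error.
        Ok(Err(e)) => println!("fetch failed: {e}"),
        // The overall deadline elapsed before the future completed.
        Err(_elapsed) => println!("fetch timed out"),
    }
}
```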
@@ -429,10 +476,8 @@ impl<C: NetworkClient> CommitSyncer<C> {
inner: Arc<Inner<C>>,
target_authority: AuthorityIndex,
commit_range: CommitRange,
+ timeout: Duration,
) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
- const FETCH_COMMITS_TIMEOUT: Duration = Duration::from_secs(15);
- const FETCH_BLOCKS_TIMEOUT: Duration = Duration::from_secs(120);
-
let _timer = inner
.context
.metrics
@@ -443,11 +488,7 @@ impl<C: NetworkClient> CommitSyncer<C> {
// 1. Fetch commits in the commit range from the target authority.
let (serialized_commits, serialized_blocks) = inner
.network_client
- .fetch_commits(
- target_authority,
- commit_range.clone(),
- FETCH_COMMITS_TIMEOUT,
- )
+ .fetch_commits(target_authority, commit_range.clone(), timeout)
.await?;

// 2. Verify the response contains blocks that can certify the last returned commit,
@@ -462,23 +503,26 @@ impl<C: NetworkClient> CommitSyncer<C> {

// 3. Fetch blocks referenced by the commits, from the same authority.
let block_refs: Vec<_> = commits.iter().flat_map(|c| c.blocks()).cloned().collect();
+ let num_chunks = block_refs
+ .len()
+ .div_ceil(inner.context.parameters.max_blocks_per_fetch)
+ as u32;
let mut requests: FuturesOrdered<_> = block_refs
.chunks(inner.context.parameters.max_blocks_per_fetch)
.enumerate()
.map(|(i, request_block_refs)| {
- let i = i as u32;
let inner = inner.clone();
async move {
// 4. Send out pipelined fetch requests to avoid overloading the target authority.
- sleep(Duration::from_millis(200) * i).await;
+ sleep(timeout * i as u32 / num_chunks).await;
// TODO: add some retries.
let serialized_blocks = inner
.network_client
.fetch_blocks(
target_authority,
request_block_refs.to_vec(),
vec![],
- FETCH_BLOCKS_TIMEOUT,
+ timeout,
)
.await?;
// 5. Verify the same number of blocks are returned as requested.
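The pipelining change above replaces the fixed 200ms stagger with a delay that spreads the chunked block requests evenly across one request timeout, so the pacing scales with the escalating timeout. A small sketch of the resulting schedule (the helper name is illustrative, not from the patch):

```rust
use std::time::Duration;

// Delay before dispatching chunk `i` of `num_chunks`, as in the patch:
// chunk requests are spread evenly across one request timeout rather
// than being sent every 200ms regardless of the timeout in effect.
fn chunk_delay(i: usize, num_chunks: u32, timeout: Duration) -> Duration {
    timeout * i as u32 / num_chunks
}

fn main() {
    let timeout = Duration::from_secs(10);
    // With 5 chunks and a 10s timeout: dispatch at 0s, 2s, 4s, 6s, 8s.
    for i in 0..5 {
        println!("chunk {i} after {:?}", chunk_delay(i, 5, timeout));
    }
}
```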