Skip to content

Commit

Permalink
[consensus] split round timeout msg out of vote msg
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Aug 27, 2024
1 parent a556091 commit 5672524
Show file tree
Hide file tree
Showing 15 changed files with 791 additions and 228 deletions.
2 changes: 2 additions & 0 deletions config/src/config/consensus_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ pub struct ConsensusConfig {
pub proof_cache_capacity: u64,
pub rand_rb_config: ReliableBroadcastConfig,
pub num_bounded_executor_tasks: u64,
pub enable_round_timeout_msg: bool,
}

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
Expand Down Expand Up @@ -352,6 +353,7 @@ impl Default for ConsensusConfig {
rpc_timeout_ms: 10000,
},
num_bounded_executor_tasks: 16,
enable_round_timeout_msg: false,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions consensus/consensus-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod proposal_msg;
pub mod quorum_cert;
pub mod randomness;
pub mod request_response;
pub mod round_timeout;
pub mod safety_data;
pub mod sync_info;
pub mod timeout_2chain;
Expand Down
179 changes: 179 additions & 0 deletions consensus/consensus-types/src/round_timeout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
use crate::{
common::{Author, Round},
sync_info::SyncInfo,
timeout_2chain::TwoChainTimeout,
};
use anyhow::{ensure, Context};
use aptos_crypto::{bls12381, hash::CryptoHash, CryptoMaterialError, HashValue};
use aptos_short_hex_str::AsShortHexStr;
use aptos_types::{
ledger_info::LedgerInfo, validator_signer::ValidatorSigner,
validator_verifier::ValidatorVerifier,
};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize, Clone, PartialEq, Eq)]
pub enum RoundTimeoutReason {
Unknown,
ProposalNotReceived,
PayloadNotAvailable,
}

impl std::fmt::Display for RoundTimeoutReason {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
RoundTimeoutReason::Unknown => write!(f, "Unknown"),
RoundTimeoutReason::ProposalNotReceived => write!(f, "ProposalNotReceived"),
RoundTimeoutReason::PayloadNotAvailable => write!(f, "PayloadNotAvailable"),
}
}
}

#[derive(Deserialize, Serialize, Clone, PartialEq, Eq)]
pub struct RoundTimeout {
// The timeout
timeout: TwoChainTimeout,
author: Author,
reason: RoundTimeoutReason,
/// Signature on the Timeout
signature: bls12381::Signature,
}

// this is required by structured log
impl std::fmt::Debug for RoundTimeout {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self)
}
}

impl std::fmt::Display for RoundTimeout {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"RoundTimeoutV2: [timeout: {}, author: {}, reason: {}]",
self.timeout,
self.author.short_str(),
self.reason
)
}
}

impl RoundTimeout {
pub fn new(
timeout: TwoChainTimeout,
author: Author,
reason: RoundTimeoutReason,
signature: bls12381::Signature,
) -> Self {
Self {
timeout,
author,
reason,
signature,
}
}

pub fn epoch(&self) -> u64 {
self.timeout.epoch()
}

pub fn round(&self) -> Round {
self.timeout.round()
}

pub fn two_chain_timeout(&self) -> &TwoChainTimeout {
&self.timeout
}

pub fn author(&self) -> Author {
self.author
}

pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> {
self.timeout.verify(validator)?;
validator
.verify(
self.author(),
&self.timeout.signing_format(),
&self.signature,
)
.context("Failed to verify 2-chain timeout signature")?;
Ok(())
}

pub fn reason(&self) -> &RoundTimeoutReason {
&self.reason
}

pub fn signature(&self) -> &bls12381::Signature {
&self.signature
}
}

#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)]
pub struct RoundTimeoutMsg {
/// The container for the vote (VoteData, LedgerInfo, Signature)
round_timeout: RoundTimeout,
/// Sync info carries information about highest QC, TC and LedgerInfo
sync_info: SyncInfo,
}

impl std::fmt::Display for RoundTimeoutMsg {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"RoundTimeoutV2Msg: [{}], SyncInfo: [{}]",
self.round_timeout, self.sync_info
)
}
}

impl RoundTimeoutMsg {
pub fn new(round_timeout: RoundTimeout, sync_info: SyncInfo) -> Self {
Self {
round_timeout,
sync_info,
}
}

/// SyncInfo of the given vote message
pub fn sync_info(&self) -> &SyncInfo {
&self.sync_info
}

pub fn epoch(&self) -> u64 {
self.round_timeout.epoch()
}

pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> {
ensure!(
self.round_timeout.epoch() == self.sync_info.epoch(),
"RoundTimeoutV2Msg has different epoch"
);
ensure!(
self.round_timeout.round() > self.sync_info.highest_round(),
"Timeout Round should be higher than SyncInfo"
);
ensure!(
self.round_timeout.two_chain_timeout().hqc_round()
<= self.sync_info.highest_certified_round(),
"2-chain Timeout hqc should be less or equal than the sync info hqc"
);
// We're not verifying SyncInfo here yet: we are going to verify it only in case we need
// it. This way we avoid verifying O(n) SyncInfo messages while aggregating the votes
// (O(n^2) signature verifications).
self.round_timeout.verify(validator)
}

pub fn round(&self) -> u64 {
self.round_timeout.round()
}

pub fn author(&self) -> Author {
self.round_timeout.author()
}

pub fn timeout(&self) -> RoundTimeout {
self.round_timeout.clone()
}
}
2 changes: 1 addition & 1 deletion consensus/consensus-types/src/safety_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::vote::Vote;
use crate::{round_timeout::RoundTimeout, vote::Vote};
use serde::{Deserialize, Serialize};
use std::fmt;

Expand Down
2 changes: 2 additions & 0 deletions consensus/consensus-types/src/vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ impl Vote {
/// Verifies that the consensus data hash of LedgerInfo corresponds to the vote info,
/// and then verifies the signature.
pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> {
// TODO(ibalajiarun): Ensure timeout is None if RoundTimeoutMsg is enabled.

ensure!(
self.ledger_info.consensus_data_hash() == self.vote_data.hash(),
"Vote's hash mismatch with LedgerInfo"
Expand Down
8 changes: 6 additions & 2 deletions consensus/src/block_storage/block_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,12 @@ async fn test_insert_vote() {
let time_service = Arc::new(SimulatedTimeService::new());
let (delayed_qc_tx, _) = unbounded();

let mut pending_votes =
PendingVotes::new(time_service, delayed_qc_tx, QcAggregatorType::NoDelay);
let mut pending_votes = PendingVotes::new(
time_service,
delayed_qc_tx,
QcAggregatorType::NoDelay,
false,
);

assert!(block_store.get_quorum_cert_for_block(block.id()).is_none());
for (i, voter) in signers.iter().enumerate().take(10).skip(1) {
Expand Down
4 changes: 4 additions & 0 deletions consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {

fn create_round_state(
&self,
config: &ConsensusConfig,
time_service: Arc<dyn TimeService>,
timeout_sender: aptos_channels::Sender<Round>,
delayed_qc_tx: UnboundedSender<DelayedQcMsg>,
Expand All @@ -279,6 +280,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
timeout_sender,
delayed_qc_tx,
qc_aggregator_type,
config.enable_round_timeout_msg,
)
}

Expand Down Expand Up @@ -797,6 +799,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {

info!(epoch = epoch, "Create RoundState");
let round_state = self.create_round_state(
&self.config,
self.time_service.clone(),
self.timeout_sender.clone(),
delayed_qc_tx,
Expand Down Expand Up @@ -1509,6 +1512,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
ConsensusMsg::ProposalMsg(_)
| ConsensusMsg::SyncInfo(_)
| ConsensusMsg::VoteMsg(_)
| ConsensusMsg::RoundTimeoutMsg(_)
| ConsensusMsg::OrderVoteMsg(_)
| ConsensusMsg::CommitVoteMsg(_)
| ConsensusMsg::CommitDecisionMsg(_)
Expand Down
38 changes: 36 additions & 2 deletions consensus/src/liveness/round_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
};
use aptos_config::config::QcAggregatorType;
use aptos_consensus_types::{
common::Round, delayed_qc_msg::DelayedQcMsg, sync_info::SyncInfo,
common::Round, delayed_qc_msg::DelayedQcMsg, round_timeout::RoundTimeout, sync_info::SyncInfo,
timeout_2chain::TwoChainTimeoutWithPartialSignatures, vote::Vote,
};
use aptos_crypto::HashValue;
Expand Down Expand Up @@ -161,11 +161,15 @@ pub struct RoundState {
pending_votes: PendingVotes,
// Vote sent locally for the current round.
vote_sent: Option<Vote>,
// Timeout sent locally for the current round.
timeout_sent: Option<RoundTimeout>,
// The handle to cancel previous timeout task when moving to next round.
abort_handle: Option<AbortHandle>,
// Self sender to send delayed QC aggregation events to the round manager.
delayed_qc_tx: UnboundedSender<DelayedQcMsg>,
qc_aggregator_type: QcAggregatorType,

cfg_timeout_msg_enabled: bool,
}

#[derive(Default, Schema)]
Expand Down Expand Up @@ -196,6 +200,7 @@ impl RoundState {
timeout_sender: aptos_channels::Sender<Round>,
delayed_qc_tx: UnboundedSender<DelayedQcMsg>,
qc_aggregator_type: QcAggregatorType,
cfg_timeout_msg_enabled: bool,
) -> Self {
// Our counters are initialized lazily, so they're not going to appear in
// Prometheus if some conditions never happen. Invoking get() function enforces creation.
Expand All @@ -207,6 +212,7 @@ impl RoundState {
time_service.clone(),
delayed_qc_tx.clone(),
qc_aggregator_type.clone(),
cfg_timeout_msg_enabled,
);
Self {
time_interval,
Expand All @@ -217,15 +223,17 @@ impl RoundState {
timeout_sender,
pending_votes,
vote_sent: None,
timeout_sent: None,
abort_handle: None,
delayed_qc_tx,
qc_aggregator_type,
cfg_timeout_msg_enabled,
}
}

/// Return if already voted for timeout
pub fn is_vote_timeout(&self) -> bool {
self.vote_sent.as_ref().map_or(false, |v| v.is_timeout())
self.vote_sent.as_ref().map_or(false, |v| v.is_timeout()) || self.timeout_sent.is_some()
}

/// Return the current round.
Expand Down Expand Up @@ -266,8 +274,10 @@ impl RoundState {
self.time_service.clone(),
self.delayed_qc_tx.clone(),
self.qc_aggregator_type.clone(),
self.cfg_timeout_msg_enabled,
);
self.vote_sent = None;
self.timeout_sent = None;
let timeout = self.setup_timeout(1);
// The new round reason is QCReady in case both QC.round + 1 == new_round, otherwise
// it's Timeout and TC.round + 1 == new_round.
Expand Down Expand Up @@ -304,12 +314,32 @@ impl RoundState {
}
}

pub fn insert_round_timeout(
&mut self,
timeout: &RoundTimeout,
verifier: &ValidatorVerifier,
) -> VoteReceptionResult {
assert!(self.cfg_timeout_msg_enabled);
if timeout.round() == self.current_round {
self.pending_votes.insert_round_timeout(timeout, verifier)
} else {
VoteReceptionResult::UnexpectedRound(timeout.round(), self.current_round)
}
}

pub fn record_vote(&mut self, vote: Vote) {
if vote.vote_data().proposed().round() == self.current_round {
self.vote_sent = Some(vote);
}
}

pub fn record_round_timeout(&mut self, timeout: RoundTimeout) {
assert!(self.cfg_timeout_msg_enabled);
if timeout.round() == self.current_round {
self.timeout_sent = Some(timeout)
}
}

pub fn process_delayed_qc_msg(
&mut self,
validator_verifier: &ValidatorVerifier,
Expand All @@ -324,6 +354,10 @@ impl RoundState {
self.vote_sent.clone()
}

pub fn timeout_sent(&self) -> Option<RoundTimeout> {
self.timeout_sent.clone()
}

/// Setup the timeout task and return the duration of the current timeout
fn setup_timeout(&mut self, multiplier: u32) -> Duration {
let timeout_sender = self.timeout_sender.clone();
Expand Down
1 change: 1 addition & 0 deletions consensus/src/liveness/round_state_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ fn make_round_state() -> (RoundState, aptos_channels::Receiver<Round>) {
timeout_tx,
delayed_qc_tx,
QcAggregatorType::NoDelay,
false,
),
timeout_rx,
)
Expand Down
1 change: 1 addition & 0 deletions consensus/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub enum LogEvent {
ReceiveProposal,
ReceiveSyncInfo,
ReceiveVote,
ReceiveRoundTimeout,
ReceiveOrderVote,
RetrieveBlock,
StateSync,
Expand Down
Loading

0 comments on commit 5672524

Please sign in to comment.