Skip to content

Commit

Permalink
[Consensus Observer] Add ObservedOrderedBlock to the ordered block
Browse files Browse the repository at this point in the history
store.
  • Loading branch information
JoshLind committed Feb 5, 2025
1 parent 6619914 commit d14fca6
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 19 deletions.
22 changes: 19 additions & 3 deletions consensus/src/consensus_observer/observer/active_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,12 @@ fn handle_committed_blocks(
#[cfg(test)]
mod test {
use super::*;
use crate::consensus_observer::network::observer_message::{
BlockPayload, BlockTransactionPayload, OrderedBlock,
use crate::consensus_observer::{
network::observer_message::{
BlockPayload, BlockTransactionPayload, ExecutionPoolWindow, OrderedBlock,
OrderedBlockWithWindow,
},
observer::execution_pool::ObservedOrderedBlock,
};
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_consensus_types::{
Expand All @@ -369,6 +373,7 @@ mod test {
aggregate_signature::AggregateSignature, block_info::BlockInfo, ledger_info::LedgerInfo,
transaction::Version,
};
use rand::{rngs::OsRng, Rng};

#[test]
fn test_check_root_epoch_and_round() {
Expand Down Expand Up @@ -573,10 +578,21 @@ mod test {
create_ledger_info(epoch, i as aptos_consensus_types::common::Round);
let ordered_block = OrderedBlock::new(blocks, ordered_proof);

// Create an observed ordered block (the observed type is determined randomly)
let observed_ordered_block = if OsRng.gen::<u8>() % 2 == 0 {
ObservedOrderedBlock::new(ordered_block.clone())
} else {
let ordered_block_with_window = OrderedBlockWithWindow::new(
ordered_block.clone(),
ExecutionPoolWindow::new(vec![]),
);
ObservedOrderedBlock::new_with_window(ordered_block_with_window)
};

// Insert the block into the ordered block store
ordered_block_store
.lock()
.insert_ordered_block(ordered_block.clone());
.insert_ordered_block(observed_ordered_block.clone());

// Add the block to the ordered blocks
ordered_blocks.push(ordered_block);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ impl ConsensusObserver {
// Unpack the pending block
let (peer_network_id, message_received_time, observed_ordered_block) =
pending_block_with_metadata.into_parts();
let ordered_block = observed_ordered_block.consume_ordered_block();
let ordered_block = observed_ordered_block.ordered_block().clone();

// Verify the ordered block proof
let epoch_state = self.get_epoch_state();
Expand Down Expand Up @@ -837,7 +837,7 @@ impl ConsensusObserver {
// Insert the ordered block into the pending blocks
self.ordered_block_store
.lock()
.insert_ordered_block(ordered_block.clone());
.insert_ordered_block(observed_ordered_block.clone());

// If state sync is not syncing to a commit, finalize the ordered blocks
if !self.state_sync_manager.is_syncing_to_commit() {
Expand Down Expand Up @@ -1085,8 +1085,9 @@ impl ConsensusObserver {

// Process all the newly ordered blocks
let all_ordered_blocks = self.ordered_block_store.lock().get_all_ordered_blocks();
for (_, (ordered_block, commit_decision)) in all_ordered_blocks {
for (_, (observed_ordered_block, commit_decision)) in all_ordered_blocks {
// Finalize the ordered block
let ordered_block = observed_ordered_block.consume_ordered_block();
self.finalize_ordered_block(ordered_block).await;

// If a commit decision is available, forward it to the execution pipeline
Expand Down
50 changes: 37 additions & 13 deletions consensus/src/consensus_observer/observer/ordered_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::consensus_observer::{
metrics,
},
network::observer_message::{CommitDecision, OrderedBlock},
observer::execution_pool::ObservedOrderedBlock,
};
use aptos_config::config::ConsensusObserverConfig;
use aptos_consensus_types::{common::Round, pipelined_block::PipelinedBlock};
Expand All @@ -24,7 +25,7 @@ pub struct OrderedBlockStore {

// Ordered blocks. The key is the epoch and round of the last block in the
// ordered block. Each entry contains the block and the commit decision (if any).
ordered_blocks: BTreeMap<(u64, Round), (OrderedBlock, Option<CommitDecision>)>,
ordered_blocks: BTreeMap<(u64, Round), (ObservedOrderedBlock, Option<CommitDecision>)>,
}

impl OrderedBlockStore {
Expand All @@ -44,7 +45,7 @@ impl OrderedBlockStore {
/// Returns a copy of the ordered blocks
pub fn get_all_ordered_blocks(
&self,
) -> BTreeMap<(u64, Round), (OrderedBlock, Option<CommitDecision>)> {
) -> BTreeMap<(u64, Round), (ObservedOrderedBlock, Option<CommitDecision>)> {
self.ordered_blocks.clone()
}

Expand All @@ -57,28 +58,30 @@ impl OrderedBlockStore {
pub fn get_last_ordered_block(&self) -> Option<Arc<PipelinedBlock>> {
self.ordered_blocks
.last_key_value()
.map(|(_, (ordered_block, _))| ordered_block.last_block())
.map(|(_, (observed_ordered_block, _))| {
observed_ordered_block.ordered_block().last_block()
})
}

/// Returns the ordered block for the given epoch and round (if any)
pub fn get_ordered_block(&self, epoch: u64, round: Round) -> Option<OrderedBlock> {
self.ordered_blocks
.get(&(epoch, round))
.map(|(ordered_block, _)| ordered_block.clone())
.map(|(observed_ordered_block, _)| observed_ordered_block.ordered_block().clone())
}

/// Inserts the given ordered block into the ordered blocks. This function
/// assumes the block has already been checked to extend the current ordered
/// blocks, and that the ordered proof has been verified.
pub fn insert_ordered_block(&mut self, ordered_block: OrderedBlock) {
pub fn insert_ordered_block(&mut self, observed_ordered_block: ObservedOrderedBlock) {
// Verify that the number of ordered blocks doesn't exceed the maximum
let max_num_ordered_blocks = self.consensus_observer_config.max_num_pending_blocks as usize;
if self.ordered_blocks.len() >= max_num_ordered_blocks {
warn!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Exceeded the maximum number of ordered blocks: {:?}. Dropping block: {:?}.",
max_num_ordered_blocks,
ordered_block.proof_block_info()
observed_ordered_block.ordered_block().proof_block_info()
))
);
return; // Drop the block if we've exceeded the maximum
Expand All @@ -88,18 +91,20 @@ impl OrderedBlockStore {
debug!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Adding ordered block to the ordered blocks: {:?}",
ordered_block.proof_block_info()
observed_ordered_block.ordered_block().proof_block_info()
))
);

// Get the epoch and round of the last ordered block
let last_block = ordered_block.last_block();
let last_block = observed_ordered_block.ordered_block().last_block();
let last_block_epoch = last_block.epoch();
let last_block_round = last_block.round();

// Insert the ordered block
self.ordered_blocks
.insert((last_block_epoch, last_block_round), (ordered_block, None));
self.ordered_blocks.insert(
(last_block_epoch, last_block_round),
(observed_ordered_block, None),
);
}

/// Removes the ordered blocks for the given commit ledger info. This will
Expand Down Expand Up @@ -173,7 +178,9 @@ impl OrderedBlockStore {
let num_ordered_blocks = self
.ordered_blocks
.values()
.map(|(ordered_block, _)| ordered_block.blocks().len() as u64)
.map(|(observed_ordered_block, _)| {
observed_ordered_block.ordered_block().blocks().len() as u64
})
.sum();
metrics::set_gauge_with_label(
&metrics::OBSERVER_NUM_PROCESSED_BLOCKS,
Expand All @@ -185,7 +192,9 @@ impl OrderedBlockStore {
let highest_ordered_round = self
.ordered_blocks
.last_key_value()
.map(|(_, (ordered_block, _))| ordered_block.last_block().round())
.map(|(_, (observed_ordered_block, _))| {
observed_ordered_block.ordered_block().last_block().round()
})
.unwrap_or(0);
metrics::set_gauge_with_label(
&metrics::OBSERVER_PROCESSED_BLOCK_ROUNDS,
Expand All @@ -209,6 +218,9 @@ impl OrderedBlockStore {
#[cfg(test)]
mod test {
use super::*;
use crate::consensus_observer::network::observer_message::{
ExecutionPoolWindow, OrderedBlockWithWindow,
};
use aptos_consensus_types::{
block::Block,
block_data::{BlockData, BlockType},
Expand All @@ -220,6 +232,7 @@ mod test {
aggregate_signature::AggregateSignature, block_info::BlockInfo, ledger_info::LedgerInfo,
transaction::Version,
};
use rand::{rngs::OsRng, Rng};
use std::sync::Arc;

#[test]
Expand Down Expand Up @@ -725,8 +738,19 @@ mod test {
let ordered_proof = create_ledger_info(epoch, i as Round);
let ordered_block = OrderedBlock::new(blocks, ordered_proof);

// Create an observed ordered block (the observed type is determined randomly)
let observed_ordered_block = if OsRng.gen::<u8>() % 2 == 0 {
ObservedOrderedBlock::new(ordered_block.clone())
} else {
let ordered_block_with_window = OrderedBlockWithWindow::new(
ordered_block.clone(),
ExecutionPoolWindow::new(vec![]),
);
ObservedOrderedBlock::new_with_window(ordered_block_with_window)
};

// Insert the block into the ordered block store
ordered_block_store.insert_ordered_block(ordered_block.clone());
ordered_block_store.insert_ordered_block(observed_ordered_block.clone());

// Add the block to the ordered blocks
ordered_blocks.push(ordered_block);
Expand Down

0 comments on commit d14fca6

Please sign in to comment.