Skip to content

Commit

Permalink
[Consensus Observer] Add basic window traversal logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Feb 21, 2025
1 parent 3197ade commit 27d6916
Show file tree
Hide file tree
Showing 3 changed files with 678 additions and 23 deletions.
127 changes: 114 additions & 13 deletions consensus/src/consensus_observer/observer/consensus_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{
},
observer::{
active_state::ActiveObserverState,
execution_pool,
execution_pool::ObservedOrderedBlock,
fallback_manager::ObserverFallbackManager,
ordered_blocks::OrderedBlockStore,
Expand All @@ -37,10 +38,7 @@ use aptos_config::{
config::{ConsensusObserverConfig, NodeConfig},
network_id::PeerNetworkId,
};
use aptos_consensus_types::{
pipeline,
pipelined_block::{PipelineFutures, PipelinedBlock},
};
use aptos_consensus_types::{pipeline, pipelined_block::PipelineFutures};
use aptos_crypto::{bls12381, Genesis};
use aptos_event_notifications::{DbBackedOnChainConfig, ReconfigNotificationListener};
use aptos_executor_types::state_compute_result::StateComputeResult;
Expand Down Expand Up @@ -164,15 +162,50 @@ impl ConsensusObserver {
}
}

/// Returns true iff all payloads exist for the given blocks
fn all_payloads_exist(&self, blocks: &[Arc<PipelinedBlock>]) -> bool {
/// Returns true iff all blocks and payloads exist for the ordered block with window
fn all_blocks_and_payloads_exist(
&self,
ordered_block: &OrderedBlock,
window_size: usize,
) -> bool {
// Get all the ordered blocks for the window
let all_ordered_blocks = match execution_pool::get_all_blocks_for_window(
self.pending_block_store.clone(),
ordered_block,
window_size,
) {
Some(ordered_blocks) => ordered_blocks,
None => return false, // Some blocks were missing!
};

// If quorum store is disabled, all payloads exist (they're embedded in the blocks)
if !self.active_observer_state.is_quorum_store_enabled() {
return true;
}

// Otherwise, check if all payloads exist for the blocks in the window
for ordered_block_with_window in all_ordered_blocks {
let pipelined_blocks = ordered_block_with_window.blocks();
let block_payload_store = self.block_payload_store.lock();
if !block_payload_store.all_payloads_exist(pipelined_blocks) {
return false; // Some payloads were missing!
}
}

true // All blocks and payloads exist
}

/// Returns true iff all payloads exist for the given ordered block
fn all_payloads_exist(&self, ordered_block: &OrderedBlock) -> bool {
// If quorum store is disabled, all payloads exist (they're already in the blocks)
if !self.active_observer_state.is_quorum_store_enabled() {
return true;
}

// Otherwise, check if all the payloads exist in the payload store
self.block_payload_store.lock().all_payloads_exist(blocks)
self.block_payload_store
.lock()
.all_payloads_exist(ordered_block.blocks())
}

/// Checks the progress of the consensus observer
Expand Down Expand Up @@ -574,12 +607,48 @@ impl ConsensusObserver {
let pending_block = self
.ordered_block_store
.lock()
.get_ordered_block(commit_decision.epoch(), commit_decision.round());
.get_observed_ordered_block(commit_decision.epoch(), commit_decision.round());

// Process the pending block
if let Some(pending_block) = pending_block {
// If all payloads exist, add the commit decision to the pending blocks
if self.all_payloads_exist(pending_block.blocks()) {
// Check if all block dependencies exist for the pending block
let block_dependencies_exist = match pending_block {
ObservedOrderedBlock::Ordered(ordered) => {
if self.get_execution_pool_window_size().is_some() {
// Log an error and return false (execution pool is enabled)
error!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Execution pool is enabled, but found a pending ordered block: {:?}",
ordered.proof_block_info()
))
);
false
} else {
self.all_payloads_exist(&ordered)
}
},
ObservedOrderedBlock::OrderedWithWindow(ordered_with_window) => {
match self.get_execution_pool_window_size() {
Some(window_size) => self.all_blocks_and_payloads_exist(
ordered_with_window.ordered_block(),
window_size,
),
None => {
// Log an error and return false (execution pool is disabled)
error!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Execution pool is disabled, but found a pending ordered block with window: {:?}",
ordered_with_window.ordered_block().proof_block_info()
))
);
false
},
}
},
};

// If all dependencies exist, add the commit decision to the pending blocks
if block_dependencies_exist {
debug!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Adding decision to pending block: {}",
Expand Down Expand Up @@ -742,7 +811,7 @@ impl ConsensusObserver {

// If all payloads exist, process the block. Otherwise, store it
// in the pending block store and wait for the payloads to arrive.
if self.all_payloads_exist(pending_block_with_metadata.ordered_block().blocks()) {
if self.all_payloads_exist(pending_block_with_metadata.ordered_block()) {
self.process_ordered_block(pending_block_with_metadata)
.await;
} else {
Expand Down Expand Up @@ -857,7 +926,7 @@ impl ConsensusObserver {
async fn process_ordered_block_with_window_message(
&mut self,
peer_network_id: PeerNetworkId,
_message_received_time: Instant,
message_received_time: Instant,
ordered_block_with_window: OrderedBlockWithWindow,
) {
// If execution pool is disabled, ignore the message
Expand Down Expand Up @@ -945,7 +1014,39 @@ impl ConsensusObserver {
// Update the metrics for the received ordered block with window
update_metrics_for_ordered_block_with_window_message(peer_network_id, ordered_block);

// TODO: process the ordered block with window message (instead of just dropping it!)
// Create a new pending block with metadata
let observed_ordered_block =
ObservedOrderedBlock::new_with_window(ordered_block_with_window);
let pending_block_with_metadata = PendingBlockWithMetadata::new_with_arc(
peer_network_id,
message_received_time,
observed_ordered_block,
);

// If all blocks and payloads exist, process the block. Otherwise, store it
// in the pending block store and wait for the blocks and payloads to arrive.
if self.all_blocks_and_payloads_exist(
pending_block_with_metadata.ordered_block(),
execution_pool_window_size,
) {
self.process_ordered_block_with_window(pending_block_with_metadata)
.await;
} else {
self.pending_block_store
.lock()
.insert_pending_block(pending_block_with_metadata);
}
}

/// Processes the ordered block with window. This assumes the ordered block
/// has been sanity checked and that all blocks and payloads exist.
async fn process_ordered_block_with_window(
&mut self,
_pending_block_with_metadata: Arc<PendingBlockWithMetadata>,
) {
// TODO: implement me!
// This requires populating the pending block with the appropriate
// pipelined blocks before pushing it into the execution pipeline.
}

/// Processes the given state sync notification
Expand Down
Loading

0 comments on commit 27d6916

Please sign in to comment.