Skip to content

Commit

Permalink
[pipeline] fixes
Browse files Browse the repository at this point in the history
[pipeline] expect cancellation error
[pipeline] block callback until notification finishes
[pipeline] avoid panic when parent fut doesn't exist during epoch ending
  • Loading branch information
Zekun Li authored and zekun000 committed Dec 12, 2024
1 parent 652be07 commit 2ee2c33
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 12 deletions.
26 changes: 18 additions & 8 deletions consensus/consensus-types/src/pipelined_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use aptos_executor_types::{
state_compute_result::StateComputeResult, ExecutorError, ExecutorResult,
};
use aptos_infallible::Mutex;
use aptos_logger::{error, warn};
use aptos_logger::{error, info, warn};
use aptos_types::{
block_info::BlockInfo,
contract_event::ContractEvent,
Expand Down Expand Up @@ -427,7 +427,9 @@ impl PipelinedBlock {
/// Pipeline related functions
impl PipelinedBlock {
pub fn pipeline_enabled(&self) -> bool {
self.pipeline_futs.lock().is_some()
// if the pipeline_tx is set, the pipeline is enabled,
// we don't use pipeline fut here because it can't be taken when abort
self.pipeline_tx.lock().is_some()
}

pub fn pipeline_futs(&self) -> Option<PipelineFutures> {
Expand All @@ -451,6 +453,12 @@ impl PipelinedBlock {
}

pub fn abort_pipeline(&self) -> Option<PipelineFutures> {
info!(
"[Pipeline] Aborting pipeline for block {} {} {}",
self.id(),
self.epoch(),
self.round()
);
if let Some(abort_handles) = self.pipeline_abort_handle.lock().take() {
for handle in abort_handles {
handle.abort();
Expand All @@ -461,7 +469,9 @@ impl PipelinedBlock {

pub async fn wait_for_compute_result(&self) -> ExecutorResult<(StateComputeResult, Duration)> {
self.pipeline_futs()
.expect("Pipeline needs to be enabled")
.ok_or(ExecutorError::InternalError {
error: "Pipeline aborted".to_string(),
})?
.ledger_update_fut
.await
.map(|(compute_result, execution_time, _)| (compute_result, execution_time))
Expand All @@ -471,11 +481,11 @@ impl PipelinedBlock {
}

pub async fn wait_for_commit_ledger(&self) {
self.pipeline_futs()
.expect("Pipeline needs to be enabled")
.commit_ledger_fut
.await
.expect("Commit ledger should succeed");
// may be aborted (e.g. by reset)
if let Some(fut) = self.pipeline_futs() {
// this may be cancelled
let _ = fut.commit_ledger_fut.await;
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions consensus/src/block_storage/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,9 +400,9 @@ impl BlockStore {
});
pipeline_builder.build(
&pipelined_block,
parent_block
.pipeline_futs()
.expect("Futures should exist when pipeline enabled"),
parent_block.pipeline_futs().ok_or_else(|| {
anyhow::anyhow!("Parent future doesn't exist, potentially epoch ended")
})?,
callback,
);
}
Expand Down
5 changes: 4 additions & 1 deletion consensus/src/pipeline/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ impl PipelineBuilder {
Self::post_commit_ledger(
pre_commit_fut.clone(),
commit_ledger_fut.clone(),
post_pre_commit_fut.clone(),
parent.post_commit_fut.clone(),
self.payload_manager.clone(),
block_store_callback,
Expand Down Expand Up @@ -679,11 +680,12 @@ impl PipelineBuilder {
Ok(Some(ledger_info_with_sigs))
}

/// Precondition: 1. commit ledger finishes, 2. parent block's phase finishes
/// Precondition: 1. commit ledger finishes, 2. parent block's phase finishes 3. post pre commit finishes
/// What it does: Update counters for the block, and notify block tree about the commit
async fn post_commit_ledger(
pre_commit_fut: TaskFuture<PreCommitResult>,
commit_ledger_fut: TaskFuture<CommitLedgerResult>,
post_pre_commit_fut: TaskFuture<PostPreCommitResult>,
parent_post_commit: TaskFuture<PostCommitResult>,
payload_manager: Arc<dyn TPayloadManager>,
block_store_callback: Box<dyn FnOnce(LedgerInfoWithSignatures) + Send + Sync>,
Expand All @@ -692,6 +694,7 @@ impl PipelineBuilder {
parent_post_commit.await?;
let maybe_ledger_info_with_sigs = commit_ledger_fut.await?;
let compute_result = pre_commit_fut.await?;
post_pre_commit_fut.await?;

let _tracker = Tracker::new("post_commit_ledger", &block);
update_counters_for_block(&block);
Expand Down

0 comments on commit 2ee2c33

Please sign in to comment.