diff --git a/consensus/consensus-types/src/pipelined_block.rs b/consensus/consensus-types/src/pipelined_block.rs index 10fa28f9227ba..b0cc069aff768 100644 --- a/consensus/consensus-types/src/pipelined_block.rs +++ b/consensus/consensus-types/src/pipelined_block.rs @@ -17,7 +17,7 @@ use aptos_executor_types::{ state_compute_result::StateComputeResult, ExecutorError, ExecutorResult, }; use aptos_infallible::Mutex; -use aptos_logger::{error, warn}; +use aptos_logger::{error, info, warn}; use aptos_types::{ block_info::BlockInfo, contract_event::ContractEvent, @@ -427,7 +427,9 @@ impl PipelinedBlock { /// Pipeline related functions impl PipelinedBlock { pub fn pipeline_enabled(&self) -> bool { - self.pipeline_futs.lock().is_some() + // if the pipeline_tx is set, the pipeline is enabled, + // we don't use pipeline fut here because it can't be taken when abort + self.pipeline_tx.lock().is_some() } pub fn pipeline_futs(&self) -> Option { @@ -451,6 +453,12 @@ impl PipelinedBlock { } pub fn abort_pipeline(&self) -> Option { + info!( + "[Pipeline] Aborting pipeline for block {} {} {}", + self.id(), + self.epoch(), + self.round() + ); if let Some(abort_handles) = self.pipeline_abort_handle.lock().take() { for handle in abort_handles { handle.abort(); @@ -461,7 +469,9 @@ impl PipelinedBlock { pub async fn wait_for_compute_result(&self) -> ExecutorResult<(StateComputeResult, Duration)> { self.pipeline_futs() - .expect("Pipeline needs to be enabled") + .ok_or(ExecutorError::InternalError { + error: "Pipeline aborted".to_string(), + })? .ledger_update_fut .await .map(|(compute_result, execution_time, _)| (compute_result, execution_time)) @@ -471,11 +481,11 @@ impl PipelinedBlock { } pub async fn wait_for_commit_ledger(&self) { - self.pipeline_futs() - .expect("Pipeline needs to be enabled") - .commit_ledger_fut - .await - .expect("Commit ledger should succeed"); + // may be aborted (e.g. by reset) + if let Some(fut) = self.pipeline_futs() { + // this may be cancelled + let _ = fut.commit_ledger_fut.await; + } } } diff --git a/consensus/src/block_storage/block_store.rs b/consensus/src/block_storage/block_store.rs index ae20e0a5b01cc..b8c8146121e0a 100644 --- a/consensus/src/block_storage/block_store.rs +++ b/consensus/src/block_storage/block_store.rs @@ -400,9 +400,9 @@ impl BlockStore { }); pipeline_builder.build( &pipelined_block, - parent_block - .pipeline_futs() - .expect("Futures should exist when pipeline enabled"), + parent_block.pipeline_futs().ok_or_else(|| { + anyhow::anyhow!("Parent future doesn't exist, potentially epoch ended") + })?, callback, ); } diff --git a/consensus/src/pipeline/pipeline_builder.rs b/consensus/src/pipeline/pipeline_builder.rs index cb6a90d463498..94577fad0511b 100644 --- a/consensus/src/pipeline/pipeline_builder.rs +++ b/consensus/src/pipeline/pipeline_builder.rs @@ -348,6 +348,7 @@ impl PipelineBuilder { Self::post_commit_ledger( pre_commit_fut.clone(), commit_ledger_fut.clone(), + post_pre_commit_fut.clone(), parent.post_commit_fut.clone(), self.payload_manager.clone(), block_store_callback, @@ -679,11 +680,12 @@ impl PipelineBuilder { Ok(Some(ledger_info_with_sigs)) } - /// Precondition: 1. commit ledger finishes, 2. parent block's phase finishes + /// Precondition: 1. commit ledger finishes, 2. parent block's phase finishes 3. post pre commit finishes /// What it does: Update counters for the block, and notify block tree about the commit async fn post_commit_ledger( pre_commit_fut: TaskFuture, commit_ledger_fut: TaskFuture, + post_pre_commit_fut: TaskFuture, parent_post_commit: TaskFuture, payload_manager: Arc, block_store_callback: Box, @@ -692,6 +694,7 @@ impl PipelineBuilder { parent_post_commit.await?; let maybe_ledger_info_with_sigs = commit_ledger_fut.await?; let compute_result = pre_commit_fut.await?; + post_pre_commit_fut.await?; let _tracker = Tracker::new("post_commit_ledger", &block); update_counters_for_block(&block);