Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Refactor BlockBuilder::propose_with #14405

Merged
merged 8 commits into from
Jun 19, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 84 additions & 45 deletions client/basic-authorship/src/basic_authorship.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ pub const DEFAULT_BLOCK_SIZE_LIMIT: usize = 4 * 1024 * 1024 + 512;

const DEFAULT_SOFT_DEADLINE_PERCENT: Percent = Percent::from_percent(50);

const LOG_TARGET: &'static str = "basic-authorship";

/// [`Proposer`] factory.
pub struct ProposerFactory<A, B, C, PR> {
spawn_handle: Box<dyn SpawnNamed>,
Expand Down Expand Up @@ -302,7 +304,7 @@ where
.propose_with(inherent_data, inherent_digests, deadline, block_size_limit)
.await;
if tx.send(res).is_err() {
trace!("Could not send block production result to proposer!");
trace!(target: LOG_TARGET, "Could not send block production result to proposer!");
}
}),
);
Expand Down Expand Up @@ -339,10 +341,34 @@ where
block_size_limit: Option<usize>,
) -> Result<Proposal<Block, backend::TransactionFor<B, Block>, PR::Proof>, sp_blockchain::Error>
{
let propose_with_start = time::Instant::now();
let propose_with_timer = time::Instant::now();
let mut block_builder =
self.client.new_block_at(self.parent_hash, inherent_digests, PR::ENABLED)?;

self.apply_inherents(&mut block_builder, inherent_data)?;

// TODO call `after_inherents` and check if we should apply extrinsincs here
// <https://github.com/paritytech/substrate/pull/14275/>
kianenigma marked this conversation as resolved.
Show resolved Hide resolved

let block_timer = time::Instant::now();
let end_reason =
self.apply_extrinsics(&mut block_builder, deadline, block_size_limit).await?;
let (block, storage_changes, proof) = block_builder.build()?.into_inner();
let block_took = block_timer.elapsed();

let proof =
PR::into_proof(proof).map_err(|e| sp_blockchain::Error::Application(Box::new(e)))?;

self.print_summary(&block, end_reason, block_took, propose_with_timer.elapsed());
Ok(Proposal { block, proof, storage_changes })
}

/// Apply all inherents to the block.
fn apply_inherents(
&self,
block_builder: &mut sc_block_builder::BlockBuilder<'_, Block, C, B>,
inherent_data: InherentData,
) -> Result<(), sp_blockchain::Error> {
let create_inherents_start = time::Instant::now();
let inherents = block_builder.create_inherents(inherent_data)?;
let create_inherents_end = time::Instant::now();
Expand All @@ -358,7 +384,7 @@ where
for inherent in inherents {
match block_builder.push(inherent) {
Err(ApplyExtrinsicFailed(Validity(e))) if e.exhausted_resources() => {
warn!("⚠️ Dropping non-mandatory inherent from overweight block.")
warn!(target: LOG_TARGET, "⚠️ Dropping non-mandatory inherent from overweight block.")
},
Err(ApplyExtrinsicFailed(Validity(e))) if e.was_mandatory() => {
error!(
Expand All @@ -367,20 +393,28 @@ where
return Err(ApplyExtrinsicFailed(Validity(e)))
},
Err(e) => {
warn!("❗️ Inherent extrinsic returned unexpected error: {}. Dropping.", e);
warn!(target: LOG_TARGET, "❗️ Inherent extrinsic returned unexpected error: {}. Dropping.", e);
},
Ok(_) => {},
}
}
Ok(())
}

/// Apply as many extrinsics as possible to the block.
async fn apply_extrinsics(
&self,
block_builder: &mut sc_block_builder::BlockBuilder<'_, Block, C, B>,
deadline: time::Instant,
block_size_limit: Option<usize>,
) -> Result<EndProposingReason, sp_blockchain::Error> {
// proceed with transactions
// We calculate soft deadline used only in case we start skipping transactions.
let now = (self.now)();
let left = deadline.saturating_duration_since(now);
let left_micros: u64 = left.as_micros().saturated_into();
let soft_deadline =
now + time::Duration::from_micros(self.soft_deadline_percent.mul_floor(left_micros));
let block_timer = time::Instant::now();
let mut skipped = 0;
let mut unqueue_invalid = Vec::new();

Expand All @@ -391,7 +425,7 @@ where
let mut pending_iterator = select! {
res = t1 => res,
_ = t2 => {
log::warn!(
warn!(target: LOG_TARGET,
"Timeout fired waiting for transaction pool at block #{}. \
Proceeding with production.",
self.parent_number,
Expand All @@ -402,8 +436,8 @@ where

let block_size_limit = block_size_limit.unwrap_or(self.default_block_size_limit);

debug!("Attempting to push transactions from the pool.");
debug!("Pool status: {:?}", self.transaction_pool.status());
debug!(target: LOG_TARGET, "Attempting to push transactions from the pool.");
debug!(target: LOG_TARGET, "Pool status: {:?}", self.transaction_pool.status());
let mut transaction_pushed = false;

let end_reason = loop {
Expand All @@ -415,7 +449,7 @@ where

let now = (self.now)();
if now > deadline {
debug!(
debug!(target: LOG_TARGET,
"Consensus deadline reached when pushing block transactions, \
proceeding with proposing."
);
Expand All @@ -431,87 +465,104 @@ where
pending_iterator.report_invalid(&pending_tx);
if skipped < MAX_SKIPPED_TRANSACTIONS {
skipped += 1;
debug!(
debug!(target: LOG_TARGET,
"Transaction would overflow the block size limit, \
but will try {} more transactions before quitting.",
MAX_SKIPPED_TRANSACTIONS - skipped,
);
continue
} else if now < soft_deadline {
debug!(
debug!(target: LOG_TARGET,
"Transaction would overflow the block size limit, \
but we still have time before the soft deadline, so \
we will try a bit more."
);
continue
} else {
debug!("Reached block size limit, proceeding with proposing.");
debug!(target: LOG_TARGET, "Reached block size limit, proceeding with proposing.");
break EndProposingReason::HitBlockSizeLimit
}
}

trace!("[{:?}] Pushing to the block.", pending_tx_hash);
match sc_block_builder::BlockBuilder::push(&mut block_builder, pending_tx_data) {
trace!(target: LOG_TARGET, "[{:?}] Pushing to the block.", pending_tx_hash);
match sc_block_builder::BlockBuilder::push(block_builder, pending_tx_data) {
Ok(()) => {
transaction_pushed = true;
debug!("[{:?}] Pushed to the block.", pending_tx_hash);
debug!(target: LOG_TARGET, "[{:?}] Pushed to the block.", pending_tx_hash);
},
Err(ApplyExtrinsicFailed(Validity(e))) if e.exhausted_resources() => {
pending_iterator.report_invalid(&pending_tx);
if skipped < MAX_SKIPPED_TRANSACTIONS {
skipped += 1;
debug!(
debug!(target: LOG_TARGET,
"Block seems full, but will try {} more transactions before quitting.",
MAX_SKIPPED_TRANSACTIONS - skipped,
);
} else if (self.now)() < soft_deadline {
debug!(
debug!(target: LOG_TARGET,
"Block seems full, but we still have time before the soft deadline, \
so we will try a bit more before quitting."
);
} else {
debug!("Reached block weight limit, proceeding with proposing.");
debug!(target: LOG_TARGET, "Reached block weight limit, proceeding with proposing.");
break EndProposingReason::HitBlockWeightLimit
}
},
Err(e) => {
pending_iterator.report_invalid(&pending_tx);
debug!("[{:?}] Invalid transaction: {}", pending_tx_hash, e);
debug!(target: LOG_TARGET, "[{:?}] Invalid transaction: {}", pending_tx_hash, e);
unqueue_invalid.push(pending_tx_hash);
},
}
};

if matches!(end_reason, EndProposingReason::HitBlockSizeLimit) && !transaction_pushed {
warn!(
warn!(target: LOG_TARGET,
"Hit block size limit of `{}` without including any transaction!",
block_size_limit,
);
}

self.transaction_pool.remove_invalid(&unqueue_invalid);
Ok(end_reason)
}

let (block, storage_changes, proof) = block_builder.build()?.into_inner();

/// Prints a summary and does telemetry + metrics.
fn print_summary(
&self,
block: &Block,
end_reason: EndProposingReason,
block_took: time::Duration,
propose_with_took: time::Duration,
) {
let extrinsics = block.extrinsics();
self.metrics.report(|metrics| {
metrics.number_of_transactions.set(block.extrinsics().len() as u64);
metrics.block_constructed.observe(block_timer.elapsed().as_secs_f64());

metrics.number_of_transactions.set(extrinsics.len() as u64);
metrics.block_constructed.observe(block_took.as_secs_f64());
metrics.report_end_proposing_reason(end_reason);
metrics.create_block_proposal_time.observe(propose_with_took.as_secs_f64());
});

let extrinsics_summary = if extrinsics.is_empty() {
"no extrinsics".to_string()
} else {
format!(
"extrinsics ({}): [{}]",
extrinsics.len(),
extrinsics
.iter()
.map(|xt| BlakeTwo256::hash_of(xt).to_string())
.collect::<Vec<_>>()
.join(", ")
)
};

info!(
"🎁 Prepared block for proposing at {} ({} ms) [hash: {:?}; parent_hash: {}; extrinsics ({}): [{}]]",
"🎁 Prepared block for proposing at {} ({} ms) [hash: {:?}; parent_hash: {}; {extrinsics_summary}",
block.header().number(),
block_timer.elapsed().as_millis(),
block_took.as_millis(),
<Block as BlockT>::Hash::from(block.header().hash()),
block.header().parent_hash(),
block.extrinsics().len(),
block.extrinsics()
.iter()
.map(|xt| BlakeTwo256::hash_of(xt).to_string())
.collect::<Vec<_>>()
.join(", ")
);
telemetry!(
self.telemetry;
Expand All @@ -520,18 +571,6 @@ where
"number" => ?block.header().number(),
"hash" => ?<Block as BlockT>::Hash::from(block.header().hash()),
);

let proof =
PR::into_proof(proof).map_err(|e| sp_blockchain::Error::Application(Box::new(e)))?;

let propose_with_end = time::Instant::now();
self.metrics.report(|metrics| {
metrics.create_block_proposal_time.observe(
propose_with_end.saturating_duration_since(propose_with_start).as_secs_f64(),
);
});

Ok(Proposal { block, proof, storage_changes })
}
}

Expand Down