Skip to content

Commit

Permalink
Backport #225 on v1.14 (#256)
Browse files Browse the repository at this point in the history
* buffer bundles that exceed cost model
  • Loading branch information
jedleggett authored and segfaultdoc committed Feb 15, 2023
1 parent b8569be commit 177662a
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 10 deletions.
41 changes: 32 additions & 9 deletions core/src/bundle_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,24 +304,25 @@ impl BundleStage {
&bank_start.working_bank,
);

// qos rate-limited a tx in here, drop the bundle
// accumulates QoS to metrics
qos_service.accumulate_estimated_transaction_costs(
&Self::accumulate_batched_transaction_costs(
tx_costs.iter(),
transactions_qos_results.iter(),
),
);

// either qos rate-limited a tx in here or bundle exceeds max cost, drop the bundle
if sanitized_bundle.transactions.len() != num_included {
QosService::remove_transaction_costs(
tx_costs.iter(),
transactions_qos_results.iter(),
&bank_start.working_bank,
);
qos_service.report_metrics(bank_start.working_bank.clone());
return Err(BundleExecutionError::ExceedsCostModel);
}

// accumulates QoS to metrics
qos_service.accumulate_estimated_transaction_costs(
&Self::accumulate_batched_transaction_costs(
tx_costs.iter(),
transactions_qos_results.iter(),
),
);

match Self::execute_record_commit_bundle(
sanitized_bundle,
recorder,
Expand Down Expand Up @@ -870,6 +871,7 @@ impl BundleStage {
fn execute_bundles_until_empty_or_end_of_slot(
bundle_account_locker: &BundleAccountLocker,
unprocessed_bundles: &mut VecDeque<PacketBundle>,
cost_model_failed_bundles: &mut VecDeque<PacketBundle>,
blacklisted_accounts: &HashSet<Pubkey>,
bank_start: &BankStart,
consensus_accounts_cache: &HashSet<Pubkey>,
Expand Down Expand Up @@ -1016,6 +1018,8 @@ impl BundleStage {
bundle_stage_leader_stats
.bundle_stage_stats()
.increment_execution_results_exceeds_cost_model(1);
// retry the bundle
cost_model_failed_bundles.push_back(packet_bundle);
}
Err(BundleExecutionError::TipError(_)) => {
bundle_stage_leader_stats
Expand Down Expand Up @@ -1282,6 +1286,7 @@ impl BundleStage {
fn process_buffered_bundles(
bundle_account_locker: &BundleAccountLocker,
unprocessed_bundles: &mut VecDeque<PacketBundle>,
cost_model_failed_bundles: &mut VecDeque<PacketBundle>,
blacklisted_accounts: &HashSet<Pubkey>,
consensus_cache_updater: &mut ConsensusCacheUpdater,
cluster_info: &Arc<ClusterInfo>,
Expand All @@ -1307,16 +1312,32 @@ impl BundleStage {
r_poh_recorder.would_be_leader(DROP_BUNDLE_SLOT_OFFSET * DEFAULT_TICKS_PER_SLOT);
drop(r_poh_recorder);

let last_slot = bundle_stage_leader_stats.current_slot;
bundle_stage_leader_stats.maybe_report(id, &working_bank_start);

match (working_bank_start, would_be_leader_soon) {
// leader now, insert new read bundles + as many as can read then return bank
(Some(bank_start), _) => {
consensus_cache_updater.maybe_update(&bank_start.working_bank);

let is_new_slot = match (last_slot, bundle_stage_leader_stats.current_slot) {
(Some(last_slot), Some(current_slot)) => last_slot != current_slot,
(None, Some(_)) => true,
(_, _) => false,
};
if is_new_slot && !cost_model_failed_bundles.is_empty() {
debug!(
"Slot {}: Re-buffering {} bundles that failed cost model!",
&bank_start.working_bank.slot(),
cost_model_failed_bundles.len()
);
unprocessed_bundles.extend(cost_model_failed_bundles.drain(..));
}

Self::execute_bundles_until_empty_or_end_of_slot(
bundle_account_locker,
unprocessed_bundles,
cost_model_failed_bundles,
blacklisted_accounts,
bank_start,
consensus_cache_updater.consensus_accounts_cache(),
Expand Down Expand Up @@ -1378,6 +1399,7 @@ impl BundleStage {
let blacklisted_accounts = HashSet::from_iter([tip_manager.tip_payment_program_id()]);

let mut unprocessed_bundles: VecDeque<PacketBundle> = VecDeque::with_capacity(1000);
let mut cost_model_failed_bundles: VecDeque<PacketBundle> = VecDeque::with_capacity(1000);
while !exit.load(Ordering::Relaxed) {
if !unprocessed_bundles.is_empty()
|| last_leader_slots_update_time.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD
Expand All @@ -1386,6 +1408,7 @@ impl BundleStage {
Self::process_buffered_bundles(
&bundle_account_locker,
&mut unprocessed_bundles,
&mut cost_model_failed_bundles,
&blacklisted_accounts,
&mut consensus_cache_updater,
&cluster_info,
Expand Down
2 changes: 1 addition & 1 deletion core/src/bundle_stage_leader_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use {
// Stats emitted only during leader slots
#[derive(Default)]
pub struct BundleStageLeaderSlotTrackingMetrics {
current_slot: Option<Slot>,
pub(crate) current_slot: Option<Slot>,
bundle_stage_leader_stats: BundleStageLeaderStats,
}

Expand Down

0 comments on commit 177662a

Please sign in to comment.