Commit: refactor
jedleggett committed Feb 22, 2023
1 parent 5b6c48c commit c5e2424
Showing 1 changed file with 97 additions and 47 deletions.
144 changes: 97 additions & 47 deletions core/src/bundle_stage.rs
@@ -122,6 +122,80 @@ struct AllExecutionResults {
pub post_balances: (TransactionBalances, TransactionTokenBalances),
}

struct BundleReservedSpace {
current_allocated_cost: u64,
initial_allocated_cost: u64,
}

impl BundleReservedSpace {
fn reset_reserved_cost(&mut self, working_bank: &Arc<Bank>) {
self.current_allocated_cost = self.initial_allocated_cost;

// Reserve space from cost tracker
QosService::update_or_remove_transaction_costs(
vec![TransactionCost::new_with_capacity(1)].iter(),
vec![Ok(())].iter(),
Some(&vec![CommitTransactionDetails::Committed {
compute_units: self.current_allocated_cost,
}]),
working_bank,
);
info!(
"Slot: {} - Update reserved space to {}. Block Cost {}",
working_bank.slot(),
self.current_allocated_cost,
working_bank
.read_cost_tracker()
// Using unwrap here is scary, but that's the way it's done in QoS
.unwrap()
.block_cost()
);
}

fn use_reserved_cost(&mut self, bundle_cost: u64, working_bank: &Arc<Bank>) {
// If there is enough reserved space left, deduct the bundle cost from the reservation before QoS adds the bundle's cost to the tracker
if bundle_cost <= self.current_allocated_cost {
self.current_allocated_cost -= bundle_cost;
let mut dummy_tx_cost = TransactionCost::new_with_capacity(1);
dummy_tx_cost.bpf_execution_cost = bundle_cost;
QosService::remove_transaction_costs(
vec![dummy_tx_cost].iter(),
vec![Ok(())].iter(),
working_bank,
);
info!(
"Bundle uses {bundle_cost} of reserved space. Remaining: {}",
self.current_allocated_cost
);
} else {
info!(
"Bundle Cost {bundle_cost} exceeds available reserved space {}",
self.current_allocated_cost
);
}
}

fn update_reserved_cost(&mut self, working_bank: &Arc<Bank>) {
let target_reserved_cost = self.initial_allocated_cost * working_bank.tick_height()
/ working_bank.ticks_per_slot();
if self.current_allocated_cost > target_reserved_cost {
let mut dummy_tx_cost = TransactionCost::new_with_capacity(1);
dummy_tx_cost.bpf_execution_cost = self.current_allocated_cost - target_reserved_cost;
QosService::remove_transaction_costs(
vec![dummy_tx_cost].iter(),
vec![Ok(())].iter(),
working_bank,
);
debug!(
"Reserved Space Updated. Remaining: {}",
self.current_allocated_cost
);

self.current_allocated_cost = target_reserved_cost;
}
}
}

pub struct BundleStage {
bundle_thread: JoinHandle<()>,
}
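
The new BundleReservedSpace type is easier to follow with the QoS plumbing stripped away. Below is a minimal, self-contained model (not code from this commit): ToyCostTracker stands in for the bank's cost tracker, the real code goes through QosService::update_or_remove_transaction_costs and QosService::remove_transaction_costs instead, and the 40M-unit reservation and tick numbers are illustrative assumptions only.

// Toy model of the reservation scheme: charge the full reservation into the
// tracker at the start of a leader slot, refund a bundle's cost when it fits
// inside the reservation (QoS then charges the bundle's real cost elsewhere),
// and clamp the reservation toward initial * tick_height / ticks_per_slot,
// refunding the difference.
#[derive(Default)]
struct ToyCostTracker {
    block_cost: u64,
}

impl ToyCostTracker {
    fn add(&mut self, units: u64) {
        self.block_cost += units;
    }
    fn remove(&mut self, units: u64) {
        self.block_cost = self.block_cost.saturating_sub(units);
    }
}

struct ReservedSpaceModel {
    current_allocated_cost: u64,
    initial_allocated_cost: u64,
}

impl ReservedSpaceModel {
    // New leader slot: charge the whole reservation so regular transactions
    // see that much less headroom in the block.
    fn reset_reserved_cost(&mut self, tracker: &mut ToyCostTracker) {
        self.current_allocated_cost = self.initial_allocated_cost;
        tracker.add(self.current_allocated_cost);
    }

    // A bundle that fits is refunded from the tracker here; its real cost is
    // charged by QoS afterwards, so on net it lands inside the reservation.
    fn use_reserved_cost(&mut self, bundle_cost: u64, tracker: &mut ToyCostTracker) {
        if bundle_cost <= self.current_allocated_cost {
            self.current_allocated_cost -= bundle_cost;
            tracker.remove(bundle_cost);
        }
    }

    // Clamp the reservation down to initial * tick_height / ticks_per_slot
    // whenever it exceeds that target, refunding the difference.
    fn update_reserved_cost(&mut self, tick_height: u64, ticks_per_slot: u64, tracker: &mut ToyCostTracker) {
        let target = self.initial_allocated_cost * tick_height / ticks_per_slot;
        if self.current_allocated_cost > target {
            tracker.remove(self.current_allocated_cost - target);
            self.current_allocated_cost = target;
        }
    }
}

fn main() {
    let mut tracker = ToyCostTracker::default();
    let mut reserved = ReservedSpaceModel {
        current_allocated_cost: 0,
        initial_allocated_cost: 40_000_000,
    };

    reserved.reset_reserved_cost(&mut tracker);
    assert_eq!(tracker.block_cost, 40_000_000); // whole reservation charged

    reserved.use_reserved_cost(10_000_000, &mut tracker); // a bundle lands
    assert_eq!(reserved.current_allocated_cost, 30_000_000);
    assert_eq!(tracker.block_cost, 30_000_000); // refunded here, re-charged by QoS

    // Illustrative ticks: target = 40M * 16 / 64 = 10M, so 20M is refunded.
    reserved.update_reserved_cost(16, 64, &mut tracker);
    assert_eq!(reserved.current_allocated_cost, 10_000_000);
    assert_eq!(tracker.block_cost, 10_000_000);
}
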
@@ -296,28 +370,16 @@ impl BundleStage {
bank_start: &BankStart,
bundle_stage_leader_stats: &mut BundleStageLeaderStats,
max_bundle_retry_duration: &Duration,
reserved_space: &mut u64,
reserved_space: &mut BundleReservedSpace,
) -> BundleStageResult<()> {
if sanitized_bundle.transactions.is_empty() {
return Ok(());
}

let tx_costs = qos_service.compute_transaction_costs(sanitized_bundle.transactions.iter());
let bundle_cost = tx_costs.iter().map(|c| c.sum()).sum();
// If there is enough reserved space left, deduct bundle cost from reserved space before adding
if bundle_cost <= *reserved_space {
*reserved_space -= bundle_cost;
let mut dummy_tx_cost = TransactionCost::new_with_capacity(1);
dummy_tx_cost.bpf_execution_cost = bundle_cost;
QosService::remove_transaction_costs(
vec![dummy_tx_cost].iter(),
vec![Ok(())].iter(),
&bank_start.working_bank,
);
info!("Bundle uses {bundle_cost} of reserved space. Remaining: {reserved_space}");
} else {
info!("Bundle Cost {bundle_cost} exceeds available reserved space {reserved_space}");
}
// Use available reserved space
reserved_space.use_reserved_cost(bundle_cost, &bank_start.working_bank);
let (transactions_qos_results, num_included) = qos_service.select_transactions_per_cost(
sanitized_bundle.transactions.iter(),
tx_costs.iter(),
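
For reference, the bundle cost handed to use_reserved_cost above is a double sum: each per-transaction cost is collapsed with sum() and those totals are added across the bundle. A tiny stand-in sketch (the fields are only a subset of the real TransactionCost, and the numbers are made up):

// Stand-in for TransactionCost; sum() mirrors how a per-transaction total
// is formed, and the bundle cost is the sum of those totals.
struct TxCostSketch {
    signature_cost: u64,
    write_lock_cost: u64,
    bpf_execution_cost: u64,
}

impl TxCostSketch {
    fn sum(&self) -> u64 {
        self.signature_cost + self.write_lock_cost + self.bpf_execution_cost
    }
}

fn main() {
    let tx_costs = vec![
        TxCostSketch { signature_cost: 1_440, write_lock_cost: 600, bpf_execution_cost: 200_000 },
        TxCostSketch { signature_cost: 720, write_lock_cost: 300, bpf_execution_cost: 150_000 },
    ];
    let bundle_cost: u64 = tx_costs.iter().map(|c| c.sum()).sum();
    assert_eq!(bundle_cost, 353_060);
}
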
@@ -913,7 +975,7 @@ impl BundleStage {
last_tip_update_slot: &mut Slot,
bundle_stage_leader_stats: &mut BundleStageLeaderStats,
block_builder_fee_info: &Arc<Mutex<BlockBuilderFeeInfo>>,
reserved_space: &mut u64,
reserved_space: &mut BundleReservedSpace,
) {
let (sanitized_bundles, sanitized_bundle_elapsed) = measure!(
unprocessed_bundles
@@ -1086,7 +1148,7 @@ impl BundleStage {
tip_manager: &TipManager,
max_bundle_retry_duration: &Duration,
bundle_stage_leader_stats: &mut BundleStageLeaderStats,
reserved_space: &mut u64,
reserved_space: &mut BundleReservedSpace,
) -> BundleStageResult<()> {
let initialize_tip_accounts_bundle = SanitizedBundle {
transactions: Self::get_initialize_tip_accounts_transactions(
@@ -1150,7 +1212,7 @@ impl BundleStage {
max_bundle_retry_duration: &Duration,
bundle_stage_leader_stats: &mut BundleStageLeaderStats,
block_builder_fee_info: &Arc<Mutex<BlockBuilderFeeInfo>>,
reserved_space: &mut u64,
reserved_space: &mut BundleReservedSpace,
) -> BundleStageResult<()> {
let start_handle_tips = Instant::now();

@@ -1232,7 +1294,7 @@ impl BundleStage {
last_tip_update_slot: &mut Slot,
bundle_stage_leader_stats: &mut BundleStageLeaderStats,
block_builder_fee_info: &Arc<Mutex<BlockBuilderFeeInfo>>,
reserved_space: &mut u64,
reserved_space: &mut BundleReservedSpace,
) -> Vec<BundleStageResult<()>> {
let tip_pdas = tip_manager.get_tip_accounts();

@@ -1340,8 +1402,7 @@ impl BundleStage {
bundle_stage_stats: &mut BundleStageLoopStats,
id: u32,
block_builder_fee_info: &Arc<Mutex<BlockBuilderFeeInfo>>,
reserved_space: &mut u64,
preallocated_bundle_cost: u64,
reserved_space: &mut BundleReservedSpace,
) {
const DROP_BUNDLE_SLOT_OFFSET: u64 = 4;

@@ -1376,28 +1437,7 @@
(_, _) => false,
};
if is_new_slot {
// Reserve space from cost tracker
*reserved_space = preallocated_bundle_cost;
QosService::update_or_remove_transaction_costs(
vec![TransactionCost::new_with_capacity(1)].iter(),
vec![Ok(())].iter(),
Some(&vec![CommitTransactionDetails::Committed {
compute_units: *reserved_space,
}]),
&bank_start.working_bank,
);
info!(
"Slot: {} - Update reserved space to {}. Block Cost {}",
bundle_stage_leader_stats.current_slot.unwrap_or_default(),
*reserved_space,
&bank_start
.working_bank
.read_cost_tracker()
// Using unwrap here is scary, but that's the way it's done in QoS
.unwrap()
.block_cost()
);

reserved_space.reset_reserved_cost(&bank_start.working_bank);
// Re-buffer any bundles that didn't fit into the last block
if !cost_model_failed_bundles.is_empty() {
debug!(
@@ -1407,6 +1447,8 @@
);
unprocessed_bundles.extend(cost_model_failed_bundles.drain(..));
}
} else {
reserved_space.update_reserved_cost(&bank_start.working_bank);
}

Self::execute_bundles_until_empty_or_end_of_slot(
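
One detail of the is_new_slot branch above: unprocessed_bundles.extend(cost_model_failed_bundles.drain(..)) empties the failed queue in order and appends those bundles to the back of the unprocessed queue, so they are retried after anything already buffered. A self-contained illustration of just the queue mechanics (PacketBundle is stubbed here):

use std::collections::VecDeque;

// Stub standing in for PacketBundle; only the queue ordering matters here.
#[derive(Debug, PartialEq)]
struct Bundle(u32);

fn main() {
    let mut unprocessed: VecDeque<Bundle> = VecDeque::from(vec![Bundle(3)]);
    let mut failed: VecDeque<Bundle> = VecDeque::from(vec![Bundle(1), Bundle(2)]);

    // Same pattern as the diff: drain(..) empties `failed` front-to-back and
    // extend(..) pushes each drained bundle onto the back of `unprocessed`.
    unprocessed.extend(failed.drain(..));

    assert!(failed.is_empty());
    assert_eq!(unprocessed, VecDeque::from(vec![Bundle(3), Bundle(1), Bundle(2)]));
}
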
@@ -1466,7 +1508,10 @@

let mut unprocessed_bundles: VecDeque<PacketBundle> = VecDeque::with_capacity(1000);
let mut cost_model_failed_bundles: VecDeque<PacketBundle> = VecDeque::with_capacity(1000);
let mut reserved_space = u64::default();
let mut reserved_space = BundleReservedSpace {
current_allocated_cost: preallocated_bundle_cost,
initial_allocated_cost: preallocated_bundle_cost,
};

while !exit.load(Ordering::Relaxed) {
if !unprocessed_bundles.is_empty()
@@ -1493,7 +1538,6 @@
id,
&block_builder_fee_info,
&mut reserved_space,
preallocated_bundle_cost,
),
"process_buffered_bundles_elapsed"
);
@@ -1713,7 +1757,10 @@ mod tests {
&bank_start,
&mut bundle_stage_leader_stats,
&TEST_MAX_RETRY_DURATION,
&mut 0,
&mut BundleReservedSpace {
current_allocated_cost: 0,
initial_allocated_cost: 0,
},
);

// This is ugly, not really an option for testing but a test itself.
@@ -2045,7 +2092,10 @@ mod tests {
&bank_start,
&mut bundle_stage_leader_stats,
&TEST_MAX_RETRY_DURATION,
&mut 0,
&mut BundleReservedSpace {
current_allocated_cost: 0,
initial_allocated_cost: 0,
},
);
info!("test_bundle_max_retries result: {:?}", result);
assert!(matches!(