Skip to content

Commit

Permalink
Scheduler adjusts underpriced transaction consumed CUs
Browse files Browse the repository at this point in the history
update consumer test

add test for adjustment

refactor to add adjust_units to CommitTransactionDetails::Committed; Reprots committed details to metrics

fix merge
  • Loading branch information
tao-stones committed Nov 12, 2023
1 parent 1776575 commit b8c709e
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 8 deletions.
92 changes: 90 additions & 2 deletions core/src/banking_stage/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,14 @@ use {

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CommitTransactionDetails {
Committed { compute_units: u64 },
Committed {
// actual compute units consumed for executing the committed transaction
executed_units: u64,
// actual micro-second elapsed for executing the committed transaction
executed_us: u64,
// compute units to add to executed_units to compensate under priced CUs
adjust_units: u64,
},
NotCommitted,
}

Expand Down Expand Up @@ -106,7 +113,12 @@ impl Committer {
.iter()
.map(|execution_result| match execution_result.details() {
Some(details) => CommitTransactionDetails::Committed {
compute_units: details.executed_units,
executed_units: details.executed_units,
executed_us: details.executed_us,
adjust_units: Self::adjust_executed_units_for_potential_underpricing(
details.executed_units,
details.executed_us,
),
},
None => CommitTransactionDetails::NotCommitted,
})
Expand Down Expand Up @@ -176,4 +188,80 @@ impl Committer {
);
}
}

// If transaction's actual CU/us ratio is below cluster average COMPUTE_UNIT_TO_US_RATIO,
// it is likely has been under priced. To prevent extending replay time significantly,
// we can pad additional CUs to transaction's actual CUs during packing to compensate
// additional executing time it needs.
// adjustment is u64 for now, meaning only add more CUs when transactions are under priced,
// but not to reduce CU if transactions are over priced.
fn adjust_executed_units_for_potential_underpricing(
executed_units: u64,
executed_us: u64,
) -> u64 {
// "actual executed units" is consistent cross cluster, but "adjustment" are only based
// on current leader node. Add a 50% taper to reduce local variance.
const TAPER: u64 = 2;
solana_runtime::block_cost_limits::COMPUTE_UNIT_TO_US_RATIO
.saturating_mul(executed_us)
.saturating_sub(executed_units)
.saturating_div(TAPER)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_adjust_executed_units_for_potential_underpricing() {
solana_logger::setup();
use solana_runtime::block_cost_limits::COMPUTE_UNIT_TO_US_RATIO;

let executed_cu = 70;

// no adjust for over-pricing
assert_eq!(
0,
Committer::adjust_executed_units_for_potential_underpricing(
executed_cu,
executed_cu / (COMPUTE_UNIT_TO_US_RATIO + 10)
)
);

// adjust for under pricing
let slow_execution_time = executed_cu / (COMPUTE_UNIT_TO_US_RATIO - 10);
let expected_adjustment = ((COMPUTE_UNIT_TO_US_RATIO - executed_cu / slow_execution_time)
* slow_execution_time)
/ 2;
assert_eq!(
expected_adjustment,
Committer::adjust_executed_units_for_potential_underpricing(
executed_cu,
slow_execution_time
)
);

// handle zeros
assert_eq!(
0,
Committer::adjust_executed_units_for_potential_underpricing(0, 0)
);

// the case of extreme underpricing
assert_eq!(
u64::MAX / 2, // tapered in half
Committer::adjust_executed_units_for_potential_underpricing(0, u64::MAX)
);

// No adjustment if executed_units is already MAX
assert_eq!(
0,
Committer::adjust_executed_units_for_potential_underpricing(u64::MAX, u64::MAX)
);
assert_eq!(
0,
Committer::adjust_executed_units_for_potential_underpricing(u64::MAX, 0)
);
}
}
42 changes: 41 additions & 1 deletion core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,14 @@ impl Consumer {
self.qos_service.accumulate_actual_execute_cu(cu);
self.qos_service.accumulate_actual_execute_time(us);

let (committed_cu, adjust_cu, committed_us) =
Self::accumulate_commit_transactions_result(commit_transactions_result.as_ref().ok());
self.qos_service
.accumulate_committed_execute_cu(committed_cu);
self.qos_service
.accumulate_committed_execute_time(committed_us);
self.qos_service.accumulate_committed_adjust_cu(adjust_cu);

// reports qos service stats for this batch
self.qos_service.report_metrics(bank.slot());

Expand Down Expand Up @@ -682,6 +690,34 @@ impl Consumer {
)
}

fn accumulate_commit_transactions_result(
transaction_committed_status: Option<&Vec<CommitTransactionDetails>>,
) -> (u64, u64, u64) {
if let Some(transaction_committed_status) = transaction_committed_status {
transaction_committed_status.iter().fold(
(0, 0, 0),
|(units, time, adjustment), transaction_committed_details| {
if let CommitTransactionDetails::Committed {
executed_units,
executed_us,
adjust_units,
} = transaction_committed_details
{
(
units.saturating_add(*executed_units),
time.saturating_add(*executed_us),
adjustment.saturating_add(*adjust_units),
)
} else {
(units, time, adjustment)
}
},
)
} else {
(0, 0, 0)
}
}

/// This function filters pending packets that are still valid
/// # Arguments
/// * `transactions` - a batch of transactions deserialized from packets
Expand Down Expand Up @@ -1254,7 +1290,11 @@ mod tests {

let expected_block_cost = if !apply_cost_tracker_during_replay_enabled {
let actual_bpf_execution_cost = match commit_transactions_result.get(0).unwrap() {
CommitTransactionDetails::Committed { compute_units } => *compute_units,
CommitTransactionDetails::Committed {
executed_units,
executed_us: _,
adjust_units,
} => executed_units.saturating_add(*adjust_units),
CommitTransactionDetails::NotCommitted => {
unreachable!()
}
Expand Down
65 changes: 60 additions & 5 deletions core/src/qos_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,14 @@ impl QosService {
// Only transactions that the qos service included have to be
// checked for update
if let Ok(tx_cost) = tx_cost {
if let CommitTransactionDetails::Committed { compute_units } =
transaction_committed_details
if let CommitTransactionDetails::Committed {
executed_units,
executed_us: _,
adjust_units,
} = transaction_committed_details
{
cost_tracker.update_execution_cost(tx_cost, *compute_units)
let compute_units = executed_units.saturating_add(*adjust_units);
cost_tracker.update_execution_cost(tx_cost, compute_units)
}
}
});
Expand Down Expand Up @@ -361,6 +365,27 @@ impl QosService {
.fetch_add(micro_sec, Ordering::Relaxed);
}

pub fn accumulate_committed_execute_cu(&self, units: u64) {
self.metrics
.stats
.committed_execute_cu
.fetch_add(units, Ordering::Relaxed);
}

pub fn accumulate_committed_execute_time(&self, micro_sec: u64) {
self.metrics
.stats
.committed_execute_time_us
.fetch_add(micro_sec, Ordering::Relaxed);
}

pub fn accumulate_committed_adjust_cu(&self, units: u64) {
self.metrics
.stats
.committed_adjust_cu
.fetch_add(units, Ordering::Relaxed);
}

// rollup transaction cost details, eg signature_cost, write_lock_cost, data_bytes_cost and
// execution_cost from the batch of transactions selected for block.
fn accumulate_batched_transaction_costs<'a>(
Expand Down Expand Up @@ -508,6 +533,15 @@ struct QosServiceMetricsStats {

/// accumulated actual program execute micro-sec that have been packed into block
actual_execute_time_us: AtomicU64,

/// accumulated executtion units for all committed transactions
committed_execute_cu: AtomicU64,

/// accumulated execution time for all committed transactions
committed_execute_time_us: AtomicU64,

/// accumulated adjustment units for committed transactions that might have been under-priced
committed_adjust_cu: AtomicU64,
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -608,6 +642,23 @@ impl QosServiceMetrics {
self.stats.actual_execute_time_us.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"committed_execute_cu",
self.stats.committed_execute_cu.swap(0, Ordering::Relaxed),
i64
),
(
"committed_execute_time_us",
self.stats
.committed_execute_time_us
.swap(0, Ordering::Relaxed),
i64
),
(
"committed_adjust_cu",
self.stats.committed_adjust_cu.swap(0, Ordering::Relaxed),
i64
),
);
datapoint_info!(
"qos-service-errors",
Expand Down Expand Up @@ -803,8 +854,10 @@ mod tests {
let commited_status: Vec<CommitTransactionDetails> = qos_cost_results
.iter()
.map(|tx_cost| CommitTransactionDetails::Committed {
compute_units: tx_cost.as_ref().unwrap().bpf_execution_cost
executed_units: tx_cost.as_ref().unwrap().bpf_execution_cost
+ execute_units_adjustment,
executed_us: 0,
adjust_units: 0,
})
.collect();
let final_txs_cost = total_txs_cost + execute_units_adjustment * transaction_count;
Expand Down Expand Up @@ -930,8 +983,10 @@ mod tests {
CommitTransactionDetails::NotCommitted
} else {
CommitTransactionDetails::Committed {
compute_units: tx_cost.as_ref().unwrap().bpf_execution_cost
executed_units: tx_cost.as_ref().unwrap().bpf_execution_cost
+ execute_units_adjustment,
executed_us: 0,
adjust_units: 0,
}
}
})
Expand Down
1 change: 1 addition & 0 deletions rpc/src/transaction_status_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ pub(crate) mod tests {
)),
return_data: None,
executed_units: 0,
executed_us: 0,
accounts_data_len_delta: 0,
});

Expand Down
1 change: 1 addition & 0 deletions runtime/src/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1519,6 +1519,7 @@ mod tests {
durable_nonce_fee: nonce.map(DurableNonceFee::from),
return_data: None,
executed_units: 0,
executed_us: 0,
accounts_data_len_delta: 0,
},
programs_modified_by_tx: Box::<LoadedProgramsForTxBatch>::default(),
Expand Down
2 changes: 2 additions & 0 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ pub struct TransactionExecutionDetails {
pub durable_nonce_fee: Option<DurableNonceFee>,
pub return_data: Option<TransactionReturnData>,
pub executed_units: u64,
pub executed_us: u64,
/// The change in accounts data len for this transaction.
/// NOTE: This value is valid IFF `status` is `Ok`.
pub accounts_data_len_delta: i64,
Expand Down Expand Up @@ -4281,6 +4282,7 @@ impl Bank {
durable_nonce_fee,
return_data,
executed_units,
executed_us: timings.execute_accessories.process_instructions.total_us,
accounts_data_len_delta,
},
programs_modified_by_tx: Box::new(programs_modified_by_tx),
Expand Down
1 change: 1 addition & 0 deletions runtime/src/bank/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ fn new_execution_result(
durable_nonce_fee: nonce.map(DurableNonceFee::from),
return_data: None,
executed_units: 0,
executed_us: 0,
accounts_data_len_delta: 0,
},
programs_modified_by_tx: Box::<LoadedProgramsForTxBatch>::default(),
Expand Down

0 comments on commit b8c709e

Please sign in to comment.