Skip to content

Commit

Permalink
Bugfix: MultiIterator batch priority guard (#33454)
Browse files Browse the repository at this point in the history
  • Loading branch information
apfitzge authored Oct 5, 2023
1 parent 9c2663f commit 2a17be0
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 140 deletions.
123 changes: 116 additions & 7 deletions core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,13 +757,17 @@ mod tests {
self,
state::{AddressLookupTable, LookupTableMeta},
},
compute_budget,
instruction::InstructionError,
message::{v0, v0::MessageAddressTableLookup, MessageHeader, VersionedMessage},
message::{
v0::{self, MessageAddressTableLookup},
Message, MessageHeader, VersionedMessage,
},
poh_config::PohConfig,
pubkey::Pubkey,
signature::Keypair,
signer::Signer,
system_transaction,
system_instruction, system_transaction,
transaction::{MessageHash, Transaction, VersionedTransaction},
},
solana_transaction_status::{TransactionStatusMeta, VersionedTransactionWithStatusMeta},
Expand Down Expand Up @@ -862,10 +866,11 @@ mod tests {
Arc<Bank>,
Arc<RwLock<PohRecorder>>,
Receiver<WorkingBankEntry>,
GenesisConfigInfo,
JoinHandle<()>,
) {
Blockstore::destroy(ledger_path).unwrap();
let genesis_config_info = create_slow_genesis_config(10_000);
let genesis_config_info = create_slow_genesis_config(100_000_000);
let GenesisConfigInfo {
genesis_config,
mint_keypair,
Expand Down Expand Up @@ -905,6 +910,7 @@ mod tests {
bank,
poh_recorder,
entry_receiver,
genesis_config_info,
poh_simulator,
)
}
Expand Down Expand Up @@ -1830,9 +1836,9 @@ mod tests {
fn test_consume_buffered_packets() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) =
let (transactions, bank, poh_recorder, _entry_receiver, _, poh_simulator) =
setup_conflicting_transactions(ledger_path.path());
let recorder = poh_recorder.read().unwrap().new_recorder();
let recorder: TransactionRecorder = poh_recorder.read().unwrap().new_recorder();
let num_conflicting_transactions = transactions.len();
let deserialized_packets = transactions_to_deserialized_packets(&transactions).unwrap();
assert_eq!(deserialized_packets.len(), num_conflicting_transactions);
Expand Down Expand Up @@ -1903,7 +1909,7 @@ mod tests {
fn test_consume_buffered_packets_sanitization_error() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let (mut transactions, bank, poh_recorder, _entry_receiver, poh_simulator) =
let (mut transactions, bank, poh_recorder, _entry_receiver, _, poh_simulator) =
setup_conflicting_transactions(ledger_path.path());
let duplicate_account_key = transactions[0].message.account_keys[0];
transactions[0]
Expand Down Expand Up @@ -1959,7 +1965,7 @@ mod tests {
fn test_consume_buffered_packets_retryable() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) =
let (transactions, bank, poh_recorder, _entry_receiver, _, poh_simulator) =
setup_conflicting_transactions(ledger_path.path());
let recorder = poh_recorder.read().unwrap().new_recorder();
let num_conflicting_transactions = transactions.len();
Expand Down Expand Up @@ -2048,6 +2054,109 @@ mod tests {
Blockstore::destroy(ledger_path.path()).unwrap();
}

#[test]
fn test_consume_buffered_packets_batch_priority_guard() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let (_, bank, poh_recorder, _entry_receiver, genesis_config_info, poh_simulator) =
setup_conflicting_transactions(ledger_path.path());
let recorder = poh_recorder.read().unwrap().new_recorder();

// Setup transactions:
// [(AB), (BC), (CD)]
// (AB) and (BC) are conflicting, and cannot go into the same batch.
// (AB) and (CD) are not conflict. However, (CD) should not be able to take locks needed by (BC).
let keypair_a = Keypair::new();
let keypair_b = Keypair::new();
let keypair_c = Keypair::new();
let keypair_d = Keypair::new();
for keypair in &[&keypair_a, &keypair_b, &keypair_c, &keypair_d] {
bank.transfer(5_000, &genesis_config_info.mint_keypair, &keypair.pubkey())
.unwrap();
}

let make_prioritized_transfer =
|from: &Keypair, to, lamports, priority| -> Transaction {
let ixs = vec![
system_instruction::transfer(&from.pubkey(), to, lamports),
compute_budget::ComputeBudgetInstruction::set_compute_unit_price(priority),
];
let message = Message::new(&ixs, Some(&from.pubkey()));
Transaction::new(&[from], message, bank.last_blockhash())
};

let transactions = vec![
make_prioritized_transfer(&keypair_a, &keypair_b.pubkey(), 1, 3),
make_prioritized_transfer(&keypair_b, &keypair_c.pubkey(), 1, 2),
make_prioritized_transfer(&keypair_c, &keypair_d.pubkey(), 1, 1),
];

let num_conflicting_transactions = transactions.len();
let deserialized_packets = transactions_to_deserialized_packets(&transactions).unwrap();
assert_eq!(deserialized_packets.len(), num_conflicting_transactions);
let mut buffered_packet_batches =
UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::from_iter(
deserialized_packets,
num_conflicting_transactions,
),
ThreadType::Transactions,
);

let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(
None,
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);

// When the working bank in poh_recorder is None, no packets should be processed (consume will not be called)
assert!(!poh_recorder.read().unwrap().has_bank());
assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions);
// When the working bank in poh_recorder is Some, all packets should be processed.
// Multi-Iterator will process them 1-by-1 if all txs are conflicting.
poh_recorder.write().unwrap().set_bank(bank, false);
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
let banking_stage_stats = BankingStageStats::default();
consumer.consume_buffered_packets(
&bank_start,
&mut buffered_packet_batches,
&banking_stage_stats,
&mut LeaderSlotMetricsTracker::new(0),
);

// Check that all packets were processed without retrying
assert!(buffered_packet_batches.is_empty());
assert_eq!(
banking_stage_stats
.consumed_buffered_packets_count
.load(Ordering::Relaxed),
num_conflicting_transactions
);
assert_eq!(
banking_stage_stats
.rebuffered_packets_count
.load(Ordering::Relaxed),
0
);
// Use bank to check the number of entries (batches)
assert_eq!(bank_start.working_bank.transactions_per_entry_max(), 1);
assert_eq!(
bank_start.working_bank.transaction_entries_count(),
4 + num_conflicting_transactions as u64 // 4 for funding transfers
);

poh_recorder
.read()
.unwrap()
.is_exited
.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
}
Blockstore::destroy(ledger_path.path()).unwrap();
}

#[test]
fn test_accumulate_execute_units_and_time() {
let mut execute_timings = ExecuteTimings::default();
Expand Down
Loading

0 comments on commit 2a17be0

Please sign in to comment.