Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix: MultiIterator batch priority guard #33454

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 116 additions & 7 deletions core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,13 +757,17 @@ mod tests {
self,
state::{AddressLookupTable, LookupTableMeta},
},
compute_budget,
instruction::InstructionError,
message::{v0, v0::MessageAddressTableLookup, MessageHeader, VersionedMessage},
message::{
v0::{self, MessageAddressTableLookup},
Message, MessageHeader, VersionedMessage,
},
poh_config::PohConfig,
pubkey::Pubkey,
signature::Keypair,
signer::Signer,
system_transaction,
system_instruction, system_transaction,
transaction::{MessageHash, Transaction, VersionedTransaction},
},
solana_transaction_status::{TransactionStatusMeta, VersionedTransactionWithStatusMeta},
Expand Down Expand Up @@ -862,10 +866,11 @@ mod tests {
Arc<Bank>,
Arc<RwLock<PohRecorder>>,
Receiver<WorkingBankEntry>,
GenesisConfigInfo,
JoinHandle<()>,
) {
Blockstore::destroy(ledger_path).unwrap();
let genesis_config_info = create_slow_genesis_config(10_000);
let genesis_config_info = create_slow_genesis_config(100_000_000);
let GenesisConfigInfo {
genesis_config,
mint_keypair,
Expand Down Expand Up @@ -905,6 +910,7 @@ mod tests {
bank,
poh_recorder,
entry_receiver,
genesis_config_info,
poh_simulator,
)
}
Expand Down Expand Up @@ -1830,9 +1836,9 @@ mod tests {
fn test_consume_buffered_packets() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) =
let (transactions, bank, poh_recorder, _entry_receiver, _, poh_simulator) =
setup_conflicting_transactions(ledger_path.path());
let recorder = poh_recorder.read().unwrap().new_recorder();
let recorder: TransactionRecorder = poh_recorder.read().unwrap().new_recorder();
let num_conflicting_transactions = transactions.len();
let deserialized_packets = transactions_to_deserialized_packets(&transactions).unwrap();
assert_eq!(deserialized_packets.len(), num_conflicting_transactions);
Expand Down Expand Up @@ -1903,7 +1909,7 @@ mod tests {
fn test_consume_buffered_packets_sanitization_error() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let (mut transactions, bank, poh_recorder, _entry_receiver, poh_simulator) =
let (mut transactions, bank, poh_recorder, _entry_receiver, _, poh_simulator) =
setup_conflicting_transactions(ledger_path.path());
let duplicate_account_key = transactions[0].message.account_keys[0];
transactions[0]
Expand Down Expand Up @@ -1959,7 +1965,7 @@ mod tests {
fn test_consume_buffered_packets_retryable() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) =
let (transactions, bank, poh_recorder, _entry_receiver, _, poh_simulator) =
setup_conflicting_transactions(ledger_path.path());
let recorder = poh_recorder.read().unwrap().new_recorder();
let num_conflicting_transactions = transactions.len();
Expand Down Expand Up @@ -2048,6 +2054,109 @@ mod tests {
Blockstore::destroy(ledger_path.path()).unwrap();
}

#[test]
fn test_consume_buffered_packets_batch_priority_guard() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the test case makes sense to me.

let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let (_, bank, poh_recorder, _entry_receiver, genesis_config_info, poh_simulator) =
setup_conflicting_transactions(ledger_path.path());
let recorder = poh_recorder.read().unwrap().new_recorder();

// Setup transactions:
// [(AB), (BC), (CD)]
// (AB) and (BC) are conflicting, and cannot go into the same batch.
// (AB) and (CD) are not conflict. However, (CD) should not be able to take locks needed by (BC).
let keypair_a = Keypair::new();
let keypair_b = Keypair::new();
let keypair_c = Keypair::new();
let keypair_d = Keypair::new();
for keypair in &[&keypair_a, &keypair_b, &keypair_c, &keypair_d] {
bank.transfer(5_000, &genesis_config_info.mint_keypair, &keypair.pubkey())
.unwrap();
}

let make_prioritized_transfer =
|from: &Keypair, to, lamports, priority| -> Transaction {
let ixs = vec![
system_instruction::transfer(&from.pubkey(), to, lamports),
compute_budget::ComputeBudgetInstruction::set_compute_unit_price(priority),
];
let message = Message::new(&ixs, Some(&from.pubkey()));
Transaction::new(&[from], message, bank.last_blockhash())
};

let transactions = vec![
make_prioritized_transfer(&keypair_a, &keypair_b.pubkey(), 1, 3),
make_prioritized_transfer(&keypair_b, &keypair_c.pubkey(), 1, 2),
make_prioritized_transfer(&keypair_c, &keypair_d.pubkey(), 1, 1),
];

let num_conflicting_transactions = transactions.len();
let deserialized_packets = transactions_to_deserialized_packets(&transactions).unwrap();
assert_eq!(deserialized_packets.len(), num_conflicting_transactions);
let mut buffered_packet_batches =
UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::from_iter(
deserialized_packets,
num_conflicting_transactions,
),
ThreadType::Transactions,
);

let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(
None,
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);

// When the working bank in poh_recorder is None, no packets should be processed (consume will not be called)
assert!(!poh_recorder.read().unwrap().has_bank());
assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions);
// When the working bank in poh_recorder is Some, all packets should be processed.
// Multi-Iterator will process them 1-by-1 if all txs are conflicting.
poh_recorder.write().unwrap().set_bank(bank, false);
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
let banking_stage_stats = BankingStageStats::default();
consumer.consume_buffered_packets(
&bank_start,
&mut buffered_packet_batches,
&banking_stage_stats,
&mut LeaderSlotMetricsTracker::new(0),
);

// Check that all packets were processed without retrying
assert!(buffered_packet_batches.is_empty());
assert_eq!(
banking_stage_stats
.consumed_buffered_packets_count
.load(Ordering::Relaxed),
num_conflicting_transactions
);
assert_eq!(
banking_stage_stats
.rebuffered_packets_count
.load(Ordering::Relaxed),
0
);
// Use bank to check the number of entries (batches)
assert_eq!(bank_start.working_bank.transactions_per_entry_max(), 1);
assert_eq!(
bank_start.working_bank.transaction_entries_count(),
4 + num_conflicting_transactions as u64 // 4 for funding transfers
);

poh_recorder
.read()
.unwrap()
.is_exited
.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
}
Blockstore::destroy(ledger_path.path()).unwrap();
}

#[test]
fn test_accumulate_execute_units_and_time() {
let mut execute_timings = ExecuteTimings::default();
Expand Down
Loading