Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix: MultiIterator batch priority guard #33454

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 117 additions & 7 deletions core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ impl Consumer {
// Clear payload for next iteration
payload.sanitized_transactions.clear();
payload.account_locks.clear();
payload.batch_priority_guard.clear();

let ProcessTransactionsSummary {
reached_max_poh_height,
Expand Down Expand Up @@ -757,13 +758,17 @@ mod tests {
self,
state::{AddressLookupTable, LookupTableMeta},
},
compute_budget,
instruction::InstructionError,
message::{v0, v0::MessageAddressTableLookup, MessageHeader, VersionedMessage},
message::{
v0::{self, MessageAddressTableLookup},
Message, MessageHeader, VersionedMessage,
},
poh_config::PohConfig,
pubkey::Pubkey,
signature::Keypair,
signer::Signer,
system_transaction,
system_instruction, system_transaction,
transaction::{MessageHash, Transaction, VersionedTransaction},
},
solana_transaction_status::{TransactionStatusMeta, VersionedTransactionWithStatusMeta},
Expand Down Expand Up @@ -862,10 +867,11 @@ mod tests {
Arc<Bank>,
Arc<RwLock<PohRecorder>>,
Receiver<WorkingBankEntry>,
GenesisConfigInfo,
JoinHandle<()>,
) {
Blockstore::destroy(ledger_path).unwrap();
let genesis_config_info = create_slow_genesis_config(10_000);
let genesis_config_info = create_slow_genesis_config(100_000_000);
let GenesisConfigInfo {
genesis_config,
mint_keypair,
Expand Down Expand Up @@ -905,6 +911,7 @@ mod tests {
bank,
poh_recorder,
entry_receiver,
genesis_config_info,
poh_simulator,
)
}
Expand Down Expand Up @@ -1830,9 +1837,9 @@ mod tests {
fn test_consume_buffered_packets() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) =
let (transactions, bank, poh_recorder, _entry_receiver, _, poh_simulator) =
setup_conflicting_transactions(ledger_path.path());
let recorder = poh_recorder.read().unwrap().new_recorder();
let recorder: TransactionRecorder = poh_recorder.read().unwrap().new_recorder();
let num_conflicting_transactions = transactions.len();
let deserialized_packets = transactions_to_deserialized_packets(&transactions).unwrap();
assert_eq!(deserialized_packets.len(), num_conflicting_transactions);
Expand Down Expand Up @@ -1903,7 +1910,7 @@ mod tests {
fn test_consume_buffered_packets_sanitization_error() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let (mut transactions, bank, poh_recorder, _entry_receiver, poh_simulator) =
let (mut transactions, bank, poh_recorder, _entry_receiver, _, poh_simulator) =
setup_conflicting_transactions(ledger_path.path());
let duplicate_account_key = transactions[0].message.account_keys[0];
transactions[0]
Expand Down Expand Up @@ -1959,7 +1966,7 @@ mod tests {
fn test_consume_buffered_packets_retryable() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) =
let (transactions, bank, poh_recorder, _entry_receiver, _, poh_simulator) =
setup_conflicting_transactions(ledger_path.path());
let recorder = poh_recorder.read().unwrap().new_recorder();
let num_conflicting_transactions = transactions.len();
Expand Down Expand Up @@ -2048,6 +2055,109 @@ mod tests {
Blockstore::destroy(ledger_path.path()).unwrap();
}

#[test]
fn test_consume_buffered_packets_batch_priority_guard() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the test case makes sense to me.

let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let (_, bank, poh_recorder, _entry_receiver, genesis_config_info, poh_simulator) =
setup_conflicting_transactions(ledger_path.path());
let recorder = poh_recorder.read().unwrap().new_recorder();

// Setup transactions:
// [(AB), (BC), (CD)]
// (AB) and (BC) are conflicting, and cannot go into the same batch.
// (AB) and (CD) are not conflict. However, (CD) should not be able to take locks needed by (BC).
let keypair_a = Keypair::new();
let keypair_b = Keypair::new();
let keypair_c = Keypair::new();
let keypair_d = Keypair::new();
for keypair in &[&keypair_a, &keypair_b, &keypair_c, &keypair_d] {
bank.transfer(5_000, &genesis_config_info.mint_keypair, &keypair.pubkey())
.unwrap();
}

let make_prioritized_transfer =
|from: &Keypair, to, lamports, priority| -> Transaction {
let ixs = vec![
system_instruction::transfer(&from.pubkey(), to, lamports),
compute_budget::ComputeBudgetInstruction::set_compute_unit_price(priority),
];
let message = Message::new(&ixs, Some(&from.pubkey()));
Transaction::new(&[from], message, bank.last_blockhash())
};

let transactions = vec![
make_prioritized_transfer(&keypair_a, &keypair_b.pubkey(), 1, 3),
make_prioritized_transfer(&keypair_b, &keypair_c.pubkey(), 1, 2),
make_prioritized_transfer(&keypair_c, &keypair_d.pubkey(), 1, 1),
];

let num_conflicting_transactions = transactions.len();
let deserialized_packets = transactions_to_deserialized_packets(&transactions).unwrap();
assert_eq!(deserialized_packets.len(), num_conflicting_transactions);
let mut buffered_packet_batches =
UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::from_iter(
deserialized_packets,
num_conflicting_transactions,
),
ThreadType::Transactions,
);

let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(
None,
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);

// When the working bank in poh_recorder is None, no packets should be processed (consume will not be called)
assert!(!poh_recorder.read().unwrap().has_bank());
assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions);
// When the working bank in poh_recorder is Some, all packets should be processed.
// Multi-Iterator will process them 1-by-1 if all txs are conflicting.
poh_recorder.write().unwrap().set_bank(bank, false);
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
let banking_stage_stats = BankingStageStats::default();
consumer.consume_buffered_packets(
&bank_start,
&mut buffered_packet_batches,
&banking_stage_stats,
&mut LeaderSlotMetricsTracker::new(0),
);

// Check that all packets were processed without retrying
assert!(buffered_packet_batches.is_empty());
assert_eq!(
banking_stage_stats
.consumed_buffered_packets_count
.load(Ordering::Relaxed),
num_conflicting_transactions
);
assert_eq!(
banking_stage_stats
.rebuffered_packets_count
.load(Ordering::Relaxed),
0
);
// Use bank to check the number of entries (batches)
assert_eq!(bank_start.working_bank.transactions_per_entry_max(), 1);
assert_eq!(
bank_start.working_bank.transaction_entries_count(),
4 + num_conflicting_transactions as u64 // 4 for funding transfers
);

poh_recorder
.read()
.unwrap()
.is_exited
.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
}
Blockstore::destroy(ledger_path.path()).unwrap();
}

#[test]
fn test_accumulate_execute_units_and_time() {
let mut execute_timings = ExecuteTimings::default();
Expand Down
4 changes: 2 additions & 2 deletions core/src/banking_stage/read_write_account_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl ReadWriteAccountSet {
}

/// Check if a sanitized message's account locks are available.
fn check_sanitized_message_account_locks(&self, message: &SanitizedMessage) -> bool {
pub(crate) fn check_sanitized_message_account_locks(&self, message: &SanitizedMessage) -> bool {
!message
.account_keys()
.iter()
Expand All @@ -64,7 +64,7 @@ impl ReadWriteAccountSet {
}

/// Insert the read and write locks for a sanitized message.
fn add_sanitized_message_account_locks(&mut self, message: &SanitizedMessage) {
pub(crate) fn add_sanitized_message_account_locks(&mut self, message: &SanitizedMessage) {
message
.account_keys()
.iter()
Expand Down
49 changes: 33 additions & 16 deletions core/src/banking_stage/unprocessed_transaction_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use {
},
itertools::Itertools,
min_max_heap::MinMaxHeap,
solana_measure::measure,
solana_measure::{measure, measure_us},
solana_runtime::bank::Bank,
solana_sdk::{
clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, feature_set::FeatureSet, hash::Hash,
Expand Down Expand Up @@ -133,6 +133,7 @@ fn filter_processed_packets<'a, F>(
pub struct ConsumeScannerPayload<'a> {
pub reached_end_of_slot: bool,
pub account_locks: ReadWriteAccountSet,
pub batch_priority_guard: ReadWriteAccountSet,
pub sanitized_transactions: Vec<SanitizedTransaction>,
pub slot_metrics_tracker: &'a mut LeaderSlotMetricsTracker,
pub message_hash_to_transaction: &'a mut HashMap<Hash, DeserializedPacket>,
Expand All @@ -149,18 +150,11 @@ fn consume_scan_should_process_packet(
return ProcessingDecision::Now;
}

// Before sanitization, let's quickly check the static keys (performance optimization)
let message = &packet.transaction().get_message().message;
if !payload.account_locks.check_static_account_locks(message) {
return ProcessingDecision::Later;
tao-stones marked this conversation as resolved.
Show resolved Hide resolved
}

// Try to deserialize the packet
let (maybe_sanitized_transaction, sanitization_time) = measure!(
// Try to sanitize the packet
let (maybe_sanitized_transaction, sanitization_time_us) = measure_us!(
packet.build_sanitized_transaction(&bank.feature_set, bank.vote_only_bank(), bank)
);

let sanitization_time_us = sanitization_time.as_us();
payload
.slot_metrics_tracker
.increment_transactions_from_packets_us(sanitization_time_us);
Expand All @@ -181,13 +175,35 @@ fn consume_scan_should_process_packet(
payload
.message_hash_to_transaction
.remove(packet.message_hash());
ProcessingDecision::Never
} else if payload.account_locks.try_locking(message) {
payload.sanitized_transactions.push(sanitized_transaction);
ProcessingDecision::Now
} else {
ProcessingDecision::Later
return ProcessingDecision::Never;
}

if !payload
.account_locks
.check_sanitized_message_account_locks(message)
{
payload
.batch_priority_guard
.add_sanitized_message_account_locks(message);
return ProcessingDecision::Later;
}

if !payload
.batch_priority_guard
.check_sanitized_message_account_locks(message)
{
payload
.batch_priority_guard
.add_sanitized_message_account_locks(message);
return ProcessingDecision::Later;
}

payload
.account_locks
.add_sanitized_message_account_locks(message);

payload.sanitized_transactions.push(sanitized_transaction);
ProcessingDecision::Now
} else {
payload
.message_hash_to_transaction
Expand All @@ -212,6 +228,7 @@ where
let payload = ConsumeScannerPayload {
reached_end_of_slot: false,
account_locks: ReadWriteAccountSet::default(),
batch_priority_guard: ReadWriteAccountSet::default(),
sanitized_transactions: Vec::with_capacity(UNPROCESSED_BUFFER_STEP_SIZE),
slot_metrics_tracker,
message_hash_to_transaction,
Expand Down