Skip to content

Commit

Permalink
track banking stage performance using random txn mask
Browse files Browse the repository at this point in the history
  • Loading branch information
lijunwangs committed Jan 16, 2024
1 parent 4252ce8 commit 37a6730
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 2 deletions.
20 changes: 20 additions & 0 deletions core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,26 @@ impl Consumer {
.slot_metrics_tracker
.increment_retryable_packets_count(retryable_transaction_indexes.len() as u64);

// Now we track the performance for the interested transactions which is not in the retryable_transaction_indexes
// We assume the retryable_transaction_indexes is already sorted, double check
for (index, packet) in packets_to_process.iter().enumerate() {
if packet.original_packet().meta().is_tracer_packet() {
if let Some(start_time) = packet.start_time() {
if retryable_transaction_indexes.binary_search(&index).is_err() {
let duration = Instant::now() - start_time.clone();

debug!(
"Banking stage processing took {duration:?} for transaction {:?}",
packet.transaction().get_signatures().get(0)
);
inc_new_counter_info!(
"txn-metrics-banking-stage-process-us",
duration.as_micros() as usize
);
}
}
}
}
Some(retryable_transaction_indexes)
}

Expand Down
13 changes: 12 additions & 1 deletion core/src/banking_stage/immutable_deserialized_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use {
VersionedTransaction,
},
},
std::{cmp::Ordering, mem::size_of, sync::Arc},
std::{cmp::Ordering, mem::size_of, sync::Arc, time::Instant},
thiserror::Error,
};

Expand Down Expand Up @@ -43,10 +43,16 @@ pub struct ImmutableDeserializedPacket {
message_hash: Hash,
is_simple_vote: bool,
priority_details: TransactionPriorityDetails,
banking_stage_start_time: Option<Instant>,
}

impl ImmutableDeserializedPacket {
pub fn new(packet: Packet) -> Result<Self, DeserializedPacketError> {
let banking_stage_start_time = if packet.meta().is_tracer_packet() {
Some(Instant::now())
} else {
None
};
let versioned_transaction: VersionedTransaction = packet.deserialize_slice(..)?;
let sanitized_transaction = SanitizedVersionedTransaction::try_from(versioned_transaction)?;
let message_bytes = packet_message(&packet)?;
Expand All @@ -69,6 +75,7 @@ impl ImmutableDeserializedPacket {
message_hash,
is_simple_vote,
priority_details,
banking_stage_start_time,
})
}

Expand Down Expand Up @@ -100,6 +107,10 @@ impl ImmutableDeserializedPacket {
self.priority_details.clone()
}

pub fn start_time(&self) -> &Option<Instant> {
&self.banking_stage_start_time
}

// This function deserializes packets into transactions, computes the blake3 hash of transaction
// messages, and verifies secp256k1 instructions.
pub fn build_sanitized_transaction(
Expand Down
4 changes: 4 additions & 0 deletions sdk/src/transaction/versioned/sanitized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ impl SanitizedVersionedTransaction {
&self.message
}

pub fn get_signatures(&self) -> &Vec<Signature> {
&self.signatures
}

/// Consumes the SanitizedVersionedTransaction, returning the fields individually.
pub fn destruct(self) -> (Vec<Signature>, SanitizedVersionedMessage) {
(self.signatures, self.message)
Expand Down
2 changes: 1 addition & 1 deletion streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ fn track_streamer_fetch_packet_performance(
if let Some(start_time) = packet_perf_measure.remove(signature) {
let duration = Instant::now() - start_time;
debug!(
"QUIC streamer fetch stage takes {duration:?} for transaction {:?}",
"QUIC streamer fetch stage took {duration:?} for transaction {:?}",
Signature::from(signature.clone())
);
inc_new_counter_info!(
Expand Down

0 comments on commit 37a6730

Please sign in to comment.