Skip to content

Commit

Permalink
Add new received forwarded packets metric to banking stage (#33414)
Browse files Browse the repository at this point in the history
  • Loading branch information
apfitzge authored Sep 28, 2023
1 parent cc4e928 commit e3cd13e
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 20 deletions.
47 changes: 27 additions & 20 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ pub struct BankingStageStats {
pub(crate) dropped_duplicated_packets_count: AtomicUsize,
dropped_forward_packets_count: AtomicUsize,
newly_buffered_packets_count: AtomicUsize,
newly_buffered_forwarded_packets_count: AtomicUsize,
current_buffered_packets_count: AtomicUsize,
rebuffered_packets_count: AtomicUsize,
consumed_buffered_packets_count: AtomicUsize,
Expand Down Expand Up @@ -147,109 +148,115 @@ impl BankingStageStats {
if self.last_report.should_update(report_interval_ms) {
datapoint_info!(
"banking_stage-loop-stats",
("id", self.id as i64, i64),
("id", self.id, i64),
(
"receive_and_buffer_packets_count",
self.receive_and_buffer_packets_count
.swap(0, Ordering::Relaxed) as i64,
.swap(0, Ordering::Relaxed),
i64
),
(
"dropped_packets_count",
self.dropped_packets_count.swap(0, Ordering::Relaxed) as i64,
self.dropped_packets_count.swap(0, Ordering::Relaxed),
i64
),
(
"dropped_duplicated_packets_count",
self.dropped_duplicated_packets_count
.swap(0, Ordering::Relaxed) as i64,
.swap(0, Ordering::Relaxed),
i64
),
(
"dropped_forward_packets_count",
self.dropped_forward_packets_count
.swap(0, Ordering::Relaxed) as i64,
.swap(0, Ordering::Relaxed),
i64
),
(
"newly_buffered_packets_count",
self.newly_buffered_packets_count.swap(0, Ordering::Relaxed) as i64,
self.newly_buffered_packets_count.swap(0, Ordering::Relaxed),
i64
),
(
"newly_buffered_forwarded_packets_count",
self.newly_buffered_forwarded_packets_count
.swap(0, Ordering::Relaxed),
i64
),
(
"current_buffered_packets_count",
self.current_buffered_packets_count
.swap(0, Ordering::Relaxed) as i64,
.swap(0, Ordering::Relaxed),
i64
),
(
"rebuffered_packets_count",
self.rebuffered_packets_count.swap(0, Ordering::Relaxed) as i64,
self.rebuffered_packets_count.swap(0, Ordering::Relaxed),
i64
),
(
"consumed_buffered_packets_count",
self.consumed_buffered_packets_count
.swap(0, Ordering::Relaxed) as i64,
.swap(0, Ordering::Relaxed),
i64
),
(
"forwarded_transaction_count",
self.forwarded_transaction_count.swap(0, Ordering::Relaxed) as i64,
self.forwarded_transaction_count.swap(0, Ordering::Relaxed),
i64
),
(
"forwarded_vote_count",
self.forwarded_vote_count.swap(0, Ordering::Relaxed) as i64,
self.forwarded_vote_count.swap(0, Ordering::Relaxed),
i64
),
(
"consume_buffered_packets_elapsed",
self.consume_buffered_packets_elapsed
.swap(0, Ordering::Relaxed) as i64,
.swap(0, Ordering::Relaxed),
i64
),
(
"receive_and_buffer_packets_elapsed",
self.receive_and_buffer_packets_elapsed
.swap(0, Ordering::Relaxed) as i64,
.swap(0, Ordering::Relaxed),
i64
),
(
"filter_pending_packets_elapsed",
self.filter_pending_packets_elapsed
.swap(0, Ordering::Relaxed) as i64,
.swap(0, Ordering::Relaxed),
i64
),
(
"packet_conversion_elapsed",
self.packet_conversion_elapsed.swap(0, Ordering::Relaxed) as i64,
self.packet_conversion_elapsed.swap(0, Ordering::Relaxed),
i64
),
(
"transaction_processing_elapsed",
self.transaction_processing_elapsed
.swap(0, Ordering::Relaxed) as i64,
.swap(0, Ordering::Relaxed),
i64
),
(
"packet_batch_indices_len_min",
self.batch_packet_indexes_len.minimum().unwrap_or(0) as i64,
self.batch_packet_indexes_len.minimum().unwrap_or(0),
i64
),
(
"packet_batch_indices_len_max",
self.batch_packet_indexes_len.maximum().unwrap_or(0) as i64,
self.batch_packet_indexes_len.maximum().unwrap_or(0),
i64
),
(
"packet_batch_indices_len_mean",
self.batch_packet_indexes_len.mean().unwrap_or(0) as i64,
self.batch_packet_indexes_len.mean().unwrap_or(0),
i64
),
(
"packet_batch_indices_len_90pct",
self.batch_packet_indexes_len.percentile(90.0).unwrap_or(0) as i64,
self.batch_packet_indexes_len.percentile(90.0).unwrap_or(0),
i64
)
);
Expand Down
7 changes: 7 additions & 0 deletions core/src/banking_stage/packet_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,13 @@ impl PacketReceiver {

let mut dropped_packets_count = 0;
let mut newly_buffered_packets_count = 0;
let mut newly_buffered_forwarded_packets_count = 0;
Self::push_unprocessed(
unprocessed_transaction_storage,
deserialized_packets,
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
&mut newly_buffered_forwarded_packets_count,
banking_stage_stats,
slot_metrics_tracker,
tracer_packet_stats,
Expand All @@ -144,6 +146,7 @@ impl PacketReceiver {
deserialized_packets: Vec<ImmutableDeserializedPacket>,
dropped_packets_count: &mut usize,
newly_buffered_packets_count: &mut usize,
newly_buffered_forwarded_packets_count: &mut usize,
banking_stage_stats: &mut BankingStageStats,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
tracer_packet_stats: &mut TracerPacketStats,
Expand All @@ -154,6 +157,10 @@ impl PacketReceiver {
.increment(deserialized_packets.len() as u64);

*newly_buffered_packets_count += deserialized_packets.len();
*newly_buffered_forwarded_packets_count += deserialized_packets
.iter()
.filter(|p| p.original_packet().meta().forwarded())
.count();
slot_metrics_tracker
.increment_newly_buffered_packets_count(deserialized_packets.len() as u64);

Expand Down

0 comments on commit e3cd13e

Please sign in to comment.