TransactionScheduler: Scheduler Count and Timing metrics #33893

Merged · 5 commits · Nov 17, 2023
@@ -197,20 +197,30 @@ impl PrioGraphScheduler {
}

/// Receive completed batches of transactions without blocking.
/// Returns (num_transactions, num_retryable_transactions) on success.
pub fn receive_completed(
&mut self,
container: &mut TransactionStateContainer,
) -> Result<(), SchedulerError> {
while self.try_receive_completed(container)? {}
Ok(())
) -> Result<(usize, usize), SchedulerError> {
let mut total_num_transactions = 0;
let mut total_num_retryable = 0;
loop {
let (num_transactions, num_retryable) = self.try_receive_completed(container)?;
if num_transactions == 0 {
break;
}
total_num_transactions += num_transactions;
total_num_retryable += num_retryable;
}
Ok((total_num_transactions, total_num_retryable))
}

/// Receive completed batches of transactions.
/// Returns `Ok(true)` if a batch was received, `Ok(false)` if no batch was received.
/// Returns `Ok((num_transactions, num_retryable))` if a batch was received, `Ok((0, 0))` if no batch was received.
fn try_receive_completed(
&mut self,
container: &mut TransactionStateContainer,
) -> Result<bool, SchedulerError> {
) -> Result<(usize, usize), SchedulerError> {
match self.finished_consume_work_receiver.try_recv() {
Ok(FinishedConsumeWork {
work:
@@ -222,6 +232,9 @@ impl PrioGraphScheduler {
},
retryable_indexes,
}) => {
let num_transactions = ids.len();
let num_retryable = retryable_indexes.len();

// Free the locks
self.complete_batch(batch_id, &transactions);

@@ -246,9 +259,9 @@ impl PrioGraphScheduler {
container.remove_by_id(&id);
}

Ok(true)
Ok((num_transactions, num_retryable))
}
Err(TryRecvError::Empty) => Ok(false),
Err(TryRecvError::Empty) => Ok((0, 0)),
Err(TryRecvError::Disconnected) => Err(SchedulerError::DisconnectedRecvChannel(
"finished consume work",
)),
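For context: the reworked `receive_completed` drains the worker channel without blocking and aggregates per-batch counts. Below is a minimal, self-contained sketch of that drain-and-aggregate pattern, using `crossbeam_channel` directly and a hypothetical `FinishedBatch` type in place of `FinishedConsumeWork`:

```rust
// Sketch only: `FinishedBatch` is a stand-in for FinishedConsumeWork.
use crossbeam_channel::{unbounded, TryRecvError};

struct FinishedBatch {
    num_transactions: usize,
    num_retryable: usize,
}

fn drain_completed(
    receiver: &crossbeam_channel::Receiver<FinishedBatch>,
) -> Result<(usize, usize), &'static str> {
    let (mut total_transactions, mut total_retryable) = (0, 0);
    loop {
        match receiver.try_recv() {
            // A batch arrived: fold its counts into the running totals.
            Ok(batch) => {
                total_transactions += batch.num_transactions;
                total_retryable += batch.num_retryable;
            }
            // Channel is drained: report what we accumulated.
            Err(TryRecvError::Empty) => return Ok((total_transactions, total_retryable)),
            // Workers hung up: surface a hard error, as the PR does.
            Err(TryRecvError::Disconnected) => return Err("finished consume work"),
        }
    }
}

fn main() {
    let (sender, receiver) = unbounded();
    sender
        .send(FinishedBatch { num_transactions: 8, num_retryable: 2 })
        .unwrap();
    assert_eq!(drain_completed(&receiver), Ok((8, 2)));
}
```

Returning the aggregated tuple rather than `()` is what lets the controller's `receive_completed` wrapper (below) feed `num_finished` and `num_retryable` straight into the count metrics.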
217 changes: 200 additions & 17 deletions core/src/banking_stage/transaction_scheduler/scheduler_controller.rs
@@ -15,7 +15,9 @@ use {
TOTAL_BUFFERED_PACKETS,
},
crossbeam_channel::RecvTimeoutError,
solana_measure::measure_us,
solana_runtime::bank_forks::BankForks,
solana_sdk::{saturating_add_assign, timing::AtomicInterval},
std::{
sync::{Arc, RwLock},
time::Duration,
@@ -36,6 +38,10 @@ pub(crate) struct SchedulerController {
container: TransactionStateContainer,
/// State for scheduling and communicating with worker threads.
scheduler: PrioGraphScheduler,
/// Metrics tracking counts on transactions in different states.
count_metrics: SchedulerCountMetrics,
/// Metrics tracking time spent in different code sections.
timing_metrics: SchedulerTimingMetrics,
}

impl SchedulerController {
@@ -52,6 +58,8 @@ impl SchedulerController {
transaction_id_generator: TransactionIdGenerator::default(),
container: TransactionStateContainer::with_capacity(TOTAL_BUFFERED_PACKETS),
scheduler,
count_metrics: SchedulerCountMetrics::default(),
timing_metrics: SchedulerTimingMetrics::default(),
}
}

@@ -67,13 +75,21 @@
// `Forward` will drop packets from the buffer instead of forwarding.
// During receiving, since packets would be dropped from buffer anyway, we can
// bypass sanitization and buffering and immediately drop the packets.
let decision = self.decision_maker.make_consume_or_forward_decision();
let (decision, decision_time_us) =
measure_us!(self.decision_maker.make_consume_or_forward_decision());
saturating_add_assign!(self.timing_metrics.decision_time_us, decision_time_us);

self.process_transactions(&decision)?;
self.scheduler.receive_completed(&mut self.container)?;
if !self.receive_packets(&decision) {
self.receive_completed()?;
if !self.receive_and_buffer_packets(&decision) {
break;
}

// Report metrics only if there is data.
// Reset intervals when appropriate, regardless of report.
let should_report = self.count_metrics.has_data();
self.count_metrics.maybe_report_and_reset(should_report);
self.timing_metrics.maybe_report_and_reset(should_report);
}

Ok(())
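The loop above wraps each stage in `measure_us!` and accumulates elapsed microseconds with `saturating_add_assign!`. A sketch of that pattern, with a local `measure_us_local!` macro standing in for `solana_measure::measure_us` (which, as used above, evaluates an expression and returns the result paired with elapsed microseconds):

```rust
// Local stand-in for solana_measure::measure_us: evaluate an expression
// and return (result, elapsed_microseconds).
macro_rules! measure_us_local {
    ($expr:expr) => {{
        let start = std::time::Instant::now();
        let result = $expr;
        (result, start.elapsed().as_micros() as u64)
    }};
}

#[derive(Default)]
struct Timing {
    decision_time_us: u64,
}

// Hypothetical stand-in for make_consume_or_forward_decision().
fn make_decision() -> u32 {
    42
}

fn main() {
    let mut timing = Timing::default();
    let (decision, decision_time_us) = measure_us_local!(make_decision());
    // saturating_add_assign!(a, b) in solana_sdk amounts to:
    timing.decision_time_us = timing.decision_time_us.saturating_add(decision_time_us);
    println!("decision={decision}, decision_time_us={}", timing.decision_time_us);
}
```

Saturating addition keeps a pathological timing value from wrapping the accumulator between metric reports.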
@@ -86,10 +102,14 @@
) -> Result<(), SchedulerError> {
match decision {
BufferedPacketsDecision::Consume(_bank_start) => {
let _num_scheduled = self.scheduler.schedule(&mut self.container)?;
let (num_scheduled, schedule_time_us) =
measure_us!(self.scheduler.schedule(&mut self.container)?);
saturating_add_assign!(self.count_metrics.num_scheduled, num_scheduled);
saturating_add_assign!(self.timing_metrics.schedule_time_us, schedule_time_us);
}
BufferedPacketsDecision::Forward => {
self.clear_container();
let (_, clear_time_us) = measure_us!(self.clear_container());
saturating_add_assign!(self.timing_metrics.clear_time_us, clear_time_us);
}
BufferedPacketsDecision::ForwardAndHold | BufferedPacketsDecision::Hold => {}
}
@@ -102,11 +122,25 @@
fn clear_container(&mut self) {
while let Some(id) = self.container.pop() {
self.container.remove_by_id(&id.id);
saturating_add_assign!(self.count_metrics.num_dropped_on_clear, 1);
}
}

/// Receives completed transactions from the workers and updates metrics.
fn receive_completed(&mut self) -> Result<(), SchedulerError> {
let ((num_transactions, num_retryable), receive_completed_time_us) =
measure_us!(self.scheduler.receive_completed(&mut self.container)?);
saturating_add_assign!(self.count_metrics.num_finished, num_transactions);
saturating_add_assign!(self.count_metrics.num_retryable, num_retryable);
saturating_add_assign!(
self.timing_metrics.receive_completed_time_us,
receive_completed_time_us
);
Ok(())
}

/// Returns whether the packet receiver is still connected.
fn receive_packets(&mut self, decision: &BufferedPacketsDecision) -> bool {
fn receive_and_buffer_packets(&mut self, decision: &BufferedPacketsDecision) -> bool {
let remaining_queue_capacity = self.container.remaining_queue_capacity();

const MAX_PACKET_RECEIVE_TIME: Duration = Duration::from_millis(100);
@@ -125,17 +159,29 @@
}
};

let received_packet_results = self
let (received_packet_results, receive_time_us) = measure_us!(self
.packet_receiver
.receive_packets(recv_timeout, remaining_queue_capacity);

match (received_packet_results, should_buffer) {
(Ok(receive_packet_results), true) => {
self.buffer_packets(receive_packet_results.deserialized_packets)
.receive_packets(recv_timeout, remaining_queue_capacity));
saturating_add_assign!(self.timing_metrics.receive_time_us, receive_time_us);

match received_packet_results {
Ok(receive_packet_results) => {
let num_received_packets = receive_packet_results.deserialized_packets.len();
saturating_add_assign!(self.count_metrics.num_received, num_received_packets);
if should_buffer {
let (_, buffer_time_us) = measure_us!(
self.buffer_packets(receive_packet_results.deserialized_packets)
);
saturating_add_assign!(self.timing_metrics.buffer_time_us, buffer_time_us);
} else {
saturating_add_assign!(
self.count_metrics.num_dropped_on_receive,
num_received_packets
);
}
}
(Ok(receive_packet_results), false) => drop(receive_packet_results),
(Err(RecvTimeoutError::Timeout), _) => {}
(Err(RecvTimeoutError::Disconnected), _) => return false,
Err(RecvTimeoutError::Timeout) => {}
Err(RecvTimeoutError::Disconnected) => return false,
}

true
@@ -151,6 +197,7 @@
let Some(transaction) =
packet.build_sanitized_transaction(feature_set, vote_only, bank.as_ref())
else {
saturating_add_assign!(self.count_metrics.num_dropped_on_sanitization, 1);
continue;
};

@@ -160,13 +207,149 @@
max_age_slot: last_slot_in_epoch,
};
let transaction_priority_details = packet.priority_details();
self.container.insert_new_transaction(
if self.container.insert_new_transaction(
transaction_id,
transaction_ttl,
transaction_priority_details,
);
) {
saturating_add_assign!(self.count_metrics.num_dropped_on_capacity, 1);
}
saturating_add_assign!(self.count_metrics.num_buffered, 1);
}
}
}

#[derive(Default)]
struct SchedulerCountMetrics {
interval: AtomicInterval,

/// Number of packets received.
num_received: usize,
/// Number of packets buffered.
num_buffered: usize,

/// Number of transactions scheduled.
num_scheduled: usize,
/// Number of completed transactions received from workers.
num_finished: usize,
/// Number of transactions that were retryable.
num_retryable: usize,

/// Number of transactions that were immediately dropped on receive.
num_dropped_on_receive: usize,
/// Number of transactions that were dropped due to sanitization failure.
num_dropped_on_sanitization: usize,
/// Number of transactions that were dropped due to clearing.
num_dropped_on_clear: usize,
/// Number of transactions that were dropped due to exceeded capacity.
num_dropped_on_capacity: usize,
}

impl SchedulerCountMetrics {
fn maybe_report_and_reset(&mut self, should_report: bool) {
const REPORT_INTERVAL_MS: u64 = 1000;
if self.interval.should_update(REPORT_INTERVAL_MS) {
if should_report {
self.report();
}
self.reset();
}
}

fn report(&self) {
datapoint_info!(
"banking_stage_scheduler_counts",
("num_received", self.num_received, i64),
("num_buffered", self.num_buffered, i64),
("num_scheduled", self.num_scheduled, i64),
("num_finished", self.num_finished, i64),
("num_retryable", self.num_retryable, i64),
("num_dropped_on_receive", self.num_dropped_on_receive, i64),
(
"num_dropped_on_sanitization",
self.num_dropped_on_sanitization,
i64
),
("num_dropped_on_clear", self.num_dropped_on_clear, i64),
("num_dropped_on_capacity", self.num_dropped_on_capacity, i64)
);
}

fn has_data(&self) -> bool {
self.num_received != 0
|| self.num_buffered != 0
|| self.num_scheduled != 0
|| self.num_finished != 0
|| self.num_retryable != 0
|| self.num_dropped_on_receive != 0
|| self.num_dropped_on_sanitization != 0
|| self.num_dropped_on_clear != 0
|| self.num_dropped_on_capacity != 0
}

fn reset(&mut self) {
self.num_received = 0;
self.num_buffered = 0;
self.num_scheduled = 0;
self.num_finished = 0;
self.num_retryable = 0;
self.num_dropped_on_receive = 0;
self.num_dropped_on_sanitization = 0;
self.num_dropped_on_clear = 0;
self.num_dropped_on_capacity = 0;
}
}
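Note the semantics of `maybe_report_and_reset` above: it reports at most once per interval, and it resets the counters every interval even when there was nothing worth reporting, so stale counts never leak into a later datapoint. A simplified stand-alone sketch, with a plain `Instant`-based interval as a stand-in for `solana_sdk::timing::AtomicInterval` and `println!` as a stand-in for `datapoint_info!`:

```rust
use std::time::{Duration, Instant};

// Stand-in for AtomicInterval: fires at most once per period.
struct Interval {
    last: Instant,
}

impl Interval {
    fn should_update(&mut self, interval_ms: u64) -> bool {
        if self.last.elapsed() >= Duration::from_millis(interval_ms) {
            self.last = Instant::now();
            true
        } else {
            false
        }
    }
}

struct Counts {
    num_received: usize,
}

impl Counts {
    fn maybe_report_and_reset(&mut self, interval: &mut Interval, should_report: bool) {
        const REPORT_INTERVAL_MS: u64 = 1000;
        if interval.should_update(REPORT_INTERVAL_MS) {
            if should_report {
                // Stand-in for datapoint_info!("banking_stage_scheduler_counts", ...)
                println!("num_received={}", self.num_received);
            }
            self.num_received = 0; // reset happens regardless of reporting
        }
    }
}

fn main() {
    // Backdate the interval so the first call is due immediately.
    let mut interval = Interval { last: Instant::now() - Duration::from_secs(2) };
    let mut counts = Counts { num_received: 7 };
    counts.maybe_report_and_reset(&mut interval, counts.num_received != 0);
    assert_eq!(counts.num_received, 0);
}
```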

#[derive(Default)]
struct SchedulerTimingMetrics {
interval: AtomicInterval,
/// Time spent making processing decisions.
decision_time_us: u64,
/// Time spent receiving packets.
receive_time_us: u64,
/// Time spent buffering packets.
buffer_time_us: u64,
/// Time spent scheduling transactions.
schedule_time_us: u64,
/// Time spent clearing transactions from the container.
clear_time_us: u64,
/// Time spent receiving completed transactions.
receive_completed_time_us: u64,
}

impl SchedulerTimingMetrics {
fn maybe_report_and_reset(&mut self, should_report: bool) {
const REPORT_INTERVAL_MS: u64 = 1000;
if self.interval.should_update(REPORT_INTERVAL_MS) {
if should_report {
self.report();
}
self.reset();
}
}

fn report(&self) {
datapoint_info!(
"banking_stage_scheduler_timing",
("decision_time", self.decision_time_us, i64),
("receive_time", self.receive_time_us, i64),
("buffer_time", self.buffer_time_us, i64),
("schedule_time", self.schedule_time_us, i64),
("clear_time", self.clear_time_us, i64),
(
"receive_completed_time",
self.receive_completed_time_us,
i64
)
);
}

fn reset(&mut self) {
self.decision_time_us = 0;
self.receive_time_us = 0;
self.buffer_time_us = 0;
self.schedule_time_us = 0;
self.clear_time_us = 0;
self.receive_completed_time_us = 0;
}
}

#[cfg(test)]
@@ -119,12 +119,13 @@ impl TransactionStateContainer {
}

/// Insert a new transaction into the container's queues and maps.
/// Returns `true` if a packet was dropped due to capacity limits.
pub(crate) fn insert_new_transaction(
&mut self,
transaction_id: TransactionId,
transaction_ttl: SanitizedTransactionTTL,
transaction_priority_details: TransactionPriorityDetails,
) {
) -> bool {
let priority_id =
TransactionPriorityId::new(transaction_priority_details.priority, transaction_id);
self.id_to_transaction_state.insert(
@@ -151,12 +152,15 @@

/// Pushes a transaction id into the priority queue. If the queue is full, the lowest priority
/// transaction will be dropped (removed from the queue and map).
pub(crate) fn push_id_into_queue(&mut self, priority_id: TransactionPriorityId) {
/// Returns `true` if a packet was dropped due to capacity limits.
pub(crate) fn push_id_into_queue(&mut self, priority_id: TransactionPriorityId) -> bool {
if self.remaining_queue_capacity() == 0 {
let popped_id = self.priority_queue.push_pop_min(priority_id);
self.remove_by_id(&popped_id.id);
true
} else {
self.priority_queue.push(priority_id);
false
}
}

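For illustration, the capacity-bounded insert that `push_id_into_queue` performs can be sketched stand-alone. This sketch uses `std::collections::BTreeSet` over bare priorities as a stand-in for the container's min-max heap of `TransactionPriorityId`s; the real code's `push_pop_min` pushes the new id and removes the minimum in one step, which likewise evicts the incoming id itself when it is the lowest-priority entry:

```rust
use std::collections::BTreeSet;

struct BoundedQueue {
    capacity: usize,
    queue: BTreeSet<u64>, // priorities, kept in ascending order
}

impl BoundedQueue {
    /// Returns `true` if an entry was dropped due to capacity limits,
    /// mirroring the new return value of `push_id_into_queue`.
    fn push(&mut self, priority: u64) -> bool {
        if self.queue.len() >= self.capacity {
            // Full: insert the new entry, then evict the minimum
            // (equivalent to push_pop_min on a min-max heap).
            self.queue.insert(priority);
            let min = *self.queue.iter().next().expect("queue is non-empty");
            self.queue.remove(&min);
            true
        } else {
            self.queue.insert(priority);
            false
        }
    }
}

fn main() {
    let mut q = BoundedQueue { capacity: 2, queue: BTreeSet::new() };
    assert!(!q.push(10));
    assert!(!q.push(20));
    assert!(q.push(30)); // evicts priority 10; size stays at capacity
    assert_eq!(q.queue.iter().copied().collect::<Vec<_>>(), vec![20, 30]);
}
```

Surfacing the drop as a `bool` is what allows the controller to attribute each eviction to `num_dropped_on_capacity` instead of losing it silently.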