Skip to content

Commit

Permalink
metrics: add rates of queries per second
Browse files Browse the repository at this point in the history
Implement rates similar to those available in cpp-driver.
Adjust documentation.
  • Loading branch information
QuerthDP committed Jan 7, 2025
1 parent e996f7e commit f9a7153
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 1 deletion.
6 changes: 6 additions & 0 deletions docs/source/metrics/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ They can be accessed at any moment using `Session::get_metrics()`
* Number of errors during paged queries
* Number of retries
* Latency histogram statistics (min, max, mean, standard deviation, percentiles)
* Rates of queries per second: the mean rate since startup and one-, five- and fifteen-minute moving averages

### Example
```rust
Expand Down Expand Up @@ -44,6 +45,11 @@ println!("95th percentile: {}", snapshot.percentile_95);
println!("98th percentile: {}", snapshot.percentile_98);
println!("99th percentile: {}", snapshot.percentile_99);
println!("99.9th percentile: {}", snapshot.percentile_99_9);

println!("Mean rate: {}", metrics.get_mean_rate());
println!("One minute rate: {}", metrics.get_one_minute_rate());
println!("Five minute rate: {}", metrics.get_five_minute_rate());
println!("Fifteen minute rate: {}", metrics.get_fifteen_minute_rate());
# Ok(())
# }
```
5 changes: 5 additions & 0 deletions examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ async fn main() -> Result<()> {
println!("99th percentile: {}", snapshot.percentile_99);
println!("99.9th percentile: {}", snapshot.percentile_99_9);

println!("Mean rate: {}", metrics.get_mean_rate());
println!("One minute rate: {}", metrics.get_one_minute_rate());
println!("Five minute rate: {}", metrics.get_five_minute_rate());
println!("Fifteen minute rate: {}", metrics.get_fifteen_minute_rate());

println!("Ok.");

Ok(())
Expand Down
164 changes: 163 additions & 1 deletion scylla/src/transport/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::transport::lock_free_histogram::{LFError, LockFreeHistogram};
use histogram::Histogram;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use thiserror::Error;

Expand Down Expand Up @@ -33,6 +33,144 @@ pub struct Snapshot {
pub percentile_99_9: u64,
}

/// Length of one tick interval, in seconds.
///
/// Events are accumulated for this long before being folded into the moving
/// averages; it is also the divisor that turns a per-interval event count
/// into a per-second rate.
const INTERVAL: u64 = 5;

/// Exponentially weighted moving average of an event rate (events per second).
///
/// Events are counted with `update()` and folded into the average once per
/// interval by `tick()`. All state is held in atomics so the value can be
/// updated through a shared reference.
#[derive(Debug)]
struct ExponentiallyWeightedMovingAverage {
// Smoothing factor: the weight given to the newest per-interval rate on each tick.
alpha: f64,
// Number of events recorded since the last tick.
uncounted: AtomicU64,
// False until the first tick, which seeds `rate` directly instead of blending.
is_initialized: AtomicBool,
// Current average, stored as the bit pattern of an `f64`.
rate: AtomicU64,
}

impl ExponentiallyWeightedMovingAverage {
    /// Builds an average with smoothing factor `alpha`, no pending events and
    /// a rate of `0.0` that will be seeded by the first tick.
    fn new(alpha: f64) -> Self {
        Self {
            alpha,
            uncounted: AtomicU64::new(0),
            is_initialized: AtomicBool::new(false),
            // The rate lives in an integer atomic as raw `f64` bits; start at 0.0.
            rate: AtomicU64::new(0.0_f64.to_bits()),
        }
    }

    /// Returns the current average, in events per second.
    fn rate(&self) -> f64 {
        let bits = self.rate.load(Ordering::Acquire);
        f64::from_bits(bits)
    }

    /// Records a single event to be accounted for by the next tick.
    fn update(&self) {
        self.uncounted.fetch_add(1, ORDER_TYPE);
    }

    /// Closes the current interval: drains the pending event count, converts
    /// it to a per-second rate and folds it into the average.
    fn tick(&self) {
        let pending = self.uncounted.swap(0, ORDER_TYPE);
        let instant_rate = pending as f64 / INTERVAL as f64;

        let seeded = self.is_initialized.load(Ordering::Acquire);
        let next_rate = if seeded {
            // Move the average towards the newest interval rate by `alpha`.
            let current = self.rate();
            current + self.alpha * (instant_rate - current)
        } else {
            // The very first interval defines the starting value outright.
            instant_rate
        };

        self.rate.store(next_rate.to_bits(), Ordering::Release);
        if !seeded {
            self.is_initialized.store(true, Ordering::Release);
        }
    }
}

/// Measures how often events are marked: a total count, a mean rate since
/// creation, and three exponentially weighted moving averages.
#[derive(Debug)]
struct Meter {
// Moving average built with a one-minute time constant.
one_minute_rate: ExponentiallyWeightedMovingAverage,
// Moving average built with a five-minute time constant.
five_minute_rate: ExponentiallyWeightedMovingAverage,
// Moving average built with a fifteen-minute time constant.
fifteen_minute_rate: ExponentiallyWeightedMovingAverage,
// Total number of events marked since creation.
count: AtomicU64,
// Creation time; the reference point for the mean rate and for `last_tick`.
start_time: std::time::Instant,
// Start of the current tick interval, in nanoseconds elapsed since `start_time`.
last_tick: AtomicU64,
}

impl Meter {
    /// Length of one tick interval in nanoseconds, the unit `last_tick` is kept in.
    const TICK_INTERVAL_NANOS: u64 = INTERVAL * 1_000_000_000;

    /// Creates a meter whose clock starts now and which has seen no events.
    fn new() -> Self {
        let now = std::time::Instant::now();
        Self {
            one_minute_rate: ExponentiallyWeightedMovingAverage::new(Self::smoothing_factor(1.0)),
            five_minute_rate: ExponentiallyWeightedMovingAverage::new(Self::smoothing_factor(5.0)),
            fifteen_minute_rate: ExponentiallyWeightedMovingAverage::new(
                Self::smoothing_factor(15.0),
            ),
            count: AtomicU64::new(0),
            start_time: now,
            last_tick: AtomicU64::new(now.elapsed().as_nanos() as u64),
        }
    }

    /// Smoothing factor of a moving average with a time constant of `minutes`
    /// minutes that is ticked every `INTERVAL` seconds.
    fn smoothing_factor(minutes: f64) -> f64 {
        1.0 - (-(INTERVAL as f64) / 60.0 / minutes).exp()
    }

    /// Records one event, first closing any tick intervals that have elapsed.
    fn mark(&self) {
        self.tick_if_necessary();
        self.count.fetch_add(1, ORDER_TYPE);
        self.one_minute_rate.update();
        self.five_minute_rate.update();
        self.fifteen_minute_rate.update();
    }

    /// Returns the one-minute moving average, in events per second.
    fn one_minute_rate(&self) -> f64 {
        // Tick on read as well as on `mark()`: otherwise the average would stay
        // frozen at its last value for as long as no new events arrive.
        self.tick_if_necessary();
        self.one_minute_rate.rate()
    }

    /// Returns the five-minute moving average, in events per second.
    fn five_minute_rate(&self) -> f64 {
        // See `one_minute_rate` for why reads tick.
        self.tick_if_necessary();
        self.five_minute_rate.rate()
    }

    /// Returns the fifteen-minute moving average, in events per second.
    fn fifteen_minute_rate(&self) -> f64 {
        // See `one_minute_rate` for why reads tick.
        self.tick_if_necessary();
        self.fifteen_minute_rate.rate()
    }

    /// Returns the mean rate since creation, in events per second.
    ///
    /// Yields `0.0` while nothing has been marked, and also when the clock
    /// reports that no time has elapsed yet (instead of dividing by zero).
    fn mean_rate(&self) -> f64 {
        let count = self.count();
        if count == 0 {
            return 0.0;
        }

        let elapsed = self.start_time.elapsed().as_secs_f64();
        if elapsed > 0.0 {
            count as f64 / elapsed
        } else {
            0.0
        }
    }

    /// Returns the total number of events marked so far.
    fn count(&self) -> u64 {
        self.count.load(ORDER_TYPE)
    }

    /// Applies every tick interval that has fully elapsed since the last tick
    /// to all three moving averages.
    ///
    /// Several threads may get here at once; only the one that wins the
    /// compare-exchange on `last_tick` performs the ticks.
    fn tick_if_necessary(&self) {
        let old_tick = self.last_tick.load(ORDER_TYPE);
        // Nanoseconds since creation; the narrowing cast only loses information
        // after roughly 584 years of uptime.
        let new_tick = self.start_time.elapsed().as_nanos() as u64;
        // `last_tick` may be advanced by another thread, so never let this
        // subtraction underflow; a zero result simply means "nothing to do".
        let elapsed = new_tick.saturating_sub(old_tick);

        if elapsed > Self::TICK_INTERVAL_NANOS {
            // Keep the partially elapsed interval: move `last_tick` to the start
            // of the interval that is currently in progress.
            let new_interval_start_tick = new_tick - elapsed % Self::TICK_INTERVAL_NANOS;
            let won_race = self
                .last_tick
                .compare_exchange(old_tick, new_interval_start_tick, ORDER_TYPE, ORDER_TYPE)
                .is_ok();

            if won_race {
                let required_ticks = elapsed / Self::TICK_INTERVAL_NANOS;
                for _ in 0..required_ticks {
                    self.one_minute_rate.tick();
                    self.five_minute_rate.tick();
                    self.fifteen_minute_rate.tick();
                }
            }
        }
    }
}

/// `Default` delegates to `Meter::new()`, so a default meter starts its clock
/// at the moment it is created.
impl Default for Meter {
fn default() -> Self {
Self::new()
}
}

#[derive(Default, Debug)]
pub struct Metrics {
errors_num: AtomicU64,
Expand All @@ -41,6 +179,7 @@ pub struct Metrics {
queries_iter_num: AtomicU64,
retries_num: AtomicU64,
histogram: Arc<LockFreeHistogram>,
meter: Meter,
}

impl Metrics {
Expand All @@ -52,6 +191,7 @@ impl Metrics {
queries_iter_num: AtomicU64::new(0),
retries_num: AtomicU64::new(0),
histogram: Arc::new(LockFreeHistogram::default()),
meter: Meter::new(),
}
}

Expand All @@ -63,6 +203,7 @@ impl Metrics {
/// Increments counter for nonpaged queries.
/// Every call also records one event in the meter that backs the
/// queries-per-second rates.
pub(crate) fn inc_total_nonpaged_queries(&self) {
self.queries_num.fetch_add(1, ORDER_TYPE);
self.meter.mark();
}

/// Increments counter for errors that occurred in paged queries.
Expand All @@ -74,6 +215,7 @@ impl Metrics {
/// If query_iter would return 4 pages then this counter should be incremented 4 times.
/// Every call also records one event in the meter that backs the
/// queries-per-second rates.
pub(crate) fn inc_total_paged_queries(&self) {
self.queries_iter_num.fetch_add(1, ORDER_TYPE);
self.meter.mark();
}

/// Increments counter measuring how many times a retry policy has decided to retry a query
Expand Down Expand Up @@ -162,6 +304,26 @@ impl Metrics {
self.retries_num.load(ORDER_TYPE)
}

/// Returns the mean rate of queries per second.
///
/// This is the number of queries counted so far (paged and nonpaged)
/// divided by the seconds elapsed since the metrics' meter was created.
/// Returns `0.0` while no query has been counted.
pub fn get_mean_rate(&self) -> f64 {
self.meter.mean_rate()
}

/// Returns the one-minute rate of queries per second.
///
/// This is an exponentially weighted moving average with a one-minute time
/// constant, advanced in 5-second intervals. It is `0.0` until the first
/// interval has been folded in.
pub fn get_one_minute_rate(&self) -> f64 {
self.meter.one_minute_rate()
}

/// Returns the five-minute rate of queries per second.
///
/// This is an exponentially weighted moving average with a five-minute time
/// constant, advanced in 5-second intervals. It is `0.0` until the first
/// interval has been folded in.
pub fn get_five_minute_rate(&self) -> f64 {
self.meter.five_minute_rate()
}

/// Returns the fifteen-minute rate of queries per second.
///
/// This is an exponentially weighted moving average with a fifteen-minute
/// time constant, advanced in 5-second intervals. It is `0.0` until the
/// first interval has been folded in.
pub fn get_fifteen_minute_rate(&self) -> f64 {
self.meter.fifteen_minute_rate()
}

/// Metric implementations
fn mean(h: Histogram) -> Result<u64, MetricsError> {
Expand Down

0 comments on commit f9a7153

Please sign in to comment.