Skip to content

Commit

Permalink
Move MinMax type into mz-ore
Browse files Browse the repository at this point in the history
This commit moves the `MinMax` type supporting the compute controller
metrics into the mz-ore crate, as it might be generally useful. The type
is also renamed to `SlidingMinMax` to make its purpose more obvious
outside the a metrics context.
  • Loading branch information
teskje committed Aug 26, 2024
1 parent 65f0874 commit f41f2b7
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 157 deletions.
160 changes: 3 additions & 157 deletions src/compute-client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use mz_ore::metrics::{
CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, GaugeVec,
HistogramVec, IntCounterVec, MetricVecExt, MetricsRegistry,
};
use mz_ore::stats::histogram_seconds_buckets;
use mz_ore::stats::{histogram_seconds_buckets, SlidingMinMax};
use mz_repr::GlobalId;
use mz_service::codec::StatsCollector;
use prometheus::core::{AtomicF64, AtomicU64};
Expand Down Expand Up @@ -436,7 +436,7 @@ impl ReplicaMetrics {
.metrics
.dataflow_wallclock_lag_seconds_count
.get_delete_on_drop_metric(labels);
let wallclock_lag_minmax = MinMax::new(60);
let wallclock_lag_minmax = SlidingMinMax::new(60);

Some(ReplicaCollectionMetrics {
initial_output_duration_seconds,
Expand Down Expand Up @@ -483,7 +483,7 @@ pub(crate) struct ReplicaCollectionMetrics {
wallclock_lag_seconds_count: IntCounter,

/// State maintaining minimum and maximum wallclock lag.
wallclock_lag_minmax: MinMax<f32>,
wallclock_lag_minmax: SlidingMinMax<f32>,
}

impl ReplicaCollectionMetrics {
Expand All @@ -504,129 +504,6 @@ impl ReplicaCollectionMetrics {
}
}

/// Keeps track of a the minimum and maximum value over a fixed-size sliding window of samples.
///
/// Inspired by the [`movning_min_max`](https://crates.io/crates/moving_min_max) crate, but
/// adapted to provide:
///
/// * both minimum and maximum at the same time
/// * a fixed window size, rather than manual popping of samples
/// * lower memory usage, due to merged push and pop stacks
#[derive(Debug)]
struct MinMax<T> {
/// The push stack and the pop stack, merged into one allocation.
///
/// The push stack is the first `push_stack_len` items, the pop stack is the rest.
/// The push stack grows to the left, the pop stack grows to the right.
///
/// +--------------------------------+
/// | push stack --> | <-- pop stack |
/// +----------------^---------------^
/// push_stack_len
///
/// We keep both the push stack and the pop stack in the same `Vec`, to guarantee that we only
/// allocate `size_of::<T>() * 3 * window_size` bytes. If we kept two `Vec`s we'd need twice
/// that much.
stacks: Vec<(T, T, T)>,
/// The length of the push stack.
push_stack_len: usize,
}

impl<T> MinMax<T>
where
T: Clone + PartialOrd,
{
/// Creates a new `MinMax` for the given sample window size.
fn new(window_size: usize) -> Self {
Self {
stacks: Vec::with_capacity(window_size),
push_stack_len: 0,
}
}

/// Returns a reference to the item at the top of the push stack.
fn top_of_push_stack(&self) -> Option<&(T, T, T)> {
self.push_stack_len.checked_sub(1).map(|i| &self.stacks[i])
}

/// Returns a reference to the item at the top of the pop stack.
fn top_of_pop_stack(&self) -> Option<&(T, T, T)> {
self.stacks.get(self.push_stack_len)
}

/// Adds the given sample.
fn add_sample(&mut self, sample: T) {
if self.push_stack_len == self.stacks.capacity() {
self.flip_stacks();
}

let (min, max) = match self.top_of_push_stack() {
Some((_, min, max)) => {
let min = po_min(min, &sample).clone();
let max = po_max(max, &sample).clone();
(min, max)
}
None => (sample.clone(), sample.clone()),
};

if self.stacks.len() <= self.push_stack_len {
self.stacks.push((sample, min, max));
} else {
self.stacks[self.push_stack_len] = (sample, min, max);
}
self.push_stack_len += 1;
}

/// Drains the push stack into the pop stack.
fn flip_stacks(&mut self) {
let Some((sample, _, _)) = self.top_of_push_stack().cloned() else {
return;
};

self.push_stack_len -= 1;
self.stacks[self.push_stack_len] = (sample.clone(), sample.clone(), sample);

while let Some((sample, _, _)) = self.top_of_push_stack() {
let (_, min, max) = self.top_of_pop_stack().expect("pop stack not empty");
let sample = sample.clone();
let min = po_min(min, &sample).clone();
let max = po_max(max, &sample).clone();

self.push_stack_len -= 1;
self.stacks[self.push_stack_len] = (sample, min, max);
}
}

/// Returns the current minimum and maximum values.
fn get(&self) -> Option<(&T, &T)> {
match (self.top_of_push_stack(), self.top_of_pop_stack()) {
(None, None) => None,
(None, Some((_, min, max))) | (Some((_, min, max)), None) => Some((min, max)),
(Some((_, min1, max1)), Some((_, min2, max2))) => {
Some((po_min(min1, min2), po_max(max1, max2)))
}
}
}
}

/// Like `std::cmp::min`, but works with `PartialOrd` values.
fn po_min<T: PartialOrd>(a: T, b: T) -> T {
if a < b {
a
} else {
b
}
}

/// Like `std::cmp::max`, but works with `PartialOrd` values.
fn po_max<T: PartialOrd>(a: T, b: T) -> T {
if a > b {
a
} else {
b
}
}

/// Metrics keyed by `ComputeCommand` type.
#[derive(Debug)]
pub struct CommandMetrics<M> {
Expand Down Expand Up @@ -810,34 +687,3 @@ impl<M> PeekMetrics<M> {
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[mz_ore::test]
fn minmax() {
let mut minmax = MinMax::new(5);

assert_eq!(minmax.get(), None);

let mut push_and_check = |x, expected| {
minmax.add_sample(x);
let actual = minmax.get().map(|(min, max)| (*min, *max));
assert_eq!(actual, Some(expected), "{minmax:?}");
};

push_and_check(5, (5, 5));
push_and_check(1, (1, 5));
push_and_check(10, (1, 10));
push_and_check(2, (1, 10));
push_and_check(9, (1, 10));
push_and_check(3, (1, 10));
push_and_check(8, (2, 10));
push_and_check(5, (2, 9));
push_and_check(5, (3, 9));
push_and_check(5, (3, 8));
push_and_check(5, (5, 8));
push_and_check(5, (5, 5));
}
}
168 changes: 168 additions & 0 deletions src/ore/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,171 @@ pub const HISTOGRAM_BYTE_BUCKETS: [f64; 7] = [
67108864.0,
1073741824.0,
];

/// Keeps track of the minimum and maximum value over a fixed-size sliding window of samples.
///
/// Inspired by the [`moving_min_max`] crate, see that crate's documentation for a description of
/// the high-level algorithm used here.
///
/// There are two major differences to [`moving_min_max`]:
/// * `SlidingMinMax` tracks both the minimum and the maximum value at the same time.
/// * `SlidingMinMax` assumes a fixed-size window. Pushing new samples automatically pops old ones
/// and there is no support for manually popping samples.
///
/// The memory required for a `SlidingMinMax` value is `size_of::<T> * 3 * window_size`, plus a
/// small constant overhead.
#[derive(Debug)]
pub struct SlidingMinMax<T> {
/// The push stack and the pop stack, merged into one allocation to optimize memory usage.
///
/// The push stack is the first `push_stack_len` items, the pop stack is the rest.
/// The push stack grows to the left, the pop stack grows to the right.
///
/// +--------------------------------+
/// | push stack --> | <-- pop stack |
/// +----------------^---------------^
/// push_stack_len
///
/// New samples are pushed to the push stack, together with the current minimum and maximum
/// values. If the pop stack is not empty, each push implicitly pops an element from the pop
/// stack, by increasing `push_stack_len`. Once the push stack has reached the window size
/// (i.e. the capacity of `stacks`), we "flip" the stacks by converting the push stack into a
/// full pop stack with an inverted order of samples and min/max values. After the flip,
/// `push_stack_len` is zero again, and new samples can be pushed to the push stack.
stacks: Vec<(T, T, T)>,
/// The length of the push stack.
///
/// The top of the push stack is `stacks[push_stack_len - 1]`.
/// The top of the pop stack is `stacks[push_stack_len]`.
push_stack_len: usize,
}

impl<T> SlidingMinMax<T>
where
T: Clone + PartialOrd,
{
/// Creates a new `SlidingMinMax` for the given window size.
pub fn new(window_size: usize) -> Self {
Self {
stacks: Vec::with_capacity(window_size),
push_stack_len: 0,
}
}

/// Returns a reference to the item at the top of the push stack.
fn top_of_push_stack(&self) -> Option<&(T, T, T)> {
self.push_stack_len.checked_sub(1).map(|i| &self.stacks[i])
}

/// Returns a reference to the item at the top of the pop stack.
fn top_of_pop_stack(&self) -> Option<&(T, T, T)> {
self.stacks.get(self.push_stack_len)
}

/// Adds the given sample.
pub fn add_sample(&mut self, sample: T) {
if self.push_stack_len == self.stacks.capacity() {
self.flip_stacks();
}

let (min, max) = match self.top_of_push_stack() {
Some((_, min, max)) => {
let min = po_min(min, &sample).clone();
let max = po_max(max, &sample).clone();
(min, max)
}
None => (sample.clone(), sample.clone()),
};

if self.stacks.len() <= self.push_stack_len {
self.stacks.push((sample, min, max));
} else {
self.stacks[self.push_stack_len] = (sample, min, max);
}
self.push_stack_len += 1;
}

/// Drains the push stack into the pop stack.
fn flip_stacks(&mut self) {
let Some((sample, _, _)) = self.top_of_push_stack().cloned() else {
return;
};

self.push_stack_len -= 1;
self.stacks[self.push_stack_len] = (sample.clone(), sample.clone(), sample);

while let Some((sample, _, _)) = self.top_of_push_stack() {
let (_, min, max) = self.top_of_pop_stack().expect("pop stack not empty");
let sample = sample.clone();
let min = po_min(min, &sample).clone();
let max = po_max(max, &sample).clone();

self.push_stack_len -= 1;
self.stacks[self.push_stack_len] = (sample, min, max);
}
}

/// Returns the current minimum and maximum values.
pub fn get(&self) -> Option<(&T, &T)> {
match (self.top_of_push_stack(), self.top_of_pop_stack()) {
(None, None) => None,
(None, Some((_, min, max))) | (Some((_, min, max)), None) => Some((min, max)),
(Some((_, min1, max1)), Some((_, min2, max2))) => {
Some((po_min(min1, min2), po_max(max1, max2)))
}
}
}
}

/// Like `std::cmp::min`, but works with `PartialOrd` values.
///
/// If `a` and `b` are not comparable, `b` is returned.
fn po_min<T: PartialOrd>(a: T, b: T) -> T {
if a < b {
a
} else {
b
}
}

/// Like `std::cmp::max`, but works with `PartialOrd` values.
///
/// If `a` and `b` are not comparable, `b` is returned.
fn po_max<T: PartialOrd>(a: T, b: T) -> T {
if a > b {
a
} else {
b
}
}

#[cfg(test)]
mod tests {
use super::*;

#[mz_ore::test]
fn minmax() {
let mut minmax = SlidingMinMax::new(5);

assert_eq!(minmax.get(), None);

let mut push_and_check = |x, expected| {
minmax.add_sample(x);
let actual = minmax.get().map(|(min, max)| (*min, *max));
assert_eq!(actual, Some(expected), "{minmax:?}");
};

push_and_check(5, (5, 5));
push_and_check(1, (1, 5));
push_and_check(10, (1, 10));
push_and_check(2, (1, 10));
push_and_check(9, (1, 10));
push_and_check(3, (1, 10));
push_and_check(8, (2, 10));
push_and_check(5, (2, 9));
push_and_check(5, (3, 9));
push_and_check(5, (3, 8));
push_and_check(5, (5, 8));
push_and_check(5, (5, 5));
}
}

0 comments on commit f41f2b7

Please sign in to comment.