From ae7186475a0397406c4589c80a69d6940fcca9d2 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Mon, 4 Sep 2023 17:09:15 +0200 Subject: [PATCH 1/2] ref(metrics): Move Bucket type to new module --- relay-metrics/src/aggregation.rs | 1004 +++--------------------------- relay-metrics/src/bucket.rs | 865 +++++++++++++++++++++++++ relay-metrics/src/lib.rs | 2 + 3 files changed, 943 insertions(+), 928 deletions(-) create mode 100644 relay-metrics/src/bucket.rs diff --git a/relay-metrics/src/aggregation.rs b/relay-metrics/src/aggregation.rs index a610eda270..7a300b791c 100644 --- a/relay-metrics/src/aggregation.rs +++ b/relay-metrics/src/aggregation.rs @@ -1,12 +1,11 @@ use std::collections::hash_map::Entry; -use std::collections::{btree_map, BTreeMap, BTreeSet, HashMap}; +use std::collections::{BTreeMap, HashMap}; use std::error::Error; use std::hash::Hasher; use std::iter::{FromIterator, FusedIterator}; use std::time::Duration; use std::{fmt, mem}; -use float_ord::FloatOrd; use fnv::FnvHasher; use relay_base_schema::project::ProjectKey; use relay_common::time::{MonotonicResult, UnixTimestamp}; @@ -23,8 +22,7 @@ use crate::statsd::{ MetricTimers, }; use crate::{ - protocol, CounterType, DistributionType, GaugeType, Metric, MetricNamespace, - MetricResourceIdentifier, MetricType, MetricValue, MetricsContainer, SetType, + protocol, Bucket, BucketValue, Metric, MetricNamespace, MetricResourceIdentifier, MetricValue, }; /// Interval for the flush cycle of the [`AggregatorService`]. @@ -38,552 +36,6 @@ const AVG_VALUE_SIZE: usize = 8; /// and buckets larger will be split up. const BUCKET_SPLIT_FACTOR: usize = 32; -/// A snapshot of values within a [`Bucket`]. -#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize)] -pub struct GaugeValue { - /// The maximum value reported in the bucket. - pub max: GaugeType, - /// The minimum value reported in the bucket. - pub min: GaugeType, - /// The sum of all values reported in the bucket. 
- pub sum: GaugeType, - /// The last value reported in the bucket. - /// - /// This aggregation is not commutative. - pub last: GaugeType, - /// The number of times this bucket was updated with a new value. - pub count: u64, -} - -impl GaugeValue { - /// Creates a gauge snapshot from a single value. - pub fn single(value: GaugeType) -> Self { - Self { - max: value, - min: value, - sum: value, - last: value, - count: 1, - } - } - - /// Inserts a new value into the gauge. - pub fn insert(&mut self, value: GaugeType) { - self.max = self.max.max(value); - self.min = self.min.min(value); - self.sum += value; - self.last = value; - self.count += 1; - } - - /// Merges two gauge snapshots. - pub fn merge(&mut self, other: Self) { - self.max = self.max.max(other.max); - self.min = self.min.min(other.min); - self.sum += other.sum; - self.last = other.last; - self.count += other.count; - } - - /// Returns the average of all values reported in this bucket. - pub fn avg(&self) -> GaugeType { - if self.count > 0 { - self.sum / (self.count as GaugeType) - } else { - 0.0 - } - } -} - -/// Type for counting duplicates in distributions. -type Count = u32; - -/// A distribution of values within a [`Bucket`]. -/// -/// Distributions store a histogram of values. It allows to iterate both the distribution with -/// [`iter`](Self::iter) and individual values with [`iter_values`](Self::iter_values). -/// -/// Based on individual reported values, distributions allow to query the maximum, minimum, or -/// average of the reported values, as well as statistical quantiles. 
-/// -/// # Example -/// -/// ```rust -/// use relay_metrics::dist; -/// -/// let mut dist = dist![1.0, 1.0, 1.0, 2.0]; -/// dist.insert(5.0); -/// dist.insert_multi(3.0, 7); -/// ``` -/// -/// Logically, this distribution is equivalent to this visualization: -/// -/// ```plain -/// value | count -/// 1.0 | *** -/// 2.0 | * -/// 3.0 | ******* -/// 4.0 | -/// 5.0 | * -/// ``` -/// -/// # Serialization -/// -/// Distributions serialize as sorted lists of floating point values. The list contains one entry -/// for each value in the distribution, including duplicates. -#[derive(Clone, Default, PartialEq)] -pub struct DistributionValue { - values: BTreeMap, Count>, - length: Count, -} - -impl DistributionValue { - /// Makes a new, empty `DistributionValue`. - /// - /// Does not allocate anything on its own. - pub fn new() -> Self { - Self::default() - } - - /// Returns the number of values in the map. - /// - /// # Examples - /// - /// ``` - /// use relay_metrics::DistributionValue; - /// - /// let mut dist = DistributionValue::new(); - /// assert_eq!(dist.len(), 0); - /// dist.insert(1.0); - /// dist.insert(1.0); - /// assert_eq!(dist.len(), 2); - /// ``` - pub fn len(&self) -> Count { - self.length - } - - /// Returns `true` if the map contains no elements. - pub fn is_empty(&self) -> bool { - self.length == 0 - } - - /// Adds a value to the distribution. - /// - /// Returns the number this value occurs in the distribution after inserting. - /// - /// # Examples - /// - /// ``` - /// use relay_metrics::DistributionValue; - /// - /// let mut dist = DistributionValue::new(); - /// assert_eq!(dist.insert(1.0), 1); - /// assert_eq!(dist.insert(1.0), 2); - /// assert_eq!(dist.insert(2.0), 1); - /// ``` - pub fn insert(&mut self, value: DistributionType) -> Count { - self.insert_multi(value, 1) - } - - /// Adds a value multiple times to the distribution. - /// - /// Returns the number this value occurs in the distribution after inserting. 
- /// - /// # Examples - /// - /// ``` - /// use relay_metrics::DistributionValue; - /// - /// let mut dist = DistributionValue::new(); - /// assert_eq!(dist.insert_multi(1.0, 2), 2); - /// assert_eq!(dist.insert_multi(1.0, 3), 5); - /// ``` - pub fn insert_multi(&mut self, value: DistributionType, count: Count) -> Count { - self.length += count; - if count == 0 { - return 0; - } - - *self - .values - .entry(FloatOrd(value)) - .and_modify(|c| *c += count) - .or_insert(count) - } - - /// Returns `true` if the set contains a value. - /// - /// # Examples - /// - /// ``` - /// use relay_metrics::dist; - /// - /// let dist = dist![1.0]; - /// - /// assert_eq!(dist.contains(1.0), true); - /// assert_eq!(dist.contains(2.0), false); - /// ``` - pub fn contains(&self, value: impl std::borrow::Borrow) -> bool { - self.values.contains_key(&FloatOrd(*value.borrow())) - } - - /// Returns how often the given value occurs in the distribution. - /// - /// # Examples - /// - /// ``` - /// use relay_metrics::dist; - /// - /// let dist = dist![1.0, 1.0]; - /// - /// assert_eq!(dist.get(1.0), 2); - /// assert_eq!(dist.get(2.0), 0); - /// ``` - pub fn get(&self, value: impl std::borrow::Borrow) -> Count { - let value = &FloatOrd(*value.borrow()); - self.values.get(value).copied().unwrap_or(0) - } - - /// Gets an iterator that visits unique values in the `DistributionValue` in ascending order. - /// - /// The iterator yields pairs of values and their count in the distribution. 
- /// - /// # Examples - /// - /// ``` - /// use relay_metrics::dist; - /// - /// let dist = dist![2.0, 1.0, 3.0, 2.0]; - /// - /// let mut iter = dist.iter(); - /// assert_eq!(iter.next(), Some((1.0, 1))); - /// assert_eq!(iter.next(), Some((2.0, 2))); - /// assert_eq!(iter.next(), Some((3.0, 1))); - /// assert_eq!(iter.next(), None); - /// ``` - pub fn iter(&self) -> DistributionIter<'_> { - DistributionIter { - inner: self.values.iter(), - } - } - - /// Gets an iterator that visits the values in the `DistributionValue` in ascending order. - /// - /// # Examples - /// - /// ``` - /// use relay_metrics::dist; - /// - /// let dist = dist![2.0, 1.0, 3.0, 2.0]; - /// - /// let mut iter = dist.iter_values(); - /// assert_eq!(iter.next(), Some(1.0)); - /// assert_eq!(iter.next(), Some(2.0)); - /// assert_eq!(iter.next(), Some(2.0)); - /// assert_eq!(iter.next(), Some(3.0)); - /// assert_eq!(iter.next(), None); - /// ``` - pub fn iter_values(&self) -> DistributionValuesIter<'_> { - DistributionValuesIter { - inner: self.iter(), - current: 0f64, - remaining: 0, - total: self.length, - } - } -} - -impl<'a> IntoIterator for &'a DistributionValue { - type Item = (DistributionType, Count); - type IntoIter = DistributionIter<'a>; - - fn into_iter(self) -> Self::IntoIter { - self.iter() - } -} - -impl fmt::Debug for DistributionValue { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_map().entries(self.iter()).finish() - } -} - -impl Extend for DistributionValue { - fn extend>(&mut self, iter: T) { - for value in iter.into_iter() { - self.insert(value); - } - } -} - -impl Extend<(DistributionType, Count)> for DistributionValue { - fn extend>(&mut self, iter: T) { - for (value, count) in iter.into_iter() { - self.insert_multi(value, count); - } - } -} - -impl FromIterator for DistributionValue { - fn from_iter>(iter: T) -> Self { - let mut value = Self::default(); - value.extend(iter); - value - } -} - -impl FromIterator<(DistributionType, Count)> for 
DistributionValue { - fn from_iter>(iter: T) -> Self { - let mut value = Self::default(); - value.extend(iter); - value - } -} - -impl Serialize for DistributionValue { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - serializer.collect_seq(self.iter_values()) - } -} - -impl<'de> Deserialize<'de> for DistributionValue { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - struct DistributionVisitor; - - impl<'d> serde::de::Visitor<'d> for DistributionVisitor { - type Value = DistributionValue; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - write!(formatter, "a list of floating point values") - } - - fn visit_seq(self, mut seq: A) -> Result - where - A: serde::de::SeqAccess<'d>, - { - let mut distribution = DistributionValue::new(); - - while let Some(value) = seq.next_element()? { - distribution.insert(value); - } - - Ok(distribution) - } - } - - deserializer.deserialize_seq(DistributionVisitor) - } -} - -/// An iterator over distribution entries in a [`DistributionValue`]. -/// -/// This struct is created by the [`iter`](DistributionValue::iter) method on -/// `DistributionValue`. See its documentation for more. -#[derive(Clone)] -pub struct DistributionIter<'a> { - inner: btree_map::Iter<'a, FloatOrd, Count>, -} - -impl Iterator for DistributionIter<'_> { - type Item = (DistributionType, Count); - - fn next(&mut self) -> Option { - let (value, count) = self.inner.next()?; - Some((value.0, *count)) - } - - fn size_hint(&self) -> (usize, Option) { - self.inner.size_hint() - } -} - -impl ExactSizeIterator for DistributionIter<'_> {} - -impl fmt::Debug for DistributionIter<'_> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_list().entries(self.clone()).finish() - } -} - -/// An iterator over all individual values in a [`DistributionValue`]. 
-/// -/// This struct is created by the [`iter_values`](DistributionValue::iter_values) method on -/// `DistributionValue`. See its documentation for more. -#[derive(Clone)] -pub struct DistributionValuesIter<'a> { - inner: DistributionIter<'a>, - current: DistributionType, - remaining: Count, - total: Count, -} - -impl Iterator for DistributionValuesIter<'_> { - type Item = DistributionType; - - fn next(&mut self) -> Option { - if self.remaining > 0 { - self.remaining -= 1; - self.total -= 1; - return Some(self.current); - } - - let (value, count) = self.inner.next()?; - - self.current = value; - self.remaining = count - 1; - self.total -= 1; - Some(self.current) - } - - fn size_hint(&self) -> (usize, Option) { - let len = self.total as usize; - (len, Some(len)) - } -} - -impl ExactSizeIterator for DistributionValuesIter<'_> {} - -impl fmt::Debug for DistributionValuesIter<'_> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_list().entries(self.clone()).finish() - } -} - -/// Creates a [`DistributionValue`] containing the given arguments. -/// -/// `dist!` allows `DistributionValue` to be defined with the same syntax as array expressions. -/// -/// # Example -/// -/// ``` -/// let dist = relay_metrics::dist![1.0, 2.0]; -/// ``` -#[macro_export] -macro_rules! dist { - () => { - $crate::DistributionValue::new() - }; - ($($x:expr),+ $(,)?) => {{ - let mut distribution = $crate::DistributionValue::new(); - $( distribution.insert($x); )* - distribution - }}; -} - -/// The [aggregated value](Bucket::value) of a metric bucket. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(tag = "type", content = "value")] -pub enum BucketValue { - /// Aggregates [`MetricValue::Counter`] values by adding them into a single value. - /// - /// ```text - /// 2, 1, 3, 2 => 8 - /// ``` - /// - /// This variant serializes to a double precision float. 
- #[serde(rename = "c")] - Counter(CounterType), - /// Aggregates [`MetricValue::Distribution`] values by collecting their values. - /// - /// ```text - /// 2, 1, 3, 2 => [1, 2, 2, 3] - /// ``` - /// - /// This variant serializes to a list of double precision floats, see [`DistributionValue`]. - #[serde(rename = "d")] - Distribution(DistributionValue), - /// Aggregates [`MetricValue::Set`] values by storing their hash values in a set. - /// - /// ```text - /// 2, 1, 3, 2 => {1, 2, 3} - /// ``` - /// - /// This variant serializes to a list of 32-bit integers. - #[serde(rename = "s")] - Set(BTreeSet), - /// Aggregates [`MetricValue::Gauge`] values always retaining the maximum, minimum, and last - /// value, as well as the sum and count of all values. - /// - /// **Note**: The "last" component of this aggregation is not commutative. - /// - /// ```text - /// 1, 2, 3, 2 => { - /// max: 3, - /// min: 1, - /// sum: 8, - /// last: 2 - /// count: 4, - /// } - /// ``` - /// - /// This variant serializes to a structure, see [`GaugeValue`]. - #[serde(rename = "g")] - Gauge(GaugeValue), -} - -impl BucketValue { - /// Returns the type of this value. - pub fn ty(&self) -> MetricType { - match self { - Self::Counter(_) => MetricType::Counter, - Self::Distribution(_) => MetricType::Distribution, - Self::Set(_) => MetricType::Set, - Self::Gauge(_) => MetricType::Gauge, - } - } - - /// Returns the number of raw data points in this value. - pub fn len(&self) -> usize { - match self { - BucketValue::Counter(_) => 1, - BucketValue::Distribution(distribution) => distribution.len() as usize, - BucketValue::Set(set) => set.len(), - BucketValue::Gauge(_) => 5, - } - } - - /// Returns `true` if this bucket contains no values. - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// Estimates the number of bytes needed to encode the bucket value. 
- /// - /// Note that this does not necessarily match the exact memory footprint of the value, - /// because data structures have a memory overhead. - pub fn cost(&self) -> usize { - // Beside the size of [`BucketValue`], we also need to account for the cost of values - // allocated dynamically. - let allocated_cost = match self { - Self::Counter(_) => 0, - Self::Set(s) => mem::size_of::() * s.len(), - Self::Gauge(_) => 0, - Self::Distribution(m) => { - m.values.len() * (mem::size_of::() + mem::size_of::()) - } - }; - - mem::size_of::() + allocated_cost - } -} - -impl From for BucketValue { - fn from(value: MetricValue) -> Self { - match value { - MetricValue::Counter(value) => Self::Counter(value), - MetricValue::Distribution(value) => Self::Distribution(dist![value]), - MetricValue::Set(value) => Self::Set(std::iter::once(value).collect()), - MetricValue::Gauge(value) => Self::Gauge(GaugeValue::single(value)), - } - } -} - /// A value that can be merged into a [`BucketValue`]. /// /// Currently either a [`MetricValue`] or another `BucketValue`. @@ -640,178 +92,77 @@ fn tags_cost(tags: &BTreeMap) -> usize { tags.iter().map(|(k, v)| k.capacity() + v.capacity()).sum() } -/// Error returned when parsing or serializing a [`Bucket`]. -#[derive(Debug, Error)] -#[error("failed to parse metric bucket")] -pub struct ParseBucketError(#[source] serde_json::Error); - -/// An aggregation of metric values by the [`Aggregator`]. -/// -/// As opposed to single metric values, bucket aggregations can carry multiple values. See -/// [`MetricType`] for a description on how values are aggregated in buckets. Values are aggregated -/// by metric name, type, time window, and all tags. Particularly, this allows metrics to have the -/// same name even if their types differ. -/// -/// See the [crate documentation](crate) for general information on Metrics. 
-/// -/// # Values -/// -/// The contents of a bucket, especially their representation and serialization, depend on the -/// metric type: -/// -/// - [Counters](BucketValue::Counter) store a single value, serialized as floating point. -/// - [Distributions](MetricType::Distribution) and [sets](MetricType::Set) store the full set of -/// reported values. -/// - [Gauges](BucketValue::Gauge) store a snapshot of reported values, see [`GaugeValue`]. -/// -/// # Submission Protocol +/// Splits this bucket if its estimated serialization size exceeds a threshold. /// -/// Buckets are always represented as JSON. The data type of the `value` field is determined by the -/// metric type. +/// There are three possible return values: +/// - `(Some, None)` if the bucket fits entirely into the size budget. There is no split. +/// - `(None, Some)` if the size budget cannot even hold the bucket name and tags. There is no +/// split, the entire bucket is moved. +/// - `(Some, Some)` if the bucket fits partially. Remaining values are moved into a new bucket +/// with all other information cloned. /// -/// ```json -#[doc = include_str!("../tests/fixtures/buckets.json")] -/// ``` -/// -/// To parse a submission payload, use [`Bucket::parse_all`]. -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct Bucket { - /// The start time of the time window. - pub timestamp: UnixTimestamp, - /// The length of the time window in seconds. - pub width: u64, - /// The MRI (metric resource identifier). - /// - /// See [`Metric::name`]. - pub name: String, - /// The type and aggregated values of this bucket. - /// - /// See [`Metric::value`] for a mapping to inbound data. - #[serde(flatten)] - pub value: BucketValue, - /// A list of tags adding dimensions to the metric for filtering and aggregation. - /// - /// See [`Metric::tags`]. Every combination of tags results in a different bucket. 
- #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] - pub tags: BTreeMap, -} - -impl Bucket { - fn from_parts(key: BucketKey, bucket_interval: u64, value: BucketValue) -> Self { - Self { - timestamp: key.timestamp, - width: bucket_interval, - name: key.metric_name, - value, - tags: key.tags, - } - } - - /// Parses a single metric bucket from the JSON protocol. - pub fn parse(slice: &[u8]) -> Result { - serde_json::from_slice(slice).map_err(ParseBucketError) - } - - /// Parses a set of metric bucket from the JSON protocol. - pub fn parse_all(slice: &[u8]) -> Result, ParseBucketError> { - serde_json::from_slice(slice).map_err(ParseBucketError) - } - - /// Serializes the given buckets to the JSON protocol. - pub fn serialize_all(buckets: &[Self]) -> Result { - serde_json::to_string(&buckets).map_err(ParseBucketError) - } - - /// Splits this bucket if its estimated serialization size exceeds a threshold. - /// - /// There are three possible return values: - /// - `(Some, None)` if the bucket fits entirely into the size budget. There is no split. - /// - `(None, Some)` if the size budget cannot even hold the bucket name and tags. There is no - /// split, the entire bucket is moved. - /// - `(Some, Some)` if the bucket fits partially. Remaining values are moved into a new bucket - /// with all other information cloned. - /// - /// This is an approximate function. The bucket is not actually serialized, but rather its - /// footprint is estimated through the number of data points contained. See - /// [`estimated_size`](Self::estimated_size) for more information. - fn split_at(mut self, size: usize) -> (Option, Option) { - // If there's enough space for the entire bucket, do not perform a split. - if size >= self.estimated_size() { - return (Some(self), None); +/// This is an approximate function. The bucket is not actually serialized, but rather its +/// footprint is estimated through the number of data points contained. 
See +/// [`estimated_size`](Self::estimated_size) for more information. +fn split_at(mut bucket: Bucket, size: usize) -> (Option, Option) { + // If there's enough space for the entire bucket, do not perform a split. + if size >= estimate_size(&bucket) { + return (Some(bucket), None); + } + + // If the bucket key can't even fit into the remaining length, move the entire bucket into + // the right-hand side. + let own_size = estimate_base_size(&bucket); + if size < (own_size + AVG_VALUE_SIZE) { + // split_at must not be zero + return (None, Some(bucket)); + } + + // Perform a split with the remaining space after adding the key. We assume an average + // length of 8 bytes per value and compute the number of items fitting into the left side. + let split_at = (size - own_size) / AVG_VALUE_SIZE; + + match bucket.value { + BucketValue::Counter(_) => (None, Some(bucket)), + BucketValue::Distribution(ref mut distribution) => { + let org = std::mem::take(distribution); + let mut new_bucket = bucket.clone(); + + let mut iter = org.iter_values(); + bucket.value = BucketValue::Distribution((&mut iter).take(split_at).collect()); + new_bucket.value = BucketValue::Distribution(iter.collect()); + + (Some(bucket), Some(new_bucket)) } + BucketValue::Set(ref mut set) => { + let org = std::mem::take(set); + let mut new_bucket = bucket.clone(); - // If the bucket key can't even fit into the remaining length, move the entire bucket into - // the right-hand side. - let own_size = self.estimated_base_size(); - if size < (own_size + AVG_VALUE_SIZE) { - // split_at must not be zero - return (None, Some(self)); - } - - // Perform a split with the remaining space after adding the key. We assume an average - // length of 8 bytes per value and compute the number of items fitting into the left side. 
- let split_at = (size - own_size) / AVG_VALUE_SIZE; - - match self.value { - BucketValue::Counter(_) => (None, Some(self)), - BucketValue::Distribution(ref mut distribution) => { - let org = std::mem::take(distribution); - let mut new_bucket = self.clone(); - - let mut iter = org.iter_values(); - self.value = BucketValue::Distribution((&mut iter).take(split_at).collect()); - new_bucket.value = BucketValue::Distribution(iter.collect()); - - (Some(self), Some(new_bucket)) - } - BucketValue::Set(ref mut set) => { - let org = std::mem::take(set); - let mut new_bucket = self.clone(); - - let mut iter = org.into_iter(); - self.value = BucketValue::Set((&mut iter).take(split_at).collect()); - new_bucket.value = BucketValue::Set(iter.collect()); + let mut iter = org.into_iter(); + bucket.value = BucketValue::Set((&mut iter).take(split_at).collect()); + new_bucket.value = BucketValue::Set(iter.collect()); - (Some(self), Some(new_bucket)) - } - BucketValue::Gauge(_) => (None, Some(self)), + (Some(bucket), Some(new_bucket)) } - } - - /// Estimates the number of bytes needed to serialize the bucket without value. - /// - /// Note that this does not match the exact size of the serialized payload. Instead, the size is - /// approximated through tags and a static overhead. - fn estimated_base_size(&self) -> usize { - 50 + self.name.len() + tags_cost(&self.tags) - } - - /// Estimates the number of bytes needed to serialize the bucket. - /// - /// Note that this does not match the exact size of the serialized payload. Instead, the size is - /// approximated through the number of contained values, assuming an average size of serialized - /// values. 
- fn estimated_size(&self) -> usize { - self.estimated_base_size() + self.value.len() * AVG_VALUE_SIZE + BucketValue::Gauge(_) => (None, Some(bucket)), } } -impl MetricsContainer for Bucket { - fn name(&self) -> &str { - self.name.as_str() - } - - fn len(&self) -> usize { - self.value.len() - } - - fn tag(&self, name: &str) -> Option<&str> { - self.tags.get(name).map(|s| s.as_str()) - } +/// Estimates the number of bytes needed to serialize the bucket without value. +/// +/// Note that this does not match the exact size of the serialized payload. Instead, the size is +/// approximated through tags and a static overhead. +fn estimate_base_size(bucket: &Bucket) -> usize { + 50 + bucket.name.len() + tags_cost(&bucket.tags) +} - fn remove_tag(&mut self, name: &str) { - self.tags.remove(name); - } +/// Estimates the number of bytes needed to serialize the bucket. +/// +/// Note that this does not match the exact size of the serialized payload. Instead, the size is +/// approximated through the number of contained values, assuming an average size of serialized +/// values. +fn estimate_size(bucket: &Bucket) -> usize { + estimate_base_size(bucket) + bucket.value.len() * AVG_VALUE_SIZE } /// Any error that may occur during aggregation. 
@@ -1295,7 +646,7 @@ impl> Iterator for CappedBucketIter { let mut remaining_bytes = self.max_flush_bytes; while let Some(bucket) = self.next_bucket.take() { - let bucket_size = bucket.estimated_size(); + let bucket_size = estimate_size(&bucket); if bucket_size <= remaining_bytes { // the bucket fits remaining_bytes -= bucket_size; @@ -1307,7 +658,7 @@ impl> Iterator for CappedBucketIter { break; } else { // the bucket is big enough to split - let (left, right) = bucket.split_at(remaining_bytes); + let (left, right) = split_at(bucket, remaining_bytes); if let Some(left) = left { current_batch.push(left); } @@ -1834,7 +1185,14 @@ impl AggregatorService { *bucket_count += 1; *item_count += value.len(); - let bucket = Bucket::from_parts(key.clone(), bucket_interval, value); + let bucket = Bucket { + timestamp: key.timestamp, + width: bucket_interval, + name: key.metric_name.clone(), + value, + tags: key.tags.clone(), + }; + buckets .entry(key.project_key) .or_default() @@ -2055,11 +1413,13 @@ impl Drop for AggregatorService { #[cfg(test)] mod tests { + use std::collections::BTreeSet; use std::sync::{Arc, RwLock}; use similar_asserts::assert_eq; use super::*; + use crate::{dist, GaugeValue}; #[derive(Default)] struct ReceivedData { @@ -2136,218 +1496,6 @@ mod tests { } } - #[test] - fn test_distribution_insert() { - let mut distribution = DistributionValue::new(); - assert_eq!(distribution.insert(2f64), 1); - assert_eq!(distribution.insert(1f64), 1); - assert_eq!(distribution.insert(2f64), 2); - - assert_eq!(distribution.len(), 3); - - assert!(!distribution.contains(0f64)); - assert!(distribution.contains(1f64)); - assert!(distribution.contains(2f64)); - - assert_eq!(distribution.get(0f64), 0); - assert_eq!(distribution.get(1f64), 1); - assert_eq!(distribution.get(2f64), 2); - } - - #[test] - fn test_distribution_insert_multi() { - let mut distribution = DistributionValue::new(); - assert_eq!(distribution.insert_multi(0f64, 0), 0); - 
assert_eq!(distribution.insert_multi(2f64, 2), 2); - assert_eq!(distribution.insert_multi(1f64, 1), 1); - assert_eq!(distribution.insert_multi(3f64, 1), 1); - assert_eq!(distribution.insert_multi(3f64, 2), 3); - - assert_eq!(distribution.len(), 6); - - assert!(!distribution.contains(0f64)); - assert!(distribution.contains(1f64)); - assert!(distribution.contains(2f64)); - assert!(distribution.contains(3f64)); - - assert_eq!(distribution.get(0f64), 0); - assert_eq!(distribution.get(1f64), 1); - assert_eq!(distribution.get(2f64), 2); - assert_eq!(distribution.get(3f64), 3); - } - - #[test] - fn test_distribution_iter_values() { - let distribution = dist![2f64, 1f64, 2f64]; - - let mut iter = distribution.iter_values(); - assert_eq!(iter.len(), 3); - assert_eq!(iter.next(), Some(1f64)); - assert_eq!(iter.len(), 2); - assert_eq!(iter.next(), Some(2f64)); - assert_eq!(iter.len(), 1); - assert_eq!(iter.next(), Some(2f64)); - assert_eq!(iter.len(), 0); - assert_eq!(iter.next(), None); - } - - #[test] - fn test_distribution_iter_values_empty() { - let distribution = DistributionValue::new(); - let mut iter = distribution.iter_values(); - assert_eq!(iter.len(), 0); - assert_eq!(iter.next(), None); - } - - #[test] - fn test_distribution_iter() { - let distribution = dist![2f64, 1f64, 2f64]; - - let mut iter = distribution.iter(); - assert_eq!(iter.next(), Some((1f64, 1))); - assert_eq!(iter.next(), Some((2f64, 2))); - assert_eq!(iter.next(), None); - } - - #[test] - fn test_parse_buckets() { - let json = r#"[ - { - "name": "endpoint.response_time", - "unit": "millisecond", - "value": [36, 49, 57, 68], - "type": "d", - "timestamp": 1615889440, - "width": 10, - "tags": { - "route": "user_index" - } - } - ]"#; - - let buckets = Bucket::parse_all(json.as_bytes()).unwrap(); - insta::assert_debug_snapshot!(buckets, @r#" - [ - Bucket { - timestamp: UnixTimestamp(1615889440), - width: 10, - name: "endpoint.response_time", - value: Distribution( - { - 36.0: 1, - 49.0: 1, - 57.0: 1, - 
68.0: 1, - }, - ), - tags: { - "route": "user_index", - }, - }, - ] - "#); - } - - #[test] - fn test_parse_bucket_defaults() { - let json = r#"[ - { - "name": "endpoint.hits", - "value": 4, - "type": "c", - "timestamp": 1615889440, - "width": 10 - } - ]"#; - - let buckets = Bucket::parse_all(json.as_bytes()).unwrap(); - insta::assert_debug_snapshot!(buckets, @r#" - [ - Bucket { - timestamp: UnixTimestamp(1615889440), - width: 10, - name: "endpoint.hits", - value: Counter( - 4.0, - ), - tags: {}, - }, - ] - "#); - } - - #[test] - fn test_buckets_roundtrip() { - let json = r#"[ - { - "timestamp": 1615889440, - "width": 10, - "name": "endpoint.response_time", - "type": "d", - "value": [ - 36.0, - 49.0, - 57.0, - 68.0 - ], - "tags": { - "route": "user_index" - } - }, - { - "timestamp": 1615889440, - "width": 10, - "name": "endpoint.hits", - "type": "c", - "value": 4.0, - "tags": { - "route": "user_index" - } - }, - { - "timestamp": 1615889440, - "width": 10, - "name": "endpoint.parallel_requests", - "type": "g", - "value": { - "max": 42.0, - "min": 17.0, - "sum": 2210.0, - "last": 25.0, - "count": 85 - } - }, - { - "timestamp": 1615889440, - "width": 10, - "name": "endpoint.users", - "type": "s", - "value": [ - 3182887624, - 4267882815 - ], - "tags": { - "route": "user_index" - } - } -]"#; - - let buckets = Bucket::parse_all(json.as_bytes()).unwrap(); - let serialized = serde_json::to_string_pretty(&buckets).unwrap(); - assert_eq!(json, serialized); - } - - #[test] - fn test_bucket_docs_roundtrip() { - let json = include_str!("../tests/fixtures/buckets.json") - .trim_end() - .replace("\r\n", "\n"); - let buckets = Bucket::parse_all(json.as_bytes()).unwrap(); - - let serialized = serde_json::to_string_pretty(&buckets).unwrap(); - assert_eq!(json, serialized); - } - #[test] fn test_bucket_value_merge_counter() { let mut value = BucketValue::Counter(42.); @@ -2442,7 +1590,7 @@ mod tests { let counter = BucketValue::Counter(123.0); assert_eq!(counter.cost(), 
expected_bucket_value_size); - let set = BucketValue::Set(BTreeSet::::from([1, 2, 3, 4, 5])); + let set = BucketValue::Set([1, 2, 3, 4, 5].into()); assert_eq!( set.cost(), expected_bucket_value_size + 5 * expected_set_entry_size diff --git a/relay-metrics/src/bucket.rs b/relay-metrics/src/bucket.rs new file mode 100644 index 0000000000..32fcf482ba --- /dev/null +++ b/relay-metrics/src/bucket.rs @@ -0,0 +1,865 @@ +use std::collections::{btree_map, BTreeMap, BTreeSet}; +use std::{fmt, mem}; + +use float_ord::FloatOrd; +use relay_common::time::UnixTimestamp; +use serde::{Deserialize, Serialize}; +use thiserror::Error; + +use crate::{ + CounterType, DistributionType, GaugeType, MetricType, MetricValue, MetricsContainer, SetType, +}; + +/// A snapshot of values within a [`Bucket`]. +#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize)] +pub struct GaugeValue { + /// The maximum value reported in the bucket. + pub max: GaugeType, + /// The minimum value reported in the bucket. + pub min: GaugeType, + /// The sum of all values reported in the bucket. + pub sum: GaugeType, + /// The last value reported in the bucket. + /// + /// This aggregation is not commutative. + pub last: GaugeType, + /// The number of times this bucket was updated with a new value. + pub count: u64, +} + +impl GaugeValue { + /// Creates a gauge snapshot from a single value. + pub fn single(value: GaugeType) -> Self { + Self { + max: value, + min: value, + sum: value, + last: value, + count: 1, + } + } + + /// Inserts a new value into the gauge. + pub fn insert(&mut self, value: GaugeType) { + self.max = self.max.max(value); + self.min = self.min.min(value); + self.sum += value; + self.last = value; + self.count += 1; + } + + /// Merges two gauge snapshots. 
+    pub fn merge(&mut self, other: Self) {
+        self.max = self.max.max(other.max);
+        self.min = self.min.min(other.min);
+        self.sum += other.sum;
+        self.last = other.last;
+        self.count += other.count;
+    }
+
+    /// Returns the average of all values reported in this bucket.
+    pub fn avg(&self) -> GaugeType {
+        if self.count > 0 {
+            self.sum / (self.count as GaugeType)
+        } else {
+            0.0
+        }
+    }
+}
+
+/// Type for counting duplicates in distributions.
+type Count = u32;
+
+/// A distribution of values within a [`Bucket`].
+///
+/// Distributions store a histogram of values. It allows to iterate both the distribution with
+/// [`iter`](Self::iter) and individual values with [`iter_values`](Self::iter_values).
+///
+/// Based on individual reported values, distributions allow to query the maximum, minimum, or
+/// average of the reported values, as well as statistical quantiles.
+///
+/// # Example
+///
+/// ```rust
+/// use relay_metrics::dist;
+///
+/// let mut dist = dist![1.0, 1.0, 1.0, 2.0];
+/// dist.insert(5.0);
+/// dist.insert_multi(3.0, 7);
+/// ```
+///
+/// Logically, this distribution is equivalent to this visualization:
+///
+/// ```plain
+/// value | count
+/// 1.0   | ***
+/// 2.0   | *
+/// 3.0   | *******
+/// 4.0   |
+/// 5.0   | *
+/// ```
+///
+/// # Serialization
+///
+/// Distributions serialize as sorted lists of floating point values. The list contains one entry
+/// for each value in the distribution, including duplicates.
+#[derive(Clone, Default, PartialEq)]
+pub struct DistributionValue {
+    values: BTreeMap<FloatOrd<DistributionType>, Count>,
+    length: Count,
+}
+
+impl DistributionValue {
+    /// Makes a new, empty `DistributionValue`.
+    ///
+    /// Does not allocate anything on its own.
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Returns the number of values in the map.
+ /// + /// # Examples + /// + /// ``` + /// use relay_metrics::DistributionValue; + /// + /// let mut dist = DistributionValue::new(); + /// assert_eq!(dist.len(), 0); + /// dist.insert(1.0); + /// dist.insert(1.0); + /// assert_eq!(dist.len(), 2); + /// ``` + pub fn len(&self) -> Count { + self.length + } + + /// Returns `true` if the map contains no elements. + pub fn is_empty(&self) -> bool { + self.length == 0 + } + + /// Adds a value to the distribution. + /// + /// Returns the number this value occurs in the distribution after inserting. + /// + /// # Examples + /// + /// ``` + /// use relay_metrics::DistributionValue; + /// + /// let mut dist = DistributionValue::new(); + /// assert_eq!(dist.insert(1.0), 1); + /// assert_eq!(dist.insert(1.0), 2); + /// assert_eq!(dist.insert(2.0), 1); + /// ``` + pub fn insert(&mut self, value: DistributionType) -> Count { + self.insert_multi(value, 1) + } + + /// Adds a value multiple times to the distribution. + /// + /// Returns the number this value occurs in the distribution after inserting. + /// + /// # Examples + /// + /// ``` + /// use relay_metrics::DistributionValue; + /// + /// let mut dist = DistributionValue::new(); + /// assert_eq!(dist.insert_multi(1.0, 2), 2); + /// assert_eq!(dist.insert_multi(1.0, 3), 5); + /// ``` + pub fn insert_multi(&mut self, value: DistributionType, count: Count) -> Count { + self.length += count; + if count == 0 { + return 0; + } + + *self + .values + .entry(FloatOrd(value)) + .and_modify(|c| *c += count) + .or_insert(count) + } + + /// Returns `true` if the set contains a value. + /// + /// # Examples + /// + /// ``` + /// use relay_metrics::dist; + /// + /// let dist = dist![1.0]; + /// + /// assert_eq!(dist.contains(1.0), true); + /// assert_eq!(dist.contains(2.0), false); + /// ``` + pub fn contains(&self, value: impl std::borrow::Borrow) -> bool { + self.values.contains_key(&FloatOrd(*value.borrow())) + } + + /// Returns how often the given value occurs in the distribution. 
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use relay_metrics::dist;
+    ///
+    /// let dist = dist![1.0, 1.0];
+    ///
+    /// assert_eq!(dist.get(1.0), 2);
+    /// assert_eq!(dist.get(2.0), 0);
+    /// ```
+    pub fn get(&self, value: impl std::borrow::Borrow<DistributionType>) -> Count {
+        let value = &FloatOrd(*value.borrow());
+        self.values.get(value).copied().unwrap_or(0)
+    }
+
+    /// Gets an iterator that visits unique values in the `DistributionValue` in ascending order.
+    ///
+    /// The iterator yields pairs of values and their count in the distribution.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use relay_metrics::dist;
+    ///
+    /// let dist = dist![2.0, 1.0, 3.0, 2.0];
+    ///
+    /// let mut iter = dist.iter();
+    /// assert_eq!(iter.next(), Some((1.0, 1)));
+    /// assert_eq!(iter.next(), Some((2.0, 2)));
+    /// assert_eq!(iter.next(), Some((3.0, 1)));
+    /// assert_eq!(iter.next(), None);
+    /// ```
+    pub fn iter(&self) -> DistributionIter<'_> {
+        DistributionIter {
+            inner: self.values.iter(),
+        }
+    }
+
+    /// Gets an iterator that visits the values in the `DistributionValue` in ascending order.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use relay_metrics::dist;
+    ///
+    /// let dist = dist![2.0, 1.0, 3.0, 2.0];
+    ///
+    /// let mut iter = dist.iter_values();
+    /// assert_eq!(iter.next(), Some(1.0));
+    /// assert_eq!(iter.next(), Some(2.0));
+    /// assert_eq!(iter.next(), Some(2.0));
+    /// assert_eq!(iter.next(), Some(3.0));
+    /// assert_eq!(iter.next(), None);
+    /// ```
+    pub fn iter_values(&self) -> DistributionValuesIter<'_> {
+        DistributionValuesIter {
+            inner: self.iter(),
+            current: 0f64,
+            remaining: 0,
+            total: self.length,
+        }
+    }
+}
+
+impl<'a> IntoIterator for &'a DistributionValue {
+    type Item = (DistributionType, Count);
+    type IntoIter = DistributionIter<'a>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.iter()
+    }
+}
+
+impl fmt::Debug for DistributionValue {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_map().entries(self.iter()).finish()
+    }
+}
+
+impl Extend<DistributionType> for DistributionValue {
+    fn extend<T: IntoIterator<Item = DistributionType>>(&mut self, iter: T) {
+        for value in iter.into_iter() {
+            self.insert(value);
+        }
+    }
+}
+
+impl Extend<(DistributionType, Count)> for DistributionValue {
+    fn extend<T: IntoIterator<Item = (DistributionType, Count)>>(&mut self, iter: T) {
+        for (value, count) in iter.into_iter() {
+            self.insert_multi(value, count);
+        }
+    }
+}
+
+impl FromIterator<DistributionType> for DistributionValue {
+    fn from_iter<T: IntoIterator<Item = DistributionType>>(iter: T) -> Self {
+        let mut value = Self::default();
+        value.extend(iter);
+        value
+    }
+}
+
+impl FromIterator<(DistributionType, Count)> for DistributionValue {
+    fn from_iter<T: IntoIterator<Item = (DistributionType, Count)>>(iter: T) -> Self {
+        let mut value = Self::default();
+        value.extend(iter);
+        value
+    }
+}
+
+impl Serialize for DistributionValue {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        serializer.collect_seq(self.iter_values())
+    }
+}
+
+impl<'de> Deserialize<'de> for DistributionValue {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        struct DistributionVisitor;
+
+        impl<'d> serde::de::Visitor<'d> for DistributionVisitor {
+            type Value = 
DistributionValue;
+
+            fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+                write!(formatter, "a list of floating point values")
+            }
+
+            fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
+            where
+                A: serde::de::SeqAccess<'d>,
+            {
+                let mut distribution = DistributionValue::new();
+
+                while let Some(value) = seq.next_element()? {
+                    distribution.insert(value);
+                }
+
+                Ok(distribution)
+            }
+        }
+
+        deserializer.deserialize_seq(DistributionVisitor)
+    }
+}
+
+/// An iterator over distribution entries in a [`DistributionValue`].
+///
+/// This struct is created by the [`iter`](DistributionValue::iter) method on
+/// `DistributionValue`. See its documentation for more.
+#[derive(Clone)]
+pub struct DistributionIter<'a> {
+    inner: btree_map::Iter<'a, FloatOrd<DistributionType>, Count>,
+}
+
+impl Iterator for DistributionIter<'_> {
+    type Item = (DistributionType, Count);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let (value, count) = self.inner.next()?;
+        Some((value.0, *count))
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.inner.size_hint()
+    }
+}
+
+impl ExactSizeIterator for DistributionIter<'_> {}
+
+impl fmt::Debug for DistributionIter<'_> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_list().entries(self.clone()).finish()
+    }
+}
+
+/// An iterator over all individual values in a [`DistributionValue`].
+///
+/// This struct is created by the [`iter_values`](DistributionValue::iter_values) method on
+/// `DistributionValue`. See its documentation for more.
+#[derive(Clone)] +pub struct DistributionValuesIter<'a> { + inner: DistributionIter<'a>, + current: DistributionType, + remaining: Count, + total: Count, +} + +impl Iterator for DistributionValuesIter<'_> { + type Item = DistributionType; + + fn next(&mut self) -> Option { + if self.remaining > 0 { + self.remaining -= 1; + self.total -= 1; + return Some(self.current); + } + + let (value, count) = self.inner.next()?; + + self.current = value; + self.remaining = count - 1; + self.total -= 1; + Some(self.current) + } + + fn size_hint(&self) -> (usize, Option) { + let len = self.total as usize; + (len, Some(len)) + } +} + +impl ExactSizeIterator for DistributionValuesIter<'_> {} + +impl fmt::Debug for DistributionValuesIter<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_list().entries(self.clone()).finish() + } +} + +/// Creates a [`DistributionValue`] containing the given arguments. +/// +/// `dist!` allows `DistributionValue` to be defined with the same syntax as array expressions. +/// +/// # Example +/// +/// ``` +/// let dist = relay_metrics::dist![1.0, 2.0]; +/// ``` +#[macro_export] +macro_rules! dist { + () => { + $crate::DistributionValue::new() + }; + ($($x:expr),+ $(,)?) => {{ + let mut distribution = $crate::DistributionValue::new(); + $( distribution.insert($x); )* + distribution + }}; +} + +/// The [aggregated value](Bucket::value) of a metric bucket. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(tag = "type", content = "value")] +pub enum BucketValue { + /// Aggregates [`MetricValue::Counter`] values by adding them into a single value. + /// + /// ```text + /// 2, 1, 3, 2 => 8 + /// ``` + /// + /// This variant serializes to a double precision float. + #[serde(rename = "c")] + Counter(CounterType), + /// Aggregates [`MetricValue::Distribution`] values by collecting their values. 
+    ///
+    /// ```text
+    /// 2, 1, 3, 2 => [1, 2, 2, 3]
+    /// ```
+    ///
+    /// This variant serializes to a list of double precision floats, see [`DistributionValue`].
+    #[serde(rename = "d")]
+    Distribution(DistributionValue),
+    /// Aggregates [`MetricValue::Set`] values by storing their hash values in a set.
+    ///
+    /// ```text
+    /// 2, 1, 3, 2 => {1, 2, 3}
+    /// ```
+    ///
+    /// This variant serializes to a list of 32-bit integers.
+    #[serde(rename = "s")]
+    Set(BTreeSet<SetType>),
+    /// Aggregates [`MetricValue::Gauge`] values always retaining the maximum, minimum, and last
+    /// value, as well as the sum and count of all values.
+    ///
+    /// **Note**: The "last" component of this aggregation is not commutative.
+    ///
+    /// ```text
+    /// 1, 2, 3, 2 => {
+    ///   max: 3,
+    ///   min: 1,
+    ///   sum: 8,
+    ///   last: 2
+    ///   count: 4,
+    /// }
+    /// ```
+    ///
+    /// This variant serializes to a structure, see [`GaugeValue`].
+    #[serde(rename = "g")]
+    Gauge(GaugeValue),
+}
+
+impl BucketValue {
+    /// Returns the type of this value.
+    pub fn ty(&self) -> MetricType {
+        match self {
+            Self::Counter(_) => MetricType::Counter,
+            Self::Distribution(_) => MetricType::Distribution,
+            Self::Set(_) => MetricType::Set,
+            Self::Gauge(_) => MetricType::Gauge,
+        }
+    }
+
+    /// Returns the number of raw data points in this value.
+    pub fn len(&self) -> usize {
+        match self {
+            BucketValue::Counter(_) => 1,
+            BucketValue::Distribution(distribution) => distribution.len() as usize,
+            BucketValue::Set(set) => set.len(),
+            BucketValue::Gauge(_) => 5,
+        }
+    }
+
+    /// Returns `true` if this bucket contains no values.
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    /// Estimates the number of bytes needed to encode the bucket value.
+    ///
+    /// Note that this does not necessarily match the exact memory footprint of the value,
+    /// because data structures have a memory overhead.
+    pub fn cost(&self) -> usize {
+        // Beside the size of [`BucketValue`], we also need to account for the cost of values
+        // allocated dynamically.
+        let allocated_cost = match self {
+            Self::Counter(_) => 0,
+            Self::Set(s) => mem::size_of::<SetType>() * s.len(),
+            Self::Gauge(_) => 0,
+            Self::Distribution(m) => {
+                m.values.len() * (mem::size_of::<DistributionType>() + mem::size_of::<Count>())
+            }
+        };
+
+        mem::size_of::<Self>() + allocated_cost
+    }
+}
+
+impl From<MetricValue> for BucketValue {
+    fn from(value: MetricValue) -> Self {
+        match value {
+            MetricValue::Counter(value) => Self::Counter(value),
+            MetricValue::Distribution(value) => Self::Distribution(dist![value]),
+            MetricValue::Set(value) => Self::Set(std::iter::once(value).collect()),
+            MetricValue::Gauge(value) => Self::Gauge(GaugeValue::single(value)),
+        }
+    }
+}
+
+/// Error returned when parsing or serializing a [`Bucket`].
+#[derive(Debug, Error)]
+#[error("failed to parse metric bucket")]
+pub struct ParseBucketError(#[source] serde_json::Error);
+
+/// An aggregation of metric values by the [`Aggregator`].
+///
+/// As opposed to single metric values, bucket aggregations can carry multiple values. See
+/// [`MetricType`] for a description on how values are aggregated in buckets. Values are aggregated
+/// by metric name, type, time window, and all tags. Particularly, this allows metrics to have the
+/// same name even if their types differ.
+///
+/// See the [crate documentation](crate) for general information on Metrics.
+///
+/// # Values
+///
+/// The contents of a bucket, especially their representation and serialization, depend on the
+/// metric type:
+///
+/// - [Counters](BucketValue::Counter) store a single value, serialized as floating point.
+/// - [Distributions](MetricType::Distribution) and [sets](MetricType::Set) store the full set of
+///   reported values.
+/// - [Gauges](BucketValue::Gauge) store a snapshot of reported values, see [`GaugeValue`].
+///
+/// # Submission Protocol
+///
+/// Buckets are always represented as JSON.
The data type of the `value` field is determined by the
+/// metric type.
+///
+/// ```json
+#[doc = include_str!("../tests/fixtures/buckets.json")]
+/// ```
+///
+/// To parse a submission payload, use [`Bucket::parse_all`].
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct Bucket {
+    /// The start time of the time window.
+    pub timestamp: UnixTimestamp,
+    /// The length of the time window in seconds.
+    pub width: u64,
+    /// The MRI (metric resource identifier).
+    ///
+    /// See [`Metric::name`].
+    pub name: String,
+    /// The type and aggregated values of this bucket.
+    ///
+    /// See [`Metric::value`] for a mapping to inbound data.
+    #[serde(flatten)]
+    pub value: BucketValue,
+    /// A list of tags adding dimensions to the metric for filtering and aggregation.
+    ///
+    /// See [`Metric::tags`]. Every combination of tags results in a different bucket.
+    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
+    pub tags: BTreeMap<String, String>,
+}
+
+impl Bucket {
+    /// Parses a single metric bucket from the JSON protocol.
+    pub fn parse(slice: &[u8]) -> Result<Self, ParseBucketError> {
+        serde_json::from_slice(slice).map_err(ParseBucketError)
+    }
+
+    /// Parses a set of metric bucket from the JSON protocol.
+    pub fn parse_all(slice: &[u8]) -> Result<Vec<Bucket>, ParseBucketError> {
+        serde_json::from_slice(slice).map_err(ParseBucketError)
+    }
+
+    /// Serializes the given buckets to the JSON protocol.
+ pub fn serialize_all(buckets: &[Self]) -> Result { + serde_json::to_string(&buckets).map_err(ParseBucketError) + } +} + +impl MetricsContainer for Bucket { + fn name(&self) -> &str { + self.name.as_str() + } + + fn len(&self) -> usize { + self.value.len() + } + + fn tag(&self, name: &str) -> Option<&str> { + self.tags.get(name).map(|s| s.as_str()) + } + + fn remove_tag(&mut self, name: &str) { + self.tags.remove(name); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_distribution_insert() { + let mut distribution = DistributionValue::new(); + assert_eq!(distribution.insert(2f64), 1); + assert_eq!(distribution.insert(1f64), 1); + assert_eq!(distribution.insert(2f64), 2); + + assert_eq!(distribution.len(), 3); + + assert!(!distribution.contains(0f64)); + assert!(distribution.contains(1f64)); + assert!(distribution.contains(2f64)); + + assert_eq!(distribution.get(0f64), 0); + assert_eq!(distribution.get(1f64), 1); + assert_eq!(distribution.get(2f64), 2); + } + + #[test] + fn test_distribution_insert_multi() { + let mut distribution = DistributionValue::new(); + assert_eq!(distribution.insert_multi(0f64, 0), 0); + assert_eq!(distribution.insert_multi(2f64, 2), 2); + assert_eq!(distribution.insert_multi(1f64, 1), 1); + assert_eq!(distribution.insert_multi(3f64, 1), 1); + assert_eq!(distribution.insert_multi(3f64, 2), 3); + + assert_eq!(distribution.len(), 6); + + assert!(!distribution.contains(0f64)); + assert!(distribution.contains(1f64)); + assert!(distribution.contains(2f64)); + assert!(distribution.contains(3f64)); + + assert_eq!(distribution.get(0f64), 0); + assert_eq!(distribution.get(1f64), 1); + assert_eq!(distribution.get(2f64), 2); + assert_eq!(distribution.get(3f64), 3); + } + + #[test] + fn test_distribution_iter_values() { + let distribution = dist![2f64, 1f64, 2f64]; + + let mut iter = distribution.iter_values(); + assert_eq!(iter.len(), 3); + assert_eq!(iter.next(), Some(1f64)); + assert_eq!(iter.len(), 2); + 
assert_eq!(iter.next(), Some(2f64)); + assert_eq!(iter.len(), 1); + assert_eq!(iter.next(), Some(2f64)); + assert_eq!(iter.len(), 0); + assert_eq!(iter.next(), None); + } + + #[test] + fn test_distribution_iter_values_empty() { + let distribution = DistributionValue::new(); + let mut iter = distribution.iter_values(); + assert_eq!(iter.len(), 0); + assert_eq!(iter.next(), None); + } + + #[test] + fn test_distribution_iter() { + let distribution = dist![2f64, 1f64, 2f64]; + + let mut iter = distribution.iter(); + assert_eq!(iter.next(), Some((1f64, 1))); + assert_eq!(iter.next(), Some((2f64, 2))); + assert_eq!(iter.next(), None); + } + + #[test] + fn test_parse_buckets() { + let json = r#"[ + { + "name": "endpoint.response_time", + "unit": "millisecond", + "value": [36, 49, 57, 68], + "type": "d", + "timestamp": 1615889440, + "width": 10, + "tags": { + "route": "user_index" + } + } + ]"#; + + let buckets = Bucket::parse_all(json.as_bytes()).unwrap(); + insta::assert_debug_snapshot!(buckets, @r#" + [ + Bucket { + timestamp: UnixTimestamp(1615889440), + width: 10, + name: "endpoint.response_time", + value: Distribution( + { + 36.0: 1, + 49.0: 1, + 57.0: 1, + 68.0: 1, + }, + ), + tags: { + "route": "user_index", + }, + }, + ] + "#); + } + + #[test] + fn test_parse_bucket_defaults() { + let json = r#"[ + { + "name": "endpoint.hits", + "value": 4, + "type": "c", + "timestamp": 1615889440, + "width": 10 + } + ]"#; + + let buckets = Bucket::parse_all(json.as_bytes()).unwrap(); + insta::assert_debug_snapshot!(buckets, @r#" + [ + Bucket { + timestamp: UnixTimestamp(1615889440), + width: 10, + name: "endpoint.hits", + value: Counter( + 4.0, + ), + tags: {}, + }, + ] + "#); + } + + #[test] + fn test_buckets_roundtrip() { + let json = r#"[ + { + "timestamp": 1615889440, + "width": 10, + "name": "endpoint.response_time", + "type": "d", + "value": [ + 36.0, + 49.0, + 57.0, + 68.0 + ], + "tags": { + "route": "user_index" + } + }, + { + "timestamp": 1615889440, + "width": 10, + 
"name": "endpoint.hits", + "type": "c", + "value": 4.0, + "tags": { + "route": "user_index" + } + }, + { + "timestamp": 1615889440, + "width": 10, + "name": "endpoint.parallel_requests", + "type": "g", + "value": { + "max": 42.0, + "min": 17.0, + "sum": 2210.0, + "last": 25.0, + "count": 85 + } + }, + { + "timestamp": 1615889440, + "width": 10, + "name": "endpoint.users", + "type": "s", + "value": [ + 3182887624, + 4267882815 + ], + "tags": { + "route": "user_index" + } + } +]"#; + + let buckets = Bucket::parse_all(json.as_bytes()).unwrap(); + let serialized = serde_json::to_string_pretty(&buckets).unwrap(); + assert_eq!(json, serialized); + } + + #[test] + fn test_bucket_docs_roundtrip() { + let json = include_str!("../tests/fixtures/buckets.json") + .trim_end() + .replace("\r\n", "\n"); + let buckets = Bucket::parse_all(json.as_bytes()).unwrap(); + + let serialized = serde_json::to_string_pretty(&buckets).unwrap(); + assert_eq!(json, serialized); + } +} diff --git a/relay-metrics/src/lib.rs b/relay-metrics/src/lib.rs index 867e6166d7..9a745b8a58 100644 --- a/relay-metrics/src/lib.rs +++ b/relay-metrics/src/lib.rs @@ -67,10 +67,12 @@ )] mod aggregation; +mod bucket; mod protocol; mod router; mod statsd; pub use aggregation::*; +pub use bucket::*; pub use protocol::*; pub use router::*; From 7aca151523ea8ceab2bcb85bc480a93b2a5a03a4 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Mon, 4 Sep 2023 17:34:50 +0200 Subject: [PATCH 2/2] fix(metrics): Update docs references --- relay-metrics/src/aggregation.rs | 4 ++-- relay-metrics/src/bucket.rs | 9 +++++---- relay-metrics/src/protocol.rs | 3 ++- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/relay-metrics/src/aggregation.rs b/relay-metrics/src/aggregation.rs index 7a300b791c..9820005541 100644 --- a/relay-metrics/src/aggregation.rs +++ b/relay-metrics/src/aggregation.rs @@ -103,7 +103,7 @@ fn tags_cost(tags: &BTreeMap) -> usize { /// /// This is an approximate function. 
The bucket is not actually serialized, but rather its /// footprint is estimated through the number of data points contained. See -/// [`estimated_size`](Self::estimated_size) for more information. +/// `estimate_size` for more information. fn split_at(mut bucket: Bucket, size: usize) -> (Option, Option) { // If there's enough space for the entire bucket, do not perform a split. if size >= estimate_size(&bucket) { @@ -825,7 +825,7 @@ impl FromMessage for Aggregator { /// - `Counter`: Sum of values. /// - `Distribution`: A list of values. /// - `Set`: A unique set of hashed values. -/// - `Gauge`: A summary of the reported values, see [`GaugeValue`]. +/// - `Gauge`: A summary of the reported values, see [`GaugeValue`](crate::GaugeValue). /// /// # Conflicts /// diff --git a/relay-metrics/src/bucket.rs b/relay-metrics/src/bucket.rs index 32fcf482ba..369f7ae00d 100644 --- a/relay-metrics/src/bucket.rs +++ b/relay-metrics/src/bucket.rs @@ -561,7 +561,7 @@ impl From for BucketValue { #[error("failed to parse metric bucket")] pub struct ParseBucketError(#[source] serde_json::Error); -/// An aggregation of metric values by the [`Aggregator`]. +/// An aggregation of metric values. /// /// As opposed to single metric values, bucket aggregations can carry multiple values. See /// [`MetricType`] for a description on how values are aggregated in buckets. Values are aggregated @@ -598,16 +598,17 @@ pub struct Bucket { pub width: u64, /// The MRI (metric resource identifier). /// - /// See [`Metric::name`]. + /// See [`Metric::name`](crate::Metric::name). pub name: String, /// The type and aggregated values of this bucket. /// - /// See [`Metric::value`] for a mapping to inbound data. + /// See [`Metric::value`](crate::Metric::value) for a mapping to inbound data. #[serde(flatten)] pub value: BucketValue, /// A list of tags adding dimensions to the metric for filtering and aggregation. /// - /// See [`Metric::tags`]. Every combination of tags results in a different bucket. 
+ /// See [`Metric::tags`](crate::Metric::tags). Every combination of tags results in a different + /// bucket. #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] pub tags: BTreeMap, } diff --git a/relay-metrics/src/protocol.rs b/relay-metrics/src/protocol.rs index 9834eaf481..5dbf826c60 100644 --- a/relay-metrics/src/protocol.rs +++ b/relay-metrics/src/protocol.rs @@ -576,7 +576,8 @@ pub trait MetricsContainer { fn name(&self) -> &str; /// Returns the number of raw data points in this container. - /// See [`crate::aggregation::BucketValue::len()`]. + /// + /// See [`BucketValue::len`](crate::BucketValue::len). fn len(&self) -> usize; /// Returns `true` if this container contains no values.