Skip to content

Commit

Permalink
fix(metrics): Add units to metric buckets (#969)
Browse files Browse the repository at this point in the history
Treats units as part of the metric signature, in addition to the type and name.
In future, we will convert between compatible units and allow merging values
with varying precision. For now, all unit precisions are handled in separate
buckets.
  • Loading branch information
jan-auer authored Mar 31, 2021
1 parent 620c8f1 commit d42e85d
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 16 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
**Internal**:

- Emit the `quantity` field for outcomes of events. This field describes the total size in bytes for attachments or the event count for all other categories. A separate outcome is emitted for attachments in a rejected envelope, if any, in addition to the event outcome. ([#942](https://github.com/getsentry/relay/pull/942))
- Add experimental metrics ingestion with bucketing and pre-aggregation. ([#948](https://github.com/getsentry/relay/pull/948), [#952](https://github.com/getsentry/relay/pull/952), [#958](https://github.com/getsentry/relay/pull/958), [#966](https://github.com/getsentry/relay/pull/966))
- Add experimental metrics ingestion with bucketing and pre-aggregation. ([#948](https://github.com/getsentry/relay/pull/948), [#952](https://github.com/getsentry/relay/pull/952), [#958](https://github.com/getsentry/relay/pull/958), [#966](https://github.com/getsentry/relay/pull/966), [#969](https://github.com/getsentry/relay/pull/969))
- Change HTTP response for upstream timeouts from 502 to 504. ([#859](https://github.com/getsentry/relay/pull/859))
- Add rule id to outcomes coming from transaction sampling. ([#953](https://github.com/getsentry/relay/pull/953))
- Add support for breakdowns ingestion. ([#934](https://github.com/getsentry/relay/pull/934))
Expand Down
64 changes: 57 additions & 7 deletions relay-metrics/src/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};

use relay_common::{MonotonicResult, UnixTimestamp};

use crate::{Metric, MetricType, MetricValue};
use crate::{Metric, MetricType, MetricUnit, MetricValue};

/// The [aggregated value](Bucket::value) of a metric bucket.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
Expand Down Expand Up @@ -185,6 +185,11 @@ pub struct Bucket {
///
/// See [`Metric::name`].
pub name: String,
/// The unit of the metric value.
///
/// See [`Metric::unit`].
#[serde(default, skip_serializing_if = "MetricUnit::is_none")]
pub unit: MetricUnit,
/// The aggregated values in this bucket.
///
/// [Counters](BucketValue::Counter) and [gauges](BucketValue::Gauge) store a single value while
Expand All @@ -204,6 +209,7 @@ impl Bucket {
Self {
timestamp: key.timestamp,
name: key.metric_name,
unit: key.metric_unit,
value,
tags: key.tags,
}
Expand Down Expand Up @@ -242,6 +248,7 @@ struct BucketKey {
timestamp: UnixTimestamp,
metric_name: String,
metric_type: MetricType,
metric_unit: MetricUnit,
tags: BTreeMap<String, String>,
}
/// Parameters used by the [`Aggregator`].
Expand Down Expand Up @@ -392,8 +399,8 @@ impl Message for FlushBuckets {
///
/// # Aggregation
///
/// Each metric is dispatched into the a [`Bucket`] depending on its name, type, tags and timestamp.
/// The bucket timestamp is rounded to the precision declared by the
/// Each metric is dispatched into the a [`Bucket`] depending on its name, type, unit, tags and
/// timestamp. The bucket timestamp is rounded to the precision declared by the
/// [`bucket_interval`](AggregatorConfig::bucket_interval) configuration.
///
/// Each bucket stores the accumulated value of submitted metrics:
Expand All @@ -403,6 +410,12 @@ impl Message for FlushBuckets {
/// - `Set`: A unique set of hashed values.
/// - `Gauge`: The latest submitted value.
///
/// # Conflicts
///
/// Metrics are uniquely identified by the combination of their name, type and unit. It is allowed
/// to send metrics of different types and units under the same name. For example, sending a metric
/// once as set and once as distribution will result in two actual metrics being recorded.
///
/// # Flushing
///
/// Buckets are flushed to a receiver after their time window and a grace period have passed.
Expand Down Expand Up @@ -494,6 +507,7 @@ impl Aggregator {
timestamp: self.get_bucket_timestamp(metric.timestamp),
metric_name: metric.name,
metric_type: metric.value.ty(),
metric_unit: metric.unit,
tags: metric.tags,
};
self.merge_in(key, metric.value)
Expand All @@ -507,6 +521,7 @@ impl Aggregator {
timestamp: bucket.timestamp,
metric_name: bucket.name,
metric_type: bucket.value.ty(),
metric_unit: bucket.unit,
tags: bucket.tags,
};
self.merge_in(key, bucket.value)
Expand Down Expand Up @@ -653,7 +668,8 @@ mod tests {
use std::sync::{Arc, RwLock};

use super::*;
use crate::MetricUnit;

use crate::{DurationPrecision, MetricUnit};

struct BucketCountInquiry;

Expand Down Expand Up @@ -735,13 +751,15 @@ mod tests {
}
]"#;

// TODO: This should parse the unit.
let buckets = Bucket::parse_all(json.as_bytes()).unwrap();
insta::assert_debug_snapshot!(buckets, @r###"
[
Bucket {
timestamp: UnixTimestamp(1615889440),
name: "endpoint.response_time",
unit: Duration(
MilliSecond,
),
value: Distribution(
[
36.0,
Expand Down Expand Up @@ -775,6 +793,7 @@ mod tests {
Bucket {
timestamp: UnixTimestamp(1615889440),
name: "endpoint.hits",
unit: None,
value: Counter(
4.0,
),
Expand Down Expand Up @@ -904,6 +923,7 @@ mod tests {
timestamp: UnixTimestamp(4710),
metric_name: "foo",
metric_type: Counter,
metric_unit: None,
tags: {},
}: Counter(
85.0,
Expand Down Expand Up @@ -942,6 +962,7 @@ mod tests {
timestamp: UnixTimestamp(4710),
metric_name: "foo",
metric_type: Counter,
metric_unit: None,
tags: {},
},
Counter(
Expand All @@ -953,6 +974,7 @@ mod tests {
timestamp: UnixTimestamp(4720),
metric_name: "foo",
metric_type: Counter,
metric_unit: None,
tags: {},
},
Counter(
Expand All @@ -964,12 +986,14 @@ mod tests {
}

#[test]
fn test_aggregator_mixup_types() {
fn test_aggregator_mixed_types() {
relay_test::setup();

let config = AggregatorConfig {
bucket_interval: 10,
..AggregatorConfig::default()
};

let receiver = TestReceiver::start_default().recipient();
let mut aggregator = Aggregator::new(config, receiver);

Expand All @@ -980,7 +1004,33 @@ mod tests {

// It's OK to have same name for different types:
aggregator.insert(metric1).unwrap();
assert!(matches!(aggregator.insert(metric2), Ok(_)));
aggregator.insert(metric2).unwrap();
assert_eq!(aggregator.buckets.len(), 2);
}

#[test]
fn test_aggregator_mixed_units() {
relay_test::setup();

let config = AggregatorConfig {
bucket_interval: 10,
..AggregatorConfig::default()
};

let receiver = TestReceiver::start_default().recipient();
let mut aggregator = Aggregator::new(config, receiver);

let metric1 = some_metric();

let mut metric2 = metric1.clone();
metric2.unit = MetricUnit::Duration(DurationPrecision::Second);

// It's OK to have same metric with different units:
aggregator.insert(metric1).unwrap();
aggregator.insert(metric2).unwrap();

// TODO: This should convert if units are convertible
assert_eq!(aggregator.buckets.len(), 2);
}

#[test]
Expand Down
3 changes: 3 additions & 0 deletions relay-metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
//! endpoint.hits:1|c|'1615889449|#route:user_index
//! ```
//!
//! The metric type is part of its signature just like the unit. Therefore, it is allowed to reuse a
//! metric name for multiple metric types, which will result in multiple metrics being recorded.
//!
//! # Aggregation
//!
//! Relay accumulates all metrics in [time buckets](Bucket) before sending them onwards. Aggregation
Expand Down
4 changes: 2 additions & 2 deletions relay-metrics/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub use relay_common::UnixTimestamp;
/// Time duration units used in [`MetricUnit::Duration`].
///
/// Defaults to `ms`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub enum DurationPrecision {
/// Nanosecond (`"ns"`).
NanoSecond,
Expand Down Expand Up @@ -43,7 +43,7 @@ impl fmt::Display for DurationPrecision {
/// measurements.
///
/// Units and their precisions are uniquely represented by a string identifier.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub enum MetricUnit {
/// A time duration, defaulting to milliseconds (`"ms"`).
Duration(DurationPrecision),
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/actors/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ impl StoreForwarder {
org_id,
project_id,
name: bucket.name,
unit: MetricUnit::default(),
unit: bucket.unit,
value: bucket.value,
timestamp: bucket.timestamp,
tags: bucket.tags,
Expand Down
15 changes: 10 additions & 5 deletions tests/integration/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def test_metrics_with_processing(mini_sentry, relay_with_processing, metrics_con
mini_sentry.add_full_project_config(project_id)

timestamp = int(datetime.now(tz=timezone.utc).timestamp())
metrics_payload = f"foo:42|c|'{timestamp}\nbar:17|c|'{timestamp}"
metrics_payload = f"foo:42|c|'{timestamp}\nbar@s:17|c|'{timestamp}"
relay.send_metrics(project_id, metrics_payload)

metric = metrics_consumer.get_metric()
Expand All @@ -59,12 +59,14 @@ def test_metrics_with_processing(mini_sentry, relay_with_processing, metrics_con
"org_id": 1,
"project_id": project_id,
"name": "bar",
"unit": "",
"unit": "s",
"value": 17.0,
"type": "c",
"timestamp": timestamp,
}

metrics_consumer.assert_empty()


def test_metrics_full(mini_sentry, relay, relay_with_processing, metrics_consumer):
metrics_consumer = metrics_consumer()
Expand All @@ -85,10 +87,11 @@ def test_metrics_full(mini_sentry, relay, relay_with_processing, metrics_consume
mini_sentry.add_full_project_config(project_id)

# Send two events to downstream and one to upstream
downstream.send_metrics(project_id, "foo:7|c")
downstream.send_metrics(project_id, "foo:5|c")
timestamp = int(datetime.now(tz=timezone.utc).timestamp())
downstream.send_metrics(project_id, f"foo:7|c|'{timestamp}")
downstream.send_metrics(project_id, f"foo:5|c|'{timestamp}")

upstream.send_metrics(project_id, "foo:3|c")
upstream.send_metrics(project_id, f"foo:3|c|'{timestamp}")

metric = metrics_consumer.get_metric(timeout=4)
metric.pop("timestamp")
Expand All @@ -100,3 +103,5 @@ def test_metrics_full(mini_sentry, relay, relay_with_processing, metrics_consume
"value": 15.0,
"type": "c",
}

metrics_consumer.assert_empty()

0 comments on commit d42e85d

Please sign in to comment.