feat(server): Send pre-aggregated metrics in batches (#966)
Uses the metrics aggregator from #958 to send batches of pre-aggregated
buckets to the upstream instead of forwarding individual metric values.
If sending fails for any reason, the metrics are merged back into the
aggregator, which will retry flushing after the next interval.
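
A minimal, self-contained sketch of that flush-or-merge-back cycle (all names
are illustrative, not Relay's actual API; plain counters stand in for full
metric buckets):

    use std::collections::HashMap;

    #[derive(Default)]
    struct CounterAggregator {
        buckets: HashMap<String, f64>,
    }

    impl CounterAggregator {
        /// Takes all buckets that are due, leaving the aggregator empty.
        fn take_due_buckets(&mut self) -> HashMap<String, f64> {
            std::mem::take(&mut self.buckets)
        }

        /// Merges buckets back in, e.g. after a failed send.
        fn merge_back(&mut self, buckets: HashMap<String, f64>) {
            for (name, value) in buckets {
                *self.buckets.entry(name).or_insert(0.0) += value;
            }
        }
    }

    /// Stand-in for the upstream request; assume it can fail.
    fn send_upstream(_buckets: &HashMap<String, f64>) -> Result<(), ()> {
        Err(())
    }

    fn flush(aggregator: &mut CounterAggregator) {
        let buckets = aggregator.take_due_buckets();
        if buckets.is_empty() {
            return;
        }
        if send_upstream(&buckets).is_err() {
            // Sending failed: merge back so the next interval retries.
            aggregator.merge_back(buckets);
        }
    }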

Metric envelopes are not queued like regular envelopes. Instead, they go
straight to the EventProcessor worker pool, where they are parsed,
normalized and sent to the project's aggregator. This ensures that
metric requests do not create long-running futures that would slow down
the system. For mixed envelopes, metric items are split off and handled
separately.
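
The splitting step could look like this (`Item` and `ItemType` are simplified
stand-ins for Relay's envelope types):

    /// Simplified envelope item; real items carry headers and more types.
    #[derive(Debug, PartialEq)]
    enum ItemType {
        Event,
        Attachment,
        Metrics,
    }

    struct Item {
        ty: ItemType,
        payload: Vec<u8>,
    }

    /// Splits an envelope's items into (metric items, everything else).
    /// Metric items bypass the envelope queue and go straight to the
    /// processor pool; the rest is queued as usual.
    fn split_metrics(items: Vec<Item>) -> (Vec<Item>, Vec<Item>) {
        items.into_iter().partition(|item| item.ty == ItemType::Metrics)
    }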

Metrics aggregators are spawned on the projects thread, which runs the
project cache and manages all project state access. In the future,
metrics aggregation will have to be moved to a separate resource to
ensure that project state requests remain instant.
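
The ownership described here, as a sketch (type and field names are
hypothetical; the real actors live in relay-server):

    use std::collections::HashMap;

    struct Aggregator; // stand-in for relay_metrics::Aggregator
    struct ProjectState; // cached per-project state from the upstream

    struct Project {
        state: Option<ProjectState>,
        aggregator: Aggregator,
    }

    /// Runs on the projects thread: every project state lookup and every
    /// metrics merge currently goes through this one actor.
    struct ProjectCache {
        projects: HashMap<u64, Project>,
    }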
jjbayer authored Mar 30, 2021
1 parent fe21fcc commit e2f04d6
Showing 18 changed files with 898 additions and 221 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -10,7 +10,7 @@
**Internal**:

- Emit the `quantity` field for outcomes of events. This field describes the total size in bytes for attachments or the event count for all other categories. A separate outcome is emitted for attachments in a rejected envelope, if any, in addition to the event outcome. ([#942](https://github.com/getsentry/relay/pull/942))
- - Add experimental metrics ingestion with bucketing or pre-aggregation. ([#948](https://github.com/getsentry/relay/pull/948), [#952](https://github.com/getsentry/relay/pull/952), [#958](https://github.com/getsentry/relay/pull/958))
+ - Add experimental metrics ingestion with bucketing and pre-aggregation. ([#948](https://github.com/getsentry/relay/pull/948), [#952](https://github.com/getsentry/relay/pull/952), [#958](https://github.com/getsentry/relay/pull/958), [#966](https://github.com/getsentry/relay/pull/966))
- Change HTTP response for upstream timeouts from 502 to 504. ([#859](https://github.com/getsentry/relay/pull/859))
- Add rule id to outcomes coming from transaction sampling. ([#953](https://github.com/getsentry/relay/pull/953))
- Add support for breakdowns ingestion. ([#934](https://github.com/getsentry/relay/pull/934))
1 change: 1 addition & 0 deletions Cargo.lock

Generated lockfile; diff not rendered.

1 change: 1 addition & 0 deletions relay-config/Cargo.toml
@@ -20,6 +20,7 @@ num_cpus = "1.13.0"
relay-auth = { path = "../relay-auth" }
relay-common = { path = "../relay-common" }
relay-log = { path = "../relay-log", features = ["init"] }
relay-metrics = { path = "../relay-metrics" }
relay-redis = { path = "../relay-redis" }
serde = { version = "1.0.114", features = ["derive"] }
serde_json = "1.0.55"
8 changes: 8 additions & 0 deletions relay-config/src/config.rs
@@ -13,6 +13,7 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize};

use relay_auth::{generate_key_pair, generate_relay_id, PublicKey, RelayId, SecretKey};
use relay_common::Uuid;
use relay_metrics::AggregatorConfig;
use relay_redis::RedisConfig;

use crate::byte_size::ByteSize;
@@ -813,6 +814,8 @@ struct ConfigValues {
    processing: Processing,
    #[serde(default)]
    outcomes: Outcomes,
    #[serde(default)]
    aggregator: AggregatorConfig,
}

impl ConfigObject for ConfigValues {
@@ -1421,6 +1424,11 @@ impl Config {
    pub fn max_rate_limit(&self) -> Option<u64> {
        self.values.processing.max_rate_limit.map(u32::into)
    }

    /// Returns configuration for the metrics [aggregator](relay_metrics::Aggregator).
    pub fn aggregator_config(&self) -> AggregatorConfig {
        self.values.aggregator.clone()
    }
}

impl Default for Config {
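The `#[serde(default)]` attribute above means a config file without an
`aggregator` section falls back to `AggregatorConfig::default()`. A minimal
sketch of that pattern (the struct is reduced to one field; the 10-second
interval mirrors the tests in this diff, not necessarily the real default):

    use serde::Deserialize;

    fn default_bucket_interval() -> u64 {
        10
    }

    #[derive(Debug, Deserialize)]
    struct AggregatorConfig {
        #[serde(default = "default_bucket_interval")]
        bucket_interval: u64,
    }

    impl Default for AggregatorConfig {
        fn default() -> Self {
            Self {
                bucket_interval: default_bucket_interval(),
            }
        }
    }

    #[derive(Debug, Deserialize)]
    struct ConfigValues {
        #[serde(default)]
        aggregator: AggregatorConfig,
    }

    fn main() {
        // No `aggregator` key in the config at all: defaults apply.
        let values: ConfigValues = serde_json::from_str("{}").unwrap();
        assert_eq!(values.aggregator.bucket_interval, 10);
    }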
173 changes: 168 additions & 5 deletions relay-metrics/src/aggregation.rs
@@ -13,7 +13,7 @@ use relay_common::{MonotonicResult, UnixTimestamp};
use crate::{Metric, MetricType, MetricValue};

/// The [aggregated value](Bucket::value) of a metric bucket.
- #[derive(Debug, Clone, Serialize, Deserialize)]
+ #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", content = "value")]
pub enum BucketValue {
    /// Aggregates [`MetricValue::Counter`] values by adding them into a single value.
@@ -195,7 +195,7 @@ pub struct Bucket {
    /// A list of tags adding dimensions to the metric for filtering and aggregation.
    ///
    /// See [`Metric::tags`]. Every combination of tags results in a different bucket.
-    #[serde(skip_serializing_if = "BTreeMap::is_empty")]
+    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub tags: BTreeMap<String, String>,
}

@@ -721,7 +721,170 @@ mod tests {
    }

    #[test]
-    fn test_merge_counters() {
+    fn test_parse_buckets() {
        let json = r#"[
            {
                "name": "endpoint.response_time",
                "unit": "ms",
                "value": [36, 49, 57, 68],
                "type": "d",
                "timestamp": 1615889440,
                "tags": {
                    "route": "user_index"
                }
            }
        ]"#;

        // TODO: This should parse the unit.
        let buckets = Bucket::parse_all(json.as_bytes()).unwrap();
        insta::assert_debug_snapshot!(buckets, @r###"
        [
            Bucket {
                timestamp: UnixTimestamp(1615889440),
                name: "endpoint.response_time",
                value: Distribution(
                    [
                        36.0,
                        49.0,
                        57.0,
                        68.0,
                    ],
                ),
                tags: {
                    "route": "user_index",
                },
            },
        ]
        "###);
    }

    #[test]
    fn test_parse_bucket_defaults() {
        let json = r#"[
            {
                "name": "endpoint.hits",
                "value": 4,
                "type": "c",
                "timestamp": 1615889440
            }
        ]"#;

        let buckets = Bucket::parse_all(json.as_bytes()).unwrap();
        insta::assert_debug_snapshot!(buckets, @r###"
        [
            Bucket {
                timestamp: UnixTimestamp(1615889440),
                name: "endpoint.hits",
                value: Counter(
                    4.0,
                ),
                tags: {},
            },
        ]
        "###);
    }

    #[test]
    fn test_buckets_roundtrip() {
        let json = r#"[
  {
    "timestamp": 1615889440,
    "name": "endpoint.response_time",
    "type": "d",
    "value": [
      36.0,
      49.0,
      57.0,
      68.0
    ],
    "tags": {
      "route": "user_index"
    }
  },
  {
    "timestamp": 1615889440,
    "name": "endpoint.hits",
    "type": "c",
    "value": 4.0,
    "tags": {
      "route": "user_index"
    }
  }
]"#;

        let buckets = Bucket::parse_all(json.as_bytes()).unwrap();
        let serialized = serde_json::to_string_pretty(&buckets).unwrap();
        assert_eq!(json, serialized);
    }

    #[test]
    fn test_bucket_value_merge_counter() {
        let mut value = BucketValue::Counter(42.);
        BucketValue::Counter(43.).merge_into(&mut value).unwrap();
        assert_eq!(value, BucketValue::Counter(85.));
    }

    #[test]
    fn test_bucket_value_merge_distribution() {
        let mut value = BucketValue::Distribution(vec![1., 2., 3.]);
        BucketValue::Distribution(vec![2., 4.])
            .merge_into(&mut value)
            .unwrap();
        // TODO: This should be ordered
        assert_eq!(value, BucketValue::Distribution(vec![1., 2., 3., 2., 4.]));
    }

    #[test]
    fn test_bucket_value_merge_set() {
        let mut value = BucketValue::Set(vec![1, 2].into_iter().collect());
        BucketValue::Set(vec![2, 3].into_iter().collect())
            .merge_into(&mut value)
            .unwrap();
        assert_eq!(value, BucketValue::Set(vec![1, 2, 3].into_iter().collect()));
    }

    #[test]
    fn test_bucket_value_merge_gauge() {
        let mut value = BucketValue::Gauge(42.);
        BucketValue::Gauge(43.).merge_into(&mut value).unwrap();
        assert_eq!(value, BucketValue::Gauge(43.));
    }

    #[test]
    fn test_bucket_value_insert_counter() {
        let mut value = BucketValue::Counter(42.);
        MetricValue::Counter(43.).merge_into(&mut value).unwrap();
        assert_eq!(value, BucketValue::Counter(85.));
    }

    #[test]
    fn test_bucket_value_insert_distribution() {
        let mut value = BucketValue::Distribution(vec![1., 2., 3.]);
        MetricValue::Distribution(2.0)
            .merge_into(&mut value)
            .unwrap();
        // TODO: This should be ordered
        assert_eq!(value, BucketValue::Distribution(vec![1., 2., 3., 2.]));
    }

    #[test]
    fn test_bucket_value_insert_set() {
        let mut value = BucketValue::Set(vec![1, 2].into_iter().collect());
        MetricValue::Set(3).merge_into(&mut value).unwrap();
        assert_eq!(value, BucketValue::Set(vec![1, 2, 3].into_iter().collect()));
        MetricValue::Set(2).merge_into(&mut value).unwrap();
        assert_eq!(value, BucketValue::Set(vec![1, 2, 3].into_iter().collect()));
    }

    #[test]
    fn test_bucket_value_insert_gauge() {
        let mut value = BucketValue::Gauge(42.);
        MetricValue::Gauge(43.).merge_into(&mut value).unwrap();
        assert_eq!(value, BucketValue::Gauge(43.));
    }

    #[test]
    fn test_aggregator_merge_counters() {
        relay_test::setup();

        let config = AggregatorConfig::default();
@@ -750,7 +913,7 @@
    }

    #[test]
-    fn test_merge_similar_timestamps() {
+    fn test_aggregator_merge_timestamps() {
        relay_test::setup();
        let config = AggregatorConfig {
            bucket_interval: 10,
@@ -801,7 +964,7 @@
    }

    #[test]
-    fn test_mixup_types() {
+    fn test_aggregator_mixup_types() {
        relay_test::setup();
        let config = AggregatorConfig {
            bucket_interval: 10,
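The tests above pin down the merge semantics per metric type: counters add,
distributions append, sets union, and gauges keep the most recent value.
Restated as a compact sketch (a simplified stand-in for
`BucketValue::merge_into`, with set members reduced to `u32`):

    use std::collections::BTreeSet;

    #[derive(Debug, PartialEq)]
    enum SimpleBucketValue {
        Counter(f64),
        Distribution(Vec<f64>),
        Set(BTreeSet<u32>),
        Gauge(f64),
    }

    impl SimpleBucketValue {
        /// Merges `self` into `other` if both are the same metric type.
        fn merge_into(self, other: &mut SimpleBucketValue) -> Result<(), ()> {
            use SimpleBucketValue::*;
            match (self, other) {
                (Counter(a), Counter(b)) => *b += a, // counters add up
                (Distribution(a), Distribution(b)) => b.extend(a), // append
                (Set(a), Set(b)) => b.extend(a), // union
                (Gauge(a), Gauge(b)) => *b = a, // keep the latest value
                _ => return Err(()), // mismatched types do not merge
            }
            Ok(())
        }
    }

    fn main() {
        let mut value = SimpleBucketValue::Counter(42.);
        SimpleBucketValue::Counter(43.)
            .merge_into(&mut value)
            .unwrap();
        assert_eq!(value, SimpleBucketValue::Counter(85.));
    }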
33 changes: 33 additions & 0 deletions relay-metrics/src/lib.rs
@@ -49,6 +49,39 @@
//!   }
//! ]
//! ```
//!
//! # Ingestion
//!
//! Processing Relays write aggregate buckets into the ingestion Kafka stream. The schema is similar
//! to the aggregation payload, with the addition of scoping information:
//!
//! ```json
//! [
//!   {
//!     "org_id": 1,
//!     "project_id": 42,
//!     "name": "endpoint.response_time",
//!     "unit": "ms",
//!     "value": [36, 49, 57, 68],
//!     "type": "d",
//!     "timestamp": 1615889440,
//!     "tags": {
//!       "route": "user_index"
//!     }
//!   },
//!   {
//!     "org_id": 1,
//!     "project_id": 42,
//!     "name": "endpoint.hits",
//!     "value": 4,
//!     "type": "c",
//!     "timestamp": 1615889440,
//!     "tags": {
//!       "route": "user_index"
//!     }
//!   }
//! ]
//! ```
#![warn(missing_docs)]

mod aggregation;
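Modeled as a struct, the scoped payload could look like this (a sketch that
matches the JSON above; the actual type used by the Kafka producer is not
part of this diff):

    use std::collections::BTreeMap;

    use serde::Serialize;

    #[derive(Serialize)]
    struct ScopedBucket {
        org_id: u64,
        project_id: u64,
        name: String,
        #[serde(rename = "type")]
        ty: String, // metric type tag, e.g. "c" or "d" as above
        value: serde_json::Value, // scalar for counters, array for distributions
        timestamp: u64,
        #[serde(skip_serializing_if = "Option::is_none")]
        unit: Option<String>,
        #[serde(skip_serializing_if = "BTreeMap::is_empty")]
        tags: BTreeMap<String, String>,
    }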
6 changes: 6 additions & 0 deletions relay-server/build.rs
@@ -14,6 +14,12 @@ fn main() {
        env::var("CARGO_PKG_VERSION").unwrap()
    )
    .unwrap();
    writeln!(
        f,
        "pub const CLIENT: &str = \"sentry.relay/{}\";",
        env::var("CARGO_PKG_VERSION").unwrap()
    )
    .unwrap();
println!("cargo:rerun-if-changed=build.rs\n");
println!("cargo:rerun-if-changed=Cargo.toml\n");
}
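
The build script writes these constants into a file under OUT_DIR, which the
crate then pulls in at compile time. By convention that looks like the line
below (the file name is an assumption; this diff does not show where the
writer `f` points):

    // `constants.gen.rs` is a guessed name for the generated file.
    include!(concat!(env!("OUT_DIR"), "/constants.gen.rs"));

The new CLIENT constant, "sentry.relay/<version>", presumably identifies Relay
as the client when it submits envelopes upstream, alongside the existing
constant written just above it.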
(11 more changed files not shown)
