Skip to content

Commit

Permalink
ref(metrics): Improve distribution value performance (#2483)
Browse files Browse the repository at this point in the history
Represents distribution metrics as plain unsorted vectors instead of
deduplicated btree maps. This offers greatly improved performance
characteristics during insertion, but comes at the expense of
potentially more memory usage if there are a lot of duplicate values.

In benchmarks, this change shows an approximately 500x speedup for
metrics with 10k to 1M data points per bucket. In practice, large
buckets are much more prevalent than duplicate values.

The new implementation uses `SmallVec`. Distribution values are always
stored within a `BucketValue` enum that also contains gauges. Since
`GaugeValue` is a larger struct, the enum's size leaves room to inline
three float values before allocating a heap vector for distributions.
Usual production traffic indicates that this covers the majority of
distribution buckets, so this is a worthwhile optimization.
  • Loading branch information
jan-auer authored Sep 6, 2023
1 parent 56a0ba7 commit 4841abe
Show file tree
Hide file tree
Showing 15 changed files with 305 additions and 683 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
- Support ingestion of custom metrics when the `organizations:custom-metrics` feature flag is enabled. ([#2443](https://github.com/getsentry/relay/pull/2443))
- Merge span metrics and standalone spans extraction options. ([#2447](https://github.com/getsentry/relay/pull/2447))
- Support parsing aggregated metric buckets directly from statsd payloads. ([#2468](https://github.com/getsentry/relay/pull/2468), [#2472](https://github.com/getsentry/relay/pull/2472))
- Improve performance when ingesting distribution metrics with a large number of data points. ([#2483](https://github.com/getsentry/relay/pull/2483))
- Rename the envelope item type for StatsD payloads to "statsd". ([#2470](https://github.com/getsentry/relay/pull/2470))
- Add a nanojoule unit for profile measurements. ([#2478](https://github.com/getsentry/relay/pull/2478))

Expand Down
9 changes: 2 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ futures = { version = "0.3", default-features = false, features = ["std"] }
insta = { version = "1.31.0", features = ["json", "redactions", "ron"] }
itertools = "0.10.5"
once_cell = "1.13.1"
rand = "0.8.5"
regex = "1.9.1"
serde = { version = "1.0.159", features = ["derive"] }
serde_json = "1.0.93"
Expand Down
2 changes: 1 addition & 1 deletion relay-auth/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ chrono = { workspace = true }
data-encoding = "2.3.3"
ed25519-dalek = { version = "2.0.0", features = ["rand_core"] }
hmac = "0.12.1"
rand = "0.8.5"
rand = { workspace = true }
relay-common = { path = "../relay-common" }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
5 changes: 3 additions & 2 deletions relay-metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ publish = false

[dependencies]
bytecount = "0.6.0"
float-ord = "0.3.1"
fnv = "1.0.7"
hash32 = "0.3.1"
itertools = { workspace = true }
Expand All @@ -22,17 +21,19 @@ relay-statsd = { path = "../relay-statsd" }
relay-system = { path = "../relay-system" }
serde = { workspace = true }
serde_json = { workspace = true }
smallvec = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["time"] }

[dev-dependencies]
criterion = { workspace = true }
insta = { workspace = true }
rand = { workspace = true }
relay-statsd = { path = "../relay-statsd", features = ["test"] }
relay-test = { path = "../relay-test" }
similar-asserts = { workspace = true }
tokio = { workspace = true, features = ["test-util"] }

[[bench]]
name = "aggregator"
name = "benches"
harness = false
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::collections::BTreeMap;
use std::fmt;

use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
use criterion::{black_box, criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
use relay_base_schema::project::ProjectKey;
use relay_common::time::UnixTimestamp;
use relay_metrics::{AggregatorConfig, AggregatorService, Metric, MetricValue};
use relay_metrics::{AggregatorConfig, AggregatorService, DistributionValue, Metric, MetricValue};

/// Struct representing a testcase for which insert + flush are timed.
struct MetricInput {
Expand Down Expand Up @@ -159,5 +159,22 @@ fn bench_insert_and_flush(c: &mut Criterion) {
}
}

criterion_group!(benches, bench_insert_and_flush);
fn bench_distribution(c: &mut Criterion) {
let mut group = c.benchmark_group("DistributionValue");

for size in [1, 10, 100, 1000, 10_000, 100_000, 1_000_000] {
let values = std::iter::from_fn(|| Some(rand::random()))
.take(size as usize)
.collect::<Vec<f64>>();

group.throughput(criterion::Throughput::Elements(size));
group.bench_with_input(BenchmarkId::from_parameter(size), &values, |b, values| {
b.iter(|| DistributionValue::from_iter(black_box(values.iter().copied())))
});
}

group.finish();
}

criterion_group!(benches, bench_insert_and_flush, bench_distribution);
criterion_main!(benches);
42 changes: 19 additions & 23 deletions relay-metrics/src/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use crate::statsd::{
MetricTimers,
};
use crate::{
protocol, Bucket, BucketValue, Metric, MetricNamespace, MetricResourceIdentifier, MetricValue,
protocol, Bucket, BucketValue, DistributionValue, Metric, MetricNamespace,
MetricResourceIdentifier, MetricValue,
};

/// Interval for the flush cycle of the [`AggregatorService`].
Expand All @@ -47,12 +48,12 @@ trait MergeValue: Into<BucketValue> {
}

impl MergeValue for BucketValue {
fn merge_into(self, bucket_value: &mut BucketValue) -> Result<(), AggregateMetricsError> {
fn merge_into(self, bucket_value: &mut Self) -> Result<(), AggregateMetricsError> {
match (bucket_value, self) {
(BucketValue::Counter(lhs), BucketValue::Counter(rhs)) => *lhs += rhs,
(BucketValue::Distribution(lhs), BucketValue::Distribution(rhs)) => lhs.extend(&rhs),
(BucketValue::Set(lhs), BucketValue::Set(rhs)) => lhs.extend(rhs),
(BucketValue::Gauge(lhs), BucketValue::Gauge(rhs)) => lhs.merge(rhs),
(Self::Counter(lhs), Self::Counter(rhs)) => *lhs += rhs,
(Self::Distribution(lhs), Self::Distribution(rhs)) => lhs.extend_from_slice(&rhs),
(Self::Set(lhs), Self::Set(rhs)) => lhs.extend(rhs),
(Self::Gauge(lhs), Self::Gauge(rhs)) => lhs.merge(rhs),
_ => return Err(AggregateMetricsErrorKind::InvalidTypes.into()),
}

Expand All @@ -67,7 +68,7 @@ impl MergeValue for MetricValue {
*counter += value;
}
(BucketValue::Distribution(distribution), MetricValue::Distribution(value)) => {
distribution.insert(value);
distribution.push(value);
}
(BucketValue::Set(set), MetricValue::Set(value)) => {
set.insert(value);
Expand Down Expand Up @@ -125,12 +126,14 @@ fn split_at(mut bucket: Bucket, size: usize) -> (Option<Bucket>, Option<Bucket>)
match bucket.value {
BucketValue::Counter(_) => (None, Some(bucket)),
BucketValue::Distribution(ref mut distribution) => {
let org = std::mem::take(distribution);
let mut org = std::mem::take(distribution);

let mut new_bucket = bucket.clone();
new_bucket.value =
BucketValue::Distribution(DistributionValue::from_slice(&org[split_at..]));

let mut iter = org.iter_values();
bucket.value = BucketValue::Distribution((&mut iter).take(split_at).collect());
new_bucket.value = BucketValue::Distribution(iter.collect());
org.truncate(split_at);
bucket.value = BucketValue::Distribution(org);

(Some(bucket), Some(new_bucket))
}
Expand Down Expand Up @@ -1509,7 +1512,7 @@ mod tests {
BucketValue::Distribution(dist![2., 4.])
.merge_into(&mut value)
.unwrap();
assert_eq!(value, BucketValue::Distribution(dist![1., 2., 2., 3., 4.]));
assert_eq!(value, BucketValue::Distribution(dist![1., 2., 3., 2., 4.]));
}

#[test]
Expand Down Expand Up @@ -1596,10 +1599,7 @@ mod tests {
expected_bucket_value_size + 5 * expected_set_entry_size
);
let distribution = BucketValue::Distribution(dist![1., 2., 3.]);
assert_eq!(
distribution.cost(),
expected_bucket_value_size + 3 * (8 + 4)
);
assert_eq!(distribution.cost(), expected_bucket_value_size + 3 * 8);
let gauge = BucketValue::Gauge(GaugeValue {
last: 43.,
min: 42.,
Expand Down Expand Up @@ -1852,14 +1852,10 @@ mod tests {
(
"d:transactions/foo@none",
MetricValue::Distribution(1.0),
fixed_cost + 12,
fixed_cost + 8,
), // New bucket + 1 element
("d:transactions/foo@none", MetricValue::Distribution(1.0), 0), // no new element
(
"d:transactions/foo@none",
MetricValue::Distribution(2.0),
12,
), // 1 new element
("d:transactions/foo@none", MetricValue::Distribution(1.0), 8), // duplicate element
("d:transactions/foo@none", MetricValue::Distribution(2.0), 8), // 1 new element
(
"g:transactions/foo@none",
MetricValue::Gauge(0.3),
Expand Down
Loading

0 comments on commit 4841abe

Please sign in to comment.