Skip to content

Commit

Permalink
feat(metric-meta): Add support for metric metadata (#2751)
Browse files Browse the repository at this point in the history
Implements `metric_meta` envelope item and storage in Redis.

Epic: getsentry/sentry#60260
  • Loading branch information
Dav1dde authored Nov 27, 2023
1 parent 5e74635 commit c44dde2
Show file tree
Hide file tree
Showing 24 changed files with 875 additions and 57 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

- `normalize_performance_score` now handles `PerformanceScoreProfile` configs with zero weight components and component weight sums of any number greater than 0. ([#2756](https://github.com/getsentry/relay/pull/2756))

**Internal**:

- Add support for metric metadata. ([#2751](https://github.com/getsentry/relay/pull/2751))

## 23.11.1

**Features**:
Expand Down
12 changes: 8 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ debug = true

[workspace.dependencies]
anyhow = "1.0.66"
chrono = { version = "0.4.29", default-features = false, features = [
chrono = { version = "0.4.31", default-features = false, features = [
"std",
"serde",
] }
clap = { version = "4.4.6" }
criterion = "0.5"
futures = { version = "0.3", default-features = false, features = ["std"] }
insta = { version = "1.31.0", features = ["json", "redactions", "ron"] }
hash32 = "0.3.1"
hashbrown = "0.13.2"
itertools = "0.10.5"
once_cell = "1.13.1"
rand = "0.8.5"
Expand All @@ -32,7 +34,7 @@ serde_json = "1.0.93"
serde_yaml = "0.9.17"
schemars = { version = "=0.8.10", features = ["uuid1", "chrono"] }
similar-asserts = "1.4.2"
smallvec = { version = "1.10.0", features = ["serde"] }
smallvec = { version = "1.11.2", features = ["serde"] }
thiserror = "1.0.38"
tokio = { version = "1.28.0", features = ["macros", "sync", "tracing"] }
url = "2.1.1"
Expand Down
20 changes: 20 additions & 0 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,14 @@ struct Metrics {
/// For example, a value of `0.3` means that only 30% of the emitted metrics will be sent.
/// Defaults to `1.0` (100%).
sample_rate: f32,
/// Code locations expiry in seconds.
///
/// Defaults to 15 days.
meta_locations_expiry: u64,
/// Maximum amount of code locations to store per metric.
///
/// Defaults to 5.
meta_locations_max: usize,
}

impl Default for Metrics {
Expand All @@ -513,6 +521,8 @@ impl Default for Metrics {
hostname_tag: None,
buffering: true,
sample_rate: 1.0,
meta_locations_expiry: 15 * 24 * 60 * 60,
meta_locations_max: 5,
}
}
}
Expand Down Expand Up @@ -1758,6 +1768,16 @@ impl Config {
self.values.metrics.sample_rate
}

/// Returns the maximum amount of code locations per metric.
pub fn metrics_meta_locations_max(&self) -> usize {
self.values.metrics.meta_locations_max
}

/// Returns the expiry for code locations.
pub fn metrics_meta_locations_expiry(&self) -> Duration {
Duration::from_secs(self.values.metrics.meta_locations_expiry)
}

/// Returns the default timeout for all upstream HTTP requests.
pub fn http_timeout(&self) -> Duration {
Duration::from_secs(self.values.http.timeout.into())
Expand Down
3 changes: 3 additions & 0 deletions relay-dynamic-config/src/feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ pub enum Feature {
/// Enable processing profiles
#[serde(rename = "organizations:profiling")]
Profiling,
/// Enable metric metadata.
#[serde(rename = "organizations:metric-meta")]
MetricMeta,

/// Deprecated, still forwarded for older downstream Relays.
#[serde(rename = "organizations:transaction-name-mark-scrubbed-as-sanitized")]
Expand Down
8 changes: 7 additions & 1 deletion relay-metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,20 @@ edition = "2021"
license-file = "../LICENSE.md"
publish = false

[features]
redis = ["relay-redis/impl"]

[dependencies]
bytecount = "0.6.0"
chrono = { workspace = true }
fnv = "1.0.7"
hash32 = "0.3.1"
hash32 = { workspace = true }
hashbrown = { workspace = true }
itertools = { workspace = true }
relay-base-schema = { path = "../relay-base-schema" }
relay-common = { path = "../relay-common" }
relay-log = { path = "../relay-log" }
relay-redis = { path = "../relay-redis", optional = true }
relay-statsd = { path = "../relay-statsd" }
relay-system = { path = "../relay-system" }
serde = { workspace = true }
Expand Down
25 changes: 1 addition & 24 deletions relay-metrics/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,29 +399,6 @@ fn parse_gauge(string: &str) -> Option<GaugeValue> {
})
}

/// Parses an MRI from a string and a separate type.
///
/// The given string must be a part of the MRI, including the following components:
/// - (optional) The namespace. If missing, it is defaulted to `"custom"`
/// - (required) The metric name.
/// - (optional) The unit. If missing, it is defaulted to "none".
///
/// The metric type is never part of this string and must be supplied separately.
fn parse_mri(string: &str, ty: MetricType) -> Option<MetricResourceIdentifier<'_>> {
let (name_and_namespace, unit) = protocol::parse_name_unit(string)?;

let (raw_namespace, name) = name_and_namespace
.split_once('/')
.unwrap_or(("custom", name_and_namespace));

Some(MetricResourceIdentifier {
ty,
name: name.into(),
namespace: raw_namespace.parse().ok()?,
unit,
})
}

/// Parses tags in the format `tag1,tag2:value`.
///
/// Tag values are optional. For tags with missing values, an empty `""` value is assumed.
Expand Down Expand Up @@ -640,7 +617,7 @@ impl Bucket {
let (mri_str, values_str) = components.next()?.split_once(':')?;
let ty = components.next().and_then(|s| s.parse().ok())?;

let mri = parse_mri(mri_str, ty)?;
let mri = MetricResourceIdentifier::parse_with_type(mri_str, ty).ok()?;
let value = match ty {
MetricType::Counter => BucketValue::Counter(parse_counter(values_str)?),
MetricType::Distribution => BucketValue::Distribution(parse_distribution(values_str)?),
Expand Down
4 changes: 4 additions & 0 deletions relay-metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
)]

pub mod aggregator;
pub mod meta;

mod aggregatorservice;
mod bucket;
Expand All @@ -78,5 +79,8 @@ mod statsd;

pub use aggregatorservice::*;
pub use bucket::*;
#[cfg(feature = "redis")]
pub use meta::RedisMetricMetaStore;
pub use meta::{MetaAggregator, MetricMeta};
pub use protocol::*;
pub use router::*;
148 changes: 148 additions & 0 deletions relay-metrics/src/meta/aggregator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use std::{
collections::{HashMap, HashSet},
hash::Hash,
};

use relay_base_schema::project::ProjectKey;

use super::{Item, Location, MetricMeta, StartOfDayUnixTimestamp};
use crate::{statsd::MetricCounters, MetricResourceIdentifier};

/// A metrics meta aggregator.
///
/// Aggregates metric metadata based on their scope (project, mri, timestamp) and
/// only keeps the most relevant entries.
///
/// Currently we track the first N amount of unique metric meta elements we get.
///
/// This should represent the actual adoption rate of different code versions.
///
/// This aggregator is purely in memory and will lose its state on restart,
/// which may cause multiple different items being emitted after restarts.
/// For this we have de-duplication in the storage and the volume overall
/// of this happening is small enough to just add it to the storage worst case.
#[derive(Debug)]
pub struct MetaAggregator {
    /// All tracked code locations.
    ///
    /// Keyed by scope (project, MRI, day); the value holds the unique set of
    /// code locations already observed for that scope.
    locations: hashbrown::HashMap<Scope, HashSet<Location>>,

    /// Maximum tracked locations.
    // NOTE(review): enforced per scope in `add_scoped`, not globally.
    max_locations: usize,
}

impl MetaAggregator {
    /// Creates a new metrics meta aggregator.
    ///
    /// `max_locations` caps the number of unique code locations tracked per
    /// scope (project, MRI, day).
    pub fn new(max_locations: usize) -> Self {
        Self {
            locations: hashbrown::HashMap::new(),
            max_locations,
        }
    }

    /// Adds a new meta item to the aggregator.
    ///
    /// Returns a new [`MetricMeta`] element when the element should be stored
    /// or sent upstream for storage.
    ///
    /// Returns `None` when the meta item was already seen or is not considered relevant.
    pub fn add(&mut self, project_key: ProjectKey, meta: MetricMeta) -> Option<MetricMeta> {
        let mut send_upstream = HashMap::new();

        for (mri, items) in meta.mapping {
            let scope = Scope {
                timestamp: meta.timestamp,
                project_key,
                mri,
            };

            // Only previously unseen items are returned and forwarded.
            if let Some(items) = self.add_scoped(&scope, items) {
                send_upstream.insert(scope.mri, items);
            }
        }

        if send_upstream.is_empty() {
            return None;
        }

        relay_statsd::metric!(counter(MetricCounters::MetaAggregatorUpdate) += 1);
        Some(MetricMeta {
            timestamp: meta.timestamp,
            mapping: send_upstream,
        })
    }

    /// Retrieves all currently relevant metric meta for a project.
    ///
    /// Groups all tracked scopes of the project by day; each group becomes one
    /// [`MetricMeta`] covering every MRI observed on that day.
    pub fn get_all_relevant(&self, project_key: ProjectKey) -> impl Iterator<Item = MetricMeta> {
        let locations = self
            .locations
            .iter()
            .filter(|(scope, _)| scope.project_key == project_key);

        let mut result = HashMap::new();

        for (scope, locations) in locations {
            result
                .entry(scope.timestamp)
                .or_insert_with(|| MetricMeta {
                    timestamp: scope.timestamp,
                    mapping: HashMap::new(),
                })
                .mapping
                .entry(scope.mri.clone()) // This clone sucks
                .or_default()
                .extend(locations.iter().cloned().map(Item::Location));
        }

        result.into_values()
    }

    /// Remove all contained state related to a project.
    pub fn clear(&mut self, project_key: ProjectKey) {
        self.locations
            .retain(|scope, _| scope.project_key != project_key);
    }

    /// Merges `items` into the set tracked for `scope`.
    ///
    /// Returns the subset of items that were not seen before, or `None` if
    /// everything was already known or the per-scope cap is reached.
    fn add_scoped(&mut self, scope: &Scope, items: Vec<Item>) -> Option<Vec<Item>> {
        // Entry ref needs hashbrown, we would have to clone the scope without or do a separate lookup.
        let locations = self.locations.entry_ref(scope).or_default();
        let mut send_upstream = Vec::new();

        for item in items {
            match item {
                Item::Location(location) => {
                    // `>=` caps the set at exactly `max_locations`; the
                    // previous `>` check admitted one extra location.
                    if locations.len() >= self.max_locations {
                        break;
                    }

                    // `HashSet::insert` returns `true` only for values not
                    // yet present, replacing a `contains` + `insert` double
                    // lookup.
                    if locations.insert(location.clone()) {
                        send_upstream.push(Item::Location(location));
                    }
                }
                Item::Unknown => {}
            }
        }

        (!send_upstream.is_empty()).then_some(send_upstream)
    }
}

/// The metadata scope.
///
/// We scope metadata by project, mri and day,
/// represented as a unix timestamp at the beginning of the day.
///
/// The technical scope (e.g. redis key) also includes the organization id, but this
/// can be inferred from the project.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Scope {
    /// Start of the day (UTC midnight unix timestamp) the metadata was observed.
    pub timestamp: StartOfDayUnixTimestamp,
    /// The project the metadata belongs to.
    pub project_key: ProjectKey,
    /// The metric (MRI) the metadata belongs to.
    pub mri: MetricResourceIdentifier<'static>,
}

/// Builds an owned key from a borrowed [`Scope`], as required by
/// `hashbrown`'s `entry_ref` when the key is inserted for the first time.
impl From<&Scope> for Scope {
    fn from(value: &Scope) -> Self {
        // Field-wise copy; equivalent to the derived `Clone` implementation.
        Self {
            timestamp: value.timestamp.clone(),
            project_key: value.project_key.clone(),
            mri: value.mri.clone(),
        }
    }
}
11 changes: 11 additions & 0 deletions relay-metrics/src/meta/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
//! Functionality for aggregating and storing of metrics metadata.
mod aggregator;
mod protocol;
// Redis-backed storage; only compiled with the crate's `redis` feature.
#[cfg(feature = "redis")]
mod redis;

pub use self::aggregator::*;
pub use self::protocol::*;
#[cfg(feature = "redis")]
pub use self::redis::*;
Loading

0 comments on commit c44dde2

Please sign in to comment.