diff --git a/relay-metrics/src/aggregation.rs b/relay-metrics/src/aggregation.rs index 3daeb6405e6..1e23a7aa80c 100644 --- a/relay-metrics/src/aggregation.rs +++ b/relay-metrics/src/aggregation.rs @@ -23,7 +23,7 @@ use relay_system::{ use crate::statsd::{MetricCounters, MetricGauges, MetricHistograms, MetricSets, MetricTimers}; use crate::{ protocol, CounterType, DistributionType, GaugeType, Metric, MetricNamespace, - MetricResourceIdentifier, MetricType, MetricValue, SetType, + MetricResourceIdentifier, MetricType, MetricValue, MetricsContainer, SetType, }; /// Interval for the flush cycle of the [`AggregatorService`]. @@ -843,6 +843,16 @@ impl Bucket { } } +impl MetricsContainer for Bucket { + fn name(&self) -> &str { + self.name.as_str() + } + + fn len(&self) -> usize { + self.value.len() + } +} + /// Any error that may occur during aggregation. #[derive(Debug, Fail, PartialEq)] #[fail(display = "failed to aggregate metrics: {}", kind)] diff --git a/relay-metrics/src/protocol.rs b/relay-metrics/src/protocol.rs index 728766d17b1..ee519975a51 100644 --- a/relay-metrics/src/protocol.rs +++ b/relay-metrics/src/protocol.rs @@ -586,6 +586,31 @@ impl Metric { } } +/// Common interface for `Metric` and `Bucket`. +pub trait MetricsContainer { + /// Returns the full metric name (MRI) of this container. + fn name(&self) -> &str; + + /// Returns the number of raw data points in this container. + /// See [`crate::aggregation::BucketValue::len()`]. + fn len(&self) -> usize; + + /// Returns `true` if this container contains no values. + fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +impl MetricsContainer for Metric { + fn name(&self) -> &str { + self.name.as_str() + } + + fn len(&self) -> usize { + 1 + } +} + /// Iterator over parsed metrics returned from [`Metric::parse_all`]. 
#[derive(Clone, Debug)] pub struct ParseMetrics<'a> { diff --git a/relay-server/src/actors/processor.rs b/relay-server/src/actors/processor.rs index a8ba0ffff91..6d5f48e87cd 100644 --- a/relay-server/src/actors/processor.rs +++ b/relay-server/src/actors/processor.rs @@ -57,7 +57,7 @@ use { crate::actors::envelopes::SendMetrics, crate::actors::project_cache::UpdateRateLimits, crate::service::ServerErrorKind, - crate::utils::{BucketLimiter, EnvelopeLimiter}, + crate::utils::{EnvelopeLimiter, MetricsLimiter}, failure::ResultExt, relay_general::store::{GeoIpLookup, StoreConfig, StoreProcessor}, relay_quotas::ItemScoping, @@ -522,7 +522,7 @@ impl EncodeEnvelope { #[cfg(feature = "processing")] #[derive(Debug)] pub struct RateLimitFlushBuckets { - pub bucket_limiter: BucketLimiter, + pub bucket_limiter: MetricsLimiter<Bucket>, pub partition_key: Option<u64>, } @@ -1098,7 +1098,8 @@ impl EnvelopeProcessorService { item.set_payload(ContentType::Json, &replay[..]); true } - Err(_) => { + Err(error) => { + relay_log::warn!("failed to parse replay event: {}", LogError(&error)); context.track_outcome( Outcome::Invalid(DiscardReason::InvalidReplayEvent), DataCategory::Replay, @@ -2224,7 +2225,7 @@ impl EnvelopeProcessorService { } } - let buckets = bucket_limiter.into_buckets(); + let buckets = bucket_limiter.into_metrics(); if !buckets.is_empty() { // Forward buckets to envelope manager to send them to upstream or kafka: EnvelopeManager::from_registry().send(SendMetrics { diff --git a/relay-server/src/actors/project.rs b/relay-server/src/actors/project.rs index f5049c6242e..10b61eb20f1 100644 --- a/relay-server/src/actors/project.rs +++ b/relay-server/src/actors/project.rs @@ -17,7 +17,7 @@ use relay_filter::{matches_any_origin, FiltersConfig}; use relay_general::pii::{DataScrubbingConfig, PiiConfig}; use relay_general::store::{BreakdownsConfig, MeasurementsConfig}; use relay_general::types::SpanAttribute; -use relay_metrics::{Bucket, InsertMetrics, MergeBuckets, Metric}; +use 
relay_metrics::{Bucket, InsertMetrics, MergeBuckets, Metric, MetricsContainer}; use relay_quotas::{Quota, RateLimits, Scoping}; use relay_sampling::SamplingConfig; use relay_statsd::metric; @@ -34,7 +34,9 @@ use crate::metrics_extraction::transactions::TransactionMetricsConfig; use crate::metrics_extraction::TaggingRule; use crate::service::Registry; use crate::statsd::RelayCounters; -use crate::utils::{self, EnvelopeContext, EnvelopeLimiter, ErrorBoundary, Response}; +use crate::utils::{ + self, EnvelopeContext, EnvelopeLimiter, ErrorBoundary, MetricsLimiter, Response, +}; /// The expiry status of a project state. Return value of [`ProjectState::check_expiry`]. #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] @@ -602,13 +604,33 @@ impl Project { self.last_updated_at = Instant::now(); } + /// Applies cached rate limits to the given metrics or metrics buckets. + /// + /// This only applies the rate limits currently stored on the project. + fn rate_limit_metrics<T: MetricsContainer>(&self, metrics: Vec<T>) -> Vec<T> { + match (&self.state, self.scoping()) { + (Some(state), Some(scoping)) => { + match MetricsLimiter::create(metrics, &state.config.quotas, scoping) { + Ok(mut limiter) => { + limiter.enforce_limits(Ok(&self.rate_limits)); + limiter.into_metrics() + } + Err(metrics) => metrics, + } + } + _ => metrics, + } + } + /// Inserts given [buckets](Bucket) into the metrics aggregator. /// /// The buckets will be keyed underneath this project key. pub fn merge_buckets(&mut self, buckets: Vec<Bucket>) { - // TODO: rate limits if self.metrics_allowed() { - Registry::aggregator().send(MergeBuckets::new(self.project_key, buckets)); + let buckets = self.rate_limit_metrics(buckets); + if !buckets.is_empty() { + Registry::aggregator().send(MergeBuckets::new(self.project_key, buckets)); + } } } @@ -616,9 +638,11 @@ impl Project { /// /// The metrics will be keyed underneath this project key. 
pub fn insert_metrics(&mut self, metrics: Vec<Metric>) { - // TODO: rate limits if self.metrics_allowed() { - Registry::aggregator().send(InsertMetrics::new(self.project_key, metrics)); + let metrics = self.rate_limit_metrics(metrics); + if !metrics.is_empty() { + Registry::aggregator().send(InsertMetrics::new(self.project_key, metrics)); + } } } @@ -894,7 +918,9 @@ impl Drop for Project { mod tests { use std::sync::Arc; - use relay_common::{ProjectId, ProjectKey}; + use relay_common::{ProjectId, ProjectKey, UnixTimestamp}; + use relay_metrics::{Bucket, BucketValue, Metric, MetricValue}; + use serde_json::json; use super::{Config, Project, ProjectState, StateChannel}; @@ -902,7 +928,7 @@ mod tests { fn get_state_expired() { for expiry in [9999, 0] { let config = Arc::new( - Config::from_json_value(serde_json::json!( + Config::from_json_value(json!( { "cache": { "project_expiry": expiry, @@ -937,7 +963,7 @@ mod tests { #[test] fn test_stale_cache() { let config = Arc::new( - Config::from_json_value(serde_json::json!( + Config::from_json_value(json!( { "cache": { "project_expiry": 100, @@ -967,4 +993,85 @@ mod tests { // still must be the project id set. 
assert!(!project.state.as_ref().unwrap().invalid()); } + + fn create_project(config: Option<serde_json::Value>) -> Project { + let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); + let mut project = Project::new(project_key, Arc::new(Config::default())); + let mut project_state = ProjectState::allowed(); + project_state.project_id = Some(ProjectId::new(42)); + if let Some(config) = config { + project_state.config = serde_json::from_value(config).unwrap(); + } + project.state = Some(Arc::new(project_state)); + project + } + + fn create_transaction_metric() -> Metric { + Metric { + name: "d:transactions/foo".to_string(), + value: MetricValue::Counter(1.0), + timestamp: UnixTimestamp::now(), + tags: Default::default(), + } + } + + #[test] + fn test_rate_limit_incoming_metrics() { + let project = create_project(None); + let metrics = project.rate_limit_metrics(vec![create_transaction_metric()]); + + assert!(metrics.len() == 1); + } + + #[test] + fn test_rate_limit_incoming_metrics_no_quota() { + let project = create_project(Some(json!({ + "quotas": [{ + "id": "foo", + "categories": ["transaction"], + "window": 3600, + "limit": 0, + "reasonCode": "foo", + }] + }))); + + let metrics = project.rate_limit_metrics(vec![create_transaction_metric()]); + + assert!(metrics.is_empty()); + } + + fn create_transaction_bucket() -> Bucket { + Bucket { + name: "d:transactions/foo".to_string(), + value: BucketValue::Counter(1.0), + timestamp: UnixTimestamp::now(), + tags: Default::default(), + width: 10, + } + } + + #[test] + fn test_rate_limit_incoming_buckets() { + let project = create_project(None); + let metrics = project.rate_limit_metrics(vec![create_transaction_bucket()]); + + assert!(metrics.len() == 1); + } + + #[test] + fn test_rate_limit_incoming_buckets_no_quota() { + let project = create_project(Some(json!({ + "quotas": [{ + "id": "foo", + "categories": ["transaction"], + "window": 3600, + "limit": 0, + "reasonCode": "foo", + }] + }))); + + let metrics = 
project.rate_limit_metrics(vec![create_transaction_bucket()]); + + assert!(metrics.is_empty()); + } } diff --git a/relay-server/src/actors/project_cache.rs b/relay-server/src/actors/project_cache.rs index 34d7fcdc7a1..86ea928ddcf 100644 --- a/relay-server/src/actors/project_cache.rs +++ b/relay-server/src/actors/project_cache.rs @@ -24,7 +24,7 @@ use crate::actors::project_upstream::UpstreamProjectSource; use crate::envelope::Envelope; use crate::service::Registry; use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers}; -use crate::utils::{self, BucketLimiter, EnvelopeContext, GarbageDisposal, Response}; +use crate::utils::{self, EnvelopeContext, GarbageDisposal, MetricsLimiter, Response}; #[cfg(feature = "processing")] use { @@ -629,7 +629,7 @@ impl Handler for ProjectCache { // Check rate limits if necessary: let quotas = project_state.config.quotas.clone(); - let buckets = match BucketLimiter::create(buckets, quotas, scoping) { + let buckets = match MetricsLimiter::create(buckets, quotas, scoping) { Ok(mut bucket_limiter) => { let cached_rate_limits = project.rate_limits().clone(); #[allow(unused_variables)] @@ -646,7 +646,7 @@ impl Handler for ProjectCache { return; } - bucket_limiter.into_buckets() + bucket_limiter.into_metrics() } Err(buckets) => buckets, }; diff --git a/relay-server/src/utils/metrics_rate_limits.rs b/relay-server/src/utils/metrics_rate_limits.rs index a8144aa870b..b57355f65f1 100644 --- a/relay-server/src/utils/metrics_rate_limits.rs +++ b/relay-server/src/utils/metrics_rate_limits.rs @@ -1,46 +1,42 @@ -//! Quota and rate limiting helpers for metrics buckets. +//! Quota and rate limiting helpers for metrics and metrics buckets. 
use relay_common::{DataCategory, UnixTimestamp}; -use relay_metrics::{Bucket, MetricNamespace, MetricResourceIdentifier}; +use relay_metrics::{MetricNamespace, MetricResourceIdentifier, MetricsContainer}; use relay_quotas::{ItemScoping, Quota, RateLimits, Scoping}; use crate::actors::outcome::{DiscardReason, Outcome, TrackOutcome}; -/// Holds metrics buckets with some information about their contents. +/// Contains all data necessary to rate limit metrics or metrics buckets. #[derive(Debug)] -pub struct BucketLimiter { - /// A list of metrics buckets. - buckets: Vec<Bucket>, +pub struct MetricsLimiter<M: MetricsContainer, Q: AsRef<[Quota]> = Vec<Quota>> { + /// A list of metrics or buckets. + metrics: Vec<M>, /// The quotas set on the current project. - quotas: Vec<Quota>, + quotas: Q, /// Project information. scoping: Scoping, - /// Binary index of buckets in the transaction namespace (used to retain). + /// Binary index of metrics/buckets in the transaction namespace (used to retain). transaction_buckets: Vec<bool>, - /// The number of transactions contributing to these buckets. + /// The number of transactions contributing to these metrics. transaction_count: usize, } -impl BucketLimiter { +impl<M: MetricsContainer, Q: AsRef<[Quota]>> MetricsLimiter<M, Q> { /// Create a new limiter instance. /// - /// Returns Ok if `buckets` contain transaction metrics, `buckets` otherwise. - pub fn create( - buckets: Vec<Bucket>, - quotas: Vec<Quota>, - scoping: Scoping, - ) -> Result<Self, Vec<Bucket>> { + /// Returns Ok if `metrics` contain transaction metrics, `metrics` otherwise. 
+ pub fn create(buckets: Vec<M>, quotas: Q, scoping: Scoping) -> Result<Self, Vec<M>> { let transaction_counts: Vec<_> = buckets .iter() - .map(|bucket| { - let mri = match MetricResourceIdentifier::parse(bucket.name.as_str()) { + .map(|metric| { + let mri = match MetricResourceIdentifier::parse(metric.name()) { Ok(mri) => mri, Err(_) => { - relay_log::error!("Invalid MRI: {}", bucket.name); + relay_log::error!("Invalid MRI: {}", metric.name()); return None; } }; @@ -51,9 +47,9 @@ impl BucketLimiter { } if mri.name == "duration" { - // The "duration" metric is extracted exactly once for every transaction, so we - // can use it to count the number of transactions. - let count = bucket.value.len(); + // The "duration" metric is extracted exactly once for every processed + // transaction, so we can use it to count the number of transactions. + let count = metric.len(); Some(count) } else { // For any other metric in the transaction namespace, we check the limit with @@ -74,7 +70,7 @@ impl BucketLimiter { if let Some(transaction_count) = transaction_count { let transaction_buckets = transaction_counts.iter().map(Option::is_some).collect(); Ok(Self { - buckets, + metrics: buckets, quotas, scoping, transaction_buckets, @@ -102,9 +98,9 @@ impl BucketLimiter { fn drop_with_outcome(&mut self, outcome: Outcome) { // Drop transaction buckets: - let buckets = std::mem::take(&mut self.buckets); + let metrics = std::mem::take(&mut self.metrics); - self.buckets = buckets + self.metrics = metrics .into_iter() .zip(self.transaction_buckets.iter()) .filter_map(|(bucket, is_transaction_bucket)| { @@ -126,13 +122,13 @@ impl BucketLimiter { } } - // Drop transaction-related buckets and create outcomes for any active rate limits. + // Drop transaction-related metrics and create outcomes for any active rate limits. // // If rate limits could not be checked for some reason, pass an `Err` to this function. 
- // In this case, transaction-related metrics buckets will also be dropped, and an "internal" + // In this case, transaction-related metrics will also be dropped, and an "internal" // outcome is generated. // - // Returns true if any buckets were dropped. + // Returns true if any metrics were dropped. pub fn enforce_limits(&mut self, rate_limits: Result<&RateLimits, ()>) -> bool { let mut dropped_stuff = false; match rate_limits { Ok(rate_limits) => { let item_scoping = ItemScoping { category: DataCategory::Transaction, scoping: &self.scoping, }; - let applied_rate_limits = rate_limits.check_with_quotas(&self.quotas, item_scoping); + let active_rate_limits = + rate_limits.check_with_quotas(self.quotas.as_ref(), item_scoping); // If a rate limit is active, discard transaction buckets. - if let Some(limit) = applied_rate_limits.longest() { + if let Some(limit) = active_rate_limits.longest() { self.drop_with_outcome(Outcome::RateLimited(limit.reason_code.clone())); dropped_stuff = true; } @@ -159,8 +156,8 @@ impl BucketLimiter { dropped_stuff } - /// Consume this struct and return its buckets. - pub fn into_buckets(self) -> Vec<Bucket> { - self.buckets + /// Consume this struct and return the contained metrics. + pub fn into_metrics(self) -> Vec<M> { + self.metrics } }