diff --git a/CHANGELOG.md b/CHANGELOG.md index b33ced2f89..f39daa7665 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,16 @@ # Changelog +## Unreleased + +**Bug fixes**: + +- Fixes metrics dropped due to missing project state. ([#3553](https://github.com/getsentry/relay/issues/3553)) + + +**Internal**: + +- Aggregate metrics before rate limiting. ([#3746](https://github.com/getsentry/relay/pull/3746)) + ## 24.6.0 **Bug fixes**: diff --git a/relay-server/src/metrics/rate_limits.rs b/relay-server/src/metrics/rate_limits.rs index 1bb9f5cb20..de05d00c6b 100644 --- a/relay-server/src/metrics/rate_limits.rs +++ b/relay-server/src/metrics/rate_limits.rs @@ -258,12 +258,6 @@ impl<Q: AsRef<Vec<Quota>>> MetricsLimiter<Q> { false } - /// Returns a reference to the contained metrics. - #[cfg(feature = "processing")] - pub fn buckets(&self) -> impl Iterator<Item = &Bucket> { - self.buckets.iter().map(|s| &s.bucket) - } - /// Consume this struct and return the contained metrics. pub fn into_buckets(self) -> Vec<Bucket> { self.buckets.into_iter().map(|s| s.bucket).collect() diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index bed19999ab..de09527021 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -178,8 +178,6 @@ impl ServiceState { upstream_relay: upstream_relay.clone(), test_store: test_store.clone(), #[cfg(feature = "processing")] - aggregator: aggregator.clone(), - #[cfg(feature = "processing")] store_forwarder: store.clone(), }, metric_outcomes.clone(), diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index f7e5a204a7..e662a22de7 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -54,7 +54,7 @@ use { RedisSetLimiterOptions, }, relay_dynamic_config::{CardinalityLimiterMode, GlobalConfig, MetricExtractionGroups}, - relay_metrics::{Aggregator, MergeBuckets, RedisMetricMetaStore}, + relay_metrics::RedisMetricMetaStore, relay_quotas::{Quota, RateLimitingError, RateLimits, 
RedisRateLimiter}, relay_redis::RedisPool, std::iter::Chain, @@ -914,13 +914,6 @@ pub struct SubmitClientReports { pub scoping: Scoping, } -/// Applies rate limits to metrics buckets and forwards them to the [`Aggregator`]. -#[cfg(feature = "processing")] -#[derive(Debug)] -pub struct RateLimitBuckets { - pub bucket_limiter: MetricsLimiter, -} - /// CPU-intensive processing tasks for envelopes. #[derive(Debug)] pub enum EnvelopeProcessor { @@ -932,8 +925,6 @@ pub enum EnvelopeProcessor { EncodeMetricMeta(Box), SubmitEnvelope(Box), SubmitClientReports(Box), - #[cfg(feature = "processing")] - RateLimitBuckets(RateLimitBuckets), } impl EnvelopeProcessor { @@ -948,8 +939,6 @@ impl EnvelopeProcessor { EnvelopeProcessor::EncodeMetricMeta(_) => "EncodeMetricMeta", EnvelopeProcessor::SubmitEnvelope(_) => "SubmitEnvelope", EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports", - #[cfg(feature = "processing")] - EnvelopeProcessor::RateLimitBuckets(_) => "RateLimitBuckets", } } } @@ -1020,15 +1009,6 @@ impl FromMessage for EnvelopeProcessor { } } -#[cfg(feature = "processing")] -impl FromMessage for EnvelopeProcessor { - type Response = NoResponse; - - fn from_message(message: RateLimitBuckets, _: ()) -> Self { - Self::RateLimitBuckets(message) - } -} - /// Service implementing the [`EnvelopeProcessor`] interface. /// /// This service handles messages in a worker pool with configurable concurrency. 
@@ -1043,8 +1023,6 @@ pub struct Addrs { pub envelope_processor: Addr, pub project_cache: Addr, pub outcome_aggregator: Addr, - #[cfg(feature = "processing")] - pub aggregator: Addr, pub upstream_relay: Addr, pub test_store: Addr, #[cfg(feature = "processing")] @@ -1058,8 +1036,6 @@ impl Default for Addrs { envelope_processor: Addr::dummy(), project_cache: Addr::dummy(), outcome_aggregator: Addr::dummy(), - #[cfg(feature = "processing")] - aggregator: Addr::dummy(), upstream_relay: Addr::dummy(), test_store: Addr::dummy(), #[cfg(feature = "processing")] @@ -2302,13 +2278,69 @@ impl EnvelopeProcessorService { }); } - /// Check and apply rate limits to metrics buckets. #[cfg(feature = "processing")] - fn handle_rate_limit_buckets(&self, message: RateLimitBuckets) { - use relay_quotas::RateLimits; + fn rate_limit_buckets( + &self, + scoping: Scoping, + project_state: &ProjectState, + mut buckets: Vec, + ) -> Vec { + let Some(rate_limiter) = self.inner.rate_limiter.as_ref() else { + return buckets; + }; + + let global_config = self.inner.global_config.current(); + let namespaces = buckets + .iter() + .filter_map(|bucket| bucket.name.try_namespace()) + .counts(); + + let quotas = CombinedQuotas::new(&global_config, project_state.get_quotas()); + for (namespace, quantity) in namespaces { + let item_scoping = scoping.metric_bucket(namespace); + + let limits = match rate_limiter.is_rate_limited(quotas, item_scoping, quantity, false) { + Ok(limits) => limits, + Err(err) => { + relay_log::error!( + error = &err as &dyn std::error::Error, + "failed to check redis rate limits" + ); + break; + } + }; + + if limits.is_limited() { + let rejected; + (buckets, rejected) = utils::split_off(buckets, |bucket| { + bucket.name.try_namespace() == Some(namespace) + }); + + let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone()); + self.inner.metric_outcomes.track( + scoping, + &rejected, + Outcome::RateLimited(reason_code), + ); + + 
self.inner.addrs.project_cache.send(UpdateRateLimits::new( + item_scoping.scoping.project_key, + limits, + )); + } + } + + match MetricsLimiter::create(buckets, project_state.config.quotas.clone(), scoping) { + Err(buckets) => buckets, + Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter), + } + } + + /// Check and apply rate limits to metrics buckets for transactions and spans. + #[cfg(feature = "processing")] + fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec { relay_log::trace!("handle_rate_limit_buckets"); - let RateLimitBuckets { mut bucket_limiter } = message; let scoping = *bucket_limiter.scoping(); @@ -2377,88 +2409,7 @@ impl EnvelopeProcessorService { } } - let project_key = bucket_limiter.scoping().project_key; - let buckets = bucket_limiter.into_buckets(); - - if !buckets.is_empty() { - self.inner - .addrs - .aggregator - .send(MergeBuckets::new(project_key, buckets)); - } - } - - #[cfg(feature = "processing")] - fn rate_limit_buckets_by_namespace( - &self, - scoping: Scoping, - buckets: Vec, - quotas: CombinedQuotas<'_>, - ) -> Vec { - let Some(rate_limiter) = self.inner.rate_limiter.as_ref() else { - return buckets; - }; - - let buckets_by_ns: HashMap> = buckets - .into_iter() - .filter_map(|bucket| Some((bucket.name.try_namespace()?, bucket))) - .into_group_map(); - - buckets_by_ns - .into_iter() - .flat_map(|(namespace, buckets)| { - let item_scoping = scoping.metric_bucket(namespace); - self.rate_limit_buckets(item_scoping, buckets, quotas, rate_limiter) - }) - .collect() - } - - /// Returns `true` if the batches should be rate limited. 
- #[cfg(feature = "processing")] - fn rate_limit_buckets( - &self, - item_scoping: relay_quotas::ItemScoping, - buckets: Vec, - quotas: CombinedQuotas<'_>, - rate_limiter: &RedisRateLimiter, - ) -> Vec { - let batch_size = self.inner.config.metrics_max_batch_size_bytes(); - let quantity = BucketsView::from(&buckets) - .by_size(batch_size) - .flatten() - .count(); - - // Check with redis if the throughput limit has been exceeded, while also updating - // the count so that other relays will be updated too. - match rate_limiter.is_rate_limited(quotas, item_scoping, quantity, false) { - Ok(limits) if limits.is_limited() => { - relay_log::debug!("dropping {quantity} buckets due to throughput rate limit"); - - let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone()); - - self.inner.metric_outcomes.track( - *item_scoping.scoping, - &buckets, - Outcome::RateLimited(reason_code), - ); - - self.inner.addrs.project_cache.send(UpdateRateLimits::new( - item_scoping.scoping.project_key, - limits, - )); - - Vec::new() - } - Ok(_) => buckets, - Err(e) => { - relay_log::error!( - error = &e as &dyn std::error::Error, - "failed to check redis rate limits" - ); - - buckets - } - } + bucket_limiter.into_buckets() } /// Cardinality limits the passed buckets and returns a filtered vector of only accepted buckets. 
@@ -2542,16 +2493,13 @@ impl EnvelopeProcessorService { use crate::constants::DEFAULT_EVENT_RETENTION; use crate::services::store::StoreMetrics; - let global_config = self.inner.global_config.current(); - for (scoping, message) in message.scopes { let ProjectMetrics { buckets, project_state, } = message; - let quotas = CombinedQuotas::new(&global_config, project_state.get_quotas()); - let buckets = self.rate_limit_buckets_by_namespace(scoping, buckets, quotas); + let buckets = self.rate_limit_buckets(scoping, &project_state, buckets); let limits = project_state.get_cardinality_limits(); let buckets = self.cardinality_limit_buckets(scoping, limits, buckets); @@ -2800,8 +2748,6 @@ impl EnvelopeProcessorService { EnvelopeProcessor::EncodeMetricMeta(m) => self.handle_encode_metric_meta(*m), EnvelopeProcessor::SubmitEnvelope(m) => self.handle_submit_envelope(*m), EnvelopeProcessor::SubmitClientReports(m) => self.handle_submit_client_reports(*m), - #[cfg(feature = "processing")] - EnvelopeProcessor::RateLimitBuckets(m) => self.handle_rate_limit_buckets(m), } }); } @@ -2828,18 +2774,6 @@ impl EnvelopeProcessorService { EnvelopeProcessor::EncodeMetricMeta(_) => AppFeature::MetricMeta.into(), EnvelopeProcessor::SubmitEnvelope(v) => AppFeature::from(v.envelope.group()).into(), EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(), - #[cfg(feature = "processing")] - EnvelopeProcessor::RateLimitBuckets(v) => { - relay_metrics::cogs::ByCount(v.bucket_limiter.buckets().filter(|b| { - // Only spans and transactions are actually rate limited at this point. - // Other metrics do not cause costs. 
- matches!( - b.name.try_namespace(), - Some(MetricNamespace::Spans | MetricNamespace::Transactions) - ) - })) - .into() - } } } } diff --git a/relay-server/src/services/project.rs b/relay-server/src/services/project.rs index 8f2d37e9c2..bddd8e1287 100644 --- a/relay-server/src/services/project.rs +++ b/relay-server/src/services/project.rs @@ -9,9 +9,8 @@ use relay_cardinality::CardinalityLimit; use relay_config::Config; use relay_dynamic_config::{ErrorBoundary, Feature, LimitedProjectConfig, ProjectConfig}; use relay_filter::matches_any_origin; -use relay_metrics::aggregator::AggregatorConfig; use relay_metrics::{ - aggregator, Aggregator, Bucket, MergeBuckets, MetaAggregator, MetricMeta, MetricNamespace, + Aggregator, Bucket, MergeBuckets, MetaAggregator, MetricMeta, MetricNamespace, }; use relay_quotas::{ CachedRateLimits, DataCategory, MetricNamespaceScoping, Quota, RateLimits, Scoping, @@ -27,9 +26,8 @@ use url::Url; use crate::envelope::Envelope; use crate::metrics::{MetricOutcomes, MetricsLimiter}; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; -#[cfg(feature = "processing")] -use crate::services::processor::RateLimitBuckets; -use crate::services::processor::{EncodeMetricMeta, EnvelopeProcessor, ProjectMetrics}; +use crate::services::processor::{EncodeMetricMeta, EnvelopeProcessor}; +use crate::services::project::metrics::{apply_project_state, filter_namespaces}; use crate::services::project_cache::{BucketSource, CheckedEnvelope, ProjectCache, RequestUpdate}; use crate::extractors::RequestMeta; @@ -39,8 +37,6 @@ use crate::utils::{self, EnvelopeLimiter, ManagedEnvelope, RetryBackoff}; mod metrics; -use self::metrics::{Buckets, Filtered}; - /// The expiry status of a project state. Return value of [`ProjectState::check_expiry`]. 
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] enum Expiry { @@ -428,31 +424,19 @@ enum GetOrFetch<'a> { #[derive(Debug)] enum State { Cached(Arc), - Pending(Box), + Pending, } impl State { fn state_value(&self) -> Option> { match self { State::Cached(state) => Some(Arc::clone(state)), - State::Pending(_) => None, + State::Pending => None, } } - /// Sets the cached state using provided `ProjectState`. - /// If the variant was pending, the buckets will be returned. - fn set_state(&mut self, state: Arc) -> Option> { - match std::mem::replace(self, Self::Cached(state)) { - State::Pending(agg) => Some(Buckets::new(agg.into_buckets())), - State::Cached(_) => None, - } - } - - fn new(config: AggregatorConfig) -> Self { - Self::Pending(Box::new(aggregator::Aggregator::named( - "metrics-buffer".to_string(), - config, - ))) + fn new() -> Self { + Self::Pending } } @@ -484,7 +468,7 @@ impl Project { next_fetch_attempt: None, last_updated_at: Instant::now(), project_key: key, - state: State::new(config.permissive_aggregator_config()), + state: State::new(), state_channel: None, rate_limits: CachedRateLimits::new(), last_no_cache: Instant::now(), @@ -495,16 +479,6 @@ impl Project { } } - /// If we know that a project is disabled, disallow metrics, too. - fn metrics_allowed(&self) -> bool { - if let Some(state) = self.valid_state() { - state.check_disabled(&self.config).is_ok() - } else { - // Projects without state go back to the original state of allowing metrics. - true - } - } - /// Returns the [`ReservoirCounters`] for the project. 
pub fn reservoir_counters(&self) -> ReservoirCounters { self.reservoir_counters.clone() @@ -575,103 +549,43 @@ impl Project { self.last_updated_at = Instant::now(); } - fn merge_buckets_into_aggregator( - &mut self, - aggregator: &Addr, - #[allow(unused_variables)] envelope_processor: &Addr, - outcome_aggregator: &Addr, - metric_outcomes: &MetricOutcomes, - buckets: Buckets, - ) { - let state = match self.state { - State::Cached(ref state) => { - // TODO: When the state is present but expired, we should send buckets - // to the metrics buffer instead. In practice, the project state should be - // refreshed at the time when the buckets emerge from the aggregator though. - state - } - State::Pending(ref mut inner_agg) => { - // We need to queue the metrics in a temporary aggregator until the project state becomes available. - relay_log::debug!("sending metrics to metrics-buffer"); - inner_agg.merge_all(self.project_key, buckets, None); - return; - } - }; - - let Some(scoping) = self.scoping() else { - relay_log::error!( - "there is no scoping due to missing project id: dropping {} buckets", - buckets.len() - ); - return; - }; - - // Only send if the project state is valid, otherwise drop the buckets. - if state.check_disabled(self.config.as_ref()).is_err() { - relay_log::trace!("project state invalid: dropping {} buckets", buckets.len()); - return; - } - - let buckets = buckets.apply_project_state(metric_outcomes, state, scoping); - - if buckets.is_empty() { - return; - } - - // Check rate limits if necessary. 
- let quotas = state.config.quotas.clone(); - let buckets = match MetricsLimiter::create(buckets, quotas, scoping) { - Ok(mut bucket_limiter) => { - let current_limits = self.rate_limits.current_limits(); - #[allow(unused_variables)] - let was_rate_limited = bucket_limiter.enforce_limits( - current_limits, - metric_outcomes, - outcome_aggregator, - ); - - #[cfg(feature = "processing")] - if !was_rate_limited && self.config.processing_enabled() { - // If there were no cached rate limits active, let the processor check redis: - envelope_processor.send(RateLimitBuckets { bucket_limiter }); - return; - } - - bucket_limiter.into_buckets() - } - Err(buckets) => buckets, - }; - - if !buckets.is_empty() { - aggregator.send(MergeBuckets::new(self.project_key, buckets)); - }; - } - - /// Inserts given [buckets](Bucket) into the metrics aggregator. + /// Validates and inserts given [buckets](Bucket) into the metrics aggregator. /// /// The buckets will be keyed underneath this project key. pub fn merge_buckets( &mut self, aggregator: &Addr, - outcome_aggregator: &Addr, metric_outcomes: &MetricOutcomes, - envelope_processor: &Addr, + outcome_aggregator: &Addr, buckets: Vec, source: BucketSource, ) { - if !self.metrics_allowed() { - relay_log::debug!("dropping metric buckets, project disabled"); - return; - } + // Best effort check for rate limits and project state. Continue if there is no project state. + let buckets = match self.check_buckets(metric_outcomes, outcome_aggregator, buckets) { + CheckedBuckets::NoProject(buckets) => buckets, + CheckedBuckets::Checked { buckets, .. 
} => buckets, + CheckedBuckets::Dropped => return, + }; - let buckets = Buckets::new(buckets).filter_namespaces(source); - self.merge_buckets_into_aggregator( - aggregator, - envelope_processor, - outcome_aggregator, - metric_outcomes, - buckets, - ); + let buckets = filter_namespaces(buckets, source); + + aggregator.send(MergeBuckets::new( + self.project_key, + buckets.into_iter().collect(), + )); + } + + /// Returns a list of buckets back to the aggregator. + /// + /// This is used to return flushed buckets back to the aggregator if the project has not been + /// loaded at the time of flush. + /// + /// Buckets at this stage are expected to be validated already. + pub fn return_buckets(&self, aggregator: &Addr, buckets: Vec) { + aggregator.send(MergeBuckets::new( + self.project_key, + buckets.into_iter().collect(), + )); } pub fn add_metric_meta( @@ -877,32 +791,6 @@ impl Project { } } - fn set_state( - &mut self, - state: Arc, - aggregator: &Addr, - envelope_processor: &Addr, - outcome_aggregator: &Addr, - metric_outcomes: &MetricOutcomes, - ) { - let project_enabled = state.check_disabled(self.config.as_ref()).is_ok(); - let buckets = self.state.set_state(state.clone()); - - if let Some(buckets) = buckets { - if project_enabled && !buckets.is_empty() { - relay_log::debug!("sending metrics from metricsbuffer to aggregator"); - - self.merge_buckets_into_aggregator( - aggregator, - envelope_processor, - outcome_aggregator, - metric_outcomes, - buckets, - ); - } - } - } - /// Ensures the project state gets updated. /// /// This first checks if the state needs to be updated. This is the case if the project state @@ -930,11 +818,8 @@ impl Project { pub fn update_state( &mut self, project_cache: &Addr, - aggregator: &Addr, mut state: Arc, envelope_processor: &Addr, - outcome_aggregator: &Addr, - metric_outcomes: &MetricOutcomes, no_cache: bool, ) { // Initiate the backoff if the incoming state is invalid. Reset it otherwise. 
@@ -961,13 +846,7 @@ impl Project { // If the new state is invalid but the old one still usable, keep the old one. ExpiryState::Updated(old) | ExpiryState::Stale(old) if state.invalid() => state = old, // If the new state is valid or the old one is expired, always use the new one. - _ => self.set_state( - state.clone(), - aggregator, - envelope_processor, - outcome_aggregator, - metric_outcomes, - ), + _ => self.state = State::Cached(Arc::clone(&state)), } // If the state is still invalid, return back the taken channel and schedule state update. @@ -1084,24 +963,19 @@ impl Project { /// Drops metrics buckets if they are not allowed for this project. /// /// Reasons for dropping can be rate limits or a disabled project. - /// Returns `Some` if metrics are currently allowed. pub fn check_buckets( &mut self, metric_outcomes: &MetricOutcomes, - mut buckets: Vec, - ) -> Option<(Scoping, ProjectMetrics)> { + outcome_aggregator: &Addr, + buckets: Vec, + ) -> CheckedBuckets { let Some(project_state) = self.valid_state() else { - relay_log::error!( - tags.project_key = self.project_key.as_str(), - "there is no project state: dropping {} buckets", - buckets.len(), - ); - return None; + return CheckedBuckets::NoProject(buckets); }; if project_state.invalid() || project_state.disabled() { relay_log::debug!("dropping {} buckets for disabled project", buckets.len()); - return None; + return CheckedBuckets::Dropped; } let Some(scoping) = self.scoping() else { @@ -1110,9 +984,11 @@ impl Project { "there is no scoping: dropping {} buckets", buckets.len(), ); - return None; + return CheckedBuckets::Dropped; }; + let mut buckets = apply_project_state(buckets, metric_outcomes, &project_state, scoping); + let namespaces: BTreeSet = buckets .iter() .filter_map(|bucket| bucket.name.try_namespace()) @@ -1136,23 +1012,53 @@ impl Project { } } + let quotas = project_state.config.quotas.clone(); + let buckets = match MetricsLimiter::create(buckets, quotas, scoping) { + Ok(mut bucket_limiter) 
=> { + bucket_limiter.enforce_limits(current_limits, metric_outcomes, outcome_aggregator); + bucket_limiter.into_buckets() + } + Err(buckets) => buckets, + }; + if buckets.is_empty() { - return None; + return CheckedBuckets::Dropped; } - let project_metrics = ProjectMetrics { - buckets, + CheckedBuckets::Checked { + scoping, project_state, - }; - - Some((scoping, project_metrics)) + buckets, + } } } +/// Return value of [`Project::check_buckets`]. +#[derive(Debug)] +pub enum CheckedBuckets { + /// There is no project state available for these metrics yet. + /// + /// The metrics should be returned to the aggregator until the project state becomes available. + NoProject(Vec), + /// The buckets have been validated and can be processed. + Checked { + /// Project scoping. + scoping: Scoping, + /// Project state. + project_state: Arc, + /// List of buckets. + buckets: Vec, + }, + /// All buckets have been dropped. + /// + /// Can happen for multiple reasons: + /// - The project is disabled or not valid. + /// - All metrics have been filtered. + Dropped, +} + #[cfg(test)] mod tests { - use std::sync::Mutex; - use crate::metrics::MetricStats; use relay_common::time::UnixTimestamp; use relay_metrics::BucketValue; @@ -1200,11 +1106,7 @@ mod tests { #[tokio::test] async fn test_stale_cache() { let (addr, _) = mock_service("project_cache", (), |&mut (), _| {}); - let (aggregator, _) = mock_service("aggregator", (), |&mut (), _| {}); - let (outcome_aggregator, _) = mock_service("outcome_aggregator", (), |&mut (), _| {}); let (envelope_processor, _) = mock_service("envelope_processor", (), |&mut (), _| {}); - let metric_outcomes = - MetricOutcomes::new(MetricStats::test().0, outcome_aggregator.clone()); let config = Arc::new( Config::from_json_value(json!( @@ -1235,11 +1137,8 @@ mod tests { // Try to update project with errored project state. 
project.update_state( &addr, - &aggregator, Arc::new(ProjectState::err()), &envelope_processor, - &outcome_aggregator, - &metric_outcomes, false, ); // Since we got invalid project state we still keep the old one meaning there @@ -1259,11 +1158,8 @@ mod tests { project.state_channel = Some(channel); project.update_state( &addr, - &aggregator, Arc::new(ProjectState::err()), &envelope_processor, - &outcome_aggregator, - &metric_outcomes, false, ); project.fetch_state(addr, false); @@ -1281,134 +1177,70 @@ mod tests { project } - fn create_transaction_metric() -> Bucket { + fn create_metric(name: &str) -> Bucket { Bucket { - name: "d:transactions/foo".into(), + name: name.into(), width: 0, value: BucketValue::counter(1.into()), - timestamp: UnixTimestamp::now(), + timestamp: UnixTimestamp::from_secs(1000), tags: Default::default(), metadata: Default::default(), } } - /// Checks that the project doesn't send buckets to the aggregator from its metricsbuffer - /// if it haven't received a project state. - #[tokio::test] - async fn test_metrics_buffer_no_flush_without_state() { - // Project without project state. 
- let mut project = Project { - state: State::new(Config::default().permissive_aggregator_config()), - ..create_project(None) - }; - - let bucket_state = Arc::new(Mutex::new(false)); - let (aggregator, handle) = mock_service("aggregator", bucket_state.clone(), |state, _| { - *state.lock().unwrap() = true; - }); - - let buckets = vec![create_transaction_bucket()]; - let (outcome_aggregator, _) = mock_service("outcome_aggregator", (), |&mut (), _| {}); - let (envelope_processor, _) = mock_service("envelope_processor", (), |&mut (), _| {}); - let metric_outcomes = - MetricOutcomes::new(MetricStats::test().0, outcome_aggregator.clone()); - - project.merge_buckets( - &aggregator, - &outcome_aggregator, - &metric_outcomes, - &envelope_processor, - buckets, - BucketSource::Internal, - ); - drop(aggregator); - handle.await.unwrap(); - - let buckets_received = *bucket_state.lock().unwrap(); - assert!(!buckets_received); - } - - /// Checks that the metrics-buffer flushes buckets to the aggregator when the project - /// receives a project state. - #[tokio::test] - async fn test_metrics_buffer_flush_with_state() { - // Project without project state. 
- let mut project = Project { - state: State::new(Config::default().permissive_aggregator_config()), - ..create_project(None) - }; - - let bucket_state = Arc::new(Mutex::new(false)); - let (aggregator, handle) = mock_service("aggregator", bucket_state.clone(), |state, _| { - *state.lock().unwrap() = true; - }); + #[test] + fn test_check_buckets_no_project() { + let (outcome_aggregator, _) = Addr::custom(); + let (metric_stats, mut metric_stats_rx) = MetricStats::test(); + let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator.clone()); - let buckets = vec![create_transaction_bucket()]; - let (outcome_aggregator, _) = mock_service("outcome_aggregator", (), |&mut (), _| {}); - let (envelope_processor, _) = mock_service("envelope_processor", (), |&mut (), _| {}); - let metric_outcomes = - MetricOutcomes::new(MetricStats::test().0, outcome_aggregator.clone()); + let mut project = create_project(None); + project.state = State::Pending; + let buckets = vec![create_metric("d:transactions/foo")]; + let cb = project.check_buckets(&metric_outcomes, &outcome_aggregator, buckets.clone()); - project.merge_buckets( - &aggregator, - &outcome_aggregator, - &metric_outcomes, - &envelope_processor, - buckets.clone(), - BucketSource::Internal, - ); - let mut project_state = ProjectState::allowed(); - project_state.project_id = Some(ProjectId::new(1)); - // set_state should trigger flushing from the metricsbuffer to aggregator. - project.set_state( - Arc::new(project_state), - &aggregator, - &envelope_processor, - &outcome_aggregator, - &metric_outcomes, - ); - drop(aggregator); - handle.await.unwrap(); // state isnt updated until we await. 
+ match cb { + CheckedBuckets::NoProject(b) => { + assert_eq!(b, buckets) + } + cb => panic!("{cb:?}"), + } - let buckets_received = *bucket_state.lock().unwrap(); - assert!(buckets_received); + drop(metric_outcomes); + assert!(metric_stats_rx.blocking_recv().is_none()); } #[test] - fn test_rate_limit_incoming_metrics() { - let (aggregator, mut aggregator_rx) = Addr::custom(); - let (envelope_processor, _) = Addr::custom(); + fn test_check_buckets_rate_limit() { let (outcome_aggregator, _) = Addr::custom(); - let metric_outcomes = - MetricOutcomes::new(MetricStats::test().0, outcome_aggregator.clone()); + let (metric_stats, mut metric_stats_rx) = MetricStats::test(); + let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator.clone()); let mut project = create_project(None); - project.merge_buckets_into_aggregator( - &aggregator, - &envelope_processor, - &outcome_aggregator, - &metric_outcomes, - Buckets::test(vec![create_transaction_metric()]), - ); - drop(aggregator); - - let value = aggregator_rx.blocking_recv().unwrap(); - let Aggregator::MergeBuckets(merge_buckets) = value else { - panic!(); - }; - assert_eq!(merge_buckets.buckets().len(), 1); + let buckets = vec![create_metric("d:transactions/foo")]; + let cb = project.check_buckets(&metric_outcomes, &outcome_aggregator, buckets.clone()); + + match cb { + CheckedBuckets::Checked { + scoping, + project_state: _, + buckets: b, + } => { + assert_eq!(scoping, project.scoping().unwrap()); + assert_eq!(b, buckets) + } + cb => panic!("{cb:?}"), + } - let value = aggregator_rx.blocking_recv(); - assert!(value.is_none()); + drop(metric_outcomes); + assert!(metric_stats_rx.blocking_recv().is_none()); } #[test] - fn test_rate_limit_incoming_metrics_no_quota() { - let (aggregator, mut aggregator_rx) = Addr::custom(); - let (envelope_processor, _) = Addr::custom(); + fn test_check_buckets_rate_limit_no_quota() { let (outcome_aggregator, _) = Addr::custom(); - let metric_outcomes = - 
MetricOutcomes::new(MetricStats::test().0, outcome_aggregator.clone()); + let (metric_stats, mut metric_stats_rx) = MetricStats::test(); + let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator.clone()); let mut project = create_project(Some(json!({ "quotas": [{ @@ -1419,66 +1251,23 @@ mod tests { "reasonCode": "foo", }] }))); - project.merge_buckets_into_aggregator( - &aggregator, - &envelope_processor, - &outcome_aggregator, + let cb = project.check_buckets( &metric_outcomes, - Buckets::test(vec![create_transaction_metric()]), - ); - drop(aggregator); - - let value = aggregator_rx.blocking_recv(); - assert!(value.is_none()); - } - - fn create_transaction_bucket() -> Bucket { - Bucket { - name: "d:transactions/foo".into(), - value: BucketValue::Counter(1.into()), - timestamp: UnixTimestamp::now(), - tags: Default::default(), - width: 10, - metadata: Default::default(), - } - } - - #[test] - fn test_rate_limit_incoming_buckets() { - let (aggregator, mut aggregator_rx) = Addr::custom(); - let (envelope_processor, _) = Addr::custom(); - let (outcome_aggregator, _) = Addr::custom(); - let metric_outcomes = - MetricOutcomes::new(MetricStats::test().0, outcome_aggregator.clone()); - - let mut project = create_project(None); - project.merge_buckets_into_aggregator( - &aggregator, - &envelope_processor, &outcome_aggregator, - &metric_outcomes, - Buckets::test(vec![create_transaction_bucket()]), + vec![create_metric("d:transactions/foo")], ); - drop(aggregator); - drop(metric_outcomes); - let value = aggregator_rx.blocking_recv().unwrap(); - let Aggregator::MergeBuckets(merge_buckets) = value else { - panic!(); - }; - assert_eq!(merge_buckets.buckets().len(), 1); + assert!(matches!(cb, CheckedBuckets::Dropped)); - let value = aggregator_rx.blocking_recv(); - assert!(value.is_none()); + drop(metric_outcomes); + assert!(metric_stats_rx.blocking_recv().is_none()); } #[test] - fn test_rate_limit_incoming_buckets_no_quota() { - let (aggregator, mut 
aggregator_rx) = Addr::custom(); - let (envelope_processor, _) = Addr::custom(); + fn test_check_buckets_rate_limit_mixed_no_quota() { let (outcome_aggregator, _) = Addr::custom(); - let metric_outcomes = - MetricOutcomes::new(MetricStats::test().0, outcome_aggregator.clone()); + let (metric_stats, mut metric_stats_rx) = MetricStats::test(); + let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator.clone()); let mut project = create_project(Some(json!({ "quotas": [{ @@ -1489,16 +1278,52 @@ mod tests { "reasonCode": "foo", }] }))); - project.merge_buckets_into_aggregator( - &aggregator, - &envelope_processor, + let cb = project.check_buckets( + &metric_outcomes, &outcome_aggregator, + vec![ + create_metric("d:transactions/foo"), + create_metric("d:profiles/foo"), + ], + ); + + match cb { + CheckedBuckets::Checked { + scoping, + project_state: _, + buckets, + } => { + assert_eq!(scoping, project.scoping().unwrap()); + assert_eq!(buckets, vec![create_metric("d:profiles/foo")]) + } + cb => panic!("{cb:?}"), + } + + drop(metric_outcomes); + assert!(metric_stats_rx.blocking_recv().is_none()); + } + + #[test] + fn test_check_buckets_project_state_filter() { + let (outcome_aggregator, _) = Addr::custom(); + let (metric_stats, mut metric_stats_rx) = MetricStats::test(); + let metric_outcomes = MetricOutcomes::new(metric_stats, outcome_aggregator.clone()); + + let mut project = create_project(None); + let cb = project.check_buckets( &metric_outcomes, - Buckets::test(vec![create_transaction_bucket()]), + &outcome_aggregator, + vec![create_metric("d:custom/foo")], ); - drop(aggregator); - let value = aggregator_rx.blocking_recv(); - assert!(value.is_none()); + assert!(matches!(cb, CheckedBuckets::Dropped)); + + drop(metric_outcomes); + let value = metric_stats_rx.blocking_recv().unwrap(); + let Aggregator::MergeBuckets(merge_buckets) = value else { + panic!(); + }; + assert_eq!(merge_buckets.buckets().len(), 1); + 
assert!(metric_stats_rx.blocking_recv().is_none()); } } diff --git a/relay-server/src/services/project/metrics.rs b/relay-server/src/services/project/metrics.rs index 8187b10564..cabbbc2478 100644 --- a/relay-server/src/services/project/metrics.rs +++ b/relay-server/src/services/project/metrics.rs @@ -1,6 +1,3 @@ -use std::marker::PhantomData; -use std::ops::Deref; - use relay_dynamic_config::{ErrorBoundary, Feature, Metrics}; use relay_filter::FilterStatKey; use relay_metrics::{Bucket, MetricNamespace}; @@ -11,114 +8,69 @@ use crate::services::outcome::Outcome; use crate::services::project::ProjectState; use crate::services::project_cache::BucketSource; -pub struct Filtered; -pub struct WithProjectState; - -/// Container for a vector of buckets. -#[derive(Debug)] -pub struct Buckets { - buckets: Vec, - state: PhantomData, -} - -impl Buckets { - /// Creates a list of buckets in their initial state. - pub fn new(buckets: Vec) -> Buckets { - Self { - buckets, - state: PhantomData, - } - } -} - -impl Deref for Buckets { - type Target = [Bucket]; - - fn deref(&self) -> &Self::Target { - &self.buckets - } -} - -impl IntoIterator for Buckets { - type Item = Bucket; - type IntoIter = std::vec::IntoIter; +pub fn filter_namespaces(mut buckets: Vec, source: BucketSource) -> Vec { + buckets.retain(|bucket| match bucket.name.namespace() { + MetricNamespace::Sessions => true, + MetricNamespace::Transactions => true, + MetricNamespace::Spans => true, + MetricNamespace::Profiles => true, + MetricNamespace::Custom => true, + MetricNamespace::Stats => source == BucketSource::Internal, + MetricNamespace::Unsupported => false, + }); - fn into_iter(self) -> Self::IntoIter { - self.buckets.into_iter() - } + buckets } -impl Buckets<()> { - pub fn filter_namespaces(mut self, source: BucketSource) -> Buckets { - self.buckets.retain(|bucket| match bucket.name.namespace() { - MetricNamespace::Sessions => true, - MetricNamespace::Transactions => true, - MetricNamespace::Spans => true, - 
MetricNamespace::Profiles => true, - MetricNamespace::Custom => true, - MetricNamespace::Stats => source == BucketSource::Internal, - MetricNamespace::Unsupported => false, - }); - - Buckets::new(self.buckets) - } -} +pub fn apply_project_state( + mut buckets: Vec, + metric_outcomes: &MetricOutcomes, + project_state: &ProjectState, + scoping: Scoping, +) -> Vec { + let mut denied_buckets = Vec::new(); + let mut disabled_namespace_buckets = Vec::new(); + + buckets = buckets + .into_iter() + .filter_map(|mut bucket| { + if !is_metric_namespace_valid(project_state, bucket.name.namespace()) { + relay_log::trace!(mri = &*bucket.name, "dropping metric in disabled namespace"); + disabled_namespace_buckets.push(bucket); + return None; + }; -impl Buckets { - pub fn apply_project_state( - mut self, - metric_outcomes: &MetricOutcomes, - project_state: &ProjectState, - scoping: Scoping, - ) -> Buckets { - let mut denied_buckets = Vec::new(); - let mut disabled_namespace_buckets = Vec::new(); - - self.buckets = self - .buckets - .into_iter() - .filter_map(|mut bucket| { - if !is_metric_namespace_valid(project_state, bucket.name.namespace()) { - relay_log::trace!(mri = &*bucket.name, "dropping metric in disabled namespace"); - disabled_namespace_buckets.push(bucket); + if let ErrorBoundary::Ok(ref metric_config) = project_state.config.metrics { + if metric_config.denied_names.is_match(&*bucket.name) { + relay_log::trace!(mri = &*bucket.name, "dropping metrics due to block list"); + denied_buckets.push(bucket); return None; - }; - - if let ErrorBoundary::Ok(ref metric_config) = project_state.config.metrics { - if metric_config.denied_names.is_match(&*bucket.name) { - relay_log::trace!( - mri = &*bucket.name, - "dropping metrics due to block list" - ); - denied_buckets.push(bucket); - return None; - } - - remove_matching_bucket_tags(metric_config, &mut bucket); } - Some(bucket) - }) - .collect(); + remove_matching_bucket_tags(metric_config, &mut bucket); + } - if 
!disabled_namespace_buckets.is_empty() { - metric_outcomes.track( - scoping, - &disabled_namespace_buckets, - Outcome::Filtered(FilterStatKey::DisabledNamespace), - ); - } + Some(bucket) + }) + .collect(); - if !denied_buckets.is_empty() { - metric_outcomes.track( - scoping, - &denied_buckets, - Outcome::Filtered(FilterStatKey::DeniedName), - ); - } + if !disabled_namespace_buckets.is_empty() { + metric_outcomes.track( + scoping, + &disabled_namespace_buckets, + Outcome::Filtered(FilterStatKey::DisabledNamespace), + ); + } - Buckets::new(self.buckets) + if !denied_buckets.is_empty() { + metric_outcomes.track( + scoping, + &denied_buckets, + Outcome::Filtered(FilterStatKey::DeniedName), + ); } + + buckets } fn is_metric_namespace_valid(state: &ProjectState, namespace: MetricNamespace) -> bool { @@ -158,16 +110,6 @@ mod tests { use super::*; - impl Buckets { - /// Constructor for tests which bypasses the state requirements. - pub fn test(buckets: Vec) -> Self { - Self { - buckets, - state: PhantomData, - } - } - } - fn get_test_bucket(name: &str, tags: BTreeMap) -> Bucket { let json = serde_json::json!({ "timestamp": 1615889440, @@ -235,9 +177,10 @@ mod tests { let b1 = create_custom_bucket_with_name("cpu_time".into()); let b2 = create_custom_bucket_with_name("memory_usage".into()); - let buckets = Buckets::test(vec![b1.clone(), b2.clone()]); + let buckets = vec![b1.clone(), b2.clone()]; - let buckets = buckets.apply_project_state( + let buckets = apply_project_state( + buckets, &metric_outcomes, &project_state, Scoping { @@ -272,9 +215,10 @@ mod tests { let b1 = create_custom_bucket_with_name("cpu_time".into()); let b2 = create_custom_bucket_with_name("memory_usage".into()); - let buckets = Buckets::test(vec![b1.clone(), b2.clone()]); + let buckets = vec![b1.clone(), b2.clone()]; - let buckets = buckets.apply_project_state( + let buckets = apply_project_state( + buckets, &metric_outcomes, &ProjectState::allowed(), Scoping { diff --git 
a/relay-server/src/services/project_cache.rs b/relay-server/src/services/project_cache.rs index 746a649a8a..c90b098e13 100644 --- a/relay-server/src/services/project_cache.rs +++ b/relay-server/src/services/project_cache.rs @@ -18,8 +18,10 @@ use tokio::time::Instant; use crate::services::global_config::{self, GlobalConfigManager, Subscribe}; use crate::services::outcome::{DiscardReason, TrackOutcome}; -use crate::services::processor::{EncodeMetrics, EnvelopeProcessor, ProcessEnvelope}; -use crate::services::project::{Project, ProjectSender, ProjectState}; +use crate::services::processor::{ + EncodeMetrics, EnvelopeProcessor, ProcessEnvelope, ProjectMetrics, +}; +use crate::services::project::{CheckedBuckets, Project, ProjectSender, ProjectState}; use crate::services::project_local::{LocalProjectSource, LocalProjectSourceService}; #[cfg(feature = "processing")] use crate::services::project_redis::RedisProjectSource; @@ -683,18 +685,12 @@ impl ProjectCacheBroker { } = message; let project_cache = self.services.project_cache.clone(); - let aggregator = self.services.aggregator.clone(); let envelope_processor = self.services.envelope_processor.clone(); - let outcome_aggregator = self.services.outcome_aggregator.clone(); - let metric_outcomes = self.metric_outcomes.clone(); self.get_or_create_project(project_key).update_state( &project_cache, - &aggregator, state, &envelope_processor, - &outcome_aggregator, - &metric_outcomes, no_cache, ); @@ -873,17 +869,15 @@ impl ProjectCacheBroker { fn handle_add_metric_buckets(&mut self, message: AddMetricBuckets) { let project_cache = self.services.project_cache.clone(); let aggregator = self.services.aggregator.clone(); - let outcome_aggregator = self.services.outcome_aggregator.clone(); - let envelope_processor = self.services.envelope_processor.clone(); let metric_outcomes = self.metric_outcomes.clone(); + let outcome_aggregator = self.services.outcome_aggregator.clone(); let project = 
self.get_or_create_project(message.project_key); project.prefetch(project_cache, false); project.merge_buckets( &aggregator, - &outcome_aggregator, &metric_outcomes, - &envelope_processor, + &outcome_aggregator, message.buckets, message.source, ); @@ -898,19 +892,45 @@ impl ProjectCacheBroker { fn handle_flush_buckets(&mut self, message: FlushBuckets) { let metric_outcomes = self.metric_outcomes.clone(); + let outcome_aggregator = self.services.outcome_aggregator.clone(); + let aggregator = self.services.aggregator.clone(); + let project_cache = self.services.project_cache.clone(); + let mut no_project = 0; let mut scoped_buckets = BTreeMap::new(); for (project_key, buckets) in message.buckets { let project = self.get_or_create_project(project_key); - if let Some((scoping, b)) = project.check_buckets(&metric_outcomes, buckets) { - scoped_buckets.insert(scoping, b); + match project.check_buckets(&metric_outcomes, &outcome_aggregator, buckets) { + CheckedBuckets::NoProject(buckets) => { + no_project += 1; + // Schedule an update for the project just in case. + project.prefetch(project_cache.clone(), false); + project.return_buckets(&aggregator, buckets); + } + CheckedBuckets::Checked { + scoping, + project_state, + buckets, + } => scoped_buckets + .entry(scoping) + .or_insert(ProjectMetrics { + project_state, + buckets: Vec::new(), + }) + .buckets + .extend(buckets), + CheckedBuckets::Dropped => {} } } self.services.envelope_processor.send(EncodeMetrics { partition_key: message.partition_key, scopes: scoped_buckets, - }) + }); + + relay_statsd::metric!( + counter(RelayCounters::ProjectStateFlushMetricsNoProject) += no_project + ); } fn handle_buffer_index(&mut self, message: UpdateSpoolIndex) { diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index fa89d3e647..945787a835 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -678,6 +678,9 @@ pub enum RelayCounters { /// - `resource_id`: The COGS resource id. 
/// - `app_feature`: The COGS app feature. CogsUsage, + /// The amount of times metrics of a project have been flushed without the project being + /// fetched/available. + ProjectStateFlushMetricsNoProject, /// The decision on whether normalization should run for an event. /// /// This metric is tagged with: @@ -727,6 +730,7 @@ impl CounterMetric for RelayCounters { RelayCounters::MissingDynamicSamplingContext => "missing_dynamic_sampling_context", RelayCounters::FeedbackAttachments => "processing.feedback_attachments", RelayCounters::CogsUsage => "cogs.usage", + RelayCounters::ProjectStateFlushMetricsNoProject => "project_state.metrics.no_project", RelayCounters::NormalizationDecision => "normalization.decision", } } diff --git a/relay-server/src/testutils.rs b/relay-server/src/testutils.rs index a40ec2a34b..76f62abbfa 100644 --- a/relay-server/src/testutils.rs +++ b/relay-server/src/testutils.rs @@ -119,8 +119,6 @@ pub fn create_test_processor(config: Config) -> EnvelopeProcessorService { let (project_cache, _) = mock_service("project_cache", (), |&mut (), _| {}); let (upstream_relay, _) = mock_service("upstream_relay", (), |&mut (), _| {}); let (test_store, _) = mock_service("test_store", (), |&mut (), _| {}); - #[cfg(feature = "processing")] - let (aggregator, _) = mock_service("aggregator", (), |&mut (), _| {}); #[cfg(feature = "processing")] let redis = config @@ -145,8 +143,6 @@ pub fn create_test_processor(config: Config) -> EnvelopeProcessorService { upstream_relay, test_store, #[cfg(feature = "processing")] - aggregator: aggregator.clone(), - #[cfg(feature = "processing")] store_forwarder: None, }, metric_outcomes,