Skip to content

Commit

Permalink
ref(metrics): Aggregate metrics before rate limiting (#3746)
Browse files Browse the repository at this point in the history
Implements the rest of #3662.
Fixes #3553.

Removes all pre-aggregation and rate-limiting steps that previously ran before a
project state was received; all rate limiting is now done after aggregation.
  • Loading branch information
Dav1dde authored Jun 24, 2024
1 parent 4b6dcb5 commit 4ff3d83
Show file tree
Hide file tree
Showing 9 changed files with 346 additions and 620 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Changelog

## Unreleased

**Bug Fixes**:

- Fixes metrics being dropped due to a missing project state. ([#3553](https://github.com/getsentry/relay/issues/3553))


**Internal**:

- Aggregate metrics before rate limiting. ([#3746](https://github.com/getsentry/relay/pull/3746))

## 24.6.0

**Bug fixes**:
Expand Down
6 changes: 0 additions & 6 deletions relay-server/src/metrics/rate_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,12 +258,6 @@ impl<Q: AsRef<Vec<Quota>>> MetricsLimiter<Q> {
false
}

/// Returns a reference to the contained metrics.
#[cfg(feature = "processing")]
pub fn buckets(&self) -> impl Iterator<Item = &Bucket> {
self.buckets.iter().map(|s| &s.bucket)
}

/// Consume this struct and return the contained metrics.
pub fn into_buckets(self) -> Vec<Bucket> {
self.buckets.into_iter().map(|s| s.bucket).collect()
Expand Down
2 changes: 0 additions & 2 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,6 @@ impl ServiceState {
upstream_relay: upstream_relay.clone(),
test_store: test_store.clone(),
#[cfg(feature = "processing")]
aggregator: aggregator.clone(),
#[cfg(feature = "processing")]
store_forwarder: store.clone(),
},
metric_outcomes.clone(),
Expand Down
192 changes: 63 additions & 129 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use {
RedisSetLimiterOptions,
},
relay_dynamic_config::{CardinalityLimiterMode, GlobalConfig, MetricExtractionGroups},
relay_metrics::{Aggregator, MergeBuckets, RedisMetricMetaStore},
relay_metrics::RedisMetricMetaStore,
relay_quotas::{Quota, RateLimitingError, RateLimits, RedisRateLimiter},
relay_redis::RedisPool,
std::iter::Chain,
Expand Down Expand Up @@ -914,13 +914,6 @@ pub struct SubmitClientReports {
pub scoping: Scoping,
}

/// Applies rate limits to metrics buckets and forwards them to the [`Aggregator`].
#[cfg(feature = "processing")]
#[derive(Debug)]
pub struct RateLimitBuckets {
pub bucket_limiter: MetricsLimiter,
}

/// CPU-intensive processing tasks for envelopes.
#[derive(Debug)]
pub enum EnvelopeProcessor {
Expand All @@ -932,8 +925,6 @@ pub enum EnvelopeProcessor {
EncodeMetricMeta(Box<EncodeMetricMeta>),
SubmitEnvelope(Box<SubmitEnvelope>),
SubmitClientReports(Box<SubmitClientReports>),
#[cfg(feature = "processing")]
RateLimitBuckets(RateLimitBuckets),
}

impl EnvelopeProcessor {
Expand All @@ -948,8 +939,6 @@ impl EnvelopeProcessor {
EnvelopeProcessor::EncodeMetricMeta(_) => "EncodeMetricMeta",
EnvelopeProcessor::SubmitEnvelope(_) => "SubmitEnvelope",
EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
#[cfg(feature = "processing")]
EnvelopeProcessor::RateLimitBuckets(_) => "RateLimitBuckets",
}
}
}
Expand Down Expand Up @@ -1020,15 +1009,6 @@ impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
}
}

#[cfg(feature = "processing")]
impl FromMessage<RateLimitBuckets> for EnvelopeProcessor {
type Response = NoResponse;

fn from_message(message: RateLimitBuckets, _: ()) -> Self {
Self::RateLimitBuckets(message)
}
}

/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// This service handles messages in a worker pool with configurable concurrency.
Expand All @@ -1043,8 +1023,6 @@ pub struct Addrs {
pub envelope_processor: Addr<EnvelopeProcessor>,
pub project_cache: Addr<ProjectCache>,
pub outcome_aggregator: Addr<TrackOutcome>,
#[cfg(feature = "processing")]
pub aggregator: Addr<Aggregator>,
pub upstream_relay: Addr<UpstreamRelay>,
pub test_store: Addr<TestStore>,
#[cfg(feature = "processing")]
Expand All @@ -1058,8 +1036,6 @@ impl Default for Addrs {
envelope_processor: Addr::dummy(),
project_cache: Addr::dummy(),
outcome_aggregator: Addr::dummy(),
#[cfg(feature = "processing")]
aggregator: Addr::dummy(),
upstream_relay: Addr::dummy(),
test_store: Addr::dummy(),
#[cfg(feature = "processing")]
Expand Down Expand Up @@ -2302,13 +2278,69 @@ impl EnvelopeProcessorService {
});
}

/// Check and apply rate limits to metrics buckets.
#[cfg(feature = "processing")]
fn handle_rate_limit_buckets(&self, message: RateLimitBuckets) {
use relay_quotas::RateLimits;
/// Applies rate limits to a batch of metric buckets for one project scope.
///
/// Rate limiting happens in two stages:
/// 1. Per-namespace throughput limits are checked against Redis via the
///    configured `rate_limiter`; buckets in a limited namespace are split
///    off, reported as `RateLimited` outcomes, and the project cache is
///    notified of the new limits.
/// 2. The surviving buckets are passed through [`MetricsLimiter`] for the
///    remaining (non-throughput) quota checks.
///
/// Returns the buckets that were not dropped by any limit.
fn rate_limit_buckets(
    &self,
    scoping: Scoping,
    project_state: &ProjectState,
    mut buckets: Vec<Bucket>,
) -> Vec<Bucket> {
    // Without a Redis rate limiter (non-processing setups) there is nothing
    // to enforce here; hand all buckets back untouched.
    let Some(rate_limiter) = self.inner.rate_limiter.as_ref() else {
        return buckets;
    };

    let global_config = self.inner.global_config.current();
    // Count buckets per metric namespace; buckets without a recognizable
    // namespace are skipped by `filter_map` and thus never rate limited here.
    let namespaces = buckets
        .iter()
        .filter_map(|bucket| bucket.name.try_namespace())
        .counts();

    // Global quotas combined with the project's own quotas.
    let quotas = CombinedQuotas::new(&global_config, project_state.get_quotas());

    for (namespace, quantity) in namespaces {
        let item_scoping = scoping.metric_bucket(namespace);

        // Checks the throughput limit in Redis and simultaneously records
        // this batch's quantity so other Relays observe the updated counts.
        let limits = match rate_limiter.is_rate_limited(quotas, item_scoping, quantity, false) {
            Ok(limits) => limits,
            Err(err) => {
                relay_log::error!(
                    error = &err as &dyn std::error::Error,
                    "failed to check redis rate limits"
                );
                // On a Redis error, fail open: stop checking further
                // namespaces and keep the remaining buckets.
                break;
            }
        };

        if limits.is_limited() {
            // Remove all buckets of the limited namespace from the working
            // set; `rejected` receives the dropped buckets.
            let rejected;
            (buckets, rejected) = utils::split_off(buckets, |bucket| {
                bucket.name.try_namespace() == Some(namespace)
            });

            // Report the dropped buckets as rate-limited outcomes, tagged
            // with the reason code of the longest-lasting limit.
            let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());
            self.inner.metric_outcomes.track(
                scoping,
                &rejected,
                Outcome::RateLimited(reason_code),
            );

            // Propagate the fresh limits to the project cache so subsequent
            // submissions are limited before reaching this point.
            self.inner.addrs.project_cache.send(UpdateRateLimits::new(
                item_scoping.scoping.project_key,
                limits,
            ));
        }
    }

    // Second stage: apply the remaining project quotas. `Err` means the
    // limiter was not applicable to these buckets and returns them as-is.
    match MetricsLimiter::create(buckets, project_state.config.quotas.clone(), scoping) {
        Err(buckets) => buckets,
        Ok(bucket_limiter) => self.apply_other_rate_limits(bucket_limiter),
    }
}

/// Check and apply rate limits to metrics buckets for transactions and spans.
#[cfg(feature = "processing")]
fn apply_other_rate_limits(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
relay_log::trace!("handle_rate_limit_buckets");
let RateLimitBuckets { mut bucket_limiter } = message;

let scoping = *bucket_limiter.scoping();

Expand Down Expand Up @@ -2377,88 +2409,7 @@ impl EnvelopeProcessorService {
}
}

let project_key = bucket_limiter.scoping().project_key;
let buckets = bucket_limiter.into_buckets();

if !buckets.is_empty() {
self.inner
.addrs
.aggregator
.send(MergeBuckets::new(project_key, buckets));
}
}

#[cfg(feature = "processing")]
fn rate_limit_buckets_by_namespace(
&self,
scoping: Scoping,
buckets: Vec<Bucket>,
quotas: CombinedQuotas<'_>,
) -> Vec<Bucket> {
let Some(rate_limiter) = self.inner.rate_limiter.as_ref() else {
return buckets;
};

let buckets_by_ns: HashMap<MetricNamespace, Vec<Bucket>> = buckets
.into_iter()
.filter_map(|bucket| Some((bucket.name.try_namespace()?, bucket)))
.into_group_map();

buckets_by_ns
.into_iter()
.flat_map(|(namespace, buckets)| {
let item_scoping = scoping.metric_bucket(namespace);
self.rate_limit_buckets(item_scoping, buckets, quotas, rate_limiter)
})
.collect()
}

/// Returns `true` if the batches should be rate limited.
#[cfg(feature = "processing")]
fn rate_limit_buckets(
&self,
item_scoping: relay_quotas::ItemScoping,
buckets: Vec<Bucket>,
quotas: CombinedQuotas<'_>,
rate_limiter: &RedisRateLimiter,
) -> Vec<Bucket> {
let batch_size = self.inner.config.metrics_max_batch_size_bytes();
let quantity = BucketsView::from(&buckets)
.by_size(batch_size)
.flatten()
.count();

// Check with redis if the throughput limit has been exceeded, while also updating
// the count so that other relays will be updated too.
match rate_limiter.is_rate_limited(quotas, item_scoping, quantity, false) {
Ok(limits) if limits.is_limited() => {
relay_log::debug!("dropping {quantity} buckets due to throughput rate limit");

let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());

self.inner.metric_outcomes.track(
*item_scoping.scoping,
&buckets,
Outcome::RateLimited(reason_code),
);

self.inner.addrs.project_cache.send(UpdateRateLimits::new(
item_scoping.scoping.project_key,
limits,
));

Vec::new()
}
Ok(_) => buckets,
Err(e) => {
relay_log::error!(
error = &e as &dyn std::error::Error,
"failed to check redis rate limits"
);

buckets
}
}
bucket_limiter.into_buckets()
}

/// Cardinality limits the passed buckets and returns a filtered vector of only accepted buckets.
Expand Down Expand Up @@ -2542,16 +2493,13 @@ impl EnvelopeProcessorService {
use crate::constants::DEFAULT_EVENT_RETENTION;
use crate::services::store::StoreMetrics;

let global_config = self.inner.global_config.current();

for (scoping, message) in message.scopes {
let ProjectMetrics {
buckets,
project_state,
} = message;

let quotas = CombinedQuotas::new(&global_config, project_state.get_quotas());
let buckets = self.rate_limit_buckets_by_namespace(scoping, buckets, quotas);
let buckets = self.rate_limit_buckets(scoping, &project_state, buckets);

let limits = project_state.get_cardinality_limits();
let buckets = self.cardinality_limit_buckets(scoping, limits, buckets);
Expand Down Expand Up @@ -2800,8 +2748,6 @@ impl EnvelopeProcessorService {
EnvelopeProcessor::EncodeMetricMeta(m) => self.handle_encode_metric_meta(*m),
EnvelopeProcessor::SubmitEnvelope(m) => self.handle_submit_envelope(*m),
EnvelopeProcessor::SubmitClientReports(m) => self.handle_submit_client_reports(*m),
#[cfg(feature = "processing")]
EnvelopeProcessor::RateLimitBuckets(m) => self.handle_rate_limit_buckets(m),
}
});
}
Expand All @@ -2828,18 +2774,6 @@ impl EnvelopeProcessorService {
EnvelopeProcessor::EncodeMetricMeta(_) => AppFeature::MetricMeta.into(),
EnvelopeProcessor::SubmitEnvelope(v) => AppFeature::from(v.envelope.group()).into(),
EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
#[cfg(feature = "processing")]
EnvelopeProcessor::RateLimitBuckets(v) => {
relay_metrics::cogs::ByCount(v.bucket_limiter.buckets().filter(|b| {
// Only spans and transactions are actually rate limited at this point.
// Other metrics do not cause costs.
matches!(
b.name.try_namespace(),
Some(MetricNamespace::Spans | MetricNamespace::Transactions)
)
}))
.into()
}
}
}
}
Expand Down
Loading

0 comments on commit 4ff3d83

Please sign in to comment.