ref(metrics): Aggregate metrics before rate limiting #3746

Merged · 7 commits · Jun 24, 2024
Changes from 5 commits
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,11 @@
# Changelog

## Unreleased

**Internal**:

- Aggregate metrics before rate limiting. ([#3746](https://github.com/getsentry/relay/pull/3746))

## 24.6.0

**Bug fixes**:
6 changes: 0 additions & 6 deletions relay-server/src/metrics/rate_limits.rs
@@ -258,12 +258,6 @@ impl<Q: AsRef<Vec<Quota>>> MetricsLimiter<Q> {
false
}

/// Returns a reference to the contained metrics.
#[cfg(feature = "processing")]
pub fn buckets(&self) -> impl Iterator<Item = &Bucket> {
self.buckets.iter().map(|s| &s.bucket)
}

/// Consume this struct and return the contained metrics.
pub fn into_buckets(self) -> Vec<Bucket> {
self.buckets.into_iter().map(|s| s.bucket).collect()
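
> For context, `into_buckets` is now the only way left to get at the contained metrics, and it consumes the limiter. A minimal stand-alone sketch of that ownership pattern, with types simplified from relay-server:

```rust
// Minimal sketch, not the real relay-server types: the limiter stores buckets
// behind a summary wrapper and, after this PR, exposes them only by value.
struct Bucket {
    name: String,
}

struct SummarizedBucket {
    bucket: Bucket,
}

struct MetricsLimiter {
    buckets: Vec<SummarizedBucket>,
}

impl MetricsLimiter {
    /// Consume this struct and return the contained metrics.
    fn into_buckets(self) -> Vec<Bucket> {
        self.buckets.into_iter().map(|s| s.bucket).collect()
    }
}

fn main() {
    let limiter = MetricsLimiter {
        buckets: vec![SummarizedBucket {
            bucket: Bucket {
                name: "c:transactions/count_per_root_project@none".to_owned(),
            },
        }],
    };

    // Consuming the limiter means no later code can observe the buckets
    // through it again, which is what removing `buckets()` enforces.
    let buckets = limiter.into_buckets();
    assert_eq!(buckets.len(), 1);
    assert!(buckets[0].name.starts_with("c:"));
}
```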
2 changes: 0 additions & 2 deletions relay-server/src/service.rs
@@ -178,8 +178,6 @@ impl ServiceState {
upstream_relay: upstream_relay.clone(),
test_store: test_store.clone(),
#[cfg(feature = "processing")]
aggregator: aggregator.clone(),
#[cfg(feature = "processing")]
store_forwarder: store.clone(),
},
metric_outcomes.clone(),
140 changes: 37 additions & 103 deletions relay-server/src/services/processor.rs
@@ -54,7 +54,7 @@ use {
RedisSetLimiterOptions,
},
relay_dynamic_config::{CardinalityLimiterMode, GlobalConfig, MetricExtractionGroups},
relay_metrics::{Aggregator, MergeBuckets, RedisMetricMetaStore},
relay_metrics::RedisMetricMetaStore,
relay_quotas::{Quota, RateLimitingError, RateLimits, RedisRateLimiter},
relay_redis::RedisPool,
std::iter::Chain,
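
> The import cleanup above keeps `std::iter::Chain`, which hints at how `CombinedQuotas` (used later in this diff) merges global and project quotas. A hedged sketch of that idea; the field names and `iter` helper here are assumptions, not relay's actual definition:

```rust
// Sketch only: an assumed shape for a combined-quota view that chains global
// quotas with project quotas, consistent with the `std::iter::Chain` import.
#[derive(Debug)]
struct Quota; // stand-in for relay_quotas::Quota

struct CombinedQuotas<'a> {
    global: &'a [Quota],
    project: &'a [Quota],
}

impl<'a> CombinedQuotas<'a> {
    fn new(global: &'a [Quota], project: &'a [Quota]) -> Self {
        Self { global, project }
    }

    /// Iterate global quotas first, then project quotas, without allocating.
    fn iter(&self) -> impl Iterator<Item = &'a Quota> + '_ {
        self.global.iter().chain(self.project.iter())
    }
}

fn main() {
    let global = [Quota];
    let project = [Quota, Quota];
    let combined = CombinedQuotas::new(&global, &project);
    assert_eq!(combined.iter().count(), 3);
}
```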
@@ -914,13 +914,6 @@ pub struct SubmitClientReports {
pub scoping: Scoping,
}

/// Applies rate limits to metrics buckets and forwards them to the [`Aggregator`].
#[cfg(feature = "processing")]
#[derive(Debug)]
pub struct RateLimitBuckets {
pub bucket_limiter: MetricsLimiter,
}

/// CPU-intensive processing tasks for envelopes.
#[derive(Debug)]
pub enum EnvelopeProcessor {
@@ -932,8 +925,6 @@ pub enum EnvelopeProcessor {
EncodeMetricMeta(Box<EncodeMetricMeta>),
SubmitEnvelope(Box<SubmitEnvelope>),
SubmitClientReports(Box<SubmitClientReports>),
#[cfg(feature = "processing")]
RateLimitBuckets(RateLimitBuckets),
}

impl EnvelopeProcessor {
@@ -948,8 +939,6 @@ impl EnvelopeProcessor {
EnvelopeProcessor::EncodeMetricMeta(_) => "EncodeMetricMeta",
EnvelopeProcessor::SubmitEnvelope(_) => "SubmitEnvelope",
EnvelopeProcessor::SubmitClientReports(_) => "SubmitClientReports",
#[cfg(feature = "processing")]
EnvelopeProcessor::RateLimitBuckets(_) => "RateLimitBuckets",
}
}
}
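
> Both removals above follow the same feature-gating pattern: the enum variant and its match arm are guarded by the identical `cfg`, so they must be deleted together. A minimal sketch of the pattern in isolation:

```rust
// Minimal sketch of a feature-gated enum variant and its matching match arm;
// both disappear together when the variant is deleted, as in this PR.
enum EnvelopeProcessor {
    SubmitEnvelope,
    #[cfg(feature = "processing")]
    RateLimitBuckets,
}

impl EnvelopeProcessor {
    fn variant_name(&self) -> &'static str {
        match self {
            Self::SubmitEnvelope => "SubmitEnvelope",
            // The arm carries the same cfg as the variant, so builds without
            // the feature still have an exhaustive match.
            #[cfg(feature = "processing")]
            Self::RateLimitBuckets => "RateLimitBuckets",
        }
    }
}

fn main() {
    let processor = EnvelopeProcessor::SubmitEnvelope;
    assert_eq!(processor.variant_name(), "SubmitEnvelope");
}
```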
@@ -1020,15 +1009,6 @@ impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
}
}

#[cfg(feature = "processing")]
impl FromMessage<RateLimitBuckets> for EnvelopeProcessor {
type Response = NoResponse;

fn from_message(message: RateLimitBuckets, _: ()) -> Self {
Self::RateLimitBuckets(message)
}
}

/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// This service handles messages in a worker pool with configurable concurrency.
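
> For reference, the `FromMessage` plumbing deleted above is one instance of the service framework's message-routing pattern. A simplified stand-alone sketch; the real relay-system trait also takes a sender argument (shown as `_: ()` in the diff), omitted here:

```rust
// Stand-alone sketch of the message-routing pattern whose RateLimitBuckets
// instance the PR deletes: a typed message is wrapped into the processor's
// enum so one worker pool can handle every message kind.
trait FromMessage<M> {
    fn from_message(message: M) -> Self;
}

struct SubmitClientReports;

enum EnvelopeProcessor {
    SubmitClientReports(Box<SubmitClientReports>),
}

impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
    fn from_message(message: SubmitClientReports) -> Self {
        Self::SubmitClientReports(Box::new(message))
    }
}

fn main() {
    let msg = SubmitClientReports;
    let _routed = EnvelopeProcessor::from_message(msg);
}
```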
@@ -1043,8 +1023,6 @@ pub struct Addrs {
pub envelope_processor: Addr<EnvelopeProcessor>,
pub project_cache: Addr<ProjectCache>,
pub outcome_aggregator: Addr<TrackOutcome>,
#[cfg(feature = "processing")]
pub aggregator: Addr<Aggregator>,
pub upstream_relay: Addr<UpstreamRelay>,
pub test_store: Addr<TestStore>,
#[cfg(feature = "processing")]
@@ -1058,8 +1036,6 @@ impl Default for Addrs {
envelope_processor: Addr::dummy(),
project_cache: Addr::dummy(),
outcome_aggregator: Addr::dummy(),
#[cfg(feature = "processing")]
aggregator: Addr::dummy(),
upstream_relay: Addr::dummy(),
test_store: Addr::dummy(),
#[cfg(feature = "processing")]
@@ -2304,11 +2280,8 @@ impl EnvelopeProcessorService {

/// Check and apply rate limits to metrics buckets.
#[cfg(feature = "processing")]
fn handle_rate_limit_buckets(&self, message: RateLimitBuckets) {
use relay_quotas::RateLimits;

fn handle_rate_limit_buckets(&self, mut bucket_limiter: MetricsLimiter) -> Vec<Bucket> {
relay_log::trace!("handle_rate_limit_buckets");
let RateLimitBuckets { mut bucket_limiter } = message;

let scoping = *bucket_limiter.scoping();

@@ -2377,87 +2350,65 @@ impl EnvelopeProcessorService {
}
}

let project_key = bucket_limiter.scoping().project_key;
let buckets = bucket_limiter.into_buckets();

if !buckets.is_empty() {
self.inner
.addrs
.aggregator
.send(MergeBuckets::new(project_key, buckets));
}
bucket_limiter.into_buckets()
}

#[cfg(feature = "processing")]
fn rate_limit_buckets_by_namespace(
fn rate_limit_buckets(
&self,
scoping: Scoping,
buckets: Vec<Bucket>,
quotas: CombinedQuotas<'_>,
project_state: &ProjectState,
mut buckets: Vec<Bucket>,
) -> Vec<Bucket> {
let Some(rate_limiter) = self.inner.rate_limiter.as_ref() else {
return buckets;
};

let buckets_by_ns: HashMap<MetricNamespace, Vec<Bucket>> = buckets
.into_iter()
.filter_map(|bucket| Some((bucket.name.try_namespace()?, bucket)))
.into_group_map();
let global_config = self.inner.global_config.current();
let namespaces = buckets
.iter()
.filter_map(|bucket| bucket.name.try_namespace())
.counts();

buckets_by_ns
.into_iter()
.flat_map(|(namespace, buckets)| {
let item_scoping = scoping.metric_bucket(namespace);
self.rate_limit_buckets(item_scoping, buckets, quotas, rate_limiter)
})
.collect()
}
let quotas = CombinedQuotas::new(&global_config, project_state.get_quotas());

/// Returns `true` if the batches should be rate limited.
#[cfg(feature = "processing")]
fn rate_limit_buckets(
&self,
item_scoping: relay_quotas::ItemScoping,
buckets: Vec<Bucket>,
quotas: CombinedQuotas<'_>,
rate_limiter: &RedisRateLimiter,
) -> Vec<Bucket> {
let batch_size = self.inner.config.metrics_max_batch_size_bytes();
let quantity = BucketsView::from(&buckets)
.by_size(batch_size)
.flatten()
.count();
for (namespace, quantity) in namespaces {
let item_scoping = scoping.metric_bucket(namespace);

let limits = match rate_limiter.is_rate_limited(quotas, item_scoping, quantity, false) {
Ok(limits) => limits,
Err(err) => {
relay_log::error!(
error = &err as &dyn std::error::Error,
"failed to check redis rate limits"
);
break;
}
};

// Check with redis if the throughput limit has been exceeded, while also updating
// the count so that other relays will be updated too.
match rate_limiter.is_rate_limited(quotas, item_scoping, quantity, false) {
Ok(limits) if limits.is_limited() => {
relay_log::debug!("dropping {quantity} buckets due to throughput rate limit");
if limits.is_limited() {
let rejected;
(buckets, rejected) = utils::split_off(buckets, |bucket| {
bucket.name.try_namespace() == Some(namespace)
});

let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone());

self.inner.metric_outcomes.track(
*item_scoping.scoping,
&buckets,
scoping,
&rejected,
Outcome::RateLimited(reason_code),
);

self.inner.addrs.project_cache.send(UpdateRateLimits::new(
item_scoping.scoping.project_key,
limits,
));

Vec::new()
}
Ok(_) => buckets,
Err(e) => {
relay_log::error!(
error = &e as &dyn std::error::Error,
"failed to check redis rate limits"
);
}

buckets
}
match MetricsLimiter::create(buckets, project_state.config.quotas.clone(), scoping) {
Err(buckets) => buckets,
Ok(bucket_limiter) => self.handle_rate_limit_buckets(bucket_limiter),
}
}
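
> The rewritten `rate_limit_buckets` above leans on two helpers: `Itertools::counts()` to learn per-namespace quantities in one pass, and `utils::split_off` to carve a rejected namespace out of the bucket list. A self-contained sketch of both; it needs the itertools crate (as the diff's `.counts()` call does), `split_off`'s `(kept, rejected)` signature is inferred from its call site, and the types are simplified stand-ins:

```rust
// Sketch of the per-namespace accounting introduced in this hunk.
use std::collections::HashMap;

use itertools::Itertools;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum Namespace {
    Spans,
    Transactions,
}

struct Bucket {
    namespace: Option<Namespace>,
}

/// Split `items` into (kept, rejected) by the rejection predicate, mirroring
/// the assumed behavior of relay's `utils::split_off`.
fn split_off<T>(items: Vec<T>, reject: impl Fn(&T) -> bool) -> (Vec<T>, Vec<T>) {
    items.into_iter().partition(|item| !reject(item))
}

fn main() {
    let buckets = vec![
        Bucket { namespace: Some(Namespace::Spans) },
        Bucket { namespace: Some(Namespace::Spans) },
        Bucket { namespace: Some(Namespace::Transactions) },
    ];

    // One pass to learn how many buckets each namespace holds, so every
    // namespace issues a single rate-limit check with the right quantity.
    let counts: HashMap<Namespace, usize> = buckets
        .iter()
        .filter_map(|bucket| bucket.namespace)
        .counts();
    assert_eq!(counts[&Namespace::Spans], 2);

    // When a namespace is limited, its buckets are split off so outcomes can
    // be tracked for exactly the rejected set.
    let (kept, rejected) =
        split_off(buckets, |b| b.namespace == Some(Namespace::Spans));
    assert_eq!((kept.len(), rejected.len()), (1, 2));
}
```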

@@ -2542,16 +2493,13 @@ impl EnvelopeProcessorService {
use crate::constants::DEFAULT_EVENT_RETENTION;
use crate::services::store::StoreMetrics;

let global_config = self.inner.global_config.current();

for (scoping, message) in message.scopes {
let ProjectMetrics {
buckets,
project_state,
} = message;

let quotas = CombinedQuotas::new(&global_config, project_state.get_quotas());
let buckets = self.rate_limit_buckets_by_namespace(scoping, buckets, quotas);
let buckets = self.rate_limit_buckets(scoping, &project_state, buckets);

let limits = project_state.get_cardinality_limits();
let buckets = self.cardinality_limit_buckets(scoping, limits, buckets);
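
> `rate_limit_buckets`, called at the top of this hunk, ends by handing the surviving buckets to `MetricsLimiter::create`, which returns them through `Err` when no limiter can be built, so ownership is never lost. A minimal sketch of that `Result` convention, with the construction condition simplified:

```rust
// Sketch of the `Err`-carries-the-input convention seen at the
// `MetricsLimiter::create` call site: when there is nothing to limit, the
// caller gets its buckets back instead of an error type.
struct Bucket;

struct MetricsLimiter {
    buckets: Vec<Bucket>,
}

fn create(
    buckets: Vec<Bucket>,
    has_matching_quotas: bool,
) -> Result<MetricsLimiter, Vec<Bucket>> {
    if has_matching_quotas {
        Ok(MetricsLimiter { buckets })
    } else {
        // No applicable quotas: hand the buckets back untouched.
        Err(buckets)
    }
}

fn main() {
    let buckets = vec![Bucket, Bucket];
    let buckets = match create(buckets, false) {
        Err(buckets) => buckets, // nothing to limit, keep them all
        Ok(limiter) => limiter.buckets,
    };
    assert_eq!(buckets.len(), 2);
}
```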
@@ -2800,8 +2748,6 @@ impl EnvelopeProcessorService {
EnvelopeProcessor::EncodeMetricMeta(m) => self.handle_encode_metric_meta(*m),
EnvelopeProcessor::SubmitEnvelope(m) => self.handle_submit_envelope(*m),
EnvelopeProcessor::SubmitClientReports(m) => self.handle_submit_client_reports(*m),
#[cfg(feature = "processing")]
EnvelopeProcessor::RateLimitBuckets(m) => self.handle_rate_limit_buckets(m),
}
});
}
@@ -2828,18 +2774,6 @@ impl EnvelopeProcessorService {
EnvelopeProcessor::EncodeMetricMeta(_) => AppFeature::MetricMeta.into(),
EnvelopeProcessor::SubmitEnvelope(v) => AppFeature::from(v.envelope.group()).into(),
EnvelopeProcessor::SubmitClientReports(_) => AppFeature::ClientReports.into(),
#[cfg(feature = "processing")]
EnvelopeProcessor::RateLimitBuckets(v) => {
relay_metrics::cogs::ByCount(v.bucket_limiter.buckets().filter(|b| {
// Only spans and transactions are actually rate limited at this point.
// Other metrics do not cause costs.
matches!(
b.name.try_namespace(),
Some(MetricNamespace::Spans | MetricNamespace::Transactions)
)
}))
.into()
}
}
}
}
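
> The deleted COGS branch above counted only span and transaction buckets toward cost. Its `matches!`-based namespace filter, sketched stand-alone with simplified types:

```rust
// Sketch of the namespace filter from the deleted COGS accounting: only spans
// and transactions contribute to the count; other namespaces cost nothing.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum MetricNamespace {
    Spans,
    Transactions,
    Custom,
}

struct Bucket {
    namespace: Option<MetricNamespace>,
}

fn cost_relevant_count(buckets: &[Bucket]) -> usize {
    buckets
        .iter()
        .filter(|b| {
            matches!(
                b.namespace,
                Some(MetricNamespace::Spans | MetricNamespace::Transactions)
            )
        })
        .count()
}

fn main() {
    let buckets = [
        Bucket { namespace: Some(MetricNamespace::Spans) },
        Bucket { namespace: Some(MetricNamespace::Transactions) },
        Bucket { namespace: Some(MetricNamespace::Custom) },
        Bucket { namespace: None },
    ];
    assert_eq!(cost_relevant_count(&buckets), 2);
}
```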