Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(quotas): Enforce rate limits on metrics buckets [INGEST-1654] #1515

Merged
merged 40 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
c7d3f5c
ref: Pass metrics buckets through processor
jjbayer Oct 5, 2022
90fb99e
wip
jjbayer Oct 5, 2022
e1a0b22
test: Basic integration test
jjbayer Oct 6, 2022
b6070aa
test: integration test still fails
jjbayer Oct 6, 2022
b7bd8e0
Merge branch 'master' into feat/metrics-enforce-quotas
jan-auer Oct 7, 2022
99ecb74
fix: Import error on default features
jan-auer Oct 10, 2022
71c20a3
test: Fix integration test to match rate limiter behavior
jjbayer Oct 10, 2022
86ff9f6
doc: Add TODOs
jjbayer Oct 10, 2022
10d9dcf
test: Parametrize test to cover exact exhaustion vs. exceeding exhaus…
jjbayer Oct 10, 2022
ca12d23
ref: Call rate limiter only once for a batch of buckets
jjbayer Oct 10, 2022
699d0cb
feat: Emit outcome for rate limited metrics
jjbayer Oct 10, 2022
3f5bbb0
test: Check outcomes were produced
jjbayer Oct 10, 2022
040c012
Merge remote-tracking branch 'origin/master' into feat/metrics-enforc…
jjbayer Oct 11, 2022
3c43037
doc: changelog
jjbayer Oct 11, 2022
6f3f889
ref: clippy
jjbayer Oct 11, 2022
b9bcd98
wip: Move logic from processor to project_cache
jjbayer Oct 11, 2022
308a066
wip
jjbayer Oct 11, 2022
afc84a4
fix: Get async fn working
jjbayer Oct 11, 2022
af5b358
fix: Restore sending of buckets
jjbayer Oct 11, 2022
c48a8c8
Merge remote-tracking branch 'origin/master' into feat/metrics-enforc…
jjbayer Oct 11, 2022
4061ad5
ref: function name, docs, respect category arg
jjbayer Oct 11, 2022
e8fa434
ref: Replace async block by plain function call
jjbayer Oct 12, 2022
9302fac
ref: Track internal outcomes for errors
jjbayer Oct 12, 2022
e66b46d
fix: Actually check rate limits
jjbayer Oct 12, 2022
3639335
fix: Don't loop over rate limits
jjbayer Oct 12, 2022
650f0f4
ref: Add helper to RateLimits
jjbayer Oct 12, 2022
1cabbb1
ref: Update some comments
jjbayer Oct 12, 2022
7494af0
fix: lint
jjbayer Oct 12, 2022
00331f3
fix: Drop buckets on internal error
jjbayer Oct 12, 2022
75c4925
Merge remote-tracking branch 'origin/master' into feat/metrics-enforc…
jjbayer Oct 13, 2022
157ce04
fix: Call rate limiter with over_accept_once
jjbayer Oct 13, 2022
e784b34
ref: Move rate limiting to utils
jjbayer Oct 13, 2022
3cf8499
wip
jjbayer Oct 14, 2022
de54f9d
fix: Send to processor
jjbayer Oct 14, 2022
f1bca59
ref: clippy
jjbayer Oct 14, 2022
b3da4c0
Merge remote-tracking branch 'origin/master' into feat/metrics-enforc…
jjbayer Oct 14, 2022
d97d127
ref: self-review
jjbayer Oct 14, 2022
8d5f406
test: Try to unflake
jjbayer Oct 14, 2022
aa93591
test: unflake, attempt #2
jjbayer Oct 14, 2022
1da8498
Merge remote-tracking branch 'origin/master' into feat/metrics-enforc…
jjbayer Oct 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@

- Limit the number of custom measurements per event. ([#1483](https://github.com/getsentry/relay/pull/1483)))
- Add INP web vital as a measurement. ([#1487](https://github.com/getsentry/relay/pull/1487))
- Enforce rate limits on metrics buckets using the transactions_processed quota. ([#1515](https://github.com/getsentry/relay/pull/1515))
- PII scrubbing now treats any key containing `token` as a password. ([#1527](https://github.com/getsentry/relay/pull/1527))

** Bug Fixes**:
**Bug Fixes**:

- Make sure that non-processing Relays drop all invalid transactions. ([#1513](https://github.com/getsentry/relay/pull/1513))

Expand Down
74 changes: 73 additions & 1 deletion relay-server/src/actors/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@ use crate::utils::{

#[cfg(feature = "processing")]
use {
crate::actors::envelopes::SendMetrics,
crate::actors::project_cache::UpdateRateLimits,
crate::service::ServerErrorKind,
crate::utils::EnvelopeLimiter,
crate::utils::{BucketLimiter, EnvelopeLimiter},
failure::ResultExt,
relay_general::store::{GeoIpLookup, StoreConfig, StoreProcessor},
relay_quotas::ItemScoping,
relay_quotas::{RateLimitingError, RedisRateLimiter},
symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
};
Expand Down Expand Up @@ -516,12 +518,22 @@ impl EncodeEnvelope {
}
}

/// Applies rate limits to metrics buckets and forwards them to the envelope manager.
#[cfg(feature = "processing")]
#[derive(Debug)]
pub struct RateLimitFlushBuckets {
pub bucket_limiter: BucketLimiter,
pub partition_key: Option<u64>,
}

/// CPU-intensive processing tasks for envelopes.
#[derive(Debug)]
pub enum EnvelopeProcessor {
ProcessEnvelope(Box<ProcessEnvelope>),
ProcessMetrics(Box<ProcessMetrics>),
EncodeEnvelope(Box<EncodeEnvelope>),
#[cfg(feature = "processing")]
RateLimitFlushBuckets(RateLimitFlushBuckets),
}

impl EnvelopeProcessor {
Expand Down Expand Up @@ -556,6 +568,15 @@ impl FromMessage<EncodeEnvelope> for EnvelopeProcessor {
}
}

#[cfg(feature = "processing")]
impl FromMessage<RateLimitFlushBuckets> for EnvelopeProcessor {
type Response = NoResponse;

fn from_message(message: RateLimitFlushBuckets, _: ()) -> Self {
Self::RateLimitFlushBuckets(message)
}
}

/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// This service handles messages in a worker pool with configurable concurrency.
Expand Down Expand Up @@ -2164,6 +2185,53 @@ impl EnvelopeProcessorService {
}
}

/// Check and apply rate limits to metrics buckets.
#[cfg(feature = "processing")]
fn handle_rate_limit_flush_buckets(&self, message: RateLimitFlushBuckets) {
let RateLimitFlushBuckets {
mut bucket_limiter,
partition_key,
} = message;

let scoping = *bucket_limiter.scoping();

if let Some(rate_limiter) = self.rate_limiter.as_ref() {
let item_scoping = ItemScoping {
category: DataCategory::TransactionProcessed,
scoping: &scoping,
};
// We set over_accept_once such that the limit is actually reached, which allows subsequent
// calls with quantity=0 to be rate limited.
let over_accept_once = true;
let rate_limits = rate_limiter.is_rate_limited(
bucket_limiter.quotas(),
item_scoping,
bucket_limiter.transaction_count(),
over_accept_once,
);

let was_enforced = bucket_limiter.enforce_limits(rate_limits.as_ref().map_err(|_| ()));

if was_enforced {
if let Ok(limits) = rate_limits {
// Update the rate limits in the project cache.
ProjectCache::from_registry()
.do_send(UpdateRateLimits::new(scoping.project_key, limits));
}
}
}

let buckets = bucket_limiter.into_buckets();
if !buckets.is_empty() {
// Forward buckets to envelope manager to send them to upstream or kafka:
EnvelopeManager::from_registry().send(SendMetrics {
buckets,
scoping,
partition_key,
});
}
}

fn encode_envelope_body(
body: Vec<u8>,
http_encoding: HttpEncoding,
Expand Down Expand Up @@ -2211,6 +2279,10 @@ impl EnvelopeProcessorService {
EnvelopeProcessor::ProcessEnvelope(message) => self.handle_process_envelope(*message),
EnvelopeProcessor::ProcessMetrics(message) => self.handle_process_metrics(*message),
EnvelopeProcessor::EncodeEnvelope(message) => self.handle_encode_envelope(*message),
#[cfg(feature = "processing")]
EnvelopeProcessor::RateLimitFlushBuckets(message) => {
self.handle_rate_limit_flush_buckets(message);
}
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions relay-server/src/actors/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,11 @@ impl Project {
}
}

/// The rate limits that are active for this project.
pub fn rate_limits(&self) -> &RateLimits {
&self.rate_limits
}

/// The last time the project state was updated
pub fn last_updated_at(&self) -> Instant {
self.last_updated_at
Expand All @@ -601,6 +606,7 @@ impl Project {
///
/// The buckets will be keyed underneath this project key.
pub fn merge_buckets(&mut self, buckets: Vec<Bucket>) {
// TODO: rate limits
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will add a call to BucketLimiter::enforce_limits() to metrics_allowed() in a follow-up PR.

if self.metrics_allowed() {
Registry::aggregator().send(MergeBuckets::new(self.project_key, buckets));
}
Expand All @@ -610,6 +616,7 @@ impl Project {
///
/// The metrics will be keyed underneath this project key.
pub fn insert_metrics(&mut self, metrics: Vec<Metric>) {
// TODO: rate limits
if self.metrics_allowed() {
Registry::aggregator().send(InsertMetrics::new(self.project_key, metrics));
}
Expand Down
50 changes: 40 additions & 10 deletions relay-server/src/actors/project_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,29 @@ use futures01::{future, Future};
use relay_common::ProjectKey;
use relay_config::{Config, RelayMode};
use relay_metrics::{self, AggregateMetricsError, FlushBuckets, InsertMetrics, MergeBuckets};

use relay_quotas::RateLimits;

use relay_redis::RedisPool;
use relay_statsd::metric;

use crate::actors::envelopes::{EnvelopeManager, SendMetrics};
use crate::actors::outcome::DiscardReason;
use crate::actors::processor::ProcessEnvelope;
use crate::actors::project::{Project, ProjectState};
use crate::actors::project::{ExpiryState, Project, ProjectState};
use crate::actors::project_local::LocalProjectSource;
use crate::actors::project_upstream::UpstreamProjectSource;
use crate::envelope::Envelope;
use crate::service::Registry;
use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers};
use crate::utils::{self, EnvelopeContext, GarbageDisposal, Response};

use super::project::ExpiryState;
use crate::utils::{self, BucketLimiter, EnvelopeContext, GarbageDisposal, Response};

#[cfg(feature = "processing")]
use {crate::actors::project_redis::RedisProjectSource, relay_common::clone};
use {
crate::actors::processor::{EnvelopeProcessor, RateLimitFlushBuckets},
crate::actors::project_redis::RedisProjectSource,
relay_common::clone,
};

#[derive(Fail, Debug)]
pub enum ProjectError {
Expand Down Expand Up @@ -623,10 +627,36 @@ impl Handler<FlushBuckets> for ProjectCache {
return;
}

EnvelopeManager::from_registry().send(SendMetrics {
buckets,
scoping,
partition_key,
});
// Check rate limits if necessary:
let quotas = project_state.config.quotas.clone();
let buckets = match BucketLimiter::create(buckets, quotas, scoping) {
Ok(mut bucket_limiter) => {
let cached_rate_limits = project.rate_limits().clone();
#[allow(unused_variables)]
let was_rate_limited = bucket_limiter.enforce_limits(Ok(&cached_rate_limits));

#[cfg(feature = "processing")]
if !was_rate_limited && config.processing_enabled() {
// If there were no cached rate limits active, let the processor check redis:
EnvelopeProcessor::from_registry().send(RateLimitFlushBuckets {
bucket_limiter,
partition_key,
});

return;
}

bucket_limiter.into_buckets()
}
Err(buckets) => buckets,
};

if !buckets.is_empty() {
EnvelopeManager::from_registry().send(SendMetrics {
buckets,
scoping,
partition_key,
});
}
}
}
Loading