diff --git a/CHANGELOG.md b/CHANGELOG.md
index 94f51d2d894..737fd8b7245 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,9 +6,10 @@
 
 - Limit the number of custom measurements per event. ([#1483](https://github.com/getsentry/relay/pull/1483))
 - Add INP web vital as a measurement. ([#1487](https://github.com/getsentry/relay/pull/1487))
+- Enforce rate limits on metrics buckets using the transactions_processed quota. ([#1515](https://github.com/getsentry/relay/pull/1515))
 - PII scrubbing now treats any key containing `token` as a password. ([#1527](https://github.com/getsentry/relay/pull/1527))
 
-** Bug Fixes**:
+**Bug Fixes**:
 
 - Make sure that non-processing Relays drop all invalid transactions. ([#1513](https://github.com/getsentry/relay/pull/1513))
 
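For context on the new changelog entry: `transactions_processed` is enforced through an ordinary project quota whose `categories` field names the new data category. A minimal sketch, modeled on the quota configured in `test_rate_limit_metrics_buckets` at the end of this diff (all values illustrative, not a schema reference):

```rust
// Illustrative only: mirrors the quota dict built in the integration test
// below; placeholders in angle brackets are not literal values.
const EXAMPLE_QUOTA: &str = r#"{
    "id": "test_rate_limiting_<uuid>",
    "scope": "key",
    "scopeId": "<numeric key id>",
    "categories": ["transaction_processed"],
    "limit": 5,
    "window": 86400,
    "reasonCode": "<reason code>"
}"#;

fn main() {
    println!("{EXAMPLE_QUOTA}");
}
```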
diff --git a/relay-server/src/actors/processor.rs b/relay-server/src/actors/processor.rs
index 2dad99e94a5..89b431d6da5 100644
--- a/relay-server/src/actors/processor.rs
+++ b/relay-server/src/actors/processor.rs
@@ -54,11 +54,13 @@ use crate::utils::{
 
 #[cfg(feature = "processing")]
 use {
+    crate::actors::envelopes::SendMetrics,
     crate::actors::project_cache::UpdateRateLimits,
     crate::service::ServerErrorKind,
-    crate::utils::EnvelopeLimiter,
+    crate::utils::{BucketLimiter, EnvelopeLimiter},
     failure::ResultExt,
     relay_general::store::{GeoIpLookup, StoreConfig, StoreProcessor},
+    relay_quotas::ItemScoping,
    relay_quotas::{RateLimitingError, RedisRateLimiter},
     symbolic_unreal::{Unreal4Error, Unreal4ErrorKind},
 };
@@ -516,12 +518,22 @@ impl EncodeEnvelope {
     }
 }
 
+/// Applies rate limits to metrics buckets and forwards them to the envelope manager.
+#[cfg(feature = "processing")]
+#[derive(Debug)]
+pub struct RateLimitFlushBuckets {
+    pub bucket_limiter: BucketLimiter,
+    pub partition_key: Option<u64>,
+}
+
 /// CPU-intensive processing tasks for envelopes.
 #[derive(Debug)]
 pub enum EnvelopeProcessor {
     ProcessEnvelope(Box<ProcessEnvelope>),
     ProcessMetrics(Box<ProcessMetrics>),
     EncodeEnvelope(Box<EncodeEnvelope>),
+    #[cfg(feature = "processing")]
+    RateLimitFlushBuckets(RateLimitFlushBuckets),
 }
 
 impl EnvelopeProcessor {
@@ -556,6 +568,15 @@ impl FromMessage<EncodeEnvelope> for EnvelopeProcessor {
     }
 }
 
+#[cfg(feature = "processing")]
+impl FromMessage<RateLimitFlushBuckets> for EnvelopeProcessor {
+    type Response = NoResponse;
+
+    fn from_message(message: RateLimitFlushBuckets, _: ()) -> Self {
+        Self::RateLimitFlushBuckets(message)
+    }
+}
+
 /// Service implementing the [`EnvelopeProcessor`] interface.
 ///
 /// This service handles messages in a worker pool with configurable concurrency.
@@ -2164,6 +2185,53 @@ impl EnvelopeProcessorService {
         }
     }
 
+    /// Check and apply rate limits to metrics buckets.
+    #[cfg(feature = "processing")]
+    fn handle_rate_limit_flush_buckets(&self, message: RateLimitFlushBuckets) {
+        let RateLimitFlushBuckets {
+            mut bucket_limiter,
+            partition_key,
+        } = message;
+
+        let scoping = *bucket_limiter.scoping();
+
+        if let Some(rate_limiter) = self.rate_limiter.as_ref() {
+            let item_scoping = ItemScoping {
+                category: DataCategory::TransactionProcessed,
+                scoping: &scoping,
+            };
+            // We set over_accept_once such that the limit is actually reached, which allows subsequent
+            // calls with quantity=0 to be rate limited.
+            let over_accept_once = true;
+            let rate_limits = rate_limiter.is_rate_limited(
+                bucket_limiter.quotas(),
+                item_scoping,
+                bucket_limiter.transaction_count(),
+                over_accept_once,
+            );
+
+            let was_enforced = bucket_limiter.enforce_limits(rate_limits.as_ref().map_err(|_| ()));
+
+            if was_enforced {
+                if let Ok(limits) = rate_limits {
+                    // Update the rate limits in the project cache.
+                    ProjectCache::from_registry()
+                        .do_send(UpdateRateLimits::new(scoping.project_key, limits));
+                }
+            }
+        }
+
+        let buckets = bucket_limiter.into_buckets();
+        if !buckets.is_empty() {
+            // Forward buckets to the envelope manager to send them upstream or to Kafka:
+            EnvelopeManager::from_registry().send(SendMetrics {
+                buckets,
+                scoping,
+                partition_key,
+            });
+        }
+    }
+
     fn encode_envelope_body(
         body: Vec<u8>,
         http_encoding: HttpEncoding,
@@ -2211,6 +2279,10 @@ impl EnvelopeProcessorService {
             EnvelopeProcessor::ProcessEnvelope(message) => self.handle_process_envelope(*message),
             EnvelopeProcessor::ProcessMetrics(message) => self.handle_process_metrics(*message),
             EnvelopeProcessor::EncodeEnvelope(message) => self.handle_encode_envelope(*message),
+            #[cfg(feature = "processing")]
+            EnvelopeProcessor::RateLimitFlushBuckets(message) => {
+                self.handle_rate_limit_flush_buckets(message);
+            }
         }
     }
 }
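The `over_accept_once` comment above is the crux of this handler. A minimal, self-contained sketch of the assumed semantics, using a plain counter in place of Relay's Redis-backed limiter (`Counter` is a stand-in invented for illustration, not Relay's API):

```rust
// Sketch: why over-accepting one request matters. Once the counter reaches
// the limit, even pure checks with quantity=0 report an active limit.
struct Counter {
    count: u64,
    limit: u64,
}

impl Counter {
    /// Returns true if the request should be rejected.
    fn is_rate_limited(&mut self, quantity: u64, over_accept_once: bool) -> bool {
        if quantity == 0 {
            // Pure check: limited once the counter has reached the limit.
            return self.count >= self.limit;
        }
        if self.count + quantity <= self.limit {
            self.count += quantity; // within quota
            return false;
        }
        if over_accept_once && self.count < self.limit {
            // Accept one request past the limit so the counter actually
            // reaches it; afterwards even quantity=0 checks are limited.
            self.count += quantity;
            return false;
        }
        true
    }
}

fn main() {
    let mut counter = Counter { count: 0, limit: 5 };
    assert!(!counter.is_rate_limited(3, true)); // duration bucket [1, 2, 3]
    assert!(!counter.is_rate_limited(3, true)); // violating bucket, over-accepted
    assert!(counter.is_rate_limited(0, true)); // non-duration bucket, quantity=0
    assert!(counter.is_rate_limited(3, true)); // further duration buckets
}
```

In the integration test at the end of this diff, this is why the FCP bucket sent after the quota is exhausted is dropped even though it contributes a quantity of 0.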
diff --git a/relay-server/src/actors/project.rs b/relay-server/src/actors/project.rs
index 8b23b0c7ec8..e19d3513c35 100644
--- a/relay-server/src/actors/project.rs
+++ b/relay-server/src/actors/project.rs
@@ -585,6 +585,11 @@ impl Project {
         }
     }
 
+    /// The rate limits that are active for this project.
+    pub fn rate_limits(&self) -> &RateLimits {
+        &self.rate_limits
+    }
+
     /// The last time the project state was updated
     pub fn last_updated_at(&self) -> Instant {
         self.last_updated_at
@@ -601,6 +606,7 @@ impl Project {
     ///
     /// The buckets will be keyed underneath this project key.
     pub fn merge_buckets(&mut self, buckets: Vec<Bucket>) {
+        // TODO: rate limits
         if self.metrics_allowed() {
             Registry::aggregator().send(MergeBuckets::new(self.project_key, buckets));
         }
@@ -610,6 +616,7 @@ impl Project {
     ///
     /// The metrics will be keyed underneath this project key.
    pub fn insert_metrics(&mut self, metrics: Vec<Metric>) {
+        // TODO: rate limits
         if self.metrics_allowed() {
             Registry::aggregator().send(InsertMetrics::new(self.project_key, metrics));
         }
diff --git a/relay-server/src/actors/project_cache.rs b/relay-server/src/actors/project_cache.rs
index 4336de88858..34d7fcdc7a1 100644
--- a/relay-server/src/actors/project_cache.rs
+++ b/relay-server/src/actors/project_cache.rs
@@ -9,25 +9,29 @@ use futures01::{future, Future};
 use relay_common::ProjectKey;
 use relay_config::{Config, RelayMode};
 use relay_metrics::{self, AggregateMetricsError, FlushBuckets, InsertMetrics, MergeBuckets};
+use relay_quotas::RateLimits;
+use relay_redis::RedisPool;
 use relay_statsd::metric;
 
 use crate::actors::envelopes::{EnvelopeManager, SendMetrics};
 use crate::actors::outcome::DiscardReason;
 use crate::actors::processor::ProcessEnvelope;
-use crate::actors::project::{Project, ProjectState};
+use crate::actors::project::{ExpiryState, Project, ProjectState};
 use crate::actors::project_local::LocalProjectSource;
 use crate::actors::project_upstream::UpstreamProjectSource;
 use crate::envelope::Envelope;
 use crate::service::Registry;
 use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers};
-use crate::utils::{self, EnvelopeContext, GarbageDisposal, Response};
-
-use super::project::ExpiryState;
+use crate::utils::{self, BucketLimiter, EnvelopeContext, GarbageDisposal, Response};
 
 #[cfg(feature = "processing")]
-use {crate::actors::project_redis::RedisProjectSource, relay_common::clone};
+use {
+    crate::actors::processor::{EnvelopeProcessor, RateLimitFlushBuckets},
+    crate::actors::project_redis::RedisProjectSource,
+    relay_common::clone,
+};
 
 #[derive(Fail, Debug)]
 pub enum ProjectError {
@@ -623,10 +627,36 @@ impl Handler<FlushBuckets> for ProjectCache {
             return;
         }
 
-        EnvelopeManager::from_registry().send(SendMetrics {
-            buckets,
-            scoping,
-            partition_key,
-        });
+        // Check rate limits if necessary:
+        let quotas = project_state.config.quotas.clone();
+        let buckets = match BucketLimiter::create(buckets, quotas, scoping) {
+            Ok(mut bucket_limiter) => {
+                let cached_rate_limits = project.rate_limits().clone();
+                #[allow(unused_variables)]
+                let was_rate_limited = bucket_limiter.enforce_limits(Ok(&cached_rate_limits));
+
+                #[cfg(feature = "processing")]
+                if !was_rate_limited && config.processing_enabled() {
+                    // If there were no cached rate limits active, let the processor check redis:
+                    EnvelopeProcessor::from_registry().send(RateLimitFlushBuckets {
+                        bucket_limiter,
+                        partition_key,
+                    });
+
+                    return;
+                }
+
+                bucket_limiter.into_buckets()
+            }
+            Err(buckets) => buckets,
+        };
+
+        if !buckets.is_empty() {
+            EnvelopeManager::from_registry().send(SendMetrics {
+                buckets,
+                scoping,
+                partition_key,
+            });
+        }
     }
 }
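The `FlushBuckets` handler above enforces limits in two stages: cached rate limits apply synchronously in the project cache, and only processing Relays defer to the processor pool for a Redis check. A self-contained sketch of that control flow, with stand-in types (`Bucket`, `Limiter`, and `Flush` here are illustrative, not Relay's real types):

```rust
#[derive(Debug)]
struct Bucket {
    is_transaction: bool,
}

#[derive(Debug)]
struct Limiter {
    buckets: Vec<Bucket>,
}

/// What the flush handler decides to do with the buckets.
#[derive(Debug)]
enum Flush {
    /// Send surviving buckets upstream (or to Kafka).
    Send(Vec<Bucket>),
    /// Hand off to the processor pool for a Redis quota check.
    DeferToProcessor(Limiter),
}

impl Limiter {
    /// Drops transaction buckets if a cached rate limit is active.
    /// Returns true if anything was dropped.
    fn enforce(&mut self, limit_active: bool) -> bool {
        if !limit_active {
            return false;
        }
        let before = self.buckets.len();
        self.buckets.retain(|bucket| !bucket.is_transaction);
        self.buckets.len() < before
    }
}

fn flush(buckets: Vec<Bucket>, cached_limit_active: bool, processing: bool) -> Flush {
    if !buckets.iter().any(|bucket| bucket.is_transaction) {
        // Mirrors BucketLimiter::create returning Err(buckets):
        // nothing to enforce, forward unchanged.
        return Flush::Send(buckets);
    }
    let mut limiter = Limiter { buckets };
    let was_limited = limiter.enforce(cached_limit_active);
    if !was_limited && processing {
        // No cached limit hit: only processing Relays consult Redis, and
        // they do so on the CPU worker pool, not the project-cache service.
        return Flush::DeferToProcessor(limiter);
    }
    Flush::Send(limiter.buckets)
}

fn main() {
    let buckets = vec![
        Bucket { is_transaction: true },
        Bucket { is_transaction: false },
    ];
    // With a cached limit active, transaction buckets are dropped synchronously:
    match flush(buckets, true, true) {
        Flush::Send(rest) => assert_eq!(rest.len(), 1),
        Flush::DeferToProcessor(_) => unreachable!(),
    }
}
```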
diff --git a/relay-server/src/utils/metrics_rate_limits.rs b/relay-server/src/utils/metrics_rate_limits.rs
new file mode 100644
index 00000000000..d19199f938f
--- /dev/null
+++ b/relay-server/src/utils/metrics_rate_limits.rs
@@ -0,0 +1,166 @@
+//! Quota and rate limiting helpers for metrics buckets.
+
+use relay_common::{DataCategory, UnixTimestamp};
+use relay_metrics::{Bucket, MetricNamespace, MetricResourceIdentifier};
+use relay_quotas::{ItemScoping, Quota, RateLimits, Scoping};
+
+use crate::actors::outcome::{DiscardReason, Outcome, TrackOutcome};
+
+/// Holds metrics buckets with some information about their contents.
+#[derive(Debug)]
+pub struct BucketLimiter {
+    /// A list of metrics buckets.
+    buckets: Vec<Bucket>,
+
+    /// The quotas set on the current project.
+    quotas: Vec<Quota>,
+
+    /// Project information.
+    scoping: Scoping,
+
+    /// Binary index of buckets in the transaction namespace (used to retain).
+    transaction_buckets: Vec<bool>,
+
+    /// The number of transactions contributing to these buckets.
+    transaction_count: usize,
+}
+
+impl BucketLimiter {
+    /// Create a new limiter instance.
+    ///
+    /// Returns `Ok(Self)` if `buckets` contain transaction metrics, `Err(buckets)` otherwise.
+    pub fn create(
+        buckets: Vec<Bucket>,
+        quotas: Vec<Quota>,
+        scoping: Scoping,
+    ) -> Result<Self, Vec<Bucket>> {
+        let transaction_counts: Vec<_> = buckets
+            .iter()
+            .map(|bucket| {
+                let mri = match MetricResourceIdentifier::parse(bucket.name.as_str()) {
+                    Ok(mri) => mri,
+                    Err(_) => {
+                        relay_log::error!("Invalid MRI: {}", bucket.name);
+                        return None;
+                    }
+                };
+
+                // Keep all metrics that are not transaction related:
+                if mri.namespace != MetricNamespace::Transactions {
+                    return None;
+                }
+
+                if mri.name == "duration" {
+                    // The "duration" metric is extracted exactly once for every processed transaction,
+                    // so we can use it to count the number of transactions.
+                    let count = bucket.value.len();
+                    Some(count)
+                } else {
+                    // For any other metric in the transaction namespace, we check the limit with quantity=0
+                    // so transactions are not double counted against the quota.
+                    Some(0)
+                }
+            })
+            .collect();
+
+        // Accumulate the total transaction count:
+        let transaction_count = transaction_counts
+            .iter()
+            .fold(None, |acc, transaction_count| match transaction_count {
+                Some(count) => Some(acc.unwrap_or(0) + count),
+                None => acc,
+            });
+
+        if let Some(transaction_count) = transaction_count {
+            let transaction_buckets = transaction_counts.iter().map(Option::is_some).collect();
+            Ok(Self {
+                buckets,
+                quotas,
+                scoping,
+                transaction_buckets,
+                transaction_count,
+            })
+        } else {
+            Err(buckets)
+        }
+    }
+
+    #[allow(dead_code)]
+    pub fn scoping(&self) -> &Scoping {
+        &self.scoping
+    }
+
+    #[allow(dead_code)]
+    pub fn quotas(&self) -> &[Quota] {
+        self.quotas.as_ref()
+    }
+
+    #[allow(dead_code)]
+    pub fn transaction_count(&self) -> usize {
+        self.transaction_count
+    }
+
+    fn drop_with_outcome(&mut self, outcome: Outcome) {
+        // Drop transaction buckets:
+        let buckets = std::mem::take(&mut self.buckets);
+
+        self.buckets = buckets
+            .into_iter()
+            .zip(self.transaction_buckets.iter())
+            .filter_map(|(bucket, is_transaction_bucket)| {
+                (!is_transaction_bucket).then_some(bucket)
+            })
+            .collect();
+
+        // Track outcome for the processed transactions we dropped here:
+        if self.transaction_count > 0 {
+            TrackOutcome::from_registry().send(TrackOutcome {
+                timestamp: UnixTimestamp::now().as_datetime(), // as good as any timestamp
+                scoping: self.scoping,
+                outcome,
+                event_id: None,
+                remote_addr: None,
+                category: DataCategory::TransactionProcessed,
+                quantity: self.transaction_count as u32,
+            });
+        }
+    }
+
+    // Drop transaction-related buckets and create outcomes for any active rate limits.
+    //
+    // If rate limits could not be checked for some reason, pass an `Err` to this function.
+    // In this case, transaction-related metrics buckets will also be dropped, and an "internal"
+    // outcome is generated.
+    //
+    // Returns true if any buckets were dropped.
+    pub fn enforce_limits(&mut self, rate_limits: Result<&RateLimits, ()>) -> bool {
+        let mut dropped_stuff = false;
+        match rate_limits {
+            Ok(rate_limits) => {
+                let item_scoping = ItemScoping {
+                    category: DataCategory::TransactionProcessed,
+                    scoping: &self.scoping,
+                };
+                let applied_rate_limits = rate_limits.check_with_quotas(&self.quotas, item_scoping);
+
+                // If a rate limit is active, discard transaction buckets.
+                if let Some(limit) = applied_rate_limits.longest() {
+                    self.drop_with_outcome(Outcome::RateLimited(limit.reason_code.clone()));
+                    dropped_stuff = true;
+                }
+            }
+            Err(_) => {
+                // Error from rate limiter, drop transaction buckets.
+                self.drop_with_outcome(Outcome::Invalid(DiscardReason::Internal));
+                dropped_stuff = true;
+            }
+        };
+
+        dropped_stuff
+    }
+
+    /// Consume this struct and return its buckets.
+    pub fn into_buckets(self) -> Vec<Bucket> {
+        self.buckets
+    }
+}
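`BucketLimiter::create` derives the transaction count from MRIs: only the `duration` distribution counts transactions; every other metric in the namespace checks the limit with quantity 0. A dependency-free sketch of that rule (Relay itself parses MRIs with `MetricResourceIdentifier::parse`; the string splitting below is for illustration only):

```rust
/// Returns the number of transactions a bucket contributes to the
/// transactions_processed quota, or None if the quota does not apply.
fn transaction_count(mri: &str, value_len: usize) -> Option<usize> {
    // An MRI looks like "d:transactions/duration@millisecond".
    let name = mri.split_once(':')?.1; // "transactions/duration@millisecond"
    let (namespace, rest) = name.split_once('/')?;
    if namespace != "transactions" {
        return None; // not subject to the transactions_processed quota
    }
    let metric_name = rest.split('@').next()?;
    if metric_name == "duration" {
        Some(value_len) // one duration value per processed transaction
    } else {
        Some(0) // already counted via duration; check with quantity=0
    }
}

fn main() {
    assert_eq!(transaction_count("d:transactions/duration@millisecond", 3), Some(3));
    assert_eq!(transaction_count("d:transactions/measurements.lcp@millisecond", 10), Some(0));
    assert_eq!(transaction_count("d:sessions/session@none", 1), None);
}
```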
diff --git a/relay-server/src/utils/mod.rs b/relay-server/src/utils/mod.rs
index f95cb174df3..6ba2cc72806 100644
--- a/relay-server/src/utils/mod.rs
+++ b/relay-server/src/utils/mod.rs
@@ -5,6 +5,7 @@ mod dynamic_sampling;
 mod envelope_context;
 mod error_boundary;
 mod garbage;
+mod metrics_rate_limits;
 mod multipart;
 mod param_parser;
 mod rate_limits;
@@ -29,6 +30,7 @@ pub use self::dynamic_sampling::*;
 pub use self::envelope_context::*;
 pub use self::error_boundary::*;
 pub use self::garbage::*;
+pub use self::metrics_rate_limits::*;
 pub use self::multipart::*;
 pub use self::param_parser::*;
 pub use self::rate_limits::*;
diff --git a/tests/integration/fixtures/__init__.py b/tests/integration/fixtures/__init__.py
index 5cac16d513e..2dd5ce94c37 100644
--- a/tests/integration/fixtures/__init__.py
+++ b/tests/integration/fixtures/__init__.py
@@ -224,6 +224,17 @@ def send_metrics(self, project_id, payload, timestamp=None):
         )
         self.send_envelope(project_id, envelope)
 
+    def send_metrics_buckets(self, project_id, payload, timestamp=None):
+        envelope = Envelope()
+        envelope.add_item(
+            Item(
+                payload=PayloadRef(json=payload),
+                type="metric_buckets",
+                headers=None if timestamp is None else {"timestamp": timestamp},
+            )
+        )
+        self.send_envelope(project_id, envelope)
+
     def send_security_report(
         self,
         project_id,
diff --git a/tests/integration/fixtures/processing.py b/tests/integration/fixtures/processing.py
index 2c42528bbeb..99dc46b9623 100644
--- a/tests/integration/fixtures/processing.py
+++ b/tests/integration/fixtures/processing.py
@@ -200,6 +200,8 @@ def category_value(category):
         return 4
     if category == "session":
         return 5
+    if category == "transaction_processed":
+        return 8
     assert False, "invalid category"
 
 
@@ -224,7 +226,7 @@ def get_outcome(self):
         assert len(outcomes) == 1, "More than one outcome was consumed"
         return outcomes[0]
 
-    def assert_rate_limited(self, reason, key_id=None, categories=None):
+    def assert_rate_limited(self, reason, key_id=None, categories=None, quantity=None):
         if categories is None:
             outcome = self.get_outcome()
             assert isinstance(outcome["category"], int)
@@ -241,6 +243,10 @@ def assert_rate_limited(self, reason, key_id=None, categories=None):
         if key_id is not None:
             assert outcome["key_id"] == key_id
 
+        if quantity is not None:
+            count = sum(outcome["quantity"] for outcome in outcomes)
+            assert count == quantity
+
 
 @pytest.fixture
 def events_consumer(kafka_consumer):
@@ -292,6 +298,15 @@ def get_metric(self, timeout=None):
 
         return json.loads(message.value())
 
+    def get_metrics(self, timeout=None, max_attempts=100):
+        for _ in range(max_attempts):
+            message = self.poll(timeout=timeout)
+            if message is None:
+                return
+            else:
+                assert message.error() is None
+                yield json.loads(message.value())
+
 
 class SessionsConsumer(ConsumerBase):
     def get_session(self):
diff --git a/tests/integration/test_store.py b/tests/integration/test_store.py
index 906f39dd790..9e162121f5b 100644
--- a/tests/integration/test_store.py
+++ b/tests/integration/test_store.py
@@ -1,12 +1,13 @@
 import json
 import os
 import queue
-import datetime
+from time import sleep
 import uuid
 import six
 import socket
 import threading
 import pytest
+from datetime import datetime, timedelta
 from requests.exceptions import HTTPError
 from flask import abort, Response
 
@@ -269,12 +270,12 @@ def test_store_not_normalized(mini_sentry, relay):
 
 
 def make_transaction(event):
-    now = datetime.datetime.utcnow()
+    now = datetime.utcnow()
     event.update(
         {
             "type": "transaction",
"timestamp": now.isoformat(), - "start_timestamp": (now - datetime.timedelta(seconds=2)).isoformat(), + "start_timestamp": (now - timedelta(seconds=2)).isoformat(), "spans": [], "contexts": { "trace": { @@ -470,6 +471,188 @@ def test_processing_quotas( assert event["logentry"]["formatted"] == f"otherkey{i}" +@pytest.mark.parametrize("violating_bucket", [[4.0, 5.0], [4.0, 5.0, 6.0]]) +def test_rate_limit_metrics_buckets( + mini_sentry, + relay_with_processing, + metrics_consumer, + outcomes_consumer, + violating_bucket, +): + """ + param violating_bucket is parametrized so we cover both cases: + 1. the quota is matched exactly + 2. quota is exceeded by one + """ + bucket_interval = 1 # second + relay = relay_with_processing( + { + "processing": {"max_rate_limit": 2 * 86400}, + "aggregator": { + "bucket_interval": bucket_interval, + "initial_delay": 0, + "debounce_delay": 0, + }, + } + ) + metrics_consumer = metrics_consumer() + outcomes_consumer = outcomes_consumer() + + project_id = 42 + projectconfig = mini_sentry.add_full_project_config(project_id) + # add another dsn key (we want 2 keys so we can set limits per key) + mini_sentry.add_dsn_key_to_project(project_id) + + public_keys = mini_sentry.get_dsn_public_key_configs(project_id) + key_id = public_keys[0]["numericId"] + + reason_code = uuid.uuid4().hex + + projectconfig["config"]["quotas"] = [ + { + "id": "test_rate_limiting_{}".format(uuid.uuid4().hex), + "scope": "key", + "scopeId": six.text_type(key_id), + "categories": ["transaction_processed"], + "limit": 5, + "window": 86400, + "reasonCode": reason_code, + } + ] + + def generate_ticks(): + # Generate a new timestamp for every bucket, so they do not get merged by the aggregator + tick = int(datetime.utcnow().timestamp() // bucket_interval * bucket_interval) + while True: + yield tick + tick += bucket_interval + + tick = generate_ticks() + + def make_bucket(name, type_, values): + return { + "org_id": 1, + "project_id": project_id, + "timestamp": next(tick), + "name": name, + "type": type_, + "value": values, + "width": bucket_interval, + } + + def send_buckets(buckets): + relay.send_metrics_buckets(project_id, buckets) + sleep(0.2) + + # NOTE: Sending these buckets in multiple envelopes because the order of flushing + # and also the order of rate limiting is not deterministic. + send_buckets( + [ + # Send a few non-duration buckets, they will not deplete the quota + make_bucket("d:transactions/measurements.lcp@millisecond", "d", 10 * [1.0]), + # Session metrics are accepted + make_bucket("d:sessions/session@none", "c", 1), + make_bucket("d:sessions/duration@second", "d", 9 * [1]), + ] + ) + send_buckets( + [ + # Duration metric, subtract 3 from quota + make_bucket("d:transactions/duration@millisecond", "d", [1, 2, 3]), + ], + ) + send_buckets( + [ + # Can still send unlimited non-duration metrics + make_bucket("d:transactions/measurements.lcp@millisecond", "d", 10 * [2.0]), + ], + ) + send_buckets( + [ + # Duration metric, subtract from quota. This bucket is still accepted, but the rest + # will be exceeded. + make_bucket("d:transactions/duration@millisecond", "d", violating_bucket), + ], + ) + send_buckets( + [ + # FCP buckets won't make it into kakfa + make_bucket("d:transactions/measurements.fcp@millisecond", "d", 10 * [7.0]), + ], + ) + send_buckets( + [ + # Another three for duration, won't make it into kafka. + make_bucket("d:transactions/duration@millisecond", "d", [7, 8, 9]), + # Session metrics are still accepted. 
+            make_bucket("d:sessions/session@user", "s", [1254]),
+        ],
+    )
+
+    produced_buckets = list(metrics_consumer.get_metrics(timeout=4))
+
+    # Sort buckets to prevent ordering flakiness:
+    produced_buckets.sort(key=lambda b: (b["name"], b["value"]))
+    for bucket in produced_buckets:
+        del bucket["timestamp"]
+
+    assert produced_buckets == [
+        {
+            "name": "d:sessions/duration@second",
+            "org_id": 1,
+            "project_id": 42,
+            "type": "d",
+            "value": [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
+        },
+        {
+            "name": "d:sessions/session@none",
+            "org_id": 1,
+            "project_id": 42,
+            "type": "c",
+            "value": 1.0,
+        },
+        {
+            "name": "d:sessions/session@user",
+            "org_id": 1,
+            "project_id": 42,
+            "type": "s",
+            "value": [1254],
+        },
+        {
+            "name": "d:transactions/duration@millisecond",
+            "org_id": 1,
+            "project_id": 42,
+            "type": "d",
+            "value": [1.0, 2.0, 3.0],
+        },
+        {
+            "name": "d:transactions/duration@millisecond",
+            "org_id": 1,
+            "project_id": 42,
+            "type": "d",
+            "value": violating_bucket,
+        },
+        {
+            "name": "d:transactions/measurements.lcp@millisecond",
+            "org_id": 1,
+            "project_id": 42,
+            "type": "d",
+            "value": [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
+        },
+        {
+            "name": "d:transactions/measurements.lcp@millisecond",
+            "org_id": 1,
+            "project_id": 42,
+            "type": "d",
+            "value": [2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0],
+        },
+    ]
+
+    # Both the FCP bucket and the final duration bucket are dropped, but only
+    # the duration bucket carries transactions (the FCP check runs with
+    # quantity=0), so the rate-limited quantity is 3.
+    outcomes_consumer.assert_rate_limited(
+        reason_code, key_id=key_id, categories=["transaction_processed"], quantity=3,
+    )
+
+
 def test_events_buffered_before_auth(relay, mini_sentry):
     evt = threading.Event()