feat(quotas): Rate limit metrics before aggregator [INGEST-1655] #1540

Merged
relay-metrics/src/aggregation.rs (12 changes: 11 additions & 1 deletion)

@@ -23,7 +23,7 @@ use relay_system::{
 use crate::statsd::{MetricCounters, MetricGauges, MetricHistograms, MetricSets, MetricTimers};
 use crate::{
     protocol, CounterType, DistributionType, GaugeType, Metric, MetricNamespace,
-    MetricResourceIdentifier, MetricType, MetricValue, SetType,
+    MetricResourceIdentifier, MetricType, MetricValue, MetricsContainer, SetType,
 };
 
 /// Interval for the flush cycle of the [`AggregatorService`].
@@ -843,6 +843,16 @@ impl Bucket {
     }
 }
 
+impl MetricsContainer for Bucket {
+    fn name(&self) -> &str {
+        self.name.as_str()
+    }
+
+    fn len(&self) -> usize {
+        self.value.len()
+    }
+}
+
 /// Any error that may occur during aggregation.
 #[derive(Debug, Fail, PartialEq)]
 #[fail(display = "failed to aggregate metrics: {}", kind)]
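Note what `len()` means for a bucket: it forwards to `self.value.len()`, so anything consuming `MetricsContainer::len` sees the number of raw data points merged into the bucket, not 1 per bucket. A toy stand-in illustrating that reading (`StubBucketValue` is hypothetical, not Relay's `BucketValue`):

enum StubBucketValue {
    Counter(f64),
    Distribution(Vec<f64>),
}

impl StubBucketValue {
    // Assumed semantics mirroring the trait docs below: a counter
    // aggregates to a single data point, while a distribution keeps
    // one entry per recorded value.
    fn len(&self) -> usize {
        match self {
            StubBucketValue::Counter(_) => 1,
            StubBucketValue::Distribution(values) => values.len(),
        }
    }
}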
relay-metrics/src/protocol.rs (25 changes: 25 additions & 0 deletions)

@@ -586,6 +586,31 @@ impl Metric {
     }
 }
 
+/// Common interface for `Metric` and `Bucket`.
+pub trait MetricsContainer {
+    /// Returns the full metric name (MRI) of this container.
+    fn name(&self) -> &str;
+
+    /// Returns the number of raw data points in this container.
+    /// See [`crate::aggregation::BucketValue::len()`].
+    fn len(&self) -> usize;
+
+    /// Returns `true` if this container contains no values.
+    fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+}
+
+impl MetricsContainer for Metric {
+    fn name(&self) -> &str {
+        self.name.as_str()
+    }
+
+    fn len(&self) -> usize {
+        1
+    }
+}
+
 /// Iterator over parsed metrics returned from [`Metric::parse_all`].
 #[derive(Clone, Debug)]
 pub struct ParseMetrics<'a> {
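Taken together with the `Bucket` impl in aggregation.rs above, this trait lets one rate-limiting code path accept both not-yet-aggregated `Metric`s and flushed `Bucket`s. A self-contained sketch of that use (the trait is re-declared so the snippet stands alone; `total_data_points` is a hypothetical helper, not part of this PR):

pub trait MetricsContainer {
    fn name(&self) -> &str;
    fn len(&self) -> usize;
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

// Works for Vec<Metric> and Vec<Bucket> alike once both implement the
// trait, which is exactly what the generic limiter below relies on.
fn total_data_points<T: MetricsContainer>(batch: &[T]) -> usize {
    batch.iter().map(|container| container.len()).sum()
}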
relay-server/src/actors/processor.rs (9 changes: 5 additions & 4 deletions)

@@ -57,7 +57,7 @@ use {
     crate::actors::envelopes::SendMetrics,
     crate::actors::project_cache::UpdateRateLimits,
     crate::service::ServerErrorKind,
-    crate::utils::{BucketLimiter, EnvelopeLimiter},
+    crate::utils::{EnvelopeLimiter, MetricsLimiter},
     failure::ResultExt,
     relay_general::store::{GeoIpLookup, StoreConfig, StoreProcessor},
     relay_quotas::ItemScoping,
@@ -522,7 +522,7 @@ impl EncodeEnvelope {
 #[cfg(feature = "processing")]
 #[derive(Debug)]
 pub struct RateLimitFlushBuckets {
-    pub bucket_limiter: BucketLimiter,
+    pub bucket_limiter: MetricsLimiter<Bucket>,
     pub partition_key: Option<u64>,
 }
 
@@ -1098,7 +1098,8 @@ impl EnvelopeProcessorService {
                     item.set_payload(ContentType::Json, &replay[..]);
                     true
                 }
-                Err(_) => {
+                Err(error) => {
+                    relay_log::warn!("failed to parse replay event: {}", LogError(&error));
                     context.track_outcome(
                         Outcome::Invalid(DiscardReason::InvalidReplayEvent),
                         DataCategory::Replay,
@@ -2221,7 +2222,7 @@ impl EnvelopeProcessorService {
             }
         }
 
-        let buckets = bucket_limiter.into_buckets();
+        let buckets = bucket_limiter.into_metrics();
         if !buckets.is_empty() {
             // Forward buckets to envelope manager to send them to upstream or kafka:
             EnvelopeManager::from_registry().send(SendMetrics {
relay-server/src/actors/project.rs (125 changes: 116 additions & 9 deletions)

@@ -17,7 +17,7 @@ use relay_filter::{matches_any_origin, FiltersConfig};
 use relay_general::pii::{DataScrubbingConfig, PiiConfig};
 use relay_general::store::{BreakdownsConfig, MeasurementsConfig};
 use relay_general::types::SpanAttribute;
-use relay_metrics::{Bucket, InsertMetrics, MergeBuckets, Metric};
+use relay_metrics::{Bucket, InsertMetrics, MergeBuckets, Metric, MetricsContainer};
 use relay_quotas::{Quota, RateLimits, Scoping};
 use relay_sampling::SamplingConfig;
 use relay_statsd::metric;
@@ -34,7 +34,9 @@ use crate::metrics_extraction::transactions::TransactionMetricsConfig;
 use crate::metrics_extraction::TaggingRule;
 use crate::service::Registry;
 use crate::statsd::RelayCounters;
-use crate::utils::{self, EnvelopeContext, EnvelopeLimiter, ErrorBoundary, Response};
+use crate::utils::{
+    self, EnvelopeContext, EnvelopeLimiter, ErrorBoundary, MetricsLimiter, Response,
+};
 
 /// The expiry status of a project state. Return value of [`ProjectState::check_expiry`].
 #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
@@ -602,23 +604,45 @@ impl Project {
         self.last_updated_at = Instant::now();
     }
 
+    /// Applies cached rate limits to the given metrics or metrics buckets.
+    ///
+    /// This only applies the rate limits currently stored on the project.
+    fn rate_limit_metrics<T: MetricsContainer>(&self, metrics: Vec<T>) -> Vec<T> {
+        match (&self.state, self.scoping()) {
+            (Some(state), Some(scoping)) => {
+                match MetricsLimiter::create(metrics, &state.config.quotas, scoping) {
+                    Ok(mut limiter) => {
+                        limiter.enforce_limits(Ok(&self.rate_limits));
+                        limiter.into_metrics()
+                    }
+                    Err(metrics) => metrics,
+                }
+            }
+            _ => metrics,
+        }
+    }
+
     /// Inserts given [buckets](Bucket) into the metrics aggregator.
     ///
     /// The buckets will be keyed underneath this project key.
     pub fn merge_buckets(&mut self, buckets: Vec<Bucket>) {
-        // TODO: rate limits
         if self.metrics_allowed() {
-            Registry::aggregator().send(MergeBuckets::new(self.project_key, buckets));
+            let buckets = self.rate_limit_metrics(buckets);
+            if !buckets.is_empty() {
+                Registry::aggregator().send(MergeBuckets::new(self.project_key, buckets));
+            }
         }
     }
 
     /// Inserts given [metrics](Metric) into the metrics aggregator.
    ///
     /// The metrics will be keyed underneath this project key.
     pub fn insert_metrics(&mut self, metrics: Vec<Metric>) {
-        // TODO: rate limits
         if self.metrics_allowed() {
-            Registry::aggregator().send(InsertMetrics::new(self.project_key, metrics));
+            let metrics = self.rate_limit_metrics(metrics);
+            if !metrics.is_empty() {
+                Registry::aggregator().send(InsertMetrics::new(self.project_key, metrics));
+            }
         }
     }
 
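One detail of `rate_limit_metrics` worth calling out: `MetricsLimiter::create` hands the metrics back in its `Err` variant when it cannot build a limiter for them (for instance when no quota applies), so every failure path passes the data through instead of dropping it. A reduced, self-contained sketch of that shape, where `StubLimiter` and `rate_limit` are hypothetical stand-ins rather than Relay's actual API:

struct StubLimiter<T> {
    metrics: Vec<T>,
    over_quota: bool,
}

impl<T> StubLimiter<T> {
    // Mirrors the assumed contract of MetricsLimiter::create: return the
    // metrics untouched via Err when no quota matches them.
    fn create(metrics: Vec<T>, has_matching_quota: bool, over_quota: bool) -> Result<Self, Vec<T>> {
        if has_matching_quota {
            Ok(StubLimiter { metrics, over_quota })
        } else {
            Err(metrics)
        }
    }

    fn enforce_limits(&mut self) {
        if self.over_quota {
            self.metrics.clear(); // drop everything covered by an exhausted quota
        }
    }

    fn into_metrics(self) -> Vec<T> {
        self.metrics
    }
}

fn rate_limit<T>(metrics: Vec<T>, has_quota: bool, over_quota: bool) -> Vec<T> {
    match StubLimiter::create(metrics, has_quota, over_quota) {
        Ok(mut limiter) => {
            limiter.enforce_limits();
            limiter.into_metrics()
        }
        // Fail open: no applicable quota means the metrics pass through.
        Err(metrics) => metrics,
    }
}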
@@ -893,15 +917,17 @@ impl Drop for Project {
 mod tests {
     use std::sync::Arc;
 
-    use relay_common::{ProjectId, ProjectKey};
+    use relay_common::{ProjectId, ProjectKey, UnixTimestamp};
+    use relay_metrics::{Bucket, BucketValue, Metric, MetricValue};
+    use serde_json::json;
 
     use super::{Config, Project, ProjectState, StateChannel};
 
     #[test]
     fn get_state_expired() {
         for expiry in [9999, 0] {
             let config = Arc::new(
-                Config::from_json_value(serde_json::json!(
+                Config::from_json_value(json!(
                     {
                         "cache": {
                             "project_expiry": expiry,
@@ -936,7 +962,7 @@ mod tests {
     #[test]
     fn test_stale_cache() {
         let config = Arc::new(
-            Config::from_json_value(serde_json::json!(
+            Config::from_json_value(json!(
                 {
                     "cache": {
                         "project_expiry": 100,
@@ -966,4 +992,85 @@ mod tests {
         // still must be the project id set.
         assert!(!project.state.as_ref().unwrap().invalid());
     }
+
+    fn create_project(config: Option<serde_json::Value>) -> Project {
+        let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
+        let mut project = Project::new(project_key, Arc::new(Config::default()));
+        let mut project_state = ProjectState::allowed();
+        project_state.project_id = Some(ProjectId::new(42));
+        if let Some(config) = config {
+            project_state.config = serde_json::from_value(config).unwrap();
+        }
+        project.state = Some(Arc::new(project_state));
+        project
+    }
+
+    fn create_transaction_metric() -> Metric {
+        Metric {
+            name: "d:transactions/foo".to_string(),
+            value: MetricValue::Counter(1.0),
+            timestamp: UnixTimestamp::now(),
+            tags: Default::default(),
+        }
+    }
+
+    #[test]
+    fn test_rate_limit_incoming_metrics() {
+        let project = create_project(None);
+        let metrics = project.rate_limit_metrics(vec![create_transaction_metric()]);
+
+        assert!(metrics.len() == 1);
+    }
+
+    #[test]
+    fn test_rate_limit_incoming_metrics_no_quota() {
+        let project = create_project(Some(json!({
+            "quotas": [{
+                "id": "foo",
+                "categories": ["transaction_processed"], // TODO: change to "transaction"
+                "window": 3600,
+                "limit": 0,
+                "reasonCode": "foo",
+            }]
+        })));
+
+        let metrics = project.rate_limit_metrics(vec![create_transaction_metric()]);
+
+        assert!(metrics.is_empty());
+    }
+
+    fn create_transaction_bucket() -> Bucket {
+        Bucket {
+            name: "d:transactions/foo".to_string(),
+            value: BucketValue::Counter(1.0),
+            timestamp: UnixTimestamp::now(),
+            tags: Default::default(),
+            width: 10,
+        }
+    }
+
+    #[test]
+    fn test_rate_limit_incoming_buckets() {
+        let project = create_project(None);
+        let metrics = project.rate_limit_metrics(vec![create_transaction_bucket()]);
+
+        assert!(metrics.len() == 1);
+    }
+
+    #[test]
+    fn test_rate_limit_incoming_buckets_no_quota() {
+        let project = create_project(Some(json!({
+            "quotas": [{
+                "id": "foo",
+                "categories": ["transaction_processed"], // TODO: change to "transaction"
+                "window": 3600,
+                "limit": 0,
+                "reasonCode": "foo",
+            }]
+        })));
+
+        let metrics = project.rate_limit_metrics(vec![create_transaction_bucket()]);
+
+        assert!(metrics.is_empty());
+    }
 }
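A note on the quota used in the two `_no_quota` tests, going by the quota schema as I read it (the diff itself does not spell this out): `limit: 0` rejects everything in the listed categories, `window` is the enforcement window in seconds, and `reasonCode` is reported back when data is dropped. That is why `rate_limit_metrics` returns an empty vector there, while the tests without any quota pass their single metric through. The same quota, annotated:

fn main() {
    // Assumed field meanings; mirrors the JSON in the tests above.
    let quota = serde_json::json!({
        "id": "foo",
        "categories": ["transaction_processed"], // data categories the quota covers
        "window": 3600,                          // window length in seconds
        "limit": 0,                              // 0 = reject all matching data
        "reasonCode": "foo",                     // surfaced when enforcing the limit
    });
    assert_eq!(quota["limit"], 0);
}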
relay-server/src/actors/project_cache.rs (6 changes: 3 additions & 3 deletions)

@@ -24,7 +24,7 @@ use crate::actors::project_upstream::UpstreamProjectSource;
 use crate::envelope::Envelope;
 use crate::service::Registry;
 use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers};
-use crate::utils::{self, BucketLimiter, EnvelopeContext, GarbageDisposal, Response};
+use crate::utils::{self, EnvelopeContext, GarbageDisposal, MetricsLimiter, Response};
 
 #[cfg(feature = "processing")]
 use {
@@ -629,7 +629,7 @@ impl Handler<FlushBuckets> for ProjectCache {
 
         // Check rate limits if necessary:
         let quotas = project_state.config.quotas.clone();
-        let buckets = match BucketLimiter::create(buckets, quotas, scoping) {
+        let buckets = match MetricsLimiter::create(buckets, quotas, scoping) {
             Ok(mut bucket_limiter) => {
                 let cached_rate_limits = project.rate_limits().clone();
                 #[allow(unused_variables)]
@@ -646,7 +646,7 @@
                     return;
                 }
 
-                bucket_limiter.into_buckets()
+                bucket_limiter.into_metrics()
             }
             Err(buckets) => buckets,
         };