Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(quotas): Always require transaction processing quota #1507

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
7fff3ea
first wip
Sep 14, 2022
987f5b2
More generically allow multiple datacategories for an event
Sep 15, 2022
aac3e70
clean up random comments
Sep 15, 2022
ad38afe
Fixup some stuff I missed because missing --all-features
Sep 15, 2022
201e955
Plumb through to the processing pipeline
Sep 16, 2022
f582a28
Merge branch 'master' into flub/billing-free-processing
Sep 16, 2022
0571c72
switch back to single datacategory for an event
Sep 22, 2022
f0ffb61
clippy
Sep 22, 2022
d8ebaa9
more cleanup
Sep 22, 2022
49ee815
more cleanup
Sep 22, 2022
514d56d
A WIP commit that breaks compilation but updates envelope limiter
Sep 22, 2022
eb829e3
Merge branch 'master' into flub/billing-free-processing
Sep 26, 2022
a60b668
Hook up early enforcement to be passed around the processing
Sep 26, 2022
53954f1
cleanup
Sep 26, 2022
e3789f0
clippy
Sep 26, 2022
1758670
typo
Sep 26, 2022
cfd5bfb
fix some test, add some tests
Sep 27, 2022
c349d79
Merge branch 'master' into flub/billing-free-processing
Sep 27, 2022
d9365ad
Apply suggestions from code review
Sep 27, 2022
4d6a903
make clippy happy
Sep 27, 2022
9fc9e5f
Processing quota must always be available
Sep 30, 2022
ede0e5d
Return 429 responses only if processing quota is exhausted
Sep 30, 2022
f536891
remove extracted metrics when we have to
Sep 30, 2022
5f2513e
Do not skip finalize_event
Sep 30, 2022
21aefbf
clippy
Sep 30, 2022
bb3efe3
update tests for current logic
Sep 30, 2022
6d0ff89
correct doc links
Oct 4, 2022
fcd26c9
learn to spell
Oct 4, 2022
d69570d
Use snake case for json
Oct 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion relay-common/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl fmt::Display for EventType {

/// Classifies the type of data that is being ingested.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
#[serde(rename_all = "snake_case")]
#[repr(i8)]
pub enum DataCategory {
/// Reserved and unused.
Expand Down Expand Up @@ -128,6 +128,7 @@ pub enum DataCategory {
impl DataCategory {
/// Returns the data category corresponding to the given name.
pub fn from_name(string: &str) -> Self {
// TODO: This should probably use serde.
match string {
"default" => Self::Default,
"error" => Self::Error,
Expand All @@ -144,6 +145,7 @@ impl DataCategory {

/// Returns the canonical name of this data category.
pub fn name(self) -> &'static str {
// TODO: This should probably use serde.
match self {
Self::Default => "default",
Self::Error => "error",
Expand Down
8 changes: 8 additions & 0 deletions relay-quotas/src/rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,14 @@ impl RateLimits {
self.limits.retain(|limit| !limit.retry_after.expired());
}

/// Removes limits for which `f(&rate_limit)` returns `false` from the rate limits.
pub fn retain<F>(&mut self, f: F)
where
F: FnMut(&RateLimit) -> bool,
{
self.limits.retain(f)
}

/// Checks whether any rate limits apply to the given scoping.
///
/// If no limits match, then the returned `RateLimits` instance evalutes `is_ok`. Otherwise, it
Expand Down
58 changes: 38 additions & 20 deletions relay-server/src/actors/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use relay_general::store::{ClockDriftProcessor, LightNormalizationConfig};
use relay_general::types::{Annotated, Array, FromValue, Object, ProcessingAction, Value};
use relay_log::LogError;
use relay_metrics::{Bucket, InsertMetrics, MergeBuckets, Metric};
use relay_quotas::{DataCategory, RateLimits, ReasonCode};
use relay_quotas::{DataCategory, ReasonCode};
use relay_redis::RedisPool;
use relay_sampling::{DynamicSamplingContext, RuleId};
use relay_statsd::metric;
Expand All @@ -49,14 +49,15 @@ use crate::metrics_extraction::transactions::extract_transaction_metrics;
use crate::service::{ServerError, REGISTRY};
use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
use crate::utils::{
self, ChunkedFormDataAggregator, EnvelopeContext, ErrorBoundary, FormDataIter, SamplingResult,
self, ChunkedFormDataAggregator, Enforcement, EnvelopeContext, ErrorBoundary, FormDataIter,
SamplingResult,
};

#[cfg(feature = "processing")]
use {
crate::actors::project_cache::UpdateRateLimits,
crate::service::ServerErrorKind,
crate::utils::EnvelopeLimiter,
crate::utils::{EnvelopeLimiter, QuotaCheckReason},
failure::ResultExt,
relay_general::store::{GeoIpLookup, StoreConfig, StoreProcessor},
relay_quotas::{RateLimitingError, RedisRateLimiter},
Expand Down Expand Up @@ -216,6 +217,16 @@ struct ProcessEnvelopeState {
/// Track whether transaction metrics were already extracted.
transaction_metrics_extracted: bool,

/// Tracks quota enforcements applied in the store endpoint.
///
/// While the request is still open an in-memory quota check is performed in case any
/// exceeded quota is cached. The results of this check is stored here. Most often the
/// envelope is then dropped and not processed, and thus this struct wouldn't even be
/// created, however for e.g. transactions there are two types of quotas and only one
/// may be exceeded. This allows the processing pipeline to inspect the applied quotas
/// and adjust processing as needed.
early_enforcement: Enforcement,

/// Partial metrics of the Event during construction.
///
/// The pipeline stages can add to this metrics objects. In `finalize_event`, the metrics are
Expand All @@ -228,14 +239,6 @@ struct ProcessEnvelopeState {
/// resulting item.
sample_rates: Option<Value>,

/// Rate limits returned in processing mode.
///
/// The rate limiter is invoked in processing mode, after which the resulting limits are stored
/// in this field. Note that there can be rate limits even if the envelope still carries items.
///
/// These are always empty in non-processing mode, since the rate limiter is not invoked.
rate_limits: RateLimits,

/// Metrics extracted from items in the envelope.
///
/// Relay can extract metrics for sessions and transactions, which is controlled by
Expand Down Expand Up @@ -463,9 +466,6 @@ pub struct ProcessEnvelopeResponse {
/// removed from the envelope. Otherwise, if the envelope is empty or the entire envelope needs
/// to be dropped, this is `None`.
pub envelope: Option<(Envelope, EnvelopeContext)>,

/// All rate limits that have been applied on the envelope.
pub rate_limits: RateLimits,
}

/// Applies processing to all contents of the given envelope.
Expand Down Expand Up @@ -1145,13 +1145,14 @@ impl EnvelopeProcessorService {
// 2. The DSN was moved and the envelope sent to the old project ID.
envelope.meta_mut().set_project_id(project_id);

let early_enforcement = envelope.get_early_enforcement().clone();
Ok(ProcessEnvelopeState {
envelope,
event: Annotated::empty(),
transaction_metrics_extracted: false,
early_enforcement,
metrics: Metrics::default(),
sample_rates: None,
rate_limits: RateLimits::new(),
extracted_metrics: Vec::new(),
project_state,
sampling_project_state,
Expand Down Expand Up @@ -1643,6 +1644,12 @@ impl EnvelopeProcessorService {

#[cfg(feature = "processing")]
fn store_process_event(&self, state: &mut ProcessEnvelopeState) -> Result<(), ProcessingError> {
if state.early_enforcement.event.is_active()
&& state.early_enforcement.event.category() == DataCategory::Transaction
{
return Ok(());
}

let ProcessEnvelopeState {
ref envelope,
ref mut event,
Expand Down Expand Up @@ -1745,18 +1752,26 @@ impl EnvelopeProcessorService {
}

let scoping = state.envelope_context.scoping();
let reason = QuotaCheckReason::EnforceQuota(&state.early_enforcement);
let (enforcement, limits) = metric!(timer(RelayTimers::EventProcessingRateLimiting), {
envelope_limiter
.enforce(&mut state.envelope, &scoping)
.enforce(&mut state.envelope, &scoping, reason)
.map_err(ProcessingError::QuotasFailed)?
});

// Metrics are already extracted from the transaction, remove them again if we now
// exhausted quota for them.
if enforcement.transaction_processed.is_active() {
// Sessions also append to this vector, but envelopes at this stage will never
// have both a session and a transaction.
state.extracted_metrics.clear();
}

if limits.is_limited() {
ProjectCache::from_registry()
.do_send(UpdateRateLimits::new(scoping.project_key, limits.clone()));
.do_send(UpdateRateLimits::new(scoping.project_key, limits));
}

state.rate_limits = limits;
enforcement.track_outcomes(&state.envelope, &state.envelope_context.scoping());

if remove_event {
Expand All @@ -1772,7 +1787,9 @@ impl EnvelopeProcessorService {
&self,
state: &mut ProcessEnvelopeState,
) -> Result<(), ProcessingError> {
if state.transaction_metrics_extracted {
if state.transaction_metrics_extracted
|| state.early_enforcement.transaction_processed.is_active()
{
// Nothing to do here.
return Ok(());
}
Expand Down Expand Up @@ -2013,6 +2030,8 @@ impl EnvelopeProcessorService {
if_processing!({
self.enforce_quotas(state)?;
});
// Note: else: since the project state is now a little bit fresher, we could
// re-check the in-memory quotas to see if they are exceeded.

if state.has_event() {
self.scrub_event(state)?;
Expand Down Expand Up @@ -2068,7 +2087,6 @@ impl EnvelopeProcessorService {

Ok(ProcessEnvelopeResponse {
envelope: envelope_response,
rate_limits: state.rate_limits,
})
}
Err(err) => {
Expand Down
14 changes: 11 additions & 3 deletions relay-server/src/actors/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use smallvec::SmallVec;
use url::Url;

use relay_auth::PublicKey;
use relay_common::{ProjectId, ProjectKey};
use relay_common::{DataCategory, ProjectId, ProjectKey};
use relay_config::Config;
use relay_filter::{matches_any_origin, FiltersConfig};
use relay_general::pii::{DataScrubbingConfig, PiiConfig};
Expand All @@ -33,7 +33,9 @@ use crate::metrics_extraction::sessions::SessionMetricsConfig;
use crate::metrics_extraction::transactions::TransactionMetricsConfig;
use crate::metrics_extraction::TaggingRule;
use crate::statsd::RelayCounters;
use crate::utils::{self, EnvelopeContext, EnvelopeLimiter, ErrorBoundary, Response};
use crate::utils::{
self, EnvelopeContext, EnvelopeLimiter, ErrorBoundary, QuotaCheckReason, Response,
};

/// The expiry status of a project state. Return value of [`ProjectState::check_expiry`].
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
Expand Down Expand Up @@ -846,7 +848,8 @@ impl Project {
Ok(self.rate_limits.check_with_quotas(quotas, item_scoping))
});

let (enforcement, rate_limits) = envelope_limiter.enforce(&mut envelope, &scoping)?;
let (enforcement, mut rate_limits) =
envelope_limiter.enforce(&mut envelope, &scoping, QuotaCheckReason::CheckEnvelope)?;
enforcement.track_outcomes(&envelope, &scoping);
envelope_context.update(&envelope);

Expand All @@ -858,6 +861,11 @@ impl Project {
Some((envelope, envelope_context))
};

// Transaction quota always runs out before TransactionProcessed, so we remove rate
// limits for Transaction so that the endpoint handler only sends 429 responses if
// TransactionProcessed is exhausted.
rate_limits.retain(|l| !l.categories.contains(&DataCategory::Transaction));

Ok(CheckedEnvelope {
envelope,
rate_limits,
Expand Down
26 changes: 25 additions & 1 deletion relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use relay_sampling::DynamicSamplingContext;

use crate::constants::DEFAULT_EVENT_RETENTION;
use crate::extractors::{PartialMeta, RequestMeta};
use crate::utils::ErrorBoundary;
use crate::utils::{Enforcement, ErrorBoundary};

pub const CONTENT_TYPE: &str = "application/x-sentry-envelope";

Expand Down Expand Up @@ -715,6 +715,16 @@ pub struct EnvelopeHeaders<M = RequestMeta> {
/// Other attributes for forward compatibility.
#[serde(flatten)]
other: BTreeMap<String, Value>,

/// The result of quota checks, if done.
///
/// While handling the request an early quota check is done with just in-memory quota
/// information. If the envelope is still enqueed to process after this this will
/// contain the enfrocement results of that early check. This allows for transactions
/// to have indexing and processing quotas while they can be exhausted independently,
/// the processing needs to continue for the other part still.
#[serde(default, skip)]
early_enforcement: Enforcement,
}

impl EnvelopeHeaders<PartialMeta> {
Expand Down Expand Up @@ -753,6 +763,7 @@ impl EnvelopeHeaders<PartialMeta> {
sent_at: self.sent_at,
trace: self.trace,
other: self.other,
early_enforcement: Enforcement::default(),
})
}
}
Expand All @@ -774,6 +785,7 @@ impl Envelope {
sent_at: None,
other: BTreeMap::new(),
trace: None,
early_enforcement: Enforcement::default(),
},
items: Items::new(),
}
Expand Down Expand Up @@ -881,6 +893,18 @@ impl Envelope {
self.headers.other.insert(name.into(), value.into())
}

/// Returns the internal early enforcement header.
///
/// See [`EnvelopeHeaders::early_enforcement`].
pub fn get_early_enforcement(&self) -> &Enforcement {
&self.headers.early_enforcement
}

/// Sets the internal early enforcement header.
pub fn set_early_enforcement(&mut self, enforcement: Enforcement) {
self.headers.early_enforcement = enforcement;
}

/// Returns an iterator over items in this envelope.
///
/// Note that iteration order may change when using `take_item`.
Expand Down
4 changes: 4 additions & 0 deletions relay-server/src/utils/envelope_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ impl EnvelopeContext {
self.track_outcome(outcome.clone(), category, 1);
}

if self.summary.transaction_processing {
self.track_outcome(outcome.clone(), DataCategory::TransactionProcessed, 1);
}

if self.summary.attachment_quantity > 0 {
self.track_outcome(
outcome.clone(),
Expand Down
Loading