From efea8080141b6ea892bf4bd317a6b57ec970b5f0 Mon Sep 17 00:00:00 2001 From: Radu Woinaroski Date: Thu, 18 Mar 2021 17:33:47 +0100 Subject: [PATCH 1/7] WIP add outcomes for transaction sampling --- relay-sampling/src/lib.rs | 41 +++++++++--- relay-server/src/actors/events.rs | 30 ++++----- relay-server/src/endpoints/common.rs | 14 ++-- relay-server/src/utils/dynamic_sampling.rs | 76 ++++++++++------------ 4 files changed, 84 insertions(+), 77 deletions(-) diff --git a/relay-sampling/src/lib.rs b/relay-sampling/src/lib.rs index 6f870e2537..a0fcfba123 100644 --- a/relay-sampling/src/lib.rs +++ b/relay-sampling/src/lib.rs @@ -30,6 +30,17 @@ pub enum RuleType { Error, } +/// The result of a Sampling operation. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SamplingResult { + /// Keep the event. + Keep, + /// Drop the event, due to the rule with provided Id. + Drop(RuleId), + /// No decision can be made. + NoDecision, +} + /// A condition that checks the values using the equality operator. /// /// For string values it supports case-insensitive comparison. @@ -492,14 +503,25 @@ pub struct TraceContext { } impl TraceContext { - /// Returns the decision of whether to sample or not a trace based on the configuration rules. + /// Returns the sampling decision of whether to drop or not a transaction based on the configuration rules. /// /// If None then a decision can't be made either because of an invalid of missing trace context or /// because no applicable sampling rule could be found. - pub fn should_sample(&self, ip_addr: Option, config: &SamplingConfig) -> Option { - let rule = get_matching_trace_rule(config, self, ip_addr, RuleType::Trace)?; - let rate = pseudo_random_from_uuid(self.trace_id)?; - Some(rate < rule.sample_rate) + pub fn should_keep(&self, ip_addr: Option, config: &SamplingConfig) -> SamplingResult { + if let Some(rule) = get_matching_trace_rule(config, self, ip_addr, RuleType::Trace) { + let rate = match pseudo_random_from_uuid(self.trace_id) { + None => return SamplingResult::NoDecision, + Some(rate) => rate, + }; + + if rate < rule.sample_rate { + SamplingResult::Keep + } else { + SamplingResult::Drop(rule.id) + } + } else { + SamplingResult::NoDecision + } } } @@ -552,15 +574,18 @@ pub fn pseudo_random_from_uuid(id: Uuid) -> Option { #[cfg(test)] mod tests { - use super::*; + use std::net::{IpAddr as NetIpAddr, Ipv4Addr}; + use std::str::FromStr; + use insta::assert_ron_snapshot; + use relay_general::protocol::{ Csp, Exception, Headers, IpAddr, JsonLenientString, LenientString, LogEntry, PairList, Request, User, Values, }; use relay_general::types::Annotated; - use std::net::{IpAddr as NetIpAddr, Ipv4Addr}; - use std::str::FromStr; + + use super::*; fn eq(name: &str, value: &[&str], ignore_case: bool) -> RuleCondition { RuleCondition::Eq(EqCondition { diff --git a/relay-server/src/actors/events.rs b/relay-server/src/actors/events.rs index 476428552d..0303c37f78 100644 --- a/relay-server/src/actors/events.rs +++ b/relay-server/src/actors/events.rs @@ -9,7 +9,6 @@ use actix::prelude::*; use chrono::{DateTime, Duration as SignedDuration, Utc}; use failure::Fail; use futures::prelude::*; -use relay_metrics::Metric; use serde_json::Value as SerdeValue; use relay_common::{clone, metric, ProjectId, UnixTimestamp}; @@ -23,9 +22,10 @@ use relay_general::protocol::{ use relay_general::store::ClockDriftProcessor; use relay_general::types::{Annotated, Array, FromValue, Object, ProcessingAction, Value}; use relay_log::LogError; +use relay_metrics::Metric; use relay_quotas::{DataCategory, RateLimits}; use relay_redis::RedisPool; -use relay_sampling::RuleId; +use relay_sampling::{RuleId, SamplingResult}; use crate::actors::outcome::{DiscardReason, Outcome, OutcomeProducer, TrackOutcome}; use crate::actors::project::{ @@ -37,9 +37,7 @@ use crate::envelope::{self, AttachmentType, ContentType, Envelope, Item, ItemTyp use crate::http::{HttpError, RequestBuilder}; use crate::metrics::{RelayCounters, RelayHistograms, RelaySets, RelayTimers}; use crate::service::ServerError; -use crate::utils::{ - self, ChunkedFormDataAggregator, EnvelopeSummary, FormDataIter, FutureExt, SamplingResult, -}; +use crate::utils::{self, ChunkedFormDataAggregator, EnvelopeSummary, FormDataIter, FutureExt}; #[cfg(feature = "processing")] use { @@ -130,8 +128,11 @@ enum ProcessingError { #[fail(display = "event exceeded its configured lifetime")] Timeout, - #[fail(display = "envelope empty, transaction removed by sampling")] - TransactionSampled, + #[fail( + display = "envelope empty, transaction removed by sampling rule:{}", + _0 + )] + TransactionSampled(RuleId), #[fail(display = "envelope empty, event removed by sampling rule:{}", _0)] EventSampled(RuleId), @@ -173,7 +174,7 @@ impl ProcessingError { } // Dynamic sampling (not an error, just discarding messages that were removed by sampling) - Self::TransactionSampled => Some(Outcome::Invalid(DiscardReason::TransactionSampled)), + Self::TransactionSampled(rule_id) => Some(Outcome::FilteredSampling(rule_id)), Self::EventSampled(rule_id) => Some(Outcome::FilteredSampling(rule_id)), // If we send to an upstream, we don't emit outcomes. @@ -1554,14 +1555,7 @@ impl Handler for EventManager { })) .and_then(move |envelope| { utils::sample_transaction(envelope, sampling_project, false, processing_enabled) - .map_err(|()| (ProcessingError::TransactionSampled)) - }) - .and_then(|envelope| { - if envelope.is_empty() { - Err(ProcessingError::TransactionSampled) - } else { - Ok(envelope) - } + .map_err(|rule_id| (ProcessingError::TransactionSampled(rule_id))) }) .and_then(clone!(project, |envelope| { // get the state for the current project. we can always fetch the cached version @@ -1798,11 +1792,11 @@ impl Handler for EventManager { #[cfg(test)] mod tests { - use super::*; + use chrono::{DateTime, TimeZone, Utc}; use crate::extractors::RequestMeta; - use chrono::{DateTime, TimeZone, Utc}; + use super::*; fn create_breadcrumbs_item(breadcrumbs: &[(Option>, &str)]) -> Item { let mut data = Vec::new(); diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 899abe2f6d..6601525afb 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -16,6 +16,7 @@ use relay_config::Config; use relay_general::protocol::{EventId, EventType}; use relay_log::LogError; use relay_quotas::RateLimits; +use relay_sampling::RuleId; use crate::actors::events::{QueueEnvelope, QueueEnvelopeError}; use crate::actors::outcome::{DiscardReason, Outcome, TrackOutcome}; @@ -82,7 +83,7 @@ pub enum BadStoreRequest { EventRejected(DiscardReason), #[fail(display = "envelope empty due to sampling")] - TraceSampled(Option), + TraceSampled(Option, RuleId), } impl BadStoreRequest { @@ -131,9 +132,7 @@ impl BadStoreRequest { .longest_error() .map(|r| Outcome::RateLimited(r.reason_code.clone())); } - //TODO fix this when we decide how to map empty Envelopes due to the trace being - //removed by sampling - BadStoreRequest::TraceSampled(_) => Outcome::Invalid(DiscardReason::Internal), + BadStoreRequest::TraceSampled(_, rule_id) => Outcome::FilteredSampling(*rule_id), // should actually never create an outcome BadStoreRequest::InvalidEventId => Outcome::Invalid(DiscardReason::Internal), @@ -503,10 +502,7 @@ where processing_enabled, ) .then(move |result| match result { - Err(()) => Err(BadStoreRequest::TraceSampled(event_id)), - Ok(envelope) if envelope.is_empty() => { - Err(BadStoreRequest::TraceSampled(event_id)) - } + Err(rule_id) => Err(BadStoreRequest::TraceSampled(event_id, rule_id)), Ok(envelope) => Ok((envelope, rate_limits, sampling_project)), }) }) @@ -564,7 +560,7 @@ where return Ok(create_response(*event_id.borrow())); } - if let BadStoreRequest::TraceSampled(event_id) = error { + if let BadStoreRequest::TraceSampled(event_id, _) = error { relay_log::debug!("creating response for trace sampled event"); return Ok(create_response(event_id)); } diff --git a/relay-server/src/utils/dynamic_sampling.rs b/relay-server/src/utils/dynamic_sampling.rs index 0f4d1d736f..ce6e668278 100644 --- a/relay-server/src/utils/dynamic_sampling.rs +++ b/relay-server/src/utils/dynamic_sampling.rs @@ -7,23 +7,12 @@ use futures::{future, prelude::*}; use relay_general::protocol::{Event, EventId}; use relay_sampling::{ - get_matching_event_rule, pseudo_random_from_uuid, rule_type_for_event, RuleId, + get_matching_event_rule, pseudo_random_from_uuid, rule_type_for_event, RuleId, SamplingResult, }; use crate::actors::project::{GetCachedProjectState, GetProjectState, Project, ProjectState}; use crate::envelope::{Envelope, ItemType}; -/// The result of a Sampling operation. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum SamplingResult { - /// Keep the event. - Keep, - /// Drop the event, due to the rule with provided Id. - Drop(RuleId), - /// No decision can be made. - NoDecision, -} - /// Checks whether an event should be kept or removed by dynamic sampling. pub fn should_keep_event( event: &Event, @@ -68,21 +57,21 @@ fn sample_transaction_internal( mut envelope: Envelope, project_state: Option<&ProjectState>, processing_enabled: bool, -) -> Envelope { +) -> Result { let project_state = match project_state { - None => return envelope, + None => return Ok(envelope), Some(project_state) => project_state, }; let sampling_config = match project_state.config.dynamic_sampling { // without sampling config we cannot sample transactions so give up here - None => return envelope, + None => return Ok(envelope), Some(ref sampling_config) => sampling_config, }; // when we have unsupported rules disable sampling for non processing relays if !processing_enabled && sampling_config.has_unsupported_rules() { - return envelope; + return Ok(envelope); } let trace_context = envelope.trace_context(); @@ -90,26 +79,29 @@ fn sample_transaction_internal( let trace_context = match (trace_context, transaction_item) { // we don't have what we need, can't sample the transactions in this envelope - (None, _) | (_, None) => return envelope, + (None, _) | (_, None) => return Ok(envelope), // see if we need to sample the transaction (Some(trace_context), Some(_)) => trace_context, }; let client_ip = envelope.meta().client_addr(); - let should_sample = trace_context - // see if we should sample - .should_sample(client_ip, sampling_config) - // TODO verify that this is the desired behaviour (i.e. if we can't find a rule - // for sampling, include the transaction) - .unwrap_or(true); - - if !should_sample { - // finally we decided that we should sample the transaction - envelope.take_item_by(|item| item.ty() == ItemType::Transaction); + let should_keep = trace_context.should_keep(client_ip, sampling_config); + + match should_keep { + // if we don't have a decision yet keep the transaction + SamplingResult::Keep | SamplingResult::NoDecision => Ok(envelope), + SamplingResult::Drop(rule_id) => { + envelope.take_item_by(|item| item.ty() == ItemType::Transaction); + if envelope.is_empty() { + // if after we removed the transaction we ended up with an empty envelope + // return an error so we can generate an outcome for the rule that dropped the transaction + Err(rule_id) + } else { + Ok(envelope) + } + } } - - envelope } /// Check if we should remove transactions from this envelope (because of trace sampling) and @@ -119,7 +111,7 @@ pub fn sample_transaction( project: Option>, fast_processing: bool, processing_enabled: bool, -) -> ResponseFuture { +) -> ResponseFuture { let project = match project { None => return Box::new(future::ok(envelope)), Some(project) => project, @@ -141,11 +133,7 @@ pub fn sample_transaction( Err(_) => return Ok(envelope), Ok(project_state) => project_state, }; - Ok(sample_transaction_internal( - envelope, - project_state.as_deref(), - processing_enabled, - )) + sample_transaction_internal(envelope, project_state.as_deref(), processing_enabled) }); Box::new(fut) as ResponseFuture<_, _> } else { @@ -157,11 +145,11 @@ pub fn sample_transaction( Err(_) => return Ok(envelope), Ok(project_state) => project_state, }; - Ok(sample_transaction_internal( + sample_transaction_internal( envelope, project_state.ok().as_deref(), processing_enabled, - )) + ) }); Box::new(fut) as ResponseFuture<_, _> } @@ -169,13 +157,17 @@ pub fn sample_transaction( #[cfg(test)] mod tests { - use super::*; - use crate::actors::project::ProjectConfig; + use std::time::Instant; + + use smallvec::SmallVec; + use relay_common::EventType; use relay_general::types::Annotated; - use relay_sampling::SamplingConfig; - use smallvec::SmallVec; - use std::time::Instant; + use relay_sampling::{RuleId, SamplingConfig}; + + use crate::actors::project::ProjectConfig; + + use super::*; fn get_project_state(sample_rate: Option) -> ProjectState { let sampling_config_str = if let Some(sample_rate) = sample_rate { From 47a0654051e537ac385da678caede7e370fe6b0b Mon Sep 17 00:00:00 2001 From: Radu Woinaroski Date: Fri, 19 Mar 2021 12:19:17 +0100 Subject: [PATCH 2/7] minor format fix --- relay-server/src/actors/events.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-server/src/actors/events.rs b/relay-server/src/actors/events.rs index 0303c37f78..e304dbefc9 100644 --- a/relay-server/src/actors/events.rs +++ b/relay-server/src/actors/events.rs @@ -1555,7 +1555,7 @@ impl Handler for EventManager { })) .and_then(move |envelope| { utils::sample_transaction(envelope, sampling_project, false, processing_enabled) - .map_err(|rule_id| (ProcessingError::TransactionSampled(rule_id))) + .map_err(ProcessingError::TransactionSampled) }) .and_then(clone!(project, |envelope| { // get the state for the current project. we can always fetch the cached version From 90297d600ab452100de11fcd083ca6c19b1073d7 Mon Sep 17 00:00:00 2001 From: Radu Woinaroski Date: Fri, 19 Mar 2021 12:30:03 +0100 Subject: [PATCH 3/7] fix integration tests for transaction outcomes --- tests/integration/test_dynamic_sampling.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_dynamic_sampling.py b/tests/integration/test_dynamic_sampling.py index 9f556247a2..5878a730cb 100644 --- a/tests/integration/test_dynamic_sampling.py +++ b/tests/integration/test_dynamic_sampling.py @@ -142,7 +142,7 @@ def test_it_removes_transactions(mini_sentry, relay): config = mini_sentry.add_basic_project_config(project_id) # add a sampling rule to project config that removes all transactions (sample_rate=0) public_key = config["publicKeys"][0]["publicKey"] - _add_sampling_config(config, sample_rate=0, rule_type="trace") + rules=_add_sampling_config(config, sample_rate=0, rule_type="trace") # create an envelope with a trace context that is initiated by this project (for simplicity) envelope = Envelope() @@ -159,9 +159,8 @@ def test_it_removes_transactions(mini_sentry, relay): outcomes = mini_sentry.captured_outcomes.get(timeout=2) assert outcomes is not None outcome = outcomes["outcomes"][0] - assert outcome.get("outcome") == 3 - assert outcome.get("reason") == "transaction_sampled" - + assert outcome.get("outcome") == 1 + assert outcome.get("reason") == f"Sampled:{rules[0]['id']}" def test_it_keeps_transactions(mini_sentry, relay): """ From 78cc640b200b3561521f481c4ab1e0f6e8dc1947 Mon Sep 17 00:00:00 2001 From: Radu Woinaroski Date: Fri, 19 Mar 2021 17:06:07 +0100 Subject: [PATCH 4/7] unit test sample_transaction_internal --- relay-server/src/utils/dynamic_sampling.rs | 119 +++++++++++++++++++-- 1 file changed, 112 insertions(+), 7 deletions(-) diff --git a/relay-server/src/utils/dynamic_sampling.rs b/relay-server/src/utils/dynamic_sampling.rs index ce6e668278..45c6430857 100644 --- a/relay-server/src/utils/dynamic_sampling.rs +++ b/relay-server/src/utils/dynamic_sampling.rs @@ -159,28 +159,35 @@ pub fn sample_transaction( mod tests { use std::time::Instant; + use bytes::Bytes; use smallvec::SmallVec; use relay_common::EventType; use relay_general::types::Annotated; - use relay_sampling::{RuleId, SamplingConfig}; + use relay_sampling::{RuleId, RuleType, SamplingConfig}; use crate::actors::project::ProjectConfig; + use crate::envelope::Item; use super::*; - fn get_project_state(sample_rate: Option) -> ProjectState { + fn get_project_state(sample_rate: Option, rule_type: RuleType) -> ProjectState { let sampling_config_str = if let Some(sample_rate) = sample_rate { + let rt = match rule_type { + RuleType::Transaction => "transaction", + RuleType::Error => "error", + RuleType::Trace => "trace", + }; format!( r#"{{ "rules":[{{ "condition": {{ "op": "and", "inner":[]}}, "sampleRate": {}, - "type": "error", + "type": "{}", "id": 1 }}] }}"#, - sample_rate + sample_rate, rt ) } else { "{\"rules\":[]}".to_owned() @@ -203,6 +210,46 @@ mod tests { } } + /// ugly hack to build an envelope with an optional trace context + fn new_envelope(with_trace_context: bool) -> Envelope { + let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"; + let event_id = EventId::new(); + + let raw_event = if with_trace_context { + let trace_id = uuid::Uuid::new_v4(); + let public_key = "12345678901234567890123456789012"; + let trace_context_raw = format!( + r#"{{"trace_id": "{}", "public_key": "{}"}}"#, + trace_id.to_simple(), + public_key, + ); + format!( + "{{\"event_id\":\"{}\",\"dsn\":\"{}\", \"trace\": {}}}\n", + event_id.0.to_simple(), + dsn, + trace_context_raw, + ) + } else { + format!( + "{{\"event_id\":\"{}\",\"dsn\":\"{}\"}}\n", + event_id.0.to_simple(), + dsn, + ) + }; + + let bytes = Bytes::from(raw_event); + + let mut envelope = Envelope::parse_bytes(bytes).unwrap(); + + let item1 = Item::new(ItemType::Transaction); + envelope.add_item(item1); + + let item2 = Item::new(ItemType::Event); + envelope.add_item(item2); + + envelope + } + #[test] /// Should_keep_event returns the expected results. fn test_should_keep_event() { @@ -212,21 +259,79 @@ mod tests { ..Event::default() }; - let proj_state = get_project_state(Some(0.0)); + let proj_state = get_project_state(Some(0.0), RuleType::Error); assert_eq!( SamplingResult::Drop(RuleId(1)), should_keep_event(&event, None, &proj_state, true) ); - let proj_state = get_project_state(Some(1.0)); + let proj_state = get_project_state(Some(1.0), RuleType::Error); assert_eq!( SamplingResult::Keep, should_keep_event(&event, None, &proj_state, true) ); - let proj_state = get_project_state(None); + let proj_state = get_project_state(None, RuleType::Error); assert_eq!( SamplingResult::NoDecision, should_keep_event(&event, None, &proj_state, true) ); } + + #[test] + /// Should remove transaction from envelope when a matching rule is detected + fn test_should_drop_transaction() { + //create an envelope with a event and a transaction + let envelope = new_envelope(true); + let state = get_project_state(Some(0.0), RuleType::Trace); + + let result = sample_transaction_internal(envelope, Some(&state), true); + assert!(result.is_ok()); + let envelope = result.unwrap(); + // the transaction item should have been removed + assert_eq!(envelope.len(), 1); + } + + #[test] + /// Should keep transaction when no trace context is present + fn test_should_keep_transaction_no_trace() { + //create an envelope with a event and a transaction + let envelope = new_envelope(false); + let state = get_project_state(Some(0.0), RuleType::Trace); + + let result = sample_transaction_internal(envelope, Some(&state), true); + assert!(result.is_ok()); + let envelope = result.unwrap(); + // both the event and the transaction item should have been left in the envelope + assert_eq!(envelope.len(), 2); + } + + #[test] + /// When no state is provided the envelope should be left unchanged + fn test_should_keep_transactions_no_state() { + //create an envelope with a event and a transaction + let envelope = new_envelope(true); + + let result = sample_transaction_internal(envelope, None, true); + assert!(result.is_ok()); + let envelope = result.unwrap(); + // both the event and the transaction item should have been left in the envelope + assert_eq!(envelope.len(), 2); + } + + #[test] + /// When the envelope becomes empty due to sampling we should get back the rule that dropped the + /// transaction + fn test_should_signal_when_envelope_becomes_empty() { + //create an envelope with a event and a transaction + let mut envelope = new_envelope(true); + //remove the event (so the envelope contains only a transaction + envelope.take_item_by(|item| item.ty() == ItemType::Event); + let state = get_project_state(Some(0.0), RuleType::Trace); + + let result = sample_transaction_internal(envelope, Some(&state), true); + assert!(result.is_err()); + let rule_id = result.unwrap_err(); + // we got back the rule id + assert_eq!(rule_id, RuleId(1)); + } } From 89df89117d00558ee1ab3261b84b7a7c90859b37 Mon Sep 17 00:00:00 2001 From: Radu Woinaroski Date: Fri, 19 Mar 2021 17:10:28 +0100 Subject: [PATCH 5/7] lint python --- tests/integration/test_dynamic_sampling.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_dynamic_sampling.py b/tests/integration/test_dynamic_sampling.py index 5878a730cb..ee58986b26 100644 --- a/tests/integration/test_dynamic_sampling.py +++ b/tests/integration/test_dynamic_sampling.py @@ -142,7 +142,7 @@ def test_it_removes_transactions(mini_sentry, relay): config = mini_sentry.add_basic_project_config(project_id) # add a sampling rule to project config that removes all transactions (sample_rate=0) public_key = config["publicKeys"][0]["publicKey"] - rules=_add_sampling_config(config, sample_rate=0, rule_type="trace") + rules = _add_sampling_config(config, sample_rate=0, rule_type="trace") # create an envelope with a trace context that is initiated by this project (for simplicity) envelope = Envelope() @@ -162,6 +162,7 @@ def test_it_removes_transactions(mini_sentry, relay): assert outcome.get("outcome") == 1 assert outcome.get("reason") == f"Sampled:{rules[0]['id']}" + def test_it_keeps_transactions(mini_sentry, relay): """ Tests that when sampling is set to 100% for the trace context project the transactions are kept From 3bac82ff9de803fd4eaefa9c112bce3a4473e879 Mon Sep 17 00:00:00 2001 From: Radu Woinaroski Date: Fri, 19 Mar 2021 17:12:28 +0100 Subject: [PATCH 6/7] add change log --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 58474f34ee..2518c657e2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - Emit the `quantity` field for outcomes of events. This field describes the total size in bytes for attachments or the event count for all other categories. A separate outcome is emitted for attachments in a rejected envelope, if any, in addition to the event outcome. ([#942](https://github.com/getsentry/relay/pull/942)) - Add experimental metrics ingestion without bucketing or pre-aggregation. ([#948](https://github.com/getsentry/relay/pull/948)) - Disallow empty metric names, require alphabetic start. ([#952](https://github.com/getsentry/relay/pull/952)) +- Add rule id to outcomes coming from transaction sampling. ([#953](https://github.com/getsentry/relay/pull/953)) ## 21.3.0 From 17959b8cceeb6ca33d9553732b7ee261de957a96 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Mon, 22 Mar 2021 18:12:36 +0100 Subject: [PATCH 7/7] ref: Simplify --- relay-sampling/src/lib.rs | 13 +++--- relay-server/src/actors/events.rs | 15 +++---- relay-server/src/endpoints/common.rs | 17 +++----- relay-server/src/utils/dynamic_sampling.rs | 47 ++++++++++++---------- 4 files changed, 45 insertions(+), 47 deletions(-) diff --git a/relay-sampling/src/lib.rs b/relay-sampling/src/lib.rs index a0fcfba123..c338c04398 100644 --- a/relay-sampling/src/lib.rs +++ b/relay-sampling/src/lib.rs @@ -30,14 +30,14 @@ pub enum RuleType { Error, } -/// The result of a Sampling operation. +/// The result of a sampling operation returned by [`TraceContext::should_keep`]. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum SamplingResult { /// Keep the event. Keep, - /// Drop the event, due to the rule with provided Id. + /// Drop the event, due to the rule with provided identifier. Drop(RuleId), - /// No decision can be made. + /// No decision can be made. NoDecision, } @@ -503,10 +503,11 @@ pub struct TraceContext { } impl TraceContext { - /// Returns the sampling decision of whether to drop or not a transaction based on the configuration rules. + /// Returns whether a trace should be retained based on sampling rules. /// - /// If None then a decision can't be made either because of an invalid of missing trace context or - /// because no applicable sampling rule could be found. + /// If [`SamplingResult::NoDecision`] is returned, then no rule matched this trace. In this + /// case, the caller may decide whether to keep the trace or not. The same is returned if the + /// configuration is invalid. pub fn should_keep(&self, ip_addr: Option, config: &SamplingConfig) -> SamplingResult { if let Some(rule) = get_matching_trace_rule(config, self, ip_addr, RuleType::Trace) { let rate = match pseudo_random_from_uuid(self.trace_id) { diff --git a/relay-server/src/actors/events.rs b/relay-server/src/actors/events.rs index e304dbefc9..ce06894c4c 100644 --- a/relay-server/src/actors/events.rs +++ b/relay-server/src/actors/events.rs @@ -128,13 +128,10 @@ enum ProcessingError { #[fail(display = "event exceeded its configured lifetime")] Timeout, - #[fail( - display = "envelope empty, transaction removed by sampling rule:{}", - _0 - )] - TransactionSampled(RuleId), + #[fail(display = "trace dropped by sampling rule {}", _0)] + TraceSampled(RuleId), - #[fail(display = "envelope empty, event removed by sampling rule:{}", _0)] + #[fail(display = "event dropped by sampling rule {}", _0)] EventSampled(RuleId), } @@ -174,7 +171,7 @@ impl ProcessingError { } // Dynamic sampling (not an error, just discarding messages that were removed by sampling) - Self::TransactionSampled(rule_id) => Some(Outcome::FilteredSampling(rule_id)), + Self::TraceSampled(rule_id) => Some(Outcome::FilteredSampling(rule_id)), Self::EventSampled(rule_id) => Some(Outcome::FilteredSampling(rule_id)), // If we send to an upstream, we don't emit outcomes. @@ -1554,8 +1551,8 @@ impl Handler for EventManager { } })) .and_then(move |envelope| { - utils::sample_transaction(envelope, sampling_project, false, processing_enabled) - .map_err(ProcessingError::TransactionSampled) + utils::sample_trace(envelope, sampling_project, false, processing_enabled) + .map_err(ProcessingError::TraceSampled) }) .and_then(clone!(project, |envelope| { // get the state for the current project. we can always fetch the cached version diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 6601525afb..fb78b4027a 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -83,7 +83,7 @@ pub enum BadStoreRequest { EventRejected(DiscardReason), #[fail(display = "envelope empty due to sampling")] - TraceSampled(Option, RuleId), + TraceSampled(RuleId), } impl BadStoreRequest { @@ -132,7 +132,7 @@ impl BadStoreRequest { .longest_error() .map(|r| Outcome::RateLimited(r.reason_code.clone())); } - BadStoreRequest::TraceSampled(_, rule_id) => Outcome::FilteredSampling(*rule_id), + BadStoreRequest::TraceSampled(rule_id) => Outcome::FilteredSampling(*rule_id), // should actually never create an outcome BadStoreRequest::InvalidEventId => Outcome::Invalid(DiscardReason::Internal), @@ -491,18 +491,14 @@ where Box::new(response) as RetVal })) .and_then(move |(envelope, rate_limits, sampling_project)| { - // do the fast path transaction sampling (if we can't do it here - // we'll try again after the envelope is queued) - let event_id = envelope.event_id(); - - utils::sample_transaction( + utils::sample_trace( envelope, sampling_project.clone(), true, processing_enabled, ) .then(move |result| match result { - Err(rule_id) => Err(BadStoreRequest::TraceSampled(event_id, rule_id)), + Err(rule_id) => Err(BadStoreRequest::TraceSampled(rule_id)), Ok(envelope) => Ok((envelope, rate_limits, sampling_project)), }) }) @@ -560,9 +556,8 @@ where return Ok(create_response(*event_id.borrow())); } - if let BadStoreRequest::TraceSampled(event_id, _) = error { - relay_log::debug!("creating response for trace sampled event"); - return Ok(create_response(event_id)); + if matches!(error, BadStoreRequest::TraceSampled(_)) { + return Ok(create_response(*event_id.borrow())); } let response = error.error_response(); diff --git a/relay-server/src/utils/dynamic_sampling.rs b/relay-server/src/utils/dynamic_sampling.rs index 45c6430857..a3b29a7b84 100644 --- a/relay-server/src/utils/dynamic_sampling.rs +++ b/relay-server/src/utils/dynamic_sampling.rs @@ -50,9 +50,11 @@ pub fn should_keep_event( SamplingResult::NoDecision } -/// Takes an envelope and potentially removes the transaction item from it if that -/// transaction item should be sampled out according to the dynamic sampling configuration -/// and the trace context. +/// Execute dynamic sampling on an envelope using the provided project state. +/// +/// This function potentially removes the transaction item from the envelpoe if that transaction +/// item should be sampled out according to the dynamic sampling configuration and the trace +/// context. fn sample_transaction_internal( mut envelope: Envelope, project_state: Option<&ProjectState>, @@ -85,28 +87,31 @@ fn sample_transaction_internal( }; let client_ip = envelope.meta().client_addr(); - - let should_keep = trace_context.should_keep(client_ip, sampling_config); - - match should_keep { - // if we don't have a decision yet keep the transaction - SamplingResult::Keep | SamplingResult::NoDecision => Ok(envelope), - SamplingResult::Drop(rule_id) => { - envelope.take_item_by(|item| item.ty() == ItemType::Transaction); - if envelope.is_empty() { - // if after we removed the transaction we ended up with an empty envelope - // return an error so we can generate an outcome for the rule that dropped the transaction - Err(rule_id) - } else { - Ok(envelope) - } + if let SamplingResult::Drop(rule_id) = trace_context.should_keep(client_ip, sampling_config) { + // TODO: Remove event-dependent items similar to `EnvelopeLimiter`. + envelope.take_item_by(|item| item.ty() == ItemType::Transaction); + if envelope.is_empty() { + // if after we removed the transaction we ended up with an empty envelope + // return an error so we can generate an outcome for the rule that dropped the transaction + Err(rule_id) + } else { + Ok(envelope) } + } else { + // if we don't have a decision yet keep the transaction + Ok(envelope) } } -/// Check if we should remove transactions from this envelope (because of trace sampling) and -/// return what is left of the envelope. -pub fn sample_transaction( +/// Execute dynamic sampling on the given envelope. +/// +/// Computes a sampling decision based on the envelope's trace context and sampling rules in the +/// provided project. If the trace is to be dropped, transaction-related items are removed from the +/// envelope. Trace sampling never applies to error events. +/// +/// Returns `Ok` if there are remaining items in the envelope. Returns `Err` with the matching rule +/// identifier if all elements have been removed. +pub fn sample_trace( envelope: Envelope, project: Option>, fast_processing: bool,