From d907bfda853f9d694f258a666d4b4c896a289150 Mon Sep 17 00:00:00 2001 From: Joris Bayer <joris.bayer@sentry.io> Date: Thu, 4 Apr 2024 11:18:53 +0200 Subject: [PATCH 01/22] ref: Drive-by refactor --- relay-server/src/services/processor.rs | 1 + relay-server/src/services/processor/span.rs | 19 ++++++++----------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 07520e20379..1d925a11ce5 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -1525,6 +1525,7 @@ impl EnvelopeProcessorService { state: &mut ProcessEnvelopeState<SpanGroup>, ) -> Result<(), ProcessingError> { span::filter(state); + if_processing!(self.inner.config, { span::process( state, diff --git a/relay-server/src/services/processor/span.rs b/relay-server/src/services/processor/span.rs index b2fe29691d1..58f2e57052b 100644 --- a/relay-server/src/services/processor/span.rs +++ b/relay-server/src/services/processor/span.rs @@ -6,7 +6,7 @@ use relay_event_schema::protocol::{Event, Span}; use relay_protocol::Annotated; use crate::services::processor::SpanGroup; -use crate::{envelope::ItemType, services::processor::ProcessEnvelopeState, utils::ItemAction}; +use crate::{services::processor::ProcessEnvelopeState, utils::ItemAction}; #[cfg(feature = "processing")] mod processing; @@ -14,19 +14,16 @@ mod processing; pub use processing::*; pub fn filter(state: &mut ProcessEnvelopeState<SpanGroup>) { - let standalone_span_ingestion_enabled = state + let standalone_span_ingestion_disabled = !state .project_state .has_feature(Feature::StandaloneSpanIngestion); - state.managed_envelope.retain_items(|item| match item.ty() { - ItemType::OtelSpan | ItemType::Span => { - if !standalone_span_ingestion_enabled { - relay_log::warn!("dropping span because feature is disabled"); - ItemAction::DropSilently - } else { - ItemAction::Keep - } + state.managed_envelope.retain_items(|item| { + if item.is_span() && standalone_span_ingestion_disabled { + relay_log::warn!("dropping span because feature is disabled"); + ItemAction::DropSilently + } else { + ItemAction::Keep } - _ => ItemAction::Keep, }); } From aa8df487919ab09f2e9060dcd460395c81b3d20f Mon Sep 17 00:00:00 2001 From: Joris Bayer <joris.bayer@sentry.io> Date: Thu, 4 Apr 2024 14:16:22 +0200 Subject: [PATCH 02/22] wip --- .../src/protocol/span/convert.rs | 16 ++- relay-server/src/services/processor.rs | 106 ++++++++++-------- relay-server/src/services/processor/span.rs | 2 +- .../src/services/processor/span/processing.rs | 15 +++ 4 files changed, 88 insertions(+), 51 deletions(-) diff --git a/relay-event-schema/src/protocol/span/convert.rs b/relay-event-schema/src/protocol/span/convert.rs index d725ee90185..d1054377ad1 100644 --- a/relay-event-schema/src/protocol/span/convert.rs +++ b/relay-event-schema/src/protocol/span/convert.rs @@ -78,9 +78,15 @@ macro_rules! map_fields { } #[allow(clippy::needless_update)] - impl From<&Span> for Event { - fn from(span: &Span) -> Self { - Self { + impl TryFrom<&Span> for Event { + type Error = (); + + fn try_from(span: &Span) -> Result<Self, ()> { + if !span.is_segment.value().unwrap_or(&false) { + // Only segment spans can become transactions. + return Err(()); + } + Ok(Self { $( $event_field: span.$span_field.clone(), )* @@ -102,7 +108,7 @@ macro_rules! map_fields { ) ), ..Default::default() - } + }) } } }; @@ -252,7 +258,7 @@ mod tests { } "###); - let roundtripped = Event::from(&span_from_event); + let roundtripped = Event::try_from(&span_from_event).unwrap(); assert_eq!(event, roundtripped); } } diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 1d925a11ce5..2dec6572666 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -909,12 +909,7 @@ pub struct EnvelopeProcessorService { inner: Arc<InnerProcessor>, } -struct InnerProcessor { - config: Arc<Config>, - global_config: GlobalConfigHandle, - cogs: Cogs, - #[cfg(feature = "processing")] - redis_pool: Option<RedisPool>, +struct Addrs { project_cache: Addr<ProjectCache>, outcome_aggregator: Addr<TrackOutcome>, #[cfg(feature = "processing")] @@ -922,6 +917,17 @@ struct InnerProcessor { upstream_relay: Addr<UpstreamRelay>, test_store: Addr<TestStore>, #[cfg(feature = "processing")] + store_forwarder: Option<Addr<Store>>, +} + +struct InnerProcessor { + config: Arc<Config>, + global_config: GlobalConfigHandle, + cogs: Cogs, + #[cfg(feature = "processing")] + redis_pool: Option<RedisPool>, + addrs: Addrs, + #[cfg(feature = "processing")] rate_limiter: Option<RedisRateLimiter>, geoip_lookup: Option<GeoIpLookup>, #[cfg(feature = "processing")] @@ -929,8 +935,6 @@ struct InnerProcessor { #[cfg(feature = "processing")] cardinality_limiter: Option<CardinalityLimiter>, #[cfg(feature = "processing")] - store_forwarder: Option<Addr<Store>>, - #[cfg(feature = "processing")] metric_stats: MetricStats, } @@ -969,14 +973,18 @@ impl EnvelopeProcessorService { rate_limiter: redis .clone() .map(|pool| RedisRateLimiter::new(pool).max_limit(config.max_rate_limit())), - project_cache, - outcome_aggregator, - upstream_relay, - test_store, + addrs: Addrs { + project_cache, + outcome_aggregator, + upstream_relay, + test_store, + #[cfg(feature = "processing")] + aggregator, + #[cfg(feature = "processing")] + store_forwarder, + }, geoip_lookup, #[cfg(feature = "processing")] - aggregator, - #[cfg(feature = "processing")] metric_meta_store: redis.clone().map(|pool| { RedisMetricMetaStore::new(pool, config.metrics_meta_locations_expiry()) }), @@ -994,8 +1002,6 @@ impl EnvelopeProcessorService { }) .map(CardinalityLimiter::new), #[cfg(feature = "processing")] - store_forwarder, - #[cfg(feature = "processing")] metric_stats, config, }; @@ -1119,6 +1125,7 @@ impl EnvelopeProcessorService { if limits.is_limited() { self.inner + .addrs .project_cache .send(UpdateRateLimits::new(scoping.project_key, limits)); } @@ -1131,7 +1138,7 @@ impl EnvelopeProcessorService { enforcement.track_outcomes( state.envelope(), &state.managed_envelope.scoping(), - self.inner.outcome_aggregator.clone(), + self.inner.addrs.outcome_aggregator.clone(), ); Ok(()) @@ -1487,7 +1494,7 @@ impl EnvelopeProcessorService { report::process_client_reports( state, &self.inner.config, - self.inner.outcome_aggregator.clone(), + self.inner.addrs.outcome_aggregator.clone(), ); Ok(()) @@ -1526,12 +1533,10 @@ impl EnvelopeProcessorService { ) -> Result<(), ProcessingError> { span::filter(state); + let global_config = self.inner.global_config.current(); + if_processing!(self.inner.config, { - span::process( - state, - self.inner.config.clone(), - &self.inner.global_config.current(), - ); + span::process(state, self.inner.config.clone(), &global_config); self.enforce_quotas(state)?; }); Ok(()) @@ -1692,7 +1697,7 @@ impl EnvelopeProcessorService { state.extracted_metrics.send_metrics( state.managed_envelope.envelope(), - self.inner.project_cache.clone(), + self.inner.addrs.project_cache.clone(), ); let envelope_response = if state.managed_envelope.envelope().is_empty() { @@ -1815,6 +1820,7 @@ impl EnvelopeProcessorService { relay_log::trace!("merging metric buckets into project cache"); self.inner + .addrs .project_cache .send(MergeBuckets::new(public_key, buckets)); } @@ -1865,6 +1871,7 @@ impl EnvelopeProcessorService { relay_log::trace!("merging metric buckets into project cache"); self.inner + .addrs .project_cache .send(MergeBuckets::new(public_key, buckets)); } @@ -1891,6 +1898,7 @@ impl EnvelopeProcessorService { Ok(meta) => { relay_log::trace!("adding metric metadata to project cache"); self.inner + .addrs .project_cache .send(AddMetricMeta { project_key, meta }); } @@ -1925,7 +1933,7 @@ impl EnvelopeProcessorService { #[cfg(feature = "processing")] if self.inner.config.processing_enabled() { - if let Some(store_forwarder) = self.inner.store_forwarder.clone() { + if let Some(store_forwarder) = self.inner.addrs.store_forwarder.clone() { relay_log::trace!("sending envelope to kafka"); store_forwarder.send(StoreEnvelope { envelope }); return; @@ -1935,7 +1943,10 @@ impl EnvelopeProcessorService { // If we are in capture mode, we stash away the event instead of forwarding it. if Capture::should_capture(&self.inner.config) { relay_log::trace!("capturing envelope in memory"); - self.inner.test_store.send(Capture::accepted(envelope)); + self.inner + .addrs + .test_store + .send(Capture::accepted(envelope)); return; } @@ -1954,12 +1965,15 @@ impl EnvelopeProcessorService { match result { Ok(body) => { - self.inner.upstream_relay.send(SendRequest(SendEnvelope { - envelope, - body, - http_encoding, - project_cache: self.inner.project_cache.clone(), - })); + self.inner + .addrs + .upstream_relay + .send(SendRequest(SendEnvelope { + envelope, + body, + http_encoding, + project_cache: self.inner.addrs.project_cache.clone(), + })); } Err(error) => { // Errors are only logged for what we consider an internal discard reason. These @@ -1993,8 +2007,8 @@ impl EnvelopeProcessorService { let envelope = ManagedEnvelope::standalone( envelope, - self.inner.outcome_aggregator.clone(), - self.inner.test_store.clone(), + self.inner.addrs.outcome_aggregator.clone(), + self.inner.addrs.test_store.clone(), ProcessingGroup::ClientReport, ); self.handle_submit_envelope(SubmitEnvelope { @@ -2037,11 +2051,12 @@ impl EnvelopeProcessorService { if rate_limits.is_limited() { let was_enforced = bucket_limiter - .enforce_limits(&rate_limits, self.inner.outcome_aggregator.clone()); + .enforce_limits(&rate_limits, self.inner.addrs.outcome_aggregator.clone()); if was_enforced { // Update the rate limits in the project cache. self.inner + .addrs .project_cache .send(UpdateRateLimits::new(scoping.project_key, rate_limits)); } @@ -2053,6 +2068,7 @@ impl EnvelopeProcessorService { if !buckets.is_empty() { self.inner + .addrs .aggregator .send(MergeBuckets::new(project_key, buckets)); } @@ -2111,13 +2127,13 @@ impl EnvelopeProcessorService { let reason_code = limits.longest().and_then(|limit| limit.reason_code.clone()); utils::reject_metrics( - &self.inner.outcome_aggregator, + &self.inner.addrs.outcome_aggregator, quantities, *item_scoping.scoping, Outcome::RateLimited(reason_code), ); - self.inner.project_cache.send(UpdateRateLimits::new( + self.inner.addrs.project_cache.send(UpdateRateLimits::new( item_scoping.scoping.project_key, limits, )); @@ -2199,7 +2215,7 @@ impl EnvelopeProcessorService { // Log outcomes for rejected buckets. utils::reject_metrics( - &self.inner.outcome_aggregator, + &self.inner.addrs.outcome_aggregator, utils::extract_metric_quantities(limits.rejected(), mode), scoping, Outcome::CardinalityLimited, @@ -2308,8 +2324,8 @@ impl EnvelopeProcessorService { let mut envelope = ManagedEnvelope::standalone( envelope, - self.inner.outcome_aggregator.clone(), - self.inner.test_store.clone(), + self.inner.addrs.outcome_aggregator.clone(), + self.inner.addrs.test_store.clone(), ProcessingGroup::Metrics, ); envelope.set_partition_key(partition_key).scope(scoping); @@ -2354,10 +2370,10 @@ impl EnvelopeProcessorService { encoded, http_encoding, quantities, - outcome_aggregator: self.inner.outcome_aggregator.clone(), + outcome_aggregator: self.inner.addrs.outcome_aggregator.clone(), }; - self.inner.upstream_relay.send(SendRequest(request)); + self.inner.addrs.upstream_relay.send(SendRequest(request)); } /// Serializes metric buckets to JSON and sends them to the upstream via the global endpoint. @@ -2416,7 +2432,7 @@ impl EnvelopeProcessorService { fn handle_encode_metrics(&self, message: EncodeMetrics) { #[cfg(feature = "processing")] if self.inner.config.processing_enabled() { - if let Some(ref store_forwarder) = self.inner.store_forwarder { + if let Some(ref store_forwarder) = self.inner.addrs.store_forwarder { return self.encode_metrics_processing(message, store_forwarder); } } @@ -2450,8 +2466,8 @@ impl EnvelopeProcessorService { let envelope = ManagedEnvelope::standalone( envelope, - self.inner.outcome_aggregator.clone(), - self.inner.test_store.clone(), + self.inner.addrs.outcome_aggregator.clone(), + self.inner.addrs.test_store.clone(), ProcessingGroup::Metrics, ); self.handle_submit_envelope(SubmitEnvelope { diff --git a/relay-server/src/services/processor/span.rs b/relay-server/src/services/processor/span.rs index 58f2e57052b..6a6147fc889 100644 --- a/relay-server/src/services/processor/span.rs +++ b/relay-server/src/services/processor/span.rs @@ -1,6 +1,6 @@ //! Processor code related to standalone spans. -use relay_dynamic_config::Feature; +use relay_dynamic_config::{Feature, GlobalConfig}; use relay_event_normalization::span::tag_extraction; use relay_event_schema::protocol::{Event, Span}; use relay_protocol::Annotated; diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index f59a7ddc8f5..c5581880ba8 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -81,6 +81,16 @@ pub fn process( _ => return ItemAction::Keep, }; + if let Some(transaction) = convert_to_transaction(&annotated_span) { + // TODO: check version in global config + // TODO: make sure that transaction is normalized, even if processing relay is configured to + // skip normalization. + // TODO: test performance score total metric is extracted only once. + // TODO: item headers that prevent circular extraction + + todo!(); + } + if let Err(e) = normalize( &mut annotated_span, normalize_span_config.clone(), @@ -491,3 +501,8 @@ fn validate(mut span: Annotated<Span>) -> Result<Annotated<Span>, anyhow::Error> Ok(span) } + +fn convert_to_transaction(annotated_span: &Annotated<Span>) -> Option<Event> { + let span = annotated_span.value()?; + Event::try_from(span).ok() +} From e16ad3aebd8097cb0d74008430bb4b8ed492f7c9 Mon Sep 17 00:00:00 2001 From: Joris Bayer <joris.bayer@sentry.io> Date: Thu, 4 Apr 2024 15:53:58 +0200 Subject: [PATCH 03/22] Compiling envelope copy --- relay-sampling/src/evaluation.rs | 5 +++ relay-server/src/envelope.rs | 17 +++++++- relay-server/src/service.rs | 22 +++++----- relay-server/src/services/processor.rs | 42 ++++++++----------- relay-server/src/services/processor/span.rs | 2 +- .../src/services/processor/span/processing.rs | 39 ++++++++++++----- relay-server/src/testutils.rs | 25 ++++++----- 7 files changed, 94 insertions(+), 58 deletions(-) diff --git a/relay-sampling/src/evaluation.rs b/relay-sampling/src/evaluation.rs index 5f2747d27d0..c183576f22e 100644 --- a/relay-sampling/src/evaluation.rs +++ b/relay-sampling/src/evaluation.rs @@ -65,6 +65,11 @@ impl<'a> ReservoirEvaluator<'a> { } } + /// Returns a shared reference to the reservoir counters. + pub fn counters(&self) -> ReservoirCounters { + self.counters.clone() + } + /// Sets the Redis pool and organiation ID for the [`ReservoirEvaluator`]. /// /// These values are needed to synchronize with Redis. diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index 3050a43d9f6..cf9a2050410 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -42,7 +42,7 @@ use bytes::Bytes; use chrono::{DateTime, Utc}; use relay_dynamic_config::ErrorBoundary; use relay_event_normalization::{normalize_transaction_name, TransactionNameRule}; -use relay_event_schema::protocol::{EventId, EventType}; +use relay_event_schema::protocol::{Event, EventId, EventType}; use relay_protocol::{Annotated, Value}; use relay_quotas::DataCategory; use relay_sampling::DynamicSamplingContext; @@ -1060,6 +1060,21 @@ impl Envelope { Box::new(Self { items, headers }) } + /// Creates an envelope from an event, plus headers. + pub fn try_from_event( + mut headers: EnvelopeHeaders, + event: Event, + ) -> Result<Box<Self>, serde_json::Error> { + headers.event_id = event.id.value().copied(); + let event_type = event.ty.value().copied().unwrap_or_default(); + + let serialized = Annotated::new(event).to_json()?; + let mut item = Item::new(ItemType::from_event_type(event_type)); + item.set_payload(ContentType::Json, serialized); + + Ok(Self::from_parts(headers, smallvec::smallvec![item])) + } + /// Creates an envelope from request information. pub fn from_request(event_id: Option<EventId>, meta: RequestMeta) -> Box<Self> { Box::new(Self { diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index d41fc3a8dbc..fd2e535114f 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -20,7 +20,7 @@ use crate::services::global_config::{GlobalConfigManager, GlobalConfigService}; use crate::services::health_check::{HealthCheck, HealthCheckService}; use crate::services::outcome::{OutcomeProducer, OutcomeProducerService, TrackOutcome}; use crate::services::outcome_aggregator::OutcomeAggregator; -use crate::services::processor::{EnvelopeProcessor, EnvelopeProcessorService}; +use crate::services::processor::{self, EnvelopeProcessor, EnvelopeProcessorService}; use crate::services::project_cache::{ProjectCache, ProjectCacheService, Services}; use crate::services::relays::{RelayCache, RelayCacheService}; #[cfg(feature = "processing")] @@ -172,15 +172,17 @@ impl ServiceState { cogs, #[cfg(feature = "processing")] redis_pool.clone(), - outcome_aggregator.clone(), - project_cache.clone(), - upstream_relay.clone(), - test_store.clone(), - #[cfg(feature = "processing")] - aggregator.clone(), - #[cfg(feature = "processing")] - store.clone(), - #[cfg(feature = "processing")] + processor::Addrs { + envelope_processor: processor.clone(), + project_cache: project_cache.clone(), + outcome_aggregator: outcome_aggregator.clone(), + upstream_relay: upstream_relay.clone(), + test_store: test_store.clone(), + #[cfg(feature = "processing")] + aggregator: aggregator.clone(), + #[cfg(feature = "processing")] + store_forwarder: store.clone(), + }, metric_stats, ) .spawn_handler(processor_rx); diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 2dec6572666..3c5ca839b5a 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -909,15 +909,17 @@ pub struct EnvelopeProcessorService { inner: Arc<InnerProcessor>, } -struct Addrs { - project_cache: Addr<ProjectCache>, - outcome_aggregator: Addr<TrackOutcome>, +/// Contains the addresses of services that the processor publishes to. +pub struct Addrs { + pub envelope_processor: Addr<EnvelopeProcessor>, + pub project_cache: Addr<ProjectCache>, + pub outcome_aggregator: Addr<TrackOutcome>, #[cfg(feature = "processing")] - aggregator: Addr<Aggregator>, - upstream_relay: Addr<UpstreamRelay>, - test_store: Addr<TestStore>, + pub aggregator: Addr<Aggregator>, + pub upstream_relay: Addr<UpstreamRelay>, + pub test_store: Addr<TestStore>, #[cfg(feature = "processing")] - store_forwarder: Option<Addr<Store>>, + pub store_forwarder: Option<Addr<Store>>, } struct InnerProcessor { @@ -940,18 +942,12 @@ struct InnerProcessor { impl EnvelopeProcessorService { /// Creates a multi-threaded envelope processor. - #[allow(clippy::too_many_arguments)] pub fn new( config: Arc<Config>, global_config: GlobalConfigHandle, cogs: Cogs, #[cfg(feature = "processing")] redis: Option<RedisPool>, - outcome_aggregator: Addr<TrackOutcome>, - project_cache: Addr<ProjectCache>, - upstream_relay: Addr<UpstreamRelay>, - test_store: Addr<TestStore>, - #[cfg(feature = "processing")] aggregator: Addr<Aggregator>, - #[cfg(feature = "processing")] store_forwarder: Option<Addr<Store>>, + addrs: Addrs, #[cfg(feature = "processing")] metric_stats: MetricStats, ) -> Self { let geoip_lookup = config.geoip_path().and_then(|p| { @@ -973,16 +969,7 @@ impl EnvelopeProcessorService { rate_limiter: redis .clone() .map(|pool| RedisRateLimiter::new(pool).max_limit(config.max_rate_limit())), - addrs: Addrs { - project_cache, - outcome_aggregator, - upstream_relay, - test_store, - #[cfg(feature = "processing")] - aggregator, - #[cfg(feature = "processing")] - store_forwarder, - }, + addrs, geoip_lookup, #[cfg(feature = "processing")] metric_meta_store: redis.clone().map(|pool| { @@ -1536,7 +1523,12 @@ impl EnvelopeProcessorService { let global_config = self.inner.global_config.current(); if_processing!(self.inner.config, { - span::process(state, self.inner.config.clone(), &global_config); + span::process( + state, + self.inner.config.clone(), + &global_config, + &self.inner.addrs, + ); self.enforce_quotas(state)?; }); Ok(()) diff --git a/relay-server/src/services/processor/span.rs b/relay-server/src/services/processor/span.rs index 6a6147fc889..58f2e57052b 100644 --- a/relay-server/src/services/processor/span.rs +++ b/relay-server/src/services/processor/span.rs @@ -1,6 +1,6 @@ //! Processor code related to standalone spans. -use relay_dynamic_config::{Feature, GlobalConfig}; +use relay_dynamic_config::Feature; use relay_event_normalization::span::tag_extraction; use relay_event_schema::protocol::{Event, Span}; use relay_protocol::Annotated; diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index c5581880ba8..b0c45123018 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -20,19 +20,21 @@ use relay_pii::PiiProcessor; use relay_protocol::{Annotated, Empty}; use relay_spans::{otel_to_sentry_span, otel_trace::Span as OtelSpan}; -use crate::envelope::{ContentType, Item, ItemType}; +use crate::envelope::{ContentType, Envelope, Item, ItemType}; use crate::metrics_extraction::generic::extract_metrics; use crate::services::outcome::{DiscardReason, Outcome}; use crate::services::processor::span::extract_transaction_span; use crate::services::processor::{ - ProcessEnvelopeState, ProcessingError, SpanGroup, TransactionGroup, + Addrs, ProcessEnvelope, ProcessEnvelopeState, ProcessingError, ProcessingGroup, SpanGroup, + TransactionGroup, }; -use crate::utils::ItemAction; +use crate::utils::{ItemAction, ManagedEnvelope}; pub fn process( state: &mut ProcessEnvelopeState<SpanGroup>, config: Arc<Config>, global_config: &GlobalConfig, + addrs: &Addrs, ) { use relay_event_normalization::RemoveOtherProcessor; @@ -61,6 +63,12 @@ pub fn process( &user_agent_info, ); + // Ownership aerobics: + let envelope_headers = state.envelope().headers().clone(); // TODO(jjbayer): Would be nice to not clone here. + let project_state = state.project_state.clone(); + let sampling_project_state = state.sampling_project_state.clone(); + let reservoir_counters = state.reservoir.counters(); + state.managed_envelope.retain_items(|item| { let mut annotated_span = match item.ty() { ItemType::OtelSpan => match serde_json::from_slice::<OtelSpan>(&item.payload()) { @@ -82,13 +90,24 @@ pub fn process( }; if let Some(transaction) = convert_to_transaction(&annotated_span) { - // TODO: check version in global config - // TODO: make sure that transaction is normalized, even if processing relay is configured to - // skip normalization. - // TODO: test performance score total metric is extracted only once. - // TODO: item headers that prevent circular extraction - - todo!(); + match Envelope::try_from_event(envelope_headers.clone(), transaction) { + Ok(envelope) => { + addrs.envelope_processor.send(ProcessEnvelope { + envelope: ManagedEnvelope::standalone( + envelope, + addrs.outcome_aggregator.clone(), + addrs.test_store.clone(), + ProcessingGroup::Transaction, + ), + project_state: project_state.clone(), + sampling_project_state: sampling_project_state.clone(), + reservoir_counters: reservoir_counters.clone(), + }); + } + Err(e) => { + relay_log::error!("Failed to create event envelope: {e}"); + } + } } if let Err(e) = normalize( diff --git a/relay-server/src/testutils.rs b/relay-server/src/testutils.rs index 01382d23906..b4b7382d174 100644 --- a/relay-server/src/testutils.rs +++ b/relay-server/src/testutils.rs @@ -18,7 +18,7 @@ use crate::extractors::RequestMeta; use crate::metric_stats::MetricStats; use crate::services::global_config::GlobalConfigHandle; use crate::services::outcome::TrackOutcome; -use crate::services::processor::EnvelopeProcessorService; +use crate::services::processor::{self, EnvelopeProcessorService}; use crate::services::project::ProjectState; use crate::services::test_store::TestStore; @@ -119,7 +119,7 @@ pub fn create_test_processor(config: Config) -> EnvelopeProcessorService { let (upstream_relay, _) = mock_service("upstream_relay", (), |&mut (), _| {}); let (test_store, _) = mock_service("test_store", (), |&mut (), _| {}); #[cfg(feature = "processing")] - let (_aggregator, _) = mock_service("aggregator", (), |&mut (), _| {}); + let (aggregator, _) = mock_service("aggregator", (), |&mut (), _| {}); #[cfg(feature = "processing")] let redis = config @@ -134,19 +134,22 @@ pub fn create_test_processor(config: Config) -> EnvelopeProcessorService { Cogs::noop(), #[cfg(feature = "processing")] redis, - outcome_aggregator, - project_cache, - upstream_relay, - test_store, - #[cfg(feature = "processing")] - _aggregator.clone(), - #[cfg(feature = "processing")] - None, + processor::Addrs { + envelope_processor: Addr::custom().0, + outcome_aggregator, + project_cache, + upstream_relay, + test_store, + #[cfg(feature = "processing")] + aggregator: aggregator.clone(), + #[cfg(feature = "processing")] + store_forwarder: None, + }, #[cfg(feature = "processing")] MetricStats::new( config, GlobalConfigHandle::fixed(Default::default()), - _aggregator, + aggregator, ), ) } From 0f1526d4e42fd17ea30edafdd94e663b908ab6ff Mon Sep 17 00:00:00 2001 From: Joris Bayer <joris.bayer@sentry.io> Date: Thu, 4 Apr 2024 17:44:50 +0200 Subject: [PATCH 04/22] test: See test fail because of duplicate spans --- relay-dynamic-config/src/feature.rs | 9 +++ .../src/protocol/span/convert.rs | 5 +- relay-server/src/services/processor.rs | 2 +- relay-server/src/services/processor/span.rs | 2 +- .../src/services/processor/span/processing.rs | 67 +++++++++++-------- tests/integration/test_spans.py | 28 +++++++- 6 files changed, 80 insertions(+), 33 deletions(-) diff --git a/relay-dynamic-config/src/feature.rs b/relay-dynamic-config/src/feature.rs index bc604b592d1..89a2c7864e5 100644 --- a/relay-dynamic-config/src/feature.rs +++ b/relay-dynamic-config/src/feature.rs @@ -81,6 +81,15 @@ pub enum Feature { #[serde(rename = "organizations:continuous-profiling")] ContinuousProfiling, + /// When enabled, every standalone segment span will be duplicated as a transaction. + /// + /// This allows support of product features that rely on transactions for SDKs that only + /// send spans. + /// + /// Serialized as `projects:extract-transaction-from-segment-span`. + #[serde(rename = "projects:extract-transaction-from-segment-span")] + ExtractTransactionFromSegmentSpan, + /// Deprecated, still forwarded for older downstream Relays. #[doc(hidden)] #[serde(rename = "organizations:transaction-name-mark-scrubbed-as-sanitized")] diff --git a/relay-event-schema/src/protocol/span/convert.rs b/relay-event-schema/src/protocol/span/convert.rs index d1054377ad1..20dedb8abf7 100644 --- a/relay-event-schema/src/protocol/span/convert.rs +++ b/relay-event-schema/src/protocol/span/convert.rs @@ -3,6 +3,7 @@ use crate::protocol::{ ContextInner, Contexts, DefaultContext, Event, ProfileContext, Span, TraceContext, }; +use relay_base_schema::events::EventType; use relay_protocol::Annotated; use std::collections::BTreeMap; @@ -47,7 +48,7 @@ macro_rules! map_fields { $(span.$fixed_span_field:ident <= $fixed_span_value:expr), * } fixed_for_event { - $($fixed_event_value:expr => span.$fixed_event_field:ident), * + $($fixed_event_value:expr => event.$fixed_event_field:ident), * } ) => { #[allow(clippy::needless_update)] @@ -146,7 +147,7 @@ map_fields!( span.was_transaction <= true } fixed_for_event { - // nothing yet + EventType::Transaction => event.ty } ); diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 3c5ca839b5a..d1a82851546 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -1409,7 +1409,7 @@ impl EnvelopeProcessorService { profile::process(state, &self.inner.config); }); - if state.has_event() { + if dbg!(state.has_event()) { event::scrub(state)?; if_processing!(self.inner.config, { span::extract_from_event(state, &self.inner.config); diff --git a/relay-server/src/services/processor/span.rs b/relay-server/src/services/processor/span.rs index 58f2e57052b..754f99a8caf 100644 --- a/relay-server/src/services/processor/span.rs +++ b/relay-server/src/services/processor/span.rs @@ -31,7 +31,7 @@ pub fn filter(state: &mut ProcessEnvelopeState<SpanGroup>) { /// /// Returns `None` when [`tag_extraction::extract_span_tags`] clears the span, which it shouldn't. pub fn extract_transaction_span(event: &Event, max_tag_value_size: usize) -> Option<Span> { - let mut spans = [Span::from(event).into()]; + let mut spans = [dbg!(Span::from(event)).into()]; tag_extraction::extract_span_tags(event, &mut spans, max_tag_value_size); diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index b0c45123018..50e22a4cbf1 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -14,7 +14,7 @@ use relay_event_normalization::{ PerformanceScoreConfig, RawUserAgentInfo, TransactionsProcessor, }; use relay_event_schema::processor::{process_value, ProcessingState}; -use relay_event_schema::protocol::{BrowserContext, Contexts, Event, Span, SpanData}; +use relay_event_schema::protocol::{BrowserContext, Contexts, Event, EventId, Span, SpanData}; use relay_metrics::{aggregator::AggregatorConfig, MetricNamespace, UnixTimestamp}; use relay_pii::PiiProcessor; use relay_protocol::{Annotated, Empty}; @@ -63,11 +63,10 @@ pub fn process( &user_agent_info, ); - // Ownership aerobics: - let envelope_headers = state.envelope().headers().clone(); // TODO(jjbayer): Would be nice to not clone here. - let project_state = state.project_state.clone(); - let sampling_project_state = state.sampling_project_state.clone(); - let reservoir_counters = state.reservoir.counters(); + let mut extracted_transactions = vec![]; + let should_extract_transactions = state + .project_state + .has_feature(Feature::ExtractTransactionFromSegmentSpan); state.managed_envelope.retain_items(|item| { let mut annotated_span = match item.ty() { @@ -89,24 +88,9 @@ pub fn process( _ => return ItemAction::Keep, }; - if let Some(transaction) = convert_to_transaction(&annotated_span) { - match Envelope::try_from_event(envelope_headers.clone(), transaction) { - Ok(envelope) => { - addrs.envelope_processor.send(ProcessEnvelope { - envelope: ManagedEnvelope::standalone( - envelope, - addrs.outcome_aggregator.clone(), - addrs.test_store.clone(), - ProcessingGroup::Transaction, - ), - project_state: project_state.clone(), - sampling_project_state: sampling_project_state.clone(), - reservoir_counters: reservoir_counters.clone(), - }); - } - Err(e) => { - relay_log::error!("Failed to create event envelope: {e}"); - } + if should_extract_transactions { + if let Some(transaction) = convert_to_transaction(&annotated_span) { + extracted_transactions.push(transaction); } } @@ -170,17 +154,46 @@ pub fn process( ItemAction::Keep }); + + for mut transaction in extracted_transactions { + // Give each transaction event a new random ID: + transaction.id = EventId::new().into(); + + // Enqueue a full processing request for every extracted transaction item. + match Envelope::try_from_event(state.envelope().headers().clone(), transaction) { + Ok(mut envelope) => { + // for item in envelope.items_mut() { + // item.set_spans_extracted + // } + + addrs.envelope_processor.send(ProcessEnvelope { + envelope: ManagedEnvelope::standalone( + envelope, + addrs.outcome_aggregator.clone(), + addrs.test_store.clone(), + ProcessingGroup::Transaction, + ), + project_state: state.project_state.clone(), + sampling_project_state: state.sampling_project_state.clone(), + reservoir_counters: state.reservoir.counters().clone(), + }); + } + Err(e) => { + relay_log::error!("Failed to create event envelope: {e}"); + } + } + } } pub fn extract_from_event(state: &mut ProcessEnvelopeState<TransactionGroup>, config: &Config) { // Only extract spans from transactions (not errors). - if state.event_type() != Some(EventType::Transaction) { + if dbg!(state.event_type()) != Some(EventType::Transaction) { return; }; - if !state + if !dbg!(state .project_state - .has_feature(Feature::ExtractSpansAndSpanMetricsFromEvent) + .has_feature(Feature::ExtractSpansAndSpanMetricsFromEvent)) { return; } diff --git a/tests/integration/test_spans.py b/tests/integration/test_spans.py index ad0358dac03..5e2a663c3b4 100644 --- a/tests/integration/test_spans.py +++ b/tests/integration/test_spans.py @@ -297,14 +297,18 @@ def envelope_with_spans( return envelope +@pytest.mark.parametrize("extract_transaction", [False, True]) def test_span_ingestion( mini_sentry, relay_with_processing, spans_consumer, metrics_consumer, + transactions_consumer, + extract_transaction, ): spans_consumer = spans_consumer() metrics_consumer = metrics_consumer() + transactions_consumer = transactions_consumer() relay = relay_with_processing( options={ @@ -321,8 +325,11 @@ def test_span_ingestion( project_config["config"]["features"] = [ "organizations:standalone-span-ingestion", "projects:span-metrics-extraction", - "projects:span-metrics-extraction-all-modules", ] + if extract_transaction: + project_config["config"]["features"].append( + "projects:extract-transaction-from-segment-span" + ) duration = timedelta(milliseconds=500) end = datetime.utcnow().replace(tzinfo=timezone.utc) - timedelta(seconds=1) @@ -391,7 +398,7 @@ def test_span_ingestion( headers={"Content-Type": "application/x-protobuf"}, ) - spans = list(spans_consumer.get_spans(timeout=10.0, max_attempts=6)) + spans = list(spans_consumer.get_spans(timeout=10.0, max_attempts=7)) for span in spans: span.pop("received", None) @@ -505,6 +512,21 @@ def test_span_ingestion( }, ] + spans_consumer.assert_empty() + + # If transaction extraction is enabled, expect transactions: + if extract_transaction: + expected_transactions = ( + 6 # TODO: modify test input to make is_segment false for some + ) + transactions = [ + transactions_consumer.get_event()[0] for _ in range(expected_transactions) + ] + print(transactions) + # assert transactions == [] # TODO + + transactions_consumer.assert_empty() + metrics = [metric for (metric, _headers) in metrics_consumer.get_metrics()] metrics.sort(key=lambda m: (m["name"], sorted(m["tags"].items()), m["timestamp"])) for metric in metrics: @@ -692,6 +714,8 @@ def test_span_ingestion( }, ] + metrics_consumer.assert_empty() + def test_span_extraction_with_metrics_summary( mini_sentry, From b8177bcec43b8a9da4e4d8a37d21ecff493ddc2b Mon Sep 17 00:00:00 2001 From: Joris Bayer <joris.bayer@sentry.io> Date: Fri, 5 Apr 2024 16:16:48 +0200 Subject: [PATCH 05/22] fix: Prevent duplicate span ingestion --- .../src/normalize/span/exclusive_time.rs | 6 ++++ relay-server/src/envelope.rs | 30 +++++++++++++++++++ relay-server/src/services/processor.rs | 6 ++++ relay-server/src/services/processor/event.rs | 4 +++ .../src/services/processor/span/processing.rs | 17 +++++++---- tests/integration/test_spans.py | 1 + 6 files changed, 59 insertions(+), 5 deletions(-) diff --git a/relay-event-normalization/src/normalize/span/exclusive_time.rs b/relay-event-normalization/src/normalize/span/exclusive_time.rs index c8ef5dbcf1a..bb65d007b84 100644 --- a/relay-event-normalization/src/normalize/span/exclusive_time.rs +++ b/relay-event-normalization/src/normalize/span/exclusive_time.rs @@ -42,6 +42,11 @@ fn set_event_exclusive_time( return; }; + if trace_context.exclusive_time.value().is_some() { + // Exclusive time already set, respect. + return; + } + let Some(span_id) = trace_context.span_id.value() else { return; }; @@ -126,6 +131,7 @@ pub fn compute_span_exclusive_time(event: &mut Event) { intervals.sort_unstable_by_key(|interval| interval.start); } + // TODO: Test exclusive_time on extracted transaction set_event_exclusive_time(event_interval, contexts, &span_map); for span in spans.iter_mut() { diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index cf9a2050410..ca47ca018ef 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -554,6 +554,14 @@ pub struct ItemHeaders { #[serde(default, skip_serializing_if = "is_false")] metrics_extracted: bool, + /// Whether or not a transaction has been extracted from a segment span. + #[serde(default, skip_serializing_if = "is_false")] + transaction_extracted: bool, + + /// Whether or not spans have been extracted from a transaction. + #[serde(default, skip_serializing_if = "is_false")] + spans_extracted: bool, + /// `false` if the sampling decision is "drop". /// /// In the most common use case, the item is dropped when the sampling decision is "drop". @@ -628,6 +636,8 @@ impl Item { sample_rates: None, other: BTreeMap::new(), metrics_extracted: false, + transaction_extracted: false, + spans_extracted: false, sampled: true, }, payload: Bytes::new(), @@ -833,6 +843,26 @@ impl Item { self.headers.metrics_extracted = metrics_extracted; } + /// Returns the transaction extracted flag. + pub fn transaction_extracted(&self) -> bool { + self.headers.transaction_extracted + } + + /// Sets the transaction extracted flag. + pub fn set_transaction_extracted(&mut self, transaction_extracted: bool) { + self.headers.transaction_extracted = transaction_extracted; + } + + /// Returns the spans extracted flag. + pub fn spans_extracted(&self) -> bool { + self.headers.spans_extracted + } + + /// Sets the spans extracted flag. + pub fn set_spans_extracted(&mut self, spans_extracted: bool) { + self.headers.spans_extracted = spans_extracted; + } + /// Gets the `sampled` flag. pub fn sampled(&self) -> bool { self.headers.sampled diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index d1a82851546..ee3ff2aa160 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -572,6 +572,11 @@ struct ProcessEnvelopeState<'a, Group> { /// Track whether transaction metrics were already extracted. event_metrics_extracted: bool, + /// Track whether spans were already extracted. + /// + /// Only applies to envelopes with a transaction item. + spans_extracted: bool, + /// Partial metrics of the Event during construction. /// /// The pipeline stages can add to this metrics objects. In `finalize_event`, the metrics are @@ -1060,6 +1065,7 @@ impl EnvelopeProcessorService { ProcessEnvelopeState { event: Annotated::empty(), event_metrics_extracted: false, + spans_extracted: false, metrics: Metrics::default(), sample_rates: None, extracted_metrics: Default::default(), diff --git a/relay-server/src/services/processor/event.rs b/relay-server/src/services/processor/event.rs index 5e6565c606a..63dfcbb4505 100644 --- a/relay-server/src/services/processor/event.rs +++ b/relay-server/src/services/processor/event.rs @@ -80,6 +80,7 @@ pub fn extract<G: EventProcessing>( relay_log::trace!("processing json transaction"); sample_rates = item.take_sample_rates(); state.event_metrics_extracted = item.metrics_extracted(); + state.spans_extracted = item.spans_extracted(); metric!(timer(RelayTimers::EventProcessingDeserialize), { // Transaction items can only contain transaction events. Force the event type to // hint to normalization that we're dealing with a transaction now. @@ -383,6 +384,9 @@ pub fn serialize<G: EventProcessing>( // If transaction metrics were extracted, set the corresponding item header event_item.set_metrics_extracted(state.event_metrics_extracted); + // TODO: The state should simply maintain & update an `ItemHeaders` object. + event_item.set_spans_extracted(state.spans_extracted); + // If there are sample rates, write them back to the envelope. In processing mode, sample // rates have been removed from the state and burnt into the event via `finalize_event`. if let Some(sample_rates) = state.sample_rates.take() { diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index 50e22a4cbf1..62855c88522 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -88,9 +88,10 @@ pub fn process( _ => return ItemAction::Keep, }; - if should_extract_transactions { + if should_extract_transactions && !item.transaction_extracted() { if let Some(transaction) = convert_to_transaction(&annotated_span) { extracted_transactions.push(transaction); + item.set_transaction_extracted(true); } } @@ -162,9 +163,11 @@ pub fn process( // Enqueue a full processing request for every extracted transaction item. match Envelope::try_from_event(state.envelope().headers().clone(), transaction) { Ok(mut envelope) => { - // for item in envelope.items_mut() { - // item.set_spans_extracted - // } + // We don't want to extract spans from a transaction extracted from spans, so + // set the spans_extracted flag: + for item in envelope.items_mut() { + item.set_spans_extracted(true); + } addrs.envelope_processor.send(ProcessEnvelope { envelope: ManagedEnvelope::standalone( @@ -187,10 +190,14 @@ pub fn process( pub fn extract_from_event(state: &mut ProcessEnvelopeState<TransactionGroup>, config: &Config) { // Only extract spans from transactions (not errors). - if dbg!(state.event_type()) != Some(EventType::Transaction) { + if state.event_type() != Some(EventType::Transaction) { return; }; + if state.spans_extracted { + return; + } + if !dbg!(state .project_state .has_feature(Feature::ExtractSpansAndSpanMetricsFromEvent)) diff --git a/tests/integration/test_spans.py b/tests/integration/test_spans.py index 5e2a663c3b4..1f801d21917 100644 --- a/tests/integration/test_spans.py +++ b/tests/integration/test_spans.py @@ -519,6 +519,7 @@ def test_span_ingestion( expected_transactions = ( 6 # TODO: modify test input to make is_segment false for some ) + transactions = [ transactions_consumer.get_event()[0] for _ in range(expected_transactions) ] From 3896772d58adb8dcb3bd5a884b9fdf3e69a9aefe Mon Sep 17 00:00:00 2001 From: Joris Bayer <joris.bayer@sentry.io> Date: Mon, 8 Apr 2024 14:40:19 +0200 Subject: [PATCH 06/22] fix: extract after normalize --- relay-server/src/services/processor.rs | 2 +- relay-server/src/services/processor/span.rs | 2 +- .../src/services/processor/span/processing.rs | 21 ++++++++++--------- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index ee3ff2aa160..82847b5098b 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -1415,7 +1415,7 @@ impl EnvelopeProcessorService { profile::process(state, &self.inner.config); }); - if dbg!(state.has_event()) { + if state.has_event() { event::scrub(state)?; if_processing!(self.inner.config, { span::extract_from_event(state, &self.inner.config); diff --git a/relay-server/src/services/processor/span.rs b/relay-server/src/services/processor/span.rs index 754f99a8caf..58f2e57052b 100644 --- a/relay-server/src/services/processor/span.rs +++ b/relay-server/src/services/processor/span.rs @@ -31,7 +31,7 @@ pub fn filter(state: &mut ProcessEnvelopeState<SpanGroup>) { /// /// Returns `None` when [`tag_extraction::extract_span_tags`] clears the span, which it shouldn't. pub fn extract_transaction_span(event: &Event, max_tag_value_size: usize) -> Option<Span> { - let mut spans = [dbg!(Span::from(event)).into()]; + let mut spans = [Span::from(event).into()]; tag_extraction::extract_span_tags(event, &mut spans, max_tag_value_size); diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index 62855c88522..aaf4d51d404 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -88,13 +88,6 @@ pub fn process( _ => return ItemAction::Keep, }; - if should_extract_transactions && !item.transaction_extracted() { - if let Some(transaction) = convert_to_transaction(&annotated_span) { - extracted_transactions.push(transaction); - item.set_transaction_extracted(true); - } - } - if let Err(e) = normalize( &mut annotated_span, normalize_span_config.clone(), @@ -105,6 +98,13 @@ pub fn process( return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal)); }; + if should_extract_transactions && !item.transaction_extracted() { + if let Some(transaction) = convert_to_transaction(&annotated_span) { + extracted_transactions.push(transaction); + item.set_transaction_extracted(true); + } + } + if let Some(config) = span_metrics_extraction_config { let Some(span) = annotated_span.value_mut() else { return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal)); @@ -198,9 +198,9 @@ pub fn extract_from_event(state: &mut ProcessEnvelopeState<TransactionGroup>, co return; } - if !dbg!(state + if !state .project_state - .has_feature(Feature::ExtractSpansAndSpanMetricsFromEvent)) + .has_feature(Feature::ExtractSpansAndSpanMetricsFromEvent) { return; } @@ -543,5 +543,6 @@ fn validate(mut span: Annotated<Span>) -> Result<Annotated<Span>, anyhow::Error> fn convert_to_transaction(annotated_span: &Annotated<Span>) -> Option<Event> { let span = annotated_span.value()?; - Event::try_from(span).ok() + relay_log::trace!("Extracting transaction for span {:?}", &span.span_id); + dbg!(Event::try_from(span).ok()) } From 39f7005f8037cb265f387c6c8168f5a7a2fc544b Mon Sep 17 00:00:00 2001 From: Joris Bayer <joris.bayer@sentry.io> Date: Mon, 8 Apr 2024 16:26:14 +0200 Subject: [PATCH 07/22] fix: No empty profile context --- .../src/protocol/span/convert.rs | 58 +++++++++++++++---- tests/integration/test_spans.py | 13 ++++- 2 files changed, 57 insertions(+), 14 deletions(-) diff --git a/relay-event-schema/src/protocol/span/convert.rs b/relay-event-schema/src/protocol/span/convert.rs index 20dedb8abf7..5b069d68c7e 100644 --- a/relay-event-schema/src/protocol/span/convert.rs +++ b/relay-event-schema/src/protocol/span/convert.rs @@ -83,11 +83,13 @@ macro_rules! map_fields { type Error = (); fn try_from(span: &Span) -> Result<Self, ()> { + use relay_protocol::Empty; + if !span.is_segment.value().unwrap_or(&false) { // Only segment spans can become transactions. return Err(()); } - Ok(Self { + let event = Self { $( $event_field: span.$span_field.clone(), )* @@ -95,21 +97,40 @@ macro_rules! map_fields { $fixed_event_field: $fixed_event_value.into(), )* contexts: Annotated::new( - Contexts( - BTreeMap::from([ + Contexts({ + let mut contexts = BTreeMap::new(); + $( + let mut context = $ContextType::default(); + let mut has_fields = false; $( - (<$ContextType as DefaultContext>::default_key().into(), ContextInner($ContextType { - $( - $context_field: span.$primary_span_field.clone(), - )* - ..Default::default() - }.into_context()).into()), + if !span.$primary_span_field.is_empty() { + context.$context_field = span.$primary_span_field.clone(); + has_fields = true; + } )* - ]), - ) + if has_fields { + let context_key = <$ContextType as DefaultContext>::default_key().into(); + contexts.insert(context_key, ContextInner(context.into_context()).into()); + } + )* + contexts + }) + // BTreeMap::from([ + // $( + // (<$ContextType as DefaultContext>::default_key().into(), ContextInner($ContextType { + // $( + // $context_field: span.$primary_span_field.clone(), + // )* + // ..Default::default() + // }.into_context()).into()), + // )* + // ]), ), ..Default::default() - }) + }; + + + Ok(event) } } }; @@ -262,4 +283,17 @@ mod tests { let roundtripped = Event::try_from(&span_from_event).unwrap(); assert_eq!(event, roundtripped); } + + #[test] + fn no_empty_profile_context() { + let span = Span { + is_segment: true.into(), + ..Default::default() + }; + let event = Event::try_from(&span).unwrap(); + + // No profile context is set. + // profile_id is required on ProfileContext so we should not create an empty one. + assert!(event.context::<ProfileContext>().is_none()); + } } diff --git a/tests/integration/test_spans.py b/tests/integration/test_spans.py index 1f801d21917..8cfe8bf191a 100644 --- a/tests/integration/test_spans.py +++ b/tests/integration/test_spans.py @@ -523,8 +523,17 @@ def test_span_ingestion( transactions = [ transactions_consumer.get_event()[0] for _ in range(expected_transactions) ] - print(transactions) - # assert transactions == [] # TODO + + assert len(transactions) == expected_transactions + for transaction in transactions: + # Not checking all individual fields here, most should be tested in convert.rs + + # SDK gets taken from the header: + if sdk := transaction.get("sdk"): + assert sdk == {"name": "raven-node", "version": "2.6.3"} + + # No errors during normalization: + assert not transaction.get("errors") transactions_consumer.assert_empty() From dd2af1e5d929ccd8444067e29b94315c9abfd691 Mon Sep 17 00:00:00 2001 From: Joris Bayer <joris.bayer@sentry.io> Date: Tue, 9 Apr 2024 13:51:43 +0200 Subject: [PATCH 08/22] fix: span metrics --- relay-server/src/metrics_extraction/event.rs | 30 ++++++++++++++++--- relay-server/src/services/processor.rs | 1 + .../services/processor/dynamic_sampling.rs | 1 + .../src/services/processor/span/processing.rs | 3 +- tests/integration/test_spans.py | 16 +++++----- 5 files changed, 38 insertions(+), 13 deletions(-) diff --git a/relay-server/src/metrics_extraction/event.rs b/relay-server/src/metrics_extraction/event.rs index e146e5607b0..5ce6a6b85a9 100644 --- a/relay-server/src/metrics_extraction/event.rs +++ b/relay-server/src/metrics_extraction/event.rs @@ -46,26 +46,48 @@ impl Extractable for Span { /// If this is a transaction event with spans, metrics will also be extracted from the spans. pub fn extract_metrics( event: &Event, + spans_extracted: bool, config: &MetricExtractionConfig, max_tag_value_size: usize, ) -> Vec<Bucket> { let mut metrics = generic::extract_metrics(event, config); + // If spans were already extracted for an event, + // we rely on span processing to extract metrics. + if !spans_extracted { + extract_span_metrics_for_event(event, config, max_tag_value_size, &mut metrics); + } + + metrics +} + +fn extract_span_metrics_for_event( + event: &Event, + config: &MetricExtractionConfig, + max_tag_value_size: usize, + output: &mut Vec<Bucket>, +) { relay_statsd::metric!(timer(RelayTimers::EventProcessingSpanMetricsExtraction), { if let Some(transaction_span) = extract_transaction_span(event, max_tag_value_size) { - metrics.extend(generic::extract_metrics(&transaction_span, config)); + relay_log::trace!( + "Extracting metrics from transaction span {:?}", + transaction_span.span_id, + ); + output.extend(generic::extract_metrics(&transaction_span, config)); } if let Some(spans) = event.spans.value() { for annotated_span in spans { if let Some(span) = annotated_span.value() { - metrics.extend(generic::extract_metrics(span, config)); + relay_log::trace!( + "Extracting metrics from transaction span {:?}", + span.span_id + ); + output.extend(generic::extract_metrics(span, config)); } } } }); - - metrics } #[cfg(test)] diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 82847b5098b..12994ca8e18 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -1164,6 +1164,7 @@ impl EnvelopeProcessorService { if let Some(config) = config { let metrics = crate::metrics_extraction::event::extract_metrics( event, + state.spans_extracted, config, self.inner .config diff --git a/relay-server/src/services/processor/dynamic_sampling.rs b/relay-server/src/services/processor/dynamic_sampling.rs index 93e69ab6f06..a59d0d0550b 100644 --- a/relay-server/src/services/processor/dynamic_sampling.rs +++ b/relay-server/src/services/processor/dynamic_sampling.rs @@ -497,6 +497,7 @@ mod tests { profile_id: None, event_metrics_extracted: false, reservoir: dummy_reservoir(), + spans_extracted: false, } }; diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index aaf4d51d404..fa0afd9d52d 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -109,6 +109,7 @@ pub fn process( let Some(span) = annotated_span.value_mut() else { return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal)); }; + relay_log::trace!("Extracting metrics from standalone span {:?}", span.span_id); let metrics = extract_metrics(span, config); state.extracted_metrics.project_metrics.extend(metrics); item.set_metrics_extracted(true); @@ -544,5 +545,5 @@ fn validate(mut span: Annotated<Span>) -> Result<Annotated<Span>, anyhow::Error> fn convert_to_transaction(annotated_span: &Annotated<Span>) -> Option<Event> { let span = annotated_span.value()?; relay_log::trace!("Extracting transaction for span {:?}", &span.span_id); - dbg!(Event::try_from(span).ok()) + Event::try_from(span).ok() } diff --git a/tests/integration/test_spans.py b/tests/integration/test_spans.py index 2c1e2048e39..19f79c45926 100644 --- a/tests/integration/test_spans.py +++ b/tests/integration/test_spans.py @@ -49,7 +49,7 @@ def test_span_extraction( project_config["config"]["features"].append("projects:discard-transaction") event = make_transaction({"event_id": "cbf6960622e14a45abc1f03b2055b186"}) - end = datetime.utcnow().replace(tzinfo=timezone.utc) - timedelta(seconds=1) + end = datetime.now(timezone.utc) - timedelta(seconds=1) duration = timedelta(milliseconds=500) start = end - duration event["spans"] = [ @@ -332,7 +332,7 @@ def test_span_ingestion( ) duration = timedelta(milliseconds=500) - end = datetime.utcnow().replace(tzinfo=timezone.utc) - timedelta(seconds=1) + end = datetime.now(timezone.utc) - timedelta(seconds=1) start = end - duration # 1 - Send OTel span and sentry span via envelope @@ -546,8 +546,7 @@ def test_span_ingestion( pass expected_timestamp = int(end.timestamp()) - - assert metrics == [ + expected_metrics = [ { "name": "c:spans/usage@none", "org_id": 1, @@ -686,6 +685,7 @@ def test_span_ingestion( "value": [500.0], }, ] + assert metrics == expected_metrics metrics_consumer.assert_empty() @@ -903,10 +903,10 @@ def test_span_reject_invalid_timestamps( duration = timedelta(milliseconds=500) yesterday_delta = timedelta(days=1) - end_yesterday = datetime.utcnow().replace(tzinfo=timezone.utc) - yesterday_delta + end_yesterday = datetime.now(timezone.utc) - yesterday_delta start_yesterday = end_yesterday - duration - end_today = datetime.utcnow().replace(tzinfo=timezone.utc) - timedelta(seconds=1) + end_today = datetime.now(timezone.utc) - timedelta(seconds=1) start_today = end_today - duration envelope = Envelope() @@ -1012,7 +1012,7 @@ def test_span_ingestion_with_performance_scores( ] duration = timedelta(milliseconds=500) - end = datetime.utcnow().replace(tzinfo=timezone.utc) - timedelta(seconds=1) + end = datetime.now(timezone.utc) - timedelta(seconds=1) start = end - duration envelope = Envelope() @@ -1220,7 +1220,7 @@ def test_rate_limit_indexed_consistent_extracted( end = start + timedelta(seconds=1) event = make_transaction({"event_id": "cbf6960622e14a45abc1f03b2055b186"}) - end = datetime.utcnow().replace(tzinfo=timezone.utc) - timedelta(seconds=1) + end = datetime.now(timezone.utc) - timedelta(seconds=1) duration = timedelta(milliseconds=500) start = end - duration event["spans"] = [ From bb6467b282da9797d70d0f294d7ffbd9f6f46436 Mon Sep 17 00:00:00 2001 From: Joris Bayer <joris.bayer@sentry.io> Date: Tue, 9 Apr 2024 14:10:07 +0200 Subject: [PATCH 09/22] test --- .vscode/settings.json | 3 ++- relay-server/src/metrics_extraction/event.rs | 10 ++++----- tests/integration/test_spans.py | 23 ++++++++++++++++++-- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index ddcd1bdb9a6..065ca611264 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -24,5 +24,6 @@ "[markdown]": { "editor.rulers": [80], "editor.tabSize": 2 - } + }, + "cSpell.enabled": false } diff --git a/relay-server/src/metrics_extraction/event.rs b/relay-server/src/metrics_extraction/event.rs index 5ce6a6b85a9..f32375724be 100644 --- a/relay-server/src/metrics_extraction/event.rs +++ b/relay-server/src/metrics_extraction/event.rs @@ -1038,7 +1038,7 @@ mod tests { project.sanitize(); let config = project.metric_extraction.ok().unwrap(); - let metrics = extract_metrics(event.value().unwrap(), &config, 200); + let metrics = extract_metrics(event.value().unwrap(), false, &config, 200); insta::assert_debug_snapshot!(metrics); } @@ -1173,7 +1173,7 @@ mod tests { project.sanitize(); let config = project.metric_extraction.ok().unwrap(); - let metrics = extract_metrics(event.value().unwrap(), &config, 200); + let metrics = extract_metrics(event.value().unwrap(), false, &config, 200); insta::assert_debug_snapshot!((&event.value().unwrap().spans, metrics)); } @@ -1234,7 +1234,7 @@ mod tests { project.sanitize(); let config = project.metric_extraction.ok().unwrap(); - let metrics = extract_metrics(event.value().unwrap(), &config, 200); + let metrics = extract_metrics(event.value().unwrap(), false, &config, 200); // When transaction.op:ui.load and mobile:true, HTTP spans still get both // exclusive_time metrics: @@ -1270,7 +1270,7 @@ mod tests { project.sanitize(); let config = project.metric_extraction.ok().unwrap(); - let metrics = extract_metrics(event.value().unwrap(), &config, 200); + let metrics = extract_metrics(event.value().unwrap(), false, &config, 200); let usage_metrics = metrics .into_iter() @@ -1496,7 +1496,7 @@ mod tests { project.sanitize(); let config = project.metric_extraction.ok().unwrap(); - let metrics = extract_metrics(event.value().unwrap(), &config, 200); + let metrics = extract_metrics(event.value().unwrap(), false, &config, 200); assert_eq!(metrics.len(), 4); assert_eq!(&*metrics[0].name, "c:spans/usage@none"); diff --git a/tests/integration/test_spans.py b/tests/integration/test_spans.py index 19f79c45926..191674c49a5 100644 --- a/tests/integration/test_spans.py +++ b/tests/integration/test_spans.py @@ -326,6 +326,7 @@ def test_span_ingestion( "organizations:standalone-span-ingestion", "projects:span-metrics-extraction", ] + project_config["config"]["transactionMetrics"] = {"version": 1} if extract_transaction: project_config["config"]["features"].append( "projects:extract-transaction-from-segment-span" @@ -546,7 +547,7 @@ def test_span_ingestion( pass expected_timestamp = int(end.timestamp()) - expected_metrics = [ + expected_span_metrics = [ { "name": "c:spans/usage@none", "org_id": 1, @@ -685,7 +686,25 @@ def test_span_ingestion( "value": [500.0], }, ] - assert metrics == expected_metrics + assert [m for m in metrics if ":spans/" in m["name"]] == expected_span_metrics + + transaction_duration_metrics = [ + m for m in metrics if m["name"] == "d:transactions/duration@millisecond" + ] + assert { + (m["name"], m["tags"]["transaction"]) for m in transaction_duration_metrics + } == { + ("d:transactions/duration@millisecond", "<unlabeled transaction>"), + ("d:transactions/duration@millisecond", "https://example.com/p/blah.js"), + ("d:transactions/duration@millisecond", "my 1st OTel span"), + ("d:transactions/duration@millisecond", "my 2nd OTel span"), + ("d:transactions/duration@millisecond", "my 3rd protobuf OTel span"), + ("d:transactions/duration@millisecond", 'test \\" with \\" escaped \\" chars'), + } + + # Make sure we're not double-reporting: + for m in transaction_duration_metrics: + len(m["value"]) == 1 metrics_consumer.assert_empty() From 5220d1d830336f55427f52e9704ed6fdb10c5dcf Mon Sep 17 00:00:00 2001 From: Joris Bayer <joris.bayer@sentry.io> Date: Tue, 9 Apr 2024 14:25:22 +0200 Subject: [PATCH 10/22] fix: test & lint --- relay-event-schema/src/protocol/span/convert.rs | 1 + relay-server/src/service.rs | 1 + relay-server/src/services/processor.rs | 4 ++-- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/relay-event-schema/src/protocol/span/convert.rs b/relay-event-schema/src/protocol/span/convert.rs index 5b069d68c7e..64b2c0d1ce5 100644 --- a/relay-event-schema/src/protocol/span/convert.rs +++ b/relay-event-schema/src/protocol/span/convert.rs @@ -182,6 +182,7 @@ mod tests { fn roundtrip() { let event = Annotated::<Event>::from_json( r#"{ + "type": "transaction", "contexts": { "profile": {"profile_id": "a0aaaaaaaaaaaaaaaaaaaaaaaaaaaaab"}, "trace": { diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index fd2e535114f..0edc762b353 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -183,6 +183,7 @@ impl ServiceState { #[cfg(feature = "processing")] store_forwarder: store.clone(), }, + #[cfg(feature = "processing")] metric_stats, ) .spawn_handler(processor_rx); diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 12994ca8e18..4fda42d4e76 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -1527,9 +1527,9 @@ impl EnvelopeProcessorService { ) -> Result<(), ProcessingError> { span::filter(state); - let global_config = self.inner.global_config.current(); - if_processing!(self.inner.config, { + let global_config = self.inner.global_config.current(); + span::process( state, self.inner.config.clone(), From 800ff441d8ab6cc1e2d2bb1bed4aa4d6c4927ef8 Mon Sep 17 00:00:00 2001 From: Joris Bayer <joris.bayer@sentry.io> Date: Tue, 9 Apr 2024 15:03:27 +0200 Subject: [PATCH 11/22] cleanup --- .vscode/settings.json | 3 +-- relay-event-schema/src/protocol/span/convert.rs | 10 ---------- relay-server/src/metrics_extraction/event.rs | 8 -------- relay-server/src/services/processor/span/processing.rs | 1 + 4 files changed, 2 insertions(+), 20 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 065ca611264..ddcd1bdb9a6 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -24,6 +24,5 @@ "[markdown]": { "editor.rulers": [80], "editor.tabSize": 2 - }, - "cSpell.enabled": false + } } diff --git a/relay-event-schema/src/protocol/span/convert.rs b/relay-event-schema/src/protocol/span/convert.rs index 64b2c0d1ce5..683175f61f1 100644 --- a/relay-event-schema/src/protocol/span/convert.rs +++ b/relay-event-schema/src/protocol/span/convert.rs @@ -115,16 +115,6 @@ macro_rules! map_fields { )* contexts }) - // BTreeMap::from([ - // $( - // (<$ContextType as DefaultContext>::default_key().into(), ContextInner($ContextType { - // $( - // $context_field: span.$primary_span_field.clone(), - // )* - // ..Default::default() - // }.into_context()).into()), - // )* - // ]), ), ..Default::default() }; diff --git a/relay-server/src/metrics_extraction/event.rs b/relay-server/src/metrics_extraction/event.rs index f32375724be..a53eb52f119 100644 --- a/relay-server/src/metrics_extraction/event.rs +++ b/relay-server/src/metrics_extraction/event.rs @@ -69,20 +69,12 @@ fn extract_span_metrics_for_event( ) { relay_statsd::metric!(timer(RelayTimers::EventProcessingSpanMetricsExtraction), { if let Some(transaction_span) = extract_transaction_span(event, max_tag_value_size) { - relay_log::trace!( - "Extracting metrics from transaction span {:?}", - transaction_span.span_id, - ); output.extend(generic::extract_metrics(&transaction_span, config)); } if let Some(spans) = event.spans.value() { for annotated_span in spans { if let Some(span) = annotated_span.value() { - relay_log::trace!( - "Extracting metrics from transaction span {:?}", - span.span_id - ); output.extend(generic::extract_metrics(span, config)); } } diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index 8ea9df3f1c7..467f6a4b53b 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -623,6 +623,7 @@ mod tests { managed_envelope: managed_envelope.try_into().unwrap(), profile_id: None, event_metrics_extracted: false, + spans_extracted: false, reservoir: ReservoirEvaluator::new(ReservoirCounters::default()), } } From 3247cd6cb75c69a6fa040f6aeb2fe2d9de5f8c3f Mon Sep 17 00:00:00 2001 From: Joris Bayer <joris.bayer@sentry.io> Date: Wed, 10 Apr 2024 12:08:09 +0200 Subject: [PATCH 12/22] test --- CHANGELOG.md | 1 + relay-server/src/envelope.rs | 5 ++- relay-server/src/services/processor.rs | 2 +- .../src/services/processor/span/processing.rs | 4 +-- tests/integration/test_spans.py | 32 +++++++++++-------- 5 files changed, 27 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f158f6f6158..c3dd741cfda 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ - Allow IP addresses in metrics domain tag. ([#3365](https://github.com/getsentry/relay/pull/3365)) - Support the full unicode character set via UTF-8 encoding for metric tags submitted via the statsd format. Certain restricted characters require escape sequences, see [docs](https://develop.sentry.dev/sdk/metrics/#normalization) for the precise rules. ([#3358](https://github.com/getsentry/relay/pull/3358)) - Stop extracting count_per_segment and count_per_op metrics. ([#3380](https://github.com/getsentry/relay/pull/3380)) +- Optionally convert segment spans to transactions for compatibility. ([#3375](https://github.com/getsentry/relay/pull/3375)) **Bug Fixes**: diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index ca47ca018ef..251133a33fc 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -558,7 +558,10 @@ pub struct ItemHeaders { #[serde(default, skip_serializing_if = "is_false")] transaction_extracted: bool, - /// Whether or not spans have been extracted from a transaction. + /// Whether or not spans and span metrics have been extracted from a transaction. + /// + /// This header is set to `true` after both span extraction and span metrics extraction, + /// and can be used to skip extraction. #[serde(default, skip_serializing_if = "is_false")] spans_extracted: bool, diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index e207974c2b1..0f23cccbe9c 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -572,7 +572,7 @@ struct ProcessEnvelopeState<'a, Group> { /// Track whether transaction metrics were already extracted. event_metrics_extracted: bool, - /// Track whether spans were already extracted. + /// Track whether spans and span metrics were already extracted. /// /// Only applies to envelopes with a transaction item. spans_extracted: bool, diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index 467f6a4b53b..b1f7718c9ed 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -164,8 +164,8 @@ pub fn process( // Enqueue a full processing request for every extracted transaction item. match Envelope::try_from_event(state.envelope().headers().clone(), transaction) { Ok(mut envelope) => { - // We don't want to extract spans from a transaction extracted from spans, so - // set the spans_extracted flag: + // We don't want to extract spans or span metrics from a transaction extracted from spans, + // so set the spans_extracted flag: for item in envelope.items_mut() { item.set_spans_extracted(true); } diff --git a/tests/integration/test_spans.py b/tests/integration/test_spans.py index c34ca5e33c7..dbc3c357e8e 100644 --- a/tests/integration/test_spans.py +++ b/tests/integration/test_spans.py @@ -755,20 +755,26 @@ def test_span_ingestion( transaction_duration_metrics = [ m for m in metrics if m["name"] == "d:transactions/duration@millisecond" ] - assert { - (m["name"], m["tags"]["transaction"]) for m in transaction_duration_metrics - } == { - ("d:transactions/duration@millisecond", "<unlabeled transaction>"), - ("d:transactions/duration@millisecond", "https://example.com/p/blah.js"), - ("d:transactions/duration@millisecond", "my 1st OTel span"), - ("d:transactions/duration@millisecond", "my 2nd OTel span"), - ("d:transactions/duration@millisecond", "my 3rd protobuf OTel span"), - ("d:transactions/duration@millisecond", 'test \\" with \\" escaped \\" chars'), - } - # Make sure we're not double-reporting: - for m in transaction_duration_metrics: - len(m["value"]) == 1 + if extract_transaction: + assert { + (m["name"], m["tags"]["transaction"]) for m in transaction_duration_metrics + } == { + ("d:transactions/duration@millisecond", "<unlabeled transaction>"), + ("d:transactions/duration@millisecond", "https://example.com/p/blah.js"), + ("d:transactions/duration@millisecond", "my 1st OTel span"), + ("d:transactions/duration@millisecond", "my 2nd OTel span"), + ("d:transactions/duration@millisecond", "my 3rd protobuf OTel span"), + ( + "d:transactions/duration@millisecond", + 'test \\" with \\" escaped \\" chars', + ), + } + # Make sure we're not double-reporting: + for m in transaction_duration_metrics: + assert len(m["value"]) == 1 + else: + assert len(transaction_duration_metrics) == 0 metrics_consumer.assert_empty() From ac29ffe6be822a050b74ddb24ab0b270f1a1a13c Mon Sep 17 00:00:00 2001 From: Joris Bayer <joris.bayer@sentry.io> Date: Wed, 10 Apr 2024 12:16:08 +0200 Subject: [PATCH 13/22] Adapt test --- tests/integration/test_spans.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/tests/integration/test_spans.py b/tests/integration/test_spans.py index dbc3c357e8e..2a7125ba334 100644 --- a/tests/integration/test_spans.py +++ b/tests/integration/test_spans.py @@ -440,6 +440,7 @@ def test_span_ingestion( protobuf_span = Span( trace_id=bytes.fromhex("89143b0763095bd9c9955e8175d1fb24"), span_id=bytes.fromhex("f0b809703e783d00"), + parent_span_id=bytes.fromhex("f0f0f0abcdef1234"), name="my 3rd protobuf OTel span", start_time_unix_nano=int(start.timestamp() * 1e9), end_time_unix_nano=int(end.timestamp() * 1e9), @@ -564,12 +565,11 @@ def test_span_ingestion( "description": "my 3rd protobuf OTel span", "duration_ms": 500, "exclusive_time_ms": 500.0, - "is_segment": True, + "is_segment": False, "organization_id": 1, - "parent_span_id": "", + "parent_span_id": "f0f0f0abcdef1234", "project_id": 42, "retention_days": 90, - "segment_id": "f0b809703e783d00", "sentry_tags": {"browser.name": "Python Requests", "op": "default"}, "span_id": "f0b809703e783d00", "start_timestamp_ms": int(start.timestamp() * 1e3), @@ -581,9 +581,7 @@ def test_span_ingestion( # If transaction extraction is enabled, expect transactions: if extract_transaction: - expected_transactions = ( - 6 # TODO: modify test input to make is_segment false for some - ) + expected_transactions = 5 transactions = [ transactions_consumer.get_event()[0] for _ in range(expected_transactions) @@ -764,7 +762,6 @@ def test_span_ingestion( ("d:transactions/duration@millisecond", "https://example.com/p/blah.js"), ("d:transactions/duration@millisecond", "my 1st OTel span"), ("d:transactions/duration@millisecond", "my 2nd OTel span"), - ("d:transactions/duration@millisecond", "my 3rd protobuf OTel span"), ( "d:transactions/duration@millisecond", 'test \\" with \\" escaped \\" chars', From d31d662f074dda7ef6402e176212ed03811dfefc Mon Sep 17 00:00:00 2001 From: Joris Bayer <joris.bayer@sentry.io> Date: Wed, 10 Apr 2024 14:01:59 +0200 Subject: [PATCH 14/22] fix: no duplicate transaction metric --- relay-dynamic-config/src/defaults.rs | 20 ++++++-- .../src/normalize/span/exclusive_time.rs | 1 - relay-server/src/envelope.rs | 2 +- tests/integration/test_spans.py | 50 ++++++++++++++++--- 4 files changed, 62 insertions(+), 11 deletions(-) diff --git a/relay-dynamic-config/src/defaults.rs b/relay-dynamic-config/src/defaults.rs index 8b58feb2e57..ad82817e388 100644 --- a/relay-dynamic-config/src/defaults.rs +++ b/relay-dynamic-config/src/defaults.rs @@ -58,7 +58,11 @@ pub fn add_span_metrics(project_config: &mut ProjectConfig) { return; } - config.metrics.extend(span_metrics()); + config.metrics.extend(span_metrics( + project_config + .features + .has(Feature::ExtractTransactionFromSegmentSpan), + )); config._span_metrics_extended = true; if config.version == 0 { @@ -67,7 +71,13 @@ pub fn add_span_metrics(project_config: &mut ProjectConfig) { } /// Metrics with tags applied as required. -fn span_metrics() -> impl IntoIterator<Item = MetricSpec> { +fn span_metrics(transaction_extraction_enabled: bool) -> impl IntoIterator<Item = MetricSpec> { + let score_total_transaction_metric = if transaction_extraction_enabled { + RuleCondition::never() + } else { + RuleCondition::all() + }; + let is_db = RuleCondition::eq("span.sentry_tags.category", "db") & !(RuleCondition::eq("span.system", "mongodb") | RuleCondition::glob("span.op", DISABLED_DATABASES) @@ -471,7 +481,11 @@ fn span_metrics() -> impl IntoIterator<Item = MetricSpec> { mri: "d:transactions/measurements.score.total@ratio".into(), field: Some("span.measurements.score.total.value".into()), condition: Some( - is_allowed_browser.clone() & RuleCondition::eq("span.was_transaction", false), + // If transactions are extracted from spans, the transaction processing pipeline + // will take care of this metric. + score_total_transaction_metric + & is_allowed_browser.clone() + & RuleCondition::eq("span.was_transaction", false), ), tags: vec![ Tag::with_key("span.op") diff --git a/relay-event-normalization/src/normalize/span/exclusive_time.rs b/relay-event-normalization/src/normalize/span/exclusive_time.rs index bb65d007b84..3a9f53febb7 100644 --- a/relay-event-normalization/src/normalize/span/exclusive_time.rs +++ b/relay-event-normalization/src/normalize/span/exclusive_time.rs @@ -131,7 +131,6 @@ pub fn compute_span_exclusive_time(event: &mut Event) { intervals.sort_unstable_by_key(|interval| interval.start); } - // TODO: Test exclusive_time on extracted transaction set_event_exclusive_time(event_interval, contexts, &span_map); for span in spans.iter_mut() { diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index 251133a33fc..53ed3d68e94 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -1093,7 +1093,7 @@ impl Envelope { Box::new(Self { items, headers }) } - /// Creates an envelope from an event, plus headers. + /// Creates an envelope from headers and an envelope. pub fn try_from_event( mut headers: EnvelopeHeaders, event: Event, diff --git a/tests/integration/test_spans.py b/tests/integration/test_spans.py index 2a7125ba334..4e364462889 100644 --- a/tests/integration/test_spans.py +++ b/tests/integration/test_spans.py @@ -314,6 +314,12 @@ def envelope_with_spans( "timestamp": end.timestamp() + 1, "exclusive_time": 345.0, # The SDK knows that this span has a lower exclusive time "trace_id": "ff62a8b040f340bda5d830223def1d81", + "measurements": { + "score.total": {"unit": "ratio", "value": 0.12121616}, + }, + "data": { + "browser.name": "Chrome", + }, }, ).encode() ), @@ -401,7 +407,13 @@ def test_span_ingestion( # 1 - Send OTel span and sentry span via envelope envelope = envelope_with_spans(start, end) - relay.send_envelope(project_id, envelope) + relay.send_envelope( + project_id, + envelope, + headers={ + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36" + }, + ) # 2 - Send OTel json span via endpoint relay.send_otel_span( @@ -463,7 +475,9 @@ def test_span_ingestion( headers={"Content-Type": "application/x-protobuf"}, ) - spans = list(spans_consumer.get_spans(timeout=10.0, max_attempts=7)) + print("Waiting for spans..") + spans = list(spans_consumer.get_spans(timeout=10.0, max_attempts=6)) + print("Done waiting for spans.") for span in spans: span.pop("received", None) @@ -483,7 +497,7 @@ def test_span_ingestion( "retention_days": 90, "segment_id": "a342abb1214ca181", "sentry_tags": { - "browser.name": "Python Requests", + "browser.name": "Chrome", "category": "db", "op": "db.query", }, @@ -496,12 +510,13 @@ def test_span_ingestion( "duration_ms": 1500, "exclusive_time_ms": 345.0, "is_segment": True, + "measurements": {"score.total": {"value": 0.12121616}}, "organization_id": 1, "project_id": 42, "retention_days": 90, "segment_id": "bd429c44b67a3eb1", "sentry_tags": { - "browser.name": "Python Requests", + "browser.name": "Chrome", "category": "resource", "description": "https://example.com/*/blah.js", "domain": "example.com", @@ -522,7 +537,7 @@ def test_span_ingestion( "project_id": 42, "retention_days": 90, "segment_id": "cd429c44b67a3eb1", - "sentry_tags": {"browser.name": "Python Requests", "op": "default"}, + "sentry_tags": {"browser.name": "Chrome", "op": "default"}, "span_id": "cd429c44b67a3eb1", "start_timestamp_ms": int(start.timestamp() * 1e3), "trace_id": "ff62a8b040f340bda5d830223def1d81", @@ -554,7 +569,7 @@ def test_span_ingestion( "retention_days": 90, "segment_id": "ed429c44b67a3eb1", "sentry_tags": { - "browser.name": "Python Requests", + "browser.name": "Chrome", "op": "default", }, "span_id": "ed429c44b67a3eb1", @@ -577,6 +592,7 @@ def test_span_ingestion( }, ] + print("Asserting emptiness") spans_consumer.assert_empty() # If transaction extraction is enabled, expect transactions: @@ -598,9 +614,12 @@ def test_span_ingestion( # No errors during normalization: assert not transaction.get("errors") + print("Asserting emptiness") transactions_consumer.assert_empty() + print("Waiting for metrics") metrics = [metric for (metric, _headers) in metrics_consumer.get_metrics()] + print("/Waiting for metrics") metrics.sort(key=lambda m: (m["name"], sorted(m["tags"].items()), m["timestamp"])) for metric in metrics: try: @@ -747,6 +766,16 @@ def test_span_ingestion( "type": "d", "value": [500.0], }, + { + "name": "d:spans/webvital.score.total@ratio", + "org_id": 1, + "project_id": 42, + "retention_days": 90, + "tags": {"span.op": "resource.script"}, + "timestamp": expected_timestamp + 1, + "type": "d", + "value": [0.12121616], + }, ] assert [m for m in metrics if ":spans/" in m["name"]] == expected_span_metrics @@ -773,6 +802,15 @@ def test_span_ingestion( else: assert len(transaction_duration_metrics) == 0 + # Regardless of whether transactions are extracted, score.total is only converted to a transaction metric once: + score_total_metrics = [ + m + for m in metrics + if m["name"] == "d:transactions/measurements.score.total@ratio" + ] + assert len(score_total_metrics) == 1, score_total_metrics + assert len(score_total_metrics[0]["value"]) == 1 + metrics_consumer.assert_empty() From bac3566dec988782e29cd005a951fcf0e0b2cf07 Mon Sep 17 00:00:00 2001 From: Joris Bayer <joris.bayer@sentry.io> Date: Thu, 11 Apr 2024 11:45:11 +0200 Subject: [PATCH 15/22] ref: review comments --- relay-sampling/src/evaluation.rs | 4 +- .../src/services/processor/span/processing.rs | 40 ++++++++++++------- relay-server/src/statsd.rs | 3 ++ tests/integration/test_spans.py | 8 +--- 4 files changed, 32 insertions(+), 23 deletions(-) diff --git a/relay-sampling/src/evaluation.rs b/relay-sampling/src/evaluation.rs index c183576f22e..067862b7088 100644 --- a/relay-sampling/src/evaluation.rs +++ b/relay-sampling/src/evaluation.rs @@ -65,9 +65,9 @@ impl<'a> ReservoirEvaluator<'a> { } } - /// Returns a shared reference to the reservoir counters. + /// Gets shared ownership of the reference counters. pub fn counters(&self) -> ReservoirCounters { - self.counters.clone() + Arc::clone(&self.counters) } /// Sets the Redis pool and organiation ID for the [`ReservoirEvaluator`]. diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index 2715e5cf549..b72ab968bd6 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -28,6 +28,7 @@ use crate::services::processor::{ Addrs, ProcessEnvelope, ProcessEnvelopeState, ProcessingError, ProcessingGroup, SpanGroup, TransactionGroup, }; +use crate::statsd::RelayCounters; use crate::utils::{sample, ItemAction, ManagedEnvelope}; use thiserror::Error; @@ -93,6 +94,15 @@ pub fn process( _ => return ItemAction::Keep, }; + set_segment_attributes(&mut annotated_span); + + if should_extract_transactions && !item.transaction_extracted() { + if let Some(transaction) = convert_to_transaction(&annotated_span) { + extracted_transactions.push(transaction); + item.set_transaction_extracted(true); + } + } + if let Err(e) = normalize( &mut annotated_span, normalize_span_config.clone(), @@ -103,13 +113,6 @@ pub fn process( return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal)); }; - if should_extract_transactions && !item.transaction_extracted() { - if let Some(transaction) = convert_to_transaction(&annotated_span) { - extracted_transactions.push(transaction); - item.set_transaction_extracted(true); - } - } - if let Some(config) = span_metrics_extraction_config { let Some(span) = annotated_span.value_mut() else { return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal)); @@ -180,6 +183,8 @@ pub fn process( item.set_spans_extracted(true); } + relay_statsd::metric!(counter(RelayCounters::TransactionsFromSpans) += 1); + addrs.envelope_processor.send(ProcessEnvelope { envelope: ManagedEnvelope::standalone( envelope, @@ -189,7 +194,7 @@ pub fn process( ), project_state: state.project_state.clone(), sampling_project_state: state.sampling_project_state.clone(), - reservoir_counters: state.reservoir.counters().clone(), + reservoir_counters: state.reservoir.counters(), }); } Err(e) => { @@ -366,6 +371,19 @@ fn get_normalize_span_config<'a>( } } +fn set_segment_attributes(span: &mut Annotated<Span>) { + let Some(span) = span.value_mut() else { return }; + + // TODO: A span might be a segment span even if the parent_id is not empty + // (parent within a trace). I.e. do not overwrite here. + let is_segment = span.parent_span_id.is_empty(); + + span.is_segment = Annotated::new(is_segment); + if is_segment { + span.segment_id = span.span_id.clone(); + } +} + /// Normalizes a standalone span. fn normalize( annotated_span: &mut Annotated<Span>, @@ -433,14 +451,8 @@ fn normalize( ); } - let is_segment = span.parent_span_id.is_empty(); - span.is_segment = Annotated::new(is_segment); span.received = Annotated::new(received_at.into()); - if is_segment { - span.segment_id = span.span_id.clone(); - } - if let Some(transaction) = span .data .value_mut() diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 93edbd4cd40..07350e4150a 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -642,6 +642,8 @@ pub enum RelayCounters { /// This metric is tagged with: /// - `decision`: "drop" if dynamic sampling drops the envelope, else "keep". DynamicSamplingDecision, + /// Counts how many transactions were created from segment spans. + TransactionsFromSpans, } impl CounterMetric for RelayCounters { @@ -680,6 +682,7 @@ impl CounterMetric for RelayCounters { RelayCounters::OpenTelemetryEvent => "event.opentelemetry", RelayCounters::GlobalConfigFetched => "global_config.fetch", RelayCounters::DynamicSamplingDecision => "dynamic_sampling_decision", + RelayCounters::TransactionsFromSpans => "transactions_from_spans", } } } diff --git a/tests/integration/test_spans.py b/tests/integration/test_spans.py index 4e364462889..6138f6dde9c 100644 --- a/tests/integration/test_spans.py +++ b/tests/integration/test_spans.py @@ -410,7 +410,7 @@ def test_span_ingestion( relay.send_envelope( project_id, envelope, - headers={ + headers={ # Set browser header to verify that `d:transactions/measurements.score.total@ratio` is extracted only once. "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36" }, ) @@ -475,9 +475,7 @@ def test_span_ingestion( headers={"Content-Type": "application/x-protobuf"}, ) - print("Waiting for spans..") spans = list(spans_consumer.get_spans(timeout=10.0, max_attempts=6)) - print("Done waiting for spans.") for span in spans: span.pop("received", None) @@ -592,7 +590,6 @@ def test_span_ingestion( }, ] - print("Asserting emptiness") spans_consumer.assert_empty() # If transaction extraction is enabled, expect transactions: @@ -614,12 +611,9 @@ def test_span_ingestion( # No errors during normalization: assert not transaction.get("errors") - print("Asserting emptiness") transactions_consumer.assert_empty() - print("Waiting for metrics") metrics = [metric for (metric, _headers) in metrics_consumer.get_metrics()] - print("/Waiting for metrics") metrics.sort(key=lambda m: (m["name"], sorted(m["tags"].items()), m["timestamp"])) for metric in metrics: try: From 7a9fcef351450fd80e30a9e1b95e519aa556e571 Mon Sep 17 00:00:00 2001 From: Joris Bayer <joris.bayer@sentry.io> Date: Thu, 11 Apr 2024 11:54:02 +0200 Subject: [PATCH 16/22] more instr --- .../src/services/processor/span/processing.rs | 12 ++++++++++-- relay-server/src/statsd.rs | 4 ++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index b72ab968bd6..8cbca16c145 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -28,7 +28,7 @@ use crate::services::processor::{ Addrs, ProcessEnvelope, ProcessEnvelopeState, ProcessingError, ProcessingGroup, SpanGroup, TransactionGroup, }; -use crate::statsd::RelayCounters; +use crate::statsd::{RelayCounters, RelayHistograms}; use crate::utils::{sample, ItemAction, ManagedEnvelope}; use thiserror::Error; @@ -170,6 +170,7 @@ pub fn process( ItemAction::Keep }); + let mut transaction_count = 0; for mut transaction in extracted_transactions { // Give each transaction event a new random ID: transaction.id = EventId::new().into(); @@ -183,7 +184,7 @@ pub fn process( item.set_spans_extracted(true); } - relay_statsd::metric!(counter(RelayCounters::TransactionsFromSpans) += 1); + transaction_count += 1; addrs.envelope_processor.send(ProcessEnvelope { envelope: ManagedEnvelope::standalone( @@ -202,6 +203,13 @@ pub fn process( } } } + + if transaction_count > 0 { + relay_statsd::metric!(counter(RelayCounters::TransactionsFromSpans) += transaction_count); + relay_statsd::metric!( + histogram(RelayHistograms::TransactionsFromSpansPerEnvelope) = transaction_count as u64 + ); + } } pub fn extract_from_event( diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 07350e4150a..37f9f1600b4 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -382,6 +382,9 @@ pub enum RelayTimers { /// This metric is tagged with: /// - `type`: The type of the health check, `liveness` or `readiness`. HealthCheckDuration, + + /// Measurees how many transactions were created from segment spans in a single envelope. + TransactionsFromSpansPerEnvelope, } impl TimerMetric for RelayTimers { @@ -420,6 +423,7 @@ impl TimerMetric for RelayTimers { RelayTimers::BufferMessageProcessDuration => "buffer.message.duration", RelayTimers::ProjectCacheTaskDuration => "project_cache.task.duration", RelayTimers::HealthCheckDuration => "health.message.duration", + RelayTimers::TransactionsFromSpansPerEnvelope => "transactions_from_spans_per_envelope", } } } From ef0c15c0a248bce0d374041efa10dda2c495b326 Mon Sep 17 00:00:00 2001 From: Joris Bayer <joris.bayer@sentry.io> Date: Thu, 11 Apr 2024 13:21:42 +0200 Subject: [PATCH 17/22] Update relay-event-normalization/src/normalize/span/exclusive_time.rs Co-authored-by: David Herberth <david.herberth@sentry.io> --- relay-event-normalization/src/normalize/span/exclusive_time.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-event-normalization/src/normalize/span/exclusive_time.rs b/relay-event-normalization/src/normalize/span/exclusive_time.rs index 3a9f53febb7..aacba0797cb 100644 --- a/relay-event-normalization/src/normalize/span/exclusive_time.rs +++ b/relay-event-normalization/src/normalize/span/exclusive_time.rs @@ -43,7 +43,7 @@ fn set_event_exclusive_time( }; if trace_context.exclusive_time.value().is_some() { - // Exclusive time already set, respect. + // Exclusive time already set, respect it. return; } From 34c8012cc8e49e688922ae752c0deb46524107bb Mon Sep 17 00:00:00 2001 From: Joris Bayer <joris.bayer@sentry.io> Date: Thu, 11 Apr 2024 12:32:01 +0200 Subject: [PATCH 18/22] fix: metric in wrong place --- relay-server/src/statsd.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 37f9f1600b4..77a05f067d9 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -178,6 +178,9 @@ pub enum RelayHistograms { /// The distribution of buckets should be even. /// If it is not, this metric should expose it. PartitionKeys, + + /// Measures how many transactions were created from segment spans in a single envelope. + TransactionsFromSpansPerEnvelope, } impl HistogramMetric for RelayHistograms { @@ -211,6 +214,9 @@ impl HistogramMetric for RelayHistograms { RelayHistograms::UpstreamEnvelopeBodySize => "upstream.envelope.body_size", RelayHistograms::UpstreamMetricsBodySize => "upstream.metrics.body_size", RelayHistograms::PartitionKeys => "metrics.buckets.partition_keys", + RelayHistograms::TransactionsFromSpansPerEnvelope => { + "transactions_from_spans_per_envelope" + } } } } @@ -382,9 +388,6 @@ pub enum RelayTimers { /// This metric is tagged with: /// - `type`: The type of the health check, `liveness` or `readiness`. HealthCheckDuration, - - /// Measurees how many transactions were created from segment spans in a single envelope. - TransactionsFromSpansPerEnvelope, } impl TimerMetric for RelayTimers { @@ -423,7 +426,6 @@ impl TimerMetric for RelayTimers { RelayTimers::BufferMessageProcessDuration => "buffer.message.duration", RelayTimers::ProjectCacheTaskDuration => "project_cache.task.duration", RelayTimers::HealthCheckDuration => "health.message.duration", - RelayTimers::TransactionsFromSpansPerEnvelope => "transactions_from_spans_per_envelope", } } } From 469e96cead4ed37a94fd40f4310443ffcd47cac9 Mon Sep 17 00:00:00 2001 From: Joris Bayer <joris.bayer@sentry.io> Date: Thu, 11 Apr 2024 13:21:00 +0200 Subject: [PATCH 19/22] fix: always normalize --- relay-server/src/extractors/request_meta.rs | 5 + relay-server/src/services/processor.rs | 5 +- .../src/services/processor/span/processing.rs | 3 + tests/integration/test_spans.py | 116 +++++++++++++----- 4 files changed, 98 insertions(+), 31 deletions(-) diff --git a/relay-server/src/extractors/request_meta.rs b/relay-server/src/extractors/request_meta.rs index 86b6d75ba8a..605d9922ffd 100644 --- a/relay-server/src/extractors/request_meta.rs +++ b/relay-server/src/extractors/request_meta.rs @@ -316,6 +316,11 @@ impl<D> RequestMeta<D> { pub fn is_from_internal_relay(&self) -> bool { self.from_internal_relay } + + /// Overwrite internal property. + pub fn set_from_internal_relay(&mut self, value: bool) { + self.from_internal_relay = value; + } } impl RequestMeta { diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 3124286286a..ff383583bf6 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -1225,11 +1225,12 @@ impl EnvelopeProcessorService { // not being available, envelopes can go to other PoP regions or // directly to processing relays. Events should be fully // normalized, independently of the ingestion path. - if self.inner.config.processing_enabled() - && !state.envelope().meta().is_from_internal_relay() + if dbg!(self.inner.config.processing_enabled()) + && (!dbg!(state.envelope().meta().is_from_internal_relay())) { true } else { + relay_log::trace!("Skipping event normalization"); return Ok(()); } } diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index c75068632ff..2d14a8d2a19 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -190,6 +190,9 @@ pub fn process( // Enqueue a full processing request for every extracted transaction item. match Envelope::try_from_event(state.envelope().headers().clone(), transaction) { Ok(mut envelope) => { + // In order to force normalization, treat as external: + envelope.meta_mut().set_from_internal_relay(false); + // We don't want to extract spans or span metrics from a transaction extracted from spans, // so set the spans_extracted flag: for item in envelope.items_mut() { diff --git a/tests/integration/test_spans.py b/tests/integration/test_spans.py index 6138f6dde9c..cf2c11cbd66 100644 --- a/tests/integration/test_spans.py +++ b/tests/integration/test_spans.py @@ -366,6 +366,38 @@ def envelope_with_spans( return envelope +def make_otel_span(start, end): + return { + "resourceSpans": [ + { + "scopeSpans": [ + { + "spans": [ + { + "traceId": "89143b0763095bd9c9955e8175d1fb24", + "spanId": "d342abb1214ca182", + "name": "my 2nd OTel span", + "startTimeUnixNano": int(start.timestamp() * 1e9), + "endTimeUnixNano": int(end.timestamp() * 1e9), + "attributes": [ + { + "key": "sentry.exclusive_time_ns", + "value": { + "intValue": int( + (end - start).total_seconds() * 1e9 + ), + }, + }, + ], + }, + ], + }, + ], + }, + ], + } + + @pytest.mark.parametrize("extract_transaction", [False, True]) def test_span_ingestion( mini_sentry, @@ -418,35 +450,7 @@ def test_span_ingestion( # 2 - Send OTel json span via endpoint relay.send_otel_span( project_id, - json={ - "resourceSpans": [ - { - "scopeSpans": [ - { - "spans": [ - { - "traceId": "89143b0763095bd9c9955e8175d1fb24", - "spanId": "d342abb1214ca182", - "name": "my 2nd OTel span", - "startTimeUnixNano": int(start.timestamp() * 1e9), - "endTimeUnixNano": int(end.timestamp() * 1e9), - "attributes": [ - { - "key": "sentry.exclusive_time_ns", - "value": { - "intValue": int( - duration.total_seconds() * 1e9 - ), - }, - }, - ], - }, - ], - }, - ], - }, - ], - }, + json=make_otel_span(start, end), ) protobuf_span = Span( @@ -881,6 +885,60 @@ def test_span_extraction_with_metrics_summary( assert metrics_summary["mri"] == mri +def test_extracted_transaction_gets_normalized( + mini_sentry, transactions_consumer, relay_with_processing, relay, relay_credentials +): + """When normalization in processing relays has been disabled, an extracted + transaction still gets normalized. + + This test was copied and adapted from test_store::test_relay_chain_normalization + + """ + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [ + "organizations:standalone-span-ingestion", + "projects:extract-transaction-from-segment-span", + ] + + transactions_consumer = transactions_consumer() + + credentials = relay_credentials() + processing = relay_with_processing( + static_relays={ + credentials["id"]: { + "public_key": credentials["public_key"], + "internal": True, + }, + }, + options={"processing": {"normalize": "disabled"}}, + ) + relay = relay( + processing, + credentials=credentials, + options={ + "processing": { + "normalize": "full", + } + }, + ) + + duration = timedelta(milliseconds=500) + end = datetime.now(timezone.utc) - timedelta(seconds=1) + start = end - duration + otel_payload = make_otel_span(start, end) + + # Unset name to validate transaction normalization + del otel_payload["resourceSpans"][0]["scopeSpans"][0]["spans"][0]["name"] + + relay.send_otel_span(project_id, json=otel_payload) + + ingested, _ = transactions_consumer.get_event(timeout=5) + + # "<unlabeled transaction>" was set by normalization: + assert ingested["transaction"] == "<unlabeled transaction>" + + def test_span_no_extraction_with_metrics_summary( mini_sentry, relay_with_processing, From 0cf16a2295e852ffbaa14ab4a5cdabc2859c75e6 Mon Sep 17 00:00:00 2001 From: Joris Bayer <joris.bayer@sentry.io> Date: Thu, 11 Apr 2024 13:52:38 +0200 Subject: [PATCH 20/22] clean --- relay-sampling/src/evaluation.rs | 2 +- relay-server/src/services/processor.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/relay-sampling/src/evaluation.rs b/relay-sampling/src/evaluation.rs index 067862b7088..1f97a75fc03 100644 --- a/relay-sampling/src/evaluation.rs +++ b/relay-sampling/src/evaluation.rs @@ -65,7 +65,7 @@ impl<'a> ReservoirEvaluator<'a> { } } - /// Gets shared ownership of the reference counters. + /// Gets shared ownership of the reservoir counters. pub fn counters(&self) -> ReservoirCounters { Arc::clone(&self.counters) } diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index ff383583bf6..34199ffb5dd 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -1225,8 +1225,8 @@ impl EnvelopeProcessorService { // not being available, envelopes can go to other PoP regions or // directly to processing relays. Events should be fully // normalized, independently of the ingestion path. - if dbg!(self.inner.config.processing_enabled()) - && (!dbg!(state.envelope().meta().is_from_internal_relay())) + if self.inner.config.processing_enabled() + && (!state.envelope().meta().is_from_internal_relay()) { true } else { From e0fae237367309fc323dfeee3fb5180d8044012a Mon Sep 17 00:00:00 2001 From: Joris Bayer <joris.bayer@sentry.io> Date: Thu, 11 Apr 2024 14:05:24 +0200 Subject: [PATCH 21/22] fix: Get permit for spin-off envelope --- relay-server/src/service.rs | 7 ++-- relay-server/src/services/processor.rs | 7 +++- .../src/services/processor/span/processing.rs | 42 +++++++++++++------ relay-server/src/testutils.rs | 2 + 4 files changed, 41 insertions(+), 17 deletions(-) diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 0edc762b353..32ddd5b29a4 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -108,7 +108,7 @@ impl ServiceState { _ => None, }; - let buffer = Arc::new(BufferGuard::new(config.envelope_buffer_size())); + let buffer_guard = Arc::new(BufferGuard::new(config.envelope_buffer_size())); // Create an address for the `EnvelopeProcessor`, which can be injected into the // other services. @@ -185,6 +185,7 @@ impl ServiceState { }, #[cfg(feature = "processing")] metric_stats, + buffer_guard.clone(), ) .spawn_handler(processor_rx); @@ -201,7 +202,7 @@ impl ServiceState { let guard = runtimes.project.enter(); ProjectCacheService::new( config.clone(), - buffer.clone(), + buffer_guard.clone(), project_cache_services, redis_pool, ) @@ -237,7 +238,7 @@ impl ServiceState { }; let state = StateInner { - buffer_guard: buffer, + buffer_guard, config, registry, }; diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 34199ffb5dd..f56fd9f4eac 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -83,7 +83,7 @@ use crate::services::upstream::{ }; use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers}; use crate::utils::{ - self, ExtractionMode, InvalidProcessingGroupType, ManagedEnvelope, SamplingResult, + self, BufferGuard, ExtractionMode, InvalidProcessingGroupType, ManagedEnvelope, SamplingResult, TypedEnvelope, }; @@ -943,6 +943,8 @@ struct InnerProcessor { cardinality_limiter: Option<CardinalityLimiter>, #[cfg(feature = "processing")] metric_stats: MetricStats, + #[cfg(feature = "processing")] + buffer_guard: Arc<BufferGuard>, } impl EnvelopeProcessorService { @@ -954,6 +956,7 @@ impl EnvelopeProcessorService { #[cfg(feature = "processing")] redis: Option<RedisPool>, addrs: Addrs, #[cfg(feature = "processing")] metric_stats: MetricStats, + #[cfg(feature = "processing")] buffer_guard: Arc<BufferGuard>, ) -> Self { let geoip_lookup = config.geoip_path().and_then(|p| { match GeoIpLookup::open(p).context(ServiceError::GeoIp) { @@ -996,6 +999,7 @@ impl EnvelopeProcessorService { #[cfg(feature = "processing")] metric_stats, config, + buffer_guard, }; Self { @@ -1562,6 +1566,7 @@ impl EnvelopeProcessorService { self.inner.config.clone(), &global_config, &self.inner.addrs, + &self.inner.buffer_guard, ); self.enforce_quotas(state)?; }); diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index 2d14a8d2a19..d5eda73d008 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -30,7 +30,7 @@ use crate::services::processor::{ TransactionGroup, }; use crate::statsd::{RelayCounters, RelayHistograms}; -use crate::utils::{sample, ItemAction, ManagedEnvelope}; +use crate::utils::{sample, BufferGuard, ItemAction}; use thiserror::Error; #[derive(Error, Debug)] @@ -42,6 +42,7 @@ pub fn process( config: Arc<Config>, global_config: &GlobalConfig, addrs: &Addrs, + buffer_guard: &BufferGuard, ) { use relay_event_normalization::RemoveOtherProcessor; @@ -201,20 +202,35 @@ pub fn process( transaction_count += 1; - addrs.envelope_processor.send(ProcessEnvelope { - envelope: ManagedEnvelope::standalone( - envelope, - addrs.outcome_aggregator.clone(), - addrs.test_store.clone(), - ProcessingGroup::Transaction, - ), - project_state: state.project_state.clone(), - sampling_project_state: state.sampling_project_state.clone(), - reservoir_counters: state.reservoir.counters(), - }); + let managed_envelope = buffer_guard.enter( + envelope, + addrs.outcome_aggregator.clone(), + addrs.test_store.clone(), + ProcessingGroup::Transaction, + ); + + match managed_envelope { + Ok(managed_envelope) => { + addrs.envelope_processor.send(ProcessEnvelope { + envelope: managed_envelope, + project_state: state.project_state.clone(), + sampling_project_state: state.sampling_project_state.clone(), + reservoir_counters: state.reservoir.counters(), + }); + } + Err(e) => { + relay_log::error!( + error = &e as &dyn Error, + "Failed to obtain permit for spinoff envelope:" + ); + } + } } Err(e) => { - relay_log::error!("Failed to create event envelope: {e}"); + relay_log::error!( + error = &e as &dyn Error, + "Failed to create spinoff envelope:" + ); } } } diff --git a/relay-server/src/testutils.rs b/relay-server/src/testutils.rs index 219ef7edcf0..bfe5cd367c3 100644 --- a/relay-server/src/testutils.rs +++ b/relay-server/src/testutils.rs @@ -21,6 +21,7 @@ use crate::services::outcome::TrackOutcome; use crate::services::processor::{self, EnvelopeProcessorService}; use crate::services::project::ProjectState; use crate::services::test_store::TestStore; +use crate::utils::BufferGuard; pub fn state_with_rule_and_condition( sample_rate: Option<f64>, @@ -151,6 +152,7 @@ pub fn create_test_processor(config: Config) -> EnvelopeProcessorService { GlobalConfigHandle::fixed(Default::default()), aggregator, ), + Arc::new(BufferGuard::new(usize::MAX)), ) } From 6580ad8bea526a150ea8dc850e9f3486a67bfbf5 Mon Sep 17 00:00:00 2001 From: Joris Bayer <joris.bayer@sentry.io> Date: Thu, 11 Apr 2024 15:23:32 +0200 Subject: [PATCH 22/22] more fixes --- relay-server/src/envelope.rs | 3 +++ relay-server/src/service.rs | 1 + relay-server/src/services/processor.rs | 5 ++++- relay-server/src/services/processor/span/processing.rs | 2 ++ relay-server/src/statsd.rs | 4 ++++ relay-server/src/testutils.rs | 2 ++ tests/integration/test_spans.py | 2 +- 7 files changed, 17 insertions(+), 2 deletions(-) diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index 53ed3d68e94..3dba72ef5ad 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -562,6 +562,9 @@ pub struct ItemHeaders { /// /// This header is set to `true` after both span extraction and span metrics extraction, /// and can be used to skip extraction. + /// + /// NOTE: This header is also set to `true` for transactions that are themselves extracted + /// from spans (the opposite direction), to prevent going in circles. #[serde(default, skip_serializing_if = "is_false")] spans_extracted: bool, diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 32ddd5b29a4..41f967bed1f 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -185,6 +185,7 @@ impl ServiceState { }, #[cfg(feature = "processing")] metric_stats, + #[cfg(feature = "processing")] buffer_guard.clone(), ) .spawn_handler(processor_rx); diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index f56fd9f4eac..f5bb1895b95 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -82,8 +82,10 @@ use crate::services::upstream::{ SendRequest, UpstreamRelay, UpstreamRequest, UpstreamRequestError, }; use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers}; +#[cfg(feature = "processing")] +use crate::utils::BufferGuard; use crate::utils::{ - self, BufferGuard, ExtractionMode, InvalidProcessingGroupType, ManagedEnvelope, SamplingResult, + self, ExtractionMode, InvalidProcessingGroupType, ManagedEnvelope, SamplingResult, TypedEnvelope, }; @@ -999,6 +1001,7 @@ impl EnvelopeProcessorService { #[cfg(feature = "processing")] metric_stats, config, + #[cfg(feature = "processing")] buffer_guard, }; diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index d5eda73d008..72b8e64828e 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -345,6 +345,8 @@ pub fn extract_from_event( } add_span(transaction_span.into()); + + state.spans_extracted = true; } /// Removes the transaction in case the project has made the transition to spans-only. diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 77a05f067d9..c69a24e2fbb 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -180,6 +180,7 @@ pub enum RelayHistograms { PartitionKeys, /// Measures how many transactions were created from segment spans in a single envelope. + #[cfg(feature = "processing")] TransactionsFromSpansPerEnvelope, } @@ -214,6 +215,7 @@ impl HistogramMetric for RelayHistograms { RelayHistograms::UpstreamEnvelopeBodySize => "upstream.envelope.body_size", RelayHistograms::UpstreamMetricsBodySize => "upstream.metrics.body_size", RelayHistograms::PartitionKeys => "metrics.buckets.partition_keys", + #[cfg(feature = "processing")] RelayHistograms::TransactionsFromSpansPerEnvelope => { "transactions_from_spans_per_envelope" } @@ -649,6 +651,7 @@ pub enum RelayCounters { /// - `decision`: "drop" if dynamic sampling drops the envelope, else "keep". DynamicSamplingDecision, /// Counts how many transactions were created from segment spans. + #[cfg(feature = "processing")] TransactionsFromSpans, } @@ -688,6 +691,7 @@ impl CounterMetric for RelayCounters { RelayCounters::OpenTelemetryEvent => "event.opentelemetry", RelayCounters::GlobalConfigFetched => "global_config.fetch", RelayCounters::DynamicSamplingDecision => "dynamic_sampling_decision", + #[cfg(feature = "processing")] RelayCounters::TransactionsFromSpans => "transactions_from_spans", } } diff --git a/relay-server/src/testutils.rs b/relay-server/src/testutils.rs index bfe5cd367c3..ab1541d22d8 100644 --- a/relay-server/src/testutils.rs +++ b/relay-server/src/testutils.rs @@ -21,6 +21,7 @@ use crate::services::outcome::TrackOutcome; use crate::services::processor::{self, EnvelopeProcessorService}; use crate::services::project::ProjectState; use crate::services::test_store::TestStore; +#[cfg(feature = "processing")] use crate::utils::BufferGuard; pub fn state_with_rule_and_condition( @@ -152,6 +153,7 @@ pub fn create_test_processor(config: Config) -> EnvelopeProcessorService { GlobalConfigHandle::fixed(Default::default()), aggregator, ), + #[cfg(feature = "processing")] Arc::new(BufferGuard::new(usize::MAX)), ) } diff --git a/tests/integration/test_spans.py b/tests/integration/test_spans.py index cf2c11cbd66..208d43ad8f5 100644 --- a/tests/integration/test_spans.py +++ b/tests/integration/test_spans.py @@ -933,7 +933,7 @@ def test_extracted_transaction_gets_normalized( relay.send_otel_span(project_id, json=otel_payload) - ingested, _ = transactions_consumer.get_event(timeout=5) + ingested, _ = transactions_consumer.get_event(timeout=10) # "<unlabeled transaction>" was set by normalization: assert ingested["transaction"] == "<unlabeled transaction>"