diff --git a/CHANGELOG.md b/CHANGELOG.md
index eb63560a0a..21077c07e9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,6 +21,7 @@
 - Stop extracting count_per_segment and count_per_op metrics. ([#3380](https://github.com/getsentry/relay/pull/3380))
 - Add `cardinality_limited` outcome with id `6`. ([#3389](https://github.com/getsentry/relay/pull/3389))
 - Extract `cache.item_size` and `cache.hit` metrics. ([#3371](https://github.com/getsentry/relay/pull/3371))
+- Optionally convert segment spans to transactions for compatibility. ([#3375](https://github.com/getsentry/relay/pull/3375))
 
 **Internal**:
 
diff --git a/relay-dynamic-config/src/defaults.rs b/relay-dynamic-config/src/defaults.rs
index 8b58feb2e5..ad82817e38 100644
--- a/relay-dynamic-config/src/defaults.rs
+++ b/relay-dynamic-config/src/defaults.rs
@@ -58,7 +58,11 @@ pub fn add_span_metrics(project_config: &mut ProjectConfig) {
         return;
     }
 
-    config.metrics.extend(span_metrics());
+    config.metrics.extend(span_metrics(
+        project_config
+            .features
+            .has(Feature::ExtractTransactionFromSegmentSpan),
+    ));
 
     config._span_metrics_extended = true;
     if config.version == 0 {
@@ -67,7 +71,13 @@ pub fn add_span_metrics(project_config: &mut ProjectConfig) {
 }
 
 /// Metrics with tags applied as required.
-fn span_metrics() -> impl IntoIterator<Item = MetricSpec> {
+fn span_metrics(transaction_extraction_enabled: bool) -> impl IntoIterator<Item = MetricSpec> {
+    let score_total_transaction_metric = if transaction_extraction_enabled {
+        RuleCondition::never()
+    } else {
+        RuleCondition::all()
+    };
+
     let is_db = RuleCondition::eq("span.sentry_tags.category", "db")
         & !(RuleCondition::eq("span.system", "mongodb")
             | RuleCondition::glob("span.op", DISABLED_DATABASES)
@@ -471,7 +481,11 @@ fn span_metrics() -> impl IntoIterator<Item = MetricSpec> {
            mri: "d:transactions/measurements.score.total@ratio".into(),
            field: Some("span.measurements.score.total.value".into()),
            condition: Some(
-                is_allowed_browser.clone() & RuleCondition::eq("span.was_transaction", false),
+                // If transactions are extracted from spans, the transaction processing pipeline
+                // will take care of this metric.
+                score_total_transaction_metric
+                    & is_allowed_browser.clone()
+                    & RuleCondition::eq("span.was_transaction", false),
            ),
            tags: vec![
                Tag::with_key("span.op")
diff --git a/relay-dynamic-config/src/feature.rs b/relay-dynamic-config/src/feature.rs
index bc604b592d..89a2c7864e 100644
--- a/relay-dynamic-config/src/feature.rs
+++ b/relay-dynamic-config/src/feature.rs
@@ -81,6 +81,15 @@ pub enum Feature {
     #[serde(rename = "organizations:continuous-profiling")]
     ContinuousProfiling,
 
+    /// When enabled, every standalone segment span will be duplicated as a transaction.
+    ///
+    /// This allows support of product features that rely on transactions for SDKs that only
+    /// send spans.
+    ///
+    /// Serialized as `projects:extract-transaction-from-segment-span`.
+    #[serde(rename = "projects:extract-transaction-from-segment-span")]
+    ExtractTransactionFromSegmentSpan,
+
     /// Deprecated, still forwarded for older downstream Relays.
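// NOTE (illustrative sketch, not part of the patch above): the
// `score_total_transaction_metric` gate in `span_metrics` works as an on/off
// switch because AND-ing a condition with `all()` leaves it unchanged, while
// AND-ing with `never()` can never match. The stand-in `Cond` type below is
// hypothetical and only demonstrates that idea; it is not the real
// `RuleCondition` API from relay-protocol.

use std::ops::BitAnd;

#[derive(Clone)]
enum Cond {
    All,                    // matches everything (no-op when AND-ed)
    Never,                  // matches nothing (disables the whole condition)
    Eq(&'static str, bool), // field == value
    AndAll(Vec<Cond>),      // all sub-conditions must match
}

impl Cond {
    fn matches(&self, was_transaction: bool) -> bool {
        match self {
            Cond::All => true,
            Cond::Never => false,
            Cond::Eq(field, value) => {
                *field == "span.was_transaction" && was_transaction == *value
            }
            Cond::AndAll(parts) => parts.iter().all(|c| c.matches(was_transaction)),
        }
    }
}

impl BitAnd for Cond {
    type Output = Cond;
    fn bitand(self, rhs: Cond) -> Cond {
        Cond::AndAll(vec![self, rhs])
    }
}

fn main() {
    let base = Cond::Eq("span.was_transaction", false);

    // Feature disabled: the gate is `all()`, so the combined condition still
    // matches and the span pipeline extracts `measurements.score.total`.
    assert!((Cond::All & base.clone()).matches(false));

    // Feature enabled: the gate is `never()`, so the condition never matches
    // and score.total comes from the extracted transaction instead.
    assert!(!(Cond::Never & base).matches(false));
}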
#[doc(hidden)] #[serde(rename = "organizations:transaction-name-mark-scrubbed-as-sanitized")] diff --git a/relay-event-normalization/src/normalize/span/exclusive_time.rs b/relay-event-normalization/src/normalize/span/exclusive_time.rs index c8ef5dbcf1..aacba0797c 100644 --- a/relay-event-normalization/src/normalize/span/exclusive_time.rs +++ b/relay-event-normalization/src/normalize/span/exclusive_time.rs @@ -42,6 +42,11 @@ fn set_event_exclusive_time( return; }; + if trace_context.exclusive_time.value().is_some() { + // Exclusive time already set, respect it. + return; + } + let Some(span_id) = trace_context.span_id.value() else { return; }; diff --git a/relay-event-schema/src/protocol/span/convert.rs b/relay-event-schema/src/protocol/span/convert.rs index 72e0ca49b4..b03a5aa8d6 100644 --- a/relay-event-schema/src/protocol/span/convert.rs +++ b/relay-event-schema/src/protocol/span/convert.rs @@ -3,6 +3,7 @@ use crate::protocol::{ ContextInner, Contexts, DefaultContext, Event, ProfileContext, Span, TraceContext, }; +use relay_base_schema::events::EventType; use relay_protocol::Annotated; use std::collections::BTreeMap; @@ -47,7 +48,7 @@ macro_rules! map_fields { $(span.$fixed_span_field:ident <= $fixed_span_value:expr), * } fixed_for_event { - $($fixed_event_value:expr => span.$fixed_event_field:ident), * + $($fixed_event_value:expr => event.$fixed_event_field:ident), * } ) => { #[allow(clippy::needless_update)] @@ -78,9 +79,17 @@ macro_rules! map_fields { } #[allow(clippy::needless_update)] - impl From<&Span> for Event { - fn from(span: &Span) -> Self { - Self { + impl TryFrom<&Span> for Event { + type Error = (); + + fn try_from(span: &Span) -> Result { + use relay_protocol::Empty; + + if !span.is_segment.value().unwrap_or(&false) { + // Only segment spans can become transactions. + return Err(()); + } + let event = Self { $( $event_field: span.$span_field.clone().map_value(Into::into), )* @@ -88,21 +97,30 @@ macro_rules! map_fields { $fixed_event_field: $fixed_event_value.into(), )* contexts: Annotated::new( - Contexts( - BTreeMap::from([ + Contexts({ + let mut contexts = BTreeMap::new(); + $( + let mut context = $ContextType::default(); + let mut has_fields = false; $( - (<$ContextType as DefaultContext>::default_key().into(), ContextInner($ContextType { - $( - $context_field: span.$primary_span_field.clone(), - )* - ..Default::default() - }.into_context()).into()), + if !span.$primary_span_field.is_empty() { + context.$context_field = span.$primary_span_field.clone(); + has_fields = true; + } )* - ]), - ) + if has_fields { + let context_key = <$ContextType as DefaultContext>::default_key().into(); + contexts.insert(context_key, ContextInner(context.into_context()).into()); + } + )* + contexts + }) ), ..Default::default() - } + }; + + + Ok(event) } } }; @@ -141,7 +159,7 @@ map_fields!( span.was_transaction <= true } fixed_for_event { - // nothing yet + EventType::Transaction => event.ty } ); @@ -155,6 +173,7 @@ mod tests { fn roundtrip() { let event = Annotated::::from_json( r#"{ + "type": "transaction", "contexts": { "profile": {"profile_id": "a0aaaaaaaaaaaaaaaaaaaaaaaaaaaaab"}, "trace": { @@ -253,7 +272,20 @@ mod tests { } "###); - let roundtripped = Event::from(&span_from_event); + let roundtripped = Event::try_from(&span_from_event).unwrap(); assert_eq!(event, roundtripped); } + + #[test] + fn no_empty_profile_context() { + let span = Span { + is_segment: true.into(), + ..Default::default() + }; + let event = Event::try_from(&span).unwrap(); + + // No profile context is set. 
+ // profile_id is required on ProfileContext so we should not create an empty one. + assert!(event.context::().is_none()); + } } diff --git a/relay-sampling/src/evaluation.rs b/relay-sampling/src/evaluation.rs index 5f2747d27d..1f97a75fc0 100644 --- a/relay-sampling/src/evaluation.rs +++ b/relay-sampling/src/evaluation.rs @@ -65,6 +65,11 @@ impl<'a> ReservoirEvaluator<'a> { } } + /// Gets shared ownership of the reservoir counters. + pub fn counters(&self) -> ReservoirCounters { + Arc::clone(&self.counters) + } + /// Sets the Redis pool and organiation ID for the [`ReservoirEvaluator`]. /// /// These values are needed to synchronize with Redis. diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index 3050a43d9f..3dba72ef5a 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -42,7 +42,7 @@ use bytes::Bytes; use chrono::{DateTime, Utc}; use relay_dynamic_config::ErrorBoundary; use relay_event_normalization::{normalize_transaction_name, TransactionNameRule}; -use relay_event_schema::protocol::{EventId, EventType}; +use relay_event_schema::protocol::{Event, EventId, EventType}; use relay_protocol::{Annotated, Value}; use relay_quotas::DataCategory; use relay_sampling::DynamicSamplingContext; @@ -554,6 +554,20 @@ pub struct ItemHeaders { #[serde(default, skip_serializing_if = "is_false")] metrics_extracted: bool, + /// Whether or not a transaction has been extracted from a segment span. + #[serde(default, skip_serializing_if = "is_false")] + transaction_extracted: bool, + + /// Whether or not spans and span metrics have been extracted from a transaction. + /// + /// This header is set to `true` after both span extraction and span metrics extraction, + /// and can be used to skip extraction. + /// + /// NOTE: This header is also set to `true` for transactions that are themselves extracted + /// from spans (the opposite direction), to prevent going in circles. + #[serde(default, skip_serializing_if = "is_false")] + spans_extracted: bool, + /// `false` if the sampling decision is "drop". /// /// In the most common use case, the item is dropped when the sampling decision is "drop". @@ -628,6 +642,8 @@ impl Item { sample_rates: None, other: BTreeMap::new(), metrics_extracted: false, + transaction_extracted: false, + spans_extracted: false, sampled: true, }, payload: Bytes::new(), @@ -833,6 +849,26 @@ impl Item { self.headers.metrics_extracted = metrics_extracted; } + /// Returns the transaction extracted flag. + pub fn transaction_extracted(&self) -> bool { + self.headers.transaction_extracted + } + + /// Sets the transaction extracted flag. + pub fn set_transaction_extracted(&mut self, transaction_extracted: bool) { + self.headers.transaction_extracted = transaction_extracted; + } + + /// Returns the spans extracted flag. + pub fn spans_extracted(&self) -> bool { + self.headers.spans_extracted + } + + /// Sets the spans extracted flag. + pub fn set_spans_extracted(&mut self, spans_extracted: bool) { + self.headers.spans_extracted = spans_extracted; + } + /// Gets the `sampled` flag. pub fn sampled(&self) -> bool { self.headers.sampled @@ -1060,6 +1096,21 @@ impl Envelope { Box::new(Self { items, headers }) } + /// Creates an envelope from headers and an envelope. 
+ pub fn try_from_event( + mut headers: EnvelopeHeaders, + event: Event, + ) -> Result, serde_json::Error> { + headers.event_id = event.id.value().copied(); + let event_type = event.ty.value().copied().unwrap_or_default(); + + let serialized = Annotated::new(event).to_json()?; + let mut item = Item::new(ItemType::from_event_type(event_type)); + item.set_payload(ContentType::Json, serialized); + + Ok(Self::from_parts(headers, smallvec::smallvec![item])) + } + /// Creates an envelope from request information. pub fn from_request(event_id: Option, meta: RequestMeta) -> Box { Box::new(Self { diff --git a/relay-server/src/extractors/request_meta.rs b/relay-server/src/extractors/request_meta.rs index 86b6d75ba8..605d9922ff 100644 --- a/relay-server/src/extractors/request_meta.rs +++ b/relay-server/src/extractors/request_meta.rs @@ -316,6 +316,11 @@ impl RequestMeta { pub fn is_from_internal_relay(&self) -> bool { self.from_internal_relay } + + /// Overwrite internal property. + pub fn set_from_internal_relay(&mut self, value: bool) { + self.from_internal_relay = value; + } } impl RequestMeta { diff --git a/relay-server/src/metrics_extraction/event.rs b/relay-server/src/metrics_extraction/event.rs index 1b380d83b7..feba3fea3b 100644 --- a/relay-server/src/metrics_extraction/event.rs +++ b/relay-server/src/metrics_extraction/event.rs @@ -47,31 +47,41 @@ impl Extractable for Span { /// If this is a transaction event with spans, metrics will also be extracted from the spans. pub fn extract_metrics( event: &Event, + spans_extracted: bool, config: &MetricExtractionConfig, max_tag_value_size: usize, span_extraction_sample_rate: Option, ) -> Vec { let mut metrics = generic::extract_metrics(event, config); - if !sample(span_extraction_sample_rate.unwrap_or(1.0)) { - return metrics; + // If spans were already extracted for an event, + // we rely on span processing to extract metrics. 
+ if !spans_extracted && sample(span_extraction_sample_rate.unwrap_or(1.0)) { + extract_span_metrics_for_event(event, config, max_tag_value_size, &mut metrics); } + metrics +} + +fn extract_span_metrics_for_event( + event: &Event, + config: &MetricExtractionConfig, + max_tag_value_size: usize, + output: &mut Vec, +) { relay_statsd::metric!(timer(RelayTimers::EventProcessingSpanMetricsExtraction), { if let Some(transaction_span) = extract_transaction_span(event, max_tag_value_size) { - metrics.extend(generic::extract_metrics(&transaction_span, config)); + output.extend(generic::extract_metrics(&transaction_span, config)); } if let Some(spans) = event.spans.value() { for annotated_span in spans { if let Some(span) = annotated_span.value() { - metrics.extend(generic::extract_metrics(span, config)); + output.extend(generic::extract_metrics(span, config)); } } } }); - - metrics } #[cfg(test)] @@ -1037,7 +1047,7 @@ mod tests { project.sanitize(); let config = project.metric_extraction.ok().unwrap(); - let metrics = extract_metrics(event.value().unwrap(), &config, 200, None); + let metrics = extract_metrics(event.value().unwrap(), false, &config, 200, None); insta::assert_debug_snapshot!(metrics); } @@ -1172,7 +1182,7 @@ mod tests { project.sanitize(); let config = project.metric_extraction.ok().unwrap(); - let metrics = extract_metrics(event.value().unwrap(), &config, 200, None); + let metrics = extract_metrics(event.value().unwrap(), false, &config, 200, None); insta::assert_debug_snapshot!((&event.value().unwrap().spans, metrics)); } @@ -1233,7 +1243,7 @@ mod tests { project.sanitize(); let config = project.metric_extraction.ok().unwrap(); - let metrics = extract_metrics(event.value().unwrap(), &config, 200, None); + let metrics = extract_metrics(event.value().unwrap(), false, &config, 200, None); // When transaction.op:ui.load and mobile:true, HTTP spans still get both // exclusive_time metrics: @@ -1269,7 +1279,7 @@ mod tests { project.sanitize(); let config = project.metric_extraction.ok().unwrap(); - let metrics = extract_metrics(event.value().unwrap(), &config, 200, None); + let metrics = extract_metrics(event.value().unwrap(), false, &config, 200, None); let usage_metrics = metrics .into_iter() @@ -1495,7 +1505,7 @@ mod tests { project.sanitize(); let config = project.metric_extraction.ok().unwrap(); - let metrics = extract_metrics(event.value().unwrap(), &config, 200, None); + let metrics = extract_metrics(event.value().unwrap(), false, &config, 200, None); assert_eq!(metrics.len(), 4); assert_eq!(&*metrics[0].name, "c:spans/usage@none"); diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index e144a79d84..41f967bed1 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -108,7 +108,7 @@ impl ServiceState { _ => None, }; - let buffer = Arc::new(BufferGuard::new(config.envelope_buffer_size())); + let buffer_guard = Arc::new(BufferGuard::new(config.envelope_buffer_size())); // Create an address for the `EnvelopeProcessor`, which can be injected into the // other services. 
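// NOTE (illustrative sketch, not part of the patch above): the
// `spans_extracted` flag threaded into `extract_metrics` prevents double
// counting. When a transaction's spans were already split out into standalone
// span items, span metrics are produced by the span pipeline, so the
// transaction pipeline only emits its own metrics. The types and the explicit
// `roll` parameter below are simplified stand-ins, not the real relay APIs.

enum Metric {
    Transaction(&'static str),
    Span(&'static str),
}

fn extract_metrics_sketch(spans_extracted: bool, sample_rate: f64, roll: f64) -> Vec<Metric> {
    // Transaction metrics are always extracted here.
    let mut metrics = vec![Metric::Transaction("d:transactions/duration@millisecond")];

    // Span metrics are only extracted if the spans have NOT already been split
    // out into their own items and the sampling decision passes.
    if !spans_extracted && roll < sample_rate {
        metrics.push(Metric::Span("c:spans/usage@none"));
    }

    metrics
}

fn main() {
    // Spans not yet extracted: both transaction and span metrics come from here.
    assert_eq!(extract_metrics_sketch(false, 1.0, 0.5).len(), 2);

    // Spans already extracted: span metrics are left to the span pipeline.
    assert_eq!(extract_metrics_sketch(true, 1.0, 0.5).len(), 1);
}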
@@ -173,6 +173,7 @@ impl ServiceState { #[cfg(feature = "processing")] redis_pool.clone(), processor::Addrs { + envelope_processor: processor.clone(), project_cache: project_cache.clone(), outcome_aggregator: outcome_aggregator.clone(), upstream_relay: upstream_relay.clone(), @@ -184,6 +185,8 @@ impl ServiceState { }, #[cfg(feature = "processing")] metric_stats, + #[cfg(feature = "processing")] + buffer_guard.clone(), ) .spawn_handler(processor_rx); @@ -200,7 +203,7 @@ impl ServiceState { let guard = runtimes.project.enter(); ProjectCacheService::new( config.clone(), - buffer.clone(), + buffer_guard.clone(), project_cache_services, redis_pool, ) @@ -236,7 +239,7 @@ impl ServiceState { }; let state = StateInner { - buffer_guard: buffer, + buffer_guard, config, registry, }; diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 7773f2c022..f5bb1895b9 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -82,6 +82,8 @@ use crate::services::upstream::{ SendRequest, UpstreamRelay, UpstreamRequest, UpstreamRequestError, }; use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers}; +#[cfg(feature = "processing")] +use crate::utils::BufferGuard; use crate::utils::{ self, ExtractionMode, InvalidProcessingGroupType, ManagedEnvelope, SamplingResult, TypedEnvelope, @@ -572,6 +574,11 @@ struct ProcessEnvelopeState<'a, Group> { /// Track whether transaction metrics were already extracted. event_metrics_extracted: bool, + /// Track whether spans and span metrics were already extracted. + /// + /// Only applies to envelopes with a transaction item. + spans_extracted: bool, + /// Partial metrics of the Event during construction. /// /// The pipeline stages can add to this metrics objects. In `finalize_event`, the metrics are @@ -911,6 +918,7 @@ pub struct EnvelopeProcessorService { /// Contains the addresses of services that the processor publishes to. pub struct Addrs { + pub envelope_processor: Addr, pub project_cache: Addr, pub outcome_aggregator: Addr, #[cfg(feature = "processing")] @@ -937,6 +945,8 @@ struct InnerProcessor { cardinality_limiter: Option, #[cfg(feature = "processing")] metric_stats: MetricStats, + #[cfg(feature = "processing")] + buffer_guard: Arc, } impl EnvelopeProcessorService { @@ -948,6 +958,7 @@ impl EnvelopeProcessorService { #[cfg(feature = "processing")] redis: Option, addrs: Addrs, #[cfg(feature = "processing")] metric_stats: MetricStats, + #[cfg(feature = "processing")] buffer_guard: Arc, ) -> Self { let geoip_lookup = config.geoip_path().and_then(|p| { match GeoIpLookup::open(p).context(ServiceError::GeoIp) { @@ -990,6 +1001,8 @@ impl EnvelopeProcessorService { #[cfg(feature = "processing")] metric_stats, config, + #[cfg(feature = "processing")] + buffer_guard, }; Self { @@ -1059,6 +1072,7 @@ impl EnvelopeProcessorService { ProcessEnvelopeState { event: Annotated::empty(), event_metrics_extracted: false, + spans_extracted: false, metrics: Metrics::default(), sample_rates: None, extracted_metrics: Default::default(), @@ -1157,6 +1171,7 @@ impl EnvelopeProcessorService { if let Some(config) = config { let metrics = crate::metrics_extraction::event::extract_metrics( event, + state.spans_extracted, config, self.inner .config @@ -1218,10 +1233,11 @@ impl EnvelopeProcessorService { // directly to processing relays. Events should be fully // normalized, independently of the ingestion path. 
if self.inner.config.processing_enabled() - && !state.envelope().meta().is_from_internal_relay() + && (!state.envelope().meta().is_from_internal_relay()) { true } else { + relay_log::trace!("Skipping event normalization"); return Ok(()); } } @@ -1544,11 +1560,16 @@ impl EnvelopeProcessorService { state: &mut ProcessEnvelopeState, ) -> Result<(), ProcessingError> { span::filter(state); + if_processing!(self.inner.config, { + let global_config = self.inner.global_config.current(); + span::process( state, self.inner.config.clone(), - &self.inner.global_config.current(), + &global_config, + &self.inner.addrs, + &self.inner.buffer_guard, ); self.enforce_quotas(state)?; }); diff --git a/relay-server/src/services/processor/dynamic_sampling.rs b/relay-server/src/services/processor/dynamic_sampling.rs index 93e69ab6f0..a59d0d0550 100644 --- a/relay-server/src/services/processor/dynamic_sampling.rs +++ b/relay-server/src/services/processor/dynamic_sampling.rs @@ -497,6 +497,7 @@ mod tests { profile_id: None, event_metrics_extracted: false, reservoir: dummy_reservoir(), + spans_extracted: false, } }; diff --git a/relay-server/src/services/processor/event.rs b/relay-server/src/services/processor/event.rs index 5e6565c606..63dfcbb450 100644 --- a/relay-server/src/services/processor/event.rs +++ b/relay-server/src/services/processor/event.rs @@ -80,6 +80,7 @@ pub fn extract( relay_log::trace!("processing json transaction"); sample_rates = item.take_sample_rates(); state.event_metrics_extracted = item.metrics_extracted(); + state.spans_extracted = item.spans_extracted(); metric!(timer(RelayTimers::EventProcessingDeserialize), { // Transaction items can only contain transaction events. Force the event type to // hint to normalization that we're dealing with a transaction now. @@ -383,6 +384,9 @@ pub fn serialize( // If transaction metrics were extracted, set the corresponding item header event_item.set_metrics_extracted(state.event_metrics_extracted); + // TODO: The state should simply maintain & update an `ItemHeaders` object. + event_item.set_spans_extracted(state.spans_extracted); + // If there are sample rates, write them back to the envelope. In processing mode, sample // rates have been removed from the state and burnt into the event via `finalize_event`. 
if let Some(sample_rates) = state.sample_rates.take() { diff --git a/relay-server/src/services/processor/span.rs b/relay-server/src/services/processor/span.rs index b2fe29691d..58f2e57052 100644 --- a/relay-server/src/services/processor/span.rs +++ b/relay-server/src/services/processor/span.rs @@ -6,7 +6,7 @@ use relay_event_schema::protocol::{Event, Span}; use relay_protocol::Annotated; use crate::services::processor::SpanGroup; -use crate::{envelope::ItemType, services::processor::ProcessEnvelopeState, utils::ItemAction}; +use crate::{services::processor::ProcessEnvelopeState, utils::ItemAction}; #[cfg(feature = "processing")] mod processing; @@ -14,19 +14,16 @@ mod processing; pub use processing::*; pub fn filter(state: &mut ProcessEnvelopeState) { - let standalone_span_ingestion_enabled = state + let standalone_span_ingestion_disabled = !state .project_state .has_feature(Feature::StandaloneSpanIngestion); - state.managed_envelope.retain_items(|item| match item.ty() { - ItemType::OtelSpan | ItemType::Span => { - if !standalone_span_ingestion_enabled { - relay_log::warn!("dropping span because feature is disabled"); - ItemAction::DropSilently - } else { - ItemAction::Keep - } + state.managed_envelope.retain_items(|item| { + if item.is_span() && standalone_span_ingestion_disabled { + relay_log::warn!("dropping span because feature is disabled"); + ItemAction::DropSilently + } else { + ItemAction::Keep } - _ => ItemAction::Keep, }); } diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index 9465c59017..72b8e64828 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -14,21 +14,23 @@ use relay_event_normalization::{ PerformanceScoreConfig, RawUserAgentInfo, TransactionsProcessor, }; use relay_event_schema::processor::{process_value, ProcessingState}; -use relay_event_schema::protocol::{BrowserContext, Contexts, Event, Span, SpanData}; +use relay_event_schema::protocol::{BrowserContext, Contexts, Event, EventId, Span, SpanData}; use relay_log::protocol::{Attachment, AttachmentType}; use relay_metrics::{aggregator::AggregatorConfig, MetricNamespace, UnixTimestamp}; use relay_pii::PiiProcessor; use relay_protocol::{Annotated, Empty}; use relay_spans::{otel_to_sentry_span, otel_trace::Span as OtelSpan}; -use crate::envelope::{ContentType, Item, ItemType}; +use crate::envelope::{ContentType, Envelope, Item, ItemType}; use crate::metrics_extraction::generic::extract_metrics; use crate::services::outcome::{DiscardReason, Outcome}; use crate::services::processor::span::extract_transaction_span; use crate::services::processor::{ - ProcessEnvelopeState, ProcessingError, SpanGroup, TransactionGroup, + Addrs, ProcessEnvelope, ProcessEnvelopeState, ProcessingError, ProcessingGroup, SpanGroup, + TransactionGroup, }; -use crate::utils::{sample, ItemAction}; +use crate::statsd::{RelayCounters, RelayHistograms}; +use crate::utils::{sample, BufferGuard, ItemAction}; use thiserror::Error; #[derive(Error, Debug)] @@ -39,6 +41,8 @@ pub fn process( state: &mut ProcessEnvelopeState, config: Arc, global_config: &GlobalConfig, + addrs: &Addrs, + buffer_guard: &BufferGuard, ) { use relay_event_normalization::RemoveOtherProcessor; @@ -67,6 +71,11 @@ pub fn process( &user_agent_info, ); + let mut extracted_transactions = vec![]; + let should_extract_transactions = state + .project_state + .has_feature(Feature::ExtractTransactionFromSegmentSpan); + 
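// NOTE (illustrative sketch, not part of the patch): the retain_items loop
// that follows converts each standalone segment span into a transaction.
// Two guards keep this from going in circles: only items not yet marked
// `transaction_extracted` are converted (and then marked), and the resulting
// transaction item is marked `spans_extracted` so the transaction pipeline
// will not split it back into spans. The structs below are hypothetical
// stand-ins, not the real envelope item or event types.

#[derive(Default)]
struct SpanItem {
    is_segment: bool,
    transaction_extracted: bool,
}

#[derive(Default)]
struct TransactionItem {
    spans_extracted: bool,
}

fn maybe_extract_transaction(item: &mut SpanItem) -> Option<TransactionItem> {
    // Only segment spans can become transactions, and only once per item.
    if !item.is_segment || item.transaction_extracted {
        return None;
    }
    item.transaction_extracted = true;

    // Mark the new transaction so span and span-metric extraction skip it.
    Some(TransactionItem { spans_extracted: true })
}

fn main() {
    let mut segment = SpanItem { is_segment: true, ..Default::default() };
    let first = maybe_extract_transaction(&mut segment);
    assert!(first.is_some_and(|t| t.spans_extracted));

    // A second pass over the same item does not produce another transaction.
    assert!(maybe_extract_transaction(&mut segment).is_none());

    // Non-segment (child) spans are never converted.
    let mut child = SpanItem::default();
    assert!(maybe_extract_transaction(&mut child).is_none());
}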
state.managed_envelope.retain_items(|item| { let mut annotated_span = match item.ty() { ItemType::OtelSpan => match serde_json::from_slice::(&item.payload()) { @@ -87,6 +96,15 @@ pub fn process( _ => return ItemAction::Keep, }; + set_segment_attributes(&mut annotated_span); + + if should_extract_transactions && !item.transaction_extracted() { + if let Some(transaction) = convert_to_transaction(&annotated_span) { + extracted_transactions.push(transaction); + item.set_transaction_extracted(true); + } + } + if let Err(e) = normalize( &mut annotated_span, normalize_span_config.clone(), @@ -101,6 +119,7 @@ pub fn process( let Some(span) = annotated_span.value_mut() else { return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal)); }; + relay_log::trace!("Extracting metrics from standalone span {:?}", span.span_id); let metrics = extract_metrics(span, config); state.extracted_metrics.project_metrics.extend(metrics); item.set_metrics_extracted(true); @@ -163,6 +182,65 @@ pub fn process( ItemAction::Keep }); + + let mut transaction_count = 0; + for mut transaction in extracted_transactions { + // Give each transaction event a new random ID: + transaction.id = EventId::new().into(); + + // Enqueue a full processing request for every extracted transaction item. + match Envelope::try_from_event(state.envelope().headers().clone(), transaction) { + Ok(mut envelope) => { + // In order to force normalization, treat as external: + envelope.meta_mut().set_from_internal_relay(false); + + // We don't want to extract spans or span metrics from a transaction extracted from spans, + // so set the spans_extracted flag: + for item in envelope.items_mut() { + item.set_spans_extracted(true); + } + + transaction_count += 1; + + let managed_envelope = buffer_guard.enter( + envelope, + addrs.outcome_aggregator.clone(), + addrs.test_store.clone(), + ProcessingGroup::Transaction, + ); + + match managed_envelope { + Ok(managed_envelope) => { + addrs.envelope_processor.send(ProcessEnvelope { + envelope: managed_envelope, + project_state: state.project_state.clone(), + sampling_project_state: state.sampling_project_state.clone(), + reservoir_counters: state.reservoir.counters(), + }); + } + Err(e) => { + relay_log::error!( + error = &e as &dyn Error, + "Failed to obtain permit for spinoff envelope:" + ); + } + } + } + Err(e) => { + relay_log::error!( + error = &e as &dyn Error, + "Failed to create spinoff envelope:" + ); + } + } + } + + if transaction_count > 0 { + relay_statsd::metric!(counter(RelayCounters::TransactionsFromSpans) += transaction_count); + relay_statsd::metric!( + histogram(RelayHistograms::TransactionsFromSpansPerEnvelope) = transaction_count as u64 + ); + } } pub fn extract_from_event( @@ -175,6 +253,10 @@ pub fn extract_from_event( return; }; + if state.spans_extracted { + return; + } + if !state .project_state .has_feature(Feature::ExtractSpansAndSpanMetricsFromEvent) @@ -263,6 +345,8 @@ pub fn extract_from_event( } add_span(transaction_span.into()); + + state.spans_extracted = true; } /// Removes the transaction in case the project has made the transition to spans-only. @@ -327,6 +411,19 @@ fn get_normalize_span_config<'a>( } } +fn set_segment_attributes(span: &mut Annotated) { + let Some(span) = span.value_mut() else { return }; + + // TODO: A span might be a segment span even if the parent_id is not empty + // (parent within a trace). I.e. do not overwrite here. 
+ let is_segment = span.parent_span_id.is_empty(); + + span.is_segment = Annotated::new(is_segment); + if is_segment { + span.segment_id = span.span_id.clone(); + } +} + /// Normalizes a standalone span. fn normalize( annotated_span: &mut Annotated, @@ -394,14 +491,8 @@ fn normalize( ); } - let is_segment = span.parent_span_id.is_empty(); - span.is_segment = Annotated::new(is_segment); span.received = Annotated::new(received_at.into()); - if is_segment { - span.segment_id = span.span_id.clone(); - } - if let Some(transaction) = span .data .value_mut() @@ -532,6 +623,12 @@ fn validate(span: &mut Annotated) -> Result<(), ValidationError> { Ok(()) } +fn convert_to_transaction(annotated_span: &Annotated) -> Option { + let span = annotated_span.value()?; + relay_log::trace!("Extracting transaction for span {:?}", &span.span_id); + Event::try_from(span).ok() +} + #[cfg(test)] mod tests { use std::collections::BTreeMap; @@ -603,6 +700,7 @@ mod tests { managed_envelope: managed_envelope.try_into().unwrap(), profile_id: None, event_metrics_extracted: false, + spans_extracted: false, reservoir: ReservoirEvaluator::new(ReservoirCounters::default()), } } diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 93edbd4cd4..c69a24e2fb 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -178,6 +178,10 @@ pub enum RelayHistograms { /// The distribution of buckets should be even. /// If it is not, this metric should expose it. PartitionKeys, + + /// Measures how many transactions were created from segment spans in a single envelope. + #[cfg(feature = "processing")] + TransactionsFromSpansPerEnvelope, } impl HistogramMetric for RelayHistograms { @@ -211,6 +215,10 @@ impl HistogramMetric for RelayHistograms { RelayHistograms::UpstreamEnvelopeBodySize => "upstream.envelope.body_size", RelayHistograms::UpstreamMetricsBodySize => "upstream.metrics.body_size", RelayHistograms::PartitionKeys => "metrics.buckets.partition_keys", + #[cfg(feature = "processing")] + RelayHistograms::TransactionsFromSpansPerEnvelope => { + "transactions_from_spans_per_envelope" + } } } } @@ -642,6 +650,9 @@ pub enum RelayCounters { /// This metric is tagged with: /// - `decision`: "drop" if dynamic sampling drops the envelope, else "keep". DynamicSamplingDecision, + /// Counts how many transactions were created from segment spans. 
+ #[cfg(feature = "processing")] + TransactionsFromSpans, } impl CounterMetric for RelayCounters { @@ -680,6 +691,8 @@ impl CounterMetric for RelayCounters { RelayCounters::OpenTelemetryEvent => "event.opentelemetry", RelayCounters::GlobalConfigFetched => "global_config.fetch", RelayCounters::DynamicSamplingDecision => "dynamic_sampling_decision", + #[cfg(feature = "processing")] + RelayCounters::TransactionsFromSpans => "transactions_from_spans", } } } diff --git a/relay-server/src/testutils.rs b/relay-server/src/testutils.rs index 47cb68e8df..ab1541d22d 100644 --- a/relay-server/src/testutils.rs +++ b/relay-server/src/testutils.rs @@ -21,6 +21,8 @@ use crate::services::outcome::TrackOutcome; use crate::services::processor::{self, EnvelopeProcessorService}; use crate::services::project::ProjectState; use crate::services::test_store::TestStore; +#[cfg(feature = "processing")] +use crate::utils::BufferGuard; pub fn state_with_rule_and_condition( sample_rate: Option, @@ -135,6 +137,7 @@ pub fn create_test_processor(config: Config) -> EnvelopeProcessorService { #[cfg(feature = "processing")] redis, processor::Addrs { + envelope_processor: Addr::dummy(), outcome_aggregator, project_cache, upstream_relay, @@ -150,6 +153,8 @@ pub fn create_test_processor(config: Config) -> EnvelopeProcessorService { GlobalConfigHandle::fixed(Default::default()), aggregator, ), + #[cfg(feature = "processing")] + Arc::new(BufferGuard::new(usize::MAX)), ) } diff --git a/tests/integration/test_spans.py b/tests/integration/test_spans.py index 140f3ea314..208d43ad8f 100644 --- a/tests/integration/test_spans.py +++ b/tests/integration/test_spans.py @@ -314,6 +314,12 @@ def envelope_with_spans( "timestamp": end.timestamp() + 1, "exclusive_time": 345.0, # The SDK knows that this span has a lower exclusive time "trace_id": "ff62a8b040f340bda5d830223def1d81", + "measurements": { + "score.total": {"unit": "ratio", "value": 0.12121616}, + }, + "data": { + "browser.name": "Chrome", + }, }, ).encode() ), @@ -360,14 +366,50 @@ def envelope_with_spans( return envelope +def make_otel_span(start, end): + return { + "resourceSpans": [ + { + "scopeSpans": [ + { + "spans": [ + { + "traceId": "89143b0763095bd9c9955e8175d1fb24", + "spanId": "d342abb1214ca182", + "name": "my 2nd OTel span", + "startTimeUnixNano": int(start.timestamp() * 1e9), + "endTimeUnixNano": int(end.timestamp() * 1e9), + "attributes": [ + { + "key": "sentry.exclusive_time_ns", + "value": { + "intValue": int( + (end - start).total_seconds() * 1e9 + ), + }, + }, + ], + }, + ], + }, + ], + }, + ], + } + + +@pytest.mark.parametrize("extract_transaction", [False, True]) def test_span_ingestion( mini_sentry, relay_with_processing, spans_consumer, metrics_consumer, + transactions_consumer, + extract_transaction, ): spans_consumer = spans_consumer() metrics_consumer = metrics_consumer() + transactions_consumer = transactions_consumer() relay = relay_with_processing( options={ @@ -384,8 +426,12 @@ def test_span_ingestion( project_config["config"]["features"] = [ "organizations:standalone-span-ingestion", "projects:span-metrics-extraction", - "projects:span-metrics-extraction-all-modules", ] + project_config["config"]["transactionMetrics"] = {"version": 1} + if extract_transaction: + project_config["config"]["features"].append( + "projects:extract-transaction-from-segment-span" + ) duration = timedelta(milliseconds=500) end = datetime.now(timezone.utc) - timedelta(seconds=1) @@ -393,45 +439,24 @@ def test_span_ingestion( # 1 - Send OTel span and sentry span via envelope 
envelope = envelope_with_spans(start, end) - relay.send_envelope(project_id, envelope) + relay.send_envelope( + project_id, + envelope, + headers={ # Set browser header to verify that `d:transactions/measurements.score.total@ratio` is extracted only once. + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36" + }, + ) # 2 - Send OTel json span via endpoint relay.send_otel_span( project_id, - json={ - "resourceSpans": [ - { - "scopeSpans": [ - { - "spans": [ - { - "traceId": "89143b0763095bd9c9955e8175d1fb24", - "spanId": "d342abb1214ca182", - "name": "my 2nd OTel span", - "startTimeUnixNano": int(start.timestamp() * 1e9), - "endTimeUnixNano": int(end.timestamp() * 1e9), - "attributes": [ - { - "key": "sentry.exclusive_time_ns", - "value": { - "intValue": int( - duration.total_seconds() * 1e9 - ), - }, - }, - ], - }, - ], - }, - ], - }, - ], - }, + json=make_otel_span(start, end), ) protobuf_span = Span( trace_id=bytes.fromhex("89143b0763095bd9c9955e8175d1fb24"), span_id=bytes.fromhex("f0b809703e783d00"), + parent_span_id=bytes.fromhex("f0f0f0abcdef1234"), name="my 3rd protobuf OTel span", start_time_unix_nano=int(start.timestamp() * 1e9), end_time_unix_nano=int(end.timestamp() * 1e9), @@ -474,7 +499,7 @@ def test_span_ingestion( "retention_days": 90, "segment_id": "a342abb1214ca181", "sentry_tags": { - "browser.name": "Python Requests", + "browser.name": "Chrome", "category": "db", "op": "db.query", }, @@ -487,12 +512,13 @@ def test_span_ingestion( "duration_ms": 1500, "exclusive_time_ms": 345.0, "is_segment": True, + "measurements": {"score.total": {"value": 0.12121616}}, "organization_id": 1, "project_id": 42, "retention_days": 90, "segment_id": "bd429c44b67a3eb1", "sentry_tags": { - "browser.name": "Python Requests", + "browser.name": "Chrome", "category": "resource", "description": "https://example.com/*/blah.js", "domain": "example.com", @@ -513,7 +539,7 @@ def test_span_ingestion( "project_id": 42, "retention_days": 90, "segment_id": "cd429c44b67a3eb1", - "sentry_tags": {"browser.name": "Python Requests", "op": "default"}, + "sentry_tags": {"browser.name": "Chrome", "op": "default"}, "span_id": "cd429c44b67a3eb1", "start_timestamp_ms": int(start.timestamp() * 1e3), "trace_id": "ff62a8b040f340bda5d830223def1d81", @@ -545,7 +571,7 @@ def test_span_ingestion( "retention_days": 90, "segment_id": "ed429c44b67a3eb1", "sentry_tags": { - "browser.name": "Python Requests", + "browser.name": "Chrome", "op": "default", }, "span_id": "ed429c44b67a3eb1", @@ -556,12 +582,11 @@ def test_span_ingestion( "description": "my 3rd protobuf OTel span", "duration_ms": 500, "exclusive_time_ms": 500.0, - "is_segment": True, + "is_segment": False, "organization_id": 1, - "parent_span_id": "", + "parent_span_id": "f0f0f0abcdef1234", "project_id": 42, "retention_days": 90, - "segment_id": "f0b809703e783d00", "sentry_tags": {"browser.name": "Python Requests", "op": "default"}, "span_id": "f0b809703e783d00", "start_timestamp_ms": int(start.timestamp() * 1e3), @@ -569,6 +594,29 @@ def test_span_ingestion( }, ] + spans_consumer.assert_empty() + + # If transaction extraction is enabled, expect transactions: + if extract_transaction: + expected_transactions = 5 + + transactions = [ + transactions_consumer.get_event()[0] for _ in range(expected_transactions) + ] + + assert len(transactions) == expected_transactions + for transaction in transactions: + # Not checking all individual fields here, most should be tested in convert.rs + + # SDK gets 
taken from the header: + if sdk := transaction.get("sdk"): + assert sdk == {"name": "raven-node", "version": "2.6.3"} + + # No errors during normalization: + assert not transaction.get("errors") + + transactions_consumer.assert_empty() + metrics = [metric for (metric, _headers) in metrics_consumer.get_metrics()] metrics.sort(key=lambda m: (m["name"], sorted(m["tags"].items()), m["timestamp"])) for metric in metrics: @@ -578,8 +626,7 @@ def test_span_ingestion( pass expected_timestamp = int(end.timestamp()) - - assert metrics == [ + expected_span_metrics = [ { "name": "c:spans/usage@none", "org_id": 1, @@ -717,7 +764,52 @@ def test_span_ingestion( "type": "d", "value": [500.0], }, + { + "name": "d:spans/webvital.score.total@ratio", + "org_id": 1, + "project_id": 42, + "retention_days": 90, + "tags": {"span.op": "resource.script"}, + "timestamp": expected_timestamp + 1, + "type": "d", + "value": [0.12121616], + }, + ] + assert [m for m in metrics if ":spans/" in m["name"]] == expected_span_metrics + + transaction_duration_metrics = [ + m for m in metrics if m["name"] == "d:transactions/duration@millisecond" + ] + + if extract_transaction: + assert { + (m["name"], m["tags"]["transaction"]) for m in transaction_duration_metrics + } == { + ("d:transactions/duration@millisecond", ""), + ("d:transactions/duration@millisecond", "https://example.com/p/blah.js"), + ("d:transactions/duration@millisecond", "my 1st OTel span"), + ("d:transactions/duration@millisecond", "my 2nd OTel span"), + ( + "d:transactions/duration@millisecond", + 'test \\" with \\" escaped \\" chars', + ), + } + # Make sure we're not double-reporting: + for m in transaction_duration_metrics: + assert len(m["value"]) == 1 + else: + assert len(transaction_duration_metrics) == 0 + + # Regardless of whether transactions are extracted, score.total is only converted to a transaction metric once: + score_total_metrics = [ + m + for m in metrics + if m["name"] == "d:transactions/measurements.score.total@ratio" ] + assert len(score_total_metrics) == 1, score_total_metrics + assert len(score_total_metrics[0]["value"]) == 1 + + metrics_consumer.assert_empty() def test_span_extraction_with_metrics_summary( @@ -793,6 +885,60 @@ def test_span_extraction_with_metrics_summary( assert metrics_summary["mri"] == mri +def test_extracted_transaction_gets_normalized( + mini_sentry, transactions_consumer, relay_with_processing, relay, relay_credentials +): + """When normalization in processing relays has been disabled, an extracted + transaction still gets normalized. 
+ + This test was copied and adapted from test_store::test_relay_chain_normalization + + """ + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [ + "organizations:standalone-span-ingestion", + "projects:extract-transaction-from-segment-span", + ] + + transactions_consumer = transactions_consumer() + + credentials = relay_credentials() + processing = relay_with_processing( + static_relays={ + credentials["id"]: { + "public_key": credentials["public_key"], + "internal": True, + }, + }, + options={"processing": {"normalize": "disabled"}}, + ) + relay = relay( + processing, + credentials=credentials, + options={ + "processing": { + "normalize": "full", + } + }, + ) + + duration = timedelta(milliseconds=500) + end = datetime.now(timezone.utc) - timedelta(seconds=1) + start = end - duration + otel_payload = make_otel_span(start, end) + + # Unset name to validate transaction normalization + del otel_payload["resourceSpans"][0]["scopeSpans"][0]["spans"][0]["name"] + + relay.send_otel_span(project_id, json=otel_payload) + + ingested, _ = transactions_consumer.get_event(timeout=10) + + # "" was set by normalization: + assert ingested["transaction"] == "" + + def test_span_no_extraction_with_metrics_summary( mini_sentry, relay_with_processing,