diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a0358ffcda..e0d10983e63 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - Proactively move on-disk spool to memory. ([#2949](https://github.com/getsentry/relay/pull/2949)) - Default missing `Event.platform` and `Event.level` fields during light normalization. ([#2961](https://github.com/getsentry/relay/pull/2961)) +- Copy event measurements to span & normalize span measurements. ([#2953](https://github.com/getsentry/relay/pull/2953)) - Add possiblity to block metrics with glob-patterns. ([#2954](https://github.com/getsentry/relay/pull/2954)) **Bug Fixes**: diff --git a/relay-event-normalization/src/event.rs b/relay-event-normalization/src/event.rs index 7d836571841..ba9dbbbce83 100644 --- a/relay-event-normalization/src/event.rs +++ b/relay-event-normalization/src/event.rs @@ -20,7 +20,7 @@ use relay_event_schema::processor::{ use relay_event_schema::protocol::{ AsPair, Context, ContextInner, Contexts, DeviceClass, Event, EventType, Exception, Headers, IpAddr, Level, LogEntry, Measurement, Measurements, NelContext, Request, SpanAttribute, - SpanStatus, Tags, User, + SpanStatus, Tags, Timestamp, User, }; use relay_protocol::{Annotated, Empty, Error, ErrorKind, Meta, Object, Value}; use smallvec::SmallVec; @@ -262,7 +262,7 @@ fn normalize(event: &mut Event, meta: &mut Meta, config: &NormalizationConfig) - normalize_stacktraces(event); normalize_exceptions(event); // Browser extension filters look at the stacktrace normalize_user_agent(event, config.normalize_user_agent); // Legacy browsers filter - normalize_measurements( + normalize_event_measurements( event, config.measurements.clone(), config.max_name_and_unit_len, @@ -693,8 +693,8 @@ fn normalize_user_agent(_event: &mut Event, normalize_user_agent: Option<bool>) } } -/// Ensure measurements interface is only present for transaction events. -fn normalize_measurements( +/// Ensures measurements interface is only present for transaction events. +fn normalize_event_measurements( event: &mut Event, measurements_config: Option<DynamicMeasurementsConfig>, max_mri_len: Option<usize>, @@ -703,19 +703,38 @@ fn normalize_measurements( // Only transaction events may have a measurements interface event.measurements = Annotated::empty(); } else if let Annotated(Some(ref mut measurements), ref mut meta) = event.measurements { - normalize_mobile_measurements(measurements); - normalize_units(measurements); - if let Some(measurements_config) = measurements_config { - remove_invalid_measurements(measurements, meta, measurements_config, max_mri_len); - } - - let duration_millis = match (event.start_timestamp.0, event.timestamp.0) { - (Some(start), Some(end)) => relay_common::time::chrono_to_positive_millis(end - start), - _ => 0.0, - }; + normalize_measurements( + measurements, + meta, + measurements_config, + max_mri_len, + event.start_timestamp.0, + event.timestamp.0, + ); + } +} - compute_measurements(duration_millis, measurements); +/// Ensure only valid measurements are ingested. +pub fn normalize_measurements( + measurements: &mut Measurements, + meta: &mut Meta, + measurements_config: Option<DynamicMeasurementsConfig>, + max_mri_len: Option<usize>, + start_timestamp: Option<Timestamp>, + end_timestamp: Option<Timestamp>, +) { + normalize_mobile_measurements(measurements); + normalize_units(measurements); + if let Some(measurements_config) = measurements_config { + remove_invalid_measurements(measurements, meta, measurements_config, max_mri_len); } + + let duration_millis = match (start_timestamp, end_timestamp) { + (Some(start), Some(end)) => relay_common::time::chrono_to_positive_millis(end - start), + _ => 0.0, + }; + + compute_measurements(duration_millis, measurements); } /// Computes performance score measurements. @@ -1188,7 +1207,7 @@ mod tests { let mut event = Annotated::<Event>::from_json(json).unwrap().0.unwrap(); - normalize_measurements(&mut event, None, None); + normalize_event_measurements(&mut event, None, None); insta::assert_ron_snapshot!(SerializableAnnotated(&Annotated::new(event)), {}, @r#" { @@ -1260,7 +1279,7 @@ mod tests { let dynamic_measurement_config = DynamicMeasurementsConfig::new(Some(&project_measurement_config), None); - normalize_measurements(&mut event, Some(dynamic_measurement_config), None); + normalize_event_measurements(&mut event, Some(dynamic_measurement_config), None); // Only two custom measurements are retained, in alphabetic order (1 and 2) insta::assert_ron_snapshot!(SerializableAnnotated(&Annotated::new(event)), {}, @r#" diff --git a/relay-event-normalization/src/lib.rs b/relay-event-normalization/src/lib.rs index 83d319de94c..87a5431d48d 100644 --- a/relay-event-normalization/src/lib.rs +++ b/relay-event-normalization/src/lib.rs @@ -35,7 +35,7 @@ mod transactions; mod trimming; pub mod replay; -pub use event::{normalize_event, NormalizationConfig}; +pub use event::{normalize_event, normalize_measurements, NormalizationConfig}; pub use normalize::breakdowns::*; pub use normalize::*; pub use remove_other::RemoveOtherProcessor; diff --git a/relay-event-schema/src/protocol/span.rs b/relay-event-schema/src/protocol/span.rs index 6541d4f01bc..7c04fb7a3d3 100644 --- a/relay-event-schema/src/protocol/span.rs +++ b/relay-event-schema/src/protocol/span.rs @@ -97,23 +97,24 @@ pub struct Span { impl From<&Event> for Span { fn from(event: &Event) -> Self { let mut span = Self { + _metrics_summary: event._metrics_summary.clone(), description: event.transaction.clone(), is_segment: Some(true).into(), received: event.received.clone(), start_timestamp: event.start_timestamp.clone(), timestamp: event.timestamp.clone(), - _metrics_summary: event._metrics_summary.clone(), + measurements: event.measurements.clone(), ..Default::default() }; if let Some(trace_context) = event.context::<TraceContext>().cloned() { span.exclusive_time = trace_context.exclusive_time; span.op = trace_context.op; - span.span_id = trace_context.span_id; span.parent_span_id = trace_context.parent_span_id; - span.trace_id = trace_context.trace_id; - span.segment_id = span.span_id.clone(); // a transaction is a segment + span.segment_id = trace_context.span_id.clone(); // a transaction is a segment + span.span_id = trace_context.span_id; span.status = trace_context.status; + span.trace_id = trace_context.trace_id; } if let Some(profile_context) = event.context::<ProfileContext>() { diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 2d4cce04b9a..d4aa1a77b2c 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -1254,7 +1254,11 @@ impl EnvelopeProcessorService { span::filter(state); if_processing!(self.inner.config, { self.enforce_quotas(state)?; - span::process(state, self.inner.config.clone()); + span::process( + state, + self.inner.config.clone(), + self.inner.global_config.current().measurements.as_ref(), + ); }); Ok(()) } @@ -1318,7 +1322,11 @@ impl EnvelopeProcessorService { self.enforce_quotas(state)?; profile::process(state, &self.inner.config); self.process_check_ins(state); - span::process(state, self.inner.config.clone()); + span::process( + state, + self.inner.config.clone(), + self.inner.global_config.current().measurements.as_ref(), + ); }); if state.has_event() { diff --git a/relay-server/src/services/processor/profile.rs b/relay-server/src/services/processor/profile.rs index 8b56013cabf..56476f21766 100644 --- a/relay-server/src/services/processor/profile.rs +++ b/relay-server/src/services/processor/profile.rs @@ -112,7 +112,6 @@ pub fn process(state: &mut ProcessEnvelopeState, config: &Config) { #[cfg(test)] mod tests { - use std::sync::Arc; use insta::assert_debug_snapshot; diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index b8ff31fb166..a484c390c2c 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -8,6 +8,9 @@ use relay_base_schema::events::EventType; use relay_config::Config; use relay_dynamic_config::{ErrorBoundary, Feature, ProjectConfig}; use relay_event_normalization::span::tag_extraction; +use relay_event_normalization::{ + normalize_measurements, DynamicMeasurementsConfig, MeasurementsConfig, +}; use relay_event_schema::processor::{process_value, ProcessingState}; use relay_event_schema::protocol::Span; use relay_metrics::{aggregator::AggregatorConfig, MetricNamespace, UnixTimestamp}; @@ -20,24 +23,23 @@ use crate::services::outcome::{DiscardReason, Outcome}; use crate::services::processor::{ProcessEnvelopeState, ProcessingError}; use crate::utils::ItemAction; -pub fn process(state: &mut ProcessEnvelopeState, config: Arc<Config>) { +pub fn process( + state: &mut ProcessEnvelopeState, + config: Arc<Config>, + global_measurements_config: Option<&MeasurementsConfig>, +) { use relay_event_normalization::RemoveOtherProcessor; let span_metrics_extraction_config = match state.project_state.config.metric_extraction { ErrorBoundary::Ok(ref config) if config.is_enabled() => Some(config), _ => None, }; - - let config = NormalizeSpanConfig { - received_at: state.managed_envelope.received_at(), - transaction_range: AggregatorConfig::from( - config.aggregator_config_for(MetricNamespace::Transactions), - ) - .timestamp_range(), - max_tag_value_size: config - .aggregator_config_for(MetricNamespace::Spans) - .max_tag_value_length, - }; + let normalize_span_config = get_normalize_span_config( + config, + state.managed_envelope.received_at(), + global_measurements_config, + state.project_state.config().measurements.as_ref(), + ); state.managed_envelope.retain_items(|item| { let mut annotated_span = match item.ty() { @@ -61,16 +63,15 @@ pub fn process(state: &mut ProcessEnvelopeState, config: Arc<Config>) { _ => return ItemAction::Keep, }; - if let Err(e) = normalize(&mut annotated_span, config.clone()) { + if let Err(e) = normalize(&mut annotated_span, normalize_span_config.clone()) { relay_log::debug!("failed to normalize span: {}", e); return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal)); }; - let Some(span) = annotated_span.value_mut() else { - return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal)); - }; - if let Some(config) = span_metrics_extraction_config { + let Some(span) = annotated_span.value_mut() else { + return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal)); + }; let metrics = extract_metrics(span, config); state.extracted_metrics.project_metrics.extend(metrics); item.set_metrics_extracted(true); @@ -220,13 +221,51 @@ pub fn extract_from_event(state: &mut ProcessEnvelopeState) { /// Config needed to normalize a standalone span. #[derive(Clone, Debug)] -struct NormalizeSpanConfig { +struct NormalizeSpanConfig<'a> { /// The time at which the event was received in this Relay. pub received_at: DateTime<Utc>, /// Allowed time range for transactions. - pub transaction_range: std::ops::Range<UnixTimestamp>, + pub timestamp_range: std::ops::Range<UnixTimestamp>, /// The maximum allowed size of tag values in bytes. Longer values will be cropped. pub max_tag_value_size: usize, + /// Configuration for measurement normalization in transaction events. + /// + /// Has an optional [`relay_event_normalization::MeasurementsConfig`] from both the project and the global level. + /// If at least one is provided, then normalization will truncate custom measurements + /// and add units of known built-in measurements. + pub measurements: Option<DynamicMeasurementsConfig<'a>>, + /// The maximum length for names of custom measurements. + /// + /// Measurements with longer names are removed from the transaction event and replaced with a + /// metadata entry. + pub max_name_and_unit_len: Option<usize>, +} + +fn get_normalize_span_config<'a>( + config: Arc<Config>, + received_at: DateTime<Utc>, + global_measurements_config: Option<&'a MeasurementsConfig>, + project_measurements_config: Option<&'a MeasurementsConfig>, +) -> NormalizeSpanConfig<'a> { + let aggregator_config = + AggregatorConfig::from(config.aggregator_config_for(MetricNamespace::Spans)); + + NormalizeSpanConfig { + received_at, + timestamp_range: aggregator_config.timestamp_range(), + max_tag_value_size: config + .aggregator_config_for(MetricNamespace::Spans) + .max_tag_value_length, + measurements: Some(DynamicMeasurementsConfig::new( + project_measurements_config, + global_measurements_config, + )), + max_name_and_unit_len: Some( + aggregator_config + .max_name_length + .saturating_sub(MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD), + ), + } } /// Normalizes a standalone span. @@ -240,8 +279,10 @@ fn normalize( let NormalizeSpanConfig { received_at, - transaction_range, + timestamp_range, max_tag_value_size, + measurements, + max_name_and_unit_len, } = config; // This follows the steps of `NormalizeProcessor::process_event`. @@ -262,7 +303,7 @@ fn normalize( process_value( annotated_span, - &mut TransactionsProcessor::new(Default::default(), Some(transaction_range)), + &mut TransactionsProcessor::new(Default::default(), Some(timestamp_range)), ProcessingState::root(), )?; @@ -270,6 +311,17 @@ fn normalize( return Err(ProcessingError::NoEventPayload); }; + if let Annotated(Some(ref mut measurement_values), ref mut meta) = span.measurements { + normalize_measurements( + measurement_values, + meta, + measurements, + max_name_and_unit_len, + span.start_timestamp.0, + span.timestamp.0, + ); + } + let is_segment = span.parent_span_id.is_empty(); span.is_segment = Annotated::new(is_segment); span.received = Annotated::new(received_at.into()); diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 2bec27d68fd..cf77f1c201e 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -1,6 +1,7 @@ //! This module contains the service that forwards events and attachments to the Sentry store. //! The service uses kafka topics to forward data to Sentry +use std::borrow::Cow; use std::collections::BTreeMap; use std::error::Error; use std::sync::Arc; @@ -865,6 +866,33 @@ impl StoreService { } }; + if let Some(measurements) = &mut span.measurements { + measurements.retain(|_, v| { + v.as_ref() + .and_then(|v| v.value) + .map_or(false, f64::is_finite) + }); + } + + if let Some(metrics_summary) = &mut span.metrics_summary { + metrics_summary.retain(|_, mut v| { + if let Some(v) = &mut v { + v.retain(|v| { + if let Some(v) = v { + return v.min.is_some() + || v.max.is_some() + || v.sum.is_some() + || v.count.is_some(); + } + false + }); + !v.is_empty() + } else { + false + } + }); + } + span.duration_ms = ((span.end_timestamp - span.start_timestamp) * 1e3) as u32; span.event_id = event_id; span.project_id = scoping.project_id.value(); @@ -1170,6 +1198,28 @@ struct CheckInKafkaMessage { retention_days: u16, } +#[derive(Debug, Deserialize, Serialize)] +struct SpanMeasurement { + #[serde(default, skip_serializing_if = "Option::is_none")] + value: Option<f64>, +} + +#[derive(Debug, Deserialize, Serialize)] +struct SpanMetricsSummary { + #[serde(default, skip_serializing_if = "Option::is_none")] + count: Option<u64>, + #[serde(default, skip_serializing_if = "Option::is_none")] + max: Option<f64>, + #[serde(default, skip_serializing_if = "Option::is_none")] + min: Option<f64>, + #[serde(default, skip_serializing_if = "Option::is_none")] + sum: Option<f64>, + #[serde(default, skip_serializing_if = "Option::is_none")] + tags: Option<BTreeMap<String, String>>, +} + +type SpanMetricsSummaries = Vec<Option<SpanMetricsSummary>>; + #[derive(Debug, Deserialize, Serialize)] struct SpanKafkaMessage<'a> { #[serde(skip_serializing)] @@ -1188,14 +1238,15 @@ struct SpanKafkaMessage<'a> { exclusive_time_ms: f64, is_segment: bool, - #[serde(default, skip_serializing_if = "Option::is_none")] - measurements: Option<&'a RawValue>, + #[serde(borrow, default, skip_serializing_if = "Option::is_none")] + measurements: Option<BTreeMap<Cow<'a, str>, Option<SpanMeasurement>>>, #[serde( + borrow, default, rename = "_metrics_summary", skip_serializing_if = "Option::is_none" )] - metrics_summary: Option<&'a RawValue>, + metrics_summary: Option<BTreeMap<Cow<'a, str>, Option<SpanMetricsSummaries>>>, #[serde(default, skip_serializing_if = "Option::is_none")] parent_span_id: Option<&'a str>, #[serde(default, skip_serializing_if = "Option::is_none")] diff --git a/tests/integration/test_store.py b/tests/integration/test_store.py index c49a563ef42..f84a988f83f 100644 --- a/tests/integration/test_store.py +++ b/tests/integration/test_store.py @@ -1812,3 +1812,72 @@ def test_span_extraction_with_ddm( } spans_consumer.assert_empty() + + +def test_span_extraction_with_ddm_missing_values( + mini_sentry, + relay_with_processing, + spans_consumer, +): + spans_consumer = spans_consumer() + + relay = relay_with_processing() + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["spanAttributes"] = ["exclusive-time"] + project_config["config"]["features"] = [ + "organizations:custom-metrics", + ] + + event = make_transaction({"event_id": "cbf6960622e14a45abc1f03b2055b186"}) + metrics_summary = { + "c:spans/some_metric@none": [ + { + "min": None, + "max": 2.0, + "count": 4, + "tags": { + "environment": "test", + }, + }, + ], + } + event["_metrics_summary"] = metrics_summary + event["measurements"] = { + "somemeasurement": None, + "anothermeasurement": { + "value": None, + "unit": "byte", + }, + } + + relay.send_event(project_id, event) + + start_timestamp = datetime.fromisoformat(event["start_timestamp"]) + end_timestamp = datetime.fromisoformat(event["timestamp"]) + duration_ms = int((end_timestamp - start_timestamp).total_seconds() * 1e3) + + metrics_summary["c:spans/some_metric@none"][0].pop("min", None) + + transaction_span = spans_consumer.get_span() + del transaction_span["received"] + assert transaction_span == { + "duration_ms": duration_ms, + "event_id": "cbf6960622e14a45abc1f03b2055b186", + "project_id": 42, + "retention_days": 90, + "description": "hi", + "exclusive_time_ms": 2000.0, + "is_segment": True, + "segment_id": "968cff94913ebb07", + "sentry_tags": {"transaction": "hi", "transaction.op": "hi"}, + "span_id": "968cff94913ebb07", + "start_timestamp_ms": int( + start_timestamp.replace(tzinfo=timezone.utc).timestamp() * 1e3 + ), + "trace_id": "a0fa8803753e40fd8124b21eeb2986b5", + "_metrics_summary": metrics_summary, + "measurements": {}, + } + + spans_consumer.assert_empty()