Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spans): Normalize and copy measurements to segments #2953

Merged
merged 20 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
- Validate error_id and trace_id vectors in replay deserializer. ([#2931](https://github.com/getsentry/relay/pull/2931))
- Add a data category for indexed spans. ([#2937](https://github.com/getsentry/relay/pull/2937))
- Add nested Android app start span ops to span ingestion ([#2927](https://github.com/getsentry/relay/pull/2927))
- Normalize and copy measurements on spans when appropriate. ([#2953](https://github.com/getsentry/relay/pull/2953))
phacops marked this conversation as resolved.
Show resolved Hide resolved
- Create rate limited outcomes for cardinality limited metrics ([#2947](https://github.com/getsentry/relay/pull/2947))

## 23.12.1
Expand Down
51 changes: 35 additions & 16 deletions relay-event-normalization/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use relay_event_schema::processor::{
use relay_event_schema::protocol::{
AsPair, Context, ContextInner, Contexts, DeviceClass, Event, EventType, Exception, Headers,
IpAddr, Level, LogEntry, Measurement, Measurements, NelContext, Request, SpanAttribute,
SpanStatus, Tags, User,
SpanStatus, Tags, Timestamp, User,
};
use relay_protocol::{Annotated, Empty, Error, ErrorKind, Meta, Object, Value};
use smallvec::SmallVec;
Expand Down Expand Up @@ -262,7 +262,7 @@ fn normalize(event: &mut Event, meta: &mut Meta, config: &NormalizationConfig) -
normalize_stacktraces(event);
normalize_exceptions(event); // Browser extension filters look at the stacktrace
normalize_user_agent(event, config.normalize_user_agent); // Legacy browsers filter
normalize_measurements(
normalize_measurements_from_event(
event,
config.measurements.clone(),
config.max_name_and_unit_len,
Expand Down Expand Up @@ -694,7 +694,7 @@ fn normalize_user_agent(_event: &mut Event, normalize_user_agent: Option<bool>)
}

/// Ensure measurements interface is only present for transaction events.
fn normalize_measurements(
fn normalize_measurements_from_event(
phacops marked this conversation as resolved.
Show resolved Hide resolved
event: &mut Event,
measurements_config: Option<DynamicMeasurementsConfig>,
max_mri_len: Option<usize>,
Expand All @@ -703,19 +703,38 @@ fn normalize_measurements(
// Only transaction events may have a measurements interface
event.measurements = Annotated::empty();
} else if let Annotated(Some(ref mut measurements), ref mut meta) = event.measurements {
normalize_mobile_measurements(measurements);
normalize_units(measurements);
if let Some(measurements_config) = measurements_config {
remove_invalid_measurements(measurements, meta, measurements_config, max_mri_len);
}

let duration_millis = match (event.start_timestamp.0, event.timestamp.0) {
(Some(start), Some(end)) => relay_common::time::chrono_to_positive_millis(end - start),
_ => 0.0,
};
normalize_measurements(
measurements,
meta,
measurements_config,
max_mri_len,
event.start_timestamp.0,
event.timestamp.0,
);
}
}

compute_measurements(duration_millis, measurements);
/// Ensure only valid measurements are ingested.
phacops marked this conversation as resolved.
Show resolved Hide resolved
pub fn normalize_measurements(
measurements: &mut Measurements,
meta: &mut Meta,
measurements_config: Option<DynamicMeasurementsConfig>,
max_mri_len: Option<usize>,
start_timestamp: Option<Timestamp>,
end_timestamp: Option<Timestamp>,
) {
normalize_mobile_measurements(measurements);
normalize_units(measurements);
if let Some(measurements_config) = measurements_config {
remove_invalid_measurements(measurements, meta, measurements_config, max_mri_len);
}

let duration_millis = match (start_timestamp, end_timestamp) {
(Some(start), Some(end)) => relay_common::time::chrono_to_positive_millis(end - start),
_ => 0.0,
};

compute_measurements(duration_millis, measurements);
}

/// Computes performance score measurements.
Expand Down Expand Up @@ -1184,7 +1203,7 @@ mod tests {

let mut event = Annotated::<Event>::from_json(json).unwrap().0.unwrap();

normalize_measurements(&mut event, None, None);
normalize_measurements_from_event(&mut event, None, None);

insta::assert_ron_snapshot!(SerializableAnnotated(&Annotated::new(event)), {}, @r#"
{
Expand Down Expand Up @@ -1256,7 +1275,7 @@ mod tests {
let dynamic_measurement_config =
DynamicMeasurementsConfig::new(Some(&project_measurement_config), None);

normalize_measurements(&mut event, Some(dynamic_measurement_config), None);
normalize_measurements_from_event(&mut event, Some(dynamic_measurement_config), None);

// Only two custom measurements are retained, in alphabetic order (1 and 2)
insta::assert_ron_snapshot!(SerializableAnnotated(&Annotated::new(event)), {}, @r#"
Expand Down
2 changes: 1 addition & 1 deletion relay-event-normalization/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ mod transactions;
mod trimming;

pub mod replay;
pub use event::{normalize_event, NormalizationConfig};
pub use event::{normalize_event, normalize_measurements, NormalizationConfig};
pub use normalize::breakdowns::*;
pub use normalize::*;
pub use remove_other::RemoveOtherProcessor;
Expand Down
9 changes: 5 additions & 4 deletions relay-event-schema/src/protocol/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,23 +97,24 @@ pub struct Span {
impl From<&Event> for Span {
fn from(event: &Event) -> Self {
let mut span = Self {
_metrics_summary: event._metrics_summary.clone(),
description: event.transaction.clone(),
is_segment: Some(true).into(),
received: event.received.clone(),
start_timestamp: event.start_timestamp.clone(),
timestamp: event.timestamp.clone(),
_metrics_summary: event._metrics_summary.clone(),
measurements: event.measurements.clone(),
..Default::default()
};

if let Some(trace_context) = event.context::<TraceContext>().cloned() {
span.exclusive_time = trace_context.exclusive_time;
span.op = trace_context.op;
span.span_id = trace_context.span_id;
span.parent_span_id = trace_context.parent_span_id;
span.trace_id = trace_context.trace_id;
span.segment_id = span.span_id.clone(); // a transaction is a segment
span.segment_id = trace_context.span_id.clone(); // a transaction is a segment
span.span_id = trace_context.span_id;
span.status = trace_context.status;
span.trace_id = trace_context.trace_id;
}

if let Some(profile_context) = event.context::<ProfileContext>() {
Expand Down
12 changes: 10 additions & 2 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1254,7 +1254,11 @@ impl EnvelopeProcessorService {
span::filter(state);
if_processing!(self.inner.config, {
self.enforce_quotas(state)?;
span::process(state, self.inner.config.clone());
span::process(
state,
self.inner.config.clone(),
self.inner.global_config.current().measurements.clone(),
);
});
Ok(())
}
Expand Down Expand Up @@ -1318,7 +1322,11 @@ impl EnvelopeProcessorService {
self.enforce_quotas(state)?;
profile::process(state, &self.inner.config);
self.process_check_ins(state);
span::process(state, self.inner.config.clone());
span::process(
state,
self.inner.config.clone(),
self.inner.global_config.current().measurements.clone(),
);
});

if state.has_event() {
Expand Down
91 changes: 77 additions & 14 deletions relay-server/src/services/processor/span/processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ use relay_base_schema::events::EventType;
use relay_config::Config;
use relay_dynamic_config::{ErrorBoundary, Feature, ProjectConfig};
use relay_event_normalization::span::tag_extraction;
use relay_event_normalization::{
normalize_measurements, DynamicMeasurementsConfig, MeasurementsConfig,
};
use relay_event_schema::processor::{process_value, ProcessingState};
use relay_event_schema::protocol::Span;
use relay_metrics::{aggregator::AggregatorConfig, MetricNamespace, UnixTimestamp};
Expand All @@ -20,24 +23,50 @@ use crate::services::outcome::{DiscardReason, Outcome};
use crate::services::processor::{ProcessEnvelopeState, ProcessingError};
use crate::utils::ItemAction;

pub fn process(state: &mut ProcessEnvelopeState, config: Arc<Config>) {
fn get_normalize_span_config<'a>(
phacops marked this conversation as resolved.
Show resolved Hide resolved
config: Arc<Config>,
received_at: DateTime<Utc>,
global_measurements_config: Option<&'a MeasurementsConfig>,
project_measurements_config: Option<&'a MeasurementsConfig>,
) -> NormalizeSpanConfig<'a> {
let transaction_aggregator_config =
AggregatorConfig::from(config.aggregator_config_for(MetricNamespace::Transactions));
phacops marked this conversation as resolved.
Show resolved Hide resolved

NormalizeSpanConfig {
received_at,
transaction_range: transaction_aggregator_config.timestamp_range(),
max_tag_value_size: config
.aggregator_config_for(MetricNamespace::Spans)
.max_tag_value_length,
measurements: Some(DynamicMeasurementsConfig::new(
project_measurements_config,
global_measurements_config,
)),
max_name_and_unit_len: Some(
transaction_aggregator_config
.max_name_length
.saturating_sub(MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD),
),
}
}

pub fn process(
state: &mut ProcessEnvelopeState,
config: Arc<Config>,
global_measurements_config: Option<MeasurementsConfig>,
phacops marked this conversation as resolved.
Show resolved Hide resolved
) {
use relay_event_normalization::RemoveOtherProcessor;

let span_metrics_extraction_config = match state.project_state.config.metric_extraction {
ErrorBoundary::Ok(ref config) if config.is_enabled() => Some(config),
_ => None,
};

let config = NormalizeSpanConfig {
received_at: state.managed_envelope.received_at(),
transaction_range: AggregatorConfig::from(
config.aggregator_config_for(MetricNamespace::Transactions),
)
.timestamp_range(),
max_tag_value_size: config
.aggregator_config_for(MetricNamespace::Spans)
.max_tag_value_length,
};
let normalize_span_config = get_normalize_span_config(
config,
state.managed_envelope.received_at(),
global_measurements_config.as_ref(),
state.project_state.config().measurements.as_ref(),
);

state.managed_envelope.retain_items(|item| {
let mut annotated_span = match item.ty() {
Expand All @@ -61,7 +90,7 @@ pub fn process(state: &mut ProcessEnvelopeState, config: Arc<Config>) {
_ => return ItemAction::Keep,
};

if let Err(e) = normalize(&mut annotated_span, config.clone()) {
if let Err(e) = normalize(&mut annotated_span, normalize_span_config.clone()) {
relay_log::debug!("failed to normalize span: {}", e);
return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal));
};
Expand Down Expand Up @@ -220,13 +249,24 @@ pub fn extract_from_event(state: &mut ProcessEnvelopeState) {

/// Config needed to normalize a standalone span.
#[derive(Clone, Debug)]
struct NormalizeSpanConfig {
struct NormalizeSpanConfig<'a> {
/// The time at which the event was received in this Relay.
pub received_at: DateTime<Utc>,
/// Allowed time range for transactions.
pub transaction_range: std::ops::Range<UnixTimestamp>,
/// The maximum allowed size of tag values in bytes. Longer values will be cropped.
pub max_tag_value_size: usize,
/// Configuration for measurement normalization in transaction events.
///
/// Has an optional [`relay_event_normalization::MeasurementsConfig`] from both the project and the global level.
/// If at least one is provided, then normalization will truncate custom measurements
/// and add units of known built-in measurements.
pub measurements: Option<DynamicMeasurementsConfig<'a>>,
/// The maximum length for names of custom measurements.
///
/// Measurements with longer names are removed from the transaction event and replaced with a
/// metadata entry.
pub max_name_and_unit_len: Option<usize>,
}

/// Normalizes a standalone span.
Expand All @@ -242,6 +282,8 @@ fn normalize(
received_at,
transaction_range,
max_tag_value_size,
measurements,
max_name_and_unit_len,
} = config;

// This follows the steps of `NormalizeProcessor::process_event`.
Expand Down Expand Up @@ -270,6 +312,17 @@ fn normalize(
return Err(ProcessingError::NoEventPayload);
};

if let Annotated(Some(ref mut measurement_values), ref mut meta) = span.measurements {
normalize_measurements(
measurement_values,
meta,
measurements,
max_name_and_unit_len,
span.start_timestamp.0,
span.timestamp.0,
);
}

let is_segment = span.parent_span_id.is_empty();
span.is_segment = Annotated::new(is_segment);
span.received = Annotated::new(received_at.into());
Expand Down Expand Up @@ -363,6 +416,8 @@ fn validate(mut span: Annotated<Span>) -> Result<Annotated<Span>, anyhow::Error>
ref mut timestamp,
ref mut span_id,
ref mut trace_id,
ref mut measurements,
ref mut _metrics_summary,
..
} = inner;

Expand Down Expand Up @@ -413,6 +468,14 @@ fn validate(mut span: Annotated<Span>) -> Result<Annotated<Span>, anyhow::Error>
if let Some(tags) = tags.value_mut() {
tags.retain(|_, value| !value.value().is_empty())
}
if let Some(measurements) = measurements.value_mut() {
measurements.retain(|_, value| !value.value().is_empty())
}
if let Some(metrics_summary) = _metrics_summary.value_mut() {
metrics_summary
.0
.retain(|_, value| !value.value().is_empty())
}

Ok(span)
}