Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spans): Normalize and copy measurements to segments #2953

Merged
merged 20 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- Proactively move on-disk spool to memory. ([#2949](https://github.com/getsentry/relay/pull/2949))
- Default missing `Event.platform` and `Event.level` fields during light normalization. ([#2961](https://github.com/getsentry/relay/pull/2961))
- Copy event measurements to span & normalize span measurements. ([#2953](https://github.com/getsentry/relay/pull/2953))
- Add possiblity to block metrics with glob-patterns. ([#2954](https://github.com/getsentry/relay/pull/2954))

**Bug Fixes**:
Expand Down
53 changes: 36 additions & 17 deletions relay-event-normalization/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use relay_event_schema::processor::{
use relay_event_schema::protocol::{
AsPair, Context, ContextInner, Contexts, DeviceClass, Event, EventType, Exception, Headers,
IpAddr, Level, LogEntry, Measurement, Measurements, NelContext, Request, SpanAttribute,
SpanStatus, Tags, User,
SpanStatus, Tags, Timestamp, User,
};
use relay_protocol::{Annotated, Empty, Error, ErrorKind, Meta, Object, Value};
use smallvec::SmallVec;
Expand Down Expand Up @@ -262,7 +262,7 @@ fn normalize(event: &mut Event, meta: &mut Meta, config: &NormalizationConfig) -
normalize_stacktraces(event);
normalize_exceptions(event); // Browser extension filters look at the stacktrace
normalize_user_agent(event, config.normalize_user_agent); // Legacy browsers filter
normalize_measurements(
normalize_event_measurements(
event,
config.measurements.clone(),
config.max_name_and_unit_len,
Expand Down Expand Up @@ -693,8 +693,8 @@ fn normalize_user_agent(_event: &mut Event, normalize_user_agent: Option<bool>)
}
}

/// Ensure measurements interface is only present for transaction events.
fn normalize_measurements(
/// Ensures measurements interface is only present for transaction events.
fn normalize_event_measurements(
event: &mut Event,
measurements_config: Option<DynamicMeasurementsConfig>,
max_mri_len: Option<usize>,
Expand All @@ -703,19 +703,38 @@ fn normalize_measurements(
// Only transaction events may have a measurements interface
event.measurements = Annotated::empty();
} else if let Annotated(Some(ref mut measurements), ref mut meta) = event.measurements {
normalize_mobile_measurements(measurements);
normalize_units(measurements);
if let Some(measurements_config) = measurements_config {
remove_invalid_measurements(measurements, meta, measurements_config, max_mri_len);
}

let duration_millis = match (event.start_timestamp.0, event.timestamp.0) {
(Some(start), Some(end)) => relay_common::time::chrono_to_positive_millis(end - start),
_ => 0.0,
};
normalize_measurements(
measurements,
meta,
measurements_config,
max_mri_len,
event.start_timestamp.0,
event.timestamp.0,
);
}
}

compute_measurements(duration_millis, measurements);
/// Ensure only valid measurements are ingested.
phacops marked this conversation as resolved.
Show resolved Hide resolved
pub fn normalize_measurements(
measurements: &mut Measurements,
meta: &mut Meta,
measurements_config: Option<DynamicMeasurementsConfig>,
max_mri_len: Option<usize>,
start_timestamp: Option<Timestamp>,
end_timestamp: Option<Timestamp>,
) {
normalize_mobile_measurements(measurements);
normalize_units(measurements);
if let Some(measurements_config) = measurements_config {
remove_invalid_measurements(measurements, meta, measurements_config, max_mri_len);
}

let duration_millis = match (start_timestamp, end_timestamp) {
(Some(start), Some(end)) => relay_common::time::chrono_to_positive_millis(end - start),
_ => 0.0,
};

compute_measurements(duration_millis, measurements);
}

/// Computes performance score measurements.
Expand Down Expand Up @@ -1188,7 +1207,7 @@ mod tests {

let mut event = Annotated::<Event>::from_json(json).unwrap().0.unwrap();

normalize_measurements(&mut event, None, None);
normalize_event_measurements(&mut event, None, None);

insta::assert_ron_snapshot!(SerializableAnnotated(&Annotated::new(event)), {}, @r#"
{
Expand Down Expand Up @@ -1260,7 +1279,7 @@ mod tests {
let dynamic_measurement_config =
DynamicMeasurementsConfig::new(Some(&project_measurement_config), None);

normalize_measurements(&mut event, Some(dynamic_measurement_config), None);
normalize_event_measurements(&mut event, Some(dynamic_measurement_config), None);

// Only two custom measurements are retained, in alphabetic order (1 and 2)
insta::assert_ron_snapshot!(SerializableAnnotated(&Annotated::new(event)), {}, @r#"
Expand Down
2 changes: 1 addition & 1 deletion relay-event-normalization/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ mod transactions;
mod trimming;

pub mod replay;
pub use event::{normalize_event, NormalizationConfig};
pub use event::{normalize_event, normalize_measurements, NormalizationConfig};
pub use normalize::breakdowns::*;
pub use normalize::*;
pub use remove_other::RemoveOtherProcessor;
Expand Down
9 changes: 5 additions & 4 deletions relay-event-schema/src/protocol/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,23 +97,24 @@ pub struct Span {
impl From<&Event> for Span {
fn from(event: &Event) -> Self {
let mut span = Self {
_metrics_summary: event._metrics_summary.clone(),
description: event.transaction.clone(),
is_segment: Some(true).into(),
received: event.received.clone(),
start_timestamp: event.start_timestamp.clone(),
timestamp: event.timestamp.clone(),
_metrics_summary: event._metrics_summary.clone(),
measurements: event.measurements.clone(),
..Default::default()
};

if let Some(trace_context) = event.context::<TraceContext>().cloned() {
span.exclusive_time = trace_context.exclusive_time;
span.op = trace_context.op;
span.span_id = trace_context.span_id;
span.parent_span_id = trace_context.parent_span_id;
span.trace_id = trace_context.trace_id;
span.segment_id = span.span_id.clone(); // a transaction is a segment
span.segment_id = trace_context.span_id.clone(); // a transaction is a segment
span.span_id = trace_context.span_id;
span.status = trace_context.status;
span.trace_id = trace_context.trace_id;
}

if let Some(profile_context) = event.context::<ProfileContext>() {
Expand Down
12 changes: 10 additions & 2 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1254,7 +1254,11 @@ impl EnvelopeProcessorService {
span::filter(state);
if_processing!(self.inner.config, {
self.enforce_quotas(state)?;
span::process(state, self.inner.config.clone());
span::process(
state,
self.inner.config.clone(),
self.inner.global_config.current().measurements.as_ref(),
);
});
Ok(())
}
Expand Down Expand Up @@ -1318,7 +1322,11 @@ impl EnvelopeProcessorService {
self.enforce_quotas(state)?;
profile::process(state, &self.inner.config);
self.process_check_ins(state);
span::process(state, self.inner.config.clone());
span::process(
state,
self.inner.config.clone(),
self.inner.global_config.current().measurements.as_ref(),
);
});

if state.has_event() {
Expand Down
1 change: 0 additions & 1 deletion relay-server/src/services/processor/profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ pub fn process(state: &mut ProcessEnvelopeState, config: &Config) {

#[cfg(test)]
mod tests {

use std::sync::Arc;

use insta::assert_debug_snapshot;
Expand Down
94 changes: 73 additions & 21 deletions relay-server/src/services/processor/span/processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ use relay_base_schema::events::EventType;
use relay_config::Config;
use relay_dynamic_config::{ErrorBoundary, Feature, ProjectConfig};
use relay_event_normalization::span::tag_extraction;
use relay_event_normalization::{
normalize_measurements, DynamicMeasurementsConfig, MeasurementsConfig,
};
use relay_event_schema::processor::{process_value, ProcessingState};
use relay_event_schema::protocol::Span;
use relay_metrics::{aggregator::AggregatorConfig, MetricNamespace, UnixTimestamp};
Expand All @@ -20,24 +23,23 @@ use crate::services::outcome::{DiscardReason, Outcome};
use crate::services::processor::{ProcessEnvelopeState, ProcessingError};
use crate::utils::ItemAction;

pub fn process(state: &mut ProcessEnvelopeState, config: Arc<Config>) {
pub fn process(
state: &mut ProcessEnvelopeState,
config: Arc<Config>,
global_measurements_config: Option<&MeasurementsConfig>,
) {
use relay_event_normalization::RemoveOtherProcessor;

let span_metrics_extraction_config = match state.project_state.config.metric_extraction {
ErrorBoundary::Ok(ref config) if config.is_enabled() => Some(config),
_ => None,
};

let config = NormalizeSpanConfig {
received_at: state.managed_envelope.received_at(),
transaction_range: AggregatorConfig::from(
config.aggregator_config_for(MetricNamespace::Transactions),
)
.timestamp_range(),
max_tag_value_size: config
.aggregator_config_for(MetricNamespace::Spans)
.max_tag_value_length,
};
let normalize_span_config = get_normalize_span_config(
config,
state.managed_envelope.received_at(),
global_measurements_config,
state.project_state.config().measurements.as_ref(),
);

state.managed_envelope.retain_items(|item| {
let mut annotated_span = match item.ty() {
Expand All @@ -61,16 +63,15 @@ pub fn process(state: &mut ProcessEnvelopeState, config: Arc<Config>) {
_ => return ItemAction::Keep,
};

if let Err(e) = normalize(&mut annotated_span, config.clone()) {
if let Err(e) = normalize(&mut annotated_span, normalize_span_config.clone()) {
relay_log::debug!("failed to normalize span: {}", e);
return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal));
};

let Some(span) = annotated_span.value_mut() else {
return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal));
};

if let Some(config) = span_metrics_extraction_config {
let Some(span) = annotated_span.value_mut() else {
return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal));
};
let metrics = extract_metrics(span, config);
state.extracted_metrics.project_metrics.extend(metrics);
item.set_metrics_extracted(true);
Expand Down Expand Up @@ -220,13 +221,51 @@ pub fn extract_from_event(state: &mut ProcessEnvelopeState) {

/// Config needed to normalize a standalone span.
#[derive(Clone, Debug)]
struct NormalizeSpanConfig {
struct NormalizeSpanConfig<'a> {
/// The time at which the event was received in this Relay.
pub received_at: DateTime<Utc>,
/// Allowed time range for transactions.
pub transaction_range: std::ops::Range<UnixTimestamp>,
pub timestamp_range: std::ops::Range<UnixTimestamp>,
/// The maximum allowed size of tag values in bytes. Longer values will be cropped.
pub max_tag_value_size: usize,
/// Configuration for measurement normalization in transaction events.
///
/// Has an optional [`relay_event_normalization::MeasurementsConfig`] from both the project and the global level.
/// If at least one is provided, then normalization will truncate custom measurements
/// and add units of known built-in measurements.
pub measurements: Option<DynamicMeasurementsConfig<'a>>,
/// The maximum length for names of custom measurements.
///
/// Measurements with longer names are removed from the transaction event and replaced with a
/// metadata entry.
pub max_name_and_unit_len: Option<usize>,
}

fn get_normalize_span_config<'a>(
config: Arc<Config>,
received_at: DateTime<Utc>,
global_measurements_config: Option<&'a MeasurementsConfig>,
project_measurements_config: Option<&'a MeasurementsConfig>,
) -> NormalizeSpanConfig<'a> {
let aggregator_config =
AggregatorConfig::from(config.aggregator_config_for(MetricNamespace::Spans));

NormalizeSpanConfig {
received_at,
timestamp_range: aggregator_config.timestamp_range(),
max_tag_value_size: config
.aggregator_config_for(MetricNamespace::Spans)
.max_tag_value_length,
measurements: Some(DynamicMeasurementsConfig::new(
project_measurements_config,
global_measurements_config,
)),
max_name_and_unit_len: Some(
aggregator_config
.max_name_length
.saturating_sub(MeasurementsConfig::MEASUREMENT_MRI_OVERHEAD),
),
}
}

/// Normalizes a standalone span.
Expand All @@ -240,8 +279,10 @@ fn normalize(

let NormalizeSpanConfig {
received_at,
transaction_range,
timestamp_range,
max_tag_value_size,
measurements,
max_name_and_unit_len,
} = config;

// This follows the steps of `NormalizeProcessor::process_event`.
Expand All @@ -262,14 +303,25 @@ fn normalize(

process_value(
annotated_span,
&mut TransactionsProcessor::new(Default::default(), Some(transaction_range)),
&mut TransactionsProcessor::new(Default::default(), Some(timestamp_range)),
ProcessingState::root(),
)?;

let Some(span) = annotated_span.value_mut() else {
return Err(ProcessingError::NoEventPayload);
};

if let Annotated(Some(ref mut measurement_values), ref mut meta) = span.measurements {
normalize_measurements(
measurement_values,
meta,
measurements,
max_name_and_unit_len,
span.start_timestamp.0,
span.timestamp.0,
);
}

let is_segment = span.parent_span_id.is_empty();
span.is_segment = Annotated::new(is_segment);
span.received = Annotated::new(received_at.into());
Expand Down
Loading