diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index 3dba72ef5a..f9d4d44903 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -481,26 +481,26 @@ fn is_false(val: &bool) -> bool { pub struct ItemHeaders { /// The type of the item. #[serde(rename = "type")] - ty: ItemType, + pub ty: ItemType, /// Content length of the item. /// /// Can be omitted if the item does not contain new lines. In this case, the item payload is /// parsed until the first newline is encountered. #[serde(default, skip_serializing_if = "Option::is_none")] - length: Option, + pub length: Option, /// If this is an attachment item, this may contain the attachment type. #[serde(default, skip_serializing_if = "Option::is_none")] - attachment_type: Option, + pub attachment_type: Option, /// Content type of the payload. #[serde(default, skip_serializing_if = "Option::is_none")] - content_type: Option, + pub content_type: Option, /// If this is an attachment item, this may contain the original file name. #[serde(default, skip_serializing_if = "Option::is_none")] - filename: Option, + pub filename: Option, /// The routing_hint may be used to specify how the envelpope should be routed in when /// published to kafka. @@ -508,7 +508,7 @@ pub struct ItemHeaders { /// XXX(epurkhiser): This is currently ONLY used for [`ItemType::CheckIn`]'s when publishing /// the envelope into kafka. #[serde(default, skip_serializing_if = "Option::is_none")] - routing_hint: Option, + pub routing_hint: Option, /// Indicates that this item is being rate limited. /// @@ -519,12 +519,12 @@ pub struct ItemHeaders { /// /// NOTE: This is internal-only and not exposed into the Envelope. #[serde(default, skip)] - rate_limited: bool, + pub rate_limited: bool, /// Indicates that this item should be combined into one payload with other replay item. /// NOTE: This is internal-only and not exposed into the Envelope. #[serde(default, skip)] - replay_combined_payload: bool, + pub replay_combined_payload: bool, /// Contains the amount of events this item was generated and aggregated from. /// @@ -536,14 +536,14 @@ pub struct ItemHeaders { /// /// NOTE: This is internal-only and not exposed into the Envelope. #[serde(default, skip)] - source_quantities: Option, + pub source_quantities: Option, /// A list of cumulative sample rates applied to this event. /// /// Multiple entries in `sample_rates` mean that the event was sampled multiple times. The /// effective sample rate is multiplied. #[serde(default, skip_serializing_if = "Option::is_none")] - sample_rates: Option, + pub sample_rates: Option, /// Flag indicating if metrics have already been extracted from the item. /// @@ -552,11 +552,11 @@ pub struct ItemHeaders { /// the first Relay) MUST set this flat to true so that upstream Relays do /// not extract the metric again causing double counting of the metric. #[serde(default, skip_serializing_if = "is_false")] - metrics_extracted: bool, + pub metrics_extracted: bool, /// Whether or not a transaction has been extracted from a segment span. #[serde(default, skip_serializing_if = "is_false")] - transaction_extracted: bool, + pub transaction_extracted: bool, /// Whether or not spans and span metrics have been extracted from a transaction. /// @@ -566,7 +566,7 @@ pub struct ItemHeaders { /// NOTE: This header is also set to `true` for transactions that are themselves extracted /// from spans (the opposite direction), to prevent going in circles. #[serde(default, skip_serializing_if = "is_false")] - spans_extracted: bool, + pub spans_extracted: bool, /// `false` if the sampling decision is "drop". /// @@ -574,7 +574,15 @@ pub struct ItemHeaders { /// For profiles with the feature enabled, however, we keep all profile items and mark the ones /// for which the transaction was dropped as `sampled: false`. #[serde(default = "default_true", skip_serializing_if = "is_true")] - sampled: bool, + pub sampled: bool, + + /// The release, used by security reports. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub sentry_release: Option, + + /// The environment, used by security reports. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub sentry_environment: Option, /// Other attributes for forward compatibility. #[serde(flatten)] @@ -621,8 +629,8 @@ impl AddAssign for SourceQuantities { #[derive(Clone, Debug)] pub struct Item { - headers: ItemHeaders, - payload: Bytes, + pub headers: ItemHeaders, + pub payload: Bytes, } impl Item { @@ -645,6 +653,8 @@ impl Item { transaction_extracted: false, spans_extracted: false, sampled: true, + sentry_release: None, + sentry_environment: None, }, payload: Bytes::new(), } diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 5e65922a13..b5630cdf14 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -36,7 +36,7 @@ use relay_metrics::{ }; use relay_pii::PiiConfigError; use relay_profiling::ProfileId; -use relay_protocol::{Annotated, Value}; +use relay_protocol::Annotated; use relay_quotas::{DataCategory, Scoping}; use relay_sampling::config::RuleId; use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator}; @@ -65,7 +65,7 @@ use { }; use crate::envelope::{ - self, ContentType, Envelope, EnvelopeError, Item, ItemType, SourceQuantities, + self, ContentType, Envelope, EnvelopeError, Item, ItemHeaders, ItemType, SourceQuantities, }; use crate::extractors::{PartialDsn, RequestMeta}; use crate::http; @@ -571,13 +571,10 @@ struct ProcessEnvelopeState<'a, Group> { /// extracted. event: Annotated, - /// Track whether transaction metrics were already extracted. - event_metrics_extracted: bool, - - /// Track whether spans and span metrics were already extracted. + /// Item headers of the extracted event. /// - /// Only applies to envelopes with a transaction item. - spans_extracted: bool, + /// `None` if the envelope does not contain an event item. + event_headers: Option, /// Partial metrics of the Event during construction. /// @@ -585,12 +582,6 @@ struct ProcessEnvelopeState<'a, Group> { /// persisted into the Event. All modifications afterwards will have no effect. metrics: Metrics, - /// A list of cumulative sample rates applied to this event. - /// - /// This element is obtained from the event or transaction item and re-serialized into the - /// resulting item. - sample_rates: Option, - /// Metrics extracted from items in the envelope. /// /// Relay can extract metrics for sessions and transactions, which is controlled by @@ -1071,10 +1062,8 @@ impl EnvelopeProcessorService { ProcessEnvelopeState { event: Annotated::empty(), - event_metrics_extracted: false, - spans_extracted: false, + event_headers: None, metrics: Metrics::default(), - sample_rates: None, extracted_metrics: Default::default(), project_state, sampling_project_state, @@ -1115,7 +1104,13 @@ impl EnvelopeProcessorService { // Tell the envelope limiter about the event, since it has been removed from the Envelope at // this stage in processing. if let Some(category) = event_category { - envelope_limiter.assume_event(category, state.event_metrics_extracted); + envelope_limiter.assume_event( + category, + state + .event_headers + .as_ref() + .map_or(false, |h| h.metrics_extracted), + ); } let scoping = state.managed_envelope.scoping(); @@ -1163,15 +1158,15 @@ impl EnvelopeProcessorService { _ => None, }; - if let Some(event) = state.event.value() { - if state.event_metrics_extracted { + if let (Some(event), Some(headers)) = (state.event.value(), &mut state.event_headers) { + if headers.metrics_extracted { return Ok(()); } if let Some(config) = config { let metrics = crate::metrics_extraction::event::extract_metrics( event, - state.spans_extracted, + headers.spans_extracted, config, self.inner .config @@ -1183,7 +1178,7 @@ impl EnvelopeProcessorService { .options .span_extraction_sample_rate, ); - state.event_metrics_extracted |= !metrics.is_empty(); + headers.metrics_extracted |= !metrics.is_empty(); state.extracted_metrics.project_metrics.extend(metrics); } @@ -1208,11 +1203,11 @@ impl EnvelopeProcessorService { }; state.extracted_metrics.extend(extractor.extract(event)?); - state.event_metrics_extracted |= true; + headers.metrics_extracted |= true; } } - if state.event_metrics_extracted { + if headers.metrics_extracted { state.managed_envelope.set_event_metrics_extracted(); } } diff --git a/relay-server/src/services/processor/dynamic_sampling.rs b/relay-server/src/services/processor/dynamic_sampling.rs index a59d0d0550..652aebf562 100644 --- a/relay-server/src/services/processor/dynamic_sampling.rs +++ b/relay-server/src/services/processor/dynamic_sampling.rs @@ -479,8 +479,8 @@ mod tests { ProcessEnvelopeState { event: Annotated::from(event), + event_headers: None, metrics: Default::default(), - sample_rates: None, extracted_metrics: Default::default(), project_state: Arc::new(project_state), sampling_project_state: None, @@ -495,9 +495,7 @@ mod tests { .try_into() .unwrap(), profile_id: None, - event_metrics_extracted: false, reservoir: dummy_reservoir(), - spans_extracted: false, } }; diff --git a/relay-server/src/services/processor/event.rs b/relay-server/src/services/processor/event.rs index 0d5fa060f4..2038af5c55 100644 --- a/relay-server/src/services/processor/event.rs +++ b/relay-server/src/services/processor/event.rs @@ -3,6 +3,7 @@ use std::error::Error; use std::sync::OnceLock; +use bytes::Bytes; use chrono::Duration as SignedDuration; use relay_auth::RelayVersion; use relay_base_schema::events::EventType; @@ -15,12 +16,12 @@ use relay_event_schema::protocol::{ OtelContext, RelayInfo, SecurityReportType, Values, }; use relay_pii::PiiProcessor; -use relay_protocol::{Annotated, Array, Empty, FromValue, Object, Value}; +use relay_protocol::{Annotated, Array, Empty, FromValue, Object}; use relay_quotas::DataCategory; use relay_statsd::metric; use serde_json::Value as SerdeValue; -use crate::envelope::{AttachmentType, ContentType, Item, ItemType}; +use crate::envelope::{AttachmentType, ContentType, Item, ItemHeaders, ItemType}; use crate::extractors::RequestMeta; use crate::services::outcome::Outcome; use crate::services::processor::{ @@ -67,24 +68,23 @@ pub fn extract( return Err(ProcessingError::DuplicateItem(duplicate.ty().clone())); } - let mut sample_rates = None; - let (event, event_len) = if let Some(mut item) = event_item.or(security_item) { + let (event, event_len) = if let Some(item) = event_item.or(security_item) { relay_log::trace!("processing json event"); - sample_rates = item.take_sample_rates(); + let Item { headers, payload } = item; + state.event_headers = Some(headers); metric!(timer(RelayTimers::EventProcessingDeserialize), { // Event items can never include transactions, so retain the event type and let // inference deal with this during store normalization. - event_from_json_payload(item, None)? + event_from_json_payload(&payload, None)? }) - } else if let Some(mut item) = transaction_item { + } else if let Some(item) = transaction_item { relay_log::trace!("processing json transaction"); - sample_rates = item.take_sample_rates(); - state.event_metrics_extracted = item.metrics_extracted(); - state.spans_extracted = item.spans_extracted(); + let Item { headers, payload } = item; + state.event_headers = Some(headers); metric!(timer(RelayTimers::EventProcessingDeserialize), { // Transaction items can only contain transaction events. Force the event type to // hint to normalization that we're dealing with a transaction now. - event_from_json_payload(item, Some(EventType::Transaction))? + event_from_json_payload(&payload, Some(EventType::Transaction))? }) } else if let Some(item) = user_report_v2_item { relay_log::trace!("processing user_report_v2"); @@ -93,22 +93,26 @@ pub fn extract( if !user_report_v2_ingest { return Err(ProcessingError::NoEventPayload); } - event_from_json_payload(item, Some(EventType::UserReportV2))? - } else if let Some(mut item) = raw_security_item { + event_from_json_payload(&item.payload, Some(EventType::UserReportV2))? + } else if let Some(item) = raw_security_item { relay_log::trace!("processing security report"); - sample_rates = item.take_sample_rates(); - event_from_security_report(item, envelope.meta()).map_err(|error| { - if !matches!(error, ProcessingError::UnsupportedSecurityType) { - relay_log::error!( - error = &error as &dyn Error, - "failed to extract security report" - ); - } - error - })? + let Item { headers, payload } = item; + let event = + event_from_security_report(payload, &headers, envelope.meta()).map_err(|error| { + if !matches!(error, ProcessingError::UnsupportedSecurityType) { + relay_log::error!( + error = &error as &dyn Error, + "failed to extract security report" + ); + } + error + })?; + state.event_headers = Some(headers); + + event } else if let Some(item) = nel_item { relay_log::trace!("processing nel report"); - event_from_nel_item(item, envelope.meta()).map_err(|error| { + event_from_nel_item(&item.payload, envelope.meta()).map_err(|error| { relay_log::error!(error = &error as &dyn Error, "failed to extract NEL report"); error })? @@ -130,7 +134,6 @@ pub fn extract( }; state.event = event; - state.sample_rates = sample_rates; state.metrics.bytes_ingested_event = Annotated::new(event_len as u64); Ok(()) @@ -189,8 +192,9 @@ pub fn finalize( } let sample_rates = state - .sample_rates - .take() + .event_headers + .as_mut() + .and_then(|h| h.sample_rates.take()) .and_then(|value| Array::from_value(Annotated::new(value)).into_value()); if let Some(rates) = sample_rates { @@ -372,16 +376,8 @@ pub fn serialize( let mut event_item = Item::new(ItemType::from_event_type(event_type)); event_item.set_payload(ContentType::Json, data); - // If transaction metrics were extracted, set the corresponding item header - event_item.set_metrics_extracted(state.event_metrics_extracted); - - // TODO: The state should simply maintain & update an `ItemHeaders` object. - event_item.set_spans_extracted(state.spans_extracted); - - // If there are sample rates, write them back to the envelope. In processing mode, sample - // rates have been removed from the state and burnt into the event via `finalize_event`. - if let Some(sample_rates) = state.sample_rates.take() { - event_item.set_sample_rates(sample_rates); + if let Some(headers) = state.event_headers.take() { + event_item.headers = headers; } state.envelope_mut().add_item(event_item); @@ -450,65 +446,61 @@ fn is_duplicate(item: &Item, processing_enabled: bool) -> bool { } fn event_from_json_payload( - item: Item, + payload: &[u8], event_type: Option, ) -> Result { - let mut event = Annotated::::from_json_bytes(&item.payload()) - .map_err(ProcessingError::InvalidJson)?; + let mut event = + Annotated::::from_json_bytes(payload).map_err(ProcessingError::InvalidJson)?; if let Some(event_value) = event.value_mut() { event_value.ty.set_value(event_type); } - Ok((event, item.len())) + Ok((event, payload.len())) } fn event_from_security_report( - item: Item, + payload: Bytes, + headers: &ItemHeaders, meta: &RequestMeta, ) -> Result { - let len = item.len(); + let len = payload.len(); let mut event = Event::default(); - let bytes = item.payload(); - let data = &bytes; let Some(report_type) = - SecurityReportType::from_json(data).map_err(ProcessingError::InvalidJson)? + SecurityReportType::from_json(&payload).map_err(ProcessingError::InvalidJson)? else { - return Err(ProcessingError::InvalidSecurityType(bytes)); + return Err(ProcessingError::InvalidSecurityType(payload)); }; let (apply_result, event_type) = match report_type { - SecurityReportType::Csp => (Csp::apply_to_event(data, &mut event), EventType::Csp), + SecurityReportType::Csp => (Csp::apply_to_event(&payload, &mut event), EventType::Csp), SecurityReportType::ExpectCt => ( - ExpectCt::apply_to_event(data, &mut event), + ExpectCt::apply_to_event(&payload, &mut event), EventType::ExpectCt, ), SecurityReportType::ExpectStaple => ( - ExpectStaple::apply_to_event(data, &mut event), + ExpectStaple::apply_to_event(&payload, &mut event), EventType::ExpectStaple, ), - SecurityReportType::Hpkp => (Hpkp::apply_to_event(data, &mut event), EventType::Hpkp), + SecurityReportType::Hpkp => (Hpkp::apply_to_event(&payload, &mut event), EventType::Hpkp), SecurityReportType::Unsupported => return Err(ProcessingError::UnsupportedSecurityType), }; if let Err(json_error) = apply_result { // logged in extract_event relay_log::configure_scope(|scope| { - scope.set_extra("payload", String::from_utf8_lossy(data).into()); + scope.set_extra("payload", String::from_utf8_lossy(&payload).into()); }); return Err(ProcessingError::InvalidSecurityReport(json_error)); } - if let Some(release) = item.get_header("sentry_release").and_then(Value::as_str) { + if let Some(release) = headers.sentry_release.as_ref() { event.release = Annotated::from(LenientString(release.to_owned())); } - if let Some(env) = item - .get_header("sentry_environment") - .and_then(Value::as_str) - { + if let Some(env) = headers.sentry_environment.as_ref() { event.environment = Annotated::from(env.to_owned()); } @@ -528,16 +520,17 @@ fn event_from_security_report( Ok((Annotated::new(event), len)) } -fn event_from_nel_item(item: Item, _meta: &RequestMeta) -> Result { - let len = item.len(); +fn event_from_nel_item( + payload: &[u8], + _meta: &RequestMeta, +) -> Result { + let len = payload.len(); let mut event = Event { ty: Annotated::new(EventType::Nel), ..Default::default() }; - let data: &[u8] = &item.payload(); - // Try to get the raw network report. - let report = Annotated::from_json_bytes(data).map_err(NetworkReportError::InvalidJson); + let report = Annotated::from_json_bytes(payload).map_err(NetworkReportError::InvalidJson); match report { // If the incoming payload could be converted into the raw network error, try @@ -548,7 +541,7 @@ fn event_from_nel_item(item: Item, _meta: &RequestMeta) -> Result { // logged in extract_event relay_log::configure_scope(|scope| { - scope.set_extra("payload", String::from_utf8_lossy(data).into()); + scope.set_extra("payload", String::from_utf8_lossy(payload).into()); }); return Err(ProcessingError::InvalidNelReport(err)); } diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index 72b8e64828..2ba545187e 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -253,7 +253,11 @@ pub fn extract_from_event( return; }; - if state.spans_extracted { + let Some(event_headers) = state.event_headers.as_mut() else { + return; + }; + + if event_headers.spans_extracted { return; } @@ -304,7 +308,7 @@ pub fn extract_from_event( let mut item = Item::new(ItemType::Span); item.set_payload(ContentType::Json, span); // If metrics extraction happened for the event, it also happened for its spans: - item.set_metrics_extracted(state.event_metrics_extracted); + item.set_metrics_extracted(event_headers.metrics_extracted); relay_log::trace!("Adding span to envelope"); state.managed_envelope.envelope_mut().add_item(item); @@ -346,7 +350,7 @@ pub fn extract_from_event( add_span(transaction_span.into()); - state.spans_extracted = true; + event_headers.spans_extracted = true; } /// Removes the transaction in case the project has made the transition to spans-only. @@ -691,16 +695,14 @@ mod tests { ProcessEnvelopeState { event: Annotated::from(event), + event_headers: None, metrics: Default::default(), - sample_rates: None, extracted_metrics: Default::default(), project_state: Arc::new(project_state), sampling_project_state: None, project_id: ProjectId::new(42), managed_envelope: managed_envelope.try_into().unwrap(), profile_id: None, - event_metrics_extracted: false, - spans_extracted: false, reservoir: ReservoirEvaluator::new(ReservoirCounters::default()), } }