Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(server): Maintain copy of event item headers #3414

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 26 additions & 16 deletions relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,34 +481,34 @@ fn is_false(val: &bool) -> bool {
pub struct ItemHeaders {
/// The type of the item.
#[serde(rename = "type")]
ty: ItemType,
pub ty: ItemType,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Item and ItemHeaders were some of the few serializable data types on which we restricted field access. Making them public seems more aligned with other data types, and is necessary to access their fields during processing.


/// Content length of the item.
///
/// Can be omitted if the item does not contain new lines. In this case, the item payload is
/// parsed until the first newline is encountered.
#[serde(default, skip_serializing_if = "Option::is_none")]
length: Option<u32>,
pub length: Option<u32>,

/// If this is an attachment item, this may contain the attachment type.
#[serde(default, skip_serializing_if = "Option::is_none")]
attachment_type: Option<AttachmentType>,
pub attachment_type: Option<AttachmentType>,

/// Content type of the payload.
#[serde(default, skip_serializing_if = "Option::is_none")]
content_type: Option<ContentType>,
pub content_type: Option<ContentType>,

/// If this is an attachment item, this may contain the original file name.
#[serde(default, skip_serializing_if = "Option::is_none")]
filename: Option<String>,
pub filename: Option<String>,

/// The routing_hint may be used to specify how the envelope should be routed when
/// published to kafka.
///
/// XXX(epurkhiser): This is currently ONLY used for [`ItemType::CheckIn`]'s when publishing
/// the envelope into kafka.
#[serde(default, skip_serializing_if = "Option::is_none")]
routing_hint: Option<Uuid>,
pub routing_hint: Option<Uuid>,

/// Indicates that this item is being rate limited.
///
Expand All @@ -519,12 +519,12 @@ pub struct ItemHeaders {
///
/// NOTE: This is internal-only and not exposed into the Envelope.
#[serde(default, skip)]
rate_limited: bool,
pub rate_limited: bool,

/// Indicates that this item should be combined into one payload with other replay items.
/// NOTE: This is internal-only and not exposed into the Envelope.
#[serde(default, skip)]
replay_combined_payload: bool,
pub replay_combined_payload: bool,

/// Contains the amount of events this item was generated and aggregated from.
///
Expand All @@ -536,14 +536,14 @@ pub struct ItemHeaders {
///
/// NOTE: This is internal-only and not exposed into the Envelope.
#[serde(default, skip)]
source_quantities: Option<SourceQuantities>,
pub source_quantities: Option<SourceQuantities>,

/// A list of cumulative sample rates applied to this event.
///
/// Multiple entries in `sample_rates` mean that the event was sampled multiple times. The
/// effective sample rate is multiplied.
#[serde(default, skip_serializing_if = "Option::is_none")]
sample_rates: Option<Value>,
pub sample_rates: Option<Value>,

/// Flag indicating if metrics have already been extracted from the item.
///
Expand All @@ -552,11 +552,11 @@ pub struct ItemHeaders {
/// the first Relay) MUST set this flag to true so that upstream Relays do
/// not extract the metric again causing double counting of the metric.
#[serde(default, skip_serializing_if = "is_false")]
metrics_extracted: bool,
pub metrics_extracted: bool,

/// Whether or not a transaction has been extracted from a segment span.
#[serde(default, skip_serializing_if = "is_false")]
transaction_extracted: bool,
pub transaction_extracted: bool,

/// Whether or not spans and span metrics have been extracted from a transaction.
///
Expand All @@ -566,15 +566,23 @@ pub struct ItemHeaders {
/// NOTE: This header is also set to `true` for transactions that are themselves extracted
/// from spans (the opposite direction), to prevent going in circles.
#[serde(default, skip_serializing_if = "is_false")]
spans_extracted: bool,
pub spans_extracted: bool,

/// `false` if the sampling decision is "drop".
///
/// In the most common use case, the item is dropped when the sampling decision is "drop".
/// For profiles with the feature enabled, however, we keep all profile items and mark the ones
/// for which the transaction was dropped as `sampled: false`.
#[serde(default = "default_true", skip_serializing_if = "is_true")]
sampled: bool,
pub sampled: bool,

/// The release, used by security reports.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub sentry_release: Option<String>,

/// The environment, used by security reports.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub sentry_environment: Option<String>,
Comment on lines +579 to +585
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These were previously set and read using string access.


/// Other attributes for forward compatibility.
#[serde(flatten)]
Expand Down Expand Up @@ -621,8 +629,8 @@ impl AddAssign for SourceQuantities {

#[derive(Clone, Debug)]
pub struct Item {
headers: ItemHeaders,
payload: Bytes,
pub headers: ItemHeaders,
pub payload: Bytes,
}

impl Item {
Expand All @@ -645,6 +653,8 @@ impl Item {
transaction_extracted: false,
spans_extracted: false,
sampled: true,
sentry_release: None,
sentry_environment: None,
},
payload: Bytes::new(),
}
Expand Down
43 changes: 19 additions & 24 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use relay_metrics::{
};
use relay_pii::PiiConfigError;
use relay_profiling::ProfileId;
use relay_protocol::{Annotated, Value};
use relay_protocol::Annotated;
use relay_quotas::{DataCategory, Scoping};
use relay_sampling::config::RuleId;
use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator};
Expand Down Expand Up @@ -65,7 +65,7 @@ use {
};

use crate::envelope::{
self, ContentType, Envelope, EnvelopeError, Item, ItemType, SourceQuantities,
self, ContentType, Envelope, EnvelopeError, Item, ItemHeaders, ItemType, SourceQuantities,
};
use crate::extractors::{PartialDsn, RequestMeta};
use crate::http;
Expand Down Expand Up @@ -571,26 +571,17 @@ struct ProcessEnvelopeState<'a, Group> {
/// extracted.
event: Annotated<Event>,

/// Track whether transaction metrics were already extracted.
event_metrics_extracted: bool,

/// Track whether spans and span metrics were already extracted.
/// Item headers of the extracted event.
///
/// Only applies to envelopes with a transaction item.
spans_extracted: bool,
/// `None` if the envelope does not contain an event item.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just use a default here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ItemHeaders does not implement Default atm, it would require a default ItemType which sounds dangerous. In the end I think we should remove this from the state (see other comment).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use something like item headers instead which can represent the "default" case and also can be constructed from ItemHeaders as well as be turned into Option<ItemHeaders> again?

event_headers: Option<ItemHeaders>,

/// Partial metrics of the Event during construction.
///
/// The pipeline stages can add to this metrics objects. In `finalize_event`, the metrics are
/// persisted into the Event. All modifications afterwards will have no effect.
metrics: Metrics,

/// A list of cumulative sample rates applied to this event.
///
/// This element is obtained from the event or transaction item and re-serialized into the
/// resulting item.
sample_rates: Option<Value>,

/// Metrics extracted from items in the envelope.
///
/// Relay can extract metrics for sessions and transactions, which is controlled by
Expand Down Expand Up @@ -1071,10 +1062,8 @@ impl EnvelopeProcessorService {

ProcessEnvelopeState {
event: Annotated::empty(),
event_metrics_extracted: false,
spans_extracted: false,
event_headers: None,
metrics: Metrics::default(),
sample_rates: None,
extracted_metrics: Default::default(),
project_state,
sampling_project_state,
Expand Down Expand Up @@ -1115,7 +1104,13 @@ impl EnvelopeProcessorService {
// Tell the envelope limiter about the event, since it has been removed from the Envelope at
// this stage in processing.
if let Some(category) = event_category {
envelope_limiter.assume_event(category, state.event_metrics_extracted);
envelope_limiter.assume_event(
category,
state
.event_headers
.as_ref()
.map_or(false, |h| h.metrics_extracted),
);
}

let scoping = state.managed_envelope.scoping();
Expand Down Expand Up @@ -1163,15 +1158,15 @@ impl EnvelopeProcessorService {
_ => None,
};

if let Some(event) = state.event.value() {
if state.event_metrics_extracted {
if let (Some(event), Some(headers)) = (state.event.value(), &mut state.event_headers) {
if headers.metrics_extracted {
return Ok(());
}

if let Some(config) = config {
let metrics = crate::metrics_extraction::event::extract_metrics(
event,
state.spans_extracted,
headers.spans_extracted,
config,
self.inner
.config
Expand All @@ -1183,7 +1178,7 @@ impl EnvelopeProcessorService {
.options
.span_extraction_sample_rate,
);
state.event_metrics_extracted |= !metrics.is_empty();
headers.metrics_extracted |= !metrics.is_empty();
state.extracted_metrics.project_metrics.extend(metrics);
}

Expand All @@ -1208,11 +1203,11 @@ impl EnvelopeProcessorService {
};

state.extracted_metrics.extend(extractor.extract(event)?);
state.event_metrics_extracted |= true;
headers.metrics_extracted |= true;
}
}

if state.event_metrics_extracted {
if headers.metrics_extracted {
state.managed_envelope.set_event_metrics_extracted();
}
}
Expand Down
4 changes: 1 addition & 3 deletions relay-server/src/services/processor/dynamic_sampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,8 @@ mod tests {

ProcessEnvelopeState {
event: Annotated::from(event),
event_headers: None,
metrics: Default::default(),
sample_rates: None,
extracted_metrics: Default::default(),
project_state: Arc::new(project_state),
sampling_project_state: None,
Expand All @@ -495,9 +495,7 @@ mod tests {
.try_into()
.unwrap(),
profile_id: None,
event_metrics_extracted: false,
reservoir: dummy_reservoir(),
spans_extracted: false,
}
};

Expand Down
Loading
Loading