Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spans): Extract transaction from segment span #3375

Merged
merged 29 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
d907bfd
ref: Drive-by refactor
jjbayer Apr 4, 2024
aa8df48
wip
jjbayer Apr 4, 2024
e16ad3a
Compiling envelope copy
jjbayer Apr 4, 2024
0f1526d
test: See test fail because of duplicate spans
jjbayer Apr 4, 2024
b8177bc
fix: Prevent duplicate span ingestion
jjbayer Apr 5, 2024
3896772
fix: extract after normalize
jjbayer Apr 8, 2024
39f7005
fix: No empty profile context
jjbayer Apr 8, 2024
30d1bc1
Merge remote-tracking branch 'origin/master' into feat/spans-to-trans…
jjbayer Apr 8, 2024
dd2af1e
fix: span metrics
jjbayer Apr 9, 2024
bb6467b
test
jjbayer Apr 9, 2024
5220d1d
fix: test & lint
jjbayer Apr 9, 2024
9461d2a
Merge branch 'master' into feat/spans-to-transaction
jjbayer Apr 9, 2024
800ff44
cleanup
jjbayer Apr 9, 2024
3b5189f
Merge branch 'master' into feat/spans-to-transaction
jjbayer Apr 9, 2024
d13abf8
Merge branch 'master' into feat/spans-to-transaction
jjbayer Apr 9, 2024
3247cd6
test
jjbayer Apr 10, 2024
ac29ffe
Adapt test
jjbayer Apr 10, 2024
a6b15d3
Merge branch 'master' into feat/spans-to-transaction
jjbayer Apr 10, 2024
d31d662
fix: no duplicate transaction metric
jjbayer Apr 10, 2024
fbdffe7
Merge remote-tracking branch 'origin/master' into feat/spans-to-trans…
jjbayer Apr 11, 2024
bac3566
ref: review comments
jjbayer Apr 11, 2024
7a9fcef
more instr
jjbayer Apr 11, 2024
8d55e8d
Merge remote-tracking branch 'origin/master' into feat/spans-to-trans…
jjbayer Apr 11, 2024
ef0c15c
Update relay-event-normalization/src/normalize/span/exclusive_time.rs
jjbayer Apr 11, 2024
34c8012
fix: metric in wrong place
jjbayer Apr 11, 2024
469e96c
fix: always normalize
jjbayer Apr 11, 2024
0cf16a2
clean
jjbayer Apr 11, 2024
e0fae23
fix: Get permit for spin-off envelope
jjbayer Apr 11, 2024
6580ad8
more fixes
jjbayer Apr 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions relay-sampling/src/evaluation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ impl<'a> ReservoirEvaluator<'a> {
}
}

/// Returns a shared reference to the reservoir counters.
/// Gets shared ownership of the reference counters.
Dav1dde marked this conversation as resolved.
Show resolved Hide resolved
pub fn counters(&self) -> ReservoirCounters {
self.counters.clone()
Arc::clone(&self.counters)
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed to split off a ProcessEnvelope.


/// Sets the Redis pool and organiation ID for the [`ReservoirEvaluator`].
Expand Down
54 changes: 40 additions & 14 deletions relay-server/src/services/processor/span/processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use relay_event_normalization::{
};
use relay_event_schema::processor::{process_value, ProcessingState};
use relay_event_schema::protocol::{BrowserContext, Contexts, Event, EventId, Span, SpanData};
use relay_log::protocol::{Attachment, AttachmentType};
use relay_metrics::{aggregator::AggregatorConfig, MetricNamespace, UnixTimestamp};
use relay_pii::PiiProcessor;
use relay_protocol::{Annotated, Empty};
Expand All @@ -28,7 +29,13 @@ use crate::services::processor::{
Addrs, ProcessEnvelope, ProcessEnvelopeState, ProcessingError, ProcessingGroup, SpanGroup,
TransactionGroup,
};
use crate::statsd::{RelayCounters, RelayHistograms};
use crate::utils::{sample, ItemAction, ManagedEnvelope};
use thiserror::Error;

#[derive(Error, Debug)]
#[error(transparent)]
struct ValidationError(#[from] anyhow::Error);

pub fn process(
state: &mut ProcessEnvelopeState<SpanGroup>,
Expand Down Expand Up @@ -88,6 +95,15 @@ pub fn process(
_ => return ItemAction::Keep,
};

set_segment_attributes(&mut annotated_span);

if should_extract_transactions && !item.transaction_extracted() {
if let Some(transaction) = convert_to_transaction(&annotated_span) {
extracted_transactions.push(transaction);
item.set_transaction_extracted(true);
Dav1dde marked this conversation as resolved.
Show resolved Hide resolved
}
}

if let Err(e) = normalize(
&mut annotated_span,
normalize_span_config.clone(),
Expand All @@ -98,13 +114,6 @@ pub fn process(
return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal));
};

if should_extract_transactions && !item.transaction_extracted() {
if let Some(transaction) = convert_to_transaction(&annotated_span) {
extracted_transactions.push(transaction);
item.set_transaction_extracted(true);
}
}

if let Some(config) = span_metrics_extraction_config {
let Some(span) = annotated_span.value_mut() else {
return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal));
Expand Down Expand Up @@ -173,6 +182,7 @@ pub fn process(
ItemAction::Keep
});

let mut transaction_count = 0;
for mut transaction in extracted_transactions {
// Give each transaction event a new random ID:
transaction.id = EventId::new().into();
Expand All @@ -186,6 +196,8 @@ pub fn process(
item.set_spans_extracted(true);
}

transaction_count += 1;

addrs.envelope_processor.send(ProcessEnvelope {
envelope: ManagedEnvelope::standalone(
envelope,
Expand All @@ -195,14 +207,21 @@ pub fn process(
),
project_state: state.project_state.clone(),
sampling_project_state: state.sampling_project_state.clone(),
reservoir_counters: state.reservoir.counters().clone(),
reservoir_counters: state.reservoir.counters(),
});
}
Err(e) => {
relay_log::error!("Failed to create event envelope: {e}");
iker-barriocanal marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

if transaction_count > 0 {
relay_statsd::metric!(counter(RelayCounters::TransactionsFromSpans) += transaction_count);
relay_statsd::metric!(
histogram(RelayHistograms::TransactionsFromSpansPerEnvelope) = transaction_count as u64
);
}
}

pub fn extract_from_event(
Expand Down Expand Up @@ -371,6 +390,19 @@ fn get_normalize_span_config<'a>(
}
}

fn set_segment_attributes(span: &mut Annotated<Span>) {
let Some(span) = span.value_mut() else { return };

// TODO: A span might be a segment span even if the parent_id is not empty
// (parent within a trace). I.e. do not overwrite here.
let is_segment = span.parent_span_id.is_empty();
Dav1dde marked this conversation as resolved.
Show resolved Hide resolved

span.is_segment = Annotated::new(is_segment);
if is_segment {
span.segment_id = span.span_id.clone();
}
}

/// Normalizes a standalone span.
fn normalize(
annotated_span: &mut Annotated<Span>,
Expand Down Expand Up @@ -438,14 +470,8 @@ fn normalize(
);
}

let is_segment = span.parent_span_id.is_empty();
span.is_segment = Annotated::new(is_segment);
span.received = Annotated::new(received_at.into());

if is_segment {
span.segment_id = span.span_id.clone();
}

if let Some(transaction) = span
.data
.value_mut()
Expand Down
7 changes: 7 additions & 0 deletions relay-server/src/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,9 @@ pub enum RelayTimers {
/// This metric is tagged with:
/// - `type`: The type of the health check, `liveness` or `readiness`.
HealthCheckDuration,

/// Measurees how many transactions were created from segment spans in a single envelope.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Measurees how many transactions were created from segment spans in a single envelope.
/// Measures how many transactions were created from segment spans in a single envelope.

TransactionsFromSpansPerEnvelope,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we move the variant to RelayHistograms?

}

impl TimerMetric for RelayTimers {
Expand Down Expand Up @@ -420,6 +423,7 @@ impl TimerMetric for RelayTimers {
RelayTimers::BufferMessageProcessDuration => "buffer.message.duration",
RelayTimers::ProjectCacheTaskDuration => "project_cache.task.duration",
RelayTimers::HealthCheckDuration => "health.message.duration",
RelayTimers::TransactionsFromSpansPerEnvelope => "transactions_from_spans_per_envelope",
}
}
}
Expand Down Expand Up @@ -642,6 +646,8 @@ pub enum RelayCounters {
/// This metric is tagged with:
/// - `decision`: "drop" if dynamic sampling drops the envelope, else "keep".
DynamicSamplingDecision,
/// Counts how many transactions were created from segment spans.
TransactionsFromSpans,
}

impl CounterMetric for RelayCounters {
Expand Down Expand Up @@ -680,6 +686,7 @@ impl CounterMetric for RelayCounters {
RelayCounters::OpenTelemetryEvent => "event.opentelemetry",
RelayCounters::GlobalConfigFetched => "global_config.fetch",
RelayCounters::DynamicSamplingDecision => "dynamic_sampling_decision",
RelayCounters::TransactionsFromSpans => "transactions_from_spans",
}
}
}
Expand Down
8 changes: 1 addition & 7 deletions tests/integration/test_spans.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ def test_span_ingestion(
relay.send_envelope(
project_id,
envelope,
headers={
headers={ # Set browser header to verify that `d:transactions/measurements.score.total@ratio` is extracted only once.
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36"
},
)
Expand Down Expand Up @@ -475,9 +475,7 @@ def test_span_ingestion(
headers={"Content-Type": "application/x-protobuf"},
)

print("Waiting for spans..")
spans = list(spans_consumer.get_spans(timeout=10.0, max_attempts=6))
print("Done waiting for spans.")

for span in spans:
span.pop("received", None)
Expand Down Expand Up @@ -592,7 +590,6 @@ def test_span_ingestion(
},
]

print("Asserting emptiness")
spans_consumer.assert_empty()

# If transaction extraction is enabled, expect transactions:
Expand All @@ -614,12 +611,9 @@ def test_span_ingestion(
# No errors during normalization:
assert not transaction.get("errors")

print("Asserting emptiness")
transactions_consumer.assert_empty()

print("Waiting for metrics")
metrics = [metric for (metric, _headers) in metrics_consumer.get_metrics()]
print("/Waiting for metrics")
metrics.sort(key=lambda m: (m["name"], sorted(m["tags"].items()), m["timestamp"]))
for metric in metrics:
try:
Expand Down
Loading