feat(spans): Convert a Sentry span to the Snuba span schema #2917

Merged
Changes from 10 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@
- Add `duration` metric for mobile app start spans. ([#2906](https://github.com/getsentry/relay/pull/2906))
- Introduce the configuration option `http.global_metrics`. When enabled, Relay submits metric buckets not through regular project-scoped Envelopes, but instead through the global endpoint. When this Relay serves a high number of projects, this can reduce the overall request volume. ([#2902](https://github.com/getsentry/relay/pull/2902))
- Record the size of global metrics requests in statsd as `upstream.metrics.body_size`. ([#2908](https://github.com/getsentry/relay/pull/2908))
- Make Kafka spans compatible with the Snuba span schema. ([#2917](https://github.com/getsentry/relay/pull/2917))
- Only extract span metrics / tags when they are needed. ([#2907](https://github.com/getsentry/relay/pull/2907))

## 23.12.1
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions relay-server/Cargo.toml
@@ -35,6 +35,7 @@ processing = [

[dependencies]
anyhow = { workspace = true }
serde_path_to_error = "0.1.14"
axum = { version = "0.6.20", features = [
"headers",
"macros",
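The new `serde_path_to_error` dependency is what lets the span consumer report *where* in a span payload deserialization failed, not just that it failed. A minimal sketch of how the crate is used, with a made-up `Example` struct and payload rather than the real span schema:

```rust
use serde::Deserialize;

// Illustrative struct only; the real code deserializes `SpanKafkaMessage`.
#[derive(Debug, Deserialize)]
struct Example {
    exclusive_time: f64,
}

fn main() {
    let payload = br#"{"exclusive_time": "not a number"}"#;

    // Wrap the serde_json deserializer so errors carry the failing JSON path.
    let d = &mut serde_json::Deserializer::from_slice(payload);
    match serde_path_to_error::deserialize::<_, Example>(d) {
        Ok(value) => println!("{value:?}"),
        // `error.path()` points at `exclusive_time`, which plain serde_json would not report.
        Err(error) => println!("deserialization failed at `{}`: {}", error.path(), error),
    }
}
```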
8 changes: 7 additions & 1 deletion relay-server/src/actors/processor/span/processing.rs
@@ -254,9 +254,15 @@ fn normalize(
let Some(span) = annotated_span.value_mut() else {
return Err(ProcessingError::NoEventPayload);
};
span.is_segment = Annotated::new(span.parent_span_id.is_empty());

let is_segment = span.parent_span_id.is_empty();
span.is_segment = Annotated::new(is_segment);
span.received = Annotated::new(received_at.into());

if is_segment {
span.segment_id = span.span_id.clone();
}

// Tag extraction:
let config = tag_extraction::Config { max_tag_value_size };
let is_mobile = false; // TODO: find a way to determine is_mobile from a standalone span.
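The change above makes the segment relationship explicit before tag extraction runs: a span with no parent is treated as a segment, and in that case its `segment_id` is its own `span_id`. A simplified sketch of that rule, using plain strings instead of Relay's `Annotated` protocol types:

```rust
// Simplified stand-in for the protocol `Span`; field names match the real type,
// but the `Annotated` wrappers are omitted.
#[derive(Debug, Default)]
struct SimpleSpan {
    span_id: String,
    parent_span_id: String,
    segment_id: String,
    is_segment: bool,
}

fn normalize_segment(span: &mut SimpleSpan) {
    // A span without a parent is the root of its segment...
    span.is_segment = span.parent_span_id.is_empty();
    // ...and a segment root identifies the segment by its own span ID.
    if span.is_segment {
        span.segment_id = span.span_id.clone();
    }
}

fn main() {
    let mut root = SimpleSpan {
        span_id: "ab1ab1ab1ab1ab1a".to_owned(),
        ..Default::default()
    };
    normalize_segment(&mut root);
    assert!(root.is_segment);
    assert_eq!(root.segment_id, root.span_id);
}
```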
85 changes: 54 additions & 31 deletions relay-server/src/actors/store.rs
@@ -21,8 +21,9 @@ use relay_quotas::Scoping;
use relay_statsd::metric;
use relay_system::{Addr, AsyncResponse, FromMessage, Interface, NoResponse, Sender, Service};
use serde::ser::Error;
use serde::Serialize;
use serde_json::value::RawValue;
use serde::{Deserialize, Serialize};
use serde_json::value::Value;
use serde_json::Deserializer;
use uuid::Uuid;

use crate::actors::outcome::{DiscardReason, Outcome, TrackOutcome};
@@ -824,12 +825,13 @@ impl StoreService {
organization_id: u64,
project_id: ProjectId,
event_id: Option<EventId>,
start_time: Instant,
_: Instant,
retention_days: u16,
item: &Item,
) -> Result<(), StoreError> {
let payload = item.payload();
let span = match serde_json::from_slice(&payload) {
let d = &mut Deserializer::from_slice(&payload);
let mut span: SpanKafkaMessage = match serde_path_to_error::deserialize(d) {
Ok(span) => span,
Err(error) => {
relay_log::error!(
@@ -839,19 +841,14 @@
return Ok(());
}
};
let message = SpanKafkaMessage {
start_time: UnixTimestamp::from_instant(start_time).as_secs(),
event_id,
organization_id,
project_id,
retention_days,
span,
};
self.produce(
KafkaTopic::Spans,
organization_id,
KafkaMessage::Span(message),
)?;

span.duration_ms = ((span.end_timestamp - span.start_timestamp) * 1e3) as u32;
span.event_id = event_id;
span.project_id = project_id.value();
span.retention_days = retention_days;
span.start_timestamp_ms = (span.start_timestamp * 1e3) as u64;

self.produce(KafkaTopic::Spans, organization_id, KafkaMessage::Span(span))?;

metric!(
counter(RelayCounters::ProcessingMessageProduced) += 1,
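Instead of wrapping the raw payload, `produce_span` now mutates the deserialized message in place, deriving the millisecond fields Snuba expects from the second-resolution timestamps in the payload. A small sketch of that arithmetic with hypothetical timestamp values:

```rust
// Hypothetical values; the real code reads these from the span payload.
fn main() {
    let start_timestamp: f64 = 1_706_000_000.250; // seconds since the Unix epoch
    let end_timestamp: f64 = 1_706_000_001.500;

    // Same conversions as in `produce_span`: seconds (f64) -> milliseconds (integer).
    let duration_ms = ((end_timestamp - start_timestamp) * 1e3) as u32;
    let start_timestamp_ms = (start_timestamp * 1e3) as u64;

    assert_eq!(duration_ms, 1_250);
    assert_eq!(start_timestamp_ms, 1_706_000_000_250);
}
```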
@@ -1136,24 +1133,50 @@ struct CheckInKafkaMessage {
retention_days: u16,
}

#[derive(Debug, Serialize)]
struct SpanKafkaMessage<'a> {
/// Time at which the event was received by Relay. Not to be confused with `start_timestamp_ms`.
start_time: u64,
#[derive(Debug, Deserialize, Serialize)]
struct SpanKafkaMessage {
#[serde(skip_serializing)]
start_timestamp: f64,
#[serde(rename(deserialize = "timestamp"), skip_serializing)]
end_timestamp: f64,

description: String,
#[serde(default)]
duration_ms: u32,
/// The ID of the transaction event associated to this span, if any.
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default, skip_serializing_if = "Option::is_none")]
event_id: Option<EventId>,
/// The numeric ID of the organization.
organization_id: u64,
#[serde(rename(deserialize = "exclusive_time"))]
exclusive_time_ms: f64,
is_segment: bool,
#[serde(default, skip_serializing_if = "Value::is_null")]
measurements: Value,
#[serde(
default,
rename = "_metrics_summary",
skip_serializing_if = "Value::is_null"
)]
metrics_summary: Value,
#[serde(default, skip_serializing_if = "String::is_empty")]
parent_span_id: String,
/// The numeric ID of the project.
project_id: ProjectId,
#[serde(default, skip_serializing_if = "Option::is_none")]
profile_id: Option<EventId>,
#[serde(default)]
project_id: u64,
/// Time at which the event was received by Relay. Not to be confused with `start_timestamp_ms`.
received: f64,
/// Number of days until these data should be deleted.
#[serde(default)]
retention_days: u16,
/// Fields from the original span payload.
/// See [`relay-event-schema::protocol::span::Span`] for schema.
///
/// By using a [`RawValue`] here, we can embed the span's JSON without additional parsing.
span: &'a RawValue,
#[serde(default)]
segment_id: String,
sentry_tags: BTreeMap<String, String>,
span_id: String,
#[serde(default)]
start_timestamp_ms: u64,
tags: BTreeMap<String, String>,
trace_id: EventId,
}

/// An enum over all possible ingest messages.
@@ -1176,7 +1199,7 @@ enum KafkaMessage<'a> {
ReplayEvent(ReplayEventKafkaMessage),
ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage),
CheckIn(CheckInKafkaMessage),
Span(SpanKafkaMessage<'a>),
Span(SpanKafkaMessage),
}

impl Message for KafkaMessage<'_> {
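The serde attributes on `SpanKafkaMessage` carry much of the schema conversion: `timestamp` and `exclusive_time` are accepted under their payload names, the second-resolution timestamps are dropped on serialization, and only the derived millisecond fields reach Kafka. A minimal sketch of that asymmetric deserialize/serialize pattern, using an illustrative struct rather than the real message:

```rust
use serde::{Deserialize, Serialize};

// Illustrative struct; it only mirrors the attribute pattern of `SpanKafkaMessage`.
#[derive(Debug, Deserialize, Serialize)]
struct TimedMessage {
    // Read from the incoming `timestamp` key, never written back out.
    #[serde(rename(deserialize = "timestamp"), skip_serializing)]
    end_timestamp: f64,
    // Absent from the input; filled in by hand before producing to Kafka.
    #[serde(default)]
    duration_ms: u32,
    // Only serialized when non-empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    parent_span_id: String,
}

fn main() -> Result<(), serde_json::Error> {
    let mut msg: TimedMessage = serde_json::from_str(r#"{"timestamp": 2.5}"#)?;
    msg.duration_ms = 1_500;

    // `end_timestamp` and the empty `parent_span_id` are omitted from the output.
    assert_eq!(serde_json::to_string(&msg)?, r#"{"duration_ms":1500}"#);
    Ok(())
}
```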