Skip to content

Commit

Permalink
Filter values when producing to Kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
phacops committed Jan 19, 2024
1 parent ed16613 commit 32d66ee
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 17 deletions.
15 changes: 0 additions & 15 deletions relay-server/src/services/processor/span/processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,6 @@ fn validate(mut span: Annotated<Span>) -> Result<Annotated<Span>, anyhow::Error>
ref mut timestamp,
ref mut span_id,
ref mut trace_id,
ref mut measurements,
ref mut _metrics_summary,
..
} = inner;

Expand Down Expand Up @@ -467,19 +465,6 @@ fn validate(mut span: Annotated<Span>) -> Result<Annotated<Span>, anyhow::Error>
if let Some(tags) = tags.value_mut() {
tags.retain(|_, value| !value.value().is_empty())
}
if let Some(measurements) = measurements.value_mut() {
measurements.retain(|_, v| {
v.value()
.and_then(|v| v.value.value().cloned())
.map_or(false, f64::is_finite)
});
}
if let Some(metrics_summary) = _metrics_summary.value_mut() {
metrics_summary.0.retain(|_, value| match value.value() {
Some(v) => !v.is_empty() && v.iter().all(|v| !v.is_empty()),
None => false,
});
}

Ok(span)
}
Expand Down
53 changes: 51 additions & 2 deletions relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,35 @@ impl StoreService {
}
};

if let Some(measurements) = &mut span.measurements {
measurements.retain(|_, v| {
if let Some(v) = v {
v.value.map_or(false, f64::is_finite)
} else {
false
}
});
}

if let Some(metrics_summary) = &mut span.metrics_summary {
metrics_summary.retain(|_, mut v| {
if let Some(v) = &mut v {
v.retain(|v| {
if let Some(v) = v {
return v.min.is_some()
|| v.max.is_some()
|| v.sum.is_some()
|| v.count.is_some();
}
false
});
!v.is_empty()
} else {
false
}
});
}

span.duration_ms = ((span.end_timestamp - span.start_timestamp) * 1e3) as u32;
span.event_id = event_id;
span.project_id = scoping.project_id.value();
Expand Down Expand Up @@ -1170,6 +1199,26 @@ struct CheckInKafkaMessage {
retention_days: u16,
}

#[derive(Debug, Deserialize, Serialize)]
struct SpanMeasurement {
#[serde(default, skip_serializing_if = "Option::is_none")]
value: Option<f64>,
}

#[derive(Debug, Deserialize, Serialize)]
struct SpanMetricsSummary {
#[serde(default, skip_serializing_if = "Option::is_none")]
count: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
max: Option<f64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
min: Option<f64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
sum: Option<f64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
tags: Option<BTreeMap<String, String>>,
}

#[derive(Debug, Deserialize, Serialize)]
struct SpanKafkaMessage<'a> {
#[serde(skip_serializing)]
Expand All @@ -1189,13 +1238,13 @@ struct SpanKafkaMessage<'a> {
is_segment: bool,

#[serde(default, skip_serializing_if = "Option::is_none")]
measurements: Option<&'a RawValue>,
measurements: Option<BTreeMap<&'a str, Option<SpanMeasurement>>>,
#[serde(
default,
rename = "_metrics_summary",
skip_serializing_if = "Option::is_none"
)]
metrics_summary: Option<&'a RawValue>,
metrics_summary: Option<BTreeMap<&'a str, Option<Vec<Option<SpanMetricsSummary>>>>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
parent_span_id: Option<&'a str>,
#[serde(default, skip_serializing_if = "Option::is_none")]
Expand Down
62 changes: 62 additions & 0 deletions tests/integration/test_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1812,3 +1812,65 @@ def test_span_extraction_with_ddm(
}

spans_consumer.assert_empty()


def test_span_extraction_with_ddm_missing_values(
mini_sentry,
relay_with_processing,
spans_consumer,
):
spans_consumer = spans_consumer()

relay = relay_with_processing()
project_id = 42
project_config = mini_sentry.add_full_project_config(project_id)
project_config["config"]["spanAttributes"] = ["exclusive-time"]
project_config["config"]["features"] = [
"organizations:custom-metrics",
]

event = make_transaction({"event_id": "cbf6960622e14a45abc1f03b2055b186"})
metrics_summary = {
"c:spans/some_metric@none": [
{
"min": None,
"max": 2.0,
"count": 4,
"tags": {
"environment": "test",
},
},
],
}
event["_metrics_summary"] = metrics_summary

relay.send_event(project_id, event)

start_timestamp = datetime.fromisoformat(event["start_timestamp"])
end_timestamp = datetime.fromisoformat(event["timestamp"])
duration_ms = int((end_timestamp - start_timestamp).total_seconds() * 1e3)

metrics_summary["c:spans/some_metric@none"][0].pop("min", None)
metrics_summary["c:spans/some_metric@none"][0]["tags"].pop("random", None)

transaction_span = spans_consumer.get_span()
del transaction_span["received"]
assert transaction_span == {
"duration_ms": duration_ms,
"event_id": "cbf6960622e14a45abc1f03b2055b186",
"project_id": 42,
"retention_days": 90,
"description": "hi",
"exclusive_time_ms": 2000.0,
"is_segment": True,
"segment_id": "968cff94913ebb07",
"sentry_tags": {"transaction": "hi", "transaction.op": "hi"},
"span_id": "968cff94913ebb07",
"start_timestamp_ms": int(
start_timestamp.replace(tzinfo=timezone.utc).timestamp() * 1e3
),
"trace_id": "a0fa8803753e40fd8124b21eeb2986b5",
"_metrics_summary": metrics_summary,
}

spans_consumer.assert_empty()

0 comments on commit 32d66ee

Please sign in to comment.