Skip to content

Commit

Permalink
Filter values when producing to Kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
phacops committed Jan 19, 2024
1 parent ed16613 commit 7f356d2
Showing 1 changed file with 43 additions and 2 deletions.
45 changes: 43 additions & 2 deletions relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,27 @@ impl StoreService {
}
};

if let Some(measurements) = &mut span.measurements {
measurements.retain(|_, v| {
if let Some(v) = v {
v.value.map_or(false, f64::is_finite)
} else {
false
}
});
}

if let Some(metrics_summary) = &mut span.metrics_summary {
metrics_summary.retain(|_, v| {
if let Some(v) = v {
v.retain(|v| v.is_some());
true
} else {
false
}
});
}

span.duration_ms = ((span.end_timestamp - span.start_timestamp) * 1e3) as u32;
span.event_id = event_id;
span.project_id = scoping.project_id.value();
Expand Down Expand Up @@ -1170,6 +1191,26 @@ struct CheckInKafkaMessage {
retention_days: u16,
}

#[derive(Debug, Deserialize, Serialize)]
struct SpanMeasurement {
#[serde(default, skip_serializing_if = "Option::is_none")]
value: Option<f64>,
}

#[derive(Debug, Deserialize, Serialize)]
struct SpanMetricsSummary {
#[serde(default, skip_serializing_if = "Option::is_none")]
count: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
max: Option<f64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
min: Option<f64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
sum: Option<f64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
tags: Option<BTreeMap<String, Option<String>>>,
}

#[derive(Debug, Deserialize, Serialize)]
struct SpanKafkaMessage<'a> {
#[serde(skip_serializing)]
Expand All @@ -1189,13 +1230,13 @@ struct SpanKafkaMessage<'a> {
is_segment: bool,

#[serde(default, skip_serializing_if = "Option::is_none")]
measurements: Option<&'a RawValue>,
measurements: Option<BTreeMap<&'a str, Option<SpanMeasurement>>>,
#[serde(
default,
rename = "_metrics_summary",
skip_serializing_if = "Option::is_none"
)]
metrics_summary: Option<&'a RawValue>,
metrics_summary: Option<BTreeMap<&'a str, Option<Vec<Option<SpanMetricsSummary>>>>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
parent_span_id: Option<&'a str>,
#[serde(default, skip_serializing_if = "Option::is_none")]
Expand Down

0 comments on commit 7f356d2

Please sign in to comment.