Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(metrics-summaries): Remove metrics summaries from the codebase #4278

Merged
merged 2 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@

- Implement zstd http encoding for Relay to Relay communication. ([#4266](https://github.com/getsentry/relay/pull/4266))

**Internal**:

- Remove metrics summaries. ([#4278](https://github.com/getsentry/relay/pull/4278))

## 24.11.0

**Breaking Changes**:
Expand Down
151 changes: 0 additions & 151 deletions relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -901,8 +901,6 @@ impl StoreService {
});
}

self.produce_metrics_summary(item, &span);

self.produce(
KafkaTopic::Spans,
KafkaMessage::Span {
Expand Down Expand Up @@ -930,101 +928,6 @@ impl StoreService {
Ok(())
}

fn produce_metrics_summary(&self, item: &Item, span: &SpanKafkaMessage) {
let payload = item.payload();
let d = &mut Deserializer::from_slice(&payload);
let mut metrics_summary: SpanWithMetricsSummary = match serde_path_to_error::deserialize(d)
{
Ok(span) => span,
Err(error) => {
relay_log::error!(
error = &error as &dyn std::error::Error,
"failed to parse metrics summary of span"
);
return;
}
};
let Some(metrics_summary) = &mut metrics_summary.metrics_summary else {
return;
};
let &SpanKafkaMessage {
duration_ms,
end_timestamp_precise,
is_segment,
project_id,
received,
retention_days,
segment_id,
span_id,
trace_id,
..
} = span;
let group = span
.sentry_tags
.as_ref()
.and_then(|sentry_tags| sentry_tags.get("group"))
.map_or("", String::as_str);

for (mri, summaries) in metrics_summary {
let Some(summaries) = summaries else {
continue;
};
for summary in summaries {
let Some(SpanMetricsSummary {
count,
max,
min,
sum,
tags,
}) = summary
else {
continue;
};

let &mut Some(count) = count else {
continue;
};

if count == 0 {
continue;
}

let tags = tags
.iter_mut()
.filter_map(|(k, v)| Some((k.as_str(), v.as_deref()?)))
.collect();

// Ignore immediate errors on produce.
if let Err(error) = self.produce(
KafkaTopic::MetricsSummaries,
KafkaMessage::MetricsSummary(MetricsSummaryKafkaMessage {
count,
duration_ms,
end_timestamp: end_timestamp_precise,
group,
is_segment,
max,
min,
mri,
project_id,
received,
retention_days,
segment_id: segment_id.unwrap_or_default(),
span_id,
sum,
tags,
trace_id,
}),
) {
relay_log::error!(
error = &error as &dyn std::error::Error,
"failed to push metrics summary to kafka",
);
}
}
}
}

fn produce_profile_chunk(
&self,
organization_id: OrganizationId,
Expand Down Expand Up @@ -1371,53 +1274,6 @@ fn none_or_empty_object(value: &Option<&RawValue>) -> bool {
}
}

#[derive(Debug, Deserialize)]
struct SpanMetricsSummary {
#[serde(default)]
count: Option<u64>,
#[serde(default)]
max: Option<f64>,
#[serde(default)]
min: Option<f64>,
#[serde(default)]
sum: Option<f64>,
#[serde(default)]
tags: BTreeMap<String, Option<String>>,
}

type SpanMetricsSummaries = Vec<Option<SpanMetricsSummary>>;

#[derive(Debug, Deserialize)]
struct SpanWithMetricsSummary {
#[serde(default, rename(deserialize = "_metrics_summary"))]
metrics_summary: Option<BTreeMap<String, Option<SpanMetricsSummaries>>>,
}

#[derive(Debug, Serialize)]
struct MetricsSummaryKafkaMessage<'a> {
duration_ms: u32,
end_timestamp: f64,
group: &'a str,
is_segment: bool,
mri: &'a str,
project_id: u64,
received: f64,
retention_days: u16,
segment_id: &'a str,
span_id: &'a str,
trace_id: EventId,

count: u64,
#[serde(skip_serializing_if = "Option::is_none")]
max: &'a Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
min: &'a Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
sum: &'a Option<f64>,
#[serde(skip_serializing_if = "BTreeMap::is_empty")]
tags: BTreeMap<&'a str, &'a str>,
}

#[derive(Clone, Debug, Serialize)]
struct ProfileChunkKafkaMessage {
organization_id: OrganizationId,
Expand Down Expand Up @@ -1452,7 +1308,6 @@ enum KafkaMessage<'a> {
#[serde(flatten)]
message: SpanKafkaMessage<'a>,
},
MetricsSummary(MetricsSummaryKafkaMessage<'a>),
ProfileChunk(ProfileChunkKafkaMessage),
}

Expand All @@ -1477,7 +1332,6 @@ impl Message for KafkaMessage<'_> {
KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
KafkaMessage::CheckIn(_) => "check_in",
KafkaMessage::Span { .. } => "span",
KafkaMessage::MetricsSummary(_) => "metrics_summary",
KafkaMessage::ProfileChunk(_) => "profile_chunk",
}
}
Expand All @@ -1501,7 +1355,6 @@ impl Message for KafkaMessage<'_> {
Self::Profile(_)
| Self::Span { .. }
| Self::ReplayRecordingNotChunked(_)
| Self::MetricsSummary(_)
| Self::ProfileChunk(_) => Uuid::nil(),

// TODO(ja): Determine a partitioning key
Expand Down Expand Up @@ -1551,10 +1404,6 @@ impl Message for KafkaMessage<'_> {
KafkaMessage::Span { message, .. } => serde_json::to_vec(message)
.map(Cow::Owned)
.map_err(ClientError::InvalidJson),
KafkaMessage::MetricsSummary(message) => serde_json::to_vec(message)
.map(Cow::Owned)
.map_err(ClientError::InvalidJson),

_ => rmp_serde::to_vec_named(&self)
.map(Cow::Owned)
.map_err(ClientError::InvalidMsgPack),
Expand Down
1 change: 0 additions & 1 deletion tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
monitors_consumer,
spans_consumer,
profiles_consumer,
metrics_summaries_consumer,
feedback_consumer,
)

Expand Down
24 changes: 0 additions & 24 deletions tests/integration/fixtures/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ def inner(options=None):
"monitors": get_topic_name("monitors"),
"spans": get_topic_name("spans"),
"profiles": get_topic_name("profiles"),
"metrics_summaries": get_topic_name("metrics_summaries"),
"feedback": get_topic_name("feedback"),
}

Expand Down Expand Up @@ -354,11 +353,6 @@ def profiles_consumer(consumer_fixture):
yield from consumer_fixture(ProfileConsumer, "profiles")


@pytest.fixture
def metrics_summaries_consumer(consumer_fixture):
yield from consumer_fixture(MetricsSummariesConsumer, "metrics_summaries")


class MetricsConsumer(ConsumerBase):
def get_metric(self, timeout=None):
message = self.poll(timeout=timeout)
Expand Down Expand Up @@ -523,24 +517,6 @@ def get_profile(self):
return msgpack.loads(message.value()), message.headers()


class MetricsSummariesConsumer(ConsumerBase):
def get_metrics_summary(self):
message = self.poll()
assert message is not None
assert message.error() is None

return json.loads(message.value())

def get_metrics_summaries(self, timeout=None, n=None):
metrics_summaries = []

for message in self.poll_many(timeout=timeout, n=n):
assert message.error() is None
metrics_summaries.append(json.loads(message.value()))

return metrics_summaries


class CogsConsumer(ConsumerBase):
def get_measurement(self, timeout=None):
message = self.poll(timeout=timeout)
Expand Down
Loading
Loading