diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b4cdea0f1..40be5f6720 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ - Use a Lua script and in-memory cache for the cardinality limiting to reduce load on Redis. ([#2849](https://github.com/getsentry/relay/pull/2849)) - Extract metrics for file spans. ([#2874](https://github.com/getsentry/relay/pull/2874)) - Add an internal endpoint that allows Relays to submit metrics from multiple projects in a single request. ([#2869](https://github.com/getsentry/relay/pull/2869)) +- Introduce the configuration option `http.global_metrics`. When enabled, Relay submits metric buckets not through regular project-scoped Envelopes, but instead through the global endpoint. When this Relay serves a high number of projects, this can reduce the overall request volume. ([#2902](https://github.com/getsentry/relay/pull/2902)) - Emit a `processor.message.duration` metric to assess the throughput of the internal CPU pool. ([#2877](https://github.com/getsentry/relay/pull/2877)) ## 23.12.0 diff --git a/Cargo.lock b/Cargo.lock index c239ec21a9..c640e844a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3921,6 +3921,7 @@ dependencies = [ "chrono", "data-encoding", "flate2", + "fnv", "futures", "hash32", "hashbrown 0.13.2", diff --git a/relay-metrics/src/aggregator.rs b/relay-metrics/src/aggregator.rs index 9edb928946..bfb65d3349 100644 --- a/relay-metrics/src/aggregator.rs +++ b/relay-metrics/src/aggregator.rs @@ -71,13 +71,9 @@ impl BucketKey { /// /// This is used for partition key computation and statsd logging. fn hash64(&self) -> u64 { - BucketKeyRef { - project_key: self.project_key, - timestamp: self.timestamp, - metric_name: &self.metric_name, - tags: &self.tags, - } - .hash64() + let mut hasher = FnvHasher::default(); + std::hash::Hash::hash(self, &mut hasher); + hasher.finish() } /// Estimates the number of bytes needed to encode the bucket key. @@ -97,29 +93,6 @@ impl BucketKey { } } -/// Pendant to [`BucketKey`] for referenced data, not owned data. -/// -/// This makes it possible to compute a hash for a [`Bucket`] -/// without destructing the bucket into a [`BucketKey`]. -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] -struct BucketKeyRef<'a> { - project_key: ProjectKey, - timestamp: UnixTimestamp, - metric_name: &'a str, - tags: &'a BTreeMap, -} - -impl<'a> BucketKeyRef<'a> { - /// Creates a 64-bit hash of the bucket key using FnvHasher. - /// - /// This is used for partition key computation and statsd logging. - fn hash64(&self) -> u64 { - let mut hasher = FnvHasher::default(); - std::hash::Hash::hash(self, &mut hasher); - hasher.finish() - } -} - /// Estimates the number of bytes needed to encode the tags. /// /// Note that this does not necessarily match the exact memory footprint of the tags, @@ -888,36 +861,6 @@ impl fmt::Debug for Aggregator { } } -/// Splits buckets into N logical partitions, determined by the bucket key. 
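For illustration, the FNV-based hashing that `BucketKey::hash64` now performs directly (and that the processor reuses for sharding via the new `partition_key` helper further down) boils down to the following sketch. This is not the implementation: the tuple stands in for the real `BucketKey` fields, and plain strings and integers replace `ProjectKey` and `UnixTimestamp`.

```rust
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

use fnv::FnvHasher;

/// Simplified stand-in for `BucketKey`: project key, timestamp, metric name, tags.
type Key<'a> = (&'a str, u64, &'a str, &'a BTreeMap<String, String>);

/// 64-bit FNV hash over the bucket key, as in `BucketKey::hash64`.
fn hash64(key: &Key<'_>) -> u64 {
    let mut hasher = FnvHasher::default();
    key.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let tags = BTreeMap::from([("release".to_owned(), "1.0".to_owned())]);
    let key: Key<'_> = (
        "a94ae32be2584e0bbd7a4cbb95971fee", // made-up public key
        1704067200,
        "c:transactions/foo@none",
        &tags,
    );

    // The same hash drives partitioning: with N flush partitions, the shard is `hash % N`.
    println!("shard for 8 partitions: {}", hash64(&key) % 8);
}
```

Hashing the owned key directly is what makes the separate `BucketKeyRef` type below unnecessary.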
-pub fn partition_buckets( - project_key: ProjectKey, - buckets: impl IntoIterator, - flush_partitions: Option, -) -> BTreeMap, Vec> { - let flush_partitions = match flush_partitions { - None => return BTreeMap::from([(None, buckets.into_iter().collect())]), - Some(x) => x.max(1), // handle 0, - }; - let mut partitions = BTreeMap::<_, Vec>::new(); - for bucket in buckets { - let key = BucketKeyRef { - project_key, - timestamp: bucket.timestamp, - metric_name: &bucket.name, - tags: &bucket.tags, - }; - - let partition_key = key.hash64() % flush_partitions; - partitions - .entry(Some(partition_key)) - .or_default() - .push(bucket); - - relay_statsd::metric!(histogram(MetricHistograms::PartitionKeys) = partition_key); - } - partitions -} - #[cfg(test)] mod tests { diff --git a/relay-metrics/src/statsd.rs b/relay-metrics/src/statsd.rs index 3c64fbd9d8..0fe6844535 100644 --- a/relay-metrics/src/statsd.rs +++ b/relay-metrics/src/statsd.rs @@ -124,13 +124,6 @@ pub enum MetricHistograms { /// time period (`false`) or after the initial delay has expired (`true`). BucketsDelay, - /// - /// Distribution of flush buckets over partition keys. - /// - /// The distribution of buckets should be even. - /// If it is not, this metric should expose it. - PartitionKeys, - /// Distribution of invalid bucket timestamps observed, relative to the time of observation. /// /// This is a temporary metric to better understand why we see so many invalid timestamp errors. @@ -143,7 +136,6 @@ impl HistogramMetric for MetricHistograms { Self::BucketsFlushed => "metrics.buckets.flushed", Self::BucketsFlushedPerProject => "metrics.buckets.flushed_per_project", Self::BucketsDelay => "metrics.buckets.delay", - Self::PartitionKeys => "metrics.buckets.partition_keys", Self::InvalidBucketTimestamp => "metrics.buckets.invalid_timestamp", } } diff --git a/relay-metrics/src/view.rs b/relay-metrics/src/view.rs index 85572b26bd..a0b88a7ea2 100644 --- a/relay-metrics/src/view.rs +++ b/relay-metrics/src/view.rs @@ -15,6 +15,11 @@ use crate::BucketValue; /// and buckets larger will be split up. const BUCKET_SPLIT_FACTOR: usize = 32; +/// The base size of a serialized bucket in bytes. +/// +/// This is the size of a bucket's fixed fields in JSON format, excluding the value and tags. +const BUCKET_SIZE: usize = 50; + /// The average size of values when serialized. const AVG_VALUE_SIZE: usize = 8; @@ -276,14 +281,10 @@ impl<'a> Iterator for BucketsViewBySizeIter<'a> { } SplitDecision::MoveToNextBatch => break, SplitDecision::Split(at) => { - // Only certain buckets can be split, if the bucket can't be split, - // move it to the next batch. - if bucket.can_split() { - self.current = Index { - slice: self.current.slice, - bucket: self.current.bucket + at, - }; - } + self.current = Index { + slice: self.current.slice, + bucket: self.current.bucket + at, + }; break; } } @@ -332,6 +333,7 @@ impl<'a> Serialize for BucketsView<'a> { /// ``` /// /// A view can be split again into multiple smaller views. +#[derive(Clone)] pub struct BucketView<'a> { /// The source bucket. inner: &'a Bucket, @@ -427,6 +429,46 @@ impl<'a> BucketView<'a> { Some(self) } + /// Estimates the number of bytes needed to serialize the bucket without value. + /// + /// Note that this does not match the exact size of the serialized payload. Instead, the size is + /// approximated through tags and a static overhead. 
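As a concrete illustration of how these constants combine, here is a sketch of the estimate and split arithmetic (constants redeclared locally; the numbers correspond to the 17-value distribution bucket used in `test_global_metrics_batching` further down, flushed with `max_flush_bytes: 200`):

```rust
// Mirrors `estimated_base_size`/`estimated_size` and the arithmetic in `split_at`;
// an illustration only, not the actual implementation.
const BUCKET_SIZE: usize = 50;
const AVG_VALUE_SIZE: usize = 8;

fn main() {
    let name_len = "d:transactions/foo@none".len(); // 23
    let tags_cost = 0; // the test bucket has no tags
    let values = 17;

    let base = BUCKET_SIZE + name_len + tags_cost; // 73
    let estimated = base + values * AVG_VALUE_SIZE; // 209: larger than 200, so the bucket is split

    let max_size = 200;
    let split_at = (max_size - base) / AVG_VALUE_SIZE; // 15 values fit into the first batch
    println!("base={base} estimated={estimated} split_at={split_at}");
}
```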
+ fn estimated_base_size(&self) -> usize { + BUCKET_SIZE + self.name().len() + aggregator::tags_cost(self.tags()) + } + + /// Estimates the number of bytes needed to serialize the bucket. + /// + /// Note that this does not match the exact size of the serialized payload. Instead, the size is + /// approximated through the number of contained values, assuming an average size of serialized + /// values. + pub fn estimated_size(&self) -> usize { + self.estimated_base_size() + self.len() * AVG_VALUE_SIZE + } + + /// Calculates a split for this bucket if its estimated serialization size exceeds a threshold. + /// + /// There are three possible return values: + /// - `(Some, None)` if the bucket fits entirely into the size budget. There is no split. + /// - `(None, Some)` if the size budget cannot even hold the bucket name and tags. There is no + /// split, the entire bucket is moved. + /// - `(Some, Some)` if the bucket fits partially. Remaining values are moved into a new bucket + /// with all other information cloned. + /// + /// This is an approximate function. The bucket is not actually serialized, but rather its + /// footprint is estimated through the number of data points contained. See + /// [`estimated_size`](Self::estimated_size) for more information. + pub fn split(self, size: usize, max_size: Option) -> (Option, Option) { + match split_at(&self, size, max_size.unwrap_or(0) / BUCKET_SPLIT_FACTOR) { + SplitDecision::BucketFits(_) => (Some(self), None), + SplitDecision::MoveToNextBatch => (None, Some(self)), + SplitDecision::Split(at) => { + let Range { start, end } = self.range.clone(); + (self.clone().select(start..at), self.select(at..end)) + } + } + } + /// Whether the bucket can be split into multiple. /// /// Only set and distribution buckets can be split. @@ -624,14 +666,18 @@ enum SplitDecision { /// `estimate_size` for more information. fn split_at(bucket: &BucketView<'_>, max_size: usize, min_split_size: usize) -> SplitDecision { // If there's enough space for the entire bucket, do not perform a split. - let bucket_size = estimate_size(bucket); + let bucket_size = bucket.estimated_size(); if max_size >= bucket_size { return SplitDecision::BucketFits(bucket_size); } + if !bucket.can_split() { + return SplitDecision::MoveToNextBatch; + } + // If the bucket key can't even fit into the remaining length, move the entire bucket into // the right-hand side. - let own_size = estimate_base_size(bucket); + let own_size = bucket.estimated_base_size(); if max_size < (own_size + AVG_VALUE_SIZE) { // split_at must not be zero return SplitDecision::MoveToNextBatch; @@ -644,27 +690,9 @@ fn split_at(bucket: &BucketView<'_>, max_size: usize, min_split_size: usize) -> // Perform a split with the remaining space after adding the key. We assume an average // length of 8 bytes per value and compute the number of items fitting into the left side. let split_at = (max_size - own_size) / AVG_VALUE_SIZE; - SplitDecision::Split(split_at) } -/// Estimates the number of bytes needed to serialize the bucket without value. -/// -/// Note that this does not match the exact size of the serialized payload. Instead, the size is -/// approximated through tags and a static overhead. -fn estimate_base_size(bucket: &BucketView<'_>) -> usize { - 50 + bucket.name().len() + aggregator::tags_cost(bucket.tags()) -} - -/// Estimates the number of bytes needed to serialize the bucket. -/// -/// Note that this does not match the exact size of the serialized payload. 
Instead, the size is -/// approximated through the number of contained values, assuming an average size of serialized -/// values. -fn estimate_size(bucket: &BucketView<'_>) -> usize { - estimate_base_size(bucket) + bucket.len() * AVG_VALUE_SIZE -} - #[cfg(test)] mod tests { use insta::assert_json_snapshot; @@ -919,7 +947,7 @@ b3:42:75|s"#; .by_size(100) .map(|bv| { let len: usize = bv.iter().map(|b| b.len()).sum(); - let size: usize = bv.iter().map(|b| estimate_size(&b)).sum(); + let size: usize = bv.iter().map(|b| b.estimated_size()).sum(); (len, size) }) .collect::>(); @@ -945,7 +973,7 @@ b3:42:75|s"#; .by_size(250) .map(|bv| { let len: usize = bv.iter().map(|b| b.len()).sum(); - let size: usize = bv.iter().map(|b| estimate_size(&b)).sum(); + let size: usize = bv.iter().map(|b| b.estimated_size()).sum(); (len, size) }) .collect::>(); @@ -971,7 +999,7 @@ b3:42:75|s"#; .by_size(500) .map(|bv| { let len: usize = bv.iter().map(|b| b.len()).sum(); - let size: usize = bv.iter().map(|b| estimate_size(&b)).sum(); + let size: usize = bv.iter().map(|b| b.estimated_size()).sum(); (len, size) }) .collect::>(); diff --git a/relay-server/Cargo.toml b/relay-server/Cargo.toml index 22433d7c87..c0fcee4a5c 100644 --- a/relay-server/Cargo.toml +++ b/relay-server/Cargo.toml @@ -50,6 +50,7 @@ bytes = { version = "1.4.0" } chrono = { workspace = true, features = ["clock"] } data-encoding = "2.3.3" flate2 = "1.0.19" +fnv = "1.0.7" futures = { workspace = true } hash32 = { workspace = true } hashbrown = { workspace = true } @@ -95,7 +96,7 @@ rmp-serde = "1.1.1" rust-embed = { version = "8.0.0", optional = true } serde = { workspace = true } serde_json = { workspace = true } -smallvec = { workspace = true, features = ["drain_filter"] } +smallvec = { workspace = true, features = ["drain_filter"] } sqlx = { version = "0.7.0", features = [ "macros", "migrate", diff --git a/relay-server/src/actors/processor.rs b/relay-server/src/actors/processor.rs index 2fb489934d..271676a74f 100644 --- a/relay-server/src/actors/processor.rs +++ b/relay-server/src/actors/processor.rs @@ -1,6 +1,8 @@ -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::error::Error; +use std::future::Future; use std::io::Write; +use std::pin::Pin; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -10,6 +12,7 @@ use bytes::Bytes; use chrono::{DateTime, Utc}; use flate2::write::{GzEncoder, ZlibEncoder}; use flate2::Compression; +use fnv::FnvHasher; use relay_base_schema::project::{ProjectId, ProjectKey}; use relay_common::time::UnixTimestamp; use relay_config::{Config, HttpEncoding}; @@ -22,9 +25,8 @@ use relay_event_normalization::{GeoIpLookup, RawUserAgentInfo}; use relay_event_schema::processor::ProcessingAction; use relay_event_schema::protocol::{Event, EventType, IpAddr, Metrics, NetworkReportError}; use relay_filter::FilterStatKey; -use relay_metrics::aggregator::partition_buckets; use relay_metrics::aggregator::AggregatorConfig; -use relay_metrics::{Bucket, BucketsView, MergeBuckets, MetricMeta, MetricNamespace}; +use relay_metrics::{Bucket, BucketView, BucketsView, MergeBuckets, MetricMeta, MetricNamespace}; use relay_pii::PiiConfigError; use relay_profiling::ProfileId; use relay_protocol::{Annotated, Value}; @@ -32,6 +34,7 @@ use relay_quotas::{DataCategory, Scoping}; use relay_sampling::evaluation::{MatchedRuleIds, ReservoirCounters, ReservoirEvaluator}; use relay_statsd::metric; use relay_system::{Addr, FromMessage, NoResponse, Service}; +use reqwest::header; use tokio::sync::Semaphore; 
#[cfg(feature = "processing")] @@ -51,9 +54,10 @@ use crate::actors::outcome::{DiscardReason, Outcome, TrackOutcome}; use crate::actors::project::ProjectState; use crate::actors::project_cache::{AddMetricMeta, ProjectCache}; use crate::actors::test_store::TestStore; -use crate::actors::upstream::{SendRequest, UpstreamRelay}; -use crate::envelope::{ContentType, Envelope, Item, ItemType}; +use crate::actors::upstream::{SendRequest, UpstreamRelay, UpstreamRequest, UpstreamRequestError}; +use crate::envelope::{ContentType, Envelope, Item, ItemType, SourceQuantities}; use crate::extractors::{PartialDsn, RequestMeta}; +use crate::http; use crate::metrics_extraction::transactions::types::ExtractMetricsError; use crate::metrics_extraction::transactions::{ExtractedMetrics, TransactionExtractor}; use crate::service::ServiceError; @@ -209,7 +213,6 @@ impl From for ProcessingError { type ExtractedEvent = (Annotated, usize); impl ExtractedMetrics { - // TODO(ja): Move fn send_metrics(self, envelope: &Envelope, project_cache: Addr) { let project_key = envelope.meta().public_key(); @@ -1197,9 +1200,14 @@ impl EnvelopeProcessorService { let clock_drift_processor = ClockDriftProcessor::new(sent_at, received).at_least(MINIMUM_CLOCK_DRIFT); - match serde_json::from_slice::>>(&payload) { - Ok(batch_map) => { - for (public_key, mut buckets) in batch_map { + #[derive(serde::Deserialize)] + struct Wrapper { + buckets: HashMap>, + } + + match serde_json::from_slice(&payload) { + Ok(Wrapper { buckets }) => { + for (public_key, mut buckets) in buckets { for bucket in &mut buckets { clock_drift_processor.process_timestamp(&mut bucket.timestamp); } @@ -1314,36 +1322,9 @@ impl EnvelopeProcessorService { } } - fn encode_envelope_body( - body: Bytes, - http_encoding: HttpEncoding, - ) -> Result { - let envelope_body: Vec = match http_encoding { - HttpEncoding::Identity => return Ok(body), - HttpEncoding::Deflate => { - let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default()); - encoder.write_all(body.as_ref())?; - encoder.finish()? - } - HttpEncoding::Gzip => { - let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); - encoder.write_all(body.as_ref())?; - encoder.finish()? - } - HttpEncoding::Br => { - // Use default buffer size (via 0), medium quality (5), and the default lgwin (22). - let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22); - encoder.write_all(body.as_ref())?; - encoder.into_inner() - } - }; - - Ok(envelope_body.into()) - } - fn handle_encode_envelope(&self, message: EncodeEnvelope) { let mut request = message.request; - match Self::encode_envelope_body(request.envelope_body, request.http_encoding) { + match encode_payload(&request.envelope_body, request.http_encoding) { Err(e) => { request .response_sender @@ -1485,7 +1466,6 @@ impl EnvelopeProcessorService { /// access to the central Redis instance. Cached rate limits are applied in the project cache /// already. 
     fn encode_metrics_envelope(&self, message: EncodeMetrics) {
-        let partition_count = self.inner.config.metrics_partitions();
         let batch_size = self.inner.config.metrics_max_batch_size_bytes();
         let upstream = self.inner.config.upstream_descriptor();
@@ -1495,17 +1475,30 @@ impl EnvelopeProcessorService {
             project_state,
         } = message;
 
+        let project_key = scoping.project_key;
+        let dsn = PartialDsn::outbound(&scoping, upstream);
         let mode = match project_state.config.transaction_metrics {
             Some(ErrorBoundary::Ok(ref c)) if c.usage_metric() => ExtractionMode::Usage,
             _ => ExtractionMode::Duration,
         };
 
-        let dsn = PartialDsn::outbound(&scoping, upstream);
-        let partitions = partition_buckets(scoping.project_key, buckets, partition_count);
+        let partitions = if let Some(count) = self.inner.config.metrics_partitions() {
+            let mut partitions: BTreeMap<Option<u64>, Vec<Bucket>> = BTreeMap::new();
+            for bucket in buckets {
+                let partition_key = partition_key(project_key, &bucket, Some(count));
+                partitions.entry(partition_key).or_default().push(bucket);
+            }
+            partitions
+        } else {
+            BTreeMap::from([(None, buckets)])
+        };
 
         for (partition_key, buckets) in partitions {
-            let mut num_batches = 0;
+            if let Some(key) = partition_key {
+                relay_statsd::metric!(histogram(RelayHistograms::PartitionKeys) = key);
+            }
+            let mut num_batches = 0;
             for batch in BucketsView::new(&buckets).by_size(batch_size) {
                 let mut envelope =
                     Envelope::from_request(None, RequestMeta::outbound(dsn.clone()));
@@ -1540,6 +1533,91 @@ impl EnvelopeProcessorService {
         }
     }
 
+    /// Creates a [`SendMetricsRequest`] and sends it to the upstream relay.
+    fn send_global_partition(&self, key: Option<u64>, partition: &mut Partition<'_>) {
+        if partition.is_empty() {
+            return;
+        }
+
+        let (unencoded, quantities) = partition.take_parts();
+        let http_encoding = self.inner.config.http_encoding();
+        let encoded = match encode_payload(&unencoded, http_encoding) {
+            Ok(payload) => payload,
+            Err(error) => {
+                let error = &error as &dyn std::error::Error;
+                relay_log::error!(error, "failed to encode metrics payload");
+                return;
+            }
+        };
+
+        let request = SendMetricsRequest {
+            partition_key: key.map(|k| k.to_string()),
+            unencoded,
+            encoded,
+            http_encoding,
+            quantities,
+            outcome_aggregator: self.inner.outcome_aggregator.clone(),
+        };
+
+        self.inner.upstream_relay.send(SendRequest(request));
+    }
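For reference, the JSON that `Partition::take_parts` produces and `send_global_partition` submits uses the same `{"buckets": ...}` wrapper that the batched-metrics endpoint parses and that the `mini_sentry` mock in the integration tests reads back: buckets grouped by project public key. A sketch of the shape, with a made-up key:

```rust
use serde_json::json;

fn main() {
    // Hypothetical DSN public key; the shape is `{"buckets": {<project_key>: [<bucket>, ...]}}`.
    let payload = json!({
        "buckets": {
            "a94ae32be2584e0bbd7a4cbb95971fee": [{
                "timestamp": 1704067200,
                "width": 1,
                "name": "c:transactions/foo@none",
                "type": "c",
                "value": 42.0
            }]
        }
    });

    println!("{}", serde_json::to_string_pretty(&payload).unwrap());
}
```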
+
+    /// Serializes metric buckets to JSON and sends them to the upstream via the global endpoint.
+    ///
+    /// This function is similar to [`Self::encode_metrics_envelope`], but sends a global batched
+    /// payload directly instead of per-project Envelopes.
+    ///
+    /// This function runs the following steps:
+    ///  - partitioning
+    ///  - batching by configured size limit
+    ///  - serialize to JSON
+    ///  - submit directly to the upstream
+    ///
+    /// Cardinality limiting and rate limiting run only in processing Relays as they both require
+    /// access to the central Redis instance. Cached rate limits are applied in the project cache
+    /// already.
+    fn encode_metrics_global(&self, message: EncodeMetrics) {
+        let partition_count = self.inner.config.metrics_partitions();
+        let batch_size = self.inner.config.metrics_max_batch_size_bytes();
+
+        let mut partitions = BTreeMap::new();
+
+        for (scoping, message) in &message.scopes {
+            let ProjectMetrics {
+                buckets,
+                project_state,
+            } = message;
+
+            let mode = match project_state.config.transaction_metrics {
+                Some(ErrorBoundary::Ok(ref c)) if c.usage_metric() => ExtractionMode::Usage,
+                _ => ExtractionMode::Duration,
+            };
+
+            for bucket in buckets {
+                let partition_key = partition_key(scoping.project_key, bucket, partition_count);
+
+                let mut remaining = Some(BucketView::new(bucket));
+                while let Some(bucket) = remaining.take() {
+                    let partition = partitions
+                        .entry(partition_key)
+                        .or_insert_with(|| Partition::new(batch_size, mode));
+
+                    if let Some(next) = partition.insert(bucket, *scoping) {
+                        // A part of the bucket could not be inserted. Take the partition and submit
+                        // it immediately. Repeat until the final part was inserted. This should
+                        // always result in a request, otherwise we would enter an endless loop.
+                        self.send_global_partition(partition_key, partition);
+                        remaining = Some(next);
+                    }
+                }
+            }
+        }
+
+        for (partition_key, mut partition) in partitions {
+            self.send_global_partition(partition_key, &mut partition);
+        }
+    }
+
     fn handle_encode_metrics(&self, message: EncodeMetrics) {
         #[cfg(feature = "processing")]
         if self.inner.config.processing_enabled() {
@@ -1548,7 +1626,11 @@
             }
         }
 
-        self.encode_metrics_envelope(message)
+        if self.inner.config.http_global_metrics() {
+            self.encode_metrics_global(message)
+        } else {
+            self.encode_metrics_envelope(message)
+        }
     }
 
     fn handle_encode_metric_meta(&self, message: EncodeMetricMeta) {
@@ -1655,6 +1737,198 @@ impl Service for EnvelopeProcessorService {
     }
 }
 
+fn encode_payload(body: &Bytes, http_encoding: HttpEncoding) -> Result<Bytes, std::io::Error> {
+    let envelope_body: Vec<u8> = match http_encoding {
+        HttpEncoding::Identity => return Ok(body.clone()),
+        HttpEncoding::Deflate => {
+            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
+            encoder.write_all(body.as_ref())?;
+            encoder.finish()?
+        }
+        HttpEncoding::Gzip => {
+            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
+            encoder.write_all(body.as_ref())?;
+            encoder.finish()?
+        }
+        HttpEncoding::Br => {
+            // Use default buffer size (via 0), medium quality (5), and the default lgwin (22).
+            let mut encoder = BrotliEncoder::new(Vec::new(), 0, 5, 22);
+            encoder.write_all(body.as_ref())?;
+            encoder.into_inner()
+        }
+    };
+
+    Ok(envelope_body.into())
+}
+
+/// Computes a stable partitioning key for sharded metric requests.
+fn partition_key(project_key: ProjectKey, bucket: &Bucket, partitions: Option<u64>) -> Option<u64> {
+    use std::hash::{Hash, Hasher};
+
+    let partitions = partitions?.max(1);
+    let key = (project_key, bucket.timestamp, &bucket.name, &bucket.tags);
+
+    let mut hasher = FnvHasher::default();
+    key.hash(&mut hasher);
+    Some(hasher.finish() % partitions)
+}
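To see how the partition budget plays out, here is a sketch of the arithmetic behind `test_global_metrics_batching` below (one 17-value distribution bucket, a 200-byte batch size, and the size constants from `relay-metrics/src/view.rs`). It illustrates the splitting behavior and is not the implementation:

```rust
fn main() {
    let (base, avg, budget) = (73, 8, 200); // estimated base size, per-value cost, max_size

    // First insert: 73 + 17 * 8 = 209 bytes exceed the budget, so the view is split
    // after (200 - 73) / 8 = 15 values; the partition now holds roughly 193 bytes.
    let first = base + 15 * avg;
    assert_eq!(first, 193);

    // The remaining 2-value view (73 + 2 * 8 = 89 bytes) does not fit into the 7 bytes
    // left, so `insert` returns it. The full partition is flushed as the first request,
    // and the remainder goes into the now-empty partition, flushed as the second request.
    let remainder = base + 2 * avg;
    assert!(remainder > budget - first && remainder <= budget);
}
```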
+
+/// A container for metric buckets from multiple projects.
+///
+/// This container is used to send metrics to the upstream in global batches as part of the
+/// [`EncodeMetrics`] message if the `http.global_metrics` option is enabled. The container monitors
+/// the size of all metrics and allows splitting them into multiple batches. See
+/// [`insert`](Self::insert) for more information.
+#[derive(Debug)]
+struct Partition<'a> {
+    max_size: usize,
+    remaining: usize,
+    views: HashMap<ProjectKey, Vec<BucketView<'a>>>,
+    quantities: Vec<(Scoping, SourceQuantities)>,
+    mode: ExtractionMode,
+}
+
+impl<'a> Partition<'a> {
+    /// Creates a new partition with the given maximum size in bytes.
+    pub fn new(size: usize, mode: ExtractionMode) -> Self {
+        Self {
+            max_size: size,
+            remaining: size,
+            views: HashMap::new(),
+            quantities: Vec::new(),
+            mode,
+        }
+    }
+
+    /// Inserts a bucket into the partition, splitting it if necessary.
+    ///
+    /// This function attempts to add the bucket to this partition. If the bucket does not fit
+    /// entirely into the partition given its maximum size, the remaining part of the bucket is
+    /// returned from this function call.
+    ///
+    /// If this function returns `Some(_)`, the partition is full and should be submitted to the
+    /// upstream immediately. Use [`take_parts`](Self::take_parts) to retrieve the contents of the
+    /// partition. Afterwards, the caller is responsible for calling this function again with the
+    /// remaining bucket until it is fully inserted.
+    pub fn insert(&mut self, bucket: BucketView<'a>, scoping: Scoping) -> Option<BucketView<'a>> {
+        let (current, next) = bucket.split(self.remaining, Some(self.max_size));
+
+        if let Some(current) = current {
+            self.remaining = self.remaining.saturating_sub(current.estimated_size());
+            let quantities = utils::extract_metric_quantities([current.clone()], self.mode);
+            self.quantities.push((scoping, quantities));
+            self.views
+                .entry(scoping.project_key)
+                .or_default()
+                .push(current);
+        }
+
+        next
+    }
+
+    /// Returns `true` if the partition does not hold any data.
+    fn is_empty(&self) -> bool {
+        self.views.is_empty()
+    }
+
+    /// Returns the serialized buckets and the source quantities for this partition.
+    ///
+    /// This empties the partition so that it can be reused.
+    fn take_parts(&mut self) -> (Bytes, Vec<(Scoping, SourceQuantities)>) {
+        #[derive(serde::Serialize)]
+        struct Wrapper<'a> {
+            buckets: &'a HashMap<ProjectKey, Vec<BucketView<'a>>>,
+        }
+
+        let buckets = &self.views;
+        let payload = serde_json::to_vec(&Wrapper { buckets }).unwrap().into();
+        let quantities = self.quantities.clone();
+
+        self.views.clear();
+        self.quantities.clear();
+        self.remaining = self.max_size;
+
+        (payload, quantities)
+    }
+}
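The `unencoded`/`encoded` pair in the request below exists because the signature must cover the uncompressed JSON while the HTTP body carries the compressed payload (see the updated `sign` contract in `upstream.rs` further down). A minimal sketch of that relationship, assuming gzip encoding; `sign_payload` is a hypothetical stand-in for the secret-key signing performed by the upstream actor:

```rust
use std::io::Write;

use flate2::write::GzEncoder;
use flate2::Compression;

fn main() -> std::io::Result<()> {
    // The JSON produced by `Partition::take_parts`.
    let unencoded = br#"{"buckets":{"a94ae32be2584e0bbd7a4cbb95971fee":[]}}"#.to_vec();

    // The same payload after the configured HTTP encoding (gzip here).
    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
    encoder.write_all(&unencoded)?;
    let encoded = encoder.finish()?;

    // The request body carries `encoded`; the X-Sentry-Relay-Signature header is computed
    // over `unencoded`.
    let _body = encoded;
    let _signature = sign_payload(&unencoded);
    Ok(())
}

/// Hypothetical placeholder for `credentials.secret_key.sign(...)`.
fn sign_payload(payload: &[u8]) -> String {
    format!("signed {} bytes", payload.len())
}
```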
+
+/// An upstream request that submits metric buckets via HTTP.
+///
+/// This request is not awaited. It automatically tracks outcomes if the request is not received.
+#[derive(Debug)]
+struct SendMetricsRequest {
+    /// If the partition key is set, the request is marked with `X-Sentry-Relay-Shard`.
+    partition_key: Option<String>,
+    /// Serialized metric buckets without encoding applied, used for signing.
+    unencoded: Bytes,
+    /// Serialized metric buckets with the stated HTTP encoding applied.
+    encoded: Bytes,
+    /// Encoding (compression) of the payload.
+    http_encoding: HttpEncoding,
+    /// Information about the metric quantities in the payload for outcomes.
+    quantities: Vec<(Scoping, SourceQuantities)>,
+    /// Address of the outcome aggregator to send outcomes to on error.
+    outcome_aggregator: Addr<TrackOutcome>,
+}
+
+impl UpstreamRequest for SendMetricsRequest {
+    fn set_relay_id(&self) -> bool {
+        true
+    }
+
+    fn sign(&mut self) -> Option<Bytes> {
+        Some(self.unencoded.clone())
+    }
+
+    fn method(&self) -> reqwest::Method {
+        reqwest::Method::POST
+    }
+
+    fn path(&self) -> std::borrow::Cow<'_, str> {
+        "/api/0/relays/metrics/".into()
+    }
+
+    fn route(&self) -> &'static str {
+        "global_metrics"
+    }
+
+    fn build(&mut self, builder: &mut http::RequestBuilder) -> Result<(), http::HttpError> {
+        builder
+            .content_encoding(self.http_encoding)
+            .header_opt("X-Sentry-Relay-Shard", self.partition_key.as_ref())
+            .header(header::CONTENT_TYPE, b"application/json")
+            .body(self.encoded.clone());
+
+        Ok(())
+    }
+
+    fn respond(
+        self: Box<Self>,
+        result: Result<http::Response, UpstreamRequestError>,
+    ) -> Pin<Box<dyn Future<Output = ()> + Send + Sync>> {
+        Box::pin(async {
+            match result {
+                Ok(mut response) => {
+                    response.consume().await.ok();
+                }
+                // Request did not arrive, we are responsible for outcomes.
+                Err(error) if !error.is_received() => {
+                    for (scoping, quantities) in self.quantities {
+                        utils::reject_metrics(
+                            &self.outcome_aggregator,
+                            quantities,
+                            scoping,
+                            Outcome::Invalid(DiscardReason::Internal),
+                        );
+                    }
+                }
+                // Upstream is responsible for logging outcomes.
+                Err(_received) => (),
+            }
+        })
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::BTreeMap;
diff --git a/relay-server/src/actors/upstream.rs b/relay-server/src/actors/upstream.rs
index 43dbe0c0d4..d671d5cd37 100644
--- a/relay-server/src/actors/upstream.rs
+++ b/relay-server/src/actors/upstream.rs
@@ -299,12 +299,16 @@ pub trait UpstreamRequest: Send + Sync + fmt::Debug {
     /// Add the `X-Sentry-Relay-Signature` header to the outgoing request.
     ///
+    /// When no signature should be added, this method should return `None`. Otherwise, this method
+    /// should return the payload to sign. For requests with content encoding, this must be the
+    /// **uncompressed** payload.
+    ///
     /// This requires configuration of the Relay's credentials. If the credentials are not
     /// configured, the request will fail with [`UpstreamRequestError::NoCredentials`].
     ///
-    /// Defaults to `false`.
-    fn sign(&self) -> bool {
-        false
+    /// Defaults to `None`.
+    fn sign(&mut self) -> Option<Bytes> {
+        None
     }
 
     /// Returns the name of the logical route.
@@ -420,6 +424,16 @@ where
             sender,
         }
     }
+
+    /// Memoize the serialized body for retries and signing.
+    fn body(&mut self) -> Result<Bytes, HttpError> {
+        let body = match self.body {
+            Some(ref body) => body,
+            None => self.body.insert(serde_json::to_vec(&self.query)?.into()),
+        };
+
+        Ok(body.clone())
+    }
 }
 
 impl<T> UpstreamRequest for UpstreamQueryRequest<T>
@@ -442,8 +456,11 @@ where
         true
     }
 
-    fn sign(&self) -> bool {
-        true
+    fn sign(&mut self) -> Option<Bytes> {
+        // Computing the body is practically infallible since we're serializing standard structures
+        // into JSON. Even if it fails, `sign` is called after `build` and the error will be
+        // reported there.
+        self.body().ok()
     }
 
     fn method(&self) -> Method {
@@ -465,11 +482,7 @@ where
     }
 
     fn build(&mut self, builder: &mut RequestBuilder) -> Result<(), HttpError> {
-        // Memoize the serialized body for retries.
- let body = match self.body { - Some(ref body) => body, - None => self.body.insert(serde_json::to_vec(&self.query)?.into()), - }; + let body = self.body()?; relay_statsd::metric!( histogram(RelayHistograms::UpstreamQueryBodySize) = body.len() as u64 @@ -761,7 +774,7 @@ impl SharedClient { let mut builder = RequestBuilder::reqwest(self.reqwest.request(request.method(), url)); builder.header("Host", host_header.as_bytes()); - if request.set_relay_id() || request.sign() { + if request.set_relay_id() { if let Some(credentials) = self.config.credentials() { builder.header("X-Sentry-Relay-Id", credentials.id.to_string()); } @@ -769,14 +782,13 @@ impl SharedClient { request.build(&mut builder)?; - if request.sign() { + if let Some(payload) = request.sign() { let credentials = self .config .credentials() .ok_or(UpstreamRequestError::NoCredentials)?; - let body = builder.get_body().unwrap_or_default(); - let signature = credentials.secret_key.sign(body); + let signature = credentials.secret_key.sign(&payload); builder.header("X-Sentry-Relay-Signature", signature.as_bytes()); } diff --git a/relay-server/src/endpoints/batch_metrics.rs b/relay-server/src/endpoints/batch_metrics.rs index c91580379c..a05f9bb992 100644 --- a/relay-server/src/endpoints/batch_metrics.rs +++ b/relay-server/src/endpoints/batch_metrics.rs @@ -1,6 +1,5 @@ use axum::http::StatusCode; use axum::response::IntoResponse; -use relay_config::EmitOutcomes; use serde::{Deserialize, Serialize}; use crate::actors::processor::ProcessBatchedMetrics; @@ -15,7 +14,7 @@ pub async fn handle( start_time: StartTime, body: SignedBytes, ) -> impl IntoResponse { - if !body.relay.internal || state.config().emit_outcomes() != EmitOutcomes::AsOutcomes { + if !body.relay.internal { return StatusCode::FORBIDDEN.into_response(); } diff --git a/relay-server/src/http.rs b/relay-server/src/http.rs index 5124e11291..98ef6eb533 100644 --- a/relay-server/src/http.rs +++ b/relay-server/src/http.rs @@ -46,28 +46,24 @@ pub struct Request(pub reqwest::Request); pub struct RequestBuilder { builder: Option, - body: Option, } impl RequestBuilder { pub fn reqwest(builder: reqwest::RequestBuilder) -> Self { RequestBuilder { builder: Some(builder), - body: None, } } pub fn finish(self) -> Result { - let mut builder = self.builder.unwrap(); - if let Some(body) = self.body { - builder = builder.body(body); - } - Ok(Request(builder.build()?)) + // The builder is not optional, instead the option is used inside `build` so that it can be + // moved temporarily. Therefore, the `unwrap` here is infallible. + Ok(Request(self.builder.unwrap().build()?)) } fn build(&mut self, f: F) -> &mut Self where - F: FnMut(reqwest::RequestBuilder) -> reqwest::RequestBuilder, + F: FnOnce(reqwest::RequestBuilder) -> reqwest::RequestBuilder, { self.builder = self.builder.take().map(f); self @@ -98,12 +94,7 @@ impl RequestBuilder { } pub fn body(&mut self, body: Bytes) -> &mut Self { - self.body = Some(body); - self - } - - pub fn get_body(&self) -> Option<&[u8]> { - self.body.as_deref() + self.build(|builder| builder.body(body)) } } diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 391ea47acc..3bb529dacb 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -158,6 +158,12 @@ pub enum RelayHistograms { /// Size of queries (projectconfig queries, i.e. the request payload, not the response) sent by /// Relay over HTTP in bytes. UpstreamEnvelopeBodySize, + + /// Distribution of flush buckets over partition keys. 
+ /// + /// The distribution of buckets should be even. + /// If it is not, this metric should expose it. + PartitionKeys, } impl HistogramMetric for RelayHistograms { @@ -188,6 +194,7 @@ impl HistogramMetric for RelayHistograms { RelayHistograms::UpstreamRetries => "upstream.retries", RelayHistograms::UpstreamQueryBodySize => "upstream.query.body_size", RelayHistograms::UpstreamEnvelopeBodySize => "upstream.envelope.body_size", + RelayHistograms::PartitionKeys => "metrics.buckets.partition_keys", } } } diff --git a/tests/integration/fixtures/mini_sentry.py b/tests/integration/fixtures/mini_sentry.py index 59d7decadf..7e56837276 100644 --- a/tests/integration/fixtures/mini_sentry.py +++ b/tests/integration/fixtures/mini_sentry.py @@ -1,8 +1,9 @@ +import datetime import gzip +import json import os import re import uuid -import datetime from copy import deepcopy from queue import Queue @@ -37,6 +38,7 @@ def __init__(self, server_address, app): self.project_configs = {} self.captured_events = Queue() self.captured_outcomes = Queue() + self.captured_metrics = Queue() self.test_failures = [] self.hits = {} self.known_relays = {} @@ -387,6 +389,24 @@ def outcomes(): sentry.captured_outcomes.put(outcomes_batch) return jsonify({}) + @app.route("/api/0/relays/metrics/", methods=["POST"]) + def global_metrics(): + """ + Mock endpoint for global batched metrics. SENTRY DOES NOT IMPLEMENT THIS ENDPOINT! This is + just used to verify Relay's batching behavior. + """ + relay_id = flask_request.headers["x-sentry-relay-id"] + if relay_id not in authenticated_relays: + abort(403, "relay not registered") + + encoding = flask_request.headers.get("Content-Encoding", "") + assert encoding == "gzip", "Relay should always compress store requests" + data = gzip.decompress(flask_request.data) + + metrics_batch = json.loads(data)["buckets"] + sentry.captured_metrics.put(metrics_batch) + return jsonify({}) + @app.errorhandler(500) def fail(e): sentry.test_failures.append((flask_request.url, e)) diff --git a/tests/integration/test_metrics.py b/tests/integration/test_metrics.py index 308c1586ae..0240ef646d 100644 --- a/tests/integration/test_metrics.py +++ b/tests/integration/test_metrics.py @@ -199,6 +199,96 @@ def test_metrics_max_batch_size(mini_sentry, relay, max_batch_size, expected_eve mini_sentry.captured_events.get(timeout=1) +def test_global_metrics(mini_sentry, relay): + relay = relay( + mini_sentry, options={"http": {"global_metrics": True}, **TEST_CONFIG} + ) + + project_id = 42 + config = mini_sentry.add_basic_project_config(project_id) + public_key = config["publicKeys"][0]["publicKey"] + + timestamp = int(datetime.now(tz=timezone.utc).timestamp()) + metrics_payload = f"transactions/foo:42|c\ntransactions/bar:17|c|T{timestamp}" + relay.send_metrics(project_id, metrics_payload) + + metrics_batch = mini_sentry.captured_metrics.get(timeout=5) + assert mini_sentry.captured_metrics.qsize() == 0 # we had only one batch + + metrics = sorted(metrics_batch[public_key], key=lambda x: x["name"]) + + assert metrics == [ + { + "timestamp": timestamp, + "width": 1, + "name": "c:transactions/bar@none", + "value": 17.0, + "type": "c", + }, + { + "timestamp": timestamp, + "width": 1, + "name": "c:transactions/foo@none", + "value": 42.0, + "type": "c", + }, + ] + + +def test_global_metrics_batching(mini_sentry, relay): + # See `test_metrics_max_batch_size`: 200 should lead to 2 batches + MAX_FLUSH_SIZE = 200 + + relay = relay( + mini_sentry, + options={ + "http": {"global_metrics": True}, + "limits": 
{"max_concurrent_requests": 1}, # deterministic submission order + "aggregator": { + "bucket_interval": 1, + "initial_delay": 0, + "debounce_delay": 0, + "max_flush_bytes": MAX_FLUSH_SIZE, + }, + }, + ) + + project_id = 42 + config = mini_sentry.add_basic_project_config(project_id) + public_key = config["publicKeys"][0]["publicKey"] + + timestamp = int(datetime.now(tz=timezone.utc).timestamp()) + metrics_payload = ( + f"transactions/foo:1:2:3:4:5:6:7:8:9:10:11:12:13:14:15:16:17|d|T{timestamp}" + ) + relay.send_metrics(project_id, metrics_payload) + + batch1 = mini_sentry.captured_metrics.get(timeout=5) + batch2 = mini_sentry.captured_metrics.get(timeout=1) + with pytest.raises(queue.Empty): + mini_sentry.captured_metrics.get(timeout=1) + + assert batch1[public_key] == [ + { + "timestamp": timestamp, + "width": 1, + "name": "d:transactions/foo@none", + "value": [float(i) for i in range(1, 16)], + "type": "d", + } + ] + + assert batch2[public_key] == [ + { + "timestamp": timestamp, + "width": 1, + "name": "d:transactions/foo@none", + "value": [16.0, 17.0], + "type": "d", + } + ] + + def test_metrics_with_processing(mini_sentry, relay_with_processing, metrics_consumer): relay = relay_with_processing(options=TEST_CONFIG) metrics_consumer = metrics_consumer() @@ -240,6 +330,55 @@ def test_metrics_with_processing(mini_sentry, relay_with_processing, metrics_con } +def test_global_metrics_with_processing( + mini_sentry, relay, relay_with_processing, metrics_consumer +): + # Set up a relay chain where the outer relay has global metrics enabled + # and forwards to a processing Relay. + processing_relay = relay_with_processing(options=TEST_CONFIG) + relay = relay( + processing_relay, options={"http": {"global_metrics": True}, **TEST_CONFIG} + ) + + metrics_consumer = metrics_consumer() + + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = ["organizations:custom-metrics"] + + timestamp = int(datetime.now(tz=timezone.utc).timestamp()) + metrics_payload = f"transactions/foo:42|c\nbar@second:17|c|T{timestamp}" + relay.send_metrics(project_id, metrics_payload) + + metrics = metrics_by_name(metrics_consumer, 2) + + assert metrics["headers"]["c:transactions/foo@none"] == [ + ("namespace", b"transactions") + ] + assert metrics["c:transactions/foo@none"] == { + "org_id": 1, + "project_id": project_id, + "retention_days": 90, + "name": "c:transactions/foo@none", + "tags": {}, + "value": 42.0, + "type": "c", + "timestamp": timestamp, + } + + assert metrics["headers"]["c:custom/bar@second"] == [("namespace", b"custom")] + assert metrics["c:custom/bar@second"] == { + "org_id": 1, + "project_id": project_id, + "retention_days": 90, + "name": "c:custom/bar@second", + "tags": {}, + "value": 17.0, + "type": "c", + "timestamp": timestamp, + } + + def test_metrics_with_sharded_kafka( get_topic_name, mini_sentry,