From 6446b9381918d48602ef60afcd35c8d580ae48ec Mon Sep 17 00:00:00 2001 From: David Herberth Date: Fri, 17 Nov 2023 16:16:55 +0100 Subject: [PATCH 01/10] split and partition buckets in envelope processor --- CHANGELOG.md | 1 + relay-config/src/config.rs | 47 + relay-metrics/src/aggregator.rs | 110 +- relay-metrics/src/aggregatorservice.rs | 444 +------ relay-metrics/src/lib.rs | 2 + ...tests__buckets_view_serialize_partial.snap | 85 ++ relay-metrics/src/statsd.rs | 18 - relay-metrics/src/view.rs | 1061 +++++++++++++++++ relay-server/src/actors/envelopes.rs | 65 +- relay-server/src/actors/processor.rs | 89 +- relay-server/src/actors/project.rs | 13 +- relay-server/src/actors/project_cache.rs | 3 +- relay-server/src/service.rs | 1 - relay-server/src/statsd.rs | 8 + relay-server/src/utils/managed_envelope.rs | 11 + tests/integration/test_dynamic_sampling.py | 2 + tests/integration/test_metrics.py | 43 +- 17 files changed, 1437 insertions(+), 566 deletions(-) create mode 100644 relay-metrics/src/snapshots/relay_metrics__view__tests__buckets_view_serialize_partial.snap create mode 100644 relay-metrics/src/view.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 5bd7dab2b4..060f4a00e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,7 @@ - Scrub all DB Core Data spans differently. ([#2686](https://github.com/getsentry/relay/pull/2686)) - Support generic metrics extraction version 2. ([#2692](https://github.com/getsentry/relay/pull/2692)) - Emit error on continued project config fetch failures after a time interval. ([#2700](https://github.com/getsentry/relay/pull/2700)) +- Partition and split metric buckets just before sending. ([#2682](https://github.com/getsentry/relay/pull/2682)) ## 23.10.1 diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index e9dea1718c..01bb427243 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -924,6 +924,10 @@ fn default_max_rate_limit() -> Option { Some(300) // 5 minutes } +fn default_metrics_max_batch_size() -> ByteSize { + ByteSize::mebibytes(5) +} + /// Controls Sentry-internal event processing. #[derive(Serialize, Deserialize, Debug)] pub struct Processing { @@ -979,6 +983,26 @@ pub struct Processing { /// Maximum rate limit to report to clients. #[serde(default = "default_max_rate_limit")] pub max_rate_limit: Option, + /// The number of logical partitions that can receive flushed buckets. + /// + /// If set, buckets are partitioned by (bucket key % partitions), and routed + /// by setting the header `X-Sentry-Relay-Shard`. + #[serde(default)] + pub metrics_partitions: Option, + /// The approximate maximum number of bytes submitted in one metrics batch. + /// + /// This controls how big flushed batches of buckets get, depending on the number of buckets, + /// the cumulative length of their keys, and the number of raw values. Since final serialization + /// adds some additional overhead, this number is approxmate and some safety margin should be + /// left to hard limits. + #[serde(default = "default_metrics_max_batch_size")] + pub metrics_max_batch_size: ByteSize, + /// The approximate maximum number of bytes submitted in one metrics batch for processing + /// relays. + /// + /// Overrides [`Self::metrics_max_batch_size`] when specified on processing relays. 
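+    ///
+    /// For example (illustrative values, not shipped defaults), a processing relay might set
+    /// `metrics_max_batch_size_processing: 10MB` in the `processing` section of `config.yml`,
+    /// while non-processing relays keep using `metrics_max_batch_size`.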
+ #[serde(default)] + pub metrics_max_batch_size_processing: Option, } impl Default for Processing { @@ -997,6 +1021,9 @@ impl Default for Processing { attachment_chunk_size: default_chunk_size(), projectconfig_cache_prefix: default_projectconfig_cache_prefix(), max_rate_limit: default_max_rate_limit(), + metrics_partitions: None, + metrics_max_batch_size: default_metrics_max_batch_size(), + metrics_max_batch_size_processing: None, } } } @@ -2039,6 +2066,26 @@ impl Config { self.values.processing.attachment_chunk_size.as_bytes() } + /// Amount of metric partitions. + pub fn metrics_partitions(&self) -> Option { + self.values.processing.metrics_partitions + } + + /// Maximum metrics batch size in bytes. + pub fn metrics_max_batch_size_bytes(&self) -> usize { + self.values.processing.metrics_max_batch_size.as_bytes() + } + + /// Maximum metrics batch size in bytes for processing relays. + pub fn metrics_max_batch_size_bytes_processing(&self) -> usize { + self.values + .processing + .metrics_max_batch_size_processing + .as_ref() + .map(|s| s.as_bytes()) + .unwrap_or_else(|| self.metrics_max_batch_size_bytes()) + } + /// Default prefix to use when looking up project configs in Redis. This is only done when /// Relay is in processing mode. pub fn projectconfig_cache_prefix(&self) -> &str { diff --git a/relay-metrics/src/aggregator.rs b/relay-metrics/src/aggregator.rs index ca78d665fd..1d11b7b016 100644 --- a/relay-metrics/src/aggregator.rs +++ b/relay-metrics/src/aggregator.rs @@ -70,9 +70,13 @@ impl BucketKey { // Create a 64-bit hash of the bucket key using FnvHasher. // This is used for partition key computation and statsd logging. fn hash64(&self) -> u64 { - let mut hasher = FnvHasher::default(); - std::hash::Hash::hash(self, &mut hasher); - hasher.finish() + BucketKeyRef { + project_key: self.project_key, + timestamp: self.timestamp, + metric_name: &self.metric_name, + tags: &self.tags, + } + .hash64() } /// Estimates the number of bytes needed to encode the bucket key. @@ -92,6 +96,28 @@ impl BucketKey { } } +/// Pendant to [`BucketKey`] for referenced data, not owned data. +/// +/// This makes it possible to compute a hash for a [`Bucket`] +/// without destructing the bucket into a [`BucketKey`]. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +struct BucketKeyRef<'a> { + project_key: ProjectKey, + timestamp: UnixTimestamp, + metric_name: &'a str, + tags: &'a BTreeMap, +} + +impl<'a> BucketKeyRef<'a> { + // Create a 64-bit hash of the bucket key using FnvHasher. + // This is used for partition key computation and statsd logging. + fn hash64(&self) -> u64 { + let mut hasher = FnvHasher::default(); + std::hash::Hash::hash(self, &mut hasher); + hasher.finish() + } +} + /// Estimates the number of bytes needed to encode the tags. /// /// Note that this does not necessarily match the exact memory footprint of the tags, @@ -348,14 +374,6 @@ impl Ord for QueuedBucket { } } -/// A Bucket and its hashed key. -/// This is cheaper to pass around than a (BucketKey, Bucket) pair. -pub struct HashedBucket { - // This is only public because pop_flush_buckets is used in benchmark. - hashed_key: u64, - bucket: Bucket, -} - #[derive(Default)] struct CostTracker { total_cost: usize, @@ -513,7 +531,7 @@ impl Aggregator { /// Pop and return the buckets that are eligible for flushing out according to bucket interval. /// /// Note that this function is primarily intended for tests. 
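    ///
    /// Sketch of the intended call pattern after this change (illustrative; the surrounding
    /// flush loop and `send_to_receiver` are assumptions, not part of this crate):
    ///
    /// ```ignore
    /// for (project_key, buckets) in aggregator.pop_flush_buckets(false) {
    ///     // Buckets now come back as plain `Vec<Bucket>` per project; partitioning and
    ///     // batching happen later, in `partition_buckets` and `BucketsView::by_size`.
    ///     send_to_receiver(project_key, buckets);
    /// }
    /// ```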
- pub fn pop_flush_buckets(&mut self, force: bool) -> HashMap> { + pub fn pop_flush_buckets(&mut self, force: bool) -> HashMap> { relay_statsd::metric!( gauge(MetricGauges::Buckets) = self.bucket_count() as u64, aggregator = &self.name, @@ -526,7 +544,7 @@ impl Aggregator { aggregator = &self.name, ); - let mut buckets = HashMap::>::new(); + let mut buckets = HashMap::new(); let mut stats = HashMap::new(); relay_statsd::metric!( @@ -558,11 +576,8 @@ impl Aggregator { buckets .entry(key.project_key) - .or_default() - .push(HashedBucket { - hashed_key: key.hash64(), - bucket, - }); + .or_insert_with(Vec::new) + .push(bucket); false } else { @@ -845,35 +860,6 @@ impl Aggregator { } } - /// Split buckets into N logical partitions, determined by the bucket key. - pub fn partition_buckets( - &self, - buckets: Vec, - flush_partitions: Option, - ) -> BTreeMap, Vec> { - let flush_partitions = match flush_partitions { - None => { - return BTreeMap::from([(None, buckets.into_iter().map(|x| x.bucket).collect())]); - } - Some(x) => x.max(1), // handle 0, - }; - let mut partitions = BTreeMap::<_, Vec>::new(); - for bucket in buckets { - let partition_key = bucket.hashed_key % flush_partitions; - partitions - .entry(Some(partition_key)) - .or_default() - .push(bucket.bucket); - - // Log the distribution of buckets over partition key - relay_statsd::metric!( - histogram(MetricHistograms::PartitionKeys) = partition_key as f64, - aggregator = &self.name, - ); - } - partitions - } - /// Create a new aggregator. pub fn new(config: AggregatorConfig) -> Self { Self::named("default".to_owned(), config) @@ -900,6 +886,36 @@ impl fmt::Debug for Aggregator { } } +/// Split buckets into N logical partitions, determined by the bucket key. +pub fn partition_buckets( + project_key: ProjectKey, + buckets: Vec, + flush_partitions: Option, +) -> BTreeMap, Vec> { + let flush_partitions = match flush_partitions { + None => return BTreeMap::from([(None, buckets)]), + Some(x) => x.max(1), // handle 0, + }; + let mut partitions = BTreeMap::<_, Vec>::new(); + for bucket in buckets { + let key = BucketKeyRef { + project_key, + timestamp: bucket.timestamp, + metric_name: &bucket.name, + tags: &bucket.tags, + }; + + let partition_key = key.hash64() % flush_partitions; + partitions + .entry(Some(partition_key)) + .or_default() + .push(bucket); + + relay_statsd::metric!(histogram(MetricHistograms::PartitionKeys) = partition_key); + } + partitions +} + #[cfg(test)] mod tests { diff --git a/relay-metrics/src/aggregatorservice.rs b/relay-metrics/src/aggregatorservice.rs index 5587c1c7a4..419fad296d 100644 --- a/relay-metrics/src/aggregatorservice.rs +++ b/relay-metrics/src/aggregatorservice.rs @@ -1,4 +1,3 @@ -use std::iter::FusedIterator; use std::time::Duration; use relay_base_schema::project::ProjectKey; @@ -11,31 +10,14 @@ use serde::{Deserialize, Serialize}; use crate::aggregator::{self, AggregatorConfig, ShiftKey}; use crate::bucket::Bucket; use crate::statsd::{MetricCounters, MetricHistograms}; -use crate::{BucketValue, DistributionValue}; /// Interval for the flush cycle of the [`AggregatorService`]. const FLUSH_INTERVAL: Duration = Duration::from_millis(100); -/// The fraction of [`AggregatorServiceConfig::max_flush_bytes`] at which buckets will be split. A value of -/// `2` means that all buckets smaller than half of max_flush_bytes will be moved in their entirety, -/// and buckets larger will be split up. -const BUCKET_SPLIT_FACTOR: usize = 32; - -/// The average size of values when serialized. 
-const AVG_VALUE_SIZE: usize = 8; - /// Parameters used by the [`AggregatorService`]. #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(default)] pub struct AggregatorServiceConfig { - /// The approximate maximum number of bytes submitted within one flush cycle. - /// - /// This controls how big flushed batches of buckets get, depending on the number of buckets, - /// the cumulative length of their keys, and the number of raw values. Since final serialization - /// adds some additional overhead, this number is approxmate and some safety margin should be - /// left to hard limits. - pub max_flush_bytes: usize, - /// Maximum amount of bytes used for metrics aggregation. /// /// When aggregating metrics, Relay keeps track of how many bytes a metric takes in memory. @@ -45,12 +27,6 @@ pub struct AggregatorServiceConfig { /// Defaults to `None`, i.e. no limit. pub max_total_bucket_bytes: Option, - /// The number of logical partitions that can receive flushed buckets. - /// - /// If set, buckets are partitioned by (bucket key % flush_partitions), and routed - /// by setting the header `X-Sentry-Relay-Shard`. - pub flush_partitions: Option, - /// Determines the wall clock time interval for buckets in seconds. /// /// Defaults to `10` seconds. Every metric is sorted into a bucket of this size based on its @@ -119,8 +95,6 @@ pub struct AggregatorServiceConfig { impl Default for AggregatorServiceConfig { fn default() -> Self { Self { - max_flush_bytes: 5_000_000, // 5 MB - flush_partitions: None, max_total_bucket_bytes: None, bucket_interval: 10, initial_delay: 30, @@ -218,8 +192,6 @@ pub struct BucketCountInquiry; pub struct FlushBuckets { /// The project key. pub project_key: ProjectKey, - /// The logical partition to send this batch to. - pub partition_key: Option, /// The buckets to be flushed. pub buckets: Vec, } @@ -234,9 +206,7 @@ pub struct AggregatorService { aggregator: aggregator::Aggregator, state: AggregatorState, receiver: Option>, - max_flush_bytes: usize, max_total_bucket_bytes: Option, - flush_partitions: Option, } impl AggregatorService { @@ -260,9 +230,7 @@ impl AggregatorService { Self { receiver, state: AggregatorState::Running, - max_flush_bytes: config.max_flush_bytes, max_total_bucket_bytes: config.max_total_bucket_bytes, - flush_partitions: config.flush_partitions, aggregator: aggregator::Aggregator::named(name, AggregatorConfig::from(&config)), } } @@ -275,30 +243,6 @@ impl AggregatorService { sender.send(result); } - /// Split the provided buckets into batches and process each batch with the given function. - /// - /// For each batch, log a histogram metric. - fn process_batches(&self, buckets: impl IntoIterator, mut process: F) - where - F: FnMut(Vec), - { - let capped_batches = CappedBucketIter::new(buckets.into_iter(), self.max_flush_bytes); - let num_batches = capped_batches - .map(|batch| { - relay_statsd::metric!( - histogram(MetricHistograms::BucketsPerBatch) = batch.len() as f64, - aggregator = self.aggregator.name(), - ); - process(batch); - }) - .count(); - - relay_statsd::metric!( - histogram(MetricHistograms::BatchesPerPartition) = num_batches as f64, - aggregator = self.aggregator.name(), - ); - } - /// Sends the [`FlushBuckets`] message to the receiver in the fire and forget fashion. It is up /// to the receiver to send the [`MergeBuckets`] message back if buckets could not be flushed /// and we require another re-try. 
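    ///
    /// Receiver-side sketch (illustrative; `try_forward` and the surrounding service are
    /// assumptions): on failure the buckets are merged back for a later retry.
    ///
    /// ```ignore
    /// fn handle_flush_buckets(&mut self, msg: FlushBuckets) {
    ///     if let Err(buckets) = self.try_forward(msg.project_key, msg.buckets) {
    ///         self.aggregator.send(MergeBuckets::new(msg.project_key, buckets));
    ///     }
    /// }
    /// ```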
@@ -317,26 +261,18 @@ impl AggregatorService { relay_log::trace!("flushing {} projects to receiver", flush_buckets.len()); let mut total_bucket_count = 0u64; - for (project_key, project_buckets) in flush_buckets.into_iter() { - let bucket_count = project_buckets.len() as u64; + for (project_key, buckets) in flush_buckets.into_iter() { + let bucket_count = buckets.len() as u64; relay_statsd::metric!( histogram(MetricHistograms::BucketsFlushedPerProject) = bucket_count, aggregator = self.aggregator.name(), ); total_bucket_count += bucket_count; - let partitioned_buckets = self - .aggregator - .partition_buckets(project_buckets, self.flush_partitions); - for (partition_key, buckets) in partitioned_buckets { - self.process_batches(buckets, |batch| { - if let Some(ref receiver) = self.receiver { - receiver.send(FlushBuckets { - project_key, - partition_key, - buckets: batch, - }); - } + if let Some(ref receiver) = self.receiver { + receiver.send(FlushBuckets { + project_key, + buckets, }); } } @@ -414,153 +350,6 @@ impl Drop for AggregatorService { } } -/// An iterator returning batches of buckets fitting into a size budget. -/// -/// The size budget is given through `max_flush_bytes`, though this is an approximate number. On -/// every iteration, this iterator returns a `Vec` which serializes into a buffer of the -/// specified size. Buckets at the end of each batch may be split to fit into the batch. -/// -/// Since this uses an approximate function to estimate the size of buckets, the actual serialized -/// payload may exceed the size. The estimation function is built in a way to guarantee the same -/// order of magnitude. -struct CappedBucketIter> { - buckets: T, - next_bucket: Option, - max_flush_bytes: usize, -} - -impl> CappedBucketIter { - /// Creates a new `CappedBucketIter`. - pub fn new(mut buckets: T, max_flush_bytes: usize) -> Self { - let next_bucket = buckets.next(); - - Self { - buckets, - next_bucket, - max_flush_bytes, - } - } -} - -impl> Iterator for CappedBucketIter { - type Item = Vec; - - fn next(&mut self) -> Option { - let mut current_batch = Vec::new(); - let mut remaining_bytes = self.max_flush_bytes; - - while let Some(bucket) = self.next_bucket.take() { - let bucket_size = estimate_size(&bucket); - if bucket_size <= remaining_bytes { - // the bucket fits - remaining_bytes -= bucket_size; - current_batch.push(bucket); - self.next_bucket = self.buckets.next(); - } else if bucket_size < self.max_flush_bytes / BUCKET_SPLIT_FACTOR { - // the bucket is too small to split, move it entirely - self.next_bucket = Some(bucket); - break; - } else { - // the bucket is big enough to split - let (left, right) = split_at(bucket, remaining_bytes); - if let Some(left) = left { - current_batch.push(left); - } - - self.next_bucket = right; - break; - } - } - - if current_batch.is_empty() { - // There is still leftover data not returned by the iterator after it has ended. - if self.next_bucket.take().is_some() { - relay_log::error!("CappedBucketIter swallowed bucket"); - } - None - } else { - Some(current_batch) - } - } -} - -impl> FusedIterator for CappedBucketIter {} - -/// Splits this bucket if its estimated serialization size exceeds a threshold. -/// -/// There are three possible return values: -/// - `(Some, None)` if the bucket fits entirely into the size budget. There is no split. -/// - `(None, Some)` if the size budget cannot even hold the bucket name and tags. There is no -/// split, the entire bucket is moved. -/// - `(Some, Some)` if the bucket fits partially. 
Remaining values are moved into a new bucket -/// with all other information cloned. -/// -/// This is an approximate function. The bucket is not actually serialized, but rather its -/// footprint is estimated through the number of data points contained. See -/// `estimate_size` for more information. -fn split_at(mut bucket: Bucket, size: usize) -> (Option, Option) { - // If there's enough space for the entire bucket, do not perform a split. - if size >= estimate_size(&bucket) { - return (Some(bucket), None); - } - - // If the bucket key can't even fit into the remaining length, move the entire bucket into - // the right-hand side. - let own_size = estimate_base_size(&bucket); - if size < (own_size + AVG_VALUE_SIZE) { - // split_at must not be zero - return (None, Some(bucket)); - } - - // Perform a split with the remaining space after adding the key. We assume an average - // length of 8 bytes per value and compute the number of items fitting into the left side. - let split_at = (size - own_size) / AVG_VALUE_SIZE; - - match bucket.value { - BucketValue::Counter(_) => (None, Some(bucket)), - BucketValue::Distribution(ref mut distribution) => { - let mut org = std::mem::take(distribution); - - let mut new_bucket = bucket.clone(); - new_bucket.value = - BucketValue::Distribution(DistributionValue::from_slice(&org[split_at..])); - - org.truncate(split_at); - bucket.value = BucketValue::Distribution(org); - - (Some(bucket), Some(new_bucket)) - } - BucketValue::Set(ref mut set) => { - let org = std::mem::take(set); - let mut new_bucket = bucket.clone(); - - let mut iter = org.into_iter(); - bucket.value = BucketValue::Set((&mut iter).take(split_at).collect()); - new_bucket.value = BucketValue::Set(iter.collect()); - - (Some(bucket), Some(new_bucket)) - } - BucketValue::Gauge(_) => (None, Some(bucket)), - } -} - -/// Estimates the number of bytes needed to serialize the bucket without value. -/// -/// Note that this does not match the exact size of the serialized payload. Instead, the size is -/// approximated through tags and a static overhead. -fn estimate_base_size(bucket: &Bucket) -> usize { - 50 + bucket.name.len() + aggregator::tags_cost(&bucket.tags) -} - -/// Estimates the number of bytes needed to serialize the bucket. -/// -/// Note that this does not match the exact size of the serialized payload. Instead, the size is -/// approximated through the number of contained values, assuming an average size of serialized -/// values. -fn estimate_size(bucket: &Bucket) -> usize { - estimate_base_size(bucket) + bucket.value.len() * AVG_VALUE_SIZE -} - /// A message containing a list of [`Bucket`]s to be inserted into the aggregator. 
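+///
+/// Illustrative usage (the `aggregator` address and the caller's context are assumptions):
+///
+/// ```ignore
+/// aggregator.send(MergeBuckets::new(project_key, buckets));
+/// ```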
#[derive(Debug)] pub struct MergeBuckets { @@ -592,7 +381,7 @@ impl MergeBuckets { #[cfg(test)] mod tests { - use std::collections::{BTreeMap, BTreeSet}; + use std::collections::BTreeMap; use std::sync::{Arc, RwLock}; use relay_common::time::UnixTimestamp; @@ -732,223 +521,4 @@ mod tests { assert_eq!(bucket_count, 1); assert_eq!(receiver.bucket_count(), 0); } - - fn test_config() -> AggregatorServiceConfig { - AggregatorServiceConfig { - bucket_interval: 1, - initial_delay: 0, - debounce_delay: 0, - max_secs_in_past: 50 * 365 * 24 * 60 * 60, - max_secs_in_future: 50 * 365 * 24 * 60 * 60, - max_name_length: 200, - max_tag_key_length: 200, - max_tag_value_length: 200, - max_project_key_bucket_bytes: None, - max_total_bucket_bytes: None, - max_flush_bytes: 50_000_000, - ..Default::default() - } - } - - #[must_use] - pub fn run_test_bucket_partitioning(flush_partitions: Option) -> Vec { - let config = AggregatorServiceConfig { - max_flush_bytes: 1000, - flush_partitions, - ..test_config() - }; - - let bucket1 = Bucket { - timestamp: UnixTimestamp::from_secs(999994711), - width: 0, - name: "c:transactions/foo".to_owned(), - value: BucketValue::counter(42.), - tags: BTreeMap::new(), - }; - - let bucket2 = Bucket { - timestamp: UnixTimestamp::from_secs(999994711), - width: 0, - name: "c:transactions/bar".to_owned(), - value: BucketValue::counter(43.), - tags: BTreeMap::new(), - }; - - let mut aggregator = AggregatorService::new(config.clone(), None); - let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); - let captures = relay_statsd::with_capturing_test_client(|| { - aggregator - .aggregator - .merge(project_key, bucket1, config.max_total_bucket_bytes) - .ok(); - aggregator - .aggregator - .merge(project_key, bucket2, config.max_total_bucket_bytes) - .ok(); - aggregator.try_flush(); - }); - captures - .into_iter() - .filter(|x| { - [ - "metrics.buckets.batches_per_partition", - "metrics.buckets.per_batch", - "metrics.buckets.partition_keys", - ] - .contains(&x.split_once(':').unwrap().0) - }) - .collect::>() - } - - #[test] - fn test_bucket_partitioning_dummy() { - let output = run_test_bucket_partitioning(None); - insta::assert_debug_snapshot!(output, @r#" - [ - "metrics.buckets.per_batch:2|h|#aggregator:default", - "metrics.buckets.batches_per_partition:1|h|#aggregator:default", - ] - "#); - } - - #[test] - fn test_bucket_partitioning_128() { - let output = run_test_bucket_partitioning(Some(128)); - // Because buckets are stored in a HashMap, we do not know in what order the buckets will - // be processed, so we need to convert them to a set: - let (partition_keys, tail) = output.split_at(2); - insta::assert_debug_snapshot!(BTreeSet::from_iter(partition_keys), @r#" - { - "metrics.buckets.partition_keys:59|h|#aggregator:default", - "metrics.buckets.partition_keys:62|h|#aggregator:default", - } - "#); - - insta::assert_debug_snapshot!(tail, @r#" - [ - "metrics.buckets.per_batch:1|h|#aggregator:default", - "metrics.buckets.batches_per_partition:1|h|#aggregator:default", - "metrics.buckets.per_batch:1|h|#aggregator:default", - "metrics.buckets.batches_per_partition:1|h|#aggregator:default", - ] - "#); - } - - #[test] - fn test_capped_iter_empty() { - let buckets = vec![]; - - let mut iter = CappedBucketIter::new(buckets.into_iter(), 200); - assert!(iter.next().is_none()); - } - - #[test] - fn test_capped_iter_single() { - let json = r#"[ - { - "name": "endpoint.response_time", - "unit": "millisecond", - "value": [36, 49, 57, 68], - "type": "d", - "timestamp": 
1615889440, - "width": 10, - "tags": { - "route": "user_index" - } - } - ]"#; - - let buckets = serde_json::from_str::>(json).unwrap(); - - let mut iter = CappedBucketIter::new(buckets.into_iter(), 200); - let batch = iter.next().unwrap(); - assert_eq!(batch.len(), 1); - - assert!(iter.next().is_none()); - } - - #[test] - fn test_capped_iter_split() { - let json = r#"[ - { - "name": "endpoint.response_time", - "unit": "millisecond", - "value": [1, 1, 1, 1], - "type": "d", - "timestamp": 1615889440, - "width": 10, - "tags": { - "route": "user_index" - } - } - ]"#; - - let buckets = serde_json::from_str::>(json).unwrap(); - - // 58 is a magic number obtained by experimentation that happens to split this bucket - let mut iter = CappedBucketIter::new(buckets.into_iter(), 108); - let batch1 = iter.next().unwrap(); - assert_eq!(batch1.len(), 1); - - match batch1.first().unwrap().value { - BucketValue::Distribution(ref dist) => assert_eq!(dist.len(), 2), - _ => unreachable!(), - } - - let batch2 = iter.next().unwrap(); - assert_eq!(batch2.len(), 1); - - match batch2.first().unwrap().value { - BucketValue::Distribution(ref dist) => assert_eq!(dist.len(), 2), - _ => unreachable!(), - } - - assert!(iter.next().is_none()); - } - - fn test_capped_iter_completeness(max_flush_bytes: usize, expected_elements: usize) { - let json = r#"[ - { - "name": "endpoint.response_time", - "unit": "millisecond", - "value": [1, 1, 1, 1], - "type": "d", - "timestamp": 1615889440, - "width": 10, - "tags": { - "route": "user_index" - } - } - ]"#; - - let buckets = serde_json::from_str::>(json).unwrap(); - - let mut iter = CappedBucketIter::new(buckets.into_iter(), max_flush_bytes); - let batches = iter - .by_ref() - .take(expected_elements + 1) - .collect::>(); - assert!( - batches.len() <= expected_elements, - "Cannot have more buckets than individual values" - ); - let total_elements: usize = batches.into_iter().flatten().map(|x| x.value.len()).sum(); - assert_eq!(total_elements, expected_elements); - } - - #[test] - fn test_capped_iter_completeness_0() { - test_capped_iter_completeness(0, 0); - } - - #[test] - fn test_capped_iter_completeness_90() { - // This would cause an infinite loop. 
- test_capped_iter_completeness(90, 0); - } - - #[test] - fn test_capped_iter_completeness_100() { - test_capped_iter_completeness(100, 4); - } } diff --git a/relay-metrics/src/lib.rs b/relay-metrics/src/lib.rs index 890708906e..bf5a55c622 100644 --- a/relay-metrics/src/lib.rs +++ b/relay-metrics/src/lib.rs @@ -75,8 +75,10 @@ mod bucket; mod protocol; mod router; mod statsd; +mod view; pub use aggregatorservice::*; pub use bucket::*; pub use protocol::*; pub use router::*; +pub use view::*; diff --git a/relay-metrics/src/snapshots/relay_metrics__view__tests__buckets_view_serialize_partial.snap b/relay-metrics/src/snapshots/relay_metrics__view__tests__buckets_view_serialize_partial.snap new file mode 100644 index 0000000000..cd213488ec --- /dev/null +++ b/relay-metrics/src/snapshots/relay_metrics__view__tests__buckets_view_serialize_partial.snap @@ -0,0 +1,85 @@ +--- +source: relay-metrics/src/view.rs +expression: partials +--- +[ + [ + { + "timestamp": 5000, + "width": 0, + "name": "c:custom/b1@none", + "type": "c", + "value": 12.0, + "tags": { + "bar": "baz", + "foo": "" + } + }, + { + "timestamp": 5000, + "width": 0, + "name": "d:custom/b2@none", + "type": "d", + "value": [ + 1.0, + 2.0 + ], + "tags": { + "bar": "baz", + "foo": "" + } + } + ], + [ + { + "timestamp": 5000, + "width": 0, + "name": "d:custom/b2@none", + "type": "d", + "value": [ + 3.0, + 5.0, + 5.0 + ], + "tags": { + "bar": "baz", + "foo": "" + } + }, + { + "timestamp": 5000, + "width": 0, + "name": "s:custom/b3@none", + "type": "s", + "value": [ + 42 + ] + } + ], + [ + { + "timestamp": 5000, + "width": 0, + "name": "s:custom/b3@none", + "type": "s", + "value": [ + 75 + ] + } + ], + [ + { + "timestamp": 5000, + "width": 0, + "name": "g:custom/b4@none", + "type": "g", + "value": { + "last": 25.0, + "min": 17.0, + "max": 42.0, + "sum": 220.0, + "count": 85 + } + } + ] +] diff --git a/relay-metrics/src/statsd.rs b/relay-metrics/src/statsd.rs index 744076d259..6fbd9c7212 100644 --- a/relay-metrics/src/statsd.rs +++ b/relay-metrics/src/statsd.rs @@ -112,27 +112,11 @@ pub enum MetricHistograms { /// time period (`false`) or after the initial delay has expired (`true`). BucketsDelay, - /// The number of batches emitted per partition by [`crate::aggregator::Aggregator`]. /// - /// This metric is tagged with: - /// - `aggregator`: The name of the metrics aggregator (usually `"default"`). - BatchesPerPartition, - - /// The number of buckets in a batch emitted by [`crate::aggregator::Aggregator`]. - /// - /// This corresponds to the number of buckets that will end up in an envelope. - /// - /// This metric is tagged with: - /// - `aggregator`: The name of the metrics aggregator (usually `"default"`). - BucketsPerBatch, - /// Distribution of flush buckets over partition keys. /// /// The distribution of buckets should be even. /// If it is not, this metric should expose it. - /// - /// This metric is tagged with: - /// - `aggregator`: The name of the metrics aggregator (usually `"default"`). PartitionKeys, /// Distribution of invalid bucket timestamps observed, relative to the time of observation. 
@@ -147,8 +131,6 @@ impl HistogramMetric for MetricHistograms { Self::BucketsFlushed => "metrics.buckets.flushed", Self::BucketsFlushedPerProject => "metrics.buckets.flushed_per_project", Self::BucketsDelay => "metrics.buckets.delay", - Self::BatchesPerPartition => "metrics.buckets.batches_per_partition", - Self::BucketsPerBatch => "metrics.buckets.per_batch", Self::PartitionKeys => "metrics.buckets.partition_keys", Self::InvalidBucketTimestamp => "metrics.buckets.invalid_timestamp", } diff --git a/relay-metrics/src/view.rs b/relay-metrics/src/view.rs new file mode 100644 index 0000000000..6b6e1e5c06 --- /dev/null +++ b/relay-metrics/src/view.rs @@ -0,0 +1,1061 @@ +use relay_common::time::UnixTimestamp; +use serde::ser::{SerializeMap, SerializeSeq}; +use serde::Serialize; + +use crate::{aggregator, CounterType, DistributionType, GaugeValue, SetType, SetValue}; +use std::collections::BTreeMap; +use std::fmt; +use std::ops::Range; + +use crate::bucket::Bucket; +use crate::BucketValue; + +/// The fraction of size passed to [`BucketsView::by_size()`] at which buckets will be split. A value of +/// `2` means that all buckets smaller than half of max_flush_bytes will be moved in their entirety, +/// and buckets larger will be split up. +const BUCKET_SPLIT_FACTOR: usize = 32; + +/// The average size of values when serialized. +const AVG_VALUE_SIZE: usize = 8; + +/// Just an internal type representing an index into a slice of buckets. +/// +/// Note: the meaning of fields depends on the context of the index. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +struct Index { + /// Index into the slice. + slice: usize, + /// Index into a bucket. + bucket: usize, +} + +/// A view into a slice of metric buckets. +/// +/// The view can be used to iterate over a large slice +/// of metric data slicing even into the buckets themselves. +/// +/// ```txt +/// Full View +/// /---------------------------------------------\ +/// [[C:1], [C:12], [D:0, 1, 2, 3, 5, 5], [S:42, 75]] +/// \--------------------------/ \---------------/ +/// View 1 View 2 +/// ``` +/// +/// Iterating over a [`BucketsView`] yields [`BucketView`] items, +/// only the first and last elements may be partial buckets. +/// +/// Using the above example, iterating over `View 1` yields the buckets: +/// `[C:1], [C:12], [D:0, 1, 2, 3]`. +pub struct BucketsView<'a> { + /// Source slice of buckets. + inner: &'a [Bucket], + /// Start index, slice index indicates bucket, + /// bucket index indicates offset in the selected bucket. + start: Index, + /// End index, slice index indicates exclusive end, + /// bucket index, indicates offset into the *next* bucket past the end. + end: Index, +} + +impl<'a> BucketsView<'a> { + /// Creates a new buckets view containing all data from the slice. + pub fn new(buckets: &'a [Bucket]) -> Self { + Self { + inner: buckets, + start: Index { + slice: 0, + bucket: 0, + }, + end: Index { + slice: buckets.len(), + bucket: 0, + }, + } + } + + /// Returns the amount of partial or full buckets in the view. + pub fn len(&self) -> usize { + let mut len = self.end.slice - self.start.slice; + if self.end.bucket != 0 { + len += 1; + } + len + } + + /// Returns whether the view contains any buckets. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Iterater over all buckets in the view. + pub fn iter(&self) -> impl Iterator> { + BucketsViewIter::new(self.inner, self.start, self.end) + } + + /// Iterator which slices the source view into segments with an approximate size of `size_in_bytes`. 
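+    ///
+    /// Illustrative sketch (the 1024-byte budget and `send_upstream` are assumptions):
+    ///
+    /// ```ignore
+    /// let view = BucketsView::new(&buckets);
+    /// for batch in view.by_size(1024) {
+    ///     // Each batch is itself a `BucketsView`; a bucket sitting on a batch boundary
+    ///     // may be split across two consecutive batches.
+    ///     send_upstream(serde_json::to_vec(&batch).unwrap());
+    /// }
+    /// ```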
+ pub fn by_size(&self, size_in_bytes: usize) -> impl Iterator> { + BucketsViewBySizeIter::new(self.inner, self.start, self.end, size_in_bytes) + } +} + +impl<'a> fmt::Debug for BucketsView<'a> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let contents = self.iter().collect::>(); + f.debug_tuple("BucketsView").field(&contents).finish() + } +} + +/// Iterator yielding all items contained in a [`BucketsView`]. +/// +/// First and/or last item may be partial buckets. +struct BucketsViewIter<'a> { + /// Source slice of buckets. + inner: &'a [Bucket], + /// Current index. + current: Index, + /// End index. + end: Index, +} + +impl<'a> BucketsViewIter<'a> { + /// Creates a new iterator. + /// + /// **Notes:** if `start` and `end` are not valid indices the iterator may panic. + fn new(inner: &'a [Bucket], start: Index, end: Index) -> Self { + Self { + inner, + end, + current: start, + } + } +} + +impl<'a> Iterator for BucketsViewIter<'a> { + type Item = BucketView<'a>; + + fn next(&mut self) -> Option { + // We reached the exact end, there is no sub-bucket index. + if self.current.slice == self.end.slice && self.end.bucket == 0 { + return None; + } + // We are way past, including sub-bucket offset. + if self.current.slice > self.end.slice { + return None; + } + + // This doesn't overflow because the last bucket in the inner slice will always have a 0 bucket index. + let next = &self.inner[self.current.slice]; + + // Choose the bucket end, this will always the full bucket except if it is the last. + let end = match self.current.slice == self.end.slice { + false => next.value.len(), + true => self.end.bucket, + }; + + let next = BucketView::new(next).select(self.current.bucket..end); + + // Even if the current Bucket was partial, the next one will be full, + // except it is the last one. + self.current = Index { + slice: self.current.slice + 1, + bucket: 0, + }; + + Some(next) + } +} + +/// Iterator slicing a [`BucketsView`] into smaller views constrained by a given size in bytes. +/// +/// See [`estimate_size`] for how the size of a bucket is calculated. +struct BucketsViewBySizeIter<'a> { + /// Source slice. + inner: &'a [Bucket], + /// Current position in the slice. + current: Index, + /// Terminal position. + end: Index, + /// Maximum size of in bytes of each slice. + max_size_bytes: usize, +} + +impl<'a> BucketsViewBySizeIter<'a> { + /// Creates a new iterator. + /// + /// **Notes:** if `start` and `end` are not valid indices the iterator may panic. + fn new(inner: &'a [Bucket], start: Index, end: Index, max_size_bytes: usize) -> Self { + Self { + inner, + end, + current: start, + max_size_bytes, + } + } +} + +impl<'a> Iterator for BucketsViewBySizeIter<'a> { + type Item = BucketsView<'a>; + + fn next(&mut self) -> Option { + let start = self.current; + + let mut remaining_bytes = self.max_size_bytes; + loop { + // Make sure, we don't shoot past the end ... + if (self.current.slice > self.end.slice) + || (self.current.slice == self.end.slice && self.end.bucket == 0) + { + break; + } + + // Select next potential bucket, + // this won't overflow because `end` will never go past the slice and + // we just validated that current is constrained by end. 
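+            // `split_at` below reports one of three outcomes: the bucket fits into the
+            // remaining budget (keep going), it does not fit and is too small to split
+            // (close this batch), or it fits partially (split it; only distributions and
+            // sets are splittable).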
+ let bucket = &self.inner[self.current.slice]; + let bucket = BucketView::new(bucket).select(self.current.bucket..bucket.value.len()); + + match split_at( + &bucket, + remaining_bytes, + self.max_size_bytes / BUCKET_SPLIT_FACTOR, + ) { + SplitDecision::BucketFits(size) => { + remaining_bytes -= size; + self.current = Index { + slice: self.current.slice + 1, + bucket: 0, + }; + continue; + } + SplitDecision::MoveToNextBatch => break, + SplitDecision::Split(at) => { + // Only certain buckets can be split, if the bucket can't be split, + // move it to the next batch. + if bucket.can_split() { + self.current = Index { + slice: self.current.slice, + bucket: self.current.bucket + at, + }; + } + break; + } + } + } + + if start == self.current { + // Either no progress could be made (not enough space to fit a bucket), + // or we're done. + return None; + } + + // Current is the current for the next batch now, + // which means, current is the end for this batch. + Some(BucketsView { + inner: self.inner, + start, + end: self.current, + }) + } +} + +impl<'a> Serialize for BucketsView<'a> { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut state = serializer.serialize_seq(Some(self.len()))?; + + for bucket in self.iter() { + state.serialize_element(&bucket)?; + } + + state.end() + } +} + +/// A view into a metrics bucket. Sometimes also called a partial bucket. +/// A view contains a subset of datapoints of the original bucket. +/// +/// ```txt +/// Full Bucket +/// /-------------------------------\ +/// [0, 1, 2, 3, 5, 5, 5, 10, 11, 11] +/// \----------------/\-------------/ +/// View 1 View 2 +/// ``` +/// +/// A view can be split again into multiple smaller views. +pub struct BucketView<'a> { + /// The source bucket. + inner: &'a Bucket, + /// Non-empty and valid range into the bucket. + /// The full range is constrained by `0..bucket.value.len()` + range: Range, +} + +impl<'a> BucketView<'a> { + /// Creates a new bucket view of a bucket. + /// The resulting view contains the entire bucket. + pub fn new(bucket: &'a Bucket) -> Self { + Self { + inner: bucket, + range: 0..bucket.value.len(), + } + } + + /// Timestamp of the bucket. + /// + /// See also: [`Bucket::timestamp`] + pub fn timestamp(&self) -> UnixTimestamp { + self.inner.timestamp + } + + /// Width of the bucket. + /// + /// See also: [`Bucket::width`] + pub fn width(&self) -> u64 { + self.inner.width + } + + /// Name of the bucket. + /// + /// See also: [`Bucket::name`] + pub fn name(&self) -> &str { + &self.inner.name + } + + /// Value of the bucket view. + pub fn value(&self) -> BucketViewValue<'a> { + match &self.inner.value { + BucketValue::Counter(c) => BucketViewValue::Counter(*c), + BucketValue::Distribution(d) => BucketViewValue::Distribution(&d[self.range.clone()]), + BucketValue::Set(s) => BucketViewValue::Set(SetView::new(s, self.range.clone())), + BucketValue::Gauge(g) => BucketViewValue::Gauge(*g), + } + } + + /// Name of the bucket. + /// + /// See also: [`Bucket::tags`] + pub fn tags(&self) -> &BTreeMap { + &self.inner.tags + } + + /// Returns the value of the specified tag if it exists. + /// + /// See also: [`Bucket::tag()`] + pub fn tag(&self, name: &str) -> Option<&str> { + self.inner.tag(name) + } + + /// Number of raw datapoints in this view. + /// + /// See also: [`BucketValue::len()`] + pub fn len(&self) -> usize { + self.range.len() + } + + /// Returns `true` if this bucket view contains no values. 
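+    ///
+    /// For example (sketch; `bucket` is assumed to hold a distribution value):
+    ///
+    /// ```ignore
+    /// assert!(!BucketView::new(&bucket).is_empty());
+    /// assert!(BucketView::new(&bucket).select(1..1).is_empty());
+    /// ```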
+ pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Selects a sub-view of the current view. + /// + /// # Panics + /// + /// Function panics when: + /// * the passed range is not contained in the current view. + /// * trying to split a counter or gauage bucket + pub fn select(mut self, range: Range) -> Self { + assert!( + range.start >= self.range.start, + "range not contained in view" + ); + assert!(range.end <= self.range.end, "range not contained in view"); + assert!( + self.can_split() || range == (0..self.inner.value.len()), + "attempt to split unsplittable bucket" + ); + + self.range = range; + self + } + + /// Whether the bucket can be split into multiple. + /// + /// Only set and distribution buckets can be split. + fn can_split(&self) -> bool { + matches!( + self.inner.value, + BucketValue::Distribution(_) | BucketValue::Set(_) + ) + } + + /// Returns `true` when this view contains the entire bucket. + fn is_full_bucket(&self) -> bool { + self.range.start == 0 && self.range.end == self.inner.value.len() + } +} + +impl<'a> fmt::Debug for BucketView<'a> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BucketView") + .field("timestamp", &self.inner.timestamp) + .field("width", &self.inner.width) + .field("name", &self.inner.name) + .field("value", &self.value()) + .field("tags", &self.inner.tags) + .finish() + } +} + +impl From<&BucketView<'_>> for Bucket { + fn from(value: &BucketView<'_>) -> Self { + // short circuit, it's the entire bucket + if value.is_full_bucket() { + return value.inner.clone(); + } + + Bucket { + timestamp: value.inner.timestamp, + width: value.inner.width, + name: value.inner.name.clone(), + value: value.value().into(), + tags: value.inner.tags.clone(), + } + } +} + +impl From> for Bucket { + fn from(value: BucketView<'_>) -> Self { + Bucket::from(&value) + } +} + +impl<'a> Serialize for BucketView<'a> { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let Bucket { + timestamp, + width, + name, + value: _, + tags, + } = self.inner; + + let len = match tags.is_empty() { + true => 4, + false => 5, + }; + + let mut state = serializer.serialize_map(Some(len))?; + + state.serialize_entry("timestamp", timestamp)?; + state.serialize_entry("width", width)?; + state.serialize_entry("name", name)?; + + if self.is_full_bucket() { + self.inner + .value + .serialize(serde::__private::ser::FlatMapSerializer(&mut state))?; + } else { + self.value() + .serialize(serde::__private::ser::FlatMapSerializer(&mut state))?; + } + + if !tags.is_empty() { + state.serialize_entry("tags", tags)?; + } + + state.end() + } +} + +/// A view into the datapoints of a [`BucketValue`]. +#[derive(Debug, Clone, PartialEq, Serialize)] +#[serde(tag = "type", content = "value")] +pub enum BucketViewValue<'a> { + /// A counter metric. + /// + /// See: [`BucketValue::Counter`]. + #[serde(rename = "c")] + Counter(CounterType), + /// A distribution metric. + /// + /// See: [`BucketValue::Distribution`]. + #[serde(rename = "d")] + Distribution(&'a [DistributionType]), + /// A set metric. + /// + /// See: [`BucketValue::Set`]. + #[serde(rename = "s")] + Set(SetView<'a>), + /// A gauage metric. + /// + /// See: [`BucketValue::Gauge`]. 
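+    ///
+    /// The gauge value serializes as an object with `last`, `min`, `max`, `sum` and `count`
+    /// fields (see the partial-serialization snapshot added in this change).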
+ #[serde(rename = "g")] + Gauge(GaugeValue), +} + +impl<'a> From<&BucketViewValue<'a>> for BucketValue { + fn from(value: &BucketViewValue<'a>) -> Self { + match value { + BucketViewValue::Counter(c) => BucketValue::Counter(*c), + BucketViewValue::Distribution(d) => { + BucketValue::Distribution(d.iter().copied().collect()) + } + BucketViewValue::Set(s) => BucketValue::Set(s.iter().copied().collect()), + BucketViewValue::Gauge(g) => BucketValue::Gauge(*g), + } + } +} + +impl<'a> From> for BucketValue { + fn from(value: BucketViewValue<'a>) -> Self { + BucketValue::from(&value) + } +} + +/// A view into the datapoints of a set metric. +#[derive(Clone)] +pub struct SetView<'a> { + source: &'a SetValue, + range: Range, +} + +impl<'a> SetView<'a> { + fn new(source: &'a SetValue, range: Range) -> Self { + Self { source, range } + } + + /// Amount of datapoints contained within the set view. + pub fn len(&self) -> usize { + self.range.len() + } + + /// Returns `true` if this set contains no values. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Iterator over all datapoints contained in this set metric. + pub fn iter(&self) -> impl Iterator { + self.source + .iter() + .skip(self.range.start) + .take(self.range.len()) + } +} + +impl<'a> PartialEq for SetView<'a> { + fn eq(&self, other: &Self) -> bool { + self.iter().eq(other.iter()) + } +} + +impl<'a> fmt::Debug for SetView<'a> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("SetView") + .field(&self.iter().collect::>()) + .finish() + } +} + +impl<'a> Serialize for SetView<'a> { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut state = serializer.serialize_seq(Some(self.len()))?; + + for item in self.iter() { + state.serialize_element(item)?; + } + + state.end() + } +} + +/// Result of [`split_at`]. +enum SplitDecision { + /// Bucket fits within the current budget. + /// + /// Contains the size of the bucket to subtract from the budget. + BucketFits(usize), + /// Bucket does not fit within the current budget and cannot be split. + MoveToNextBatch, + /// The bucket should be split at the specified position. + Split(usize), +} + +/// Calculates a split for this bucket if its estimated serialization size exceeds a threshold. +/// +/// There are three possible return values: +/// - `BucketFits` if the bucket fits entirely into the size budget. +/// - `MoveToNextBatch` if the size budget cannot even hold the bucket name and tags. There is no +/// split, the entire bucket is moved. +/// - `Split(at)` if the bucket fits partially, the bucket should be split `at`. +/// +/// This is an approximate function. The bucket is not actually serialized, but rather its +/// footprint is estimated through the number of data points contained. See +/// `estimate_size` for more information. +fn split_at(bucket: &BucketView<'_>, size: usize, min_split_size: usize) -> SplitDecision { + // If there's enough space for the entire bucket, do not perform a split. + let bucket_size = estimate_size(bucket); + if size >= bucket_size { + return SplitDecision::BucketFits(bucket_size); + } + + // If the bucket key can't even fit into the remaining length, move the entire bucket into + // the right-hand side. 
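+    // In the view-based version, "moving to the right-hand side" simply means reporting
+    // `MoveToNextBatch`, so the caller starts a new batch that begins with this bucket.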
+ let own_size = estimate_base_size(bucket); + if size < (own_size + AVG_VALUE_SIZE) { + // split_at must not be zero + return SplitDecision::MoveToNextBatch; + } + + if bucket_size < min_split_size { + return SplitDecision::MoveToNextBatch; + } + + // Perform a split with the remaining space after adding the key. We assume an average + // length of 8 bytes per value and compute the number of items fitting into the left side. + let split_at = (size - own_size) / AVG_VALUE_SIZE; + + SplitDecision::Split(split_at) +} + +/// Estimates the number of bytes needed to serialize the bucket without value. +/// +/// Note that this does not match the exact size of the serialized payload. Instead, the size is +/// approximated through tags and a static overhead. +fn estimate_base_size(bucket: &BucketView<'_>) -> usize { + 50 + bucket.name().len() + aggregator::tags_cost(bucket.tags()) +} + +/// Estimates the number of bytes needed to serialize the bucket. +/// +/// Note that this does not match the exact size of the serialized payload. Instead, the size is +/// approximated through the number of contained values, assuming an average size of serialized +/// values. +fn estimate_size(bucket: &BucketView<'_>) -> usize { + estimate_base_size(bucket) + bucket.len() * AVG_VALUE_SIZE +} + +#[cfg(test)] +mod tests { + use insta::assert_json_snapshot; + use relay_common::time::UnixTimestamp; + + use super::*; + + #[test] + fn test_bucket_view_select_counter() { + let bucket = Bucket::parse(b"b0:1|c", UnixTimestamp::from_secs(5000)).unwrap(); + + let view = BucketView::new(&bucket).select(0..1); + assert_eq!(view.len(), 1); + assert_eq!( + serde_json::to_string(&view).unwrap(), + serde_json::to_string(&bucket).unwrap() + ); + } + + #[test] + fn test_bucket_view_select_invalid_counter() { + let bucket = Bucket::parse(b"b0:1|c", UnixTimestamp::from_secs(5000)).unwrap(); + + let select_fail = |range| { + let view = BucketView::new(&bucket); + std::panic::catch_unwind(|| view.select(range)).expect_err("expected to panic") + }; + + select_fail(0..0); + select_fail(0..2); + select_fail(1..1); + } + + #[test] + fn test_bucket_view_select_distribution() { + let bucket = Bucket::parse(b"b2:1:2:3:5:5|d", UnixTimestamp::from_secs(5000)).unwrap(); + + let view = BucketView::new(&bucket).select(0..3); + assert_eq!(view.len(), 3); + assert_eq!( + view.value(), + BucketViewValue::Distribution(&[1.0, 2.0, 3.0]) + ); + let view = BucketView::new(&bucket).select(1..3); + assert_eq!(view.len(), 2); + assert_eq!(view.value(), BucketViewValue::Distribution(&[2.0, 3.0])); + let view = BucketView::new(&bucket).select(1..5); + assert_eq!(view.len(), 4); + assert_eq!( + view.value(), + BucketViewValue::Distribution(&[2.0, 3.0, 5.0, 5.0]) + ); + } + + #[test] + fn test_bucket_view_select_invalid_distribution() { + let bucket = Bucket::parse(b"b2:1:2:3:5:5|d", UnixTimestamp::from_secs(5000)).unwrap(); + + let select_fail = |range| { + let view = BucketView::new(&bucket); + std::panic::catch_unwind(|| view.select(range)).expect_err("expected to panic") + }; + + select_fail(0..6); + select_fail(5..6); + select_fail(77..99); + } + + #[test] + fn test_bucket_view_select_set() { + let bucket = Bucket::parse(b"b3:42:75|s", UnixTimestamp::from_secs(5000)).unwrap(); + let s = [42, 75].into(); + + let view = BucketView::new(&bucket).select(0..2); + assert_eq!(view.len(), 2); + assert_eq!(view.value(), BucketViewValue::Set(SetView::new(&s, 0..2))); + let view = BucketView::new(&bucket).select(1..2); + assert_eq!(view.len(), 1); + 
assert_eq!(view.value(), BucketViewValue::Set(SetView::new(&s, 1..2))); + let view = BucketView::new(&bucket).select(0..1); + assert_eq!(view.len(), 1); + assert_eq!(view.value(), BucketViewValue::Set(SetView::new(&s, 0..1))); + } + + #[test] + fn test_bucket_view_select_invalid_set() { + let bucket = Bucket::parse(b"b3:42:75|s", UnixTimestamp::from_secs(5000)).unwrap(); + + let select_fail = |range| { + let view = BucketView::new(&bucket); + std::panic::catch_unwind(|| view.select(range)).expect_err("expected to panic") + }; + + select_fail(0..3); + select_fail(2..5); + } + + #[test] + fn test_bucket_view_select_gauge() { + let bucket = + Bucket::parse(b"b4:25:17:42:220:85|g", UnixTimestamp::from_secs(5000)).unwrap(); + + let view = BucketView::new(&bucket).select(0..5); + assert_eq!(view.len(), 5); + assert_eq!( + view.value(), + BucketViewValue::Gauge(GaugeValue { + last: 25.0, + min: 17.0, + max: 42.0, + sum: 220.0, + count: 85 + }) + ); + } + + #[test] + fn test_bucket_view_select_invalid_gauge() { + let bucket = + Bucket::parse(b"b4:25:17:42:220:85|g", UnixTimestamp::from_secs(5000)).unwrap(); + + let select_fail = |range| { + let view = BucketView::new(&bucket); + std::panic::catch_unwind(|| view.select(range)).expect_err("expected to panic") + }; + + select_fail(0..1); + select_fail(0..4); + select_fail(5..5); + select_fail(5..6); + } + + #[test] + fn test_buckets_view_empty() { + let buckets = Vec::new(); + let view = BucketsView::new(&buckets); + assert_eq!(view.len(), 0); + assert!(view.is_empty()); + let partials = view.iter().collect::>(); + assert!(partials.is_empty()); + } + + #[test] + fn test_buckets_view_iter_full() { + let b = br#" +b0:1|c +b1:12|c +b2:1:2:3:5:5|d +b3:42:75|s"#; + + let timestamp = UnixTimestamp::from_secs(5000); + let buckets = Bucket::parse_all(b, timestamp) + .collect::, _>>() + .unwrap(); + + let view = BucketsView::new(&buckets); + assert_eq!(view.len(), 4); + assert!(!view.is_empty()); + let partials = view.iter().collect::>(); + assert_eq!(partials.len(), 4); + assert_eq!(partials[0].name(), "c:custom/b0@none"); + assert_eq!(partials[0].len(), 1); + assert_eq!(partials[1].name(), "c:custom/b1@none"); + assert_eq!(partials[1].len(), 1); + assert_eq!(partials[2].name(), "d:custom/b2@none"); + assert_eq!(partials[2].len(), 5); + assert_eq!(partials[3].name(), "s:custom/b3@none"); + assert_eq!(partials[3].len(), 2); + } + + #[test] + fn test_buckets_view_iter_partial_end() { + let b = br#" +b0:1|c +b1:12|c +b2:1:2:3:5:5|d +b3:42:75|s"#; + + let timestamp = UnixTimestamp::from_secs(5000); + let buckets = Bucket::parse_all(b, timestamp) + .collect::, _>>() + .unwrap(); + + let mut view = BucketsView::new(&buckets); + view.end.slice = 2; + view.end.bucket = 3; + assert_eq!(view.len(), 3); + assert!(!view.is_empty()); + + let partials = view.iter().collect::>(); + assert_eq!(partials.len(), 3); + assert_eq!(partials[0].name(), "c:custom/b0@none"); + assert_eq!(partials[0].len(), 1); + assert_eq!(partials[1].name(), "c:custom/b1@none"); + assert_eq!(partials[1].len(), 1); + assert_eq!(partials[2].name(), "d:custom/b2@none"); + assert_eq!(partials[2].len(), 3); + } + + #[test] + fn test_buckets_view_iter_partial_start() { + let b = br#" +b0:1|c +b1:12|c +b2:1:2:3:5:5|d +b3:42:75|s"#; + + let timestamp = UnixTimestamp::from_secs(5000); + let buckets = Bucket::parse_all(b, timestamp) + .collect::, _>>() + .unwrap(); + + let mut view = BucketsView::new(&buckets); + view.start.slice = 2; + view.start.bucket = 3; + assert_eq!(view.len(), 2); + 
assert!(!view.is_empty()); + + let partials = view.iter().collect::>(); + assert_eq!(partials.len(), 2); + assert_eq!(partials[0].name(), "d:custom/b2@none"); + assert_eq!(partials[0].len(), 2); + assert_eq!(partials[1].name(), "s:custom/b3@none"); + assert_eq!(partials[1].len(), 2); + } + + #[test] + fn test_buckets_view_iter_partial_start_and_end() { + let b = br#" +b0:1|c +b1:12|c +b2:1:2:3:5:5|d +b3:42:75|s"#; + + let timestamp = UnixTimestamp::from_secs(5000); + let buckets = Bucket::parse_all(b, timestamp) + .collect::, _>>() + .unwrap(); + + let mut view = BucketsView::new(&buckets); + view.start.slice = 2; + view.start.bucket = 1; + view.end.slice = 3; + view.end.bucket = 1; + assert_eq!(view.len(), 2); + assert!(!view.is_empty()); + + let partials = view.iter().collect::>(); + assert_eq!(partials.len(), 2); + assert_eq!(partials[0].name(), "d:custom/b2@none"); + assert_eq!(partials[0].len(), 4); + assert_eq!(partials[1].name(), "s:custom/b3@none"); + assert_eq!(partials[1].len(), 1); + } + + #[test] + fn test_buckets_view_by_size_small() { + let b = br#" +b0:1|c +b1:12|c +b2:1:2:3:5:5|d +b3:42:75|s"#; + + let timestamp = UnixTimestamp::from_secs(5000); + let buckets = Bucket::parse_all(b, timestamp) + .collect::, _>>() + .unwrap(); + + let view = BucketsView::new(&buckets); + let partials = view + .by_size(100) + .map(|bv| { + let len: usize = bv.iter().map(|b| b.len()).sum(); + let size: usize = bv.iter().map(|b| estimate_size(&b)).sum(); + (len, size) + }) + .collect::>(); + + assert_eq!(partials, vec![(1, 74), (1, 74), (4, 98), (1, 74), (2, 82),]); + } + + #[test] + fn test_buckets_view_by_size_one_split() { + let b = br#" +b0:1|c +b1:12|c +b2:1:2:3:5:5|d +b3:42:75|s"#; + + let timestamp = UnixTimestamp::from_secs(5000); + let buckets = Bucket::parse_all(b, timestamp) + .collect::, _>>() + .unwrap(); + + let view = BucketsView::new(&buckets); + let partials = view + .by_size(250) + .map(|bv| { + let len: usize = bv.iter().map(|b| b.len()).sum(); + let size: usize = bv.iter().map(|b| estimate_size(&b)).sum(); + (len, size) + }) + .collect::>(); + + assert_eq!(partials, vec![(6, 246), (3, 156)]); + } + + #[test] + fn test_buckets_view_by_size_no_split() { + let b = br#" +b0:1|c +b1:12|c +b2:1:2:3:5:5|d +b3:42:75|s"#; + + let timestamp = UnixTimestamp::from_secs(5000); + let buckets = Bucket::parse_all(b, timestamp) + .collect::, _>>() + .unwrap(); + + let view = BucketsView::new(&buckets); + let partials = view + .by_size(500) + .map(|bv| { + let len: usize = bv.iter().map(|b| b.len()).sum(); + let size: usize = bv.iter().map(|b| estimate_size(&b)).sum(); + (len, size) + }) + .collect::>(); + + assert_eq!(partials, vec![(9, 336)]); + } + + #[test] + fn test_buckets_view_by_size_no_too_small_no_bucket_fits() { + let b = br#" +b0:1|c +b1:12|c +b2:1:2:3:5:5|d +b3:42:75|s"#; + + let timestamp = UnixTimestamp::from_secs(5000); + let buckets = Bucket::parse_all(b, timestamp) + .collect::, _>>() + .unwrap(); + + let view = BucketsView::new(&buckets); + let partials = view + .by_size(50) // Too small, a bucket requires at least 74 bytes + .count(); + + assert_eq!(partials, 0); + } + + #[test] + fn test_buckets_view_by_size_do_not_split_gauge() { + let b = br#" +transactions/foo:25:17:42:220:85|g"#; + + let timestamp = UnixTimestamp::from_secs(5000); + let buckets = Bucket::parse_all(b, timestamp) + .collect::, _>>() + .unwrap(); + + let view = BucketsView::new(&buckets); + // 100 is too small to fit the gauge, but it is big enough to fit half a gauage, + // make sure the gauge does not 
actually get split. + let partials = view.by_size(100).count(); + + assert_eq!(partials, 0); + } + + #[test] + fn test_buckets_view_serialize_full() { + let b = br#" +b0:1|c +b1:12|c|#foo,bar:baz +b2:1:2:3:5:5|d|#foo,bar:baz +b3:42:75|s +transactions/foo:25:17:42:220:85|g"#; + + let timestamp = UnixTimestamp::from_secs(5000); + let buckets = Bucket::parse_all(b, timestamp) + .collect::, _>>() + .unwrap(); + + assert_eq!( + serde_json::to_string(&BucketsView::new(&buckets)).unwrap(), + serde_json::to_string(&buckets).unwrap() + ); + } + + #[test] + fn test_buckets_view_serialize_partial() { + let b = br#" +b1:12|c|#foo,bar:baz +b2:1:2:3:5:5|d|#foo,bar:baz +b3:42:75|s +b4:25:17:42:220:85|g"#; + + let timestamp = UnixTimestamp::from_secs(5000); + let buckets = Bucket::parse_all(b, timestamp) + .collect::, _>>() + .unwrap(); + + let view = BucketsView::new(&buckets); + // This creates 4 separate views, spanning 1-2, 2-3, 3, 4. + // 4 is too big to fit into a view together with the remainder of 3. + let partials = view.by_size(178).collect::>(); + + assert_json_snapshot!(partials); + } +} diff --git a/relay-server/src/actors/envelopes.rs b/relay-server/src/actors/envelopes.rs index 77b0d30308..b93c487c87 100644 --- a/relay-server/src/actors/envelopes.rs +++ b/relay-server/src/actors/envelopes.rs @@ -8,7 +8,7 @@ use chrono::Utc; use relay_base_schema::project::ProjectKey; use relay_config::{Config, HttpEncoding}; use relay_event_schema::protocol::ClientReport; -use relay_metrics::{Aggregator, Bucket, MergeBuckets}; +use relay_metrics::Bucket; use relay_quotas::Scoping; use relay_statsd::metric; use relay_system::{Addr, FromMessage, NoResponse}; @@ -29,6 +29,8 @@ use crate::http::{HttpError, Request, RequestBuilder, Response}; use crate::statsd::RelayHistograms; use crate::utils::ManagedEnvelope; +use super::processor::EncodeMetrics; + /// Error created while handling [`SendEnvelope`]. #[derive(Debug, thiserror::Error)] #[allow(clippy::enum_variant_names)] @@ -147,8 +149,6 @@ pub struct SendMetrics { pub buckets: Vec, /// Scoping information for the metrics. pub scoping: Scoping, - /// The key of the logical partition to send the metrics to. - pub partition_key: Option, } /// Dispatch service for generating and submitting Envelopes. @@ -196,7 +196,6 @@ impl FromMessage for EnvelopeManager { #[derive(Debug)] pub struct EnvelopeManagerService { config: Arc, - aggregator: Addr, enveloper_processor: Addr, project_cache: Addr, test_store: Addr, @@ -209,7 +208,6 @@ impl EnvelopeManagerService { /// Creates a new instance of the [`EnvelopeManager`] service. 
pub fn new( config: Arc, - aggregator: Addr, enveloper_processor: Addr, project_cache: Addr, test_store: Addr, @@ -217,7 +215,6 @@ impl EnvelopeManagerService { ) -> Self { Self { config, - aggregator, enveloper_processor, project_cache, test_store, @@ -238,7 +235,7 @@ impl EnvelopeManagerService { &self, mut envelope: Box, scoping: Scoping, - partition_key: Option, + partition_key: Option, ) -> Result<(), SendEnvelopeError> { #[cfg(feature = "processing")] { @@ -280,7 +277,7 @@ impl EnvelopeManagerService { http_encoding: self.config.http_encoding(), response_sender: tx, project_key: scoping.project_key, - partition_key, + partition_key: partition_key.map(|k| k.to_string()), }; if let HttpEncoding::Identity = request.http_encoding { @@ -300,9 +297,13 @@ impl EnvelopeManagerService { let SubmitEnvelope { mut envelope } = message; let scoping = envelope.scoping(); + let partition_key = envelope.partition_key(); let inner_envelope = envelope.take_envelope(); - match self.submit_envelope(inner_envelope, scoping, None).await { + match self + .submit_envelope(inner_envelope, scoping, partition_key) + .await + { Ok(_) => { envelope.accept(); } @@ -328,37 +329,27 @@ impl EnvelopeManagerService { } async fn handle_send_metrics(&self, message: SendMetrics) { - let SendMetrics { - buckets, - scoping, - partition_key, - } = message; + let SendMetrics { buckets, scoping } = message; - let upstream = self.config.upstream_descriptor(); - let dsn = PartialDsn { - scheme: upstream.scheme(), - public_key: scoping.project_key, - host: upstream.host().to_owned(), - port: upstream.port(), - path: "".to_owned(), - project_id: Some(scoping.project_id), - }; - - let mut item = Item::new(ItemType::MetricBuckets); - item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap()); - let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn)); - envelope.add_item(item); + #[allow(unused_mut)] + let mut partitions = self.config.metrics_partitions(); + #[allow(unused_mut)] + let mut max_batch_size_bytes = self.config.metrics_max_batch_size_bytes(); - let partition_key = partition_key.map(|x| x.to_string()); - let result = self.submit_envelope(envelope, scoping, partition_key).await; - if let Err(err) = result { - relay_log::trace!( - error = &err as &dyn Error, - "failed to submit the envelope, merging buckets back", - ); - self.aggregator - .send(MergeBuckets::new(scoping.project_key, buckets)); + #[cfg(feature = "processing")] + if self.store_forwarder.is_some() { + // Partitioning on processing relays does not make sense, they end up all + // in the same Kafka topic anyways and the partition key is ignored. 
+ partitions = None; + max_batch_size_bytes = self.config.metrics_max_batch_size_bytes_processing(); } + + self.enveloper_processor.send(EncodeMetrics { + buckets, + scoping, + max_batch_size_bytes, + partitions, + }); } async fn handle_send_client_reports(&self, message: SendClientReports) { diff --git a/relay-server/src/actors/processor.rs b/relay-server/src/actors/processor.rs index 3afbca17b3..9dd65d5837 100644 --- a/relay-server/src/actors/processor.rs +++ b/relay-server/src/actors/processor.rs @@ -36,12 +36,13 @@ use relay_event_schema::protocol::{ Timestamp, TraceContext, UserReport, Values, }; use relay_filter::FilterStatKey; +use relay_metrics::aggregator::partition_buckets; use relay_metrics::aggregator::AggregatorConfig; -use relay_metrics::{Bucket, MergeBuckets, MetricNamespace}; +use relay_metrics::{Bucket, BucketsView, MergeBuckets, MetricNamespace}; use relay_pii::{scrub_graphql, PiiAttachmentsProcessor, PiiConfigError, PiiProcessor}; use relay_profiling::{ProfileError, ProfileId}; use relay_protocol::{Annotated, Array, Empty, FromValue, Object, Value}; -use relay_quotas::{DataCategory, ReasonCode}; +use relay_quotas::{DataCategory, ReasonCode, Scoping}; use relay_replays::recording::RecordingScrubber; use relay_sampling::config::{RuleType, SamplingMode}; use relay_sampling::evaluation::{ @@ -71,7 +72,7 @@ use crate::actors::project::ProjectState; use crate::actors::project_cache::ProjectCache; use crate::actors::upstream::{SendRequest, UpstreamRelay}; use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType}; -use crate::extractors::RequestMeta; +use crate::extractors::{PartialDsn, RequestMeta}; use crate::metrics_extraction::transactions::types::ExtractMetricsError; use crate::metrics_extraction::transactions::{ExtractedMetrics, TransactionExtractor}; use crate::service::ServiceError; @@ -485,6 +486,19 @@ impl EncodeEnvelope { } } +/// Encodes metrics into an envelope ready to be sent upstream. +#[derive(Debug)] +pub struct EncodeMetrics { + /// The metric buckets to encode. + pub buckets: Vec, + /// Scoping for metric buckets. + pub scoping: Scoping, + /// Approximate size in bytes to batch buckets. + pub max_batch_size_bytes: usize, + /// Amount of logical partitions for the buckets. + pub partitions: Option, +} + /// Applies rate limits to metrics buckets and forwards them to the envelope manager. #[cfg(feature = "processing")] #[derive(Debug)] @@ -498,8 +512,9 @@ pub enum EnvelopeProcessor { ProcessEnvelope(Box), ProcessMetrics(Box), EncodeEnvelope(Box), + EncodeMetrics(Box), #[cfg(feature = "processing")] - RateLimitFlushBuckets(RateLimitBuckets), + RateLimitBuckets(RateLimitBuckets), } impl relay_system::Interface for EnvelopeProcessor {} @@ -528,12 +543,20 @@ impl FromMessage for EnvelopeProcessor { } } +impl FromMessage for EnvelopeProcessor { + type Response = NoResponse; + + fn from_message(message: EncodeMetrics, _: ()) -> Self { + Self::EncodeMetrics(Box::new(message)) + } +} + #[cfg(feature = "processing")] impl FromMessage for EnvelopeProcessor { type Response = NoResponse; fn from_message(message: RateLimitBuckets, _: ()) -> Self { - Self::RateLimitFlushBuckets(message) + Self::RateLimitBuckets(message) } } @@ -2983,12 +3006,10 @@ impl EnvelopeProcessorService { /// Check and apply rate limits to metrics buckets. 
#[cfg(feature = "processing")] - fn handle_rate_limit_flush_buckets(&self, message: RateLimitBuckets) { + fn handle_rate_limit_buckets(&self, message: RateLimitBuckets) { use relay_quotas::ItemScoping; - let RateLimitBuckets { - mut bucket_limiter, .. - } = message; + let RateLimitBuckets { mut bucket_limiter } = message; let scoping = *bucket_limiter.scoping(); @@ -3075,14 +3096,60 @@ impl EnvelopeProcessorService { } } + fn handle_encode_metrics(&self, message: EncodeMetrics) { + let EncodeMetrics { + buckets, + scoping, + max_batch_size_bytes, + partitions, + } = message; + + let upstream = self.inner.config.upstream_descriptor(); + let dsn = PartialDsn { + scheme: upstream.scheme(), + public_key: scoping.project_key, + host: upstream.host().to_owned(), + port: upstream.port(), + path: "".to_owned(), + project_id: Some(scoping.project_id), + }; + + for (partition_key, buckets) in partition_buckets(scoping.project_key, buckets, partitions) + { + let mut num_batches = 0; + + for batch in BucketsView::new(&buckets).by_size(max_batch_size_bytes) { + #[allow(clippy::redundant_closure_call)] + let mut envelope: ManagedEnvelope = (move || { + let _ = dsn; + todo!("Added by next commit in the PR") + })(); + envelope.set_partition_key(partition_key).scope(scoping); + + relay_statsd::metric!( + histogram(RelayHistograms::BucketsPerBatch) = batch.len() as u64 + ); + + self.inner + .envelope_manager + .send(SubmitEnvelope { envelope }); + + num_batches += 1; + } + + relay_statsd::metric!(histogram(RelayHistograms::BatchesPerPartition) = num_batches); + } + } + fn handle_message(&self, message: EnvelopeProcessor) { match message { EnvelopeProcessor::ProcessEnvelope(message) => self.handle_process_envelope(*message), EnvelopeProcessor::ProcessMetrics(message) => self.handle_process_metrics(*message), EnvelopeProcessor::EncodeEnvelope(message) => self.handle_encode_envelope(*message), + EnvelopeProcessor::EncodeMetrics(message) => self.handle_encode_metrics(*message), #[cfg(feature = "processing")] - EnvelopeProcessor::RateLimitFlushBuckets(message) => { - self.handle_rate_limit_flush_buckets(message); + EnvelopeProcessor::RateLimitBuckets(message) => { + self.handle_rate_limit_buckets(message); } } } diff --git a/relay-server/src/actors/project.rs b/relay-server/src/actors/project.rs index f163151c4e..2d7b9e678c 100644 --- a/relay-server/src/actors/project.rs +++ b/relay-server/src/actors/project.rs @@ -1013,23 +1013,14 @@ impl Project { }) } - pub fn flush_buckets( - &mut self, - envelope_manager: Addr, - partition_key: Option, - buckets: Vec, - ) { + pub fn flush_buckets(&mut self, envelope_manager: Addr, buckets: Vec) { let Some(scoping) = self.scoping() else { relay_log::trace!("there is no scoping: dropping {} buckets", buckets.len()); return; }; if !buckets.is_empty() { - envelope_manager.send(SendMetrics { - buckets, - scoping, - partition_key, - }); + envelope_manager.send(SendMetrics { buckets, scoping }); } } } diff --git a/relay-server/src/actors/project_cache.rs b/relay-server/src/actors/project_cache.rs index 38a9b4e514..01abb291a0 100644 --- a/relay-server/src/actors/project_cache.rs +++ b/relay-server/src/actors/project_cache.rs @@ -859,8 +859,9 @@ impl ProjectCacheBroker { fn handle_flush_buckets(&mut self, message: FlushBuckets) { let envelope_manager = self.services.envelope_manager.clone(); + self.get_or_create_project(message.project_key) - .flush_buckets(envelope_manager, message.partition_key, message.buckets); + .flush_buckets(envelope_manager, message.buckets); } fn 
handle_buffer_index(&mut self, message: UpdateBufferIndex) { diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 19aa6967dd..b8ed71126c 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -151,7 +151,6 @@ impl ServiceState { #[allow(unused_mut)] let mut envelope_manager_service = EnvelopeManagerService::new( config.clone(), - aggregator.clone(), processor.clone(), project_cache.clone(), test_store.clone(), diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 63e56aea44..3ce79faf9c 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -72,6 +72,12 @@ pub enum RelayHistograms { /// As long as there are enough permits in the [`crate::utils::BufferGuard`], this number should /// always be one. BufferDequeueAttempts, + /// The number of batches emitted per partition. + BatchesPerPartition, + /// The number of buckets in a batch emitted. + /// + /// This corresponds to the number of buckets that will end up in an envelope. + BucketsPerBatch, /// The number of spans per processed transaction event. /// /// This metric is tagged with: @@ -160,6 +166,8 @@ impl HistogramMetric for RelayHistograms { RelayHistograms::EnvelopeQueueSizePct => "event.queue_size.pct", RelayHistograms::EnvelopeQueueSize => "event.queue_size", RelayHistograms::EventSpans => "event.spans", + RelayHistograms::BatchesPerPartition => "metrics.buckets.batches_per_partition", + RelayHistograms::BucketsPerBatch => "metrics.buckets.per_batch", RelayHistograms::BufferEnvelopesMemoryBytes => "buffer.envelopes_mem", RelayHistograms::BufferDiskSize => "buffer.disk_size", RelayHistograms::BufferDequeueAttempts => "buffer.dequeue_attempts", diff --git a/relay-server/src/utils/managed_envelope.rs b/relay-server/src/utils/managed_envelope.rs index 76f8aa692d..63838b7e58 100644 --- a/relay-server/src/utils/managed_envelope.rs +++ b/relay-server/src/utils/managed_envelope.rs @@ -59,6 +59,7 @@ struct EnvelopeContext { summary: EnvelopeSummary, scoping: Scoping, slot: Option, + partition_key: Option, done: bool, } @@ -102,6 +103,7 @@ impl ManagedEnvelope { summary, scoping, slot, + partition_key: None, done: false, }, outcome_aggregator, @@ -342,6 +344,15 @@ impl ManagedEnvelope { self.context.scoping } + pub fn partition_key(&self) -> Option { + self.context.partition_key + } + + pub fn set_partition_key(&mut self, partition_key: Option) -> &mut Self { + self.context.partition_key = partition_key; + self + } + pub fn meta(&self) -> &RequestMeta { self.envelope().meta() } diff --git a/tests/integration/test_dynamic_sampling.py b/tests/integration/test_dynamic_sampling.py index db8d81a91c..76fbd8593b 100644 --- a/tests/integration/test_dynamic_sampling.py +++ b/tests/integration/test_dynamic_sampling.py @@ -428,6 +428,8 @@ def test_uses_trace_public_key(mini_sentry, relay): # and it should create an outcome outcomes = mini_sentry.captured_outcomes.get(timeout=2) assert outcomes is not None + with pytest.raises(queue.Empty): + mini_sentry.captured_outcomes.get(timeout=1) # Second # send trace with project_id2 context (should go through) diff --git a/tests/integration/test_metrics.py b/tests/integration/test_metrics.py index a81c16425c..e19f8986e5 100644 --- a/tests/integration/test_metrics.py +++ b/tests/integration/test_metrics.py @@ -6,6 +6,7 @@ import pytest import requests +import queue from .test_envelope import generate_transaction_item @@ -121,19 +122,20 @@ def test_metrics_backdated(mini_sentry, relay): @pytest.mark.parametrize( - 
"flush_partitions,expected_header", [(None, None), (0, "0"), (1, "0"), (128, "34")] + "metrics_partitions,expected_header", + [(None, None), (0, "0"), (1, "0"), (128, "34")], ) -def test_metrics_partition_key(mini_sentry, relay, flush_partitions, expected_header): +def test_metrics_partition_key(mini_sentry, relay, metrics_partitions, expected_header): forever = 100 * 365 * 24 * 60 * 60 # *almost forever relay_config = { "processing": { "max_session_secs_in_past": forever, + "metrics_partitions": metrics_partitions, }, "aggregator": { "bucket_interval": 1, "initial_delay": 0, "debounce_delay": 0, - "flush_partitions": flush_partitions, "max_secs_in_past": forever, "max_secs_in_future": forever, }, @@ -162,6 +164,41 @@ def test_metrics_partition_key(mini_sentry, relay, flush_partitions, expected_he assert headers["X-Sentry-Relay-Shard"] == expected_header, headers +@pytest.mark.parametrize( + "max_batch_size,expected_events", [(1000, 1), (200, 2), (130, 3), (100, 6), (50, 0)] +) +def test_metrics_max_batch_size(mini_sentry, relay, max_batch_size, expected_events): + forever = 100 * 365 * 24 * 60 * 60 # *almost forever + relay_config = { + "processing": { + "max_session_secs_in_past": forever, + "metrics_max_batch_size": max_batch_size, + }, + "aggregator": { + "bucket_interval": 1, + "initial_delay": 0, + "debounce_delay": 0, + "max_secs_in_past": forever, + "max_secs_in_future": forever, + }, + } + relay = relay(mini_sentry, options=relay_config) + + project_id = 42 + mini_sentry.add_basic_project_config(project_id) + + metrics_payload = ( + "transactions/foo:1:2:3:4:5:6:7:8:9:10:11:12:13:14:15:16:17|d|T999994711" + ) + relay.send_metrics(project_id, metrics_payload) + + for _ in range(expected_events): + mini_sentry.captured_events.get(timeout=3) + + with pytest.raises(queue.Empty): + mini_sentry.captured_events.get(timeout=1) + + def test_metrics_with_processing(mini_sentry, relay_with_processing, metrics_consumer): relay = relay_with_processing(options=TEST_CONFIG) metrics_consumer = metrics_consumer() From a6ae73f6ecccaf1c63de7e2531069de15da879db Mon Sep 17 00:00:00 2001 From: David Herberth Date: Fri, 17 Nov 2023 16:17:04 +0100 Subject: [PATCH 02/10] produce outcomes for metrics --- relay-server/src/actors/envelopes.rs | 11 +- relay-server/src/actors/processor.rs | 53 ++++++-- relay-server/src/actors/project.rs | 37 +++++- relay-server/src/actors/project_cache.rs | 3 +- relay-server/src/envelope.rs | 48 +++++++ relay-server/src/service.rs | 1 + relay-server/src/utils/managed_envelope.rs | 40 ++++-- relay-server/src/utils/metrics_rate_limits.rs | 122 +++++++++++------- relay-server/src/utils/rate_limits.rs | 49 ++++++- 9 files changed, 288 insertions(+), 76 deletions(-) diff --git a/relay-server/src/actors/envelopes.rs b/relay-server/src/actors/envelopes.rs index b93c487c87..9c7cb9c2c5 100644 --- a/relay-server/src/actors/envelopes.rs +++ b/relay-server/src/actors/envelopes.rs @@ -27,7 +27,7 @@ use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemType use crate::extractors::{PartialDsn, RequestMeta}; use crate::http::{HttpError, Request, RequestBuilder, Response}; use crate::statsd::RelayHistograms; -use crate::utils::ManagedEnvelope; +use crate::utils::{ExtractionMode, ManagedEnvelope}; use super::processor::EncodeMetrics; @@ -149,6 +149,8 @@ pub struct SendMetrics { pub buckets: Vec, /// Scoping information for the metrics. pub scoping: Scoping, + /// Transaction extraction mode. 
+ pub extraction_mode: ExtractionMode, } /// Dispatch service for generating and submitting Envelopes. @@ -329,7 +331,11 @@ impl EnvelopeManagerService { } async fn handle_send_metrics(&self, message: SendMetrics) { - let SendMetrics { buckets, scoping } = message; + let SendMetrics { + buckets, + scoping, + extraction_mode, + } = message; #[allow(unused_mut)] let mut partitions = self.config.metrics_partitions(); @@ -347,6 +353,7 @@ impl EnvelopeManagerService { self.enveloper_processor.send(EncodeMetrics { buckets, scoping, + extraction_mode, max_batch_size_bytes, partitions, }); diff --git a/relay-server/src/actors/processor.rs b/relay-server/src/actors/processor.rs index 9dd65d5837..01c4c01e3d 100644 --- a/relay-server/src/actors/processor.rs +++ b/relay-server/src/actors/processor.rs @@ -71,16 +71,19 @@ use crate::actors::outcome::{DiscardReason, Outcome, TrackOutcome}; use crate::actors::project::ProjectState; use crate::actors::project_cache::ProjectCache; use crate::actors::upstream::{SendRequest, UpstreamRelay}; -use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType}; +use crate::envelope::{AttachmentType, ContentType, Envelope, Item, ItemType, SourceQuantities}; use crate::extractors::{PartialDsn, RequestMeta}; use crate::metrics_extraction::transactions::types::ExtractMetricsError; use crate::metrics_extraction::transactions::{ExtractedMetrics, TransactionExtractor}; use crate::service::ServiceError; use crate::statsd::{PlatformTag, RelayCounters, RelayHistograms, RelayTimers}; use crate::utils::{ - self, ChunkedFormDataAggregator, FormDataIter, ItemAction, ManagedEnvelope, SamplingResult, + self, extract_transaction_count, ChunkedFormDataAggregator, ExtractionMode, FormDataIter, + ItemAction, ManagedEnvelope, SamplingResult, }; +use super::test_store::TestStore; + /// The minimum clock drift for correction to apply. const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60); @@ -493,6 +496,8 @@ pub struct EncodeMetrics { pub buckets: Vec, /// Scoping for metric buckets. pub scoping: Scoping, + /// Transaction metrics extraction mode. + pub extraction_mode: ExtractionMode, /// Approximate size in bytes to batch buckets. pub max_batch_size_bytes: usize, /// Amount of logical partitions for the buckets. 
@@ -578,6 +583,7 @@ struct InnerProcessor { #[cfg(feature = "processing")] aggregator: Addr, upstream_relay: Addr, + test_store: Addr, #[cfg(feature = "processing")] rate_limiter: Option, geoip_lookup: Option, @@ -593,6 +599,7 @@ impl EnvelopeProcessorService { outcome_aggregator: Addr, project_cache: Addr, upstream_relay: Addr, + test_store: Addr, #[cfg(feature = "processing")] aggregator: Addr, ) -> Self { let geoip_lookup = config.geoip_path().and_then(|p| { @@ -616,6 +623,7 @@ impl EnvelopeProcessorService { project_cache, outcome_aggregator, upstream_relay, + test_store, geoip_lookup, #[cfg(feature = "processing")] aggregator, @@ -3101,6 +3109,7 @@ impl EnvelopeProcessorService { buckets, scoping, max_batch_size_bytes, + extraction_mode, partitions, } = message; @@ -3119,11 +3128,14 @@ impl EnvelopeProcessorService { let mut num_batches = 0; for batch in BucketsView::new(&buckets).by_size(max_batch_size_bytes) { - #[allow(clippy::redundant_closure_call)] - let mut envelope: ManagedEnvelope = (move || { - let _ = dsn; - todo!("Added by next commit in the PR") - })(); + let mut envelope = Envelope::from_request(None, RequestMeta::outbound(dsn.clone())); + envelope.add_item(create_metrics_item(&batch, extraction_mode)); + + let mut envelope = ManagedEnvelope::standalone( + envelope, + self.inner.outcome_aggregator.clone(), + self.inner.test_store.clone(), + ); envelope.set_partition_key(partition_key).scope(scoping); relay_statsd::metric!( @@ -3192,6 +3204,26 @@ impl Service for EnvelopeProcessorService { } } +fn create_metrics_item(buckets: &BucketsView<'_>, extraction_mode: ExtractionMode) -> Item { + let source_quantities = buckets + .iter() + .filter_map(|bucket| extract_transaction_count(&bucket, extraction_mode)) + .fold(SourceQuantities::default(), |acc, c| { + let profile_count = if c.has_profile { c.count } else { 0 }; + + SourceQuantities { + transactions: acc.transactions + c.count, + profiles: acc.profiles + profile_count, + } + }); + + let mut item = Item::new(ItemType::MetricBuckets); + item.set_source_quantities(source_quantities); + item.set_payload(ContentType::Json, serde_json::to_vec(&buckets).unwrap()); + + item +} + #[cfg(test)] mod tests { use std::env; @@ -3629,6 +3661,7 @@ mod tests { let (outcome_aggregator, _) = mock_service("outcome_aggregator", (), |&mut (), _| {}); let (project_cache, _) = mock_service("project_cache", (), |&mut (), _| {}); let (upstream_relay, _) = mock_service("upstream_relay", (), |&mut (), _| {}); + let (test_store, _) = mock_service("test_store", (), |&mut (), _| {}); #[cfg(feature = "processing")] let (aggregator, _) = mock_service("aggregator", (), |&mut (), _| {}); let inner = InnerProcessor { @@ -3636,14 +3669,15 @@ mod tests { envelope_manager, project_cache, outcome_aggregator, + #[cfg(feature = "processing")] + aggregator, upstream_relay, + test_store, #[cfg(feature = "processing")] rate_limiter: None, #[cfg(feature = "processing")] redis_pool: None, geoip_lookup: None, - #[cfg(feature = "processing")] - aggregator, }; EnvelopeProcessorService { @@ -3659,6 +3693,7 @@ mod tests { project_cache: Addr::dummy(), outcome_aggregator: Addr::dummy(), upstream_relay: Addr::dummy(), + test_store: Addr::dummy(), #[cfg(feature = "processing")] rate_limiter: None, #[cfg(feature = "processing")] diff --git a/relay-server/src/actors/project.rs b/relay-server/src/actors/project.rs index 2d7b9e678c..7cfbd61584 100644 --- a/relay-server/src/actors/project.rs +++ b/relay-server/src/actors/project.rs @@ -28,7 +28,9 @@ use 
crate::actors::project_cache::{CheckedEnvelope, ProjectCache, RequestUpdate} use crate::extractors::RequestMeta; use crate::statsd::RelayCounters; -use crate::utils::{EnvelopeLimiter, ManagedEnvelope, MetricsLimiter, RetryBackoff}; +use crate::utils::{ + EnvelopeLimiter, ExtractionMode, ManagedEnvelope, MetricsLimiter, RetryBackoff, +}; /// The expiry status of a project state. Return value of [`ProjectState::check_expiry`]. #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] @@ -564,7 +566,8 @@ impl Project { _ => false, }; - match MetricsLimiter::create(metrics, &state.config.quotas, scoping, usage) { + let mode = ExtractionMode::from_usage(usage); + match MetricsLimiter::create(metrics, &state.config.quotas, scoping, mode) { Ok(mut limiter) => { limiter.enforce_limits(Ok(&self.rate_limits), outcome_aggregator); limiter.into_metrics() @@ -637,8 +640,9 @@ impl Project { Some(ErrorBoundary::Ok(ref c)) => c.usage_metric(), _ => false, }; + let extraction_mode = ExtractionMode::from_usage(usage); - let buckets = match MetricsLimiter::create(buckets, quotas, scoping, usage) { + let buckets = match MetricsLimiter::create(buckets, quotas, scoping, extraction_mode) { Ok(mut bucket_limiter) => { let cached_rate_limits = self.rate_limits().clone(); #[allow(unused_variables)] @@ -1013,14 +1017,37 @@ impl Project { }) } - pub fn flush_buckets(&mut self, envelope_manager: Addr, buckets: Vec) { + pub fn flush_buckets( + &mut self, + project_cache: Addr, + envelope_manager: Addr, + buckets: Vec, + ) { + let Some(project_state) = self.get_cached_state(project_cache, false) else { + relay_log::trace!( + "there is no project state: dropping {} buckets", + buckets.len() + ); + return; + }; + let Some(scoping) = self.scoping() else { relay_log::trace!("there is no scoping: dropping {} buckets", buckets.len()); return; }; + let usage = match project_state.config.transaction_metrics { + Some(ErrorBoundary::Ok(ref c)) => c.usage_metric(), + _ => false, + }; + let extraction_mode = ExtractionMode::from_usage(usage); + if !buckets.is_empty() { - envelope_manager.send(SendMetrics { buckets, scoping }); + envelope_manager.send(SendMetrics { + buckets, + scoping, + extraction_mode, + }); } } } diff --git a/relay-server/src/actors/project_cache.rs b/relay-server/src/actors/project_cache.rs index 01abb291a0..8fc5663e91 100644 --- a/relay-server/src/actors/project_cache.rs +++ b/relay-server/src/actors/project_cache.rs @@ -859,9 +859,10 @@ impl ProjectCacheBroker { fn handle_flush_buckets(&mut self, message: FlushBuckets) { let envelope_manager = self.services.envelope_manager.clone(); + let project_cache = self.services.project_cache.clone(); self.get_or_create_project(message.project_key) - .flush_buckets(envelope_manager, message.buckets); + .flush_buckets(project_cache, envelope_manager, message.buckets); } fn handle_buffer_index(&mut self, message: UpdateBufferIndex) { diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index a0679c531a..87c3989a0d 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -480,6 +480,18 @@ pub struct ItemHeaders { #[serde(default, skip)] rate_limited: bool, + /// Contains the amount of events this item was generated and aggregated from. + /// + /// A [metrics buckets](`ItemType::MetricBuckets`) item contains metrics extracted and + /// aggregated from (currently) transactions and profiles. + /// + /// This information can not be directly inferred from the item itself anymore. 
+ /// The amount of events this item/metric represents is instead stored here. + /// + /// NOTE: This is internal-only and not exposed into the Envelope. + #[serde(default, skip)] + source_quantities: Option, + /// A list of cumulative sample rates applied to this event. /// /// Multiple entries in `sample_rates` mean that the event was sampled multiple times. The @@ -501,6 +513,17 @@ pub struct ItemHeaders { other: BTreeMap, } +/// Container for item quantities that the item was derived from. +/// +/// For example a metric bucket may be derived and aggregated from multiple transactions. +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] +pub struct SourceQuantities { + /// Transaction quantity. + pub transactions: usize, + /// Profile quantity. + pub profiles: usize, +} + #[derive(Clone, Debug)] pub struct Item { headers: ItemHeaders, @@ -519,6 +542,7 @@ impl Item { filename: None, routing_hint: None, rate_limited: false, + source_quantities: None, sample_rates: None, other: BTreeMap::new(), metrics_extracted: false, @@ -666,6 +690,16 @@ impl Item { self.headers.sample_rates.take() } + /// Returns the contained source quantities. + pub fn source_quantities(&self) -> Option { + self.headers.source_quantities + } + + /// Sets new source quantities. + pub fn set_source_quantities(&mut self, source_quantities: SourceQuantities) { + self.headers.source_quantities = Some(source_quantities); + } + /// Sets sample rates for this item. pub fn set_sample_rates(&mut self, sample_rates: Value) { if matches!(sample_rates, Value::Array(ref a) if !a.is_empty()) { @@ -1343,6 +1377,20 @@ mod tests { assert_eq!(item.routing_hint(), Some(uuid)); } + #[test] + fn test_item_source_quantities() { + let mut item = Item::new(ItemType::MetricBuckets); + assert!(item.source_quantities().is_none()); + + let source_quantities = SourceQuantities { + transactions: 12, + ..Default::default() + }; + item.set_source_quantities(source_quantities); + + assert_eq!(item.source_quantities(), Some(source_quantities)); + } + #[test] fn test_envelope_empty() { let event_id = EventId::new(); diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index b8ed71126c..969312de23 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -143,6 +143,7 @@ impl ServiceState { outcome_aggregator.clone(), project_cache.clone(), upstream_relay.clone(), + test_store.clone(), #[cfg(feature = "processing")] aggregator.clone(), ) diff --git a/relay-server/src/utils/managed_envelope.rs b/relay-server/src/utils/managed_envelope.rs index 63838b7e58..b41cc438e9 100644 --- a/relay-server/src/utils/managed_envelope.rs +++ b/relay-server/src/utils/managed_envelope.rs @@ -111,28 +111,30 @@ impl ManagedEnvelope { } } - /// Creates a standalone envelope for testing purposes. - /// - /// As opposed to [`new`](Self::new), this does not require a queue permit. This makes it - /// suitable for unit testing internals of the processing pipeline. #[cfg(test)] - pub fn standalone( + pub fn untracked( envelope: Box, outcome_aggregator: Addr, test_store: Addr, ) -> Self { - Self::new_internal(envelope, None, outcome_aggregator, test_store) + let mut envelope = Self::new_internal(envelope, None, outcome_aggregator, test_store); + envelope.context.done = true; + envelope } - #[cfg(test)] - pub fn untracked( + /// Creates a new managed envelope like [`new`](Self::new) but without a queue permit. + /// + /// This is suitable for aggregated metrics. Metrics live outside the lifecycle of a normal + /// event. 
They are extracted, aggregated and regularly flushed, after the
+ /// source event has already been processed.
+ ///
+ /// The constructor is also suitable for unit testing internals of the processing pipeline.
+ pub fn standalone(
 envelope: Box,
 outcome_aggregator: Addr,
 test_store: Addr,
 ) -> Self {
- let mut envelope = Self::new_internal(envelope, None, outcome_aggregator, test_store);
- envelope.context.done = true;
- envelope
+ Self::new_internal(envelope, None, outcome_aggregator, test_store)
 }
 /// Computes a managed envelope from the given envelope and binds it to the processing queue.
@@ -298,6 +300,7 @@ impl ManagedEnvelope {
 tags.has_attachments = summary.attachment_quantity > 0,
 tags.has_sessions = summary.session_quantity > 0,
 tags.has_profiles = summary.profile_quantity > 0,
+ tags.has_transactions = summary.secondary_transaction_quantity > 0,
 tags.has_replays = summary.replay_quantity > 0,
 tags.has_checkins = summary.checkin_quantity > 0,
 tags.event_category = ?summary.event_category,
@@ -326,7 +329,7 @@ impl ManagedEnvelope {
 if self.context.summary.profile_quantity > 0 {
 self.track_outcome(
- outcome,
+ outcome.clone(),
 if self.use_index_category() {
 DataCategory::ProfileIndexed
 } else {
@@ -336,6 +339,19 @@ impl ManagedEnvelope {
 );
 }
+ // Track outcomes for attached secondary transactions, e.g. extracted from metrics.
+ //
+ // Primary transaction count is already tracked through the event category
+ // (see: `Self::event_category()`).
+ if self.context.summary.secondary_transaction_quantity > 0 {
+ self.track_outcome(
+ outcome,
+ // Secondary transaction counts are never indexed transactions
+ DataCategory::Transaction,
+ self.context.summary.secondary_transaction_quantity,
+ );
+ }
+
 self.finish(RelayCounters::EnvelopeRejected, handling);
 }
diff --git a/relay-server/src/utils/metrics_rate_limits.rs b/relay-server/src/utils/metrics_rate_limits.rs
index f83993a4cc..f70711e432 100644
--- a/relay-server/src/utils/metrics_rate_limits.rs
+++ b/relay-server/src/utils/metrics_rate_limits.rs
@@ -1,7 +1,9 @@
 //! Quota and rate limiting helpers for metrics and metrics buckets.
 use chrono::{DateTime, Utc};
 use relay_common::time::UnixTimestamp;
-use relay_metrics::{Bucket, BucketValue, MetricNamespace, MetricResourceIdentifier};
+use relay_metrics::{
+ Bucket, BucketView, BucketViewValue, MetricNamespace, MetricResourceIdentifier,
+};
 use relay_quotas::{DataCategory, ItemScoping, Quota, RateLimits, Scoping};
 use relay_system::Addr;
@@ -34,6 +36,65 @@ pub struct MetricsLimiter> = Vec> {
 const PROFILE_TAG: &str = "has_profile";
+/// Extracts the transaction count from a metric.
+///
+/// If the metric was extracted from one or multiple transactions, it returns the amount
+/// of datapoints contained in the bucket.
+///
+/// Additionally tracks whether the transactions also contained profiling information.
+///
+/// Returns `None` if the metric was not extracted from transactions. 
+pub fn extract_transaction_count( + metric: &BucketView<'_>, + mode: ExtractionMode, +) -> Option { + let mri = match MetricResourceIdentifier::parse(metric.name()) { + Ok(mri) => mri, + Err(_) => { + relay_log::error!("invalid MRI: {}", metric.name()); + return None; + } + }; + + if mri.namespace != MetricNamespace::Transactions { + return None; + } + + let usage = matches!(mode, ExtractionMode::Usage); + let count = match metric.value() { + BucketViewValue::Counter(c) if usage && mri.name == "usage" => c as usize, + BucketViewValue::Distribution(d) if !usage && mri.name == "duration" => d.len(), + _ => 0, + }; + + let has_profile = + matches!(mri.name, "usage" | "duration") && metric.tag(PROFILE_TAG) == Some("true"); + + Some(TransactionCount { count, has_profile }) +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ExtractionMode { + Usage, + Duration, +} + +impl ExtractionMode { + pub fn from_usage(usage: bool) -> Self { + if usage { + Self::Usage + } else { + Self::Duration + } + } +} + +#[derive(Debug, Clone, Copy)] +pub struct TransactionCount { + pub count: usize, + pub has_profile: bool, +} + impl>> MetricsLimiter { /// Create a new limiter instance. /// @@ -42,60 +103,29 @@ impl>> MetricsLimiter { buckets: Vec, quotas: Q, scoping: Scoping, - usage: bool, + mode: ExtractionMode, ) -> Result> { let counts: Vec<_> = buckets .iter() - .map(|metric| { - let mri = match MetricResourceIdentifier::parse(&metric.name) { - Ok(mri) => mri, - Err(_) => { - relay_log::error!("invalid MRI: {}", metric.name); - return None; - } - }; - - // Keep all metrics that are not transaction related: - if mri.namespace != MetricNamespace::Transactions { - return None; - } - - let count = match &metric.value { - // The "usage" counter directly tracks the number of processed transactions. - BucketValue::Counter(c) if usage && mri.name == "usage" => *c as usize, - - // Fallback to the legacy "duration" metric, which is extracted exactly once for - // every processed transaction and was originally used to count transactions. - BucketValue::Distribution(d) if !usage && mri.name == "duration" => d.len(), - - // For any other metric in the transaction namespace, we check the limit with - // quantity=0 so transactions are not double counted against the quota. 
- _ => 0, - }; - - let has_profile = matches!(mri.name, "usage" | "duration") - && metric.tag(PROFILE_TAG) == Some("true"); - - Some((count, has_profile)) - }) + .map(|metric| extract_transaction_count(&BucketView::new(metric), mode)) .collect(); - // Accumulate the total transaction count: - let mut total_counts: Option<(usize, usize)> = None; - for (tx_count, has_profile) in counts.iter().flatten() { - let (total_txs, total_profiles) = total_counts.get_or_insert((0, 0)); - *total_txs += tx_count; - if *has_profile { - *total_profiles += tx_count; - } - } + // Accumulate the total transaction count and profile count + let total_counts = counts + .iter() + .filter_map(Option::as_ref) + .map(|c| { + let profile_count = if c.has_profile { c.count } else { 0 }; + (c.count, profile_count) + }) + .reduce(|a, b| (a.0 + b.0, a.1 + b.1)); if let Some((transaction_count, profile_count)) = total_counts { let transaction_buckets = counts.iter().map(Option::is_some).collect(); let profile_buckets = counts .iter() .map(|o| match o { - Some((_, has_profile)) => *has_profile, + Some(c) => c.has_profile, None => false, }) .collect(); @@ -324,7 +354,7 @@ mod tests { project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: None, }, - true, + ExtractionMode::Usage, ) .unwrap(); @@ -412,7 +442,7 @@ mod tests { project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: None, }, - true, + ExtractionMode::Usage, ) .unwrap(); diff --git a/relay-server/src/utils/rate_limits.rs b/relay-server/src/utils/rate_limits.rs index 80c868b7da..6b63dbe78f 100644 --- a/relay-server/src/utils/rate_limits.rs +++ b/relay-server/src/utils/rate_limits.rs @@ -139,6 +139,16 @@ pub struct EnvelopeSummary { /// The number of monitor check-ins. pub checkin_quantity: usize, + /// Secondary number of of transactions. + /// + /// This is 0 for envelopes which contain a transaction, + /// only secondary transaction quantity should be tracked here, + /// these are for example transaction counts extracted from metrics. + /// + /// A "primary" transaction is contained within the envelope, + /// marking the envelope data category a [`DataCategory::Transaction`]. + pub secondary_transaction_quantity: usize, + /// Indicates that the envelope contains regular attachments that do not create event payloads. 
pub has_plain_attachments: bool, @@ -177,6 +187,11 @@ impl EnvelopeSummary { continue; } + if let Some(source_quantities) = item.source_quantities() { + summary.secondary_transaction_quantity += source_quantities.transactions; + summary.profile_quantity += source_quantities.profiles; + } + summary.payload_size += item.len(); summary.set_quantity(item); } @@ -652,7 +667,10 @@ mod tests { use smallvec::smallvec; use super::*; - use crate::envelope::{AttachmentType, ContentType}; + use crate::{ + envelope::{AttachmentType, ContentType, SourceQuantities}, + extractors::RequestMeta, + }; #[test] fn test_format_rate_limits() { @@ -1237,4 +1255,33 @@ mod tests { mock.assert_call(DataCategory::TransactionIndexed, Some(1)); mock.assert_call(DataCategory::Attachment, None); } + + #[test] + fn test_source_quantity_for_total_quantity() { + let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" + .parse() + .unwrap(); + let request_meta = RequestMeta::new(dsn); + + let mut envelope = Envelope::from_request(None, request_meta); + + let mut item = Item::new(ItemType::MetricBuckets); + item.set_source_quantities(SourceQuantities { + transactions: 5, + profiles: 2, + }); + envelope.add_item(item); + + let mut item = Item::new(ItemType::MetricBuckets); + item.set_source_quantities(SourceQuantities { + transactions: 2, + profiles: 0, + }); + envelope.add_item(item); + + let summary = EnvelopeSummary::compute(&envelope); + + assert_eq!(summary.profile_quantity, 2); + assert_eq!(summary.secondary_transaction_quantity, 7); + } } From 5409f861450218afc40ce5c91ef91894095e2f18 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Thu, 23 Nov 2023 09:47:26 +0100 Subject: [PATCH 03/10] review: fixes mainly typos --- relay-metrics/src/aggregator.rs | 10 ++--- relay-metrics/src/view.rs | 38 +++++++++++-------- relay-server/src/utils/metrics_rate_limits.rs | 12 ++++++ relay-server/src/utils/rate_limits.rs | 2 +- 4 files changed, 41 insertions(+), 21 deletions(-) diff --git a/relay-metrics/src/aggregator.rs b/relay-metrics/src/aggregator.rs index 1d11b7b016..991b6c5ca8 100644 --- a/relay-metrics/src/aggregator.rs +++ b/relay-metrics/src/aggregator.rs @@ -67,8 +67,8 @@ struct BucketKey { } impl BucketKey { - // Create a 64-bit hash of the bucket key using FnvHasher. - // This is used for partition key computation and statsd logging. + /// Creates a 64-bit hash of the bucket key using FnvHasher. + /// This is used for partition key computation and statsd logging. fn hash64(&self) -> u64 { BucketKeyRef { project_key: self.project_key, @@ -109,8 +109,8 @@ struct BucketKeyRef<'a> { } impl<'a> BucketKeyRef<'a> { - // Create a 64-bit hash of the bucket key using FnvHasher. - // This is used for partition key computation and statsd logging. + /// Creates a 64-bit hash of the bucket key using FnvHasher. + /// This is used for partition key computation and statsd logging. fn hash64(&self) -> u64 { let mut hasher = FnvHasher::default(); std::hash::Hash::hash(self, &mut hasher); @@ -886,7 +886,7 @@ impl fmt::Debug for Aggregator { } } -/// Split buckets into N logical partitions, determined by the bucket key. +/// Splits buckets into N logical partitions, determined by the bucket key. 
pub fn partition_buckets( project_key: ProjectKey, buckets: Vec, diff --git a/relay-metrics/src/view.rs b/relay-metrics/src/view.rs index 6b6e1e5c06..314d65b283 100644 --- a/relay-metrics/src/view.rs +++ b/relay-metrics/src/view.rs @@ -11,14 +11,14 @@ use crate::bucket::Bucket; use crate::BucketValue; /// The fraction of size passed to [`BucketsView::by_size()`] at which buckets will be split. A value of -/// `2` means that all buckets smaller than half of max_flush_bytes will be moved in their entirety, +/// `2` means that all buckets smaller than half of `metrics_max_batch_size` will be moved in their entirety, /// and buckets larger will be split up. const BUCKET_SPLIT_FACTOR: usize = 32; /// The average size of values when serialized. const AVG_VALUE_SIZE: usize = 8; -/// Just an internal type representing an index into a slice of buckets. +/// An internal type representing an index into a slice of buckets. /// /// Note: the meaning of fields depends on the context of the index. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -45,16 +45,23 @@ struct Index { /// Iterating over a [`BucketsView`] yields [`BucketView`] items, /// only the first and last elements may be partial buckets. /// +/// In the above example `View 1` has a partial bucket at the end and +/// `View 2` has a partial bucket in the beginning. +/// /// Using the above example, iterating over `View 1` yields the buckets: /// `[C:1], [C:12], [D:0, 1, 2, 3]`. pub struct BucketsView<'a> { /// Source slice of buckets. inner: &'a [Bucket], - /// Start index, slice index indicates bucket, - /// bucket index indicates offset in the selected bucket. + /// Start index. + /// + /// - Slice index indicates bucket. + /// - Bucket index indicates offset in the selected bucket. start: Index, - /// End index, slice index indicates exclusive end, - /// bucket index, indicates offset into the *next* bucket past the end. + /// End index. + /// + /// - Slice index indicates exclusive end. + /// - Bucket index, indicates offset into the *next* bucket past the end. end: Index, } @@ -88,7 +95,7 @@ impl<'a> BucketsView<'a> { self.len() == 0 } - /// Iterater over all buckets in the view. + /// Iterator over all buckets in the view. pub fn iter(&self) -> impl Iterator> { BucketsViewIter::new(self.inner, self.start, self.end) } @@ -147,7 +154,7 @@ impl<'a> Iterator for BucketsViewIter<'a> { // This doesn't overflow because the last bucket in the inner slice will always have a 0 bucket index. let next = &self.inner[self.current.slice]; - // Choose the bucket end, this will always the full bucket except if it is the last. + // Choose the bucket end, this will always be the full bucket except if it is the last. let end = match self.current.slice == self.end.slice { false => next.value.len(), true => self.end.bucket, @@ -156,7 +163,7 @@ impl<'a> Iterator for BucketsViewIter<'a> { let next = BucketView::new(next).select(self.current.bucket..end); // Even if the current Bucket was partial, the next one will be full, - // except it is the last one. + // except if it is the last one. self.current = Index { slice: self.current.slice + 1, bucket: 0, @@ -176,7 +183,7 @@ struct BucketsViewBySizeIter<'a> { current: Index, /// Terminal position. end: Index, - /// Maximum size of in bytes of each slice. + /// Maximum size in bytes of each slice. max_size_bytes: usize, } @@ -296,6 +303,7 @@ pub struct BucketView<'a> { impl<'a> BucketView<'a> { /// Creates a new bucket view of a bucket. + /// /// The resulting view contains the entire bucket. 
pub fn new(bucket: &'a Bucket) -> Self { Self { @@ -367,7 +375,7 @@ impl<'a> BucketView<'a> { /// /// Function panics when: /// * the passed range is not contained in the current view. - /// * trying to split a counter or gauage bucket + /// * trying to split a counter or gauge bucket. pub fn select(mut self, range: Range) -> Self { assert!( range.start >= self.range.start, @@ -603,17 +611,17 @@ enum SplitDecision { /// This is an approximate function. The bucket is not actually serialized, but rather its /// footprint is estimated through the number of data points contained. See /// `estimate_size` for more information. -fn split_at(bucket: &BucketView<'_>, size: usize, min_split_size: usize) -> SplitDecision { +fn split_at(bucket: &BucketView<'_>, max_size: usize, min_split_size: usize) -> SplitDecision { // If there's enough space for the entire bucket, do not perform a split. let bucket_size = estimate_size(bucket); - if size >= bucket_size { + if max_size >= bucket_size { return SplitDecision::BucketFits(bucket_size); } // If the bucket key can't even fit into the remaining length, move the entire bucket into // the right-hand side. let own_size = estimate_base_size(bucket); - if size < (own_size + AVG_VALUE_SIZE) { + if max_size < (own_size + AVG_VALUE_SIZE) { // split_at must not be zero return SplitDecision::MoveToNextBatch; } @@ -624,7 +632,7 @@ fn split_at(bucket: &BucketView<'_>, size: usize, min_split_size: usize) -> Spli // Perform a split with the remaining space after adding the key. We assume an average // length of 8 bytes per value and compute the number of items fitting into the left side. - let split_at = (size - own_size) / AVG_VALUE_SIZE; + let split_at = (max_size - own_size) / AVG_VALUE_SIZE; SplitDecision::Split(split_at) } diff --git a/relay-server/src/utils/metrics_rate_limits.rs b/relay-server/src/utils/metrics_rate_limits.rs index f70711e432..a539ba7f27 100644 --- a/relay-server/src/utils/metrics_rate_limits.rs +++ b/relay-server/src/utils/metrics_rate_limits.rs @@ -73,13 +73,21 @@ pub fn extract_transaction_count( Some(TransactionCount { count, has_profile }) } +/// Wether to extract transaction and profile count based on the usage +/// or duration metric. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ExtractionMode { + /// Use the usage count metric. Usage, + /// Use the duration distribution metric. Duration, } impl ExtractionMode { + /// Utility function for creating an [`ExtractionMode`]. + /// + /// Returns [`ExtractionMode::Usage`] when passed `true`, + /// [`ExtractionMode::Duration`] otherwise. pub fn from_usage(usage: bool) -> Self { if usage { Self::Usage @@ -89,9 +97,13 @@ impl ExtractionMode { } } +/// Return value of [`extract_transaction_count`], containing the extracted +/// count of transactions and wether they have associated profiles. #[derive(Debug, Clone, Copy)] pub struct TransactionCount { + /// Number of transactions. pub count: usize, + /// Whether the transactions have associated profiles. pub has_profile: bool, } diff --git a/relay-server/src/utils/rate_limits.rs b/relay-server/src/utils/rate_limits.rs index 6b63dbe78f..e3c8ec5bcb 100644 --- a/relay-server/src/utils/rate_limits.rs +++ b/relay-server/src/utils/rate_limits.rs @@ -139,7 +139,7 @@ pub struct EnvelopeSummary { /// The number of monitor check-ins. pub checkin_quantity: usize, - /// Secondary number of of transactions. + /// Secondary number of transactions. 
/// /// This is 0 for envelopes which contain a transaction, /// only secondary transaction quantity should be tracked here, From 9b871c6b1c428a8f311288c250b1db6b65847b99 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Thu, 23 Nov 2023 09:57:02 +0100 Subject: [PATCH 04/10] review: move changelog --- CHANGELOG.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 060f4a00e4..d5337f3871 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## Unreleased + +- Partition and split metric buckets just before sending. Log outcomes for metrics. ([#2682](https://github.com/getsentry/relay/pull/2682)) + ## 23.11.1 **Features**: @@ -52,7 +56,6 @@ - Scrub all DB Core Data spans differently. ([#2686](https://github.com/getsentry/relay/pull/2686)) - Support generic metrics extraction version 2. ([#2692](https://github.com/getsentry/relay/pull/2692)) - Emit error on continued project config fetch failures after a time interval. ([#2700](https://github.com/getsentry/relay/pull/2700)) -- Partition and split metric buckets just before sending. ([#2682](https://github.com/getsentry/relay/pull/2682)) ## 23.10.1 From 78053ca805f91a74c31854df05777736166beba0 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Thu, 23 Nov 2023 10:48:56 +0100 Subject: [PATCH 05/10] don't panic --- relay-metrics/src/view.rs | 121 +++++++++++++++++++------------------- 1 file changed, 60 insertions(+), 61 deletions(-) diff --git a/relay-metrics/src/view.rs b/relay-metrics/src/view.rs index 314d65b283..78c0513be2 100644 --- a/relay-metrics/src/view.rs +++ b/relay-metrics/src/view.rs @@ -128,7 +128,7 @@ struct BucketsViewIter<'a> { impl<'a> BucketsViewIter<'a> { /// Creates a new iterator. /// - /// **Notes:** if `start` and `end` are not valid indices the iterator may panic. + /// Start and end must be valid indices or iterator may end early. fn new(inner: &'a [Bucket], start: Index, end: Index) -> Self { Self { inner, @@ -152,7 +152,11 @@ impl<'a> Iterator for BucketsViewIter<'a> { } // This doesn't overflow because the last bucket in the inner slice will always have a 0 bucket index. - let next = &self.inner[self.current.slice]; + debug_assert!( + self.current.slice < self.inner.len(), + "invariant violated, iterator pointing past the slice" + ); + let next = self.inner.get(self.current.slice)?; // Choose the bucket end, this will always be the full bucket except if it is the last. let end = match self.current.slice == self.end.slice { @@ -161,6 +165,11 @@ impl<'a> Iterator for BucketsViewIter<'a> { }; let next = BucketView::new(next).select(self.current.bucket..end); + let Some(next) = next else { + debug_assert!(false, "invariant violated, invalid bucket split"); + relay_log::error!("Internal invariant violated, invalid bucket split, dropping all remaining buckets."); + return None; + }; // Even if the current Bucket was partial, the next one will be full, // except if it is the last one. @@ -190,7 +199,7 @@ struct BucketsViewBySizeIter<'a> { impl<'a> BucketsViewBySizeIter<'a> { /// Creates a new iterator. /// - /// **Notes:** if `start` and `end` are not valid indices the iterator may panic. + /// Start and end must be valid indices or iterator may end early. 
fn new(inner: &'a [Bucket], start: Index, end: Index, max_size_bytes: usize) -> Self { Self { inner, @@ -217,10 +226,22 @@ impl<'a> Iterator for BucketsViewBySizeIter<'a> { } // Select next potential bucket, - // this won't overflow because `end` will never go past the slice and + // this should never overflow because `end` will never go past the slice and // we just validated that current is constrained by end. - let bucket = &self.inner[self.current.slice]; + debug_assert!( + self.current.slice < self.inner.len(), + "invariant violated, iterator pointing past the slice" + ); + let bucket = self.inner.get(self.current.slice)?; + + // Selection should never fail, because either we select the entire range, + // or we previously already split the bucket, which means this range is good. let bucket = BucketView::new(bucket).select(self.current.bucket..bucket.value.len()); + let Some(bucket) = bucket else { + debug_assert!(false, "internal invariant violated, invalid bucket split"); + relay_log::error!("Internal invariant violated, invalid bucket split, dropping all remaining buckets."); + return None; + }; match split_at( &bucket, @@ -371,24 +392,21 @@ impl<'a> BucketView<'a> { /// Selects a sub-view of the current view. /// - /// # Panics - /// - /// Function panics when: - /// * the passed range is not contained in the current view. - /// * trying to split a counter or gauge bucket. - pub fn select(mut self, range: Range) -> Self { - assert!( - range.start >= self.range.start, - "range not contained in view" - ); - assert!(range.end <= self.range.end, "range not contained in view"); - assert!( - self.can_split() || range == (0..self.inner.value.len()), - "attempt to split unsplittable bucket" - ); + /// Returns `None` when: + /// - the passed range is not contained in the current view. + /// - trying to split a counter or gauge bucket. + pub fn select(mut self, range: Range) -> Option { + if range.start < self.range.start || range.end > self.range.end { + return None; + } + + // Make sure the bucket can be split, or the entire bucket range is passed. + if !self.can_split() && range != (0..self.inner.value.len()) { + return None; + } self.range = range; - self + Some(self) } /// Whether the bucket can be split into multiple. 
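With this change, `select` no longer panics on an invalid range; callers are expected to handle the `None` case. A minimal usage sketch follows (not part of the patch; the statsd payload and ranges are made up, and the crate paths mirror the ones used in the tests below):

```rust
use relay_common::time::UnixTimestamp;
use relay_metrics::{Bucket, BucketView};

fn main() {
    let bucket = Bucket::parse(b"b2:1:2:3:5:5|d", UnixTimestamp::from_secs(5000)).unwrap();

    // A distribution can be split, so selecting a contained sub-range succeeds ...
    let first_two = BucketView::new(&bucket).select(0..2).expect("valid sub-range");
    assert_eq!(first_two.len(), 2);

    // ... while an out-of-range selection now yields `None` instead of panicking.
    assert!(BucketView::new(&bucket).select(0..99).is_none());
}
```
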
@@ -665,7 +683,7 @@ mod tests { fn test_bucket_view_select_counter() { let bucket = Bucket::parse(b"b0:1|c", UnixTimestamp::from_secs(5000)).unwrap(); - let view = BucketView::new(&bucket).select(0..1); + let view = BucketView::new(&bucket).select(0..1).unwrap(); assert_eq!(view.len(), 1); assert_eq!( serde_json::to_string(&view).unwrap(), @@ -677,30 +695,25 @@ mod tests { fn test_bucket_view_select_invalid_counter() { let bucket = Bucket::parse(b"b0:1|c", UnixTimestamp::from_secs(5000)).unwrap(); - let select_fail = |range| { - let view = BucketView::new(&bucket); - std::panic::catch_unwind(|| view.select(range)).expect_err("expected to panic") - }; - - select_fail(0..0); - select_fail(0..2); - select_fail(1..1); + assert!(BucketView::new(&bucket).select(0..0).is_none()); + assert!(BucketView::new(&bucket).select(0..2).is_none()); + assert!(BucketView::new(&bucket).select(1..1).is_none()); } #[test] fn test_bucket_view_select_distribution() { let bucket = Bucket::parse(b"b2:1:2:3:5:5|d", UnixTimestamp::from_secs(5000)).unwrap(); - let view = BucketView::new(&bucket).select(0..3); + let view = BucketView::new(&bucket).select(0..3).unwrap(); assert_eq!(view.len(), 3); assert_eq!( view.value(), BucketViewValue::Distribution(&[1.0, 2.0, 3.0]) ); - let view = BucketView::new(&bucket).select(1..3); + let view = BucketView::new(&bucket).select(1..3).unwrap(); assert_eq!(view.len(), 2); assert_eq!(view.value(), BucketViewValue::Distribution(&[2.0, 3.0])); - let view = BucketView::new(&bucket).select(1..5); + let view = BucketView::new(&bucket).select(1..5).unwrap(); assert_eq!(view.len(), 4); assert_eq!( view.value(), @@ -712,14 +725,9 @@ mod tests { fn test_bucket_view_select_invalid_distribution() { let bucket = Bucket::parse(b"b2:1:2:3:5:5|d", UnixTimestamp::from_secs(5000)).unwrap(); - let select_fail = |range| { - let view = BucketView::new(&bucket); - std::panic::catch_unwind(|| view.select(range)).expect_err("expected to panic") - }; - - select_fail(0..6); - select_fail(5..6); - select_fail(77..99); + assert!(BucketView::new(&bucket).select(0..6).is_none()); + assert!(BucketView::new(&bucket).select(5..6).is_none()); + assert!(BucketView::new(&bucket).select(77..99).is_none()); } #[test] @@ -727,13 +735,13 @@ mod tests { let bucket = Bucket::parse(b"b3:42:75|s", UnixTimestamp::from_secs(5000)).unwrap(); let s = [42, 75].into(); - let view = BucketView::new(&bucket).select(0..2); + let view = BucketView::new(&bucket).select(0..2).unwrap(); assert_eq!(view.len(), 2); assert_eq!(view.value(), BucketViewValue::Set(SetView::new(&s, 0..2))); - let view = BucketView::new(&bucket).select(1..2); + let view = BucketView::new(&bucket).select(1..2).unwrap(); assert_eq!(view.len(), 1); assert_eq!(view.value(), BucketViewValue::Set(SetView::new(&s, 1..2))); - let view = BucketView::new(&bucket).select(0..1); + let view = BucketView::new(&bucket).select(0..1).unwrap(); assert_eq!(view.len(), 1); assert_eq!(view.value(), BucketViewValue::Set(SetView::new(&s, 0..1))); } @@ -742,13 +750,9 @@ mod tests { fn test_bucket_view_select_invalid_set() { let bucket = Bucket::parse(b"b3:42:75|s", UnixTimestamp::from_secs(5000)).unwrap(); - let select_fail = |range| { - let view = BucketView::new(&bucket); - std::panic::catch_unwind(|| view.select(range)).expect_err("expected to panic") - }; - - select_fail(0..3); - select_fail(2..5); + assert!(BucketView::new(&bucket).select(0..3).is_none()); + assert!(BucketView::new(&bucket).select(2..5).is_none()); + assert!(BucketView::new(&bucket).select(77..99).is_none()); } 
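The batching behaviour exercised by the `by_size` tests can be summarized with a small sketch (illustrative only; the byte sizes are the estimates produced by `estimate_size`, and the imports assume the public re-exports used elsewhere in this patch):

```rust
use relay_common::time::UnixTimestamp;
use relay_metrics::{Bucket, BucketsView};

fn main() {
    let timestamp = UnixTimestamp::from_secs(5000);
    let buckets = Bucket::parse_all(b"b0:1|c\nb2:1:2:3:5:5|d", timestamp)
        .collect::<Result<Vec<_>, _>>()
        .unwrap();

    // Each batch stays below the (estimated) byte limit. Oversized distributions
    // and sets are split across batches; counters and gauges always move as a whole.
    for batch in BucketsView::new(&buckets).by_size(200) {
        let payload = serde_json::to_vec(&batch).unwrap();
        println!("batch with {} buckets, {} bytes serialized", batch.len(), payload.len());
    }
}
```
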
#[test] @@ -756,7 +760,7 @@ mod tests { let bucket = Bucket::parse(b"b4:25:17:42:220:85|g", UnixTimestamp::from_secs(5000)).unwrap(); - let view = BucketView::new(&bucket).select(0..5); + let view = BucketView::new(&bucket).select(0..5).unwrap(); assert_eq!(view.len(), 5); assert_eq!( view.value(), @@ -775,15 +779,10 @@ mod tests { let bucket = Bucket::parse(b"b4:25:17:42:220:85|g", UnixTimestamp::from_secs(5000)).unwrap(); - let select_fail = |range| { - let view = BucketView::new(&bucket); - std::panic::catch_unwind(|| view.select(range)).expect_err("expected to panic") - }; - - select_fail(0..1); - select_fail(0..4); - select_fail(5..5); - select_fail(5..6); + assert!(BucketView::new(&bucket).select(0..1).is_none()); + assert!(BucketView::new(&bucket).select(0..4).is_none()); + assert!(BucketView::new(&bucket).select(5..5).is_none()); + assert!(BucketView::new(&bucket).select(5..6).is_none()); } #[test] From cdbe9650d8c1cada2795a4cc0e69b93e55e516c6 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Thu, 23 Nov 2023 11:08:06 +0100 Subject: [PATCH 06/10] don't go through envelope manager, default 100kib batch size processing --- relay-config/src/config.rs | 19 ++++---- relay-server/src/actors/envelopes.rs | 60 +----------------------- relay-server/src/actors/processor.rs | 23 ++++++--- relay-server/src/actors/project.rs | 7 +-- relay-server/src/actors/project_cache.rs | 4 +- 5 files changed, 33 insertions(+), 80 deletions(-) diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 01bb427243..e45b2b5aab 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -928,6 +928,10 @@ fn default_metrics_max_batch_size() -> ByteSize { ByteSize::mebibytes(5) } +fn default_metrics_max_batch_size_processing() -> ByteSize { + ByteSize::kibibytes(100) +} + /// Controls Sentry-internal event processing. #[derive(Serialize, Deserialize, Debug)] pub struct Processing { @@ -997,12 +1001,9 @@ pub struct Processing { /// left to hard limits. #[serde(default = "default_metrics_max_batch_size")] pub metrics_max_batch_size: ByteSize, - /// The approximate maximum number of bytes submitted in one metrics batch for processing - /// relays. - /// - /// Overrides [`Self::metrics_max_batch_size`] when specified on processing relays. - #[serde(default)] - pub metrics_max_batch_size_processing: Option, + /// The approximate maximum number of bytes submitted in one metrics batch on processing relays. + #[serde(default = "default_metrics_max_batch_size_processing")] + pub metrics_max_batch_size_processing: ByteSize, } impl Default for Processing { @@ -1023,7 +1024,7 @@ impl Default for Processing { max_rate_limit: default_max_rate_limit(), metrics_partitions: None, metrics_max_batch_size: default_metrics_max_batch_size(), - metrics_max_batch_size_processing: None, + metrics_max_batch_size_processing: default_metrics_max_batch_size_processing(), } } } @@ -2081,9 +2082,7 @@ impl Config { self.values .processing .metrics_max_batch_size_processing - .as_ref() - .map(|s| s.as_bytes()) - .unwrap_or_else(|| self.metrics_max_batch_size_bytes()) + .as_bytes() } /// Default prefix to use when looking up project configs in Redis. 
This is only done when diff --git a/relay-server/src/actors/envelopes.rs b/relay-server/src/actors/envelopes.rs index 9c7cb9c2c5..d09dd1ef01 100644 --- a/relay-server/src/actors/envelopes.rs +++ b/relay-server/src/actors/envelopes.rs @@ -8,7 +8,6 @@ use chrono::Utc; use relay_base_schema::project::ProjectKey; use relay_config::{Config, HttpEncoding}; use relay_event_schema::protocol::ClientReport; -use relay_metrics::Bucket; use relay_quotas::Scoping; use relay_statsd::metric; use relay_system::{Addr, FromMessage, NoResponse}; @@ -27,9 +26,7 @@ use crate::envelope::{self, ContentType, Envelope, EnvelopeError, Item, ItemType use crate::extractors::{PartialDsn, RequestMeta}; use crate::http::{HttpError, Request, RequestBuilder, Response}; use crate::statsd::RelayHistograms; -use crate::utils::{ExtractionMode, ManagedEnvelope}; - -use super::processor::EncodeMetrics; +use crate::utils::ManagedEnvelope; /// Error created while handling [`SendEnvelope`]. #[derive(Debug, thiserror::Error)] @@ -139,26 +136,11 @@ pub struct SendClientReports { pub scoping: Scoping, } -/// Sends a batch of pre-aggregated metrics to the upstream or Kafka. -/// -/// Responds with `Err` if there was an error sending some or all of the buckets, containing the -/// failed buckets. -#[derive(Debug)] -pub struct SendMetrics { - /// The pre-aggregated metric buckets. - pub buckets: Vec, - /// Scoping information for the metrics. - pub scoping: Scoping, - /// Transaction extraction mode. - pub extraction_mode: ExtractionMode, -} - /// Dispatch service for generating and submitting Envelopes. #[derive(Debug)] pub enum EnvelopeManager { SubmitEnvelope(Box), SendClientReports(SendClientReports), - SendMetrics(SendMetrics), } impl relay_system::Interface for EnvelopeManager {} @@ -179,14 +161,6 @@ impl FromMessage for EnvelopeManager { } } -impl FromMessage for EnvelopeManager { - type Response = NoResponse; - - fn from_message(message: SendMetrics, _: ()) -> Self { - Self::SendMetrics(message) - } -} - /// Service implementing the [`EnvelopeManager`] interface. /// /// This service will produce envelopes to one the following backends: @@ -330,35 +304,6 @@ impl EnvelopeManagerService { } } - async fn handle_send_metrics(&self, message: SendMetrics) { - let SendMetrics { - buckets, - scoping, - extraction_mode, - } = message; - - #[allow(unused_mut)] - let mut partitions = self.config.metrics_partitions(); - #[allow(unused_mut)] - let mut max_batch_size_bytes = self.config.metrics_max_batch_size_bytes(); - - #[cfg(feature = "processing")] - if self.store_forwarder.is_some() { - // Partitioning on processing relays does not make sense, they end up all - // in the same Kafka topic anyways and the partition key is ignored. 
- partitions = None; - max_batch_size_bytes = self.config.metrics_max_batch_size_bytes_processing(); - } - - self.enveloper_processor.send(EncodeMetrics { - buckets, - scoping, - extraction_mode, - max_batch_size_bytes, - partitions, - }); - } - async fn handle_send_client_reports(&self, message: SendClientReports) { let SendClientReports { client_reports, @@ -398,9 +343,6 @@ impl EnvelopeManagerService { EnvelopeManager::SendClientReports(message) => { self.handle_send_client_reports(message).await; } - EnvelopeManager::SendMetrics(message) => { - self.handle_send_metrics(message).await; - } } } } diff --git a/relay-server/src/actors/processor.rs b/relay-server/src/actors/processor.rs index 01c4c01e3d..307185cf21 100644 --- a/relay-server/src/actors/processor.rs +++ b/relay-server/src/actors/processor.rs @@ -498,10 +498,6 @@ pub struct EncodeMetrics { pub scoping: Scoping, /// Transaction metrics extraction mode. pub extraction_mode: ExtractionMode, - /// Approximate size in bytes to batch buckets. - pub max_batch_size_bytes: usize, - /// Amount of logical partitions for the buckets. - pub partitions: Option, } /// Applies rate limits to metrics buckets and forwards them to the envelope manager. @@ -3108,11 +3104,26 @@ impl EnvelopeProcessorService { let EncodeMetrics { buckets, scoping, - max_batch_size_bytes, extraction_mode, - partitions, } = message; + #[cfg(not(feature = "processing"))] + let (partitions, max_batch_size_bytes) = { + ( + self.inner.config.metrics_partitions(), + self.inner.config.metrics_max_batch_size_bytes(), + ) + }; + #[cfg(feature = "processing")] + let (partitions, max_batch_size_bytes) = { + // Partitioning on processing relays does not make sense, they end up all + // in the same Kafka topic anyways and the partition key is ignored. + ( + None, + self.inner.config.metrics_max_batch_size_bytes_processing(), + ) + }; + let upstream = self.inner.config.upstream_descriptor(); let dsn = PartialDsn { scheme: upstream.scheme(), diff --git a/relay-server/src/actors/project.rs b/relay-server/src/actors/project.rs index 7cfbd61584..bdd9b46c8e 100644 --- a/relay-server/src/actors/project.rs +++ b/relay-server/src/actors/project.rs @@ -19,7 +19,6 @@ use smallvec::SmallVec; use tokio::time::Instant; use url::Url; -use crate::actors::envelopes::{EnvelopeManager, SendMetrics}; use crate::actors::outcome::{DiscardReason, Outcome, TrackOutcome}; use crate::actors::processor::EnvelopeProcessor; #[cfg(feature = "processing")] @@ -32,6 +31,8 @@ use crate::utils::{ EnvelopeLimiter, ExtractionMode, ManagedEnvelope, MetricsLimiter, RetryBackoff, }; +use super::processor::EncodeMetrics; + /// The expiry status of a project state. Return value of [`ProjectState::check_expiry`]. 
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] enum Expiry { @@ -1020,7 +1021,7 @@ impl Project { pub fn flush_buckets( &mut self, project_cache: Addr, - envelope_manager: Addr, + envelope_processor: Addr, buckets: Vec, ) { let Some(project_state) = self.get_cached_state(project_cache, false) else { @@ -1043,7 +1044,7 @@ impl Project { let extraction_mode = ExtractionMode::from_usage(usage); if !buckets.is_empty() { - envelope_manager.send(SendMetrics { + envelope_processor.send(EncodeMetrics { buckets, scoping, extraction_mode, diff --git a/relay-server/src/actors/project_cache.rs b/relay-server/src/actors/project_cache.rs index 8fc5663e91..3043ad5d95 100644 --- a/relay-server/src/actors/project_cache.rs +++ b/relay-server/src/actors/project_cache.rs @@ -858,11 +858,11 @@ impl ProjectCacheBroker { } fn handle_flush_buckets(&mut self, message: FlushBuckets) { - let envelope_manager = self.services.envelope_manager.clone(); + let envelope_processor = self.services.envelope_processor.clone(); let project_cache = self.services.project_cache.clone(); self.get_or_create_project(message.project_key) - .flush_buckets(project_cache, envelope_manager, message.buckets); + .flush_buckets(project_cache, envelope_processor, message.buckets); } fn handle_buffer_index(&mut self, message: UpdateBufferIndex) { From 550cd3954d84f522777ba99e46de4aa69fa0e359 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Thu, 23 Nov 2023 13:05:48 +0100 Subject: [PATCH 07/10] fix test, config value --- tests/integration/test_metrics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_metrics.py b/tests/integration/test_metrics.py index e19f8986e5..3bdf6e18e8 100644 --- a/tests/integration/test_metrics.py +++ b/tests/integration/test_metrics.py @@ -172,7 +172,7 @@ def test_metrics_max_batch_size(mini_sentry, relay, max_batch_size, expected_eve relay_config = { "processing": { "max_session_secs_in_past": forever, - "metrics_max_batch_size": max_batch_size, + "metrics_max_batch_size_processing": max_batch_size, }, "aggregator": { "bucket_interval": 1, From 2509e5b9a988f487da4aa8ffb901c117844a1a22 Mon Sep 17 00:00:00 2001 From: David Herberth Date: Mon, 27 Nov 2023 13:10:32 +0100 Subject: [PATCH 08/10] correctly check for processing via the config --- relay-server/src/actors/processor.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/relay-server/src/actors/processor.rs b/relay-server/src/actors/processor.rs index 307185cf21..b2bcf30673 100644 --- a/relay-server/src/actors/processor.rs +++ b/relay-server/src/actors/processor.rs @@ -3107,21 +3107,18 @@ impl EnvelopeProcessorService { extraction_mode, } = message; - #[cfg(not(feature = "processing"))] - let (partitions, max_batch_size_bytes) = { - ( - self.inner.config.metrics_partitions(), - self.inner.config.metrics_max_batch_size_bytes(), - ) - }; - #[cfg(feature = "processing")] - let (partitions, max_batch_size_bytes) = { + let (partitions, max_batch_size_bytes) = if self.inner.config.processing_enabled() { // Partitioning on processing relays does not make sense, they end up all // in the same Kafka topic anyways and the partition key is ignored. 
             (
                 None,
                 self.inner.config.metrics_max_batch_size_bytes_processing(),
             )
+        } else {
+            (
+                self.inner.config.metrics_partitions(),
+                self.inner.config.metrics_max_batch_size_bytes(),
+            )
         };
 
         let upstream = self.inner.config.upstream_descriptor();

From a03d9c1d615780ac4f94b3d2b8f3e53e2d10fc23 Mon Sep 17 00:00:00 2001
From: David Herberth
Date: Mon, 27 Nov 2023 13:24:02 +0100
Subject: [PATCH 09/10] mini doc fixes

---
 relay-metrics/src/aggregator.rs               | 2 ++
 relay-server/src/utils/metrics_rate_limits.rs | 3 +--
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/relay-metrics/src/aggregator.rs b/relay-metrics/src/aggregator.rs
index 991b6c5ca8..209b801c00 100644
--- a/relay-metrics/src/aggregator.rs
+++ b/relay-metrics/src/aggregator.rs
@@ -68,6 +68,7 @@ struct BucketKey {
 
 impl BucketKey {
     /// Creates a 64-bit hash of the bucket key using FnvHasher.
+    ///
     /// This is used for partition key computation and statsd logging.
     fn hash64(&self) -> u64 {
         BucketKeyRef {
@@ -110,6 +111,7 @@ struct BucketKeyRef<'a> {
 
 impl<'a> BucketKeyRef<'a> {
     /// Creates a 64-bit hash of the bucket key using FnvHasher.
+    ///
     /// This is used for partition key computation and statsd logging.
     fn hash64(&self) -> u64 {
         let mut hasher = FnvHasher::default();
diff --git a/relay-server/src/utils/metrics_rate_limits.rs b/relay-server/src/utils/metrics_rate_limits.rs
index a539ba7f27..c186e1702d 100644
--- a/relay-server/src/utils/metrics_rate_limits.rs
+++ b/relay-server/src/utils/metrics_rate_limits.rs
@@ -73,8 +73,7 @@ pub fn extract_transaction_count(
     Some(TransactionCount { count, has_profile })
 }
 
-/// Wether to extract transaction and profile count based on the usage
-/// or duration metric.
+/// Whether to extract transaction and profile count based on the usage or duration metric.
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum ExtractionMode {
     /// Use the usage count metric.
From 1f4ba8f04ac5b01a7f569bab20a36715edb76f8e Mon Sep 17 00:00:00 2001
From: David Herberth
Date: Tue, 28 Nov 2023 11:10:30 +0100
Subject: [PATCH 10/10] fix config in test

---
 tests/integration/test_metrics.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/integration/test_metrics.py b/tests/integration/test_metrics.py
index 3bdf6e18e8..e19f8986e5 100644
--- a/tests/integration/test_metrics.py
+++ b/tests/integration/test_metrics.py
@@ -172,7 +172,7 @@ def test_metrics_max_batch_size(mini_sentry, relay, max_batch_size, expected_eve
     relay_config = {
         "processing": {
            "max_session_secs_in_past": forever,
-            "metrics_max_batch_size_processing": max_batch_size,
+            "metrics_max_batch_size": max_batch_size,
         },
         "aggregator": {
            "bucket_interval": 1,
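
The partition key computation documented in patch 09 (`BucketKey::hash64`, a 64-bit FNV hash of the bucket key) is what feeds the `metrics_partitions` handling in the processor changes above. The sketch below is a minimal illustration of that idea, not Relay's actual code: it assumes the `fnv` crate, maps the hash onto the configured partition count with a plain modulo, and hashes a simplified key (the real `BucketKey` also covers the timestamp and tags).

    // Illustrative only: assign a metric bucket to a logical partition.
    use std::hash::Hasher;

    use fnv::FnvHasher;

    /// Returns the shard for a bucket, or `None` when partitioning is disabled.
    fn partition_key(project_key: &str, metric_name: &str, partitions: Option<u64>) -> Option<u64> {
        // Treat `None` and `0` as "partitioning disabled".
        let partitions = partitions.filter(|p| *p > 0)?;

        // Hash a simplified bucket key; the real key also includes timestamp and tags.
        let mut hasher = FnvHasher::default();
        hasher.write(project_key.as_bytes());
        hasher.write(metric_name.as_bytes());

        Some(hasher.finish() % partitions)
    }

    fn main() {
        // With `processing.metrics_partitions: 8`, every bucket maps to a shard in 0..8.
        let shard = partition_key(
            "a94ae32be2584e0bbd7a4cbb95971fee",
            "c:transactions/count_per_root_project@none",
            Some(8),
        );
        println!("{shard:?}"); // prints `Some(<shard in 0..8>)`
    }

Processing Relays skip this step entirely: as the comments in patches 06 and 08 note, their buckets all end up on the same Kafka topic and the partition key is ignored, so the series only partitions on non-processing Relays.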