Commit
split and partition buckets in envelope processor
Dav1dde committed Nov 17, 2023
1 parent 59fe0c5 commit 3db0b08
Showing 18 changed files with 1,545 additions and 604 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -32,6 +32,7 @@
- Scrub all DB Core Data spans differently. ([#2686](https://github.com/getsentry/relay/pull/2686))
- Support generic metrics extraction version 2. ([#2692](https://github.com/getsentry/relay/pull/2692))
- Emit error on continued project config fetch failures after a time interval. ([#2700](https://github.com/getsentry/relay/pull/2700))
- Partition and split metric buckets just before sending. ([#2682](https://github.com/getsentry/relay/pull/2682))

## 23.10.1

47 changes: 47 additions & 0 deletions relay-config/src/config.rs
@@ -913,6 +913,10 @@ fn default_max_rate_limit() -> Option<u32> {
Some(300) // 5 minutes
}

fn default_metrics_max_batch_size() -> ByteSize {
ByteSize::mebibytes(5)
}

/// Controls Sentry-internal event processing.
#[derive(Serialize, Deserialize, Debug)]
pub struct Processing {
@@ -968,6 +972,26 @@ pub struct Processing {
/// Maximum rate limit to report to clients.
#[serde(default = "default_max_rate_limit")]
pub max_rate_limit: Option<u32>,
/// The number of logical partitions that can receive flushed buckets.
///
/// If set, buckets are partitioned by (hash of bucket key % partitions) and routed
/// by setting the `X-Sentry-Relay-Shard` header.
#[serde(default)]
pub metrics_partitions: Option<u64>,
/// The approximate maximum number of bytes submitted in one metrics batch.
///
/// This controls how big flushed batches of buckets get, depending on the number of buckets,
/// the cumulative length of their keys, and the number of raw values. Since final serialization
/// adds some overhead, this number is approximate and a safety margin should be
/// left before any hard limits.
#[serde(default = "default_metrics_max_batch_size")]
pub metrics_max_batch_size: ByteSize,
/// The approximate maximum number of bytes submitted in one metrics batch for processing
/// relays.
///
/// Overrides [`Self::metrics_max_batch_size`] when specified on processing relays.
#[serde(default)]
pub metrics_max_batch_size_processing: Option<ByteSize>,
}

impl Default for Processing {
@@ -986,6 +1010,9 @@ impl Default for Processing {
attachment_chunk_size: default_chunk_size(),
projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
max_rate_limit: default_max_rate_limit(),
metrics_partitions: None,
metrics_max_batch_size: default_metrics_max_batch_size(),
metrics_max_batch_size_processing: None,
}
}
}
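
For reference, a minimal sketch of the serde default pattern behind these fields, using a hypothetical stand-in struct rather than the real `Processing` config: omitting the new options yields `None` partitions and the 5 MiB batch-size default.

use serde::Deserialize;

// Mirrors ByteSize::mebibytes(5) from default_metrics_max_batch_size().
fn default_metrics_max_batch_size_bytes() -> u64 {
    5 * 1024 * 1024
}

// Hypothetical stand-in for the metrics-related fields of `Processing`.
#[derive(Debug, Deserialize)]
struct MetricsOptions {
    #[serde(default)]
    metrics_partitions: Option<u64>,
    #[serde(default = "default_metrics_max_batch_size_bytes")]
    metrics_max_batch_size: u64,
    #[serde(default)]
    metrics_max_batch_size_processing: Option<u64>,
}

fn main() {
    // Only the partition count is configured; the other fields fall back to defaults.
    let opts: MetricsOptions =
        serde_json::from_str(r#"{ "metrics_partitions": 64 }"#).unwrap();
    assert_eq!(opts.metrics_partitions, Some(64));
    assert_eq!(opts.metrics_max_batch_size, 5 * 1024 * 1024);
    assert!(opts.metrics_max_batch_size_processing.is_none());
}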
@@ -2023,6 +2050,26 @@ impl Config {
self.values.processing.attachment_chunk_size.as_bytes()
}

/// Number of metric partitions.
pub fn metrics_partitions(&self) -> Option<u64> {
self.values.processing.metrics_partitions
}

/// Maximum metrics batch size in bytes.
pub fn metrics_max_batch_size_bytes(&self) -> usize {
self.values.processing.metrics_max_batch_size.as_bytes()
}

/// Maximum metrics batch size in bytes for processing relays.
pub fn metrics_max_batch_size_bytes_processing(&self) -> usize {
self.values
.processing
.metrics_max_batch_size_processing
.as_ref()
.map(|s| s.as_bytes())
.unwrap_or_else(|| self.metrics_max_batch_size_bytes())
}
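
To illustrate what the batch-size accessors feed into, here is a hedged, self-contained sketch of size-limited batch splitting; the function name, the `encoded_size` callback, and the greedy strategy are illustrative assumptions, not Relay's actual implementation:

/// Greedily split `items` into batches whose estimated encoded size stays
/// below `max_batch_size` bytes (e.g. `config.metrics_max_batch_size_bytes()`).
/// A single oversized item still forms its own batch so nothing is dropped.
fn split_into_batches<T>(
    items: Vec<T>,
    max_batch_size: usize,
    encoded_size: impl Fn(&T) -> usize,
) -> Vec<Vec<T>> {
    let mut batches = Vec::new();
    let mut current = Vec::new();
    let mut current_size = 0;

    for item in items {
        let size = encoded_size(&item);
        if !current.is_empty() && current_size + size > max_batch_size {
            batches.push(std::mem::take(&mut current));
            current_size = 0;
        }
        current_size += size;
        current.push(item);
    }

    if !current.is_empty() {
        batches.push(current);
    }
    batches
}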

/// Default prefix to use when looking up project configs in Redis. This is only done when
/// Relay is in processing mode.
pub fn projectconfig_cache_prefix(&self) -> &str {
110 changes: 63 additions & 47 deletions relay-metrics/src/aggregator.rs
@@ -70,9 +70,13 @@ impl BucketKey {
// Create a 64-bit hash of the bucket key using FnvHasher.
// This is used for partition key computation and statsd logging.
fn hash64(&self) -> u64 {
let mut hasher = FnvHasher::default();
std::hash::Hash::hash(self, &mut hasher);
hasher.finish()
BucketKeyRef {
project_key: self.project_key,
timestamp: self.timestamp,
metric_name: &self.metric_name,
tags: &self.tags,
}
.hash64()
}

/// Estimates the number of bytes needed to encode the bucket key.
@@ -92,6 +96,28 @@ impl BucketKey {
}
}

/// Counterpart to [`BucketKey`] for borrowed data instead of owned data.
///
/// This makes it possible to compute a hash for a [`Bucket`]
/// without first deconstructing the bucket into a [`BucketKey`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct BucketKeyRef<'a> {
project_key: ProjectKey,
timestamp: UnixTimestamp,
metric_name: &'a str,
tags: &'a BTreeMap<String, String>,
}

impl<'a> BucketKeyRef<'a> {
// Create a 64-bit hash of the bucket key using FnvHasher.
// This is used for partition key computation and statsd logging.
fn hash64(&self) -> u64 {
let mut hasher = FnvHasher::default();
std::hash::Hash::hash(self, &mut hasher);
hasher.finish()
}
}
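
The refactor relies on owned and borrowed fields hashing identically, so `BucketKeyRef::hash64` yields the same value as `BucketKey::hash64`. A minimal sketch of that property with a toy struct pair (the `fnv` crate is the same hasher used above; the structs are hypothetical, not the real bucket keys):

use std::hash::{Hash, Hasher};

use fnv::FnvHasher;

#[derive(Hash)]
struct Owned {
    name: String,
}

#[derive(Hash)]
struct Borrowed<'a> {
    name: &'a str,
}

fn fnv64<T: Hash>(value: &T) -> u64 {
    let mut hasher = FnvHasher::default();
    value.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let owned = Owned { name: "endpoint.response_time".to_owned() };
    let borrowed = Borrowed { name: &owned.name };
    // `String` and `&str` feed the same bytes to the hasher, so the
    // owned and borrowed structs produce identical 64-bit keys.
    assert_eq!(fnv64(&owned), fnv64(&borrowed));
}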

/// Estimates the number of bytes needed to encode the tags.
///
/// Note that this does not necessarily match the exact memory footprint of the tags,
@@ -348,14 +374,6 @@ impl Ord for QueuedBucket {
}
}

/// A Bucket and its hashed key.
/// This is cheaper to pass around than a (BucketKey, Bucket) pair.
pub struct HashedBucket {
// This is only public because pop_flush_buckets is used in benchmark.
hashed_key: u64,
bucket: Bucket,
}

#[derive(Default)]
struct CostTracker {
total_cost: usize,
@@ -513,7 +531,7 @@ impl Aggregator {
/// Pop and return the buckets that are eligible for flushing out according to bucket interval.
///
/// Note that this function is primarily intended for tests.
pub fn pop_flush_buckets(&mut self, force: bool) -> HashMap<ProjectKey, Vec<HashedBucket>> {
pub fn pop_flush_buckets(&mut self, force: bool) -> HashMap<ProjectKey, Vec<Bucket>> {
relay_statsd::metric!(
gauge(MetricGauges::Buckets) = self.bucket_count() as u64,
aggregator = &self.name,
@@ -526,7 +544,7 @@ impl Aggregator {
aggregator = &self.name,
);

let mut buckets = HashMap::<ProjectKey, Vec<HashedBucket>>::new();
let mut buckets = HashMap::new();
let mut stats = HashMap::new();

relay_statsd::metric!(
@@ -558,11 +576,8 @@ impl Aggregator {

buckets
.entry(key.project_key)
.or_default()
.push(HashedBucket {
hashed_key: key.hash64(),
bucket,
});
.or_insert_with(Vec::new)
.push(bucket);

false
} else {
@@ -845,35 +860,6 @@ impl Aggregator {
}
}

/// Split buckets into N logical partitions, determined by the bucket key.
pub fn partition_buckets(
&self,
buckets: Vec<HashedBucket>,
flush_partitions: Option<u64>,
) -> BTreeMap<Option<u64>, Vec<Bucket>> {
let flush_partitions = match flush_partitions {
None => {
return BTreeMap::from([(None, buckets.into_iter().map(|x| x.bucket).collect())]);
}
Some(x) => x.max(1), // handle 0,
};
let mut partitions = BTreeMap::<_, Vec<Bucket>>::new();
for bucket in buckets {
let partition_key = bucket.hashed_key % flush_partitions;
partitions
.entry(Some(partition_key))
.or_default()
.push(bucket.bucket);

// Log the distribution of buckets over partition key
relay_statsd::metric!(
histogram(MetricHistograms::PartitionKeys) = partition_key as f64,
aggregator = &self.name,
);
}
partitions
}

/// Create a new aggregator.
pub fn new(config: AggregatorConfig) -> Self {
Self::named("default".to_owned(), config)
@@ -900,6 +886,36 @@ impl fmt::Debug for Aggregator {
}
}

/// Split buckets into N logical partitions, determined by the bucket key.
pub fn partition_buckets(
project_key: ProjectKey,
buckets: Vec<Bucket>,
flush_partitions: Option<u64>,
) -> BTreeMap<Option<u64>, Vec<Bucket>> {
let flush_partitions = match flush_partitions {
None => return BTreeMap::from([(None, buckets)]),
Some(x) => x.max(1), // handle 0
};
let mut partitions = BTreeMap::<_, Vec<Bucket>>::new();
for bucket in buckets {
let key = BucketKeyRef {
project_key,
timestamp: bucket.timestamp,
metric_name: &bucket.name,
tags: &bucket.tags,
};

let partition_key = key.hash64() % flush_partitions;
partitions
.entry(Some(partition_key))
.or_default()
.push(bucket);

relay_statsd::metric!(histogram(MetricHistograms::PartitionKeys) = partition_key);
}
partitions
}
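
For illustration, a hedged, dependency-free sketch of the same partitioning scheme in isolation: hash the key, reduce it modulo the configured partition count, and use the result as the shard identifier (the generic key and `DefaultHasher` here are stand-ins for `BucketKeyRef` and `FnvHasher`):

use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

/// Assign each value to one of `partitions` logical shards based on its key.
/// `None` disables partitioning and keeps everything in a single group.
fn partition_by_key<K: Hash, V>(
    items: Vec<(K, V)>,
    partitions: Option<u64>,
) -> BTreeMap<Option<u64>, Vec<V>> {
    let mut out: BTreeMap<Option<u64>, Vec<V>> = BTreeMap::new();
    for (key, value) in items {
        let partition_key = partitions.map(|n| {
            let mut hasher = DefaultHasher::new();
            key.hash(&mut hasher);
            hasher.finish() % n.max(1) // treat 0 partitions as 1
        });
        out.entry(partition_key).or_default().push(value);
    }
    out
}

// Each non-empty partition could then be flushed as its own request, with the
// partition key carried in the `X-Sentry-Relay-Shard` header mentioned in the
// config docs above.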

#[cfg(test)]
mod tests {
