ref(metrics): Partition and split metrics buckets just before sending #2682

Merged · 11 commits · Nov 28, 2023
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,9 @@
# Changelog

## Unreleased

- Partition and split metric buckets just before sending. Log outcomes for metrics. ([#2682](https://github.com/getsentry/relay/pull/2682))

## 23.11.1

**Features**:
46 changes: 46 additions & 0 deletions relay-config/src/config.rs
@@ -924,6 +924,14 @@ fn default_max_rate_limit() -> Option<u32> {
Some(300) // 5 minutes
}

fn default_metrics_max_batch_size() -> ByteSize {
ByteSize::mebibytes(5)
}

fn default_metrics_max_batch_size_processing() -> ByteSize {
ByteSize::kibibytes(100)
}

/// Controls Sentry-internal event processing.
#[derive(Serialize, Deserialize, Debug)]
pub struct Processing {
@@ -979,6 +987,23 @@ pub struct Processing {
/// Maximum rate limit to report to clients.
#[serde(default = "default_max_rate_limit")]
pub max_rate_limit: Option<u32>,
/// The number of logical partitions that can receive flushed buckets.
///
/// If set, buckets are partitioned by (bucket key % partitions), and routed
/// by setting the header `X-Sentry-Relay-Shard`.
#[serde(default)]
pub metrics_partitions: Option<u64>,
/// The approximate maximum number of bytes submitted in one metrics batch.
///
/// This controls how big flushed batches of buckets get, depending on the number of buckets,
/// the cumulative length of their keys, and the number of raw values. Since final serialization
/// adds some additional overhead, this number is approximate and a safety margin should be
/// kept relative to hard limits.
#[serde(default = "default_metrics_max_batch_size")]
pub metrics_max_batch_size: ByteSize,
/// The approximate maximum number of bytes submitted in one metrics batch on processing relays.
#[serde(default = "default_metrics_max_batch_size_processing")]
pub metrics_max_batch_size_processing: ByteSize,
}

impl Default for Processing {
@@ -997,6 +1022,9 @@ impl Default for Processing {
attachment_chunk_size: default_chunk_size(),
projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
max_rate_limit: default_max_rate_limit(),
metrics_partitions: None,
metrics_max_batch_size: default_metrics_max_batch_size(),
metrics_max_batch_size_processing: default_metrics_max_batch_size_processing(),
}
}
}
@@ -2039,6 +2067,24 @@ impl Config {
self.values.processing.attachment_chunk_size.as_bytes()
}

/// The number of metric partitions.
pub fn metrics_partitions(&self) -> Option<u64> {
self.values.processing.metrics_partitions
}

/// Maximum metrics batch size in bytes.
pub fn metrics_max_batch_size_bytes(&self) -> usize {
self.values.processing.metrics_max_batch_size.as_bytes()
}

/// Maximum metrics batch size in bytes for processing relays.
pub fn metrics_max_batch_size_bytes_processing(&self) -> usize {
self.values
.processing
.metrics_max_batch_size_processing
.as_bytes()
}

/// Default prefix to use when looking up project configs in Redis. This is only done when
/// Relay is in processing mode.
pub fn projectconfig_cache_prefix(&self) -> &str {
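The two batch-size options above govern how the flushed bucket list is split before it is sent upstream; the splitting code itself is not part of this diff, but the idea is roughly a greedy, size-based chunking. The following is a minimal sketch under stated assumptions: `BucketStub` and `estimated_size` are hypothetical stand-ins rather than Relay's `Bucket` and its real cost model, and the real encoder adds serialization overhead on top, which is why the configured limit is documented as approximate.

use std::collections::BTreeMap;
use std::mem;

// Hypothetical, simplified stand-in for `relay_metrics::Bucket`.
struct BucketStub {
    metric_name: String,
    tags: BTreeMap<String, String>,
    values: Vec<f64>,
}

// Made-up cost model: key material plus a fixed budget per raw value. A model
// like this ignores final serialization overhead, hence the safety margin
// recommended in the config docs.
fn estimated_size(bucket: &BucketStub) -> usize {
    bucket.metric_name.len()
        + bucket.tags.iter().map(|(k, v)| k.len() + v.len()).sum::<usize>()
        + bucket.values.len() * 8
}

/// Greedily packs buckets into batches whose estimated size stays below
/// `max_batch_bytes` (e.g. `metrics_max_batch_size`, 5 MiB by default).
fn split_into_batches(buckets: Vec<BucketStub>, max_batch_bytes: usize) -> Vec<Vec<BucketStub>> {
    let mut batches = Vec::new();
    let mut current = Vec::new();
    let mut current_size = 0;

    for bucket in buckets {
        let size = estimated_size(&bucket);
        // Close the current batch if adding this bucket would exceed the budget.
        if !current.is_empty() && current_size + size > max_batch_bytes {
            batches.push(mem::take(&mut current));
            current_size = 0;
        }
        current_size += size;
        current.push(bucket);
    }
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}

A single oversized bucket still ends up in a batch of its own here; whether Relay additionally splits such a bucket internally is outside the scope of this sketch.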
114 changes: 65 additions & 49 deletions relay-metrics/src/aggregator.rs
@@ -67,12 +67,16 @@ struct BucketKey {
}

impl BucketKey {
// Create a 64-bit hash of the bucket key using FnvHasher.
// This is used for partition key computation and statsd logging.
/// Creates a 64-bit hash of the bucket key using FnvHasher.
/// This is used for partition key computation and statsd logging.
fn hash64(&self) -> u64 {
let mut hasher = FnvHasher::default();
std::hash::Hash::hash(self, &mut hasher);
hasher.finish()
BucketKeyRef {
project_key: self.project_key,
timestamp: self.timestamp,
metric_name: &self.metric_name,
tags: &self.tags,
}
.hash64()
}

/// Estimates the number of bytes needed to encode the bucket key.
@@ -92,6 +96,28 @@ impl BucketKey {
}
}

/// Counterpart to [`BucketKey`] for referenced data, not owned data.
///
/// This makes it possible to compute a hash for a [`Bucket`]
/// without destructuring the bucket into a [`BucketKey`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct BucketKeyRef<'a> {
project_key: ProjectKey,
timestamp: UnixTimestamp,
metric_name: &'a str,
tags: &'a BTreeMap<String, String>,
}

impl<'a> BucketKeyRef<'a> {
/// Creates a 64-bit hash of the bucket key using FnvHasher.
/// This is used for partition key computation and statsd logging.
fn hash64(&self) -> u64 {
let mut hasher = FnvHasher::default();
std::hash::Hash::hash(self, &mut hasher);
hasher.finish()
}
}

/// Estimates the number of bytes needed to encode the tags.
///
/// Note that this does not necessarily match the exact memory footprint of the tags,
@@ -348,14 +374,6 @@ impl Ord for QueuedBucket {
}
}

/// A Bucket and its hashed key.
/// This is cheaper to pass around than a (BucketKey, Bucket) pair.
pub struct HashedBucket {
// This is only public because pop_flush_buckets is used in benchmark.
hashed_key: u64,
bucket: Bucket,
}

#[derive(Default)]
struct CostTracker {
total_cost: usize,
@@ -513,7 +531,7 @@ impl Aggregator {
/// Pop and return the buckets that are eligible for flushing out according to bucket interval.
///
/// Note that this function is primarily intended for tests.
pub fn pop_flush_buckets(&mut self, force: bool) -> HashMap<ProjectKey, Vec<HashedBucket>> {
pub fn pop_flush_buckets(&mut self, force: bool) -> HashMap<ProjectKey, Vec<Bucket>> {
relay_statsd::metric!(
gauge(MetricGauges::Buckets) = self.bucket_count() as u64,
aggregator = &self.name,
Expand All @@ -526,7 +544,7 @@ impl Aggregator {
aggregator = &self.name,
);

let mut buckets = HashMap::<ProjectKey, Vec<HashedBucket>>::new();
let mut buckets = HashMap::new();
let mut stats = HashMap::new();

relay_statsd::metric!(
@@ -558,11 +576,8 @@

buckets
.entry(key.project_key)
.or_default()
.push(HashedBucket {
hashed_key: key.hash64(),
bucket,
});
.or_insert_with(Vec::new)
.push(bucket);

false
} else {
@@ -845,35 +860,6 @@ impl Aggregator {
}
}

/// Split buckets into N logical partitions, determined by the bucket key.
pub fn partition_buckets(
&self,
buckets: Vec<HashedBucket>,
flush_partitions: Option<u64>,
) -> BTreeMap<Option<u64>, Vec<Bucket>> {
let flush_partitions = match flush_partitions {
None => {
return BTreeMap::from([(None, buckets.into_iter().map(|x| x.bucket).collect())]);
}
Some(x) => x.max(1), // handle 0,
};
let mut partitions = BTreeMap::<_, Vec<Bucket>>::new();
for bucket in buckets {
let partition_key = bucket.hashed_key % flush_partitions;
partitions
.entry(Some(partition_key))
.or_default()
.push(bucket.bucket);

// Log the distribution of buckets over partition key
relay_statsd::metric!(
histogram(MetricHistograms::PartitionKeys) = partition_key as f64,
aggregator = &self.name,
);
}
partitions
}

/// Create a new aggregator.
pub fn new(config: AggregatorConfig) -> Self {
Self::named("default".to_owned(), config)
@@ -900,6 +886,36 @@ impl fmt::Debug for Aggregator {
}
}

/// Splits buckets into N logical partitions, determined by the bucket key.
pub fn partition_buckets(
project_key: ProjectKey,
buckets: Vec<Bucket>,
flush_partitions: Option<u64>,
) -> BTreeMap<Option<u64>, Vec<Bucket>> {
let flush_partitions = match flush_partitions {
None => return BTreeMap::from([(None, buckets)]),
Some(x) => x.max(1), // handle 0,
};
let mut partitions = BTreeMap::<_, Vec<Bucket>>::new();
for bucket in buckets {
let key = BucketKeyRef {
project_key,
timestamp: bucket.timestamp,
metric_name: &bucket.name,
tags: &bucket.tags,
};

let partition_key = key.hash64() % flush_partitions;
partitions
.entry(Some(partition_key))
.or_default()
.push(bucket);

relay_statsd::metric!(histogram(MetricHistograms::PartitionKeys) = partition_key);
}
partitions
}

#[cfg(test)]
mod tests {

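To see why the new `BucketKeyRef` can stand in for `BucketKey` when computing partition keys: the derived `Hash` implementation feeds fields in declaration order, and `String` and `&str` (likewise a `BTreeMap` and a reference to it) hash identically, so the borrowed view produces the same 64-bit FNV value as the owned key. The standalone sketch below demonstrates that equivalence with simplified stand-in types; `OwnedKey` and `BorrowedKey` are illustrative, not the real `BucketKey`/`BucketKeyRef`, and the example depends on the `fnv` crate that the aggregator also uses.

use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

use fnv::FnvHasher;

// Simplified stand-ins: field order and per-field hashing must agree for the
// derived `Hash` impls to produce the same value.
#[derive(Hash)]
struct OwnedKey {
    project_key: String,
    timestamp: u64,
    metric_name: String,
    tags: BTreeMap<String, String>,
}

#[derive(Hash)]
struct BorrowedKey<'a> {
    project_key: &'a str,
    timestamp: u64,
    metric_name: &'a str,
    tags: &'a BTreeMap<String, String>,
}

fn hash64<T: Hash>(value: &T) -> u64 {
    let mut hasher = FnvHasher::default();
    value.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let owned = OwnedKey {
        project_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(),
        timestamp: 1_700_000_000,
        metric_name: "d:transactions/duration@millisecond".to_owned(),
        tags: BTreeMap::from([("release".to_owned(), "1.0".to_owned())]),
    };
    let borrowed = BorrowedKey {
        project_key: owned.project_key.as_str(),
        timestamp: owned.timestamp,
        metric_name: owned.metric_name.as_str(),
        tags: &owned.tags,
    };

    // The borrowed view hashes identically, so a partition key can be computed
    // straight from the bucket's fields without rebuilding an owned key.
    assert_eq!(hash64(&owned), hash64(&borrowed));
    let flush_partitions = 16u64;
    println!("partition {}", hash64(&borrowed) % flush_partitions);
}

This is what allows the free-standing `partition_buckets` to compute `key.hash64() % flush_partitions` directly from each flushed `Bucket`, making the former `HashedBucket` wrapper (which carried the hash computed while the owned `BucketKey` was still available) unnecessary.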