split and partition buckets in envelope processor
Dav1dde committed Nov 8, 2023
1 parent 2a86991 commit 0ccce13
Showing 16 changed files with 1,438 additions and 499 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -10,6 +10,7 @@ target
 
 # Runtime files
 /.relay/
+spool.db
 
 # Editors
 /.idea/
105 changes: 56 additions & 49 deletions relay-metrics/src/aggregator.rs
@@ -68,9 +68,13 @@ impl BucketKey {
     // Create a 64-bit hash of the bucket key using FnvHasher.
     // This is used for partition key computation and statsd logging.
     fn hash64(&self) -> u64 {
-        let mut hasher = FnvHasher::default();
-        std::hash::Hash::hash(self, &mut hasher);
-        hasher.finish()
+        BucketKeyRef {
+            project_key: self.project_key,
+            timestamp: self.timestamp,
+            metric_name: &self.metric_name,
+            tags: &self.tags,
+        }
+        .hash64()
     }
 
     /// Estimates the number of bytes needed to encode the bucket key.
@@ -90,6 +94,24 @@
     }
 }
 
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+struct BucketKeyRef<'a> {
+    project_key: ProjectKey,
+    timestamp: UnixTimestamp,
+    metric_name: &'a str,
+    tags: &'a BTreeMap<String, String>,
+}
+
+impl<'a> BucketKeyRef<'a> {
+    // Create a 64-bit hash of the bucket key using FnvHasher.
+    // This is used for partition key computation and statsd logging.
+    fn hash64(&self) -> u64 {
+        let mut hasher = FnvHasher::default();
+        std::hash::Hash::hash(self, &mut hasher);
+        hasher.finish()
+    }
+}
+
 /// Estimates the number of bytes needed to encode the tags.
 ///
 /// Note that this does not necessarily match the exact memory footprint of the tags,
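
The borrowed key makes it possible to compute a bucket's partition hash from a Bucket and its ProjectKey without cloning the metric name and tags into an owned BucketKey. This relies on String and &str (and a BTreeMap behind a reference) producing identical Hash output, so both key forms agree on hash64. A minimal standalone sketch of that property, using simplified stand-ins for ProjectKey and UnixTimestamp and assuming only the fnv crate:

use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

use fnv::FnvHasher;

// Simplified stand-ins; the real types come from relay's own crates.
type ProjectKey = u64;
type UnixTimestamp = u64;

#[derive(Hash)]
struct BucketKey {
    project_key: ProjectKey,
    timestamp: UnixTimestamp,
    metric_name: String,
    tags: BTreeMap<String, String>,
}

#[derive(Hash)]
struct BucketKeyRef<'a> {
    project_key: ProjectKey,
    timestamp: UnixTimestamp,
    metric_name: &'a str,
    tags: &'a BTreeMap<String, String>,
}

fn fnv64(value: &impl Hash) -> u64 {
    let mut hasher = FnvHasher::default();
    value.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    // Illustrative values only.
    let owned = BucketKey {
        project_key: 17,
        timestamp: 1_699_000_000,
        metric_name: "c:transactions/count_per_root_project@none".to_owned(),
        tags: BTreeMap::from([("platform".to_owned(), "rust".to_owned())]),
    };
    let borrowed = BucketKeyRef {
        project_key: owned.project_key,
        timestamp: owned.timestamp,
        metric_name: &owned.metric_name,
        tags: &owned.tags,
    };
    // String hashes like &str and &BTreeMap like BTreeMap, so the owned
    // and borrowed representations yield the same partition hash.
    assert_eq!(fnv64(&owned), fnv64(&borrowed));
}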
@@ -346,14 +368,6 @@ impl Ord for QueuedBucket {
     }
 }
 
-/// A Bucket and its hashed key.
-/// This is cheaper to pass around than a (BucketKey, Bucket) pair.
-pub struct HashedBucket {
-    // This is only public because pop_flush_buckets is used in benchmark.
-    hashed_key: u64,
-    bucket: Bucket,
-}
-
 #[derive(Default)]
 struct CostTracker {
     total_cost: usize,
@@ -511,7 +525,7 @@ impl Aggregator {
     /// Pop and return the buckets that are eligible for flushing out according to bucket interval.
     ///
     /// Note that this function is primarily intended for tests.
-    pub fn pop_flush_buckets(&mut self, force: bool) -> HashMap<ProjectKey, Vec<HashedBucket>> {
+    pub fn pop_flush_buckets(&mut self, force: bool) -> HashMap<ProjectKey, Vec<Bucket>> {
         relay_statsd::metric!(
             gauge(MetricGauges::Buckets) = self.bucket_count() as u64,
             aggregator = &self.name,
@@ -524,7 +538,7 @@
             aggregator = &self.name,
         );
 
-        let mut buckets = HashMap::<ProjectKey, Vec<HashedBucket>>::new();
+        let mut buckets = HashMap::<ProjectKey, Vec<_>>::new();
         let mut stats = HashMap::new();
 
         relay_statsd::metric!(
@@ -554,13 +568,7 @@
                     tags: key.tags.clone(),
                 };
 
-                buckets
-                    .entry(key.project_key)
-                    .or_default()
-                    .push(HashedBucket {
-                        hashed_key: key.hash64(),
-                        bucket,
-                    });
+                buckets.entry(key.project_key).or_default().push(bucket);
 
                 false
             } else {
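
With this hunk, flushing no longer precomputes and carries a hash per bucket; pop_flush_buckets returns plain Buckets, and hashing is deferred until partitioning, which now happens in the envelope processor. A hypothetical caller, with send_to_processor as an illustrative stand-in for the processor hand-off (not part of this commit):

// Hypothetical flush loop: partition_buckets() re-derives each hash from
// (project_key, timestamp, name, tags) via BucketKeyRef only when needed.
for (project_key, buckets) in aggregator.pop_flush_buckets(false) {
    for (partition, chunk) in partition_buckets(project_key, buckets, Some(8)) {
        send_to_processor(project_key, partition, chunk); // illustrative sink
    }
}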
@@ -843,35 +851,6 @@
         }
     }
 
-    /// Split buckets into N logical partitions, determined by the bucket key.
-    pub fn partition_buckets(
-        &self,
-        buckets: Vec<HashedBucket>,
-        flush_partitions: Option<u64>,
-    ) -> BTreeMap<Option<u64>, Vec<Bucket>> {
-        let flush_partitions = match flush_partitions {
-            None => {
-                return BTreeMap::from([(None, buckets.into_iter().map(|x| x.bucket).collect())]);
-            }
-            Some(x) => x.max(1), // handle 0,
-        };
-        let mut partitions = BTreeMap::<_, Vec<Bucket>>::new();
-        for bucket in buckets {
-            let partition_key = bucket.hashed_key % flush_partitions;
-            partitions
-                .entry(Some(partition_key))
-                .or_default()
-                .push(bucket.bucket);
-
-            // Log the distribution of buckets over partition key
-            relay_statsd::metric!(
-                histogram(MetricHistograms::PartitionKeys) = partition_key as f64,
-                aggregator = &self.name,
-            );
-        }
-        partitions
-    }
-
     /// Create a new aggregator.
     pub fn new(config: AggregatorConfig) -> Self {
         Self::named("default".to_owned(), config)
@@ -898,6 +877,34 @@ impl fmt::Debug for Aggregator {
     }
 }
 
+/// Split buckets into N logical partitions, determined by the bucket key.
+pub fn partition_buckets(
+    project_key: ProjectKey,
+    buckets: Vec<Bucket>,
+    flush_partitions: Option<u64>,
+) -> BTreeMap<Option<u64>, Vec<Bucket>> {
+    let flush_partitions = match flush_partitions {
+        None => return BTreeMap::from([(None, buckets)]),
+        Some(x) => x.max(1), // handle 0,
+    };
+    let mut partitions = BTreeMap::<_, Vec<Bucket>>::new();
+    for bucket in buckets {
+        let key = BucketKeyRef {
+            project_key,
+            timestamp: bucket.timestamp,
+            metric_name: &bucket.name,
+            tags: &bucket.tags,
+        };
+
+        let partition_key = key.hash64() % flush_partitions;
+        partitions
+            .entry(Some(partition_key))
+            .or_default()
+            .push(bucket);
+    }
+    partitions
+}
+
 #[cfg(test)]
 mod tests {
 
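As the new free function reads, None leaves all buckets in a single group under the None key, while Some(0) is clamped to one partition by x.max(1), so every bucket maps to Some(0). A test-style sketch under those assumptions, with bucket construction elided:

// flush_partitions = None: a single unpartitioned group.
let grouped = partition_buckets(project_key, buckets.clone(), None);
assert_eq!(grouped.len(), 1);
assert!(grouped.contains_key(&None));

// flush_partitions = Some(0) clamps to 1 partition: any hash % 1 == 0,
// so every bucket lands under the key Some(0).
let clamped = partition_buckets(project_key, buckets, Some(0));
assert!(clamped.keys().all(|&k| k == Some(0)));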