feat(metrics): Track memory footprint of metrics buckets [INGEST-1132] #1284

Merged · 8 commits · Jun 3, 2022
Changes from 6 commits
220 changes: 212 additions & 8 deletions relay-metrics/src/aggregation.rs
@@ -520,6 +520,21 @@ impl BucketValue {
Self::Distribution(m) => m.internal_size(),
}
}

/// Estimates the number of bytes needed to encode the bucket.
/// Note that this does not necessarily match the exact memory footprint of the bucket,
/// because data structures might have a memory overhead.
///
/// This is very similar to [`relative_size`], which can possibly be removed.
pub fn cost(&self) -> usize {
Review comment (Member Author): Not sure if `cost` is the best name for what we're modeling here. Open for suggestions.

match self {
Self::Counter(c) => std::mem::size_of_val(c),
Review comment (Member): I know we talked about tracking memory allocation, but thinking about it more, I would actually prefer to hardcode numbers here. If we accidentally increase the struct size, I don't think this should immediately and implicitly be reflected in abuse limits. If we change abuse limits and how they are calculated, I think it's probably better to do so explicitly.

Review comment (Member Author): Agreed.

Self::Set(s) => 4 * s.len(),
Self::Gauge(_) => std::mem::size_of::<GaugeValue>(),
// Distribution values are stored as maps of (f64, u32) pairs
Self::Distribution(m) => 12 * m.internal_size(),
}
}
}
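
For illustration, a hardcoded variant of `cost()` along the lines of the review discussion above could look like this sketch; the constant names are hypothetical, but the values mirror the sizes used in this diff:

// Pin per-entry costs as explicit constants so that accidental struct-size
// changes do not silently shift abuse limits:
const COST_PER_COUNTER: usize = 8; // one f64
const COST_PER_SET_ENTRY: usize = 4; // sets store u32 values
const COST_PER_DISTRIBUTION_PAIR: usize = 12; // one (f64, u32) pair
const COST_PER_GAUGE: usize = 40; // four f64 fields plus a u64 count

pub fn cost(&self) -> usize {
    match self {
        Self::Counter(_) => COST_PER_COUNTER,
        Self::Set(s) => COST_PER_SET_ENTRY * s.len(),
        Self::Gauge(_) => COST_PER_GAUGE,
        Self::Distribution(m) => COST_PER_DISTRIBUTION_PAIR * m.internal_size(),
    }
}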

impl From<MetricValue> for BucketValue {
@@ -537,7 +552,7 @@ impl From<MetricValue> for BucketValue {
///
/// Currently either a [`MetricValue`] or another `BucketValue`.
trait MergeValue: Into<BucketValue> {
/// Merges `self` into the given `bucket_value`.
/// Merges `self` into the given `bucket_value` and returns the additional cost for storing this value.
///
/// Aggregation is performed according to the rules documented in [`BucketValue`].
fn merge_into(self, bucket_value: &mut BucketValue) -> Result<(), AggregateMetricsError>;
@@ -546,11 +561,21 @@ trait MergeValue: Into<BucketValue> {
impl MergeValue for BucketValue {
fn merge_into(self, bucket_value: &mut BucketValue) -> Result<(), AggregateMetricsError> {
match (bucket_value, self) {
(BucketValue::Counter(lhs), BucketValue::Counter(rhs)) => *lhs += rhs,
(BucketValue::Distribution(lhs), BucketValue::Distribution(rhs)) => lhs.extend(&rhs),
(BucketValue::Set(lhs), BucketValue::Set(rhs)) => lhs.extend(rhs),
(BucketValue::Gauge(lhs), BucketValue::Gauge(rhs)) => lhs.merge(rhs),
_ => return Err(AggregateMetricsErrorKind::InvalidTypes.into()),
(BucketValue::Counter(lhs), BucketValue::Counter(rhs)) => {
*lhs += rhs;
}
(BucketValue::Distribution(lhs), BucketValue::Distribution(rhs)) => {
lhs.extend(&rhs);
}
(BucketValue::Set(lhs), BucketValue::Set(rhs)) => {
lhs.extend(rhs);
}
(BucketValue::Gauge(lhs), BucketValue::Gauge(rhs)) => {
lhs.merge(rhs);
}
_ => {
return Err(AggregateMetricsErrorKind::InvalidTypes.into());
}
}

Ok(())
@@ -1018,6 +1043,48 @@ enum AggregatorState {
ShuttingDown,
}

#[derive(Debug, Default)]
struct CostTracker {
total_cost: usize,
// Choosing a BTreeMap instead of a HashMap here, under the assumption that a BTreeMap
// is still more efficient for the number of project keys we store.
Review comment (Member Author): This reasoning is up for discussion.

cost_per_project_key: BTreeMap<ProjectKey, usize>,
}

impl CostTracker {
fn add_cost(&mut self, project_key: ProjectKey, cost: usize) {
self.total_cost += cost;
let project_cost = self.cost_per_project_key.entry(project_key).or_insert(0);
*project_cost += cost;
}

fn subtract_cost(&mut self, project_key: ProjectKey, cost: usize) {
match self.cost_per_project_key.entry(project_key) {
btree_map::Entry::Vacant(_) => {
relay_log::error!(
"Trying to subtract cost for a project key that has not been tracked"
);
}
btree_map::Entry::Occupied(mut entry) => {
// Handle per-project cost:
let project_cost = entry.get_mut();
if cost > *project_cost {
relay_log::error!("Subtracting a project cost higher than what we tracked");
self.total_cost = self.total_cost.saturating_sub(*project_cost);
*project_cost = 0;
} else {
*project_cost -= cost;
self.total_cost = self.total_cost.saturating_sub(cost);
}
if *project_cost == 0 {
// Remove this project_key from the map
entry.remove();
}
}
};
}
}
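
Illustrative usage, not part of this diff: the aggregator pairs every `add_cost` on insert or merge with a matching `subtract_cost` on flush, so `total_cost` always reflects the buckets currently held.

let mut tracker = CostTracker::default();
tracker.add_cost(project_key, bucket.cost()); // when the bucket is inserted
// ... later, when the bucket is flushed:
tracker.subtract_cost(project_key, bucket.cost());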

/// A collector of [`Metric`] submissions.
///
/// # Aggregation
@@ -1074,6 +1141,7 @@ pub struct Aggregator {
buckets: HashMap<BucketKey, QueuedBucket>,
receiver: Recipient<FlushBuckets>,
state: AggregatorState,
cost_tracker: CostTracker,
}

impl Aggregator {
@@ -1087,6 +1155,7 @@ impl Aggregator {
buckets: HashMap::new(),
receiver,
state: AggregatorState::Running,
cost_tracker: CostTracker::default(),
}
}

@@ -1200,14 +1269,19 @@ impl Aggregator {

let key = Self::validate_bucket_key(key, &self.config)?;

let added_cost;
match self.buckets.entry(key) {
Entry::Occupied(mut entry) => {
relay_statsd::metric!(
counter(MetricCounters::MergeHit) += 1,
metric_type = entry.key().metric_type.as_str(),
metric_name = &entry.key().metric_name
);
value.merge_into(&mut entry.get_mut().value)?;
let bucket_value = &mut entry.get_mut().value;
let cost_before = bucket_value.cost();
value.merge_into(bucket_value)?;
let cost_after = bucket_value.cost();
added_cost = cost_after.saturating_sub(cost_before);
Review comment (Member Author): We could probably optimize this by making `merge_into` return the actual cost delta, but I decided against it for the sake of simplicity.

Review comment (Member): Wouldn't it be simpler to just add the cost of the single value here and have `merge_into` return whether or not something was added?

Review comment (Member Author): Yeah, that could work.

}
Entry::Vacant(entry) => {
relay_statsd::metric!(
@@ -1222,10 +1296,14 @@
);

let flush_at = self.config.get_flush_time(timestamp, project_key);
entry.insert(QueuedBucket::new(flush_at, value.into()));
let bucket = value.into();
added_cost = bucket.cost();
entry.insert(QueuedBucket::new(flush_at, bucket));
}
}

self.cost_tracker.add_cost(project_key, added_cost);

Ok(())
}

@@ -1305,10 +1383,12 @@ impl Aggregator {

relay_statsd::metric!(timer(MetricTimers::BucketsScanDuration), {
let bucket_interval = self.config.bucket_interval;
let cost_tracker = &mut self.cost_tracker;
self.buckets.retain(|key, entry| {
if force || entry.elapsed() {
// Take the value and leave a placeholder behind. It'll be removed right after.
let value = std::mem::replace(&mut entry.value, BucketValue::Counter(0.0));
cost_tracker.subtract_cost(key.project_key, value.cost());
let bucket = Bucket::from_parts(key.clone(), bucket_interval, value);
buckets.entry(key.project_key).or_default().push(bucket);
false
@@ -1318,6 +1398,12 @@
});
});

// We only emit a statsd metric for the total cost on flush (and not when merging the buckets),
// assuming that this gives us more than enough data points.
relay_statsd::metric!(
gauge(MetricGauges::BucketsCost) = self.cost_tracker.total_cost as u64
);

buckets
}

@@ -1883,6 +1969,24 @@ mod tests {
);
}

#[test]
fn test_bucket_value_cost() {
let counter = BucketValue::Counter(123.0);
assert_eq!(counter.cost(), 8);
let set = BucketValue::Set(vec![1, 2, 3, 4, 5].into_iter().collect());
assert_eq!(set.cost(), 20);
let distribution = BucketValue::Distribution(dist![1., 2., 3.]);
assert_eq!(distribution.cost(), 36);
let gauge = BucketValue::Gauge(GaugeValue {
max: 43.,
min: 42.,
sum: 85.,
last: 43.,
count: 2,
});
assert_eq!(gauge.cost(), 40);
}
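
Worked out, these expectations follow directly from the cost model above: the counter costs 8 bytes (one f64); the set holds five distinct u32 values at 4 bytes each, so 20; the distribution holds three unique values at 12 bytes per (f64, u32) pair, so 36; and the gauge is a full GaugeValue of four f64 fields plus the count, so 40 (assuming an 8-byte count field).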

#[test]
fn test_aggregator_merge_counters() {
relay_test::setup();
@@ -2059,6 +2163,106 @@ mod tests {
assert_eq!(aggregator.buckets.len(), 2);
}

#[test]
fn test_cost_tracker() {
let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let project_key3 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();
let mut cost_tracker = CostTracker::default();
insta::assert_debug_snapshot!(cost_tracker, @r###"
CostTracker {
total_cost: 0,
cost_per_project_key: {},
}
"###);
cost_tracker.add_cost(project_key1, 100);
insta::assert_debug_snapshot!(cost_tracker, @r###"
CostTracker {
total_cost: 100,
cost_per_project_key: {
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fed"): 100,
},
}
"###);
cost_tracker.add_cost(project_key2, 200);
insta::assert_debug_snapshot!(cost_tracker, @r###"
CostTracker {
total_cost: 300,
cost_per_project_key: {
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fed"): 100,
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"): 200,
},
}
"###);
// Unknown project: Will log error, but not crash
cost_tracker.subtract_cost(project_key3, 666);
insta::assert_debug_snapshot!(cost_tracker, @r###"
CostTracker {
total_cost: 300,
cost_per_project_key: {
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fed"): 100,
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"): 200,
},
}
"###);
// Subtract too much: Will log error, but not crash
cost_tracker.subtract_cost(project_key1, 666);
insta::assert_debug_snapshot!(cost_tracker, @r###"
CostTracker {
total_cost: 200,
cost_per_project_key: {
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"): 200,
},
}
"###);
cost_tracker.subtract_cost(project_key2, 20);
insta::assert_debug_snapshot!(cost_tracker, @r###"
CostTracker {
total_cost: 180,
cost_per_project_key: {
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"): 180,
},
}
"###);
cost_tracker.subtract_cost(project_key2, 180);
insta::assert_debug_snapshot!(cost_tracker, @r###"
CostTracker {
total_cost: 0,
cost_per_project_key: {},
}
"###);
}

#[test]
fn test_aggregator_cost_tracking() {
// Make sure that the right cost is added / subtracted
let receiver = TestReceiver::start_default().recipient();
let mut aggregator = Aggregator::new(test_config(), receiver);
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();

let mut metric = Metric {
name: "c:foo".to_owned(),
unit: MetricUnit::None,
value: MetricValue::Counter(42.),
timestamp: UnixTimestamp::from_secs(999994711),
tags: BTreeMap::new(),
};
for (metric_value, expected_total_cost) in [
(MetricValue::Counter(42.), 8),
(MetricValue::Counter(42.), 8),
(MetricValue::Set(123), 12), // 8 + 1*4
(MetricValue::Set(123), 12), // Same element in set, no change
(MetricValue::Set(456), 16), // Different element in set -> +4
(MetricValue::Distribution(1.0), 28), // 1 unique element -> +12
(MetricValue::Distribution(1.0), 28), // no new element
(MetricValue::Distribution(2.0), 40), // 1 new element -> +12
] {
metric.value = metric_value;
aggregator.insert(project_key, metric.clone()).unwrap();
assert_eq!(aggregator.cost_tracker.total_cost, expected_total_cost);
}
}

#[test]
fn test_flush_bucket() {
relay_test::setup();
3 changes: 3 additions & 0 deletions relay-metrics/src/statsd.rs
@@ -120,12 +120,15 @@ impl HistogramMetric for MetricHistograms {
pub enum MetricGauges {
/// The total number of metric buckets in Relay's metrics aggregator.
Buckets,
/// The total storage cost of metric buckets in Relay's metrics aggregator.
BucketsCost,
}

impl GaugeMetric for MetricGauges {
fn name(&self) -> &'static str {
match *self {
Self::Buckets => "metrics.buckets",
Self::BucketsCost => "metrics.bucket_cost",
}
}
}