Skip to content

Commit

Permalink
Buffered API for custom distribution (Rust only)
Browse files Browse the repository at this point in the history
  • Loading branch information
badboy committed Sep 17, 2024
1 parent 37f7a3b commit 4d58644
Show file tree
Hide file tree
Showing 6 changed files with 305 additions and 10 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* Change Metrics Ping Scheduler to use daemon threads ([#2930](https://github.com/mozilla/glean/pull/2930))
* Dispatch metric recording for event, object and timing distribution on the task queue ([#2942](https://github.com/mozilla/glean/pull/2942))
* Rust
* **Experimental**: Buffered API for timing & memory distribution ([#2948](https://github.com/mozilla/glean/pull/2948))
* **Experimental**: Buffered API for timing, memory and custom distribution ([#2948](https://github.com/mozilla/glean/pull/2948))

# v61.0.0 (2024-08-21)

Expand Down
82 changes: 82 additions & 0 deletions glean-core/rlb/tests/custom_distribution_buffered.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

//! This integration test should model how the RLB is used when embedded in another Rust application
//! (e.g. FOG/Firefox Desktop).
//!
//! We write a single test scenario per file to avoid any state keeping across runs
//! (different files run as different processes).
mod common;

use glean::ConfigurationBuilder;

mod metrics {
use glean::private::*;
use glean::{HistogramType, Lifetime};
use glean_core::CommonMetricData;
use once_cell::sync::Lazy;

#[allow(non_upper_case_globals)]
pub static measure: Lazy<CustomDistributionMetric> = Lazy::new(|| {
CustomDistributionMetric::new(
CommonMetricData {
name: "measure".into(),
category: "sample".into(),
send_in_pings: vec!["validation".into()],
lifetime: Lifetime::Ping,
disabled: false,
..Default::default()
},
0,
100,
100,
HistogramType::Linear,
)
});
}

/// Test scenario: Ensure buffered accumulation works.
#[test]
fn buffered_memory_distribution_works() {
common::enable_test_logging();

let dir = tempfile::tempdir().unwrap();
let tmpname = dir.path().to_path_buf();

let cfg = ConfigurationBuilder::new(true, tmpname, "firefox-desktop")
.with_server_endpoint("invalid-test-host")
.build();
common::initialize(cfg);

let mut buffer = metrics::measure.start_buffer();

for _ in 0..3 {
buffer.accumulate(10);
}

// Nothing accumulated while buffer uncommitted.
assert_eq!(None, metrics::measure.test_get_value(None));

// Commit buffer by dropping it.
drop(buffer);

let data = metrics::measure.test_get_value(None).unwrap();
assert_eq!(3, data.count);
assert_eq!(30, data.sum);

let mut buffer = metrics::measure.start_buffer();
for _ in 0..3 {
buffer.accumulate(10);
}
// Don't record any of this data into the metric.
buffer.abandon();

// Metric is unchanged.
let data = metrics::measure.test_get_value(None).unwrap();
assert_eq!(3, data.count);
assert_eq!(30, data.sum);

glean::shutdown(); // Cleanly shut down at the end of the test.
}
8 changes: 4 additions & 4 deletions glean-core/src/histogram/exponential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ pub struct PrecomputedExponential {
// Don't serialize the (potentially large) array of ranges, instead compute them on first
// access.
#[serde(skip)]
bucket_ranges: OnceCell<Vec<u64>>,
min: u64,
max: u64,
bucket_count: usize,
pub(crate) bucket_ranges: OnceCell<Vec<u64>>,
pub(crate) min: u64,
pub(crate) max: u64,
pub(crate) bucket_count: usize,
}

impl Bucketing for PrecomputedExponential {
Expand Down
8 changes: 4 additions & 4 deletions glean-core/src/histogram/linear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ pub struct PrecomputedLinear {
// Don't serialize the (potentially large) array of ranges, instead compute them on first
// access.
#[serde(skip)]
bucket_ranges: OnceCell<Vec<u64>>,
min: u64,
max: u64,
bucket_count: usize,
pub(crate) bucket_ranges: OnceCell<Vec<u64>>,
pub(crate) min: u64,
pub(crate) max: u64,
pub(crate) bucket_count: usize,
}

impl Bucketing for PrecomputedLinear {
Expand Down
102 changes: 102 additions & 0 deletions glean-core/src/histogram/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

//! A simple histogram implementation for exponential histograms.
use std::any::TypeId;
use std::collections::HashMap;

use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};

use crate::error::{Error, ErrorKind};
Expand Down Expand Up @@ -144,6 +146,70 @@ impl<B: Bucketing> Histogram<B> {
}
}

/// Either linear or exponential histogram bucketing
///
/// This is to be used as a single type to avoid generic use in the buffered API.
pub enum LinearOrExponential {
Linear(PrecomputedLinear),
Exponential(PrecomputedExponential),
}

impl Histogram<LinearOrExponential> {
/// A histogram using linear bucketing.
///
/// _Note:_ Special naming to avoid needing to use extensive type annotations in other parts.
/// This type is only used for the buffered API.
pub fn _linear(min: u64, max: u64, bucket_count: usize) -> Histogram<LinearOrExponential> {
Histogram {
values: HashMap::new(),
count: 0,
sum: 0,
bucketing: LinearOrExponential::Linear(PrecomputedLinear {
bucket_ranges: OnceCell::new(),
min,
max,
bucket_count,
}),
}
}

/// A histogram using expontential bucketing.
///
/// _Note:_ Special naming to avoid needing to use extensive type annotations in other parts.
/// This type is only used for the buffered API.
pub fn _exponential(min: u64, max: u64, bucket_count: usize) -> Histogram<LinearOrExponential> {
Histogram {
values: HashMap::new(),
count: 0,
sum: 0,
bucketing: LinearOrExponential::Exponential(PrecomputedExponential {
bucket_ranges: OnceCell::new(),
min,
max,
bucket_count,
}),
}
}
}

impl Bucketing for LinearOrExponential {
fn sample_to_bucket_minimum(&self, sample: u64) -> u64 {
use LinearOrExponential::*;
match self {
Linear(lin) => lin.sample_to_bucket_minimum(sample),
Exponential(exp) => exp.sample_to_bucket_minimum(sample),
}
}

fn ranges(&self) -> &[u64] {
use LinearOrExponential::*;
match self {
Linear(lin) => lin.ranges(),
Exponential(exp) => exp.ranges(),
}
}
}

impl<B> Histogram<B>
where
B: Bucketing,
Expand All @@ -165,3 +231,39 @@ where
}
}
}

impl<B> Histogram<B>
where
B: Bucketing + 'static,
B: std::fmt::Debug,
B: PartialEq,
{
/// Merges data from one histogram into the other.
///
/// ## Panics
///
/// Panics if the two histograms don't use the same bucketing.
/// Note that the `other` side can be either linear or exponential
/// and we only merge if it matches `self`'s bucketing.
// _Note:_ Unfortunately this needs a separate name from the above, otherwise it's a conflicting
// method.
// We only use it internally for the buffered API, and can guarantee correct usage that way.
pub fn _merge(&mut self, other: &Histogram<LinearOrExponential>) {
#[rustfmt::skip]
assert!(
(
TypeId::of::<B>() == TypeId::of::<PrecomputedLinear>()
&& matches!(other.bucketing, LinearOrExponential::Linear(_))
) ||
(
TypeId::of::<B>() == TypeId::of::<PrecomputedExponential>()
&& matches!(other.bucketing, LinearOrExponential::Exponential(_))
)
);
self.sum += other.sum;
self.count += other.count;
for (&bucket, &count) in &other.values {
*self.values.entry(bucket).or_insert(0) += count;
}
}
}
113 changes: 112 additions & 1 deletion glean-core/src/metrics/custom_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use std::mem;
use std::sync::Arc;

use crate::common_metric_data::CommonMetricDataInternal;
use crate::error_recording::{record_error, test_get_num_recorded_errors, ErrorType};
use crate::histogram::{Bucketing, Histogram, HistogramType};
use crate::histogram::{Bucketing, Histogram, HistogramType, LinearOrExponential};
use crate::metrics::{DistributionData, Metric, MetricType};
use crate::storage::StorageManager;
use crate::CommonMetricData;
Expand Down Expand Up @@ -267,4 +268,114 @@ impl CustomDistributionMetric {
test_get_num_recorded_errors(glean, self.meta(), error).unwrap_or(0)
})
}

/// **Experimental:** Start a new histogram buffer associated with this custom distribution metric.
///
/// A histogram buffer accumulates in-memory.
/// Data is recorded into the metric on drop.
pub fn start_buffer(&self) -> LocalCustomDistribution<'_> {
LocalCustomDistribution::new(self)
}

fn commit_histogram(&self, histogram: Histogram<LinearOrExponential>) {
let metric = self.clone();
crate::launch_with_glean(move |glean| {
glean
.storage()
.record_with(glean, &metric.meta, move |old_value| {
match metric.histogram_type {
HistogramType::Linear => {
let mut hist =
if let Some(Metric::CustomDistributionLinear(hist)) = old_value {
hist
} else {
Histogram::linear(
metric.range_min,
metric.range_max,
metric.bucket_count as usize,
)
};

hist._merge(&histogram);
Metric::CustomDistributionLinear(hist)
}
HistogramType::Exponential => {
let mut hist = if let Some(Metric::CustomDistributionExponential(
hist,
)) = old_value
{
hist
} else {
Histogram::exponential(
metric.range_min,
metric.range_max,
metric.bucket_count as usize,
)
};

hist._merge(&histogram);
Metric::CustomDistributionExponential(hist)
}
}
});
});
}
}

/// **Experimental:** A histogram buffer associated with a specific instance of a [`CustomDistributionMetric`].
///
/// Accumulation happens in-memory.
/// Data is merged into the metric on [`Drop::drop`].
pub struct LocalCustomDistribution<'a> {
histogram: Histogram<LinearOrExponential>,
metric: &'a CustomDistributionMetric,
}

impl<'a> LocalCustomDistribution<'a> {
/// Create a new histogram buffer referencing the custom distribution it will record into.
fn new(metric: &'a CustomDistributionMetric) -> Self {
let histogram = match metric.histogram_type {
HistogramType::Linear => Histogram::<LinearOrExponential>::_linear(
metric.range_min,
metric.range_max,
metric.bucket_count as usize,
),
HistogramType::Exponential => Histogram::<LinearOrExponential>::_exponential(
metric.range_min,
metric.range_max,
metric.bucket_count as usize,
),
};
Self { histogram, metric }
}

/// Accumulates one sample into the histogram.
///
/// The provided sample must be in the "unit" declared by the instance of the metric type
/// (e.g. if the instance this method was called on is using [`crate::TimeUnit::Second`], then
/// `sample` is assumed to be in seconds).
///
/// Accumulation happens in-memory only.
pub fn accumulate(&mut self, sample: u64) {
self.histogram.accumulate(sample)
}

/// Abandon this histogram buffer and don't commit accumulated data.
pub fn abandon(mut self) {
self.histogram.clear();
}
}

impl Drop for LocalCustomDistribution<'_> {
fn drop(&mut self) {
if self.histogram.is_empty() {
return;
}

// We want to move that value.
// A `0/0` histogram doesn't allocate.
let empty = Histogram::_linear(0, 0, 0);
let buffer = mem::replace(&mut self.histogram, empty);
self.metric.commit_histogram(buffer);
}
}

0 comments on commit 4d58644

Please sign in to comment.