Skip to content

Commit

Permalink
Use our own metric handle
Browse files Browse the repository at this point in the history
This commit follows on with my previous thoughts and replaces the metrics-rs
handle with something that more closely matches our internal_metrics use
case. Benefits include static memory use compared to the epoch mechanism in
metrics-rs, though we do lose perfect sample capture.

In my experiments this reduces vector's memory usage from 200Mb to 6Mb,
controlling for the leak.

Signed-off-by: Brian L. Troutwine <brian@troutwine.us>
  • Loading branch information
blt committed Apr 7, 2021
1 parent 4a6bee5 commit 0f1a773
Show file tree
Hide file tree
Showing 7 changed files with 301 additions and 83 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ tokio-test = "0.4.1"
tokio01-test = "0.1.1"
tower-test = "0.4.0"
walkdir = "2.3.2"
quickcheck = "1"

[features]
# Default features for *-unknown-linux-gnu and *-apple-darwin
Expand Down
38 changes: 18 additions & 20 deletions src/event/metric.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::EventMetadata;
use crate::metrics::Handle;
use chrono::{DateTime, Utc};
use derive_is_enum_variant::is_enum_variant;
use getset::Getters;
Expand Down Expand Up @@ -289,29 +290,26 @@ impl Metric {

/// Convert the metrics_runtime::Measurement value plus the name and
/// labels from a Key into our internal Metric format.
pub fn from_metric_kv(key: &metrics::Key, handle: &metrics_util::Handle) -> Self {
pub fn from_metric_kv(key: &metrics::Key, handle: &Handle) -> Self {
let value = match handle {
metrics_util::Handle::Counter(_) => MetricValue::Counter {
value: handle.read_counter() as f64,
Handle::Counter(counter) => MetricValue::Counter {
value: counter.count() as f64,
},
metrics_util::Handle::Gauge(_) => MetricValue::Gauge {
value: handle.read_gauge() as f64,
Handle::Gauge(gauge) => MetricValue::Gauge {
value: gauge.gauge(),
},
metrics_util::Handle::Histogram(_) => {
let mut samples = Vec::with_capacity(128);
handle.read_histogram_with_clear(|values| {
// Each sample in the source measurement has an effective
// sample rate of 1.
for value in values {
samples.push(Sample {
value: *value,
rate: 1,
});
}
});
MetricValue::Distribution {
samples,
statistic: StatisticKind::Histogram,
Handle::Histogram(histogram) => {
let mut buckets: Vec<(f64, u32)> = Vec::with_capacity(32);
histogram.buckets(&mut buckets);
let buckets: Vec<Bucket> = buckets
.into_iter()
.map(|(upper_limit, count)| Bucket { upper_limit, count })
.collect();

MetricValue::AggregatedHistogram {
buckets,
sum: histogram.sum() as f64,
count: histogram.count(),
}
}
};
Expand Down
257 changes: 257 additions & 0 deletions src/metrics/handle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
use metrics::GaugeValue;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};

#[derive(Debug)]
pub enum Handle {
Gauge(Gauge),
Counter(Counter),
Histogram(Histogram),
}

impl Handle {
pub(crate) fn counter() -> Self {
Handle::Counter(Counter::new())
}

pub(crate) fn increment_counter(&mut self, value: u64) {
match self {
Handle::Counter(counter) => counter.record(value),
_ => unreachable!(),
}
}

pub(crate) fn gauge() -> Self {
Handle::Gauge(Gauge::new())
}

pub(crate) fn update_gauge(&mut self, value: GaugeValue) {
match self {
Handle::Gauge(gauge) => gauge.record(value),
_ => unreachable!(),
}
}

pub(crate) fn histogram() -> Self {
Handle::Histogram(Histogram::new())
}

pub(crate) fn record_histogram(&mut self, value: f64) {
match self {
Handle::Histogram(h) => h.record(value),
_ => unreachable!(),
};
}
}

#[derive(Debug)]
pub struct Histogram {
pub buckets: Box<[(f64, AtomicU32); 22]>,
pub count: AtomicU32,
pub sum: AtomicU64,
}

impl Histogram {
pub(crate) fn new() -> Self {
// Box to avoid having this large array inline to the structure, blowing
// out cache coherence.
//
// The sequence here is based on powers of two. Other sequences are more
// suitable for different distributions but since our present use case
// is mostly non-negative and measures smallish latencies we cluster
// around but never quite get to zero with an increasingly coarse
// long-tail.
let buckets = Box::new([
(f64::NEG_INFINITY, AtomicU32::new(0)),
(0.015625, AtomicU32::new(0)),
(0.03125, AtomicU32::new(0)),
(0.0625, AtomicU32::new(0)),
(0.125, AtomicU32::new(0)),
(0.25, AtomicU32::new(0)),
(0.5, AtomicU32::new(0)),
(0.0, AtomicU32::new(0)),
(1.0, AtomicU32::new(0)),
(2.0, AtomicU32::new(0)),
(4.0, AtomicU32::new(0)),
(8.0, AtomicU32::new(0)),
(16.0, AtomicU32::new(0)),
(32.0, AtomicU32::new(0)),
(32.0, AtomicU32::new(0)),
(128.0, AtomicU32::new(0)),
(256.0, AtomicU32::new(0)),
(512.0, AtomicU32::new(0)),
(1024.0, AtomicU32::new(0)),
(2048.0, AtomicU32::new(0)),
(4096.0, AtomicU32::new(0)),
(f64::INFINITY, AtomicU32::new(0)),
]);
Self {
buckets,
count: AtomicU32::new(0),
sum: AtomicU64::new(0),
}
}

pub(crate) fn record(&mut self, value: f64) {
let mut prev_bound = f64::NEG_INFINITY;
assert!(self.buckets.len() == 22);
for (bound, bucket) in self.buckets.iter_mut() {
if value > prev_bound && value <= *bound {
bucket.fetch_add(1, Ordering::Relaxed);
break;
} else {
prev_bound = *bound;
}
}

self.count.fetch_add(1, Ordering::Relaxed);
let _ = self
.sum
.fetch_update(Ordering::AcqRel, Ordering::Relaxed, |cur| {
let next_sum = f64::from_bits(cur) + value;
Some(next_sum.to_bits())
});
}

pub fn count(&self) -> u32 {
self.count.load(Ordering::Relaxed)
}

pub fn sum(&self) -> f64 {
f64::from_bits(self.sum.load(Ordering::Relaxed))
}

pub fn buckets(&self, buf: &mut Vec<(f64, u32)>) {
for (k, v) in self.buckets.iter() {
buf.push((*k, v.load(Ordering::Relaxed)));
}
}
}

#[derive(Debug)]
pub struct Counter {
inner: AtomicU64,
}

impl Counter {
pub(crate) fn with_count(count: u64) -> Self {
Self {
inner: AtomicU64::new(count),
}
}

pub(crate) fn new() -> Self {
Self {
inner: AtomicU64::new(0),
}
}

pub(crate) fn record(&mut self, value: u64) {
self.inner.fetch_add(value, Ordering::Relaxed);
}

pub fn count(&self) -> u64 {
self.inner.load(Ordering::Relaxed)
}
}

#[derive(Debug)]
pub struct Gauge {
inner: AtomicU64,
}

impl Gauge {
pub(crate) fn new() -> Self {
Self {
inner: AtomicU64::new(0),
}
}

pub(crate) fn record(&mut self, value: GaugeValue) {
// Because Rust lacks an atomic f64 we store gauges as AtomicU64
// and transmute back and forth to an f64 here. They have the
// same size so this operation is safe, just don't read the
// AtomicU64 directly.
let _ = self
.inner
.fetch_update(Ordering::AcqRel, Ordering::Relaxed, |cur| {
let val = value.update_value(f64::from_bits(cur));
Some(val.to_bits())
});
}

pub fn gauge(&self) -> f64 {
f64::from_bits(self.inner.load(Ordering::Relaxed))
}
}

#[cfg(test)]
mod test {
use crate::metrics::handle::{Counter, Histogram};
use quickcheck::{QuickCheck, TestResult};

// Adapted from https://users.rust-lang.org/t/assert-eq-for-float-numbers/7034/4?u=blt
fn nearly_equal(a: f64, b: f64) -> bool {
let abs_a = a.abs();
let abs_b = b.abs();
let diff = (a - b).abs();

if a == b {
// Handle infinities.
true
} else if a == 0.0 || b == 0.0 || diff < f64::MIN_POSITIVE {
// One of a or b is zero (or both are extremely close to it,) use absolute error.
diff < (f64::EPSILON * f64::MIN_POSITIVE)
} else {
// Use relative error.
(diff / f64::min(abs_a + abs_b, f64::MAX)) < f64::EPSILON
}
}

#[test]
fn histogram() {
fn inner(values: Vec<f64>) -> TestResult {
let mut sut = Histogram::new();
let mut model_count: u32 = 0;
let mut model_sum: f64 = 0.0;

for val in &values {
if val.is_infinite() || val.is_nan() {
continue;
}
sut.record(*val);
model_count = model_count.wrapping_add(1);
model_sum += *val;

assert_eq!(sut.count(), model_count);
assert!(nearly_equal(sut.sum(), model_sum));
}
TestResult::passed()
}

QuickCheck::new()
.tests(1_000)
.max_tests(2_000)
.quickcheck(inner as fn(Vec<f64>) -> TestResult);
}

#[test]
fn count() {
fn inner(values: Vec<u64>) -> TestResult {
let mut sut = Counter::new();
let mut model: u64 = 0;

for val in &values {
sut.record(*val);
model = model.wrapping_add(*val);

assert_eq!(sut.count(), model);
}
TestResult::passed()
}

QuickCheck::new()
.tests(1_000)
.max_tests(2_000)
.quickcheck(inner as fn(Vec<u64>) -> TestResult);
}
}
26 changes: 10 additions & 16 deletions src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
mod handle;
mod label_filter;
mod recorder;
mod registry;
#[cfg(test)]
mod tests;

pub use crate::metrics::handle::{Counter, Handle};
use crate::metrics::label_filter::VectorLabelFilter;
use crate::metrics::recorder::VectorRecorder;
use crate::metrics::registry::VectorRegistry;
use crate::{event::Metric, Event};
use metrics::{Key, KeyData, SharedString};
use metrics_tracing_context::TracingContextLayer;
use metrics_util::layers::Layer;
use metrics_util::{CompositeKey, Handle, MetricKind};
use metrics_util::{CompositeKey, MetricKind};
use once_cell::sync::OnceCell;
use std::sync::{atomic::AtomicU64, Arc};

static CONTROLLER: OnceCell<Controller> = OnceCell::new();
// Cardinality counter parameters, expose the internal metrics registry
Expand Down Expand Up @@ -52,17 +53,7 @@ pub fn init() -> crate::Result<()> {
////
//// Prepare the registry
////

let registry = VectorRegistry::default();
// We keep track of the total number of metrics tracked by vector, its
// "cardinality". This counter is an atomic so we can update it
// cross-thread. It is owned by the recorder.
let cardinality_counter = Arc::new(AtomicU64::new(1));
registry.op(
CARDINALITY_KEY.clone(),
|_| {},
|| Handle::Counter(Arc::clone(&cardinality_counter)),
);

////
//// Prepare the controller
Expand All @@ -86,7 +77,7 @@ pub fn init() -> crate::Result<()> {
// The recorder is the interface between metrics-rs and our registry. In our
// case it doesn't _do_ much other than shepherd into the registry and
// update the cardinality counter, see above, as needed.
let recorder = VectorRecorder::new(registry, cardinality_counter);
let recorder = VectorRecorder::new(registry);
let recorder: Box<dyn metrics::Recorder> = if tracing_context_layer_enabled() {
// Apply a layer to capture tracing span fields as labels.
Box::new(TracingContextLayer::new(VectorLabelFilter).layer(recorder))
Expand Down Expand Up @@ -116,11 +107,14 @@ pub fn get_controller() -> crate::Result<&'static Controller> {
/// Take a snapshot of all gathered metrics and expose them as metric
/// [`Event`]s.
pub fn capture_metrics(controller: &Controller) -> impl Iterator<Item = Event> {
controller
let mut events = controller
.registry
.map
.iter()
.map(|kv| Metric::from_metric_kv(kv.key().key(), kv.value()).into())
.collect::<Vec<Event>>()
.into_iter()
.collect::<Vec<Event>>();
let handle = Handle::Counter(Counter::with_count(events.len() as u64));
events.push(Metric::from_metric_kv(CARDINALITY_KEY.key(), &handle).into());

events.into_iter()
}
Loading

0 comments on commit 0f1a773

Please sign in to comment.