Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
too-many-lines-threshold = 150
25 changes: 24 additions & 1 deletion metrics-exporter-prometheus/src/exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub struct PrometheusBuilder {
buckets: Option<Vec<f64>>,
bucket_overrides: Option<HashMap<Matcher, Vec<f64>>>,
idle_timeout: Option<Duration>,
upkeep_timeout: Duration,
recency_mask: MetricKindMask,
global_labels: Option<IndexMap<String, String>>,
}
Expand All @@ -60,6 +61,8 @@ impl PrometheusBuilder {
#[cfg(not(feature = "http-listener"))]
let exporter_config = ExporterConfig::Unconfigured;

let upkeep_timeout = Duration::from_secs(5);

Self {
exporter_config,
#[cfg(feature = "http-listener")]
Expand All @@ -70,6 +73,7 @@ impl PrometheusBuilder {
buckets: None,
bucket_overrides: None,
idle_timeout: None,
upkeep_timeout,
recency_mask: MetricKindMask::NONE,
global_labels: None,
}
Expand Down Expand Up @@ -303,6 +307,16 @@ impl PrometheusBuilder {
self
}

/// Sets the upkeep interval.
///
/// The upkeep task handles periodic maintenance operations, such as draining histogram data,
/// to ensure that all recorded data is up-to-date and prevent unbounded memory growth.
#[must_use]
pub fn upkeep_timeout(mut self, timeout: Duration) -> Self {
self.upkeep_timeout = timeout;
self
}

/// Adds a global label to this exporter.
///
/// Global labels are applied to all metrics. Labels defined on the metric key itself have precedence
Expand Down Expand Up @@ -409,11 +423,20 @@ impl PrometheusBuilder {
pub fn build(mut self) -> Result<(PrometheusRecorder, ExporterFuture), BuildError> {
#[cfg(feature = "http-listener")]
let allowed_addresses = self.allowed_addresses.take();

let exporter_config = self.exporter_config.clone();
let upkeep_timeout = self.upkeep_timeout;

let recorder = self.build_recorder();
let handle = recorder.handle();

let recorder_handle = handle.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(upkeep_timeout).await;
recorder_handle.run_upkeep();
}
});

Ok((
recorder,
match exporter_config {
Expand Down
31 changes: 25 additions & 6 deletions metrics-exporter-prometheus/src/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ impl Inner {
*entry = value;
}

// Update distributions
self.drain_histograms_to_distributions();
// Remove expired histograms
let histogram_handles = self.registry.get_histogram_handles();
for (key, histogram) in histogram_handles {
let gen = histogram.get_generation();
Expand All @@ -66,7 +69,7 @@ impl Inner {
let (name, labels) = key_to_parts(&key, Some(&self.global_labels));
let mut wg = self.distributions.write().unwrap_or_else(PoisonError::into_inner);
let delete_by_name = if let Some(by_name) = wg.get_mut(&name) {
by_name.remove(&labels);
by_name.swap_remove(&labels);
by_name.is_empty()
} else {
false
Expand All @@ -80,7 +83,18 @@ impl Inner {

continue;
}
}

let distributions =
self.distributions.read().unwrap_or_else(PoisonError::into_inner).clone();

Snapshot { counters, gauges, distributions }
}

/// Drains histogram samples into distribution.
fn drain_histograms_to_distributions(&self) {
let histogram_handles = self.registry.get_histogram_handles();
for (key, histogram) in histogram_handles {
let (name, labels) = key_to_parts(&key, Some(&self.global_labels));

let mut wg = self.distributions.write().unwrap_or_else(PoisonError::into_inner);
Expand All @@ -92,11 +106,6 @@ impl Inner {

histogram.get_inner().clear_with(|samples| entry.record_samples(samples));
}

let distributions =
self.distributions.read().unwrap_or_else(PoisonError::into_inner).clone();

Snapshot { counters, gauges, distributions }
}

fn render(&self) -> String {
Expand Down Expand Up @@ -194,6 +203,10 @@ impl Inner {

output
}

fn run_upkeep(&self) {
self.drain_histograms_to_distributions();
}
}

/// A Prometheus recorder.
Expand Down Expand Up @@ -273,4 +286,10 @@ impl PrometheusHandle {
pub fn render(&self) -> String {
self.inner.render()
}

/// Performs upkeeping operations to ensure metrics held by recorder are up-to-date and do not
/// grow unboundedly.
pub fn run_upkeep(&self) {
self.inner.run_upkeep();
}
}