diff --git a/clippy.toml b/clippy.toml new file mode 100644 index 00000000..5e4d2492 --- /dev/null +++ b/clippy.toml @@ -0,0 +1 @@ +too-many-lines-threshold = 150 diff --git a/metrics-exporter-prometheus/src/exporter/builder.rs b/metrics-exporter-prometheus/src/exporter/builder.rs index af319b19..309ed8f2 100644 --- a/metrics-exporter-prometheus/src/exporter/builder.rs +++ b/metrics-exporter-prometheus/src/exporter/builder.rs @@ -44,6 +44,7 @@ pub struct PrometheusBuilder { buckets: Option>, bucket_overrides: Option>>, idle_timeout: Option, + upkeep_timeout: Duration, recency_mask: MetricKindMask, global_labels: Option>, } @@ -60,6 +61,8 @@ impl PrometheusBuilder { #[cfg(not(feature = "http-listener"))] let exporter_config = ExporterConfig::Unconfigured; + let upkeep_timeout = Duration::from_secs(5); + Self { exporter_config, #[cfg(feature = "http-listener")] @@ -70,6 +73,7 @@ impl PrometheusBuilder { buckets: None, bucket_overrides: None, idle_timeout: None, + upkeep_timeout, recency_mask: MetricKindMask::NONE, global_labels: None, } @@ -303,6 +307,16 @@ impl PrometheusBuilder { self } + /// Sets the upkeep interval. + /// + /// The upkeep task handles periodic maintenance operations, such as draining histogram data, + /// to ensure that all recorded data is up-to-date and prevent unbounded memory growth. + #[must_use] + pub fn upkeep_timeout(mut self, timeout: Duration) -> Self { + self.upkeep_timeout = timeout; + self + } + /// Adds a global label to this exporter. /// /// Global labels are applied to all metrics. Labels defined on the metric key itself have precedence @@ -409,11 +423,20 @@ impl PrometheusBuilder { pub fn build(mut self) -> Result<(PrometheusRecorder, ExporterFuture), BuildError> { #[cfg(feature = "http-listener")] let allowed_addresses = self.allowed_addresses.take(); - let exporter_config = self.exporter_config.clone(); + let upkeep_timeout = self.upkeep_timeout; + let recorder = self.build_recorder(); let handle = recorder.handle(); + let recorder_handle = handle.clone(); + tokio::spawn(async move { + loop { + tokio::time::sleep(upkeep_timeout).await; + recorder_handle.run_upkeep(); + } + }); + Ok(( recorder, match exporter_config { diff --git a/metrics-exporter-prometheus/src/recorder.rs b/metrics-exporter-prometheus/src/recorder.rs index 73ef1316..21b2ea79 100644 --- a/metrics-exporter-prometheus/src/recorder.rs +++ b/metrics-exporter-prometheus/src/recorder.rs @@ -56,6 +56,9 @@ impl Inner { *entry = value; } + // Update distributions + self.drain_histograms_to_distributions(); + // Remove expired histograms let histogram_handles = self.registry.get_histogram_handles(); for (key, histogram) in histogram_handles { let gen = histogram.get_generation(); @@ -66,7 +69,7 @@ impl Inner { let (name, labels) = key_to_parts(&key, Some(&self.global_labels)); let mut wg = self.distributions.write().unwrap_or_else(PoisonError::into_inner); let delete_by_name = if let Some(by_name) = wg.get_mut(&name) { - by_name.remove(&labels); + by_name.swap_remove(&labels); by_name.is_empty() } else { false @@ -80,7 +83,18 @@ impl Inner { continue; } + } + let distributions = + self.distributions.read().unwrap_or_else(PoisonError::into_inner).clone(); + + Snapshot { counters, gauges, distributions } + } + + /// Drains histogram samples into distribution. + fn drain_histograms_to_distributions(&self) { + let histogram_handles = self.registry.get_histogram_handles(); + for (key, histogram) in histogram_handles { let (name, labels) = key_to_parts(&key, Some(&self.global_labels)); let mut wg = self.distributions.write().unwrap_or_else(PoisonError::into_inner); @@ -92,11 +106,6 @@ impl Inner { histogram.get_inner().clear_with(|samples| entry.record_samples(samples)); } - - let distributions = - self.distributions.read().unwrap_or_else(PoisonError::into_inner).clone(); - - Snapshot { counters, gauges, distributions } } fn render(&self) -> String { @@ -194,6 +203,10 @@ impl Inner { output } + + fn run_upkeep(&self) { + self.drain_histograms_to_distributions(); + } } /// A Prometheus recorder. @@ -273,4 +286,10 @@ impl PrometheusHandle { pub fn render(&self) -> String { self.inner.render() } + + /// Performs upkeeping operations to ensure metrics held by recorder are up-to-date and do not + /// grow unboundedly. + pub fn run_upkeep(&self) { + self.inner.run_upkeep(); + } }