From 0bdf61d235406868c770c0199167d10e59809e53 Mon Sep 17 00:00:00 2001 From: Emmanuel Bosquet Date: Tue, 24 Oct 2023 12:39:17 +0200 Subject: [PATCH] fix cluster metrics change structure of LocalDrain, to be more readable and debuggable --- lib/src/metrics/local_drain.rs | 444 +++++++++++++++++++++++---------- 1 file changed, 314 insertions(+), 130 deletions(-) diff --git a/lib/src/metrics/local_drain.rs b/lib/src/metrics/local_drain.rs index bf6d00f1a..29000cbbe 100644 --- a/lib/src/metrics/local_drain.rs +++ b/lib/src/metrics/local_drain.rs @@ -1,5 +1,9 @@ #![allow(dead_code)] -use std::{collections::BTreeMap, str, time::Instant}; +use std::{ + collections::{btree_map::Entry, BTreeMap}, + str, + time::Instant, +}; use hdrhistogram::Histogram; @@ -110,19 +114,108 @@ enum MetricMeta { ClusterBackend, } +/// local equivalent to proto::command::ClusterMetrics +#[derive(Debug)] +pub struct LocalClusterMetrics { + /// metric_name -> metric value + cluster: BTreeMap, + /// + backends: Vec, +} + +impl LocalClusterMetrics { + fn to_filtered_metrics( + &self, + metric_names: &Vec, + ) -> Result { + let cluster = self + .cluster + .iter() + .filter(|(key, _)| { + if metric_names.is_empty() { + true + } else { + metric_names.contains(key) + } + }) + .map(|(metric_name, metric)| (metric_name.to_owned(), metric.to_filtered())) + .collect(); + + let mut backends: Vec = Vec::new(); + for backend in &self.backends { + backends.push(backend.to_filtered_metrics(metric_names)?); + } + Ok(ClusterMetrics { cluster, backends }) + } + + fn metric_names(&self) -> Vec { + let mut names: Vec = self.cluster.keys().map(|k| k.to_owned()).collect(); + + for backend in &self.backends { + for name in backend.metrics_names() { + names.push(name); + } + } + names + } + + fn contains_backend(&self, backend_id: &str) -> bool { + for backend in &self.backends { + if backend.backend_id == backend_id { + return true; + } + } + false + } +} + +/// local equivalent to proto::command::BackendMetrics +#[derive(Debug, Clone)] +pub struct LocalBackendMetrics { + backend_id: String, + /// metric_name -> value + metrics: BTreeMap, +} + +impl LocalBackendMetrics { + fn to_filtered_metrics( + &self, + metric_names: &Vec, + ) -> Result { + let filtered_backend_metrics = self + .metrics + .iter() + .filter(|(key, _)| { + if metric_names.is_empty() { + true + } else { + metric_names.contains(key) + } + }) + .map(|(metric_name, value)| (metric_name.to_owned(), value.to_filtered())) + .collect::>(); + + Ok(BackendMetrics { + backend_id: self.backend_id.to_owned(), + metrics: filtered_backend_metrics, + }) + } + + fn metrics_names(&self) -> Vec { + self.metrics.keys().map(|k| k.to_owned()).collect() + } +} + /// This gathers metrics locally, to be queried by the CLI #[derive(Debug)] pub struct LocalDrain { /// a prefix to metric keys, usually "sozu-" pub prefix: String, pub created: Instant, - /// metric_name -> metric value + /// metrics of the proxy server (metric_name -> metric value) pub proxy_metrics: BTreeMap, - /// backend_id -> cluster_id - backend_to_cluster: BTreeMap, - /// BTreeMap> - /// if the key is a backend_id, we'll retrieve the cluster_id in the backend_to_cluster map - cluster_metrics: BTreeMap>, + /// cluster_id -> cluster_metrics + cluster_metrics: BTreeMap, use_tagged_metrics: bool, origin: String, disable_cluster_metrics: bool, @@ -135,7 +228,6 @@ impl LocalDrain { created: Instant::now(), proxy_metrics: BTreeMap::new(), cluster_metrics: BTreeMap::new(), - backend_to_cluster: BTreeMap::new(), use_tagged_metrics: false, origin: String::from("x"), disable_cluster_metrics: false, @@ -151,35 +243,9 @@ impl LocalDrain { } pub fn clear(&mut self) { - self.backend_to_cluster.clear(); self.cluster_metrics.clear(); } - fn get_cluster_ids(&self) -> Vec { - self.cluster_metrics - .iter() - .filter_map( - |entry| match self.backend_to_cluster.contains_key(entry.0) { - true => Some(entry.0.to_owned()), - false => None, - }, - ) - .collect() - } - - fn get_backend_ids(&self, cluster_id: &str) -> Vec { - self.backend_to_cluster - .iter() - .filter_map(|backend_to_cluster| { - if backend_to_cluster.1 == cluster_id { - Some(backend_to_cluster.0.to_owned()) - } else { - None - } - }) - .collect() - } - pub fn query(&mut self, options: &QueryMetricsOptions) -> Result { trace!( "The local drain received a metrics query with this options: {:?}", @@ -209,16 +275,19 @@ impl LocalDrain { fn list_all_metric_names(&self) -> Result { let proxy_metrics = self.proxy_metrics.keys().cloned().collect(); - let mut cluster_metrics = Vec::new(); + let mut cluster_metrics_names = Vec::new(); - for cluster_metrics_entry in self.cluster_metrics.iter() { - for (metric_name, _) in cluster_metrics_entry.1.iter() { - cluster_metrics.push(metric_name.to_owned()); + for (_, cluster_metrics) in self.cluster_metrics.iter() { + for metric_name in cluster_metrics.metric_names() { + cluster_metrics_names.push(metric_name.to_owned()); } } + cluster_metrics_names.sort(); + cluster_metrics_names.dedup(); + Ok(ContentType::AvailableMetrics(AvailableMetrics { proxy_metrics, - cluster_metrics, + cluster_metrics: cluster_metrics_names, }) .into()) } @@ -256,8 +325,8 @@ impl LocalDrain { ) -> Result, MetricError> { let mut cluster_data = BTreeMap::new(); - for cluster_id in self.get_cluster_ids() { - let cluster_metrics = self.metrics_of_one_cluster(&cluster_id, metric_names)?; + for cluster_id in self.cluster_metrics.keys() { + let cluster_metrics = self.metrics_of_one_cluster(cluster_id, metric_names)?; cluster_data.insert(cluster_id.to_owned(), cluster_metrics); } @@ -269,29 +338,14 @@ impl LocalDrain { cluster_id: &str, metric_names: &Vec, ) -> Result { - let raw_metrics = self + let aggregated = self .cluster_metrics .get(cluster_id) .ok_or(MetricError::NoMetrics(cluster_id.to_owned()))?; - let cluster: BTreeMap = raw_metrics - .iter() - .filter(|entry| { - if metric_names.is_empty() { - true - } else { - metric_names.contains(entry.0) - } - }) - .map(|entry| (entry.0.to_owned(), entry.1.to_filtered())) - .collect::>(); + let filtered = aggregated.to_filtered_metrics(metric_names)?; - let mut backends = Vec::new(); - for backend_id in self.get_backend_ids(cluster_id) { - let backend_metrics = self.metrics_of_one_backend(&backend_id, metric_names)?; - backends.push(backend_metrics); - } - Ok(ClusterMetrics { cluster, backends }) + Ok(filtered) } fn metrics_of_one_backend( @@ -299,27 +353,20 @@ impl LocalDrain { backend_id: &str, metric_names: &Vec, ) -> Result { - let backend_metrics = self - .cluster_metrics - .get(backend_id) - .ok_or(MetricError::NoMetrics(backend_id.to_owned()))?; - - let filtered_backend_metrics = backend_metrics - .iter() - .filter(|entry| { - if metric_names.is_empty() { - true - } else { - metric_names.contains(entry.0) - } - }) - .map(|entry| (entry.0.to_owned(), entry.1.to_filtered())) - .collect::>(); + for cluster_metrics in self.cluster_metrics.values() { + if let Some(backend_metrics) = cluster_metrics + .backends + .iter() + .find(|backend_metrics| backend_metrics.backend_id == backend_id) + { + return Ok(backend_metrics.to_filtered_metrics(metric_names)?); + } + } - Ok(BackendMetrics { - backend_id: backend_id.to_owned(), - metrics: filtered_backend_metrics, - }) + Err(MetricError::NoMetrics(format!( + "No metric for backend {}", + backend_id + ))) } fn query_clusters( @@ -352,19 +399,26 @@ impl LocalDrain { let mut clusters: BTreeMap = BTreeMap::new(); for backend_id in backend_ids { - let cluster_id = self - .backend_to_cluster - .get(backend_id) - .ok_or(MetricError::NoMetrics(backend_id.to_owned()))? - .to_owned(); - - let backends = vec![self.metrics_of_one_backend(backend_id, metric_names)?]; + // find the cluster + let (cluster_id, cluster) = match self + .cluster_metrics + .iter() + .find(|(_, cluster)| cluster.contains_backend(backend_id)) + { + Some(cluster) => cluster, + None => continue, + }; + + let mut backend_metrics = Vec::new(); + for backend in &cluster.backends { + backend_metrics.push(backend.to_filtered_metrics(metric_names)?); + } clusters.insert( - cluster_id, + cluster_id.to_owned(), ClusterMetrics { cluster: BTreeMap::new(), - backends, + backends: backend_metrics, }, ); } @@ -376,49 +430,111 @@ impl LocalDrain { }) } - fn receive_cluster_metric( + // TODO: implement this as a method of LocalClusterMetrics for readability + fn receive_cluster_metric_new( &mut self, metric_name: &str, cluster_id: &str, - backend_id: Option<&str>, - metric_value: MetricValue, + metric: MetricValue, ) { if self.disable_cluster_metrics { return; } - trace!( - "cluster metric: {} {} {:?} {:?}", - metric_name, - cluster_id, - backend_id, - metric_value - ); + let local_cluster_metric = + self.cluster_metrics + .entry(cluster_id.to_owned()) + .or_insert(LocalClusterMetrics { + cluster: BTreeMap::new(), + backends: Vec::new(), + }); - let cluster_or_backend_id = match backend_id { - Some(backend_id) => { - self.backend_to_cluster - .insert(backend_id.to_owned(), cluster_id.to_owned()); - backend_id + match local_cluster_metric.cluster.get_mut(metric_name) { + Some(existing_metric) => existing_metric.update(metric_name, metric), + None => { + let aggregated_metric = match AggregatedMetric::new(metric) { + Ok(m) => m, + Err(e) => { + return error!("Could not aggregate metric: {}", e.to_string()); + } + }; + local_cluster_metric + .cluster + .insert(metric_name.to_owned(), aggregated_metric); + } + } + } + + // TODO: implement this as a method of LocalBackendMetrics for readability + fn receive_backend_metric( + &mut self, + metric_name: &str, + cluster_id: &str, + backend_id: &str, + metric: MetricValue, + ) { + if self.disable_cluster_metrics { + return; + } + let aggregated_metric = match AggregatedMetric::new(metric.clone()) { + Ok(m) => m, + Err(e) => { + return error!("Could not aggregate metric: {}", e.to_string()); } - None => cluster_id, }; - let submap = self - .cluster_metrics - .entry(cluster_or_backend_id.to_owned()) - .or_insert(BTreeMap::new()); + match self.cluster_metrics.entry(cluster_id.to_owned()) { + Entry::Vacant(entry) => { + let mut metrics = BTreeMap::new(); + metrics.insert(metric_name.to_owned(), aggregated_metric.clone()); + let backends = [LocalBackendMetrics { + backend_id: backend_id.to_owned(), + metrics, + }] + .to_vec(); + + entry.insert(LocalClusterMetrics { + cluster: BTreeMap::new(), + backends, + }); + return; + } + Entry::Occupied(mut entry) => { + for backend_metrics in &mut entry.get_mut().backends { + if &backend_metrics.backend_id == backend_id { + if let Some(existing_metric) = backend_metrics.metrics.get_mut(metric_name) + { + existing_metric.update(metric_name, metric); + return; + } + } + } - match submap.get_mut(metric_name) { - Some(existing_metric) => existing_metric.update(metric_name, metric_value), + let mut metrics = BTreeMap::new(); + metrics.insert(metric_name.to_owned(), aggregated_metric); + let backend = LocalBackendMetrics { + backend_id: backend_id.to_owned(), + metrics, + }; + + entry.get_mut().backends.push(backend); + } + } + } + + fn receive_proxy_metric(&mut self, metric_name: &str, metric: MetricValue) { + match self.proxy_metrics.get_mut(metric_name) { + Some(stored_metric) => stored_metric.update(metric_name, metric), None => { - let aggregated_metric = match AggregatedMetric::new(metric_value) { + let aggregated_metric = match AggregatedMetric::new(metric) { Ok(m) => m, Err(e) => { return error!("Could not aggregate metric: {}", e.to_string()); } }; - submap.insert(metric_name.to_owned(), aggregated_metric); + + self.proxy_metrics + .insert(String::from(metric_name), aggregated_metric); } } } @@ -440,26 +556,94 @@ impl Subscriber for LocalDrain { metric ); - // cluster metrics - if let Some(id) = cluster_id { - self.receive_cluster_metric(key, id, backend_id, metric); - return; + match (cluster_id, backend_id) { + (Some(cluster_id), Some(backend_id)) => { + self.receive_backend_metric(key, cluster_id, backend_id, metric) + } + (Some(cluster_id), None) => self.receive_cluster_metric_new(key, cluster_id, metric), + (None, _) => self.receive_proxy_metric(key, metric), } + } +} +#[cfg(test)] +mod tests { + use sozu_command::proto::command::{filtered_metrics::Inner, FilteredMetrics}; - // proxy metrics - match self.proxy_metrics.get_mut(key) { - Some(stored_metric) => stored_metric.update(key, metric), - None => { - let aggregated_metric = match AggregatedMetric::new(metric) { - Ok(m) => m, - Err(e) => { - return error!("Could not aggregate metric: {}", e.to_string()); - } - }; + use super::*; - self.proxy_metrics - .insert(String::from(key), aggregated_metric); - } - } + #[test] + fn receive_and_yield_backend_metrics() { + let mut local_drain = LocalDrain::new("prefix".to_string()); + + local_drain.receive_metric( + "connections_per_backend", + Some("test-cluster"), + Some("test-backend-1"), + MetricValue::Count(1), + ); + local_drain.receive_metric( + "connections_per_backend", + Some("test-cluster"), + Some("test-backend-1"), + MetricValue::Count(1), + ); + + let mut expected_metrics_1 = BTreeMap::new(); + expected_metrics_1.insert( + "connections_per_backend".to_string(), + FilteredMetrics { + inner: Some(Inner::Count(2)), + }, + ); + + let expected_backend_metrics = BackendMetrics { + backend_id: "test-backend-1".to_string(), + metrics: expected_metrics_1, + }; + + assert_eq!( + expected_backend_metrics, + local_drain + .metrics_of_one_backend( + "test-backend-1", + &["connections_per_backend".to_string()].to_vec(), + ) + .expect("could not query metrics for this backend") + ) + } + + #[test] + fn receive_and_yield_cluster_metrics() { + let mut local_drain = LocalDrain::new("prefix".to_string()); + local_drain.receive_metric( + "http_errors", + Some("test-cluster"), + None, + MetricValue::Count(1), + ); + local_drain.receive_metric( + "http_errors", + Some("test-cluster"), + None, + MetricValue::Count(1), + ); + + let mut map = BTreeMap::new(); + map.insert( + "http_errors".to_string(), + FilteredMetrics { + inner: Some(Inner::Count(2)), + }, + ); + let expected_cluster_metrics = ClusterMetrics { + cluster: map, + backends: vec![], + }; + + let returned_cluster_metrics = local_drain + .metrics_of_one_cluster("test-cluster", &["http_errors".to_string()].to_vec()) + .expect("could not query metrics for this cluster"); + + assert_eq!(expected_cluster_metrics, returned_cluster_metrics); } }