From 5deac5d1fbf66a68c61a669c350a0cc0c9147d60 Mon Sep 17 00:00:00 2001 From: Emmanuel Bosquet Date: Thu, 27 Jun 2024 17:35:44 +0200 Subject: [PATCH 1/2] merge cluster metrics across workers The default behaviour is now to take all metrics from the AggregatedMetrics object, merge them by cluster id, and move them to the field AggregatedMetrics::clusters. The behaviour is avoidable with the option "metrics get --workers" Note: the function merge_cluster_metrics does not merge time metrics (percentiles), they are discarded. Prometheus histograms should be mergeable though, if bucket size is consistent. --- bin/src/cli.rs | 6 +++ bin/src/command/requests.rs | 18 ++++--- bin/src/ctl/command.rs | 2 + bin/src/ctl/mod.rs | 11 +++- command/src/command.proto | 11 +++- command/src/proto/display.rs | 7 +++ command/src/proto/mod.rs | 97 ++++++++++++++++++++++++++++++++++ lib/src/metrics/local_drain.rs | 1 + 8 files changed, 145 insertions(+), 8 deletions(-) diff --git a/bin/src/cli.rs b/bin/src/cli.rs index c6e429a29..cb0aaa3c1 100644 --- a/bin/src/cli.rs +++ b/bin/src/cli.rs @@ -228,6 +228,12 @@ pub enum MetricsCmd { help = "get only the metrics of main process and workers (no cluster metrics)" )] no_clusters: bool, + #[clap( + short = 'w', + long = "workers", + help = "display metrics of each worker, without flattening by cluster id (takes more space)" + )] + workers: bool, }, } diff --git a/bin/src/command/requests.rs b/bin/src/command/requests.rs index c9fbe3939..9938a686f 100644 --- a/bin/src/command/requests.rs +++ b/bin/src/command/requests.rs @@ -532,7 +532,7 @@ impl GatheringTask for QueryMetricsTask { fn on_finish( self: Box, - _server: &mut Server, + server: &mut Server, client: &mut OptionalClient, _timed_out: bool, ) { @@ -579,12 +579,18 @@ impl GatheringTask for QueryMetricsTask { ) .collect(); + let mut aggregated_metrics = AggregatedMetrics { + main: main_metrics, + clusters: BTreeMap::new(), + workers: workers_metrics, + }; + + if !self.options.workers && 
server.workers.len() > 1 { + aggregated_metrics.merge_cluster_metrics(); + } + client.finish_ok_with_content( - ContentType::Metrics(AggregatedMetrics { - main: main_metrics, - workers: workers_metrics, - }) - .into(), + ContentType::Metrics(aggregated_metrics).into(), "Successfully aggregated all metrics", ); } diff --git a/bin/src/ctl/command.rs b/bin/src/ctl/command.rs index c47f4b20d..7e28e6825 100644 --- a/bin/src/ctl/command.rs +++ b/bin/src/ctl/command.rs @@ -85,6 +85,7 @@ impl CommandManager { cluster_ids: Vec, backend_ids: Vec, no_clusters: bool, + workers: bool, ) -> Result<(), CtlError> { let request: Request = RequestType::QueryMetrics(QueryMetricsOptions { list, @@ -92,6 +93,7 @@ impl CommandManager { backend_ids, metric_names, no_clusters, + workers, }) .into(); diff --git a/bin/src/ctl/mod.rs b/bin/src/ctl/mod.rs index d89808266..a7e34674e 100644 --- a/bin/src/ctl/mod.rs +++ b/bin/src/ctl/mod.rs @@ -123,7 +123,16 @@ impl CommandManager { clusters, backends, no_clusters, - } => self.get_metrics(list, refresh, names, clusters, backends, no_clusters), + workers, + } => self.get_metrics( + list, + refresh, + names, + clusters, + backends, + no_clusters, + workers, + ), _ => self.configure_metrics(cmd), }, SubCmd::Logging { filter } => self.logging_filter(filter), diff --git a/command/src/command.proto b/command/src/command.proto index caa9a1fe2..5aef97b9b 100644 --- a/command/src/command.proto +++ b/command/src/command.proto @@ -441,6 +441,8 @@ message QueryMetricsOptions { repeated string metric_names = 4; // query only worker and main process metrics (no cluster metrics) required bool no_clusters = 5; + // display metrics of each worker, without flattening (takes more space) + required bool workers = 6; } // options to configure metrics collection @@ -576,8 +578,12 @@ message AvailableMetrics { // Aggregated metrics of main process & workers message AggregatedMetrics { + // metric_name -> metric_value map main = 1; + // worker_id -> worker_metrics map 
workers = 2; + // cluster_id -> cluster_metrics + map clusters = 3; } // All metrics of a worker: proxy and clusters @@ -593,7 +599,7 @@ message WorkerMetrics { message ClusterMetrics { // metric name -> metric value map cluster = 1; - // backend_id -> (metric name-> metric value) + // list of backends with their metrics repeated BackendMetrics backends = 2; } @@ -604,8 +610,11 @@ message BackendMetrics { message FilteredMetrics { oneof inner { + // increases or decrease depending on the state uint64 gauge = 1; + // increases only int64 count = 2; + // milliseconds uint64 time = 3; Percentiles percentiles = 4; FilteredTimeSerie time_serie = 5; diff --git a/command/src/proto/display.rs b/command/src/proto/display.rs index 72280f4c8..5e6a89f2b 100644 --- a/command/src/proto/display.rs +++ b/command/src/proto/display.rs @@ -245,6 +245,13 @@ pub fn print_metrics(aggregated_metrics: &AggregatedMetrics) -> Result<(), Displ println!("\nWorker {worker_id}\n========="); print_worker_metrics(worker_metrics)?; } + + // clusters + if !aggregated_metrics.clusters.is_empty() { + println!("\nClusters\n======="); + print_cluster_metrics(&aggregated_metrics.clusters); + } + Ok(()) } diff --git a/command/src/proto/mod.rs b/command/src/proto/mod.rs index c8567afd5..b2e49d182 100644 --- a/command/src/proto/mod.rs +++ b/command/src/proto/mod.rs @@ -1,3 +1,6 @@ +use std::collections::BTreeMap; + +use command::{filtered_metrics::Inner, AggregatedMetrics, ClusterMetrics, FilteredMetrics}; use prost::DecodeError; /// Contains all types received by and sent from Sōzu @@ -37,3 +40,97 @@ impl From for command::Request { } } } + +impl AggregatedMetrics { + /// Merge cluster metrics that were received from several workers + /// + /// Each workers serves the same clusters and gathers metrics on them, + /// which means we have to reduce each metric from N instances to 1. 
+ pub fn merge_cluster_metrics(&mut self) { + for (_worker_id, worker_metrics) in &mut self.workers { + // avoid copying the cluster metrics by taking them + let clusters = std::mem::take(&mut worker_metrics.clusters); + + for (cluster_id, mut cluster_metrics) in clusters { + for (metric_name, new_value) in cluster_metrics.cluster { + if !new_value.is_mergeable() { + continue; + } + self.clusters + .entry(cluster_id.to_owned()) + .and_modify(|cluster| { + cluster + .cluster + .entry(metric_name.clone()) + .and_modify(|old_value| old_value.merge(&new_value)) + .or_insert(new_value.clone()); + }) + .or_insert(ClusterMetrics { + cluster: BTreeMap::from([(metric_name, new_value)]), + backends: Vec::new(), + }); + } + + for backend in cluster_metrics.backends.drain(..) { + for (metric_name, new_value) in &backend.metrics { + if !new_value.is_mergeable() { + continue; + } + self.clusters + .entry(cluster_id.to_owned()) + .and_modify(|cluster| { + let found_backend = cluster + .backends + .iter_mut() + .find(|present| present.backend_id == backend.backend_id); + + let Some(existing_backend) = found_backend else { + cluster.backends.push(backend.clone()); + return; + }; + + let _ = existing_backend + .metrics + .entry(metric_name.clone()) + .and_modify(|old_value| old_value.merge(&new_value)) + .or_insert(new_value.to_owned()); + }) + .or_insert(ClusterMetrics { + cluster: BTreeMap::new(), + backends: vec![backend.clone()], + }); + } + } + } + } + } +} + +impl FilteredMetrics { + pub fn merge(&mut self, right: &Self) { + match (&self.inner, &right.inner) { + (Some(Inner::Gauge(a)), Some(Inner::Gauge(b))) => { + *self = Self { + inner: Some(Inner::Gauge(a + b)), + }; + } + (Some(Inner::Count(a)), Some(Inner::Count(b))) => { + *self = Self { + inner: Some(Inner::Count(a + b)), + }; + } + _ => {} + } + } + + fn is_mergeable(&self) -> bool { + match &self.inner { + Some(Inner::Gauge(_)) | Some(Inner::Count(_)) => true, + // Inner::Time and Inner::Timeserie are never used in 
Sōzu + Some(Inner::Time(_)) + | Some(Inner::Percentiles(_)) + | Some(Inner::TimeSerie(_)) + | None => false, + } + } +} diff --git a/lib/src/metrics/local_drain.rs b/lib/src/metrics/local_drain.rs index af91a7fca..0b0574ce8 100644 --- a/lib/src/metrics/local_drain.rs +++ b/lib/src/metrics/local_drain.rs @@ -253,6 +253,7 @@ impl LocalDrain { backend_ids, list, no_clusters, + workers: _workers, } = options; if *list { From 6b4b96b3e7e9171c2da546450894b0b4361b658d Mon Sep 17 00:00:00 2001 From: Emmanuel Bosquet Date: Mon, 8 Jul 2024 17:21:22 +0200 Subject: [PATCH 2/2] merge proxying metrics accross workers --- bin/src/cli.rs | 2 +- bin/src/command/requests.rs | 3 ++- command/src/command.proto | 6 ++++++ command/src/proto/display.rs | 13 ++++++++++--- command/src/proto/mod.rs | 27 +++++++++++++++++++-------- 5 files changed, 38 insertions(+), 13 deletions(-) diff --git a/bin/src/cli.rs b/bin/src/cli.rs index cb0aaa3c1..61871da46 100644 --- a/bin/src/cli.rs +++ b/bin/src/cli.rs @@ -231,7 +231,7 @@ pub enum MetricsCmd { #[clap( short = 'w', long = "workers", - help = "display metrics of each worker, without flattening by cluster id (takes more space)" + help = "display metrics of each worker, without merging by metric name or cluster id (takes more space)" )] workers: bool, }, diff --git a/bin/src/command/requests.rs b/bin/src/command/requests.rs index 9938a686f..c69e0b901 100644 --- a/bin/src/command/requests.rs +++ b/bin/src/command/requests.rs @@ -583,10 +583,11 @@ impl GatheringTask for QueryMetricsTask { main: main_metrics, clusters: BTreeMap::new(), workers: workers_metrics, + proxying: BTreeMap::new(), }; if !self.options.workers && server.workers.len() > 1 { - aggregated_metrics.merge_cluster_metrics(); + aggregated_metrics.merge_metrics(); } client.finish_ok_with_content( diff --git a/command/src/command.proto b/command/src/command.proto index 5aef97b9b..2d60c2d82 100644 --- a/command/src/command.proto +++ b/command/src/command.proto @@ -578,12 +578,18 @@ 
message AvailableMetrics { // Aggregated metrics of main process & workers message AggregatedMetrics { + // metrics about the main process. // metric_name -> metric_value map main = 1; + // details of worker metrics, with clusters and backends. // worker_id -> worker_metrics map workers = 2; + // if present, contains metrics of clusters and their backends, merged across all workers. // cluster_id -> cluster_metrics map clusters = 3; + // if present, proxying metrics, merged accross all workers. + // metric_name -> metric_value + map proxying = 4; } // All metrics of a worker: proxy and clusters diff --git a/command/src/proto/display.rs b/command/src/proto/display.rs index 5e6a89f2b..98308b0a3 100644 --- a/command/src/proto/display.rs +++ b/command/src/proto/display.rs @@ -240,10 +240,17 @@ pub fn print_metrics(aggregated_metrics: &AggregatedMetrics) -> Result<(), Displ println!("\nMAIN PROCESS\n============"); print_proxy_metrics(&aggregated_metrics.main); + if aggregated_metrics.proxying.len() != 0 { + println!("\nPROXYING\n============"); + print_proxy_metrics(&aggregated_metrics.proxying); + } + // workers - for (worker_id, worker_metrics) in aggregated_metrics.workers.iter() { - println!("\nWorker {worker_id}\n========="); - print_worker_metrics(worker_metrics)?; + for (worker_id, worker) in aggregated_metrics.workers.iter() { + if worker.clusters.len() != 0 && worker.proxy.len() != 0 { + println!("\nWorker {worker_id}\n========="); + print_worker_metrics(worker)?; + } } // clusters diff --git a/command/src/proto/mod.rs b/command/src/proto/mod.rs index b2e49d182..21bdca653 100644 --- a/command/src/proto/mod.rs +++ b/command/src/proto/mod.rs @@ -42,16 +42,27 @@ impl From for command::Request { } impl AggregatedMetrics { - /// Merge cluster metrics that were received from several workers + /// Merge metrics that were received from several workers /// - /// Each workers serves the same clusters and gathers metrics on them, - /// which means we have to reduce each 
metric from N instances to 1.
-    pub fn merge_cluster_metrics(&mut self) {
-        for (_worker_id, worker_metrics) in &mut self.workers {
-            // avoid copying the cluster metrics by taking them
-            let clusters = std::mem::take(&mut worker_metrics.clusters);
+    /// Each worker gathers the same kind of metrics,
+    /// for its own proxying logic, and for the same clusters with their backends.
+    /// This means we have to reduce each metric from N instances to 1.
+    pub fn merge_metrics(&mut self) {
+        // avoid copying the worker metrics, by taking them
+        let workers = std::mem::take(&mut self.workers);
 
-            for (cluster_id, mut cluster_metrics) in clusters {
+        for (_worker_id, worker) in workers {
+            for (metric_name, new_value) in worker.proxy {
+                if !new_value.is_mergeable() {
+                    continue;
+                }
+                self.proxying
+                    .entry(metric_name)
+                    .and_modify(|old_value| old_value.merge(&new_value))
+                    .or_insert(new_value);
+            }
+
+            for (cluster_id, mut cluster_metrics) in worker.clusters {
                 for (metric_name, new_value) in cluster_metrics.cluster {
                     if !new_value.is_mergeable() {
                         continue;