Skip to content

Commit

Permalink
merge cluster metrics across workers
Browse files Browse the repository at this point in the history
The default behaviour is now to take all metrics from the
AggregatedMetrics object, merge them by cluster id, and
move them to the field AggregatedMetrics::clusters.
The behaviour is avoidable with the option "metrics get --workers"

Note: the function merge_cluster_metrics
does not merge time metrics (percentiles), they are discarded.
Prometheus histograms should be mergeable though, if bucket size is
consistent.
  • Loading branch information
Keksoj committed Jul 9, 2024
1 parent 4831ed2 commit 5deac5d
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 8 deletions.
6 changes: 6 additions & 0 deletions bin/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,12 @@ pub enum MetricsCmd {
help = "get only the metrics of main process and workers (no cluster metrics)"
)]
no_clusters: bool,
#[clap(
short = 'w',
long = "workers",
help = "display metrics of each worker, without flattening by cluster id (takes more space)"
)]
workers: bool,
},
}

Expand Down
18 changes: 12 additions & 6 deletions bin/src/command/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ impl GatheringTask for QueryMetricsTask {

fn on_finish(
self: Box<Self>,
_server: &mut Server,
server: &mut Server,
client: &mut OptionalClient,
_timed_out: bool,
) {
Expand Down Expand Up @@ -579,12 +579,18 @@ impl GatheringTask for QueryMetricsTask {
)
.collect();

let mut aggregated_metrics = AggregatedMetrics {
main: main_metrics,
clusters: BTreeMap::new(),
workers: workers_metrics,
};

if !self.options.workers && server.workers.len() > 1 {
aggregated_metrics.merge_cluster_metrics();
}

client.finish_ok_with_content(
ContentType::Metrics(AggregatedMetrics {
main: main_metrics,
workers: workers_metrics,
})
.into(),
ContentType::Metrics(aggregated_metrics).into(),
"Successfully aggregated all metrics",
);
}
Expand Down
2 changes: 2 additions & 0 deletions bin/src/ctl/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,15 @@ impl CommandManager {
cluster_ids: Vec<String>,
backend_ids: Vec<String>,
no_clusters: bool,
workers: bool,
) -> Result<(), CtlError> {
let request: Request = RequestType::QueryMetrics(QueryMetricsOptions {
list,
cluster_ids,
backend_ids,
metric_names,
no_clusters,
workers,
})
.into();

Expand Down
11 changes: 10 additions & 1 deletion bin/src/ctl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,16 @@ impl CommandManager {
clusters,
backends,
no_clusters,
} => self.get_metrics(list, refresh, names, clusters, backends, no_clusters),
workers,
} => self.get_metrics(
list,
refresh,
names,
clusters,
backends,
no_clusters,
workers,
),
_ => self.configure_metrics(cmd),
},
SubCmd::Logging { filter } => self.logging_filter(filter),
Expand Down
11 changes: 10 additions & 1 deletion command/src/command.proto
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,8 @@ message QueryMetricsOptions {
repeated string metric_names = 4;
// query only worker and main process metrics (no cluster metrics)
required bool no_clusters = 5;
// display metrics of each worker, without flattening (takes more space)
required bool workers = 6;
}

// options to configure metrics collection
Expand Down Expand Up @@ -576,8 +578,12 @@ message AvailableMetrics {

// Aggregated metrics of main process & workers
//
// Cluster metrics are found either under each worker (in `workers`)
// or merged by cluster id (in `clusters`), depending on how the metrics were queried.
message AggregatedMetrics {
// metric_name -> metric_value, for the main process
map<string, FilteredMetrics> main = 1;
// worker_id -> worker_metrics
map<string, WorkerMetrics> workers = 2;
// cluster_id -> cluster_metrics, merged across workers
map<string, ClusterMetrics> clusters = 3;
}

// All metrics of a worker: proxy and clusters
Expand All @@ -593,7 +599,7 @@ message WorkerMetrics {
// Metrics of one cluster: the metrics of the cluster itself,
// and the metrics of each of its backends
message ClusterMetrics {
// metric name -> metric value
map<string, FilteredMetrics> cluster = 1;
// list of backends with their metrics
repeated BackendMetrics backends = 2;
}

Expand All @@ -604,8 +610,11 @@ message BackendMetrics {

message FilteredMetrics {
oneof inner {
// increases or decreases depending on the state
uint64 gauge = 1;
// increases only
int64 count = 2;
// milliseconds
uint64 time = 3;
Percentiles percentiles = 4;
FilteredTimeSerie time_serie = 5;
Expand Down
7 changes: 7 additions & 0 deletions command/src/proto/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,13 @@ pub fn print_metrics(aggregated_metrics: &AggregatedMetrics) -> Result<(), Displ
println!("\nWorker {worker_id}\n=========");
print_worker_metrics(worker_metrics)?;
}

// clusters
if !aggregated_metrics.clusters.is_empty() {
println!("\nClusters\n=======");
print_cluster_metrics(&aggregated_metrics.clusters);
}

Ok(())
}

Expand Down
97 changes: 97 additions & 0 deletions command/src/proto/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::collections::BTreeMap;

use command::{filtered_metrics::Inner, AggregatedMetrics, ClusterMetrics, FilteredMetrics};
use prost::DecodeError;

/// Contains all types received by and sent from Sōzu
Expand Down Expand Up @@ -37,3 +40,97 @@ impl From<command::request::RequestType> for command::Request {
}
}
}

impl AggregatedMetrics {
    /// Merge cluster metrics that were received from several workers
    ///
    /// Each worker serves the same clusters and gathers metrics on them,
    /// which means we have to reduce each metric from N instances to 1.
    ///
    /// The cluster metrics are taken out of every entry of `self.workers`
    /// (their `clusters` maps are left empty) and accumulated into
    /// `self.clusters`, by cluster id, and by backend id within a cluster.
    ///
    /// Only mergeable metrics (gauges and counts) are kept. Metrics that can
    /// not be merged, such as percentiles, are discarded, for clusters and
    /// backends alike. A cluster or a backend that has no mergeable metric
    /// does not get an entry in `self.clusters`.
    pub fn merge_cluster_metrics(&mut self) {
        for worker_metrics in self.workers.values_mut() {
            // avoid copying the cluster metrics by taking them
            let clusters = std::mem::take(&mut worker_metrics.clusters);

            for (cluster_id, mut cluster_metrics) in clusters {
                // discard, once and for all, everything that can not be merged
                cluster_metrics
                    .cluster
                    .retain(|_, value| value.is_mergeable());
                for backend in &mut cluster_metrics.backends {
                    backend.metrics.retain(|_, value| value.is_mergeable());
                }
                cluster_metrics
                    .backends
                    .retain(|backend| !backend.metrics.is_empty());

                // do not create an empty entry for a cluster that has nothing to merge
                if cluster_metrics.cluster.is_empty() && cluster_metrics.backends.is_empty() {
                    continue;
                }

                let merged_cluster =
                    self.clusters
                        .entry(cluster_id)
                        .or_insert_with(|| ClusterMetrics {
                            cluster: BTreeMap::new(),
                            backends: Vec::new(),
                        });

                for (metric_name, new_value) in cluster_metrics.cluster {
                    merged_cluster
                        .cluster
                        .entry(metric_name)
                        .and_modify(|old_value| old_value.merge(&new_value))
                        .or_insert(new_value);
                }

                for backend in cluster_metrics.backends {
                    let found_backend = merged_cluster
                        .backends
                        .iter_mut()
                        .find(|present| present.backend_id == backend.backend_id);

                    // A backend seen for the first time is inserted as a whole, exactly once.
                    // Its metrics must NOT be merged into it afterwards,
                    // or every value would be counted twice.
                    let Some(existing_backend) = found_backend else {
                        merged_cluster.backends.push(backend);
                        continue;
                    };

                    for (metric_name, new_value) in backend.metrics {
                        existing_backend
                            .metrics
                            .entry(metric_name)
                            .and_modify(|old_value| old_value.merge(&new_value))
                            .or_insert(new_value);
                    }
                }
            }
        }
    }
}

impl FilteredMetrics {
    /// Add the value of `right` to `self`, in place
    ///
    /// Only two gauges, or two counts, are summed. The sum saturates at the
    /// numeric bounds of the type instead of overflowing.
    /// Any other combination (different kinds, time metrics, percentiles,
    /// empty metrics) leaves `self` untouched.
    pub fn merge(&mut self, right: &Self) {
        match (&mut self.inner, &right.inner) {
            (Some(Inner::Gauge(current)), Some(Inner::Gauge(added))) => {
                *current = current.saturating_add(*added);
            }
            (Some(Inner::Count(current)), Some(Inner::Count(added))) => {
                *current = current.saturating_add(*added);
            }
            _ => {}
        }
    }

    /// Whether this metric can be summed with another one of the same kind
    ///
    /// True for gauges and counts only.
    fn is_mergeable(&self) -> bool {
        // no catch-all arm on purpose: a new kind of metric has to be classified explicitly
        match &self.inner {
            Some(Inner::Gauge(_)) | Some(Inner::Count(_)) => true,
            // Inner::Time and Inner::Timeserie are never used in Sōzu
            Some(Inner::Time(_))
            | Some(Inner::Percentiles(_))
            | Some(Inner::TimeSerie(_))
            | None => false,
        }
    }
}
1 change: 1 addition & 0 deletions lib/src/metrics/local_drain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ impl LocalDrain {
backend_ids,
list,
no_clusters,
workers: _workers,
} = options;

if *list {
Expand Down

0 comments on commit 5deac5d

Please sign in to comment.