Skip to content

Commit

Permalink
merge proxying metrics across workers
Browse files Browse the repository at this point in the history
  • Loading branch information
Keksoj committed Jul 9, 2024
1 parent 6de2bed commit 586b608
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 13 deletions.
2 changes: 1 addition & 1 deletion bin/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ pub enum MetricsCmd {
#[clap(
short = 'w',
long = "workers",
help = "display metrics of each worker, without flattening by cluster id (takes more space)"
help = "display metrics of each worker, without merging by metric name or cluster id (takes more space)"
)]
workers: bool,
},
Expand Down
3 changes: 2 additions & 1 deletion bin/src/command/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,10 +583,11 @@ impl GatheringTask for QueryMetricsTask {
main: main_metrics,
clusters: BTreeMap::new(),
workers: workers_metrics,
proxying: BTreeMap::new(),
};

if !self.options.workers && server.workers.len() > 1 {
aggregated_metrics.merge_cluster_metrics();
aggregated_metrics.merge_metrics();
}

client.finish_ok_with_content(
Expand Down
6 changes: 6 additions & 0 deletions command/src/command.proto
Original file line number Diff line number Diff line change
Expand Up @@ -578,12 +578,18 @@ message AvailableMetrics {

// Aggregated metrics of the main process & workers.
// The `clusters` and `proxying` maps hold values merged across all workers;
// they may be empty when no merge was performed.
message AggregatedMetrics {
// metrics about the main process.
// metric_name -> metric_value
map<string, FilteredMetrics> main = 1;
// details of worker metrics, with clusters and backends.
// worker_id -> worker_metrics
map<string, WorkerMetrics> workers = 2;
// if present, contains metrics of clusters and their backends, merged across all workers.
// cluster_id -> cluster_metrics
map<string, ClusterMetrics> clusters = 3;
// if present, contains proxying metrics, merged across all workers.
// metric_name -> metric_value
map<string, FilteredMetrics> proxying = 4;
}

// All metrics of a worker: proxy and clusters
Expand Down
13 changes: 10 additions & 3 deletions command/src/proto/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,17 @@ pub fn print_metrics(aggregated_metrics: &AggregatedMetrics) -> Result<(), Displ
println!("\nMAIN PROCESS\n============");
print_proxy_metrics(&aggregated_metrics.main);

if aggregated_metrics.proxying.len() != 0 {
println!("\nPROXYING\n============");
print_proxy_metrics(&aggregated_metrics.proxying);
}

// workers
for (worker_id, worker_metrics) in aggregated_metrics.workers.iter() {
println!("\nWorker {worker_id}\n=========");
print_worker_metrics(worker_metrics)?;
for (worker_id, worker) in aggregated_metrics.workers.iter() {
if worker.clusters.len() != 0 && worker.proxy.len() != 0 {
println!("\nWorker {worker_id}\n=========");
print_worker_metrics(worker)?;
}
}

// clusters
Expand Down
27 changes: 19 additions & 8 deletions command/src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,27 @@ impl From<command::request::RequestType> for command::Request {
}

impl AggregatedMetrics {
/// Merge cluster metrics that were received from several workers
/// Merge metrics that were received from several workers
///
/// Each workers serves the same clusters and gathers metrics on them,
/// which means we have to reduce each metric from N instances to 1.
pub fn merge_cluster_metrics(&mut self) {
for (_worker_id, worker_metrics) in &mut self.workers {
// avoid copying the cluster metrics by taking them
let clusters = std::mem::take(&mut worker_metrics.clusters);
/// Each worker gathers the same kind of metrics,
/// for its own proxying logic, and for the same clusters with their backends.
/// This means we have to reduce each metric from N instances to 1.
pub fn merge_metrics(&mut self) {
// avoid copying the worker metrics, by taking them
let workers = std::mem::take(&mut self.workers);

for (cluster_id, mut cluster_metrics) in clusters {
for (_worker_id, worker) in workers {
for (metric_name, new_value) in worker.proxy {
if !new_value.is_mergeable() {
continue;
}
self.proxying
.entry(metric_name)
.and_modify(|old_value| old_value.merge(&new_value))
.or_insert(new_value);
}

for (cluster_id, mut cluster_metrics) in worker.clusters {
for (metric_name, new_value) in cluster_metrics.cluster {
if !new_value.is_mergeable() {
continue;
Expand Down

0 comments on commit 586b608

Please sign in to comment.