Skip to content

Commit

Permalink
chore(host_metrics source): Simplify internal collection of metrics (#13735)
Browse files Browse the repository at this point in the history

Previously, each collector in this source gathered its metrics into a
separate `Vec`, and those were then pushed into the same result `Vec`.
This change creates a single result buffer and has every collector push
its metrics directly into that buffer, which allows for simpler code in
a number of places.
  • Loading branch information
bruceg authored Jul 28, 2022
1 parent 1cc0520 commit 3b0f45c
Show file tree
Hide file tree
Showing 8 changed files with 434 additions and 561 deletions.
28 changes: 21 additions & 7 deletions src/api/schema/metrics/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,41 +274,55 @@ impl HostMetrics {
impl HostMetrics {
/// Memory metrics
async fn memory(&self) -> MemoryMetrics {
MemoryMetrics(self.0.memory_metrics().await)
let mut buffer = self.0.buffer();
self.0.memory_metrics(&mut buffer).await;
MemoryMetrics(buffer.metrics)
}

/// Swap metrics
async fn swap(&self) -> SwapMetrics {
SwapMetrics(self.0.swap_metrics().await)
let mut buffer = self.0.buffer();
self.0.swap_metrics(&mut buffer).await;
SwapMetrics(buffer.metrics)
}

/// CPU metrics
async fn cpu(&self) -> CpuMetrics {
CpuMetrics(self.0.cpu_metrics().await)
let mut buffer = self.0.buffer();
self.0.cpu_metrics(&mut buffer).await;
CpuMetrics(buffer.metrics)
}

/// Load average metrics (*nix only)
async fn load_average(&self) -> Option<LoadAverageMetrics> {
if cfg!(unix) {
Some(LoadAverageMetrics(self.0.loadavg_metrics().await))
let mut buffer = self.0.buffer();
self.0.loadavg_metrics(&mut buffer).await;
Some(LoadAverageMetrics(buffer.metrics))
} else {
None
}
}

/// Network metrics
async fn network(&self) -> NetworkMetrics {
NetworkMetrics(self.0.network_metrics().await)
let mut buffer = self.0.buffer();
self.0.network_metrics(&mut buffer).await;
NetworkMetrics(buffer.metrics)
}

/// Filesystem metrics
async fn filesystem(&self) -> FileSystemMetrics {
FileSystemMetrics(self.0.filesystem_metrics().await)
let mut buffer = self.0.buffer();
self.0.filesystem_metrics(&mut buffer).await;
FileSystemMetrics(buffer.metrics)
}

/// Disk metrics
async fn disk(&self) -> DiskMetrics {
DiskMetrics(self.0.disk_metrics().await)
let mut buffer = self.0.buffer();
self.0.disk_metrics(&mut buffer).await;
DiskMetrics(buffer.metrics)
}
}

Expand Down
107 changes: 45 additions & 62 deletions src/sources/host_metrics/cgroups.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{io, num::ParseIntError, path::Path, path::PathBuf, str::FromStr};

use chrono::{DateTime, Utc};
use futures::future::BoxFuture;
use snafu::{ResultExt, Snafu};
use tokio::{
Expand All @@ -10,8 +9,8 @@ use tokio::{
use vector_common::btreemap;
use vector_config::configurable_component;

use super::{filter_result_sync, FilterList, HostMetrics};
use crate::event::metric::{Metric, MetricTags};
use super::{filter_result_sync, FilterList, HostMetrics, MetricsBuffer};
use crate::event::metric::MetricTags;

const MICROSECONDS: f64 = 1.0 / 1_000_000.0;

Expand Down Expand Up @@ -62,50 +61,45 @@ enum CGroupsError {
type CGroupsResult<T> = Result<T, CGroupsError>;

impl HostMetrics {
pub(super) async fn cgroups_metrics(&self) -> Vec<Metric> {
match &self.root_cgroup {
Some(root) => {
let mut recurser = CGroupRecurser::new(self);
match &root.mode {
Mode::Modern(base) => recurser.scan_modern(root, base).await,
Mode::Legacy(base) => recurser.scan_legacy(root, base).await,
Mode::Hybrid(v1base, v2base) => {
// Hybrid cgroups contain both legacy and modern cgroups, so scan them both
// for the data files. The `cpu` controller is usually found in the modern
// groups, but the top-level stats are found under the legacy controller in
// some setups. Similarly, the `memory` controller can be found in either
// location. As such, detecting exactly where to scan for the controllers
// doesn't work, so opportunistically scan for any controller files in all
// subdirectories of the given root.
recurser.scan_legacy(root, v1base).await;
recurser.scan_modern(root, v2base).await;
}
pub(super) async fn cgroups_metrics(&self, output: &mut MetricsBuffer) {
if let Some(root) = &self.root_cgroup {
output.name = "cgroups";
let mut recurser = CGroupRecurser::new(self, output);
match &root.mode {
Mode::Modern(base) => recurser.scan_modern(root, base).await,
Mode::Legacy(base) => recurser.scan_legacy(root, base).await,
Mode::Hybrid(v1base, v2base) => {
// Hybrid cgroups contain both legacy and modern cgroups, so scan them both
// for the data files. The `cpu` controller is usually found in the modern
// groups, but the top-level stats are found under the legacy controller in
// some setups. Similarly, the `memory` controller can be found in either
// location. As such, detecting exactly where to scan for the controllers
// doesn't work, so opportunistically scan for any controller files in all
// subdirectories of the given root.
recurser.scan_legacy(root, v1base).await;
recurser.scan_modern(root, v2base).await;
}
recurser.output
}
None => Vec::default(),
}
}
}

struct CGroupRecurser<'a> {
host: &'a HostMetrics,
now: DateTime<Utc>,
output: Vec<Metric>,
output: &'a mut MetricsBuffer,
buffer: String,
load_cpu: bool,
load_memory: bool,
config: &'a CGroupsConfig,
}

impl<'a> CGroupRecurser<'a> {
fn new(host: &'a HostMetrics) -> Self {
fn new(host: &'a HostMetrics, output: &'a mut MetricsBuffer) -> Self {
Self {
host,
now: Utc::now(),
output: Vec::new(),
output,
buffer: String::new(),
load_cpu: true,
load_memory: true,
config: &host.config.cgroups,
}
}

Expand Down Expand Up @@ -150,8 +144,8 @@ impl<'a> CGroupRecurser<'a> {
self.load_memory(&cgroup, &tags).await;
}

if level < self.host.config.cgroups.levels {
let groups = &self.host.config.cgroups.groups;
if level < self.config.levels {
let groups = &self.config.groups;
if let Some(children) =
filter_result_sync(cgroup.children().await, "Failed to load cgroups children.")
{
Expand All @@ -171,24 +165,21 @@ impl<'a> CGroupRecurser<'a> {
cgroup.load_cpu(&mut self.buffer).await,
"Failed to load cgroups CPU statistics.",
) {
self.output.push(self.host.counter(
self.output.counter(
"cgroup_cpu_usage_seconds_total",
self.now,
cpu.usage_usec as f64 * MICROSECONDS,
tags.clone(),
));
self.output.push(self.host.counter(
);
self.output.counter(
"cgroup_cpu_user_seconds_total",
self.now,
cpu.user_usec as f64 * MICROSECONDS,
tags.clone(),
));
self.output.push(self.host.counter(
);
self.output.counter(
"cgroup_cpu_system_seconds_total",
self.now,
cpu.system_usec as f64 * MICROSECONDS,
tags.clone(),
));
);
}
}

Expand All @@ -198,30 +189,18 @@ impl<'a> CGroupRecurser<'a> {
cgroup.load_memory_current(&mut self.buffer).await,
"Failed to load cgroups current memory.",
) {
self.output.push(self.host.gauge(
"cgroup_memory_current_bytes",
self.now,
current as f64,
tags.clone(),
));
self.output
.gauge("cgroup_memory_current_bytes", current as f64, tags.clone());
}

if let Some(Some(stat)) = filter_result_sync(
cgroup.load_memory_stat(&mut self.buffer).await,
"Failed to load cgroups memory statistics.",
) {
self.output.push(self.host.gauge(
"cgroup_memory_anon_bytes",
self.now,
stat.anon as f64,
tags.clone(),
));
self.output.push(self.host.gauge(
"cgroup_memory_file_bytes",
self.now,
stat.file as f64,
tags.clone(),
));
self.output
.gauge("cgroup_memory_anon_bytes", stat.anon as f64, tags.clone());
self.output
.gauge("cgroup_memory_file_bytes", stat.file as f64, tags.clone());
}
}
}
Expand Down Expand Up @@ -490,7 +469,7 @@ mod tests {
tests::{count_name, count_tag},
HostMetrics, HostMetricsConfig,
},
join_name, join_path,
join_name, join_path, MetricsBuffer,
};

#[test]
Expand All @@ -506,7 +485,9 @@ mod tests {
#[tokio::test]
async fn generates_cgroups_metrics() {
let config: HostMetricsConfig = toml::from_str(r#"collectors = ["cgroups"]"#).unwrap();
let metrics = HostMetrics::new(config).cgroups_metrics().await;
let mut buffer = MetricsBuffer::new(None);
HostMetrics::new(config).cgroups_metrics(&mut buffer).await;
let metrics = buffer.metrics;

assert!(!metrics.is_empty());
assert_eq!(count_tag(&metrics, "cgroup"), metrics.len());
Expand Down Expand Up @@ -619,7 +600,9 @@ mod tests {
"#
))
.unwrap();
let metrics = HostMetrics::new(config).cgroups_metrics().await;
let mut buffer = MetricsBuffer::new(None);
HostMetrics::new(config).cgroups_metrics(&mut buffer).await;
let metrics = buffer.metrics;

assert_ne!(metrics.len(), 0);

Expand Down
98 changes: 29 additions & 69 deletions src/sources/host_metrics/cpu.rs
Original file line number Diff line number Diff line change
@@ -1,84 +1,41 @@
use std::collections::BTreeMap;

use chrono::Utc;
use futures::{stream, StreamExt};
use futures::StreamExt;
#[cfg(target_os = "linux")]
use heim::cpu::os::linux::CpuTimeExt;
use heim::units::time::second;

use super::{filter_result, HostMetrics};
use crate::event::metric::Metric;

const NAME: &str = "cpu_seconds_total";

impl HostMetrics {
pub async fn cpu_metrics(&self) -> Vec<Metric> {
pub async fn cpu_metrics(&self, output: &mut super::MetricsBuffer) {
match heim::cpu::times().await {
Ok(times) => {
times
let times: Vec<_> = times
.filter_map(|result| filter_result(result, "Failed to load/parse CPU time."))
.enumerate()
.map(|(index, times)| {
let timestamp = Utc::now();
let name = "cpu_seconds_total";
stream::iter(
vec![
self.counter(
name,
timestamp,
times.idle().get::<second>(),
BTreeMap::from([
(String::from("mode"), String::from("idle")),
(String::from("cpu"), index.to_string()),
]),
),
#[cfg(target_os = "linux")]
self.counter(
name,
timestamp,
times.nice().get::<second>(),
BTreeMap::from([
(String::from("mode"), String::from("nice")),
(String::from("cpu"), index.to_string()),
]),
),
#[cfg(target_os = "linux")]
self.counter(
name,
timestamp,
times.io_wait().get::<second>(),
BTreeMap::from([
(String::from("mode"), String::from("io_wait")),
(String::from("cpu"), index.to_string()),
]),
),
self.counter(
name,
timestamp,
times.system().get::<second>(),
BTreeMap::from([
(String::from("mode"), String::from("system")),
(String::from("cpu"), index.to_string()),
]),
),
self.counter(
name,
timestamp,
times.user().get::<second>(),
BTreeMap::from([
(String::from("mode"), String::from("user")),
(String::from("cpu"), index.to_string()),
]),
),
]
.into_iter(),
)
})
.flatten()
.collect::<Vec<_>>()
.await
.collect()
.await;
output.name = "cpu";
for (index, times) in times.into_iter().enumerate() {
let tags = |name: &str| {
BTreeMap::from([
(String::from("mode"), String::from(name)),
(String::from("cpu"), index.to_string()),
])
};
output.counter(NAME, times.idle().get::<second>(), tags("idle"));
#[cfg(target_os = "linux")]
output.counter(NAME, times.io_wait().get::<second>(), tags("io_wait"));
#[cfg(target_os = "linux")]
output.counter(NAME, times.nice().get::<second>(), tags("nice"));
output.counter(NAME, times.system().get::<second>(), tags("system"));
output.counter(NAME, times.user().get::<second>(), tags("user"));
}
}
Err(error) => {
error!(message = "Failed to load CPU times.", %error, internal_log_rate_secs = 60);
vec![]
}
}
}
Expand All @@ -88,14 +45,17 @@ impl HostMetrics {
mod tests {
use super::super::{
tests::{all_counters, count_name, count_tag},
HostMetrics, HostMetricsConfig,
HostMetrics, HostMetricsConfig, MetricsBuffer,
};

#[tokio::test]
async fn generates_cpu_metrics() {
let metrics = HostMetrics::new(HostMetricsConfig::default())
.cpu_metrics()
let mut buffer = MetricsBuffer::new(None);
HostMetrics::new(HostMetricsConfig::default())
.cpu_metrics(&mut buffer)
.await;
let metrics = buffer.metrics;

assert!(!metrics.is_empty());
assert!(all_counters(&metrics));

Expand Down
Loading

0 comments on commit 3b0f45c

Please sign in to comment.