Skip to content

Commit

Permalink
Add metrics for number of inodes (#781)
Browse files Browse the repository at this point in the history
* Make metrics gauges remember if they've changed

This is a small change to their behavior: they'll now emit if they're
changed to zero, whereas before they would not. But I think that's the
behavior we want anyway.

Signed-off-by: James Bornholt <bornholt@amazon.com>

* Convert absolute counters to gauges

We were abusing absolute counters to get the "only emit when changed"
behavior, which gauges now provide too.

Signed-off-by: James Bornholt <bornholt@amazon.com>

* Add metrics for number of inodes

We track the total number of inodes as well as tracking them by kind.

Signed-off-by: James Bornholt <bornholt@amazon.com>

---------

Signed-off-by: James Bornholt <bornholt@amazon.com>
  • Loading branch information
jamesbornholt authored Feb 25, 2024
1 parent 926524c commit 6d7c194
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 38 deletions.
36 changes: 18 additions & 18 deletions mountpoint-s3-client/src/s3_crt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -607,39 +607,39 @@ impl S3CrtClientInner {

fn poll_client_metrics(s3_client: &Client) {
let metrics = s3_client.poll_client_metrics();
metrics::absolute_counter!(
metrics::gauge!(
"s3.client.num_requests_being_processed",
metrics.num_requests_tracked_requests as u64
metrics.num_requests_tracked_requests as f64
);
metrics::absolute_counter!(
metrics::gauge!(
"s3.client.num_requests_being_prepared",
metrics.num_requests_being_prepared as u64
metrics.num_requests_being_prepared as f64
);
metrics::absolute_counter!("s3.client.request_queue_size", metrics.request_queue_size as u64);
metrics::absolute_counter!(
metrics::gauge!("s3.client.request_queue_size", metrics.request_queue_size as f64);
metrics::gauge!(
"s3.client.num_auto_default_network_io",
metrics.num_auto_default_network_io as u64
metrics.num_auto_default_network_io as f64
);
metrics::absolute_counter!(
metrics::gauge!(
"s3.client.num_auto_ranged_get_network_io",
metrics.num_auto_ranged_get_network_io as u64
metrics.num_auto_ranged_get_network_io as f64
);
metrics::absolute_counter!(
metrics::gauge!(
"s3.client.num_auto_ranged_put_network_io",
metrics.num_auto_ranged_put_network_io as u64
metrics.num_auto_ranged_put_network_io as f64
);
metrics::absolute_counter!(
metrics::gauge!(
"s3.client.num_auto_ranged_copy_network_io",
metrics.num_auto_ranged_copy_network_io as u64
metrics.num_auto_ranged_copy_network_io as f64
);
metrics::absolute_counter!("s3.client.num_total_network_io", metrics.num_total_network_io() as u64);
metrics::absolute_counter!(
metrics::gauge!("s3.client.num_total_network_io", metrics.num_total_network_io() as f64);
metrics::gauge!(
"s3.client.num_requests_stream_queued_waiting",
metrics.num_requests_stream_queued_waiting as u64
metrics.num_requests_stream_queued_waiting as f64
);
metrics::absolute_counter!(
metrics::gauge!(
"s3.client.num_requests_streaming_response",
metrics.num_requests_streaming_response as u64
metrics.num_requests_streaming_response as f64
);
}

Expand Down
48 changes: 40 additions & 8 deletions mountpoint-s3/src/inode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub struct Superblock {
#[derive(Debug)]
struct SuperblockInner {
bucket: String,
inodes: RwLock<HashMap<InodeNo, Inode>>,
inodes: RwLock<InodeMap>,
negative_cache: NegativeCache,
next_ino: AtomicU64,
mount_time: OffsetDateTime,
Expand Down Expand Up @@ -121,7 +121,7 @@ impl Superblock {
},
);

let mut inodes = HashMap::new();
let mut inodes = InodeMap::default();
inodes.insert(ROOT_INODE_NO, root);

let negative_cache = NegativeCache::new(config.cache_config.negative_cache_size, config.cache_config.file_ttl);
Expand Down Expand Up @@ -1023,12 +1023,7 @@ impl SuperblockInner {
is_new_file: bool,
) -> Result<Inode, InodeError> {
if !valid_inode_name(name) {
let kind = if kind == InodeKind::Directory {
"directory"
} else {
"file"
};
warn!(?name, "invalid file name; {} will not be available", kind);
warn!(?name, "invalid file name; {} will not be available", kind.as_str());
return Err(InodeError::InvalidFileName(OsString::from(name)));
}

Expand Down Expand Up @@ -1432,6 +1427,15 @@ pub enum InodeKind {
Directory,
}

impl InodeKind {
fn as_str(&self) -> &'static str {
match self {
InodeKind::File => "file",
InodeKind::Directory => "directory",
}
}
}

impl From<InodeKind> for FileType {
fn from(kind: InodeKind) -> Self {
match kind {
Expand Down Expand Up @@ -1571,6 +1575,34 @@ impl InodeStat {
}
}

/// A wrapper around a `HashMap<InodeNo, Inode>`` that just takes care of metrics when inodes are
/// added or removed.
#[derive(Debug, Default)]
struct InodeMap {
map: HashMap<InodeNo, Inode>,
}

impl InodeMap {
fn get(&self, ino: &InodeNo) -> Option<&Inode> {
self.map.get(ino)
}

fn insert(&mut self, ino: InodeNo, inode: Inode) -> Option<Inode> {
metrics::increment_gauge!("fs.inodes", 1.0);
metrics::increment_gauge!("fs.inode_kinds", 1.0, "kind" => inode.kind().as_str());
self.map.insert(ino, inode).inspect(Self::remove_metrics)
}

fn remove(&mut self, ino: &InodeNo) -> Option<Inode> {
self.map.remove(ino).inspect(Self::remove_metrics)
}

fn remove_metrics(inode: &Inode) {
metrics::decrement_gauge!("fs.inodes", 1.0);
metrics::decrement_gauge!("fs.inode_kinds", 1.0, "kind" => inode.kind().as_str());
}
}

#[derive(Debug, Error)]
pub enum InodeError {
#[error("error from ObjectClient")]
Expand Down
11 changes: 6 additions & 5 deletions mountpoint-s3/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ mod tests {
}
Metric::Gauge(inner) => {
assert_eq!(key.name(), TEST_GAUGE);
let value = inner.load();
let value = inner.load_if_changed();
let label = key.labels().next().unwrap();
if label == &Label::new("type", "processing") {
assert_eq!(value, Some(2.0));
Expand Down Expand Up @@ -271,21 +271,22 @@ mod tests {
let metric = entry.value_mut();
match metric {
Metric::Counter(inner) => assert!(inner.load_and_reset().is_none()),
// Gauges don't reset on their own
Metric::Gauge(inner) => assert!(inner.load().is_some()),
Metric::Gauge(inner) => assert!(inner.load_if_changed().is_none()),
Metric::Histogram(inner) => assert!(inner.run_and_reset(|_| panic!("unreachable")).is_none()),
}
}

// Set the gauges to zero and check they're no longer emitted
// Set the gauges to zero and check they emit their change only once
metrics::gauge!(TEST_GAUGE, 0.0, "type" => "processing");
metrics::gauge!(TEST_GAUGE, 0.0, "type" => "in_queue");
for mut entry in sink.metrics.iter_mut() {
let metric = entry.value_mut();
let Metric::Gauge(inner) = metric else {
continue;
};
assert!(inner.load().is_none());
// We want to emit once to reflect that it's changed to 0, and then not emit again
assert!(inner.load_if_changed().is_some());
assert!(inner.load_if_changed().is_none());
}
}
}
Expand Down
18 changes: 11 additions & 7 deletions mountpoint-s3/src/metrics/data.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use crate::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use crate::sync::{Arc, Mutex};

/// A single metric
Expand Down Expand Up @@ -56,7 +56,7 @@ impl Metric {
}
}
// Gauges can't reset because they can be incremented/decremented
Metric::Gauge(inner) => inner.load().map(|value| format!("{}", value)),
Metric::Gauge(inner) => inner.load_if_changed().map(|value| format!("{}", value)),
Metric::Histogram(histogram) => histogram.run_and_reset(|histogram| {
format!(
"n={}: min={} p10={} p50={} avg={:.2} p90={} p99={} p99.9={} max={}",
Expand Down Expand Up @@ -112,6 +112,7 @@ impl ValueAndCount {
#[derive(Debug, Default)]
pub struct AtomicGauge {
bits: AtomicU64,
changed: AtomicBool,
}

impl metrics::GaugeFn for AtomicGauge {
Expand All @@ -135,14 +136,17 @@ impl AtomicGauge {
Some(f(f64::from_bits(old_bits)).to_bits())
})
.expect("closure always returns Some");
self.changed.store(true, Ordering::SeqCst);
}

pub fn load(&self) -> Option<f64> {
let value = f64::from_bits(self.bits.load(Ordering::SeqCst));
if value == 0.0 {
None
/// Return the current value of this gauge if it has changed since the last call to this method.
/// Note that "changed" just means another `gauge!()` call has occurred; the actual value may
/// still be the same.
pub fn load_if_changed(&self) -> Option<f64> {
if self.changed.swap(false, Ordering::SeqCst) {
Some(f64::from_bits(self.bits.load(Ordering::SeqCst)))
} else {
Some(value)
None
}
}
}
Expand Down

0 comments on commit 6d7c194

Please sign in to comment.