Skip to content

Commit

Permalink
Add metrics to compression and existence cache store
Browse files Browse the repository at this point in the history
  • Loading branch information
blakehatch committed Feb 1, 2024
1 parent 5470857 commit ed24b13
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 0 deletions.
38 changes: 38 additions & 0 deletions nativelink-store/src/compression_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use lz4_flex::block::{compress_into, decompress_into, get_maximum_output_size};
use nativelink_error::{error_if, make_err, Code, Error, ResultExt};
use nativelink_util::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::common::{DigestInfo, JoinHandleDropGuard};
use nativelink_util::metrics_utils::{CollectorState, MetricsComponent, Registry};
use nativelink_util::store_trait::{Store, UploadSizeInfo};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -195,6 +196,38 @@ impl UploadState {
}
}

impl MetricsComponent for UploadState {
fn gather_metrics(&self, c: &mut CollectorState) {
c.publish("max_output_size", &self.max_output_size, "");
c.publish("input_max_size", &self.input_max_size, "");
c.publish("header_version", &self.header.version, "");
let exact_size = match self.header.upload_size {
UploadSizeInfo::ExactSize(sz) => sz,
UploadSizeInfo::MaxSize(_) => 0, // Default to 0 if not exact size
};
let max_size = match self.header.upload_size {
UploadSizeInfo::MaxSize(sz) => sz,
UploadSizeInfo::ExactSize(sz) => sz, // Use exact size if max size is not specified
};
c.publish("header_exact_size", &exact_size, "");
c.publish("header_max_size", &max_size, "");
let footer_indexes: Vec<String> = self
.footer
.indexes
.iter()
.map(|index| index.position_from_prev_index.to_string())
.collect();
let footer_indexes_str = footer_indexes.join(", ");
c.publish("footer_indexes", &footer_indexes_str, "");
c.publish("footer_index_count", &self.footer.index_count.to_string(), "");
c.publish(
"footer_uncompressed_data_size",
&self.footer.uncompressed_data_size.to_string(),
"",
);
}
}

/// This store will compress data before sending it on to the inner store.
/// Note: Currently using get_part() and trying to read part of the data will
/// result in the entire contents being read from the inner store but will
Expand Down Expand Up @@ -583,4 +616,9 @@ impl Store for CompressionStore {
fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Box::new(self)
}

fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
let inner_store_registry = registry.sub_registry_with_prefix("inner_store");
self.inner_store.clone().register_metrics(inner_store_registry);
}
}
12 changes: 12 additions & 0 deletions nativelink-store/src/existence_cache_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use nativelink_error::{error_if, Error, ResultExt};
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::common::DigestInfo;
use nativelink_util::evicting_map::{EvictingMap, LenEntry};
use nativelink_util::metrics_utils::{CollectorState, MetricsComponent, Registry};
use nativelink_util::store_trait::{Store, UploadSizeInfo};

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -185,4 +186,15 @@ impl Store for ExistenceCacheStore {
fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
Box::new(self)
}

fn register_metrics(self: Arc<Self>, registry: &mut Registry) {
let inner_store_registry = registry.sub_registry_with_prefix("inner_store");
self.inner_store.clone().register_metrics(inner_store_registry);
}
}

impl MetricsComponent for ExistenceCacheStore {
fn gather_metrics(&self, c: &mut CollectorState) {
self.existence_cache.gather_metrics(c)
}
}

0 comments on commit ed24b13

Please sign in to comment.