Skip to content

Commit

Permalink
Add metrics to LocalWorker and RunningActionsManager
Browse files Browse the repository at this point in the history
Adds the states to LocalWorker and RunningActionManager
and adds a bunch of framework to make publishing stats
about futures and other related items easier.
  • Loading branch information
allada committed Jul 20, 2023
1 parent 87bd0e6 commit f0a526b
Show file tree
Hide file tree
Showing 13 changed files with 834 additions and 289 deletions.
161 changes: 96 additions & 65 deletions cas/cas_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

use axum::Router;
Expand Down Expand Up @@ -64,54 +65,60 @@ struct Args {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut root_metrics_registry = <Registry>::with_prefix("turbo_cache");

let args = Args::parse();
// Note: We cannot mutate args, so we create another variable for it here.
let mut config_file = args.config_file;
if config_file.eq(DEFAULT_CONFIG_FILE) {
let r = Runfiles::create().err_tip(|| "Failed to create runfiles lookup object")?;
config_file = r
.rlocation("turbo_cache/config/examples/basic_cas.json")
.into_os_string()
.into_string()
.unwrap();
}

env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn"))
.format_timestamp_millis()
.init();
let cfg: CasConfig = {
let args = Args::parse();
// Note: We cannot mutate args, so we create another variable for it here.
let mut config_file = args.config_file;
if config_file.eq(DEFAULT_CONFIG_FILE) {
let r = Runfiles::create().err_tip(|| "Failed to create runfiles lookup object")?;
config_file = r
.rlocation("turbo_cache/config/examples/basic_cas.json")
.into_os_string()
.into_string()
.unwrap();
}

let json_contents = String::from_utf8(
tokio::fs::read(&config_file)
.await
.err_tip(|| format!("Could not open config file {}", config_file))?,
)?;
let cfg: CasConfig = json5::from_str(&json_contents)?;
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn"))
.format_timestamp_millis()
.init();

// Note: If the default changes make sure you update the documentation in
// `config/cas_server.rs`.
const DEFAULT_MAX_OPEN_FILES: usize = 512;
let global_cfg = if let Some(mut global_cfg) = cfg.global {
if global_cfg.max_open_files == 0 {
global_cfg.max_open_files = DEFAULT_MAX_OPEN_FILES;
}
global_cfg
} else {
GlobalConfig {
max_open_files: DEFAULT_MAX_OPEN_FILES,
}
let json_contents = String::from_utf8(
tokio::fs::read(&config_file)
.await
.err_tip(|| format!("Could not open config file {}", config_file))?,
)?;
json5::from_str(&json_contents)?
};
set_open_file_limit(global_cfg.max_open_files);

let root_store_metrics = root_metrics_registry.sub_registry_with_prefix("stores");
{
// Note: If the default changes make sure you update the documentation in
// `config/cas_server.rs`.
const DEFAULT_MAX_OPEN_FILES: usize = 512;
let global_cfg = if let Some(mut global_cfg) = cfg.global {
if global_cfg.max_open_files == 0 {
global_cfg.max_open_files = DEFAULT_MAX_OPEN_FILES;
}
global_cfg
} else {
GlobalConfig {
max_open_files: DEFAULT_MAX_OPEN_FILES,
}
};
set_open_file_limit(global_cfg.max_open_files);
}

let store_manager = Arc::new(StoreManager::new());
for (name, store_cfg) in cfg.stores {
let store_metrics = root_store_metrics.sub_registry_with_prefix(&name);
store_manager.add_store(
&name,
store_factory(&store_cfg, &store_manager, store_metrics)
.await
.err_tip(|| format!("Failed to create store '{}'", name))?,
);
{
let root_store_metrics = root_metrics_registry.sub_registry_with_prefix("stores");
for (name, store_cfg) in cfg.stores {
let store_metrics = root_store_metrics.sub_registry_with_prefix(&name);
store_manager.add_store(
&name,
store_factory(&store_cfg, &store_manager, store_metrics)
.await
.err_tip(|| format!("Failed to create store '{}'", name))?,
);
}
}

let mut action_schedulers = HashMap::new();
Expand All @@ -131,31 +138,52 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}

let mut futures: Vec<BoxFuture<Result<(), Error>>> = Vec::new();
for worker_cfg in cfg.workers.unwrap_or(vec![]) {
let spawn_fut = match worker_cfg {
WorkerConfig::local(local_worker_cfg) => {
let fast_slow_store = store_manager
.get_store(&local_worker_cfg.cas_fast_slow_store)
.err_tip(|| {
{
let worker_cfgs = cfg.workers.unwrap_or(vec![]);
let root_worker_metrics = root_metrics_registry.sub_registry_with_prefix("workers");
let mut worker_names = HashSet::with_capacity(worker_cfgs.len());
for (i, worker_cfg) in worker_cfgs.into_iter().enumerate() {
let spawn_fut = match worker_cfg {
WorkerConfig::local(local_worker_cfg) => {
let fast_slow_store =
store_manager
.get_store(&local_worker_cfg.cas_fast_slow_store)
.err_tip(|| {
format!(
"Failed to find store for cas_store_ref in worker config : {}",
local_worker_cfg.cas_fast_slow_store
)
})?;
let ac_store = store_manager.get_store(&local_worker_cfg.ac_store).err_tip(|| {
format!(
"Failed to find store for cas_store_ref in worker config : {}",
local_worker_cfg.cas_fast_slow_store
"Failed to find store for ac_store_ref in worker config : {}",
local_worker_cfg.ac_store
)
})?;
let ac_store = store_manager.get_store(&local_worker_cfg.ac_store).err_tip(|| {
format!(
"Failed to find store for ac_store_ref in worker config : {}",
local_worker_cfg.ac_store
)
})?;
let local_worker =
new_local_worker(Arc::new(local_worker_cfg), fast_slow_store.clone(), ac_store.clone())
.await
.err_tip(|| "Could not make LocalWorker")?;
tokio::spawn(local_worker.run())
}
};
futures.push(Box::pin(spawn_fut.map_ok_or_else(|e| Err(e.into()), |v| v)));
let local_worker =
new_local_worker(Arc::new(local_worker_cfg), fast_slow_store.clone(), ac_store.clone())
.await
.err_tip(|| "Could not make LocalWorker")?;
let name = if local_worker.name().is_empty() {
format!("worker_{}", i)
} else {
local_worker.name().clone()
};
if worker_names.contains(&name) {
Err(Box::new(make_err!(
Code::InvalidArgument,
"Duplicate worker name '{}' found in config",
name
)))?;
}
let worker_metrics = root_worker_metrics.sub_registry_with_prefix(&name);
local_worker.register_metrics(worker_metrics);
worker_names.insert(name);
tokio::spawn(local_worker.run())
}
};
futures.push(Box::pin(spawn_fut.map_ok_or_else(|e| Err(e.into()), |v| v)));
}
}

fn into_encoding(from: &CompressionAlgorithm) -> Option<CompressionEncoding> {
Expand Down Expand Up @@ -354,6 +382,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
prometheus_client::encoding::text::encode(&mut buf, &root_metrics_registry)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
.map(|_| {
// This is a hack to get around this bug: https://github.com/prometheus/client_rust/issues/155
buf = buf.replace("turbo_cache_turbo_cache_stores_", "");
buf = buf.replace("turbo_cache_turbo_cache_workers_", "");
let body = Body::from(buf);
Response::builder()
.header(
Expand Down
4 changes: 2 additions & 2 deletions cas/store/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,12 +645,12 @@ impl<Fe: FileEntry> StoreTrait for FilesystemStore<Fe> {
impl<Fe: FileEntry> MetricsComponent for FilesystemStore<Fe> {
fn gather_metrics(&self, c: &mut CollectorState) {
c.publish(
"read_buff_size",
"read_buff_size_bytes",
&self.read_buffer_size,
"Size of the configured read buffer size",
);
c.publish(
"active_drop_spawns",
"active_drop_spawns_total",
&self.shared_context.active_drop_spawns,
"Number of active drop spawns",
);
Expand Down
8 changes: 4 additions & 4 deletions cas/store/verify_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,22 +174,22 @@ impl StoreTrait for VerifyStore {
impl MetricsComponent for VerifyStore {
fn gather_metrics(&self, c: &mut CollectorState) {
c.publish(
"verify_size",
"verify_size_enabled",
&self.verify_size,
"If the verification store is verifying the size of the data",
);
c.publish(
"verify_hash",
"verify_hash_enabled",
&self.verify_hash,
"If the verification store is verifying the hash of the data",
);
c.publish(
"size_verification_failures",
"size_verification_failures_total",
&self.size_verification_failures,
"Number of failures the verification store had due to size mismatches",
);
c.publish(
"hash_verification_failures",
"hash_verification_failures_total",
&self.hash_verification_failures,
"Number of failures the verification store had due to hash mismatches",
);
Expand Down
3 changes: 3 additions & 0 deletions cas/worker/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ rust_library(
"//proto",
"//util:common",
"//util:error",
"//util:prometheus_utils",
"@crate_index//:async-lock",
"@crate_index//:futures",
"@crate_index//:tokio",
"@crate_index//:tokio-stream",
Expand All @@ -37,6 +39,7 @@ rust_library(
"//proto",
"//util:common",
"//util:error",
"//util:prometheus_utils",
"@crate_index//:bytes",
"@crate_index//:filetime",
"@crate_index//:futures",
Expand Down
Loading

0 comments on commit f0a526b

Please sign in to comment.