Skip to content

Commit

Permalink
Add metrics to SimpleScheduler and Worker
Browse files Browse the repository at this point in the history
Adds metrics for running tasks and such to prometheus.
  • Loading branch information
allada committed Jul 28, 2023
1 parent 7563df7 commit 63f7393
Show file tree
Hide file tree
Showing 11 changed files with 883 additions and 166 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ name: CI
# Controls when the workflow will run.
on:
push:
branches: [ master ]
branches: [ main ]
pull_request:
branches: [ master ]
branches: [ main ]

jobs:
unit-test:
Expand Down
9 changes: 6 additions & 3 deletions cas/cas_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,13 @@ async fn inner_main(cfg: CasConfig) -> Result<(), Box<dyn std::error::Error>> {
let mut action_schedulers = HashMap::new();
let mut worker_schedulers = HashMap::new();
if let Some(schedulers_cfg) = cfg.schedulers {
let root_scheduler_metrics = root_metrics_registry.sub_registry_with_prefix("schedulers");
for (name, scheduler_cfg) in schedulers_cfg {
let (maybe_action_scheduler, maybe_worker_scheduler) = scheduler_factory(&scheduler_cfg, &store_manager)
.await
.err_tip(|| format!("Failed to create scheduler '{}'", name))?;
let scheduler_metrics = root_scheduler_metrics.sub_registry_with_prefix(&name);
let (maybe_action_scheduler, maybe_worker_scheduler) =
scheduler_factory(&scheduler_cfg, &store_manager, scheduler_metrics)
.await
.err_tip(|| format!("Failed to create scheduler '{}'", name))?;
if let Some(action_scheduler) = maybe_action_scheduler {
action_schedulers.insert(name.clone(), action_scheduler);
}
Expand Down
5 changes: 5 additions & 0 deletions cas/scheduler/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ rust_library(
":platform_property_manager",
":worker",
"//util:error",
"//util:prometheus_utils",
"@crate_index//:tokio",
],
)
Expand Down Expand Up @@ -81,6 +82,7 @@ rust_library(
"//config",
"//util:common",
"//util:error",
"//util:prometheus_utils",
"@crate_index//:parking_lot",
"@crate_index//:futures",
"@crate_index//:hashbrown",
Expand Down Expand Up @@ -214,6 +216,7 @@ rust_library(
"//cas/store",
"//config",
"//util:error",
"//util:prometheus_utils",
"@crate_index//:futures",
"@crate_index//:tokio",
],
Expand All @@ -231,6 +234,7 @@ rust_library(
":platform_property_manager",
"//proto",
"//util:error",
"//util:prometheus_utils",
"@crate_index//:tokio",
"@crate_index//:uuid",
],
Expand All @@ -248,6 +252,7 @@ rust_library(
"//proto",
"//util:common",
"//util:error",
"//util:prometheus_utils",
"@crate_index//:prost",
"@crate_index//:prost-types",
"@crate_index//:sha2",
Expand Down
26 changes: 26 additions & 0 deletions cas/scheduler/action_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use sha2::{digest::Update as _, Digest as _, Sha256};
use common::{DigestInfo, HashMapExt, VecExt};
use error::{error_if, make_input_err, Error, ResultExt};
use platform_property_manager::PlatformProperties;
use prometheus_utils::{CollectorState, MetricsComponent};
use prost::bytes::Bytes;
use proto::build::bazel::remote::execution::v2::{
execution_stage, Action, ActionResult as ProtoActionResult, ExecuteOperationMetadata, ExecuteRequest,
Expand Down Expand Up @@ -644,6 +645,25 @@ impl ActionStage {
}
}

impl MetricsComponent for ActionStage {
    /// Publishes the name of the current stage and, for the two completed
    /// variants, the exit code carried by the stored action result.
    fn gather_metrics(&self, c: &mut CollectorState) {
        // Human-readable label for the active variant.
        let stage_name = match self {
            ActionStage::Unknown => "Unknown",
            ActionStage::CacheCheck => "CacheCheck",
            ActionStage::Queued => "Queued",
            ActionStage::Executing => "Executing",
            ActionStage::Completed(_) => "Completed",
            ActionStage::CompletedFromCache(_) => "CompletedFromCache",
        };
        c.publish("stage", &stage_name.to_string(), "The state of the action.");

        // Only the completed variants hold a result with an exit code to report.
        let maybe_exit_code = match self {
            ActionStage::Completed(action_result) => Some(action_result.exit_code),
            ActionStage::CompletedFromCache(proto_action_result) => Some(proto_action_result.exit_code),
            ActionStage::Unknown | ActionStage::CacheCheck | ActionStage::Queued | ActionStage::Executing => None,
        };
        if let Some(exit_code) = maybe_exit_code {
            c.publish("exit_code", &exit_code, "The exit code of the action.");
        }
    }
}

impl From<&ActionStage> for execution_stage::Value {
fn from(val: &ActionStage) -> Self {
match val {
Expand Down Expand Up @@ -885,6 +905,12 @@ impl ActionState {
}
}

impl MetricsComponent for ActionState {
    /// Publishes this state's `stage` field under the "stage" key; the stage
    /// value is itself a metrics component and is published with an empty help text.
    fn gather_metrics(&self, c: &mut CollectorState) {
        let current_stage = &self.stage;
        c.publish("stage", current_stage, "");
    }
}

impl From<ActionState> for Operation {
fn from(val: ActionState) -> Self {
let stage = Into::<execution_stage::Value>::into(&val.stage) as i32;
Expand Down
54 changes: 47 additions & 7 deletions cas/scheduler/default_scheduler_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -23,16 +24,34 @@ use cache_lookup_scheduler::CacheLookupScheduler;
use config::schedulers::SchedulerConfig;
use error::{Error, ResultExt};
use grpc_scheduler::GrpcScheduler;
use prometheus_utils::Registry;
use property_modifier_scheduler::PropertyModifierScheduler;
use scheduler::{ActionScheduler, WorkerScheduler};
use simple_scheduler::SimpleScheduler;
use store::StoreManager;

/// Pair produced by the scheduler factory: the optional `ActionScheduler` handle and the
/// optional `WorkerScheduler` handle of the constructed scheduler. Either side may be `None`.
pub type SchedulerFactoryResults = (Option<Arc<dyn ActionScheduler>>, Option<Arc<dyn WorkerScheduler>>);

pub fn scheduler_factory<'a>(
/// Builds the scheduler described by `scheduler_type_cfg`, registering metrics
/// into `scheduler_metrics` along the way.
///
/// This is the public entry point: it creates a fresh visited-set and hands the
/// actual work to `inner_scheduler_factory`, which threads that set through any
/// nested scheduler construction.
///
/// Returns the `(action scheduler, worker scheduler)` pair, or the error reported
/// by the inner factory.
pub async fn scheduler_factory<'a>(
    scheduler_type_cfg: &'a SchedulerConfig,
    store_manager: &'a StoreManager,
    scheduler_metrics: &'a mut Registry,
) -> Result<SchedulerFactoryResults, Error> {
    // Pointer identities of schedulers seen during this top-level call.
    let mut visited_schedulers = HashSet::new();
    let construction = inner_scheduler_factory(
        scheduler_type_cfg,
        store_manager,
        scheduler_metrics,
        &mut visited_schedulers,
    );
    construction.await
}

fn inner_scheduler_factory<'a>(
scheduler_type_cfg: &'a SchedulerConfig,
store_manager: &'a StoreManager,
scheduler_metrics: &'a mut Registry,
visited_schedulers: &'a mut HashSet<usize>,
) -> Pin<Box<dyn Future<Output = Result<SchedulerFactoryResults, Error>> + 'a>> {
Box::pin(async move {
let scheduler: SchedulerFactoryResults = match scheduler_type_cfg {
Expand All @@ -48,9 +67,10 @@ pub fn scheduler_factory<'a>(
let ac_store = store_manager
.get_store(&config.ac_store)
.err_tip(|| format!("'ac_store': '{}' does not exist", config.ac_store))?;
let (action_scheduler, worker_scheduler) = scheduler_factory(&config.scheduler, store_manager)
.await
.err_tip(|| "In nested CacheLookupScheduler construction")?;
let (action_scheduler, worker_scheduler) =
inner_scheduler_factory(&config.scheduler, store_manager, scheduler_metrics, visited_schedulers)
.await
.err_tip(|| "In nested CacheLookupScheduler construction")?;
let cache_lookup_scheduler = Arc::new(CacheLookupScheduler::new(
cas_store,
ac_store,
Expand All @@ -59,9 +79,10 @@ pub fn scheduler_factory<'a>(
(Some(cache_lookup_scheduler), worker_scheduler)
}
SchedulerConfig::property_modifier(config) => {
let (action_scheduler, worker_scheduler) = scheduler_factory(&config.scheduler, store_manager)
.await
.err_tip(|| "In nested PropertyModifierScheduler construction")?;
let (action_scheduler, worker_scheduler) =
inner_scheduler_factory(&config.scheduler, store_manager, scheduler_metrics, visited_schedulers)
.await
.err_tip(|| "In nested PropertyModifierScheduler construction")?;
let property_modifier_scheduler = Arc::new(PropertyModifierScheduler::new(
config,
action_scheduler.err_tip(|| "Nested scheduler is not an action scheduler")?,
Expand All @@ -72,6 +93,25 @@ pub fn scheduler_factory<'a>(

if let Some(action_scheduler) = &scheduler.0 {
start_cleanup_timer(action_scheduler);
// We need a way to prevent our scheduler from having `register_metrics()` called multiple times.
// This is the equivalent of grabbing a uintptr_t in C++, storing it in a set, and checking if it's
// already been visited. We can't use the Arc's pointer directly because it has two interfaces
// (ActionScheduler and WorkerScheduler) and we need to be able to know if the underlying scheduler
// has already been visited, not just the trait. `Any` could be used, but that'd require some rework
// of all the schedulers. This is the most simple way to do it. Rust's uintptr_t is usize.
let action_scheduler_uintptr: usize = Arc::as_ptr(action_scheduler) as *const () as usize;
if !visited_schedulers.contains(&action_scheduler_uintptr) {
visited_schedulers.insert(action_scheduler_uintptr);
action_scheduler.clone().register_metrics(scheduler_metrics);
}
}
if let Some(worker_scheduler) = &scheduler.1 {
    // Identify the underlying scheduler by the address of its allocation (the trait-object
    // metadata is discarded by the cast to `*const ()`), so a scheduler that is reachable through
    // both trait handles, or through several nesting levels, registers its metrics only once.
    let worker_scheduler_uintptr: usize = Arc::as_ptr(worker_scheduler) as *const () as usize;
    // `HashSet::insert` returns `true` only when the value was not already present, so
    // registration happens exclusively on the first visit. There must be no unconditional
    // `register_metrics` call after this check, or the de-duplication is defeated.
    if visited_schedulers.insert(worker_scheduler_uintptr) {
        worker_scheduler.clone().register_metrics(scheduler_metrics);
    }
}

Ok(scheduler)
Expand Down
7 changes: 7 additions & 0 deletions cas/scheduler/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use tokio::sync::watch;
use action_messages::{ActionInfo, ActionInfoHashKey, ActionStage, ActionState};
use error::Error;
use platform_property_manager::PlatformPropertyManager;
use prometheus_utils::Registry;
use worker::{Worker, WorkerId, WorkerTimestamp};

/// ActionScheduler interface is responsible for interactions between the scheduler
Expand All @@ -40,6 +41,9 @@ pub trait ActionScheduler: Sync + Send + Unpin {

/// Cleans up the cache of recently completed actions.
async fn clean_recently_completed_actions(&self);

/// Register the metrics for the action scheduler into `_registry`.
///
/// Takes the scheduler as `Arc<Self>`. The default implementation is a no-op, so
/// implementors that expose no metrics do not need to override it.
fn register_metrics(self: Arc<Self>, _registry: &mut Registry) {}
}

/// WorkerScheduler interface is responsible for interactions between the scheduler
Expand Down Expand Up @@ -78,4 +82,7 @@ pub trait WorkerScheduler: Sync + Send + Unpin {
/// Removes timed out workers from the pool. This is called periodically by an
/// external source.
async fn remove_timedout_workers(&self, now_timestamp: WorkerTimestamp) -> Result<(), Error>;

/// Register the metrics for the worker scheduler into `_registry`.
///
/// Takes the scheduler as `Arc<Self>`. The default implementation is a no-op, so
/// implementors that expose no metrics do not need to override it.
fn register_metrics(self: Arc<Self>, _registry: &mut Registry) {}
}
Loading

0 comments on commit 63f7393

Please sign in to comment.