From 1cf5f8278cb0ece36326e05c03c803f928ca3ccb Mon Sep 17 00:00:00 2001
From: Yury Akudovich
Date: Fri, 18 Oct 2024 13:18:31 +0200
Subject: [PATCH 1/5] Improve metrics and test. Switch to circuit-prover.

---
 prover/Cargo.lock                             |  22 +++
 prover/Cargo.toml                             |   1 +
 .../crates/bin/prover_autoscaler/Cargo.toml   |   1 +
 .../prover_autoscaler/src/global/queuer.rs    |  14 +-
 .../prover_autoscaler/src/global/scaler.rs    | 172 ++++++++++++++----
 .../prover_autoscaler/src/global/watcher.rs   |  18 +-
 .../bin/prover_autoscaler/src/k8s/watcher.rs  |   7 +-
 .../bin/prover_autoscaler/src/metrics.rs      |   8 +-
 8 files changed, 187 insertions(+), 56 deletions(-)

diff --git a/prover/Cargo.lock b/prover/Cargo.lock
index 1d584a473d96..656ea5904cca 100644
--- a/prover/Cargo.lock
+++ b/prover/Cargo.lock
@@ -6731,6 +6731,27 @@ dependencies = [
  "tracing-serde",
 ]
 
+[[package]]
+name = "tracing-test"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68"
+dependencies = [
+ "tracing-core",
+ "tracing-subscriber",
+ "tracing-test-macro",
+]
+
+[[package]]
+name = "tracing-test-macro"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568"
+dependencies = [
+ "quote 1.0.36",
+ "syn 2.0.66",
+]
+
 [[package]]
 name = "try-lock"
 version = "0.2.5"
@@ -8330,6 +8351,7 @@ dependencies = [
  "tokio",
  "tracing",
  "tracing-subscriber",
+ "tracing-test",
  "url",
  "vise",
  "zksync_config",
diff --git a/prover/Cargo.toml b/prover/Cargo.toml
index 742eee649de1..8d2a46b7b9ad 100644
--- a/prover/Cargo.toml
+++ b/prover/Cargo.toml
@@ -58,6 +58,7 @@ tokio-util = "0.7.11"
 toml_edit = "0.14.4"
 tracing = "0.1"
 tracing-subscriber = "0.3"
+tracing-test = "0.2.5"
 url = "2.5.2"
 vise = "0.2.0"
 
diff --git a/prover/crates/bin/prover_autoscaler/Cargo.toml b/prover/crates/bin/prover_autoscaler/Cargo.toml
index 9743b45593e7..fbf3ecae9098 100644
--- a/prover/crates/bin/prover_autoscaler/Cargo.toml
+++ b/prover/crates/bin/prover_autoscaler/Cargo.toml
@@ -43,3 +43,4 @@ tracing-subscriber = { workspace = true, features = ["env-filter"] }
 tracing.workspace = true
 url.workspace = true
 vise.workspace = true
+tracing-test.workspace = true
diff --git a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs
index 1ef5d96386b5..4288c3f71ee3 100644
--- a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs
+++ b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs
@@ -5,6 +5,8 @@ use reqwest::Method;
 use zksync_prover_job_monitor::autoscaler_queue_reporter::VersionedQueueReport;
 use zksync_utils::http_with_retries::send_request_with_retries;
 
+use crate::metrics::AUTOSCALER_METRICS;
+
 #[derive(Debug)]
 pub struct Queue {
     pub queue: HashMap<String, u64>,
@@ -24,15 +26,19 @@ impl Queuer {
     pub async fn get_queue(&self) -> anyhow::Result<Queue> {
         let url = &self.prover_job_monitor_url;
-        let response = send_request_with_retries(url, 5, Method::GET, None, None).await;
-        let res = response
-            .map_err(|err| anyhow::anyhow!("Failed fetching queue from url: {url}: {err:?}"))?
+        let res = send_request_with_retries(url, 5, Method::GET, None, None).await;
+        let response = res.map_err(|err| {
+            AUTOSCALER_METRICS.calls[&(url.clone(), 500)].inc();
+            anyhow::anyhow!("Failed fetching queue from url: {url}: {err:?}")
+        })?;
+        AUTOSCALER_METRICS.calls[&(url.clone(), response.status().as_u16())].inc();
+        let j = response
             .json::<Vec<VersionedQueueReport>>()
             .await
             .context("Failed to read response as json")?;
         Ok(Queue {
-            queue: res
+            queue: j
                 .iter()
                 .map(|x| (x.version.to_string(), x.report.prover_jobs.queued as u64))
                 .collect::<HashMap<_, _>>(),
         })
     }
diff --git a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
index dd3f3cf1ad3a..5444744ebb9f 100644
--- a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
+++ b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
@@ -43,10 +43,14 @@ struct GPUPoolKey {
     gpu: Gpu,
 }
 
+//static PROVER_DEPLOYMENT_RE: Lazy<Regex> =
+//    Lazy::new(|| Regex::new(r"^prover-gpu-fri-spec-(\d{1,2})?(-(?<gpu>[ltvpa]\d+))?$").unwrap());
+//static PROVER_POD_RE: Lazy<Regex> =
+//    Lazy::new(|| Regex::new(r"^prover-gpu-fri-spec-(\d{1,2})?(-(?<gpu>[ltvpa]\d+))?").unwrap());
 static PROVER_DEPLOYMENT_RE: Lazy<Regex> =
-    Lazy::new(|| Regex::new(r"^prover-gpu-fri-spec-(\d{1,2})?(-(?<gpu>[ltvpa]\d+))?$").unwrap());
+    Lazy::new(|| Regex::new(r"^circuit-prover-gpu(-(?<gpu>[ltvpa]\d+))?$").unwrap());
 static PROVER_POD_RE: Lazy<Regex> =
-    Lazy::new(|| Regex::new(r"^prover-gpu-fri-spec-(\d{1,2})?(-(?<gpu>[ltvpa]\d+))?").unwrap());
+    Lazy::new(|| Regex::new(r"^circuit-prover-gpu(-(?<gpu>[ltvpa]\d+))?").unwrap());
 
 pub struct Scaler {
     /// namespace to Protocol Version configuration.
@@ -83,6 +87,9 @@ impl Scaler {
         queuer: queuer::Queuer,
         config: ProverAutoscalerScalerConfig,
     ) -> Self {
+        config.protocol_versions.iter().for_each(|(ns, v)| {
+            AUTOSCALER_METRICS.prover_protocol_version[&(ns.clone(), v.clone())].set(1);
+        });
         Self {
             namespaces: config.protocol_versions,
             watcher,
@@ -255,7 +262,7 @@
             }
         }
 
-        tracing::debug!("Queue coverd with provers: {}", total);
+        tracing::debug!("Queue covered with provers: {}", total);
 
         // Add required provers.
if (total as u64) < q { for c in &sc { @@ -306,6 +313,7 @@ impl Task for Scaler { let guard = self.watcher.data.lock().await; if let Err(err) = watcher::check_is_ready(&guard.is_ready) { + AUTOSCALER_METRICS.clusters_not_ready.inc(); tracing::warn!("Skipping Scaler run: {}", err); return Ok(()); } @@ -339,6 +347,7 @@ mod tests { global::{queuer, watcher}, }; + #[tracing_test::traced_test] #[test] fn test_run() { let watcher = watcher::Watcher { @@ -346,53 +355,142 @@ mod tests { data: Arc::new(Mutex::new(watcher::WatchedData::default())), }; let queuer = queuer::Queuer { - prover_job_monitor_url: "".to_string(), + prover_job_monitor_url: "".into(), }; let scaler = Scaler::new( watcher, queuer, ProverAutoscalerScalerConfig { - max_provers: HashMap::from([("foo".to_string(), HashMap::from([(Gpu::L4, 100)]))]), + max_provers: [ + ("foo".into(), [(Gpu::L4, 100)].into()), + ("bar".into(), [(Gpu::L4, 100)].into()), + ] + .into(), + cluster_priorities: [("foo".into(), 0), ("bar".into(), 10)].into(), ..Default::default() }, ); - let got = scaler.run( - &"prover".to_string(), - 1499, - &Clusters { - clusters: HashMap::from([( - "foo".to_string(), - Cluster { - name: "foo".to_string(), - namespaces: HashMap::from([( - "prover".to_string(), - Namespace { - deployments: HashMap::from([( - "prover-gpu-fri-spec-1".to_string(), - Deployment { + assert_eq!( + scaler.run( + &"prover".into(), + 1499, + &Clusters { + clusters: [( + "foo".into(), + Cluster { + name: "foo".into(), + namespaces: [( + "prover".into(), + Namespace { + deployments: [( + "circuit-prover-gpu".into(), + Deployment { + ..Default::default() + }, + )] + .into(), + pods: [( + "circuit-prover-gpu-7c5f8fc747-gmtcr".into(), + Pod { + status: "Running".into(), + ..Default::default() + }, + )] + .into(), + }, + )] + .into(), + }, + )] + .into(), + }, + ), + [( + GPUPoolKey { + cluster: "foo".into(), + gpu: Gpu::L4, + }, + 3, + )] + .into(), + "3 new provers" + ); + assert_eq!( + scaler.run( + &"prover".into(), + 499, + &Clusters { + clusters: [ + ( + "foo".into(), + Cluster { + name: "foo".into(), + namespaces: [( + "prover".into(), + Namespace { + deployments: [( + "circuit-prover-gpu".into(), + Deployment { + ..Default::default() + }, + )] + .into(), ..Default::default() }, - )]), - pods: HashMap::from([( - "prover-gpu-fri-spec-1-c47644679-x9xqp".to_string(), - Pod { - status: "Running".to_string(), - ..Default::default() + )] + .into(), + }, + ), + ( + "bar".into(), + Cluster { + name: "bar".into(), + namespaces: [( + "prover".into(), + Namespace { + deployments: [( + "circuit-prover-gpu".into(), + Deployment { + running: 1, + desired: 1, + }, + )] + .into(), + pods: [( + "circuit-prover-gpu-7c5f8fc747-gmtcr".into(), + Pod { + status: "Running".into(), + ..Default::default() + }, + )] + .into(), }, - )]), + )] + .into(), }, - )]), + ) + ] + .into(), + }, + ), + [ + ( + GPUPoolKey { + cluster: "foo".into(), + gpu: Gpu::L4, }, - )]), - }, + 0, + ), + ( + GPUPoolKey { + cluster: "bar".into(), + gpu: Gpu::L4, + }, + 1, + ) + ] + .into(), + "Preserve running" ); - let want = HashMap::from([( - GPUPoolKey { - cluster: "foo".to_string(), - gpu: Gpu::L4, - }, - 3, - )]); - assert_eq!(got, want); } } diff --git a/prover/crates/bin/prover_autoscaler/src/global/watcher.rs b/prover/crates/bin/prover_autoscaler/src/global/watcher.rs index 01fa68c60f84..5726548b612d 100644 --- a/prover/crates/bin/prover_autoscaler/src/global/watcher.rs +++ b/prover/crates/bin/prover_autoscaler/src/global/watcher.rs @@ -9,6 +9,7 @@ use 
zksync_utils::http_with_retries::send_request_with_retries;
 
 use crate::{
     cluster_types::{Cluster, Clusters},
+    metrics::AUTOSCALER_METRICS,
     task_wiring::Task,
 };
 
@@ -73,16 +74,19 @@
                         .join("/cluster")
                         .context("Failed to join URL with /cluster")?
                         .to_string();
-                    let response =
-                        send_request_with_retries(&url, 5, Method::GET, None, None).await;
-                    let res = response
-                        .map_err(|err| {
-                            anyhow::anyhow!("Failed fetching cluster from url: {url}: {err:?}")
-                        })?
+                    let res = send_request_with_retries(&url, 5, Method::GET, None, None).await;
+
+                    let response = res.map_err(|err| {
+                        // TODO: refactor send_request_with_retries to return status.
+                        AUTOSCALER_METRICS.calls[&(url.clone(), 500)].inc();
+                        anyhow::anyhow!("Failed fetching cluster from url: {url}: {err:?}")
+                    })?;
+                    AUTOSCALER_METRICS.calls[&(url, response.status().as_u16())].inc();
+                    let j = response
                         .json::<Cluster>()
                         .await
                         .context("Failed to read response as json");
-                    Ok((i, res))
+                    Ok((i, j))
                 })
             })
             .collect();
diff --git a/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs b/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs
index 8746d17663be..f94dfc3704fb 100644
--- a/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs
+++ b/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs
@@ -9,10 +9,7 @@ use kube::{
 };
 use tokio::sync::Mutex;
 
-use crate::{
-    cluster_types::{Cluster, Deployment, Namespace, Pod},
-    metrics::AUTOSCALER_METRICS,
-};
+use crate::cluster_types::{Cluster, Deployment, Namespace, Pod};
 
 #[derive(Clone)]
 pub struct Watcher {
@@ -38,8 +35,6 @@ impl Watcher {
 
     pub async fn run(self) -> anyhow::Result<()> {
         // TODO: add actual metrics
-        AUTOSCALER_METRICS.protocol_version.set(1);
-        AUTOSCALER_METRICS.calls.inc_by(1);
 
         // TODO: watch for a list of namespaces, get:
         //   - deployments (name, running, desired) [done]
diff --git a/prover/crates/bin/prover_autoscaler/src/metrics.rs b/prover/crates/bin/prover_autoscaler/src/metrics.rs
index 09cbaa6ba00f..a16ebeae1beb 100644
--- a/prover/crates/bin/prover_autoscaler/src/metrics.rs
+++ b/prover/crates/bin/prover_autoscaler/src/metrics.rs
@@ -4,10 +4,14 @@ use zksync_config::configs::prover_autoscaler::Gpu;
 #[derive(Debug, Metrics)]
 #[metrics(prefix = "autoscaler")]
 pub(crate) struct AutoscalerMetrics {
-    pub protocol_version: Gauge<u64>,
-    pub calls: Counter,
+    #[metrics(labels = ["target_namespace", "protocol_version"])]
+    pub prover_protocol_version: LabeledFamily<(String, String), Gauge<u64>, 2>,
     #[metrics(labels = ["target_cluster", "target_namespace", "gpu"])]
     pub provers: LabeledFamily<(String, String, Gpu), Gauge<u64>, 3>,
+    pub clusters_not_ready: Counter,
+    #[metrics(labels = ["target", "status"])]
+    pub calls: LabeledFamily<(String, u16), Counter, 2>,
+    // TODO: count of command send success/fail
 }
 
 #[vise::register]

From 05d6807b7d24bf8719aebe3cbbd3fc6eb05168d4 Mon Sep 17 00:00:00 2001
From: Yury Akudovich
Date: Fri, 18 Oct 2024 16:25:35 +0200
Subject: [PATCH 2/5] Add min_provers config. Improve tests.
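
min_provers puts a per-namespace floor under the number of provers, so a
namespace keeps a warm pool even when its queue is empty or small. The scaler
applies it by clamping the incoming queue before distributing capacity across
clusters; the shape of the change (with Gpu::L4 as the baseline GPU used for
normalization):

    // If the queue is too small to keep `min` provers busy, inflate it so
    // that at least `min` provers are requested for this namespace.
    if let Some(min) = self.min_provers.get(namespace) {
        let min_queue = self.provers_to_speed(Gpu::L4, *min);
        if self.normalize_queue(Gpu::L4, queue) < min_queue {
            queue = min_queue;
        }
    }

max_provers limits still apply afterwards, so the effective replica count
stays within the configured per-cluster/GPU bounds.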
---
 .../config/src/configs/prover_autoscaler.rs   |   2 +
 .../src/proto/config/prover_autoscaler.proto  |   6 +
 .../protobuf_config/src/prover_autoscaler.rs  |  28 ++
 .../prover_autoscaler/src/global/scaler.rs    | 250 ++++++++++++++--
 .../prover_autoscaler/src/global/watcher.rs   |   2 +-
 5 files changed, 260 insertions(+), 28 deletions(-)

diff --git a/core/lib/config/src/configs/prover_autoscaler.rs b/core/lib/config/src/configs/prover_autoscaler.rs
index 6f83f0d2d18b..bde3d090a987 100644
--- a/core/lib/config/src/configs/prover_autoscaler.rs
+++ b/core/lib/config/src/configs/prover_autoscaler.rs
@@ -53,6 +53,8 @@ pub struct ProverAutoscalerScalerConfig {
     pub prover_speed: HashMap<Gpu, u32>,
     /// Maximum number of provers which can be run per cluster/GPU.
     pub max_provers: HashMap<String, HashMap<Gpu, u32>>,
+    /// Minimum number of provers per namespace.
+    pub min_provers: HashMap<String, u32>,
     /// Duration after which pending pod considered long pending.
     #[serde(default = "ProverAutoscalerScalerConfig::default_long_pending_duration")]
     pub long_pending_duration: Duration,
diff --git a/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto b/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto
index 8363b6251199..5474455b17a3 100644
--- a/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto
+++ b/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto
@@ -39,6 +39,11 @@ message MaxProver {
   optional uint32 max = 2; // required
 }
 
+message MinProver {
+  optional string namespace = 1; // required
+  optional uint32 min = 2; // required
+}
+
 message ProverAutoscalerScalerConfig {
   optional uint32 prometheus_port = 1; // required
   optional std.Duration scaler_run_interval = 2; // optional
@@ -49,4 +54,5 @@ message ProverAutoscalerScalerConfig {
   repeated ProverSpeed prover_speed = 7; // optional
   optional uint32 long_pending_duration_s = 8; // optional
   repeated MaxProver max_provers = 9; // optional
+  repeated MinProver min_provers = 10; // optional
 }
diff --git a/core/lib/protobuf_config/src/prover_autoscaler.rs b/core/lib/protobuf_config/src/prover_autoscaler.rs
index e95e4003972e..00b84d76dd44 100644
--- a/core/lib/protobuf_config/src/prover_autoscaler.rs
+++ b/core/lib/protobuf_config/src/prover_autoscaler.rs
@@ -103,6 +103,13 @@ impl ProtoRepr for proto::ProverAutoscalerScalerConfig {
                 }
                 acc
             }),
+            min_provers: self
+                .min_provers
+                .iter()
+                .enumerate()
+                .map(|(i, e)| e.read().context(i))
+                .collect::<Result<_, _>>()
+                .context("min_provers")?,
         })
     }
 
@@ -137,6 +144,11 @@ impl ProtoRepr for proto::ProverAutoscalerScalerConfig {
                     })
                 })
                 .collect(),
+            min_provers: this
+                .min_provers
+                .iter()
+                .map(|(k, v)| proto::MinProver::build(&(k.clone(), *v)))
+                .collect(),
         }
     }
 }
@@ -208,3 +220,19 @@ impl ProtoRepr for proto::MaxProver {
         }
     }
 }
+
+impl ProtoRepr for proto::MinProver {
+    type Type = (String, u32);
+    fn read(&self) -> anyhow::Result<Self::Type> {
+        Ok((
+            required(&self.namespace).context("namespace")?.clone(),
+            *required(&self.min).context("min")?,
+        ))
+    }
+    fn build(this: &Self::Type) -> Self {
+        Self {
+            namespace: Some(this.0.to_string()),
+            min: Some(this.1),
+        }
+    }
+}
diff --git a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
index 5444744ebb9f..d317b2949267 100644
--- a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
+++ b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
@@ -60,6 +60,7 @@ pub struct Scaler {
     /// Which cluster to use first.
     cluster_priorities: HashMap<String, u32>,
+    min_provers: HashMap<String, u32>,
     max_provers: HashMap<String, HashMap<Gpu, u32>>,
     prover_speed: HashMap<Gpu, u32>,
     long_pending_duration: chrono::Duration,
 }
@@ -95,6 +96,7 @@ impl Scaler {
             watcher,
             queuer,
             cluster_priorities: config.cluster_priorities,
+            min_provers: config.min_provers,
             max_provers: config.max_provers,
             prover_speed: config.prover_speed,
             long_pending_duration: chrono::Duration::seconds(
@@ -207,16 +209,28 @@
         self.speed(gpu) * n as u64
     }
 
-    fn normalize_queue(&self, gpu: Gpu, q: u64) -> u64 {
+    fn normalize_queue(&self, gpu: Gpu, queue: u64) -> u64 {
         let speed = self.speed(gpu);
         // Divide and round up if there's any remainder.
-        (q + speed - 1) / speed * speed
+        (queue + speed - 1) / speed * speed
     }
 
-    fn run(&self, namespace: &String, q: u64, clusters: &Clusters) -> HashMap<GPUPoolKey, u32> {
+    fn run(
+        &self,
+        namespace: &String,
+        mut queue: u64,
+        clusters: &Clusters,
+    ) -> HashMap<GPUPoolKey, u32> {
         let sc = self.sorted_clusters(namespace, clusters);
         tracing::debug!("Sorted clusters for namespace {}: {:?}", namespace, &sc);
 
+        if let Some(min) = self.min_provers.get(namespace) {
+            let min_queue = self.provers_to_speed(Gpu::L4, *min);
+            if self.normalize_queue(Gpu::L4, queue) < min_queue {
+                queue = min_queue;
+            }
+        }
+
         let mut total: i64 = 0;
         let mut provers: HashMap<GPUPoolKey, u32> = HashMap::new();
         for c in &sc {
@@ -235,9 +249,9 @@
         }
 
         // Remove unneeded pods.
-        if (total as u64) > self.normalize_queue(Gpu::L4, q) {
+        if (total as u64) > self.normalize_queue(Gpu::L4, queue) {
             for c in sc.iter().rev() {
-                let mut excess_queue = total as u64 - self.normalize_queue(c.gpu, q);
+                let mut excess_queue = total as u64 - self.normalize_queue(c.gpu, queue);
                 let mut excess_provers = (excess_queue / self.speed(c.gpu)) as u32;
                 let p = provers.entry(c.to_key()).or_default();
                 if *p < excess_provers {
@@ -264,9 +278,9 @@
 
         tracing::debug!("Queue covered with provers: {}", total);
 
         // Add required provers.
- if (total as u64) < q { + if (total as u64) < queue { for c in &sc { - let mut required_queue = q - total as u64; + let mut required_queue = queue - total as u64; let mut required_provers = (self.normalize_queue(c.gpu, required_queue) / self.speed(c.gpu)) as u32; let p = provers.entry(c.to_key()).or_default(); @@ -337,10 +351,6 @@ impl Task for Scaler { #[cfg(test)] mod tests { - use std::sync::Arc; - - use tokio::sync::Mutex; - use super::*; use crate::{ cluster_types::{Deployment, Namespace, Pod}, @@ -350,26 +360,21 @@ mod tests { #[tracing_test::traced_test] #[test] fn test_run() { - let watcher = watcher::Watcher { - cluster_agents: vec![], - data: Arc::new(Mutex::new(watcher::WatchedData::default())), - }; - let queuer = queuer::Queuer { - prover_job_monitor_url: "".into(), - }; let scaler = Scaler::new( - watcher, - queuer, + watcher::Watcher::default(), + queuer::Queuer::default(), ProverAutoscalerScalerConfig { + cluster_priorities: [("foo".into(), 0), ("bar".into(), 10)].into(), + min_provers: [("prover-other".into(), 2)].into(), max_provers: [ ("foo".into(), [(Gpu::L4, 100)].into()), ("bar".into(), [(Gpu::L4, 100)].into()), ] .into(), - cluster_priorities: [("foo".into(), 0), ("bar".into(), 10)].into(), ..Default::default() }, ); + assert_eq!( scaler.run( &"prover".into(), @@ -384,9 +389,7 @@ mod tests { Namespace { deployments: [( "circuit-prover-gpu".into(), - Deployment { - ..Default::default() - }, + Deployment::default(), )] .into(), pods: [( @@ -430,9 +433,7 @@ mod tests { Namespace { deployments: [( "circuit-prover-gpu".into(), - Deployment { - ..Default::default() - }, + Deployment::default(), )] .into(), ..Default::default() @@ -493,4 +494,199 @@ mod tests { "Preserve running" ); } + + #[tracing_test::traced_test] + #[test] + fn test_run_min_provers() { + let scaler = Scaler::new( + watcher::Watcher::default(), + queuer::Queuer::default(), + ProverAutoscalerScalerConfig { + cluster_priorities: [("foo".into(), 0), ("bar".into(), 10)].into(), + min_provers: [("prover".into(), 2)].into(), + max_provers: [ + ("foo".into(), [(Gpu::L4, 100)].into()), + ("bar".into(), [(Gpu::L4, 100)].into()), + ] + .into(), + ..Default::default() + }, + ); + + assert_eq!( + scaler.run( + &"prover".into(), + 10, + &Clusters { + clusters: [ + ( + "foo".into(), + Cluster { + name: "foo".into(), + namespaces: [( + "prover".into(), + Namespace { + deployments: [( + "circuit-prover-gpu".into(), + Deployment::default(), + )] + .into(), + ..Default::default() + }, + )] + .into(), + }, + ), + ( + "bar".into(), + Cluster { + name: "bar".into(), + namespaces: [( + "prover".into(), + Namespace { + deployments: [( + "circuit-prover-gpu".into(), + Deployment::default(), + )] + .into(), + ..Default::default() + }, + )] + .into(), + }, + ) + ] + .into(), + }, + ), + [ + ( + GPUPoolKey { + cluster: "foo".into(), + gpu: Gpu::L4, + }, + 2, + ), + ( + GPUPoolKey { + cluster: "bar".into(), + gpu: Gpu::L4, + }, + 0, + ) + ] + .into(), + "Min 2 provers, non running" + ); + assert_eq!( + scaler.run( + &"prover".into(), + 0, + &Clusters { + clusters: [ + ( + "foo".into(), + Cluster { + name: "foo".into(), + namespaces: [( + "prover".into(), + Namespace { + deployments: [( + "circuit-prover-gpu".into(), + Deployment { + running: 3, + desired: 3, + }, + )] + .into(), + pods: [ + ( + "circuit-prover-gpu-7c5f8fc747-gmtcr".into(), + Pod { + status: "Running".into(), + ..Default::default() + }, + ), + ( + "circuit-prover-gpu-7c5f8fc747-gmtc2".into(), + Pod { + status: "Running".into(), + ..Default::default() + }, + ), + ( 
+ "circuit-prover-gpu-7c5f8fc747-gmtc3".into(), + Pod { + status: "Running".into(), + ..Default::default() + }, + ) + ] + .into(), + }, + )] + .into(), + }, + ), + ( + "bar".into(), + Cluster { + name: "bar".into(), + namespaces: [( + "prover".into(), + Namespace { + deployments: [( + "circuit-prover-gpu".into(), + Deployment { + running: 2, + desired: 2, + }, + )] + .into(), + pods: [ + ( + "circuit-prover-gpu-7c5f8fc747-gmtcr".into(), + Pod { + status: "Running".into(), + ..Default::default() + }, + ), + ( + "circuit-prover-gpu-7c5f8fc747-gmtc2".into(), + Pod { + status: "Running".into(), + ..Default::default() + }, + ) + ] + .into(), + }, + )] + .into(), + }, + ) + ] + .into(), + }, + ), + [ + ( + GPUPoolKey { + cluster: "foo".into(), + gpu: Gpu::L4, + }, + 2, + ), + ( + GPUPoolKey { + cluster: "bar".into(), + gpu: Gpu::L4, + }, + 0, + ) + ] + .into(), + "Min 2 provers, 5 running" + ); + } } diff --git a/prover/crates/bin/prover_autoscaler/src/global/watcher.rs b/prover/crates/bin/prover_autoscaler/src/global/watcher.rs index 5726548b612d..ee60ea4f6a75 100644 --- a/prover/crates/bin/prover_autoscaler/src/global/watcher.rs +++ b/prover/crates/bin/prover_autoscaler/src/global/watcher.rs @@ -28,7 +28,7 @@ pub fn check_is_ready(v: &Vec) -> Result<()> { Ok(()) } -#[derive(Clone)] +#[derive(Default, Clone)] pub struct Watcher { /// List of base URLs of all agents. pub cluster_agents: Vec>, From 5cfbeb6e63464ab67dc7cb52a2565e0c0ae77e3a Mon Sep 17 00:00:00 2001 From: Yury Akudovich Date: Fri, 18 Oct 2024 16:45:05 +0200 Subject: [PATCH 3/5] Add dry_run config option for Agent. --- core/lib/config/src/configs/prover_autoscaler.rs | 7 +++++++ .../src/proto/config/prover_autoscaler.proto | 1 + core/lib/protobuf_config/src/prover_autoscaler.rs | 4 +++- .../bin/prover_autoscaler/src/k8s/scaler.rs | 15 +++++++++++++++ prover/crates/bin/prover_autoscaler/src/main.rs | 2 +- 5 files changed, 27 insertions(+), 2 deletions(-) diff --git a/core/lib/config/src/configs/prover_autoscaler.rs b/core/lib/config/src/configs/prover_autoscaler.rs index bde3d090a987..b24a1a26651f 100644 --- a/core/lib/config/src/configs/prover_autoscaler.rs +++ b/core/lib/config/src/configs/prover_autoscaler.rs @@ -30,6 +30,9 @@ pub struct ProverAutoscalerAgentConfig { pub namespaces: Vec, /// Watched cluster name. Also can be set via flag. pub cluster_name: Option, + /// If dry-run enabled don't do any k8s updates, just report success. 
+ #[serde(default = "ProverAutoscalerAgentConfig::default_dry_run")] + pub dry_run: bool, } #[derive(Debug, Clone, PartialEq, Deserialize, Default)] @@ -101,6 +104,10 @@ impl ProverAutoscalerAgentConfig { pub fn default_namespaces() -> Vec { vec!["prover-blue".to_string(), "prover-red".to_string()] } + + pub fn default_dry_run() -> bool { + true + } } impl ProverAutoscalerScalerConfig { diff --git a/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto b/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto index 5474455b17a3..9b7f201e9b77 100644 --- a/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto +++ b/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto @@ -17,6 +17,7 @@ message ProverAutoscalerAgentConfig { optional uint32 http_port = 2; // required repeated string namespaces = 3; // optional optional string cluster_name = 4; // optional + optional bool dry_run = 5; // optional } message ProtocolVersion { diff --git a/core/lib/protobuf_config/src/prover_autoscaler.rs b/core/lib/protobuf_config/src/prover_autoscaler.rs index 00b84d76dd44..51f1b162d4cf 100644 --- a/core/lib/protobuf_config/src/prover_autoscaler.rs +++ b/core/lib/protobuf_config/src/prover_autoscaler.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use anyhow::Context as _; +use anyhow::Context; use time::Duration; use zksync_config::configs::{self, prover_autoscaler::Gpu}; use zksync_protobuf::{read_optional, repr::ProtoRepr, required, ProtoFmt}; @@ -42,6 +42,7 @@ impl ProtoRepr for proto::ProverAutoscalerAgentConfig { .context("http_port")?, namespaces: self.namespaces.to_vec(), cluster_name: Some("".to_string()), + dry_run: self.dry_run.unwrap_or(Self::Type::default_dry_run()), }) } @@ -51,6 +52,7 @@ impl ProtoRepr for proto::ProverAutoscalerAgentConfig { http_port: Some(this.http_port.into()), namespaces: this.namespaces.clone(), cluster_name: this.cluster_name.clone(), + dry_run: Some(this.dry_run), } } } diff --git a/prover/crates/bin/prover_autoscaler/src/k8s/scaler.rs b/prover/crates/bin/prover_autoscaler/src/k8s/scaler.rs index 170b0b106507..5e6f56aacc93 100644 --- a/prover/crates/bin/prover_autoscaler/src/k8s/scaler.rs +++ b/prover/crates/bin/prover_autoscaler/src/k8s/scaler.rs @@ -4,9 +4,14 @@ use kube::api::{Api, Patch, PatchParams}; #[derive(Clone)] pub struct Scaler { pub client: kube::Client, + dry_run: bool, } impl Scaler { + pub fn new(client: kube::Client, dry_run: bool) -> Self { + Self { client, dry_run } + } + pub async fn scale(&self, namespace: &str, name: &str, size: i32) -> anyhow::Result<()> { let deployments: Api = Api::namespaced(self.client.clone(), namespace); @@ -18,6 +23,16 @@ impl Scaler { "replicas": size } }); + + if self.dry_run { + tracing::info!( + "Dry run of scaled deployment/{} to {} replica(s).", + name, + size + ); + return Ok(()); + } + let pp = PatchParams::default(); deployments.patch(name, &pp, &Patch::Merge(patch)).await?; tracing::info!("Scaled deployment/{} to {} replica(s).", name, size); diff --git a/prover/crates/bin/prover_autoscaler/src/main.rs b/prover/crates/bin/prover_autoscaler/src/main.rs index e3aec1fbd392..45e476079a55 100644 --- a/prover/crates/bin/prover_autoscaler/src/main.rs +++ b/prover/crates/bin/prover_autoscaler/src/main.rs @@ -95,7 +95,7 @@ async fn main() -> anyhow::Result<()> { // TODO: maybe get cluster name from curl -H "Metadata-Flavor: Google" // http://metadata.google.internal/computeMetadata/v1/instance/attributes/cluster-name let watcher = Watcher::new(client.clone(), cluster, 
-    let scaler = Scaler { client };
+    let scaler = Scaler::new(client, agent_config.dry_run);
     tasks.push(tokio::spawn(watcher.clone().run()));
     tasks.push(tokio::spawn(agent::run_server(
         agent_config.http_port,

From bf76fea5213066c76d6ee21818de1fa080d83c27 Mon Sep 17 00:00:00 2001
From: Yury Akudovich
Date: Mon, 21 Oct 2024 14:26:45 +0200
Subject: [PATCH 4/5] Refactored with the comments.

---
 .../prover_autoscaler/src/global/queuer.rs    | 17 +++++-----
 .../prover_autoscaler/src/global/scaler.rs    | 31 +++++++------------
 .../prover_autoscaler/src/global/watcher.rs   | 15 +++++----
 .../bin/prover_autoscaler/src/metrics.rs      |  2 ++
 4 files changed, 33 insertions(+), 32 deletions(-)

diff --git a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs
index 4288c3f71ee3..5dea086de028 100644
--- a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs
+++ b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs
@@ -5,7 +5,9 @@ use reqwest::Method;
 use zksync_prover_job_monitor::autoscaler_queue_reporter::VersionedQueueReport;
 use zksync_utils::http_with_retries::send_request_with_retries;
 
-use crate::metrics::AUTOSCALER_METRICS;
+use crate::metrics::{AUTOSCALER_METRICS, DEFAULT_ERROR_CODE};
+
+const MAX_RETRIES: usize = 5;
 
 #[derive(Debug)]
 pub struct Queue {
@@ -26,19 +28,20 @@ impl Queuer {
     pub async fn get_queue(&self) -> anyhow::Result<Queue> {
         let url = &self.prover_job_monitor_url;
-        let res = send_request_with_retries(url, 5, Method::GET, None, None).await;
-        let response = res.map_err(|err| {
-            AUTOSCALER_METRICS.calls[&(url.clone(), 500)].inc();
+        let response_or_err =
+            send_request_with_retries(url, MAX_RETRIES, Method::GET, None, None).await;
+        let response = response_or_err.map_err(|err| {
+            AUTOSCALER_METRICS.calls[&(url.clone(), DEFAULT_ERROR_CODE)].inc();
             anyhow::anyhow!("Failed fetching queue from url: {url}: {err:?}")
         })?;
         AUTOSCALER_METRICS.calls[&(url.clone(), response.status().as_u16())].inc();
-        let j = response
+        let json_response = response
             .json::<Vec<VersionedQueueReport>>()
             .await
             .context("Failed to read response as json")?;
         Ok(Queue {
-            queue: j
+            queue: json_response
                 .iter()
                 .map(|x| (x.version.to_string(), x.report.prover_jobs.queued as u64))
                 .collect::<HashMap<_, _>>(),
         })
     }
diff --git a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
index d317b2949267..f6dc38e1c2e8 100644
--- a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
+++ b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
@@ -43,10 +43,6 @@ struct GPUPoolKey {
     gpu: Gpu,
 }
 
-//static PROVER_DEPLOYMENT_RE: Lazy<Regex> =
-//    Lazy::new(|| Regex::new(r"^prover-gpu-fri-spec-(\d{1,2})?(-(?<gpu>[ltvpa]\d+))?$").unwrap());
-//static PROVER_POD_RE: Lazy<Regex> =
-//    Lazy::new(|| Regex::new(r"^prover-gpu-fri-spec-(\d{1,2})?(-(?<gpu>[ltvpa]\d+))?").unwrap());
 static PROVER_DEPLOYMENT_RE: Lazy<Regex> =
     Lazy::new(|| Regex::new(r"^circuit-prover-gpu(-(?<gpu>[ltvpa]\d+))?$").unwrap());
 static PROVER_POD_RE: Lazy<Regex> =
@@ -88,9 +84,13 @@ impl Scaler {
         queuer: queuer::Queuer,
         config: ProverAutoscalerScalerConfig,
     ) -> Self {
-        config.protocol_versions.iter().for_each(|(ns, v)| {
-            AUTOSCALER_METRICS.prover_protocol_version[&(ns.clone(), v.clone())].set(1);
-        });
+        config
+            .protocol_versions
+            .iter()
+            .for_each(|(namespace, version)| {
+                AUTOSCALER_METRICS.prover_protocol_version[&(namespace.clone(), version.clone())]
+                    .set(1);
+            });
         Self {
             namespaces: config.protocol_versions,
             watcher,
@@ -215,21 +215,14 @@
         (queue + speed - 1) / speed * speed
     }
 
-    fn run(
-        &self,
-        namespace: &String,
-        mut queue: u64,
-        clusters: &Clusters,
-    ) -> HashMap<GPUPoolKey, u32> {
+    fn run(&self, namespace: &String, queue: u64, clusters: &Clusters) -> HashMap<GPUPoolKey, u32> {
         let sc = self.sorted_clusters(namespace, clusters);
         tracing::debug!("Sorted clusters for namespace {}: {:?}", namespace, &sc);
 
-        if let Some(min) = self.min_provers.get(namespace) {
-            let min_queue = self.provers_to_speed(Gpu::L4, *min);
-            if self.normalize_queue(Gpu::L4, queue) < min_queue {
-                queue = min_queue;
-            }
-        }
+        let queue: u64 = self.min_provers.get(namespace).map_or(queue, |min| {
+            self.normalize_queue(Gpu::L4, queue)
+                .max(self.provers_to_speed(Gpu::L4, *min))
+        });
 
         let mut total: i64 = 0;
         let mut provers: HashMap<GPUPoolKey, u32> = HashMap::new();
diff --git a/prover/crates/bin/prover_autoscaler/src/global/watcher.rs b/prover/crates/bin/prover_autoscaler/src/global/watcher.rs
index 5726548b612d..b5dbc8cbc4fb 100644
--- a/prover/crates/bin/prover_autoscaler/src/global/watcher.rs
+++ b/prover/crates/bin/prover_autoscaler/src/global/watcher.rs
@@ -9,10 +9,12 @@ use zksync_utils::http_with_retries::send_request_with_retries;
 
 use crate::{
     cluster_types::{Cluster, Clusters},
-    metrics::AUTOSCALER_METRICS,
+    metrics::{AUTOSCALER_METRICS, DEFAULT_ERROR_CODE},
     task_wiring::Task,
 };
 
+const MAX_RETRIES: usize = 5;
+
 #[derive(Default)]
 pub struct WatchedData {
     pub clusters: Clusters,
@@ -74,19 +76,20 @@ impl Task for Watcher {
                         .join("/cluster")
                         .context("Failed to join URL with /cluster")?
                         .to_string();
-                    let res = send_request_with_retries(&url, 5, Method::GET, None, None).await;
+                    let response_or_err =
+                        send_request_with_retries(&url, MAX_RETRIES, Method::GET, None, None).await;
 
-                    let response = res.map_err(|err| {
+                    let response = response_or_err.map_err(|err| {
                         // TODO: refactor send_request_with_retries to return status.
-                        AUTOSCALER_METRICS.calls[&(url.clone(), 500)].inc();
+                        AUTOSCALER_METRICS.calls[&(url.clone(), DEFAULT_ERROR_CODE)].inc();
                         anyhow::anyhow!("Failed fetching cluster from url: {url}: {err:?}")
                     })?;
                     AUTOSCALER_METRICS.calls[&(url, response.status().as_u16())].inc();
-                    let j = response
+                    let json_response = response
                         .json::<Cluster>()
                         .await
                         .context("Failed to read response as json");
-                    Ok((i, j))
+                    Ok((i, json_response))
                 })
             })
             .collect();
diff --git a/prover/crates/bin/prover_autoscaler/src/metrics.rs b/prover/crates/bin/prover_autoscaler/src/metrics.rs
index a16ebeae1beb..d94ac8b97e97 100644
--- a/prover/crates/bin/prover_autoscaler/src/metrics.rs
+++ b/prover/crates/bin/prover_autoscaler/src/metrics.rs
@@ -1,6 +1,8 @@
 use vise::{Counter, Gauge, LabeledFamily, Metrics};
 use zksync_config::configs::prover_autoscaler::Gpu;
 
+pub const DEFAULT_ERROR_CODE: u16 = 500;
+
 #[derive(Debug, Metrics)]
 #[metrics(prefix = "autoscaler")]
 pub(crate) struct AutoscalerMetrics {

From 9234188914a31e84d7b1e510a6d445b169ae152c Mon Sep 17 00:00:00 2001
From: Yury Akudovich
Date: Mon, 21 Oct 2024 15:19:57 +0200
Subject: [PATCH 5/5] Replace bad names with shadowing.
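
Names like response_or_err and json_response from the previous commit read
poorly; since each binding supersedes the previous stage of the same value,
rebind the same name instead. The resulting shape in queuer.rs and watcher.rs:

    let response = send_request_with_retries(url, MAX_RETRIES, Method::GET, None, None).await;
    let response = response.map_err(|err| {
        // Result<Response, _> -> Response, counting failed fetches.
        AUTOSCALER_METRICS.calls[&(url.clone(), DEFAULT_ERROR_CODE)].inc();
        anyhow::anyhow!("Failed fetching queue from url: {url}: {err:?}")
    })?;
    let response = response // Response -> parsed JSON body.
        .json::<Vec<VersionedQueueReport>>()
        .await
        .context("Failed to read response as json")?;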
---
 prover/crates/bin/prover_autoscaler/src/global/queuer.rs | 9 ++++-----
 prover/crates/bin/prover_autoscaler/src/global/scaler.rs | 2 ++
 .../crates/bin/prover_autoscaler/src/global/watcher.rs   | 8 ++++----
 3 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs
index 5dea086de028..32610ebf3c3d 100644
--- a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs
+++ b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs
@@ -28,20 +28,19 @@ impl Queuer {
     pub async fn get_queue(&self) -> anyhow::Result<Queue> {
         let url = &self.prover_job_monitor_url;
-        let response_or_err =
-            send_request_with_retries(url, MAX_RETRIES, Method::GET, None, None).await;
-        let response = response_or_err.map_err(|err| {
+        let response = send_request_with_retries(url, MAX_RETRIES, Method::GET, None, None).await;
+        let response = response.map_err(|err| {
             AUTOSCALER_METRICS.calls[&(url.clone(), DEFAULT_ERROR_CODE)].inc();
             anyhow::anyhow!("Failed fetching queue from url: {url}: {err:?}")
         })?;
         AUTOSCALER_METRICS.calls[&(url.clone(), response.status().as_u16())].inc();
-        let json_response = response
+        let response = response
             .json::<Vec<VersionedQueueReport>>()
             .await
             .context("Failed to read response as json")?;
         Ok(Queue {
-            queue: json_response
+            queue: response
                 .iter()
                 .map(|x| (x.version.to_string(), x.report.prover_jobs.queued as u64))
                 .collect::<HashMap<_, _>>(),
         })
     }
diff --git a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
index f6dc38e1c2e8..f10902f5dd2f 100644
--- a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
+++ b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
@@ -219,6 +219,8 @@ impl Scaler {
         let sc = self.sorted_clusters(namespace, clusters);
         tracing::debug!("Sorted clusters for namespace {}: {:?}", namespace, &sc);
 
+        // Increase queue size, if it's too small, to make sure that required min_provers are
+        // running.
         let queue: u64 = self.min_provers.get(namespace).map_or(queue, |min| {
             self.normalize_queue(Gpu::L4, queue)
                 .max(self.provers_to_speed(Gpu::L4, *min))
diff --git a/prover/crates/bin/prover_autoscaler/src/global/watcher.rs b/prover/crates/bin/prover_autoscaler/src/global/watcher.rs
index b5dbc8cbc4fb..646b320e12df 100644
--- a/prover/crates/bin/prover_autoscaler/src/global/watcher.rs
+++ b/prover/crates/bin/prover_autoscaler/src/global/watcher.rs
@@ -76,20 +76,20 @@ impl Task for Watcher {
                         .join("/cluster")
                         .context("Failed to join URL with /cluster")?
                         .to_string();
-                    let response_or_err =
+                    let response =
                         send_request_with_retries(&url, MAX_RETRIES, Method::GET, None, None).await;
 
-                    let response = response_or_err.map_err(|err| {
+                    let response = response.map_err(|err| {
                         // TODO: refactor send_request_with_retries to return status.
                         AUTOSCALER_METRICS.calls[&(url.clone(), DEFAULT_ERROR_CODE)].inc();
                         anyhow::anyhow!("Failed fetching cluster from url: {url}: {err:?}")
                     })?;
                     AUTOSCALER_METRICS.calls[&(url, response.status().as_u16())].inc();
-                    let json_response = response
+                    let response = response
                         .json::<Cluster>()
                         .await
                         .context("Failed to read response as json");
-                    Ok((i, json_response))
+                    Ok((i, response))
                 })
             })
            .collect();
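-- 
Note on deploying this series: agents default to dry_run = true (PATCH 3/5),
so a freshly rolled-out autoscaler only logs the scale operations it would
perform until dry_run is explicitly set to false in the agent config. The
guard in k8s/scaler.rs short-circuits before the k8s API call:

    if self.dry_run {
        tracing::info!("Dry run of scaling deployment/{} to {} replica(s).", name, size);
        return Ok(());
    }
    // Only reached when dry_run is false: actually patch the deployment.
    deployments.patch(name, &pp, &Patch::Merge(patch)).await?;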