diff --git a/core/lib/config/src/configs/prover_autoscaler.rs b/core/lib/config/src/configs/prover_autoscaler.rs index d345b53e6f31..ab6b8fdf202f 100644 --- a/core/lib/config/src/configs/prover_autoscaler.rs +++ b/core/lib/config/src/configs/prover_autoscaler.rs @@ -63,6 +63,9 @@ pub struct ProverAutoscalerScalerConfig { pub long_pending_duration: Duration, /// List of simple autoscaler targets. pub scaler_targets: Vec, + /// If dry-run enabled don't send any scale requests. + #[serde(default)] + pub dry_run: bool, } #[derive( @@ -122,7 +125,7 @@ pub enum QueueReportFields { #[derive(Debug, Clone, PartialEq, Deserialize, Default)] pub struct ScalerTarget { pub queue_report_field: QueueReportFields, - pub pod_name_prefix: String, + pub deployment: String, /// Max replicas per cluster. pub max_replicas: HashMap, /// The queue will be divided by the speed and rounded up to get number of replicas. diff --git a/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto b/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto index 0f723e22a93f..742181653861 100644 --- a/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto +++ b/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto @@ -52,9 +52,10 @@ message MaxReplica { message ScalerTarget { optional string queue_report_field = 1; // required - optional string pod_name_prefix = 2; // required + optional string deployment = 5; // required repeated MaxReplica max_replicas = 3; // required at least one optional uint64 speed = 4; // optional + reserved 2; reserved "pod_name_prefix"; } message ProverAutoscalerScalerConfig { @@ -69,4 +70,5 @@ message ProverAutoscalerScalerConfig { repeated MaxProver max_provers = 9; // optional repeated MinProver min_provers = 10; // optional repeated ScalerTarget scaler_targets = 11; // optional + optional bool dry_run = 12; // optional } diff --git a/core/lib/protobuf_config/src/prover_autoscaler.rs b/core/lib/protobuf_config/src/prover_autoscaler.rs index c3e7c9719f13..6b67d9f620ff 100644 --- a/core/lib/protobuf_config/src/prover_autoscaler.rs +++ b/core/lib/protobuf_config/src/prover_autoscaler.rs @@ -118,6 +118,7 @@ impl ProtoRepr for proto::ProverAutoscalerScalerConfig { .enumerate() .map(|(i, x)| x.read().context(i).unwrap()) .collect::>(), + dry_run: self.dry_run.unwrap_or_default(), }) } @@ -158,6 +159,7 @@ impl ProtoRepr for proto::ProverAutoscalerScalerConfig { .map(|(k, v)| proto::MinProver::build(&(k.clone(), *v))) .collect(), scaler_targets: this.scaler_targets.iter().map(ProtoRepr::build).collect(), + dry_run: Some(this.dry_run), } } } @@ -269,9 +271,7 @@ impl ProtoRepr for proto::ScalerTarget { queue_report_field: required(&self.queue_report_field) .and_then(|x| Ok((*x).parse()?)) .context("queue_report_field")?, - pod_name_prefix: required(&self.pod_name_prefix) - .context("pod_name_prefix")? - .clone(), + deployment: required(&self.deployment).context("deployment")?.clone(), max_replicas: self .max_replicas .iter() @@ -289,7 +289,7 @@ impl ProtoRepr for proto::ScalerTarget { fn build(this: &Self::Type) -> Self { Self { queue_report_field: Some(this.queue_report_field.to_string()), - pod_name_prefix: Some(this.pod_name_prefix.clone()), + deployment: Some(this.deployment.clone()), max_replicas: this .max_replicas .iter() diff --git a/prover/crates/bin/prover_autoscaler/src/agent.rs b/prover/crates/bin/prover_autoscaler/src/agent.rs index f810bc416721..030636ad6592 100644 --- a/prover/crates/bin/prover_autoscaler/src/agent.rs +++ b/prover/crates/bin/prover_autoscaler/src/agent.rs @@ -96,7 +96,7 @@ pub struct ScaleRequest { pub deployments: Vec, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Default, Serialize, Deserialize)] pub struct ScaleResponse { pub scale_result: Vec, } diff --git a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs index 1bdd2b251040..362fbbac0744 100644 --- a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs +++ b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs @@ -83,7 +83,7 @@ pub struct GpuScaler { pub struct SimpleScaler { queue_report_field: QueueReportFields, - pod_name_prefix: String, + deployment: String, /// Which cluster to use first. cluster_priorities: HashMap, max_replicas: HashMap, @@ -365,6 +365,47 @@ impl GpuScaler { provers } + + fn diff( + namespace: &str, + provers: HashMap, + clusters: &Clusters, + requests: &mut HashMap, + ) { + provers + .into_iter() + .for_each(|(GPUPoolKey { cluster, gpu }, replicas)| { + let prover = gpu_to_prover(gpu); + clusters + .clusters + .get(&cluster) + .and_then(|c| c.namespaces.get(namespace)) + .and_then(|ns| ns.deployments.get(&prover)) + .map_or_else( + || { + tracing::error!( + "Wasn't able to find deployment {} in cluster {}, namespace {}", + prover, + cluster, + namespace + ) + }, + |deployment| { + if deployment.desired != replicas as i32 { + requests + .entry(cluster.clone()) + .or_default() + .deployments + .push(ScaleDeploymentRequest { + namespace: namespace.into(), + name: prover.clone(), + size: replicas as i32, + }); + } + }, + ); + }) + } } #[derive(Default, Debug, PartialEq, Eq)] @@ -389,7 +430,7 @@ impl SimpleScaler { ) -> Self { Self { queue_report_field: config.queue_report_field.clone(), - pod_name_prefix: config.pod_name_prefix.clone(), + deployment: config.deployment.clone(), cluster_priorities, max_replicas: config.max_replicas.clone(), speed: config.speed, @@ -418,7 +459,7 @@ impl SimpleScaler { // Initialize pool only if we have ready deployments. pool.pods.insert(PodStatus::Running, 0); - let pod_re = Regex::new(&format!("^{}-", self.pod_name_prefix)).unwrap(); + let pod_re = Regex::new(&format!("^{}-", self.deployment)).unwrap(); for (_, pod) in namespace_value .pods .iter() @@ -551,47 +592,46 @@ impl SimpleScaler { pods } -} -fn diff( - namespace: &str, - provers: HashMap, - clusters: &Clusters, - requests: &mut HashMap, -) { - provers - .into_iter() - .for_each(|(GPUPoolKey { cluster, gpu }, n)| { - let prover = gpu_to_prover(gpu); + fn diff( + &self, + namespace: &str, + replicas: HashMap, + clusters: &Clusters, + requests: &mut HashMap, + ) { + let deployment_name = self.deployment.clone(); + replicas.into_iter().for_each(|(cluster, replicas)| { clusters .clusters .get(&cluster) .and_then(|c| c.namespaces.get(namespace)) - .and_then(|ns| ns.deployments.get(&prover)) + .and_then(|ns| ns.deployments.get(&deployment_name)) .map_or_else( || { tracing::error!( "Wasn't able to find deployment {} in cluster {}, namespace {}", - prover, + deployment_name, cluster, namespace ) }, - |d| { - if d.desired != n as i32 { + |deployment| { + if deployment.desired != replicas as i32 { requests .entry(cluster.clone()) .or_default() .deployments .push(ScaleDeploymentRequest { namespace: namespace.into(), - name: prover.clone(), - size: n as i32, + name: deployment_name.clone(), + size: replicas as i32, }); } }, ); }) + } } /// is_namespace_running returns true if there are some pods running in it. @@ -638,7 +678,7 @@ impl Task for Scaler { AUTOSCALER_METRICS.provers[&(k.cluster.clone(), ns.clone(), k.gpu)] .set(*num as u64); } - diff(ns, provers, &guard.clusters, &mut scale_requests); + GpuScaler::diff(ns, provers, &guard.clusters, &mut scale_requests); } // Simple Scalers. @@ -647,15 +687,15 @@ impl Task for Scaler { .get(&(ppv.to_string(), scaler.queue_report_field.clone())) .cloned() .unwrap_or(0); - tracing::debug!("Running eval for namespace {ns}, PPV {ppv}, simple scaler {} found queue {q}", scaler.pod_name_prefix); + tracing::debug!("Running eval for namespace {ns}, PPV {ppv}, simple scaler {} found queue {q}", scaler.deployment); if q > 0 || is_namespace_running(ns, &guard.clusters) { - let pods = scaler.run(ns, q, &guard.clusters); - for (k, num) in &pods { + let replicas = scaler.run(ns, q, &guard.clusters); + for (k, num) in &replicas { AUTOSCALER_METRICS.jobs - [&(scaler.pod_name_prefix.clone(), k.clone(), ns.clone())] + [&(scaler.deployment.clone(), k.clone(), ns.clone())] .set(*num as u64); } - // TODO: diff and add into scale_requests. + scaler.diff(ns, replicas, &guard.clusters, &mut scale_requests); } } } diff --git a/prover/crates/bin/prover_autoscaler/src/global/watcher.rs b/prover/crates/bin/prover_autoscaler/src/global/watcher.rs index 6e02c0fe2fdc..95b9e32cac5b 100644 --- a/prover/crates/bin/prover_autoscaler/src/global/watcher.rs +++ b/prover/crates/bin/prover_autoscaler/src/global/watcher.rs @@ -38,11 +38,12 @@ pub fn check_is_ready(v: &Vec) -> Result<()> { pub struct Watcher { /// List of base URLs of all agents. pub cluster_agents: Vec>, + pub dry_run: bool, pub data: Arc>, } impl Watcher { - pub fn new(agent_urls: Vec) -> Self { + pub fn new(agent_urls: Vec, dry_run: bool) -> Self { let size = agent_urls.len(); Self { cluster_agents: agent_urls @@ -54,6 +55,7 @@ impl Watcher { ) }) .collect(), + dry_run, data: Arc::new(Mutex::new(WatchedData { clusters: Clusters::default(), is_ready: vec![false; size], @@ -80,6 +82,7 @@ impl Watcher { .collect(); } + let dry_run = self.dry_run; let handles: Vec<_> = id_requests .into_iter() .map(|(id, sr)| { @@ -92,6 +95,10 @@ impl Watcher { tokio::spawn(async move { let mut headers = HeaderMap::new(); headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); + if dry_run { + tracing::info!("Dry-run mode, not sending the request."); + return Ok((id, Ok(ScaleResponse::default()))); + } let response = send_request_with_retries( &url, MAX_RETRIES, diff --git a/prover/crates/bin/prover_autoscaler/src/main.rs b/prover/crates/bin/prover_autoscaler/src/main.rs index 45e476079a55..ac5121dccd9c 100644 --- a/prover/crates/bin/prover_autoscaler/src/main.rs +++ b/prover/crates/bin/prover_autoscaler/src/main.rs @@ -110,7 +110,8 @@ async fn main() -> anyhow::Result<()> { let interval = scaler_config.scaler_run_interval.unsigned_abs(); let exporter_config = PrometheusExporterConfig::pull(scaler_config.prometheus_port); tasks.push(tokio::spawn(exporter_config.run(stop_receiver.clone()))); - let watcher = global::watcher::Watcher::new(scaler_config.agents.clone()); + let watcher = + global::watcher::Watcher::new(scaler_config.agents.clone(), scaler_config.dry_run); let queuer = global::queuer::Queuer::new(scaler_config.prover_job_monitor_url.clone()); let scaler = global::scaler::Scaler::new(watcher.clone(), queuer, scaler_config); tasks.extend(get_tasks(watcher, scaler, interval, stop_receiver)?);