Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(prover): Add min_provers and dry_run features. Improve metrics and test. #3129

Merged
merged 7 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions core/lib/config/src/configs/prover_autoscaler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ pub struct ProverAutoscalerAgentConfig {
pub namespaces: Vec<String>,
/// Watched cluster name. Also can be set via flag.
pub cluster_name: Option<String>,
/// If dry-run is enabled, don't do any k8s updates; just report success.
#[serde(default = "ProverAutoscalerAgentConfig::default_dry_run")]
pub dry_run: bool,
}

#[derive(Debug, Clone, PartialEq, Deserialize, Default)]
Expand All @@ -53,6 +56,8 @@ pub struct ProverAutoscalerScalerConfig {
pub prover_speed: HashMap<Gpu, u32>,
/// Maximum number of provers which can be run per cluster/GPU.
pub max_provers: HashMap<String, HashMap<Gpu, u32>>,
/// Minimum number of provers per namespace.
pub min_provers: HashMap<String, u32>,
/// Duration after which pending pod considered long pending.
#[serde(default = "ProverAutoscalerScalerConfig::default_long_pending_duration")]
pub long_pending_duration: Duration,
Expand Down Expand Up @@ -99,6 +104,10 @@ impl ProverAutoscalerAgentConfig {
/// Namespaces watched by default when none are configured explicitly.
pub fn default_namespaces() -> Vec<String> {
    ["prover-blue", "prover-red"]
        .into_iter()
        .map(String::from)
        .collect()
}

/// `dry_run` defaults to enabled — per the field's doc, a dry-run agent
/// performs no k8s updates and just reports success.
pub fn default_dry_run() -> bool { true }
}

impl ProverAutoscalerScalerConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ message ProverAutoscalerAgentConfig {
optional uint32 http_port = 2; // required
repeated string namespaces = 3; // optional
optional string cluster_name = 4; // optional
optional bool dry_run = 5; // optional
}

message ProtocolVersion {
Expand All @@ -39,6 +40,11 @@ message MaxProver {
optional uint32 max = 2; // required
}

// One (namespace -> minimum prover count) entry; collected into the
// `min_provers` map of `ProverAutoscalerScalerConfig` (repeated field 10).
message MinProver {
optional string namespace = 1; // required
optional uint32 min = 2; // required
}

message ProverAutoscalerScalerConfig {
optional uint32 prometheus_port = 1; // required
optional std.Duration scaler_run_interval = 2; // optional
Expand All @@ -49,4 +55,5 @@ message ProverAutoscalerScalerConfig {
repeated ProverSpeed prover_speed = 7; // optional
optional uint32 long_pending_duration_s = 8; // optional
repeated MaxProver max_provers = 9; // optional
repeated MinProver min_provers = 10; // optional
}
32 changes: 31 additions & 1 deletion core/lib/protobuf_config/src/prover_autoscaler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashMap;

use anyhow::Context as _;
use anyhow::Context;
use time::Duration;
use zksync_config::configs::{self, prover_autoscaler::Gpu};
use zksync_protobuf::{read_optional, repr::ProtoRepr, required, ProtoFmt};
Expand Down Expand Up @@ -42,6 +42,7 @@ impl ProtoRepr for proto::ProverAutoscalerAgentConfig {
.context("http_port")?,
namespaces: self.namespaces.to_vec(),
cluster_name: Some("".to_string()),
dry_run: self.dry_run.unwrap_or(Self::Type::default_dry_run()),
})
}

Expand All @@ -51,6 +52,7 @@ impl ProtoRepr for proto::ProverAutoscalerAgentConfig {
http_port: Some(this.http_port.into()),
namespaces: this.namespaces.clone(),
cluster_name: this.cluster_name.clone(),
dry_run: Some(this.dry_run),
}
}
}
Expand Down Expand Up @@ -103,6 +105,13 @@ impl ProtoRepr for proto::ProverAutoscalerScalerConfig {
}
acc
}),
min_provers: self
.min_provers
.iter()
.enumerate()
.map(|(i, e)| e.read().context(i))
.collect::<Result<_, _>>()
.context("min_provers")?,
})
}

Expand Down Expand Up @@ -137,6 +146,11 @@ impl ProtoRepr for proto::ProverAutoscalerScalerConfig {
})
})
.collect(),
min_provers: this
.min_provers
.iter()
.map(|(k, v)| proto::MinProver::build(&(k.clone(), *v)))
.collect(),
}
}
}
Expand Down Expand Up @@ -208,3 +222,19 @@ impl ProtoRepr for proto::MaxProver {
}
}
}

impl ProtoRepr for proto::MinProver {
    // Wire form maps to a `(namespace, minimum prover count)` pair.
    type Type = (String, u32);

    /// Decodes the pair, erroring with field context if either field is unset.
    fn read(&self) -> anyhow::Result<Self::Type> {
        let namespace = required(&self.namespace).context("namespace")?.clone();
        let min = *required(&self.min).context("min")?;
        Ok((namespace, min))
    }

    /// Encodes the pair back into the proto message, setting both fields.
    fn build(this: &Self::Type) -> Self {
        let (namespace, min) = this;
        Self {
            namespace: Some(namespace.clone()),
            min: Some(*min),
        }
    }
}
22 changes: 22 additions & 0 deletions prover/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions prover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ tokio-util = "0.7.11"
toml_edit = "0.14.4"
tracing = "0.1"
tracing-subscriber = "0.3"
tracing-test = "0.2.5"
url = "2.5.2"
vise = "0.2.0"

Expand Down
1 change: 1 addition & 0 deletions prover/crates/bin/prover_autoscaler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,4 @@ tracing-subscriber = { workspace = true, features = ["env-filter"] }
tracing.workspace = true
url.workspace = true
vise.workspace = true
tracing-test.workspace = true
18 changes: 13 additions & 5 deletions prover/crates/bin/prover_autoscaler/src/global/queuer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ use reqwest::Method;
use zksync_prover_job_monitor::autoscaler_queue_reporter::VersionedQueueReport;
use zksync_utils::http_with_retries::send_request_with_retries;

use crate::metrics::{AUTOSCALER_METRICS, DEFAULT_ERROR_CODE};

const MAX_RETRIES: usize = 5;

#[derive(Debug)]
pub struct Queue {
pub queue: HashMap<String, u64>,
Expand All @@ -24,15 +28,19 @@ impl Queuer {

pub async fn get_queue(&self) -> anyhow::Result<Queue> {
let url = &self.prover_job_monitor_url;
let response = send_request_with_retries(url, 5, Method::GET, None, None).await;
let res = response
.map_err(|err| anyhow::anyhow!("Failed fetching queue from url: {url}: {err:?}"))?
let response = send_request_with_retries(url, MAX_RETRIES, Method::GET, None, None).await;
let response = response.map_err(|err| {
AUTOSCALER_METRICS.calls[&(url.clone(), DEFAULT_ERROR_CODE)].inc();
anyhow::anyhow!("Failed fetching queue from url: {url}: {err:?}")
})?;
yorik marked this conversation as resolved.
Show resolved Hide resolved

AUTOSCALER_METRICS.calls[&(url.clone(), response.status().as_u16())].inc();
let response = response
.json::<Vec<VersionedQueueReport>>()
.await
.context("Failed to read response as json")?;

Ok(Queue {
queue: res
queue: response
.iter()
.map(|x| (x.version.to_string(), x.report.prover_jobs.queued as u64))
.collect::<HashMap<_, _>>(),
Expand Down
Loading
Loading