feat(prover): Autoscaler sends scale request to appropriate agents. #3150

Merged · 5 commits · Oct 23, 2024
4 changes: 2 additions & 2 deletions prover/crates/bin/prover_autoscaler/src/agent.rs
@@ -84,14 +84,14 @@ async fn get_cluster(State(app): State<App>) -> Result<Json<Cluster>, AppError>
Ok(Json(cluster))
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct ScaleDeploymentRequest {
pub namespace: String,
pub name: String,
pub size: i32,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct ScaleRequest {
pub deployments: Vec<ScaleDeploymentRequest>,
}
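For orientation (not part of the diff): with the Serialize derive shown above, a ScaleRequest that the autoscaler POSTs to an agent's /scale endpoint would serialize roughly as below. This is a hedged sketch; the namespace value is invented.

// Hedged sketch: building and serializing a ScaleRequest (illustrative values only).
fn example_scale_request() -> String {
    let request = ScaleRequest {
        deployments: vec![ScaleDeploymentRequest {
            namespace: "prover-blue".to_string(), // hypothetical namespace
            name: "circuit-prover-gpu".to_string(),
            size: 3,
        }],
    };
    // Expected JSON shape:
    // {"deployments":[{"namespace":"prover-blue","name":"circuit-prover-gpu","size":3}]}
    serde_json::to_string(&request).unwrap()
}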
2 changes: 2 additions & 0 deletions prover/crates/bin/prover_autoscaler/src/cluster_types.rs
@@ -45,6 +45,8 @@ pub struct Cluster {
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct Clusters {
pub clusters: HashMap<String, Cluster>,
/// Map from cluster to index in agent URLs Vec.
pub agent_ids: HashMap<String, usize>,
}

#[derive(Default, Debug, EnumString, Display, Hash, PartialEq, Eq, Clone, Copy)]
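A minimal sketch of how the new agent_ids map is meant to be used, assuming a Clusters snapshot populated by the Watcher and a slice of agent URLs in the same order; the function name here is illustrative, the real lookup lives in watcher.rs below.

// Hedged sketch only: resolve a cluster name to its agent URL via agent_ids.
fn agent_url_for<'a>(clusters: &Clusters, agent_urls: &'a [url::Url], cluster: &str) -> Option<&'a url::Url> {
    clusters
        .agent_ids
        .get(cluster)                       // cluster name -> index recorded by the Watcher
        .and_then(|&id| agent_urls.get(id)) // index -> URL of that cluster's agent
}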
96 changes: 80 additions & 16 deletions prover/crates/bin/prover_autoscaler/src/global/scaler.rs
@@ -8,6 +8,7 @@ use zksync_config::configs::prover_autoscaler::{Gpu, ProverAutoscalerScalerConfig};

use super::{queuer, watcher};
use crate::{
agent::{ScaleDeploymentRequest, ScaleRequest},
cluster_types::{Cluster, Clusters, Pod, PodStatus},
metrics::AUTOSCALER_METRICS,
task_wiring::Task,
@@ -48,6 +49,16 @@ static PROVER_DEPLOYMENT_RE: Lazy<Regex> =
static PROVER_POD_RE: Lazy<Regex> =
Lazy::new(|| Regex::new(r"^circuit-prover-gpu(-(?<gpu>[ltvpa]\d+))?").unwrap());

/// gpu_to_prover converts Gpu type to corresponding deployment name.
fn gpu_to_prover(gpu: Gpu) -> String {
let s = "circuit-prover-gpu";
match gpu {
Gpu::Unknown => "".into(),
Gpu::L4 => s.into(),
_ => format!("{}-{}", s, gpu.to_string().to_lowercase()),
}
}

pub struct Scaler {
/// namespace to Protocol Version configuration.
namespaces: HashMap<String, String>,
@@ -299,6 +310,47 @@ impl Scaler {
}
}

fn diff(
namespace: &str,
provers: HashMap<GPUPoolKey, u32>,
clusters: &Clusters,
requests: &mut HashMap<String, ScaleRequest>,
) {
provers
.into_iter()
.for_each(|(GPUPoolKey { cluster, gpu }, n)| {
let prover = gpu_to_prover(gpu);
clusters
.clusters
.get(&cluster)
.and_then(|c| c.namespaces.get(namespace))
.and_then(|ns| ns.deployments.get(&prover))
.map_or_else(
|| {
tracing::error!(
"Wasn't able to find deployment {} in cluster {}, namespace {}",
prover,
cluster,
namespace
)
},
|d| {
if d.desired != n as i32 {
requests
.entry(cluster.clone())
.or_default()
.deployments
.push(ScaleDeploymentRequest {
namespace: namespace.into(),
name: prover.clone(),
size: n as i32,
});
}
},
);
})
}

/// is_namespace_running returns true if there are some pods running in it.
fn is_namespace_running(namespace: &str, clusters: &Clusters) -> bool {
clusters
@@ -309,7 +361,7 @@ fn is_namespace_running(namespace: &str, clusters: &Clusters) -> bool {
.flat_map(|v| v.deployments.values())
.map(
|d| d.running + d.desired, // If there is something running or expected to run, we
// should consider the namespace.
// should re-evaluate the namespace.
)
.sum::<i32>()
> 0
@@ -320,24 +372,32 @@ impl Task for Scaler {
async fn invoke(&self) -> anyhow::Result<()> {
let queue = self.queuer.get_queue().await.unwrap();

let guard = self.watcher.data.lock().await;
if let Err(err) = watcher::check_is_ready(&guard.is_ready) {
AUTOSCALER_METRICS.clusters_not_ready.inc();
tracing::warn!("Skipping Scaler run: {}", err);
return Ok(());
}
let mut scale_requests: HashMap<String, ScaleRequest> = HashMap::new();
{
let guard = self.watcher.data.lock().await; // Keeping the lock during all calls of run() for
// consistency.
if let Err(err) = watcher::check_is_ready(&guard.is_ready) {
AUTOSCALER_METRICS.clusters_not_ready.inc();
tracing::warn!("Skipping Scaler run: {}", err);
return Ok(());
}

for (ns, ppv) in &self.namespaces {
let q = queue.queue.get(ppv).cloned().unwrap_or(0);
tracing::debug!("Running eval for namespace {ns} and PPV {ppv} found queue {q}");
if q > 0 || is_namespace_running(ns, &guard.clusters) {
let provers = self.run(ns, q, &guard.clusters);
for (k, num) in &provers {
AUTOSCALER_METRICS.provers[&(k.cluster.clone(), ns.clone(), k.gpu)]
.set(*num as u64);
for (ns, ppv) in &self.namespaces {
let q = queue.queue.get(ppv).cloned().unwrap_or(0);
tracing::debug!("Running eval for namespace {ns} and PPV {ppv} found queue {q}");
if q > 0 || is_namespace_running(ns, &guard.clusters) {
let provers = self.run(ns, q, &guard.clusters);
for (k, num) in &provers {
AUTOSCALER_METRICS.provers[&(k.cluster.clone(), ns.clone(), k.gpu)]
.set(*num as u64);
}
diff(ns, provers, &guard.clusters, &mut scale_requests);
}
// TODO: compare before and desired, send commands [cluster,namespace,deployment] -> provers
}
} // Unlock self.watcher.data.

if let Err(err) = self.watcher.send_scale(scale_requests).await {
tracing::error!("Failed scale request: {}", err);
}

Ok(())
@@ -401,6 +461,7 @@ mod tests {
},
)]
.into(),
..Default::default()
},
),
[(
@@ -467,6 +528,7 @@
)
]
.into(),
..Default::default()
},
),
[
@@ -552,6 +614,7 @@
)
]
.into(),
..Default::default()
},
),
[
@@ -662,6 +725,7 @@
)
]
.into(),
..Default::default()
},
),
[
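To make the new diff step concrete, here is a hedged sketch of the call from the scaler's side; the cluster and namespace names are invented, and `clusters` stands for the Watcher snapshot taken under the lock. Only pools whose computed target differs from the deployment's current desired count end up in scale_requests.

// Hedged sketch: feeding one computed pool size into diff() (illustrative names only).
let mut scale_requests: HashMap<String, ScaleRequest> = HashMap::new();
let provers: HashMap<GPUPoolKey, u32> = [(
    GPUPoolKey { cluster: "cluster-a".to_string(), gpu: Gpu::L4 },
    5, // target replicas computed by Scaler::run for this (cluster, GPU) pool
)]
.into();
// If "circuit-prover-gpu" in cluster-a / namespace "prover-blue" currently has desired != 5,
// scale_requests["cluster-a"] gains one ScaleDeploymentRequest with size 5; otherwise nothing is added.
diff("prover-blue", provers, &clusters, &mut scale_requests);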
97 changes: 93 additions & 4 deletions prover/crates/bin/prover_autoscaler/src/global/watcher.rs
@@ -2,12 +2,17 @@ use std::{collections::HashMap, sync::Arc};

use anyhow::{anyhow, Context, Ok, Result};
use futures::future;
use reqwest::Method;
use reqwest::{
header::{HeaderMap, HeaderValue, CONTENT_TYPE},
Method,
};

use tokio::sync::Mutex;
use url::Url;
use zksync_utils::http_with_retries::send_request_with_retries;

use crate::{
agent::{ScaleRequest, ScaleResponse},
cluster_types::{Cluster, Clusters},
metrics::{AUTOSCALER_METRICS, DEFAULT_ERROR_CODE},
task_wiring::Task,
@@ -51,13 +56,96 @@ impl Watcher {
})
.collect(),
data: Arc::new(Mutex::new(WatchedData {
clusters: Clusters {
clusters: HashMap::new(),
},
clusters: Clusters::default(),
is_ready: vec![false; size],
})),
}
}

pub async fn send_scale(&self, requests: HashMap<String, ScaleRequest>) -> anyhow::Result<()> {
let id_requests: HashMap<usize, ScaleRequest>;
{
// Convert cluster names into ids. Holding the data lock.
let guard = self.data.lock().await;
id_requests = requests
.into_iter()
.filter_map(|(cluster, scale_request)| {
guard.clusters.agent_ids.get(&cluster).map_or_else(
|| {
tracing::error!("Failed to find id for cluster {}", cluster);
None
},
|id| Some((*id, scale_request)),
)
})
.collect();
}

let handles: Vec<_> = id_requests
.into_iter()
.map(|(id, sr)| {
let url: String = self.cluster_agents[id]
.clone()
.join("/scale")
.unwrap()
.to_string();
tracing::debug!("Sending scale request to {}, data: {:?}.", url, sr);
tokio::spawn(async move {
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
let response = send_request_with_retries(
&url,
MAX_RETRIES,
Method::POST,
Some(headers),
Some(serde_json::to_vec(&sr)?),
)
.await;
let response = response.map_err(|err| {
AUTOSCALER_METRICS.calls[&(url.clone(), DEFAULT_ERROR_CODE)].inc();
anyhow::anyhow!("Failed fetching cluster from url: {url}: {err:?}")
})?;
AUTOSCALER_METRICS.calls[&(url, response.status().as_u16())].inc();
let response = response
.json::<ScaleResponse>()
.await
.context("Failed to read response as json");
Ok((id, response))
})
})
.collect();

future::try_join_all(
future::join_all(handles)
.await
.into_iter()
.map(|h| async move {
let (id, res) = h??;

let errors: Vec<_> = res
.expect("failed to do request to Agent")
.scale_result
.iter()
.filter_map(|e| {
if !e.is_empty() {
Some(format!("Agent {} failed to scale: {}", id, e))
} else {
None
}
})
.collect();

if !errors.is_empty() {
return Err(anyhow!(errors.join(";")));
}
Ok(())
})
.collect::<Vec<_>>(),
)
.await?;

Ok(())
}
}

#[async_trait::async_trait]
@@ -102,6 +190,7 @@ impl Task for Watcher {
let (i, res) = h??;
let c = res?;
let mut guard = self.data.lock().await;
guard.clusters.agent_ids.insert(c.name.clone(), i);
guard.clusters.clusters.insert(c.name.clone(), c);
guard.is_ready[i] = true;
Ok(())
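Finally, a hedged usage sketch of send_scale from a caller such as Scaler::invoke above; the map contents and names are illustrative. send_scale resolves each cluster name to its agent index via agent_ids, POSTs the JSON body to <agent URL>/scale with retries, and aggregates any per-deployment errors reported in the ScaleResponse.

// Hedged sketch: handing per-cluster scale requests to the Watcher (illustrative values).
let mut scale_requests: HashMap<String, ScaleRequest> = HashMap::new();
scale_requests
    .entry("cluster-a".to_string()) // hypothetical cluster name
    .or_default()
    .deployments
    .push(ScaleDeploymentRequest {
        namespace: "prover-blue".to_string(), // hypothetical namespace
        name: "circuit-prover-gpu".to_string(),
        size: 2,
    });
if let Err(err) = watcher.send_scale(scale_requests).await {
    tracing::error!("Failed scale request: {}", err);
}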