Skip to content

Commit

Permalink
fix(prover): Create reqwest client only once (#3324)
Browse files Browse the repository at this point in the history
## What ❔
Create reqwest client only once. 

Additionally, `HttpClient` exports the `calls` metric, covering all requests
with their correct status codes.


<!-- What are the changes this PR brings about? -->
<!-- Example: This PR adds a PR template to the repo. -->
<!-- (For bigger PRs adding more context is appreciated) -->

## Why ❔
Creating a reqwest client is expensive because it initializes TLS, loads
certificates, etc. So it should be created only once and reused.

Create new internal mod http_client instead of patching zksync_utils
because fn `send_request_with_retries` is used only in prover_autoscaler
and outdated prover_fri, which will be removed soon.


<!-- Why are these changes done? What goal do they contribute to? What
are the principles behind them? -->
<!-- Example: PR templates ensure PR reviewers, observers, and future
iterators are in context about the evolution of repos. -->

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [x] Code has been formatted via `zkstack dev fmt` and `zkstack dev
lint`.

ref ZKD-1855
  • Loading branch information
yorik authored Nov 27, 2024
1 parent 7b6e590 commit 40f8123
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 46 deletions.
24 changes: 10 additions & 14 deletions prover/crates/bin/prover_autoscaler/src/global/queuer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,8 @@ use std::{collections::HashMap, ops::Deref};
use anyhow::{Context, Ok};
use reqwest::Method;
use zksync_prover_job_monitor::autoscaler_queue_reporter::{QueueReport, VersionedQueueReport};
use zksync_utils::http_with_retries::send_request_with_retries;

use crate::{
config::QueueReportFields,
metrics::{AUTOSCALER_METRICS, DEFAULT_ERROR_CODE},
};

const MAX_RETRIES: usize = 5;
use crate::{config::QueueReportFields, http_client::HttpClient};

pub struct Queue(HashMap<(String, QueueReportFields), u64>);

Expand All @@ -23,6 +17,7 @@ impl Deref for Queue {

#[derive(Default)]
pub struct Queuer {
http_client: HttpClient,
pub prover_job_monitor_url: String,
}

Expand All @@ -40,8 +35,9 @@ fn target_to_queue(target: QueueReportFields, report: &QueueReport) -> u64 {
}

impl Queuer {
pub fn new(pjm_url: String) -> Self {
pub fn new(http_client: HttpClient, pjm_url: String) -> Self {
Self {
http_client,
prover_job_monitor_url: pjm_url,
}
}
Expand All @@ -50,13 +46,13 @@ impl Queuer {
/// list of jobs.
pub async fn get_queue(&self, jobs: &[QueueReportFields]) -> anyhow::Result<Queue> {
let url = &self.prover_job_monitor_url;
let response = send_request_with_retries(url, MAX_RETRIES, Method::GET, None, None).await;
let response = response.map_err(|err| {
AUTOSCALER_METRICS.calls[&(url.clone(), DEFAULT_ERROR_CODE)].inc();
anyhow::anyhow!("Failed fetching queue from URL: {url}: {err:?}")
})?;
let response = self
.http_client
.send_request_with_retries(url, Method::GET, None, None)
.await;
let response = response
.map_err(|err| anyhow::anyhow!("Failed fetching queue from URL: {url}: {err:?}"))?;

AUTOSCALER_METRICS.calls[&(url.clone(), response.status().as_u16())].inc();
let response = response
.json::<Vec<VersionedQueueReport>>()
.await
Expand Down
37 changes: 17 additions & 20 deletions prover/crates/bin/prover_autoscaler/src/global/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,14 @@ use reqwest::{
};
use tokio::sync::Mutex;
use url::Url;
use zksync_utils::http_with_retries::send_request_with_retries;

use crate::{
agent::{ScaleRequest, ScaleResponse},
cluster_types::{Cluster, Clusters},
metrics::{AUTOSCALER_METRICS, DEFAULT_ERROR_CODE},
http_client::HttpClient,
task_wiring::Task,
};

const MAX_RETRIES: usize = 5;

#[derive(Default)]
pub struct WatchedData {
pub clusters: Clusters,
Expand All @@ -36,16 +33,18 @@ pub fn check_is_ready(v: &Vec<bool>) -> Result<()> {

#[derive(Default, Clone)]
pub struct Watcher {
http_client: HttpClient,
/// List of base URLs of all agents.
pub cluster_agents: Vec<Arc<Url>>,
pub dry_run: bool,
pub data: Arc<Mutex<WatchedData>>,
}

impl Watcher {
pub fn new(agent_urls: Vec<String>, dry_run: bool) -> Self {
pub fn new(http_client: HttpClient, agent_urls: Vec<String>, dry_run: bool) -> Self {
let size = agent_urls.len();
Self {
http_client,
cluster_agents: agent_urls
.into_iter()
.map(|u| {
Expand Down Expand Up @@ -92,26 +91,25 @@ impl Watcher {
.unwrap()
.to_string();
tracing::debug!("Sending scale request to {}, data: {:?}.", url, sr);
let http_client = self.http_client.clone();
tokio::spawn(async move {
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
if dry_run {
tracing::info!("Dry-run mode, not sending the request.");
return Ok((id, Ok(ScaleResponse::default())));
}
let response = send_request_with_retries(
&url,
MAX_RETRIES,
Method::POST,
Some(headers),
Some(serde_json::to_vec(&sr)?),
)
.await;
let response = http_client
.send_request_with_retries(
&url,
Method::POST,
Some(headers),
Some(serde_json::to_vec(&sr)?),
)
.await;
let response = response.map_err(|err| {
AUTOSCALER_METRICS.calls[&(url.clone(), DEFAULT_ERROR_CODE)].inc();
anyhow::anyhow!("Failed fetching cluster from url: {url}: {err:?}")
})?;
AUTOSCALER_METRICS.calls[&(url, response.status().as_u16())].inc();
let response = response
.json::<ScaleResponse>()
.await
Expand Down Expand Up @@ -164,21 +162,20 @@ impl Task for Watcher {
.enumerate()
.map(|(i, a)| {
tracing::debug!("Getting cluster data from agent {}.", a);
let http_client = self.http_client.clone();
tokio::spawn(async move {
let url: String = a
.clone()
.join("/cluster")
.context("Failed to join URL with /cluster")?
.to_string();
let response =
send_request_with_retries(&url, MAX_RETRIES, Method::GET, None, None).await;
let response = http_client
.send_request_with_retries(&url, Method::GET, None, None)
.await;

let response = response.map_err(|err| {
// TODO: refactor send_request_with_retries to return status.
AUTOSCALER_METRICS.calls[&(url.clone(), DEFAULT_ERROR_CODE)].inc();
anyhow::anyhow!("Failed fetching cluster from url: {url}: {err:?}")
})?;
AUTOSCALER_METRICS.calls[&(url, response.status().as_u16())].inc();
let response = response
.json::<Cluster>()
.await
Expand Down
94 changes: 94 additions & 0 deletions prover/crates/bin/prover_autoscaler/src/http_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use reqwest::{header::HeaderMap, Client, Error, Method, Response, StatusCode};
use tokio::time::{sleep, Duration};

use crate::metrics::AUTOSCALER_METRICS;

/// Reusable HTTP client wrapper around a single shared `reqwest::Client`.
///
/// Creating a `reqwest::Client` is expensive (it initializes TLS, loads
/// certificates, etc.), so one instance is created and reused for all
/// requests instead of being rebuilt per call.
#[derive(Clone)]
pub struct HttpClient {
    // The shared underlying reqwest client, reused across all requests.
    client: Client,
    // Maximum number of retries performed by `send_request_with_retries`.
    max_retries: usize,
}

impl Default for HttpClient {
    /// Builds a client with a fresh `reqwest::Client` and a retry budget of 5.
    fn default() -> Self {
        Self {
            client: Client::new(),
            max_retries: 5,
        }
    }
}

/// Errors returned by [`HttpClient::send_request_with_retries`].
#[derive(Debug)]
pub enum HttpError {
    /// A transport-level error surfaced by `reqwest`.
    // NOTE(review): this variant is not constructed anywhere in this module
    // (`send_request_with_retries` maps all failures to `RetryExhausted`) —
    // confirm whether it is needed by external callers or can be removed.
    ReqwestError(Error),
    /// All retry attempts were exhausted without a successful (2xx) response.
    RetryExhausted(String),
}

impl HttpClient {
    /// Sends an HTTP request, retrying up to `max_retries` times with
    /// exponential back-off (1s, 2s, 4s, ...). Every attempt — successful or
    /// not — is recorded in the `AUTOSCALER_METRICS.calls` metric, labeled by
    /// URL and status code.
    ///
    /// # Errors
    /// Returns [`HttpError::RetryExhausted`] when no attempt produced a
    /// successful (2xx) response.
    pub async fn send_request_with_retries(
        &self,
        url: &str,
        method: Method,
        headers: Option<HeaderMap>,
        body: Option<Vec<u8>>,
    ) -> Result<Response, HttpError> {
        let mut retries = 0usize;
        let mut delay = Duration::from_secs(1);
        loop {
            let result = self
                .send_request(url, method.clone(), headers.clone(), body.clone())
                .await;
            // Report the attempt: use the real status when a response arrived,
            // otherwise the error's status (falling back to 500).
            AUTOSCALER_METRICS.calls[&(
                url.into(),
                match result {
                    Ok(ref response) => response.status().as_u16(),
                    Err(ref err) => err
                        .status()
                        .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
                        .as_u16(),
                },
            )]
            .inc();
            match result {
                Ok(response) if response.status().is_success() => return Ok(response),
                Ok(response) => {
                    tracing::error!("Received non OK http response {:?}", response.status())
                }
                Err(err) => tracing::error!("Error while sending http request {:?}", err),
            }

            if retries >= self.max_retries {
                // Fixed typo in the user-visible message: "retires" -> "retries".
                return Err(HttpError::RetryExhausted(format!(
                    "All {} http retries failed",
                    self.max_retries
                )));
            }
            retries += 1;
            sleep(delay).await;
            // Double the delay, saturating at Duration::MAX to avoid overflow.
            delay = delay.checked_mul(2).unwrap_or(Duration::MAX);
        }
    }

    /// Builds and executes a single request on the shared client.
    async fn send_request(
        &self,
        url: &str,
        method: Method,
        headers: Option<HeaderMap>,
        body: Option<Vec<u8>>,
    ) -> Result<Response, Error> {
        let mut request = self.client.request(method, url);

        if let Some(headers) = headers {
            request = request.headers(headers);
        }

        if let Some(body) = body {
            request = request.body(body);
        }

        // Building then executing keeps request-construction errors separate
        // from transport errors (both convert into `reqwest::Error` via `?`).
        let request = request.build()?;
        let response = self.client.execute(request).await?;
        Ok(response)
    }
}
15 changes: 10 additions & 5 deletions prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,25 @@ use reqwest::{
Method,
};
use tokio::sync::Mutex;
use zksync_utils::http_with_retries::send_request_with_retries;

use crate::cluster_types::{Cluster, Deployment, Namespace, Pod, ScaleEvent};
use crate::{
cluster_types::{Cluster, Deployment, Namespace, Pod, ScaleEvent},
http_client::HttpClient,
};

#[derive(Clone)]
pub struct Watcher {
pub client: kube::Client,
pub cluster: Arc<Mutex<Cluster>>,
}

async fn get_cluster_name() -> anyhow::Result<String> {
async fn get_cluster_name(http_client: HttpClient) -> anyhow::Result<String> {
let mut headers = HeaderMap::new();
headers.insert("Metadata-Flavor", HeaderValue::from_static("Google"));
let url = "http://metadata.google.internal/computeMetadata/v1/instance/attributes/cluster-name";
let response = send_request_with_retries(url, 5, Method::GET, Some(headers), None).await;
let response = http_client
.send_request_with_retries(url, Method::GET, Some(headers), None)
.await;
response
.map_err(|err| anyhow::anyhow!("Failed fetching response from url: {url}: {err:?}"))?
.text()
Expand All @@ -37,6 +41,7 @@ async fn get_cluster_name() -> anyhow::Result<String> {

impl Watcher {
pub async fn new(
http_client: HttpClient,
client: kube::Client,
cluster_name: Option<String>,
namespaces: Vec<String>,
Expand All @@ -48,7 +53,7 @@ impl Watcher {

let cluster_name = match cluster_name {
Some(c) => c,
None => get_cluster_name()
None => get_cluster_name(http_client)
.await
.expect("Load cluster_name from GCP"),
};
Expand Down
1 change: 1 addition & 0 deletions prover/crates/bin/prover_autoscaler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod agent;
pub(crate) mod cluster_types;
pub mod config;
pub mod global;
pub mod http_client;
pub mod k8s;
pub(crate) mod metrics;
pub mod task_wiring;
24 changes: 19 additions & 5 deletions prover/crates/bin/prover_autoscaler/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use zksync_prover_autoscaler::{
agent,
config::{config_from_yaml, ProverAutoscalerConfig},
global::{self},
http_client::HttpClient,
k8s::{Scaler, Watcher},
task_wiring::TaskRunner,
};
Expand Down Expand Up @@ -74,6 +75,8 @@ async fn main() -> anyhow::Result<()> {

let mut tasks = vec![];

let http_client = HttpClient::default();

match opt.job {
AutoscalerType::Agent => {
tracing::info!("Starting ProverAutoscaler Agent");
Expand All @@ -84,8 +87,13 @@ async fn main() -> anyhow::Result<()> {
let _ = rustls::crypto::ring::default_provider().install_default();
let client = kube::Client::try_default().await?;

let watcher =
Watcher::new(client.clone(), opt.cluster_name, agent_config.namespaces).await;
let watcher = Watcher::new(
http_client,
client.clone(),
opt.cluster_name,
agent_config.namespaces,
)
.await;
let scaler = Scaler::new(client, agent_config.dry_run);
tasks.push(tokio::spawn(watcher.clone().run()));
tasks.push(tokio::spawn(agent::run_server(
Expand All @@ -101,9 +109,15 @@ async fn main() -> anyhow::Result<()> {
let interval = scaler_config.scaler_run_interval;
let exporter_config = PrometheusExporterConfig::pull(scaler_config.prometheus_port);
tasks.push(tokio::spawn(exporter_config.run(stop_receiver.clone())));
let watcher =
global::watcher::Watcher::new(scaler_config.agents.clone(), scaler_config.dry_run);
let queuer = global::queuer::Queuer::new(scaler_config.prover_job_monitor_url.clone());
let watcher = global::watcher::Watcher::new(
http_client.clone(),
scaler_config.agents.clone(),
scaler_config.dry_run,
);
let queuer = global::queuer::Queuer::new(
http_client,
scaler_config.prover_job_monitor_url.clone(),
);
let scaler = global::scaler::Scaler::new(watcher.clone(), queuer, scaler_config);
tasks.extend(get_tasks(watcher, scaler, interval, stop_receiver)?);
}
Expand Down
2 changes: 0 additions & 2 deletions prover/crates/bin/prover_autoscaler/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ use vise::{Counter, Gauge, LabeledFamily, Metrics};

use crate::config::Gpu;

pub const DEFAULT_ERROR_CODE: u16 = 500;

#[derive(Debug, Metrics)]
#[metrics(prefix = "autoscaler")]
pub(crate) struct AutoscalerMetrics {
Expand Down

0 comments on commit 40f8123

Please sign in to comment.