diff --git a/prover/Cargo.lock b/prover/Cargo.lock index dbc3b3425e49..747d3df987e9 100644 --- a/prover/Cargo.lock +++ b/prover/Cargo.lock @@ -326,7 +326,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.0", "http-body-util", - "hyper 1.3.1", + "hyper 1.5.0", "hyper-util", "itoa", "matchit", @@ -341,7 +341,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper 1.0.1", "tokio", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", "tracing", @@ -1605,6 +1605,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "educe" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d7bc049e1bd8cdeb31b68bbd586a9464ecf9f3944af3958a7a9d0f8b9799417" +dependencies = [ + "enum-ordinalize", + "proc-macro2 1.0.85", + "quote 1.0.36", + "syn 2.0.66", +] + [[package]] name = "either" version = "1.12.0" @@ -1678,6 +1690,26 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "enum-ordinalize" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea0dcfa4e54eeb516fe454635a95753ddd39acda650ce703031c6973e315dd5" +dependencies = [ + "enum-ordinalize-derive", +] + +[[package]] +name = "enum-ordinalize-derive" +version = "4.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d28318a75d4aead5c4db25382e8ef717932d0346600cacae6357eb5941bc5ff" +dependencies = [ + "proc-macro2 1.0.85", + "quote 1.0.36", + "syn 2.0.66", +] + [[package]] name = "enum_dispatch" version = "0.3.13" @@ -2610,9 +2642,9 @@ dependencies = [ [[package]] name = "hyper" -version = "1.3.1" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d" +checksum = "bbbff0a806a4728c99295b254c8838933b5b082d75e3cb70c8dab21fdfbcfa9a" dependencies = [ "bytes", "futures-channel", @@ -2639,7 +2671,7 @@ dependencies = [ "futures-util", "headers", "http 1.1.0", - "hyper 1.3.1", + "hyper 1.5.0", "hyper-rustls", "hyper-util", "pin-project-lite", @@ -2657,7 +2689,7 @@ checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155" dependencies = [ "futures-util", "http 1.1.0", - "hyper 1.3.1", + "hyper 1.5.0", "hyper-util", "log", "rustls", @@ -2674,7 +2706,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" dependencies = [ - "hyper 1.3.1", + "hyper 1.5.0", "hyper-util", "pin-project-lite", "tokio", @@ -2702,7 +2734,7 @@ checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" dependencies = [ "bytes", "http-body-util", - "hyper 1.3.1", + "hyper 1.5.0", "hyper-util", "native-tls", "tokio", @@ -2712,20 +2744,19 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.5" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b875924a60b96e5d7b9ae7b066540b1dd1cbd90d1828f54c92e02a283351c56" +checksum = "41296eb09f183ac68eec06e03cdbea2e759633d4067b2f6552fc2e009bcad08b" dependencies = [ "bytes", "futures-channel", "futures-util", "http 1.1.0", "http-body 1.0.0", - "hyper 1.3.1", + "hyper 1.5.0", "pin-project-lite", "socket2", "tokio", - "tower", "tower-service", "tracing", ] @@ -3075,7 +3106,7 @@ dependencies = [ "async-trait", "base64 0.22.1", "http-body 1.0.0", - "hyper 1.3.1", + "hyper 1.5.0", "hyper-rustls", "hyper-util", "jsonrpsee-core", @@ -3086,7 +3117,7 @@ dependencies = [ "serde_json", "thiserror", "tokio", - "tower", + "tower 0.4.13", "tracing", "url", ] @@ -3206,9 +3237,9 @@ dependencies = [ [[package]] name = "kube" -version = "0.95.0" +version = "0.96.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa21063c854820a77c5d7f8deeb7ffa55246d8304e4bcd8cce2956752c6604f8" +checksum = "efffeb3df0bd4ef3e5d65044573499c0e4889b988070b08c50b25b1329289a1f" dependencies = [ "k8s-openapi", "kube-client", @@ -3219,9 +3250,9 @@ dependencies = [ [[package]] name = "kube-client" -version = "0.95.0" +version = "0.96.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31c2355f5c9d8a11900e71a6fe1e47abd5ec45bf971eb4b162ffe97b46db9bb7" +checksum = "8bf471ece8ff8d24735ce78dac4d091e9fcb8d74811aeb6b75de4d1c3f5de0f1" dependencies = [ "base64 0.22.1", "bytes", @@ -3232,7 +3263,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.0", "http-body-util", - "hyper 1.3.1", + "hyper 1.5.0", "hyper-http-proxy", "hyper-rustls", "hyper-timeout", @@ -3243,23 +3274,23 @@ dependencies = [ "pem", "rustls", "rustls-pemfile 2.1.2", - "secrecy", + "secrecy 0.10.3", "serde", "serde_json", "serde_yaml", "thiserror", "tokio", "tokio-util", - "tower", + "tower 0.5.1", "tower-http", "tracing", ] [[package]] name = "kube-core" -version = "0.95.0" +version = "0.96.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3030bd91c9db544a50247e7d48d7db9cf633c172732dce13351854526b1e666" +checksum = "f42346d30bb34d1d7adc5c549b691bce7aa3a1e60254e68fab7e2d7b26fe3d77" dependencies = [ "chrono", "form_urlencoded", @@ -3275,9 +3306,9 @@ dependencies = [ [[package]] name = "kube-derive" -version = "0.95.0" +version = "0.96.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa98be978eddd70a773aa8e86346075365bfb7eb48783410852dbf7cb57f0c27" +checksum = "f9364e04cc5e0482136c6ee8b7fb7551812da25802249f35b3def7aaa31e82ad" dependencies = [ "darling 0.20.10", "proc-macro2 1.0.85", @@ -3288,16 +3319,16 @@ dependencies = [ [[package]] name = "kube-runtime" -version = "0.95.0" +version = "0.96.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5895cb8aa641ac922408f128b935652b34c2995f16ad7db0984f6caa50217914" +checksum = "d3fbf1f6ffa98e65f1d2a9a69338bb60605d46be7edf00237784b89e62c9bd44" dependencies = [ "ahash 0.8.11", "async-broadcast", "async-stream", "async-trait", "backoff", - "derivative", + "educe", "futures 0.3.30", "hashbrown 0.14.5", "json-patch", @@ -4876,7 +4907,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.0", "http-body-util", - "hyper 1.3.1", + "hyper 1.5.0", "hyper-rustls", "hyper-tls 0.6.0", "hyper-util", @@ -5321,7 +5352,15 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9bd1c54ea06cfd2f6b63219704de0b9b4f72dcc2b8fdef820be6cd799780e91e" dependencies = [ - "serde", + "zeroize", +] + +[[package]] +name = "secrecy" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e891af845473308773346dc847b2c23ee78fe442e0472ac50e22a18a93d3ae5a" +dependencies = [ "zeroize", ] @@ -6542,7 +6581,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.0", "http-body-util", - "hyper 1.3.1", + "hyper 1.5.0", "hyper-timeout", "hyper-util", "percent-encoding", @@ -6551,7 +6590,7 @@ dependencies = [ "socket2", "tokio", "tokio-stream", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", "tracing", @@ -6577,18 +6616,34 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper 0.1.2", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower-http" -version = "0.5.2" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" +checksum = "8437150ab6bbc8c5f0f519e3d5ed4aa883a83dd4cdd3d1b21f9482936046cb97" dependencies = [ - "base64 0.21.7", + "base64 0.22.1", "bitflags 2.6.0", "bytes", "http 1.1.0", "http-body 1.0.0", - "http-body-util", "mime", "pin-project-lite", "tower-layer", @@ -6598,15 +6653,15 @@ dependencies = [ [[package]] name = "tower-layer" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-service" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" @@ -7745,7 +7800,7 @@ dependencies = [ "ethabi", "hex", "num_enum 0.7.2", - "secrecy", + "secrecy 0.8.0", "serde", "serde_json", "serde_with", @@ -7830,7 +7885,7 @@ version = "0.1.0" dependencies = [ "anyhow", "rand 0.8.5", - "secrecy", + "secrecy 0.8.0", "serde", "strum", "strum_macros", @@ -8283,7 +8338,7 @@ dependencies = [ "hex", "prost 0.12.6", "rand 0.8.5", - "secrecy", + "secrecy 0.8.0", "serde_json", "serde_yaml", "time", diff --git a/prover/Cargo.toml b/prover/Cargo.toml index af022e691c1f..31c663590eff 100644 --- a/prover/Cargo.toml +++ b/prover/Cargo.toml @@ -32,7 +32,7 @@ indicatif = "0.16" itertools = "0.10.5" jemallocator = "0.5" k8s-openapi = { version = "0.23.0", features = ["v1_30"] } -kube = { version = "0.95.0", features = ["runtime", "derive"] } +kube = { version = "0.96.0", features = ["runtime", "derive"] } local-ip-address = "0.5.0" log = "0.4.20" md5 = "0.7.0" diff --git a/prover/crates/bin/prover_autoscaler/src/cluster_types.rs b/prover/crates/bin/prover_autoscaler/src/cluster_types.rs index b800b86f3c28..e3e4c9b4df0d 100644 --- a/prover/crates/bin/prover_autoscaler/src/cluster_types.rs +++ b/prover/crates/bin/prover_autoscaler/src/cluster_types.rs @@ -29,11 +29,18 @@ where ordered.serialize(serializer) } +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct ScaleEvent { + pub name: String, + pub time: DateTime, +} + #[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct Namespace { #[serde(serialize_with = "ordered_map")] pub deployments: HashMap, pub pods: HashMap, + pub scale_errors: Vec, } #[derive(Debug, Clone, Default, Serialize, Deserialize)] diff --git a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs index 884174562a10..eb4249d071fe 100644 --- a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs +++ b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs @@ -21,7 +21,7 @@ struct GPUPool { name: String, gpu: Gpu, provers: HashMap, // TODO: consider using i64 everywhere to avoid type casts. - preemtions: u64, + scale_errors: usize, max_pool_size: u32, } @@ -140,6 +140,11 @@ impl Scaler { .and_then(|inner_map| inner_map.get(&gpu)) .copied() .unwrap_or(0), + scale_errors: namespace_value + .scale_errors + .iter() + .filter(|v| v.time < Utc::now() - chrono::Duration::hours(1)) // TODO Move the duration into config. + .count(), ..Default::default() }); @@ -147,6 +152,12 @@ impl Scaler { e.provers.insert(PodStatus::Running, 0); } + let recent_scale_errors = namespace_value + .scale_errors + .iter() + .filter(|v| v.time < Utc::now() - chrono::Duration::minutes(4)) // TODO Move the duration into config. This should be at least x2 or run interval. + .count(); + for ppg in namespace_value .pods .iter() @@ -158,10 +169,12 @@ impl Scaler { ..Default::default() }); let mut status = PodStatus::from_str(&ppg.pod.status).unwrap_or_default(); - if status == PodStatus::Pending - && ppg.pod.changed < Utc::now() - self.long_pending_duration - { - status = PodStatus::LongPending; + if status == PodStatus::Pending { + if ppg.pod.changed < Utc::now() - self.long_pending_duration { + status = PodStatus::LongPending; + } else if recent_scale_errors > 0 { + status = PodStatus::NeedToMove; + } } tracing::info!( "pod {}: status: {}, real status: {}", @@ -172,7 +185,7 @@ impl Scaler { e.provers.entry(status).and_modify(|n| *n += 1).or_insert(1); } - tracing::info!("From pods {:?}", gp_map.sorted_debug()); + tracing::debug!("From pods {:?}", gp_map.sorted_debug()); gp_map.into_values().collect() } @@ -195,7 +208,7 @@ impl Scaler { a.sum_by_pod_status(PodStatus::LongPending) .cmp(&b.sum_by_pod_status(PodStatus::LongPending)), ) // Sort by long Pending pods. - .then(a.preemtions.cmp(&b.preemtions)) // Sort by preemtions in the cluster. + .then(a.scale_errors.cmp(&b.scale_errors)) // Sort by scale_errors in the cluster. .then( self.cluster_priorities .get(&a.name) @@ -455,6 +468,7 @@ mod tests { }, )] .into(), + ..Default::default() }, )] .into(), @@ -521,6 +535,7 @@ mod tests { }, )] .into(), + ..Default::default() }, )] .into(), @@ -681,6 +696,7 @@ mod tests { ) ] .into(), + ..Default::default() }, )] .into(), @@ -718,6 +734,7 @@ mod tests { ) ] .into(), + ..Default::default() }, )] .into(), diff --git a/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs b/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs index f94dfc3704fb..5384db082bc7 100644 --- a/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs +++ b/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, sync::Arc}; -use chrono::Utc; +use chrono::{DateTime, Utc}; use futures::{stream, StreamExt, TryStreamExt}; use k8s_openapi::api; use kube::{ @@ -9,7 +9,7 @@ use kube::{ }; use tokio::sync::Mutex; -use crate::cluster_types::{Cluster, Deployment, Namespace, Pod}; +use crate::cluster_types::{Cluster, Deployment, Namespace, Pod, ScaleEvent}; #[derive(Clone)] pub struct Watcher { @@ -62,6 +62,15 @@ impl Watcher { .map_ok(Watched::Pod) .boxed(), ); + + let events: Api = Api::namespaced(self.client.clone(), namespace); + watchers.push( + watcher(events, watcher::Config::default()) + .default_backoff() + .applied_objects() + .map_ok(Watched::Event) + .boxed(), + ); } // select on applied events from all watchers let mut combo_stream = stream::select_all(watchers); @@ -70,61 +79,92 @@ impl Watcher { enum Watched { Deploy(api::apps::v1::Deployment), Pod(api::core::v1::Pod), + Event(api::core::v1::Event), } - while let Some(o) = combo_stream.try_next().await? { + while let Some(o) = combo_stream.next().await { match o { - Watched::Deploy(d) => { - let namespace = match d.namespace() { - Some(n) => n.to_string(), - None => continue, - }; - let mut cluster = self.cluster.lock().await; - let v = cluster.namespaces.get_mut(&namespace).unwrap(); - let dep = v - .deployments - .entry(d.name_any()) - .or_insert(Deployment::default()); - let nums = d.status.clone().unwrap_or_default(); - dep.running = nums.available_replicas.unwrap_or_default(); - dep.desired = nums.replicas.unwrap_or_default(); + Ok(o) => match o { + Watched::Deploy(d) => { + let namespace = match d.namespace() { + Some(n) => n.to_string(), + None => continue, + }; + let mut cluster = self.cluster.lock().await; + let v = cluster.namespaces.get_mut(&namespace).unwrap(); + let dep = v + .deployments + .entry(d.name_any()) + .or_insert(Deployment::default()); + let nums = d.status.clone().unwrap_or_default(); + dep.running = nums.available_replicas.unwrap_or_default(); + dep.desired = nums.replicas.unwrap_or_default(); - tracing::info!( - "Got deployment: {}, size: {}/{} un {}", - d.name_any(), - nums.available_replicas.unwrap_or_default(), - nums.replicas.unwrap_or_default(), - nums.unavailable_replicas.unwrap_or_default(), - ) - } - Watched::Pod(p) => { - let namespace = match p.namespace() { - Some(n) => n.to_string(), - None => continue, - }; - let mut cluster = self.cluster.lock().await; - let v = cluster.namespaces.get_mut(&namespace).unwrap(); - let pod = v.pods.entry(p.name_any()).or_insert(Pod::default()); - pod.owner = p - .owner_references() - .iter() - .map(|x| format!("{}/{}", x.kind.clone(), x.name.clone())) - .collect::>() - .join(":"); - // TODO: Collect replica sets to match deployments and pods. - let phase = p - .status - .clone() - .unwrap_or_default() - .phase - .unwrap_or_default(); - if phase != pod.status { - // TODO: try to get an idea how to set correct value on restart. - pod.changed = Utc::now(); + tracing::info!( + "Got deployment: {}, size: {}/{} un {}", + d.name_any(), + nums.available_replicas.unwrap_or_default(), + nums.replicas.unwrap_or_default(), + nums.unavailable_replicas.unwrap_or_default(), + ) } - pod.status = phase; + Watched::Pod(p) => { + let namespace = match p.namespace() { + Some(n) => n.to_string(), + None => continue, + }; + let mut cluster = self.cluster.lock().await; + let v = cluster.namespaces.get_mut(&namespace).unwrap(); + let pod = v.pods.entry(p.name_any()).or_insert(Pod::default()); + pod.owner = p + .owner_references() + .iter() + .map(|x| format!("{}/{}", x.kind.clone(), x.name.clone())) + .collect::>() + .join(":"); + // TODO: Collect replica sets to match deployments and pods. + let phase = p + .status + .clone() + .unwrap_or_default() + .phase + .unwrap_or_default(); + if phase != pod.status { + // TODO: try to get an idea how to set correct value on restart. + pod.changed = Utc::now(); + } + pod.status = phase; - tracing::info!("Got pod: {}", p.name_any()) - } + tracing::info!("Got pod: {}", p.name_any()) + } + Watched::Event(e) => { + let namespace: String = match e.namespace() { + Some(n) => n, + None => "".into(), + }; + let name = e.name_any(); + let reason = e.reason.unwrap_or_default(); + if reason != "FailedScaleUp" { + // Ignore all events which are not scale issues. + continue; + } + let time: DateTime = match e.last_timestamp { + Some(t) => t.0, + None => Utc::now(), + }; + tracing::debug!( + "Got event: {}/{}, message: {:?}; action: {:?}, reason: {:?}", + namespace, + name, + e.message.unwrap_or_default(), + e.action.unwrap_or_default(), + reason + ); + let mut cluster = self.cluster.lock().await; + let v = cluster.namespaces.get_mut(&namespace).unwrap(); + v.scale_errors.push(ScaleEvent { name, time }) + } + }, + Err(err) => tracing::warn!("Error during watch: {err:?}"), } }