From cb506176da10340d3cf3b55a61f2f9da1b68bf20 Mon Sep 17 00:00:00 2001 From: Alexander Gil Date: Sun, 1 Dec 2024 14:17:43 +0100 Subject: [PATCH] feat(runtime): Add series implementation for event recorder Signed-off-by: Alexander Gil --- Cargo.toml | 7 +- kube-runtime/Cargo.toml | 1 + kube-runtime/src/events.rs | 387 ++++++++++++++++++++++++++++--------- 3 files changed, 305 insertions(+), 90 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 14fc4e283..f82c7afdf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,18 +48,19 @@ form_urlencoded = "1.2.0" futures = { version = "0.3.17", default-features = false } hashbrown = "0.15.0" home = "0.5.4" +hostname = "0.3" http = "1.1.0" http-body = "1.0.1" http-body-util = "0.1.2" hyper = "1.2.0" -hyper-util = "0.1.9" hyper-openssl = "0.10.2" hyper-rustls = { version = "0.27.1", default-features = false } hyper-socks2 = { version = "0.9.0", default-features = false } hyper-timeout = "0.5.1" +hyper-util = "0.1.9" json-patch = "3" -jsonptr = "0.6" jsonpath-rust = "0.7.3" +jsonptr = "0.6" k8s-openapi = { version = "0.23.0", default-features = false } openssl = "0.10.36" parking_lot = "0.12.0" @@ -74,8 +75,8 @@ schemars = "0.8.6" secrecy = "0.10.2" serde = "1.0.130" serde_json = "1.0.68" -serde-value = "0.7.0" serde_yaml = "0.9.19" +serde-value = "0.7.0" syn = "2.0.38" tame-oauth = "0.10.0" tempfile = "3.1.0" diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index a89492384..9006bd819 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -49,6 +49,7 @@ hashbrown.workspace = true k8s-openapi.workspace = true async-broadcast.workspace = true async-stream.workspace = true +hostname.workspace = true [dev-dependencies] kube = { path = "../kube", features = ["derive", "client", "runtime"], version = "<1.0.0, >=0.60.0" } diff --git a/kube-runtime/src/events.rs b/kube-runtime/src/events.rs index 5e0d9dd6f..5b811977b 100644 --- a/kube-runtime/src/events.rs +++ b/kube-runtime/src/events.rs @@ -1,13 +1,25 @@ //! Publishes events for objects for kubernetes >= 1.19 +use std::{ + collections::HashMap, + hash::{Hash, Hasher}, + sync::Arc, +}; + use k8s_openapi::{ - api::{core::v1::ObjectReference, events::v1::Event as K8sEvent}, + api::{ + core::v1::ObjectReference, + events::v1::{Event as K8sEvent, EventSeries}, + }, apimachinery::pkg::apis::meta::v1::{MicroTime, ObjectMeta}, - chrono::Utc, + chrono::{Duration, Utc}, }; use kube_client::{ - api::{Api, PostParams}, - Client, + api::{Api, Patch, PatchParams, PostParams}, + Client, ResourceExt, }; +use tokio::sync::RwLock; + +const CACHE_TTL: Duration = Duration::minutes(6); /// Minimal event type for publishing through [`Recorder::publish`]. /// @@ -64,6 +76,36 @@ pub enum EventType { Warning, } +/// [`ObjectReference`] with Hash and Eq implementations +/// +/// [`ObjectReference`]: k8s_openapi::api::core::v1::ObjectReference +#[derive(Clone, Debug, PartialEq)] +pub struct Reference(ObjectReference); + +impl Eq for Reference {} + +impl Hash for Reference { + fn hash(&self, state: &mut H) { + self.0.api_version.hash(state); + self.0.kind.hash(state); + self.0.name.hash(state); + self.0.namespace.hash(state); + self.0.uid.hash(state); + } +} + +/// Cache key for event deduplication +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +struct EventKey { + pub event_type: EventType, + pub action: String, + pub reason: String, + pub reporting_controller: String, + pub reporting_instance: Option, + pub regarding: Reference, + pub related: Option, +} + /// Information about the reporting controller. /// /// ``` @@ -99,7 +141,8 @@ pub struct Reporter { /// /// in the manifest of your controller. /// - /// NB: If no `instance` is provided, then `reporting_instance == reporting_controller` in the `Event`. + /// Note: If `instance` is not provided, the hostname is used. If the hostname is also + /// unavailable, `reporting_instance` defaults to `reporting_controller` in the `Event`. pub instance: Option, } @@ -115,9 +158,10 @@ impl From for Reporter { impl From<&str> for Reporter { fn from(es: &str) -> Self { + let instance = hostname::get().ok().and_then(|h| h.into_string().ok()); Self { controller: es.into(), - instance: None, + instance, } } } @@ -138,6 +182,8 @@ impl From<&str> for Reporter { /// instance: std::env::var("CONTROLLER_POD_NAME").ok(), /// }; /// +/// let recorder = Recorder::new(client, reporter); +/// /// // references can be made manually using `ObjectMeta` and `ApiResource`/`Resource` info /// let reference = ObjectReference { /// // [...] @@ -145,15 +191,17 @@ impl From<&str> for Reporter { /// }; /// // or for k8s-openapi / kube-derive types, use Resource::object_ref: /// // let reference = myobject.object_ref(); -/// -/// let recorder = Recorder::new(client, reporter, reference); -/// recorder.publish(Event { -/// action: "Scheduling".into(), -/// reason: "Pulling".into(), -/// note: Some("Pulling image `nginx`".into()), -/// type_: EventType::Normal, -/// secondary: None, -/// }).await?; +/// recorder +/// .publish( +/// &Event { +/// action: "Scheduling".into(), +/// reason: "Pulling".into(), +/// note: Some("Pulling image `nginx`".into()), +/// type_: EventType::Normal, +/// secondary: None, +/// }, +/// &reference, +/// ).await?; /// # Ok(()) /// # } /// ``` @@ -168,13 +216,13 @@ impl From<&str> for Reporter { /// ```yaml /// - apiGroups: ["events.k8s.io"] /// resources: ["events"] -/// verbs: ["create"] +/// verbs: ["create", "patch"] /// ``` #[derive(Clone)] pub struct Recorder { - events: Api, + client: Client, reporter: Reporter, - reference: ObjectReference, + cache: Arc>>, } impl Recorder { @@ -184,13 +232,66 @@ impl Recorder { /// /// Cluster scoped objects will publish events in the "default" namespace. #[must_use] - pub fn new(client: Client, reporter: Reporter, reference: ObjectReference) -> Self { - let default_namespace = "kube-system".to_owned(); // default does not work on k8s < 1.22 - let events = Api::namespaced(client, reference.namespace.as_ref().unwrap_or(&default_namespace)); + pub fn new(client: Client, reporter: Reporter) -> Self { + let cache = Arc::default(); Self { - events, + client, reporter, - reference, + cache, + } + } + + /// Builds unique event key based on reportingController, reportingInstance, regarding, reason + /// and note + fn get_event_key(&self, ev: &Event, regarding: &ObjectReference) -> EventKey { + EventKey { + event_type: ev.type_, + action: ev.action.clone(), + reason: ev.reason.clone(), + reporting_controller: self.reporter.controller.clone(), + reporting_instance: self.reporter.instance.clone(), + regarding: Reference(regarding.clone()), + related: ev.secondary.clone().map(Reference), + } + } + + // See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#event-v1-events-k8s-io + // for more detail on the fields + // and what's expected: https://kubernetes.io/docs/reference/using-api/deprecation-guide/#event-v125 + fn generate_event(&self, ev: &Event, reference: &ObjectReference) -> K8sEvent { + let now = Utc::now(); + K8sEvent { + action: Some(ev.action.clone()), + reason: Some(ev.reason.clone()), + deprecated_count: None, + deprecated_first_timestamp: None, + deprecated_last_timestamp: None, + deprecated_source: None, + event_time: Some(MicroTime(now)), + regarding: Some(reference.clone()), + note: ev.note.clone().map(Into::into), + metadata: ObjectMeta { + namespace: reference.namespace.clone(), + name: Some(format!( + "{}.{:x}", + reference.name.as_ref().unwrap_or(&self.reporter.controller), + now.timestamp_nanos_opt().unwrap_or_else(|| now.timestamp()) + )), + ..Default::default() + }, + reporting_controller: Some(self.reporter.controller.clone()), + reporting_instance: Some( + self.reporter + .instance + .clone() + .unwrap_or_else(|| self.reporter.controller.clone()), + ), + series: None, + type_: match ev.type_ { + EventType::Normal => Some("Normal".into()), + EventType::Warning => Some("Warning".into()), + }, + related: ev.secondary.clone(), } } @@ -198,62 +299,75 @@ impl Recorder { /// /// # Access control /// - /// The event object is created in the same namespace of the [`ObjectReference`] - /// you specified in [`Recorder::new`]. + /// The event object is created in the same namespace of the [`ObjectReference`]. /// Make sure that your controller has `create` permissions in the required namespaces /// for the `event` resource in the API group `events.k8s.io`. /// /// # Errors /// /// Returns an [`Error`](`kube_client::Error`) if the event is rejected by Kubernetes. - pub async fn publish(&self, ev: Event) -> Result<(), kube_client::Error> { - // See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#event-v1-events-k8s-io - // for more detail on the fields - // and what's expected: https://kubernetes.io/docs/reference/using-api/deprecation-guide/#event-v125 - self.events - .create(&PostParams::default(), &K8sEvent { - action: Some(ev.action), - reason: Some(ev.reason), - deprecated_count: None, - deprecated_first_timestamp: None, - deprecated_last_timestamp: None, - deprecated_source: None, - event_time: Some(MicroTime(Utc::now())), - regarding: Some(self.reference.clone()), - note: ev.note.map(Into::into), - metadata: ObjectMeta { - namespace: self.reference.namespace.clone(), - generate_name: Some(format!("{}-", self.reporter.controller)), - ..Default::default() - }, - reporting_controller: Some(self.reporter.controller.clone()), - reporting_instance: Some( - self.reporter - .instance - .clone() - .unwrap_or_else(|| self.reporter.controller.clone()), - ), - series: None, - type_: match ev.type_ { - EventType::Normal => Some("Normal".into()), - EventType::Warning => Some("Warning".into()), - }, - related: ev.secondary, - }) - .await?; + pub async fn publish(&self, ev: &Event, reference: &ObjectReference) -> Result<(), kube_client::Error> { + let now = Utc::now(); + + // gc past events older than now + CACHE_TTL + self.cache.write().await.retain(|_, v| { + if let Some(series) = v.series.as_ref() { + series.last_observed_time.0 + CACHE_TTL > now + } else if let Some(event_time) = v.event_time.as_ref() { + event_time.0 + CACHE_TTL > now + } else { + true + } + }); + + let key = self.get_event_key(ev, reference); + let event = match self.cache.read().await.get(&key) { + Some(e) => { + let count = if let Some(s) = &e.series { s.count + 1 } else { 2 }; + let series = EventSeries { + count, + last_observed_time: MicroTime(now), + }; + let mut event = e.clone(); + event.series = Some(series); + event + } + None => self.generate_event(ev, reference), + }; + + let events = Api::namespaced( + self.client.clone(), + reference.namespace.as_ref().unwrap_or(&"default".to_string()), + ); + if event.series.is_some() { + events + .patch(&event.name_any(), &PatchParams::default(), &Patch::Merge(&event)) + .await?; + } else { + events.create(&PostParams::default(), &event).await?; + }; + + { + let mut cache = self.cache.write().await; + cache.insert(key, event); + } Ok(()) } } #[cfg(test)] mod test { - use k8s_openapi::api::{ - core::v1::{Event as K8sEvent, Service}, - rbac::v1::ClusterRole, - }; - use kube_client::{Api, Client, Resource}; + use super::{Event, EventKey, EventType, Recorder, Reference, Reporter}; - use super::{Event, EventType, Recorder}; + use k8s_openapi::{ + api::{ + core::v1::{ComponentStatus, Service}, + events::v1::Event as K8sEvent, + }, + apimachinery::pkg::apis::meta::v1::MicroTime, + chrono::{Duration, Utc}, + }; + use kube::{Api, Client, Resource}; #[tokio::test] #[ignore = "needs cluster (creates an event for the default kubernetes service)"] @@ -262,15 +376,18 @@ mod test { let svcs: Api = Api::namespaced(client.clone(), "default"); let s = svcs.get("kubernetes").await?; // always a kubernetes service in default - let recorder = Recorder::new(client.clone(), "kube".into(), s.object_ref(&())); + let recorder = Recorder::new(client.clone(), "kube".into()); recorder - .publish(Event { - type_: EventType::Normal, - reason: "VeryCoolService".into(), - note: Some("Sending kubernetes to detention".into()), - action: "Test event - plz ignore".into(), - secondary: None, - }) + .publish( + &Event { + type_: EventType::Normal, + reason: "VeryCoolService".into(), + note: Some("Sending kubernetes to detention".into()), + action: "Test event - plz ignore".into(), + secondary: None, + }, + &s.object_ref(&()), + ) .await?; let events: Api = Api::namespaced(client, "default"); @@ -279,7 +396,27 @@ mod test { .into_iter() .find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolService"))) .unwrap(); - assert_eq!(found_event.message.unwrap(), "Sending kubernetes to detention"); + assert_eq!(found_event.note.unwrap(), "Sending kubernetes to detention"); + + recorder + .publish( + &Event { + type_: EventType::Normal, + reason: "VeryCoolService".into(), + note: Some("Sending kubernetes to detention twice".into()), + action: "Test event - plz ignore".into(), + secondary: None, + }, + &s.object_ref(&()), + ) + .await?; + + let event_list = events.list(&Default::default()).await?; + let found_event = event_list + .into_iter() + .find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolService"))) + .unwrap(); + assert!(found_event.series.is_some()); Ok(()) } @@ -289,19 +426,22 @@ mod test { async fn event_recorder_attaches_events_without_namespace() -> Result<(), Box> { let client = Client::try_default().await?; - let svcs: Api = Api::all(client.clone()); - let s = svcs.get("system:basic-user").await?; // always get this default ClusterRole - let recorder = Recorder::new(client.clone(), "kube".into(), s.object_ref(&())); + let component_status_api: Api = Api::all(client.clone()); + let s = component_status_api.get("scheduler").await?; + let recorder = Recorder::new(client.clone(), "kube".into()); recorder - .publish(Event { - type_: EventType::Normal, - reason: "VeryCoolServiceNoNamespace".into(), - note: Some("Sending kubernetes to detention without namespace".into()), - action: "Test event - plz ignore".into(), - secondary: None, - }) + .publish( + &Event { + type_: EventType::Normal, + reason: "VeryCoolServiceNoNamespace".into(), + note: Some("Sending kubernetes to detention without namespace".into()), + action: "Test event - plz ignore".into(), + secondary: None, + }, + &s.object_ref(&()), + ) .await?; - let events: Api = Api::namespaced(client, "kube-system"); + let events: Api = Api::namespaced(client, "default"); let event_list = events.list(&Default::default()).await?; let found_event = event_list @@ -309,10 +449,83 @@ mod test { .find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolServiceNoNamespace"))) .unwrap(); assert_eq!( - found_event.message.unwrap(), + found_event.note.unwrap(), "Sending kubernetes to detention without namespace" ); + recorder + .publish( + &Event { + type_: EventType::Normal, + reason: "VeryCoolServiceNoNamespace".into(), + note: Some("Sending kubernetes to detention without namespace twice".into()), + action: "Test event - plz ignore".into(), + secondary: None, + }, + &s.object_ref(&()), + ) + .await?; + + let event_list = events.list(&Default::default()).await?; + let found_event = event_list + .into_iter() + .find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolServiceNoNamespace"))) + .unwrap(); + assert!(found_event.series.is_some()); + Ok(()) + } + + #[tokio::test] + #[ignore = "needs cluster (creates an event for the default kubernetes service)"] + async fn event_recorder_cache_retain() -> Result<(), Box> { + let client = Client::try_default().await?; + + let svcs: Api = Api::namespaced(client.clone(), "default"); + let s = svcs.get("kubernetes").await?; // always a kubernetes service in default + + let reference = s.object_ref(&()); + let reporter: Reporter = "kube".into(); + let ev = Event { + type_: EventType::Normal, + reason: "TestCacheTtl".into(), + note: Some("Sending kubernetes to detention".into()), + action: "Test event - plz ignore".into(), + secondary: None, + }; + let key = EventKey { + event_type: ev.type_, + action: ev.action.clone(), + reason: ev.reason.clone(), + reporting_controller: reporter.controller.clone(), + regarding: Reference(reference.clone()), + reporting_instance: None, + related: None, + }; + + let reporter = Reporter { + controller: "kube".into(), + instance: None, + }; + let recorder = Recorder::new(client.clone(), reporter); + + recorder.publish(&ev, &s.object_ref(&())).await?; + let now = Utc::now(); + let past = now - Duration::minutes(10); + recorder.cache.write().await.entry(key).and_modify(|e| { + e.event_time = Some(MicroTime(past)); + }); + + recorder.publish(&ev, &s.object_ref(&())).await?; + + let events: Api = Api::namespaced(client, "default"); + let event_list = events.list(&Default::default()).await?; + let found_event = event_list + .into_iter() + .find(|e| std::matches!(e.reason.as_deref(), Some("TestCacheTtl"))) + .unwrap(); + assert_eq!(found_event.note.unwrap(), "Sending kubernetes to detention"); + assert!(found_event.series.is_none()); + Ok(()) } }