Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(runtime): Add series implementation for event recorder #1655

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -48,18 +48,19 @@ form_urlencoded = "1.2.0"
futures = { version = "0.3.17", default-features = false }
hashbrown = "0.15.0"
home = "0.5.4"
hostname = "0.3"
http = "1.1.0"
http-body = "1.0.1"
http-body-util = "0.1.2"
hyper = "1.2.0"
hyper-util = "0.1.9"
hyper-openssl = "0.10.2"
hyper-rustls = { version = "0.27.1", default-features = false }
hyper-socks2 = { version = "0.9.0", default-features = false }
hyper-timeout = "0.5.1"
hyper-util = "0.1.9"
json-patch = "3"
jsonptr = "0.6"
jsonpath-rust = "0.7.3"
jsonptr = "0.6"
k8s-openapi = { version = "0.23.0", default-features = false }
openssl = "0.10.36"
parking_lot = "0.12.0"
@@ -74,8 +75,8 @@ schemars = "0.8.6"
secrecy = "0.10.2"
serde = "1.0.130"
serde_json = "1.0.68"
serde-value = "0.7.0"
serde_yaml = "0.9.19"
serde-value = "0.7.0"
syn = "2.0.38"
tame-oauth = "0.10.0"
tempfile = "3.1.0"
1 change: 1 addition & 0 deletions kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
@@ -49,6 +49,7 @@ hashbrown.workspace = true
k8s-openapi.workspace = true
async-broadcast.workspace = true
async-stream.workspace = true
hostname.workspace = true

[dev-dependencies]
kube = { path = "../kube", features = ["derive", "client", "runtime"], version = "<1.0.0, >=0.60.0" }
387 changes: 300 additions & 87 deletions kube-runtime/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
//! Publishes events for objects for kubernetes >= 1.19
use std::{
collections::HashMap,
hash::{Hash, Hasher},
sync::Arc,
};

use k8s_openapi::{
api::{core::v1::ObjectReference, events::v1::Event as K8sEvent},
api::{
core::v1::ObjectReference,
events::v1::{Event as K8sEvent, EventSeries},
},
apimachinery::pkg::apis::meta::v1::{MicroTime, ObjectMeta},
chrono::Utc,
chrono::{Duration, Utc},
};
use kube_client::{
api::{Api, PostParams},
Client,
api::{Api, Patch, PatchParams, PostParams},
Client, ResourceExt,
};
use tokio::sync::RwLock;

const CACHE_TTL: Duration = Duration::minutes(6);

/// Minimal event type for publishing through [`Recorder::publish`].
///
@@ -64,6 +76,36 @@
Warning,
}

/// [`ObjectReference`] with Hash and Eq implementations
///
/// [`ObjectReference`]: k8s_openapi::api::core::v1::ObjectReference
#[derive(Clone, Debug, PartialEq)]
pub struct Reference(ObjectReference);

impl Eq for Reference {}

impl Hash for Reference {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.api_version.hash(state);
self.0.kind.hash(state);
self.0.name.hash(state);
self.0.namespace.hash(state);
self.0.uid.hash(state);
}
}

/// Cache key for event deduplication
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct EventKey {
pub event_type: EventType,
pub action: String,
pub reason: String,
pub reporting_controller: String,
pub reporting_instance: Option<String>,
pub regarding: Reference,
pub related: Option<Reference>,
}

/// Information about the reporting controller.
///
/// ```
@@ -99,7 +141,8 @@
///
/// in the manifest of your controller.
///
/// NB: If no `instance` is provided, then `reporting_instance == reporting_controller` in the `Event`.
/// Note: If `instance` is not provided, the hostname is used. If the hostname is also
/// unavailable, `reporting_instance` defaults to `reporting_controller` in the `Event`.
pub instance: Option<String>,
}

@@ -115,9 +158,10 @@

impl From<&str> for Reporter {
fn from(es: &str) -> Self {
let instance = hostname::get().ok().and_then(|h| h.into_string().ok());
clux marked this conversation as resolved.
Show resolved Hide resolved
Self {
controller: es.into(),
instance: None,
instance,
}
}
}
@@ -138,22 +182,26 @@
/// instance: std::env::var("CONTROLLER_POD_NAME").ok(),
/// };
///
/// let recorder = Recorder::new(client, reporter);
///
/// // references can be made manually using `ObjectMeta` and `ApiResource`/`Resource` info
/// let reference = ObjectReference {
/// // [...]
/// ..Default::default()
/// };
/// // or for k8s-openapi / kube-derive types, use Resource::object_ref:
/// // let reference = myobject.object_ref();
///
/// let recorder = Recorder::new(client, reporter, reference);
/// recorder.publish(Event {
/// action: "Scheduling".into(),
/// reason: "Pulling".into(),
/// note: Some("Pulling image `nginx`".into()),
/// type_: EventType::Normal,
/// secondary: None,
/// }).await?;
/// recorder
/// .publish(
/// &Event {
/// action: "Scheduling".into(),
/// reason: "Pulling".into(),
/// note: Some("Pulling image `nginx`".into()),
/// type_: EventType::Normal,
/// secondary: None,
/// },
/// &reference,
/// ).await?;
/// # Ok(())
/// # }
/// ```
@@ -168,13 +216,13 @@
/// ```yaml
/// - apiGroups: ["events.k8s.io"]
/// resources: ["events"]
/// verbs: ["create"]
/// verbs: ["create", "patch"]
/// ```
#[derive(Clone)]
pub struct Recorder {
events: Api<K8sEvent>,
client: Client,
reporter: Reporter,
reference: ObjectReference,
cache: Arc<RwLock<HashMap<EventKey, K8sEvent>>>,
}

impl Recorder {
@@ -184,76 +232,142 @@
///
/// Cluster scoped objects will publish events in the "default" namespace.
#[must_use]
pub fn new(client: Client, reporter: Reporter, reference: ObjectReference) -> Self {
let default_namespace = "kube-system".to_owned(); // default does not work on k8s < 1.22
let events = Api::namespaced(client, reference.namespace.as_ref().unwrap_or(&default_namespace));
pub fn new(client: Client, reporter: Reporter) -> Self {
let cache = Arc::default();
Self {
events,
client,
reporter,
reference,
cache,
}
}

/// Builds unique event key based on reportingController, reportingInstance, regarding, reason
/// and note
fn get_event_key(&self, ev: &Event, regarding: &ObjectReference) -> EventKey {
EventKey {
event_type: ev.type_,
action: ev.action.clone(),
reason: ev.reason.clone(),
reporting_controller: self.reporter.controller.clone(),
reporting_instance: self.reporter.instance.clone(),
regarding: Reference(regarding.clone()),
related: ev.secondary.clone().map(Reference),
}
}

// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#event-v1-events-k8s-io
// for more detail on the fields
// and what's expected: https://kubernetes.io/docs/reference/using-api/deprecation-guide/#event-v125
fn generate_event(&self, ev: &Event, reference: &ObjectReference) -> K8sEvent {
let now = Utc::now();
K8sEvent {
action: Some(ev.action.clone()),
reason: Some(ev.reason.clone()),
deprecated_count: None,
deprecated_first_timestamp: None,
deprecated_last_timestamp: None,
deprecated_source: None,
event_time: Some(MicroTime(now)),
regarding: Some(reference.clone()),
note: ev.note.clone().map(Into::into),
metadata: ObjectMeta {
namespace: reference.namespace.clone(),
name: Some(format!(
"{}.{:x}",
reference.name.as_ref().unwrap_or(&self.reporter.controller),
now.timestamp_nanos_opt().unwrap_or_else(|| now.timestamp())
)),
..Default::default()
},
reporting_controller: Some(self.reporter.controller.clone()),
reporting_instance: Some(
self.reporter
.instance
.clone()
.unwrap_or_else(|| self.reporter.controller.clone()),
),
series: None,
type_: match ev.type_ {
EventType::Normal => Some("Normal".into()),
EventType::Warning => Some("Warning".into()),
},
related: ev.secondary.clone(),
}
}

/// Publish a new Kubernetes' event.
///
/// # Access control
///
/// The event object is created in the same namespace of the [`ObjectReference`]
/// you specified in [`Recorder::new`].
/// The event object is created in the same namespace of the [`ObjectReference`].
/// Make sure that your controller has `create` permissions in the required namespaces
/// for the `event` resource in the API group `events.k8s.io`.
///
/// # Errors
///
/// Returns an [`Error`](`kube_client::Error`) if the event is rejected by Kubernetes.
pub async fn publish(&self, ev: Event) -> Result<(), kube_client::Error> {
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#event-v1-events-k8s-io
// for more detail on the fields
// and what's expected: https://kubernetes.io/docs/reference/using-api/deprecation-guide/#event-v125
self.events
.create(&PostParams::default(), &K8sEvent {
action: Some(ev.action),
reason: Some(ev.reason),
deprecated_count: None,
deprecated_first_timestamp: None,
deprecated_last_timestamp: None,
deprecated_source: None,
event_time: Some(MicroTime(Utc::now())),
regarding: Some(self.reference.clone()),
note: ev.note.map(Into::into),
metadata: ObjectMeta {
namespace: self.reference.namespace.clone(),
generate_name: Some(format!("{}-", self.reporter.controller)),
..Default::default()
},
reporting_controller: Some(self.reporter.controller.clone()),
reporting_instance: Some(
self.reporter
.instance
.clone()
.unwrap_or_else(|| self.reporter.controller.clone()),
),
series: None,
type_: match ev.type_ {
EventType::Normal => Some("Normal".into()),
EventType::Warning => Some("Warning".into()),
},
related: ev.secondary,
})
.await?;
pub async fn publish(&self, ev: &Event, reference: &ObjectReference) -> Result<(), kube_client::Error> {
pando85 marked this conversation as resolved.
Show resolved Hide resolved
let now = Utc::now();

// gc past events older than now + CACHE_TTL
self.cache.write().await.retain(|_, v| {
if let Some(series) = v.series.as_ref() {
series.last_observed_time.0 + CACHE_TTL > now

Check warning on line 315 in kube-runtime/src/events.rs

Codecov / codecov/patch

kube-runtime/src/events.rs#L315

Added line #L315 was not covered by tests
} else if let Some(event_time) = v.event_time.as_ref() {
event_time.0 + CACHE_TTL > now
} else {
true

Check warning on line 319 in kube-runtime/src/events.rs

Codecov / codecov/patch

kube-runtime/src/events.rs#L319

Added line #L319 was not covered by tests
}
});

let key = self.get_event_key(ev, reference);
let event = match self.cache.read().await.get(&key) {
Some(e) => {
let count = if let Some(s) = &e.series { s.count + 1 } else { 2 };
let series = EventSeries {
count,
last_observed_time: MicroTime(now),
};
let mut event = e.clone();
event.series = Some(series);
event
}
None => self.generate_event(ev, reference),
};

let events = Api::namespaced(
self.client.clone(),
reference.namespace.as_ref().unwrap_or(&"default".to_string()),
);
if event.series.is_some() {
events
.patch(&event.name_any(), &PatchParams::default(), &Patch::Merge(&event))
.await?;
} else {
events.create(&PostParams::default(), &event).await?;
};

{
let mut cache = self.cache.write().await;
cache.insert(key, event);
}
Ok(())
}
}

#[cfg(test)]
mod test {
use k8s_openapi::api::{
core::v1::{Event as K8sEvent, Service},
rbac::v1::ClusterRole,
};
use kube_client::{Api, Client, Resource};
use super::{Event, EventKey, EventType, Recorder, Reference, Reporter};

use super::{Event, EventType, Recorder};
use k8s_openapi::{
api::{
core::v1::{ComponentStatus, Service},
events::v1::Event as K8sEvent,
},
apimachinery::pkg::apis::meta::v1::MicroTime,
chrono::{Duration, Utc},
};
use kube::{Api, Client, Resource};

#[tokio::test]
#[ignore = "needs cluster (creates an event for the default kubernetes service)"]
@@ -262,15 +376,18 @@

let svcs: Api<Service> = Api::namespaced(client.clone(), "default");
let s = svcs.get("kubernetes").await?; // always a kubernetes service in default
let recorder = Recorder::new(client.clone(), "kube".into(), s.object_ref(&()));
let recorder = Recorder::new(client.clone(), "kube".into());
recorder
.publish(Event {
type_: EventType::Normal,
reason: "VeryCoolService".into(),
note: Some("Sending kubernetes to detention".into()),
action: "Test event - plz ignore".into(),
secondary: None,
})
.publish(
&Event {
type_: EventType::Normal,
reason: "VeryCoolService".into(),
note: Some("Sending kubernetes to detention".into()),
action: "Test event - plz ignore".into(),
secondary: None,
},
&s.object_ref(&()),
)
.await?;
let events: Api<K8sEvent> = Api::namespaced(client, "default");

@@ -279,7 +396,27 @@
.into_iter()
.find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolService")))
.unwrap();
assert_eq!(found_event.message.unwrap(), "Sending kubernetes to detention");
assert_eq!(found_event.note.unwrap(), "Sending kubernetes to detention");

recorder
.publish(
&Event {
type_: EventType::Normal,
reason: "VeryCoolService".into(),
note: Some("Sending kubernetes to detention twice".into()),
action: "Test event - plz ignore".into(),
secondary: None,
},
&s.object_ref(&()),
)
.await?;

let event_list = events.list(&Default::default()).await?;
let found_event = event_list
.into_iter()
.find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolService")))
.unwrap();
assert!(found_event.series.is_some());

Ok(())
}
@@ -289,30 +426,106 @@
async fn event_recorder_attaches_events_without_namespace() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::try_default().await?;

let svcs: Api<ClusterRole> = Api::all(client.clone());
let s = svcs.get("system:basic-user").await?; // always get this default ClusterRole
let recorder = Recorder::new(client.clone(), "kube".into(), s.object_ref(&()));
let component_status_api: Api<ComponentStatus> = Api::all(client.clone());
let s = component_status_api.get("scheduler").await?;
let recorder = Recorder::new(client.clone(), "kube".into());
recorder
.publish(Event {
type_: EventType::Normal,
reason: "VeryCoolServiceNoNamespace".into(),
note: Some("Sending kubernetes to detention without namespace".into()),
action: "Test event - plz ignore".into(),
secondary: None,
})
.publish(
&Event {
type_: EventType::Normal,
reason: "VeryCoolServiceNoNamespace".into(),
note: Some("Sending kubernetes to detention without namespace".into()),
action: "Test event - plz ignore".into(),
secondary: None,
},
&s.object_ref(&()),
)
clux marked this conversation as resolved.
Show resolved Hide resolved
.await?;
let events: Api<K8sEvent> = Api::namespaced(client, "kube-system");
let events: Api<K8sEvent> = Api::namespaced(client, "default");

let event_list = events.list(&Default::default()).await?;
let found_event = event_list
.into_iter()
.find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolServiceNoNamespace")))
.unwrap();
assert_eq!(
found_event.message.unwrap(),
found_event.note.unwrap(),
"Sending kubernetes to detention without namespace"
);

recorder
.publish(
&Event {
type_: EventType::Normal,
reason: "VeryCoolServiceNoNamespace".into(),
note: Some("Sending kubernetes to detention without namespace twice".into()),
action: "Test event - plz ignore".into(),
secondary: None,
},
&s.object_ref(&()),
)
.await?;

let event_list = events.list(&Default::default()).await?;
let found_event = event_list
.into_iter()
.find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolServiceNoNamespace")))
.unwrap();
assert!(found_event.series.is_some());
Ok(())
}

#[tokio::test]
#[ignore = "needs cluster (creates an event for the default kubernetes service)"]
async fn event_recorder_cache_retain() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::try_default().await?;

let svcs: Api<Service> = Api::namespaced(client.clone(), "default");
let s = svcs.get("kubernetes").await?; // always a kubernetes service in default

let reference = s.object_ref(&());
let reporter: Reporter = "kube".into();
let ev = Event {
type_: EventType::Normal,
reason: "TestCacheTtl".into(),
note: Some("Sending kubernetes to detention".into()),
action: "Test event - plz ignore".into(),
secondary: None,
};
let key = EventKey {
event_type: ev.type_,
action: ev.action.clone(),
reason: ev.reason.clone(),
reporting_controller: reporter.controller.clone(),
regarding: Reference(reference.clone()),
reporting_instance: None,
related: None,
};

let reporter = Reporter {
controller: "kube".into(),
instance: None,
};
let recorder = Recorder::new(client.clone(), reporter);

recorder.publish(&ev, &s.object_ref(&())).await?;
let now = Utc::now();
let past = now - Duration::minutes(10);
recorder.cache.write().await.entry(key).and_modify(|e| {
e.event_time = Some(MicroTime(past));
});

recorder.publish(&ev, &s.object_ref(&())).await?;

let events: Api<K8sEvent> = Api::namespaced(client, "default");
let event_list = events.list(&Default::default()).await?;
let found_event = event_list
.into_iter()
.find(|e| std::matches!(e.reason.as_deref(), Some("TestCacheTtl")))
.unwrap();
assert_eq!(found_event.note.unwrap(), "Sending kubernetes to detention");
assert!(found_event.series.is_none());
clux marked this conversation as resolved.
Show resolved Hide resolved

Ok(())
}
}