From 6842ea460a03401006d1f2f29427c841779b1953 Mon Sep 17 00:00:00 2001 From: Yuta Yamaguchi Date: Fri, 8 Sep 2023 06:28:09 +0900 Subject: [PATCH 1/2] fix kube-runtime comment typo (#1290) Signed-off-by: ymgyt --- kube-runtime/src/controller/runner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kube-runtime/src/controller/runner.rs b/kube-runtime/src/controller/runner.rs index 56fc504f6..bf4d9ef9d 100644 --- a/kube-runtime/src/controller/runner.rs +++ b/kube-runtime/src/controller/runner.rs @@ -123,7 +123,7 @@ where }; }; - // Try to take take a new message that isn't already being processed + // Try to take a new message that isn't already being processed // leave the already-processing ones in the queue, so that we can take them once // we're free again. let next_msg_poll = scheduler From 259ed963eebebcac2a0ec24675f951c77bddee8a Mon Sep 17 00:00:00 2001 From: Eirik A Date: Fri, 8 Sep 2023 00:08:20 +0100 Subject: [PATCH 2/2] Revamp event and log examples to allow more parameters (#1287) Revamp event and log `examples` to allow more kubectl like parameters We already have `clap` in the dependency tree so thought it would be useful to show how to do some more things in the kubectl style. - `event_watcher` now has an optional flag to limit to one object (ala `kubectl events --for`) - `log_stream` now supports the basic flags that `kubectl logs` do - `kubectl` helpers for formatting duration are aligned across examples Signed-off-by: clux --- examples/README.md | 10 +++++- examples/event_watcher.rs | 70 +++++++++++++++++++++++++-------------- examples/kubectl.rs | 15 +++------ examples/log_stream.rs | 45 ++++++++++++++++++------- 4 files changed, 93 insertions(+), 47 deletions(-) diff --git a/examples/README.md b/examples/README.md index 9a778cc6a..3387c1670 100644 --- a/examples/README.md +++ b/examples/README.md @@ -20,7 +20,6 @@ For a basic overview of how to use the `Api` try: ```sh cargo run --example job_api -cargo run --example log_stream cargo run --example pod_api cargo run --example dynamic_api cargo run --example dynamic_jsonpath @@ -42,6 +41,15 @@ cargo run --example kubectl -- apply -f configmapgen_controller_crd.yaml Supported flags are `-lLABELSELECTOR`, `-nNAMESPACE`, `--all`, and `-oyaml`. +There are also two other examples that serve as simplistic analogues of `kubectl logs` and `kubectl events`: + +```sh +# tail logs +cargo run --example log_stream -- prometheus-promstack-kube-prometheus-prometheus-0 -c prometheus -f --since=3600 +# get events for an object +cargo run --example event_watcher -- --for=Pod/prometheus-promstack-kube-prometheus-prometheus-0 +``` + ## kube admission controller example Admission controllers are a bit of a special beast. They don't actually need `kube_client` (unless you need to verify something with the api-server) or `kube_runtime` (unless you also build a complementing reconciler) because, by themselves, they simply get changes sent to them over `https`. You will need a webserver, certificates, and either your controller deployed behind a `Service`, or as we do here: running locally with a private ip that your `k3d` cluster can reach. diff --git a/examples/event_watcher.rs b/examples/event_watcher.rs index 20c7bca2f..dcdd64b55 100644 --- a/examples/event_watcher.rs +++ b/examples/event_watcher.rs @@ -1,42 +1,64 @@ use futures::{pin_mut, TryStreamExt}; -use k8s_openapi::api::{core::v1::ObjectReference, events::v1::Event}; +use k8s_openapi::{ + api::{core::v1::ObjectReference, events::v1::Event}, + apimachinery::pkg::apis::meta::v1::Time, + chrono::Utc, +}; use kube::{ - api::Api, runtime::{watcher, WatchStreamExt}, - Client, + Api, Client, ResourceExt, }; -use tracing::info; + +/// limited variant of `kubectl events` that works on current context's namespace +/// +/// requires a new enough cluster that apis/events.k8s.io/v1 work (kubectl uses corev1::Event) +/// for old style usage of core::v1::Event see node_watcher +#[derive(clap::Parser)] +struct App { + /// Filter by object and kind + /// + /// Using --for=Pod/blog-xxxxx + /// Note that kind name is case sensitive + #[arg(long)] + r#for: Option, +} #[tokio::main] async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); let client = Client::try_default().await?; + let app: App = clap::Parser::parse(); - let events: Api = Api::all(client); - let ew = watcher(events, watcher::Config::default()).applied_objects(); + let events: Api = Api::default_namespaced(client); + let mut conf = watcher::Config::default(); + if let Some(forval) = app.r#for { + if let Some((kind, name)) = forval.split_once('/') { + conf = conf.fields(&format!("regarding.kind={kind},regarding.name={name}")); + } + } + let event_stream = watcher(events, conf).default_backoff().applied_objects(); + pin_mut!(event_stream); - pin_mut!(ew); - while let Some(event) = ew.try_next().await? { - handle_event(event)?; + println!("{0:<6} {1:<15} {2:<55} {3}", "AGE", "REASON", "OBJECT", "MESSAGE"); + while let Some(ev) = event_stream.try_next().await? { + let age = ev.creation_timestamp().map(format_creation).unwrap_or_default(); + let reason = ev.reason.unwrap_or_default(); + let obj = ev.regarding.map(format_objref).flatten().unwrap_or_default(); + let note = ev.note.unwrap_or_default(); + println!("{0:<6} {1:<15} {2:<55} {3}", age, reason, obj, note); } Ok(()) } -// This function lets the app handle an added/modified event from k8s -fn handle_event(ev: Event) -> anyhow::Result<()> { - info!( - "{}: {} ({})", - ev.regarding.map(fmt_obj_ref).unwrap_or_default(), - ev.reason.unwrap_or_default(), - ev.note.unwrap_or_default(), - ); - Ok(()) +fn format_objref(oref: ObjectReference) -> Option { + Some(format!("{}/{}", oref.kind?, oref.name?)) } -fn fmt_obj_ref(oref: ObjectReference) -> String { - format!( - "{}/{}", - oref.kind.unwrap_or_default(), - oref.name.unwrap_or_default() - ) +fn format_creation(time: Time) -> String { + let dur = Utc::now().signed_duration_since(time.0); + match (dur.num_days(), dur.num_hours(), dur.num_minutes()) { + (days, _, _) if days > 0 => format!("{days}d"), + (_, hours, _) if hours > 0 => format!("{hours}h"), + (_, _, mins) => format!("{mins}m"), + } } diff --git a/examples/kubectl.rs b/examples/kubectl.rs index 3e69d7bb6..2dcb3aa60 100644 --- a/examples/kubectl.rs +++ b/examples/kubectl.rs @@ -3,10 +3,7 @@ //! with labels and namespace selectors supported. use anyhow::{bail, Context, Result}; use futures::{StreamExt, TryStreamExt}; -use k8s_openapi::{ - apimachinery::pkg::apis::meta::v1::Time, - chrono::{Duration, Utc}, -}; +use k8s_openapi::{apimachinery::pkg::apis::meta::v1::Time, chrono::Utc}; use kube::{ api::{Api, DynamicObject, ListParams, Patch, PatchParams, ResourceExt}, core::GroupVersionKind, @@ -103,7 +100,7 @@ impl App { let max_name = result.iter().map(|x| x.name_any().len() + 2).max().unwrap_or(63); println!("{0:) -> String { - format_duration(Utc::now().signed_duration_since(time.unwrap().0)) -} -fn format_duration(dur: Duration) -> String { +fn format_creation(time: Time) -> String { + let dur = Utc::now().signed_duration_since(time.0); match (dur.num_days(), dur.num_hours(), dur.num_minutes()) { (days, _, _) if days > 0 => format!("{days}d"), (_, hours, _) if hours > 0 => format!("{hours}h"), diff --git a/examples/log_stream.rs b/examples/log_stream.rs index a708f9585..601ab03ba 100644 --- a/examples/log_stream.rs +++ b/examples/log_stream.rs @@ -1,35 +1,56 @@ -use anyhow::{anyhow, Result}; use futures::{AsyncBufReadExt, TryStreamExt}; use k8s_openapi::api::core::v1::Pod; use kube::{ api::{Api, LogParams}, Client, }; -use std::env; use tracing::*; +/// limited variant of kubectl logs +#[derive(clap::Parser)] +struct App { + #[arg(long, short = 'c')] + container: Option, + + #[arg(long, short = 't')] + tail: Option, + + #[arg(long, short = 'f')] + follow: bool, + + /// Since seconds + #[arg(long, short = 's')] + since: Option, + + /// Include timestamps in the log output + #[arg(long, default_value = "false")] + timestamps: bool, + + pod: String, +} + #[tokio::main] -async fn main() -> Result<()> { +async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); + let app: App = clap::Parser::parse(); let client = Client::try_default().await?; - let mypod = env::args() - .nth(1) - .ok_or_else(|| anyhow!("Usage: log_follow "))?; - info!("Fetching logs for {:?}", mypod); - + info!("Fetching logs for {:?}", app.pod); let pods: Api = Api::default_namespaced(client); let mut logs = pods - .log_stream(&mypod, &LogParams { - follow: true, - tail_lines: Some(1), + .log_stream(&app.pod, &LogParams { + follow: app.follow, + container: app.container, + tail_lines: app.tail, + since_seconds: app.since, + timestamps: app.timestamps, ..LogParams::default() }) .await? .lines(); while let Some(line) = logs.try_next().await? { - info!("{}", line); + println!("{}", line); } Ok(()) }