From fa5381f7566a5b9195951392bcaa4c2b8cd14f98 Mon Sep 17 00:00:00 2001 From: Matei David Date: Sat, 18 Feb 2023 14:08:20 +0000 Subject: [PATCH 01/12] Introduce support for persistent metadata watches The `watch` (and `watch_metadata` respectively) functions on the Api type are fallible, and watches are not recovered. Errors may happen for any reason, such as network induced errors, restarts (etcd can only cache so many resourve versions), and so on. To get around these failures, we have a `watcher()` utility in the runtime crate that manages the underlying stream in a persistent way, recovering on failure. This change introduces support for persistent metadata watches, through a `metadata_watcher` function in the same crate. Watches may be established on any type of resources, the main difference is that the returned types no longer correspond to the type of the Api. Instead, a concrete metadata type is returned. To support this with no breaking changes and to allow for more maintable code, a few utility functions and traits are introduced in the `runtime` crate. Signed-off-by: Matei David --- kube-runtime/src/watcher.rs | 365 +++++++++++++++++++++++++++--------- 1 file changed, 275 insertions(+), 90 deletions(-) diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 7b202f0ea..2bcca3d61 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -5,9 +5,10 @@ use crate::utils::ResetTimerBackoff; use backoff::{backoff::Backoff, ExponentialBackoff}; use derivative::Derivative; -use futures::{stream::BoxStream, Stream, StreamExt}; +use futures::{stream::BoxStream, Future, Stream, StreamExt}; use kube_client::{ api::{ListParams, Resource, ResourceExt, WatchEvent}, + core::{metadata::PartialObjectMeta, ObjectList}, error::ErrorResponse, Api, Error as ClientErr, }; @@ -132,104 +133,88 @@ enum State { }, } +/// Helper to express nested `impl` return types in factories +trait AsyncFn: + Fn(String) -> Self::Future +{ + type Future: Future>>, State)> + Send; +} + +/// Allows factories to return closures that are used to drive the watcher's state +/// machine +/// +/// Closures may take any argument and must return an (event, state) +impl AsyncFn for F +where + K: Resource + Clone + DeserializeOwned + Debug + Send + 'static, + F: Fn(String) -> Fut, + Fut: Future>>, State)> + Send, +{ + type Future = Fut; +} + +/// Factory that returns two closures used to drive the watcher's state machine +/// +/// Used as an indirection mechanism to call `list` and `watch` on the +/// underlying Api type. +fn make_step_api<'a, K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>( + api: &'a Api, + list_params: &'a ListParams, +) -> (impl AsyncFn + 'a, impl AsyncFn + 'a) { + let list = move |_| async { + let list = api.list(list_params).await; + step_list(list) + }; + + let watch = move |resource_version: String| async { + let watch = api.watch(list_params, &resource_version).await; + step_watch(watch, resource_version) + }; + + (list, watch) +} + +/// Factory that returns two closures used to drive the watcher's state machine +/// +/// Used as an indirection mechanism to call `list_metadata` and +/// `watch_metadata` on the underlying Api type. Closures returned are +/// specialized for use with metadata only. +fn make_step_metadata_api<'a, K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>( + api: &'a Api, + list_params: &'a ListParams, +) -> ( + impl AsyncFn + 'a, + impl AsyncFn + 'a, +) { + let list = move |_| async { + let list = api.list_metadata(list_params).await; + step_list::(list) + }; + + let watch = move |resource_version: String| async { + let watch = api.watch_metadata(list_params, &resource_version).await; + step_watch::(watch, resource_version) + }; + + (list, watch) +} + /// Progresses the watcher a single step, returning (event, state) /// /// This function should be trampolined: if event == `None` /// then the function should be called again until it returns a Some. async fn step_trampolined( - api: &Api, - list_params: &ListParams, + step_list_fn: impl AsyncFn, + step_watch_fn: impl AsyncFn, state: State, ) -> (Option>>, State) { match state { - State::Empty => match api.list(list_params).await { - Ok(list) => { - if let Some(resource_version) = list.metadata.resource_version { - (Some(Ok(Event::Restarted(list.items))), State::InitListed { - resource_version, - }) - } else { - (Some(Err(Error::NoResourceVersion)), State::Empty) - } - } - Err(err) => { - if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { - warn!("watch list error with 403: {err:?}"); - } else { - debug!("watch list error: {err:?}"); - } - (Some(Err(err).map_err(Error::InitialListFailed)), State::Empty) - } - }, - State::InitListed { resource_version } => match api.watch(list_params, &resource_version).await { - Ok(stream) => (None, State::Watching { - resource_version, - stream: stream.boxed(), - }), - Err(err) => { - if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { - warn!("watch initlist error with 403: {err:?}"); - } else { - debug!("watch initlist error: {err:?}"); - } - ( - Some(Err(err).map_err(Error::WatchStartFailed)), - State::InitListed { resource_version }, - ) - } - }, + State::Empty => step_list_fn(String::new()).await, + State::InitListed { resource_version } => step_watch_fn(resource_version).await, State::Watching { resource_version, - mut stream, - } => match stream.next().await { - Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => { - let resource_version = obj.resource_version().unwrap(); - (Some(Ok(Event::Applied(obj))), State::Watching { - resource_version, - stream, - }) - } - Some(Ok(WatchEvent::Deleted(obj))) => { - let resource_version = obj.resource_version().unwrap(); - (Some(Ok(Event::Deleted(obj))), State::Watching { - resource_version, - stream, - }) - } - Some(Ok(WatchEvent::Bookmark(bm))) => (None, State::Watching { - resource_version: bm.metadata.resource_version, - stream, - }), - Some(Ok(WatchEvent::Error(err))) => { - // HTTP GONE, means we have desynced and need to start over and re-list :( - let new_state = if err.code == 410 { - State::Empty - } else { - State::Watching { - resource_version, - stream, - } - }; - if err.code == 403 { - warn!("watcher watchevent error 403: {err:?}"); - } else { - debug!("error watchevent error: {err:?}"); - } - (Some(Err(err).map_err(Error::WatchError)), new_state) - } - Some(Err(err)) => { - if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { - warn!("watcher error 403: {err:?}"); - } else { - debug!("watcher error: {err:?}"); - } - (Some(Err(err).map_err(Error::WatchFailed)), State::Watching { - resource_version, - stream, - }) - } - None => (None, State::InitListed { resource_version }), - }, + stream, + } => step_next(resource_version, stream).await, } } @@ -240,13 +225,149 @@ async fn step( mut state: State, ) -> (Result>, State) { loop { - match step_trampolined(api, list_params, state).await { + let (list_fn, watch_fn) = make_step_api(api, list_params); + match step_trampolined(list_fn, watch_fn, state).await { (Some(result), new_state) => return (result, new_state), (None, new_state) => state = new_state, } } } +/// Trampoline helper for `step_trampolined` that returns a concrete type +async fn step_metadata( + api: &Api, + list_params: &ListParams, + mut state: State, +) -> (Result>, State) { + loop { + let (list_fn, watch_fn) = make_step_metadata_api(api, list_params); + match step_trampolined::(list_fn, watch_fn, state).await { + (Some(result), new_state) => return (result, new_state), + (None, new_state) => state = new_state, + } + } +} + +/// Helper for `step_trampolined` to process the next element in a watch stream +async fn step_next( + resource_version: String, + mut stream: BoxStream<'static, kube_client::Result>>, +) -> (Option>>, State) +where + K: Resource + Clone + DeserializeOwned + Debug + Send + 'static, +{ + match stream.next().await { + Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => { + let resource_version = obj.resource_version().unwrap(); + (Some(Ok(Event::Applied(obj))), State::Watching { + resource_version, + stream, + }) + } + Some(Ok(WatchEvent::Deleted(obj))) => { + let resource_version = obj.resource_version().unwrap(); + (Some(Ok(Event::Deleted(obj))), State::Watching { + resource_version, + stream, + }) + } + Some(Ok(WatchEvent::Bookmark(bm))) => (None, State::Watching { + resource_version: bm.metadata.resource_version, + stream, + }), + Some(Ok(WatchEvent::Error(err))) => { + // HTTP GONE, means we have desynced and need to start over and re-list :( + let new_state = if err.code == 410 { + State::Empty + } else { + State::Watching { + resource_version, + stream, + } + }; + if err.code == 403 { + warn!("watcher watchevent error 403: {err:?}"); + } else { + debug!("error watchevent error: {err:?}"); + } + (Some(Err(err).map_err(Error::WatchError)), new_state) + } + Some(Err(err)) => { + if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { + warn!("watcher error 403: {err:?}"); + } else { + debug!("watcher error: {err:?}"); + } + (Some(Err(err).map_err(Error::WatchFailed)), State::Watching { + resource_version, + stream, + }) + } + None => (None, State::InitListed { resource_version }), + } +} + +/// Helper for `step_trampolined` to initialize the watch with a LIST +/// +/// Used by closures that are returned from factories to provide indirection and +/// specialization where needed. +fn step_list(list_result: kube_client::Result>) -> (Option>>, State) +where + K: Resource + Clone + DeserializeOwned + Debug + Send + 'static, +{ + match list_result { + Ok(list) => { + if let Some(resource_version) = list.metadata.resource_version { + (Some(Ok(Event::Restarted(list.items))), State::InitListed { + resource_version, + }) + } else { + (Some(Err(Error::NoResourceVersion)), State::Empty) + } + } + Err(err) => { + if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { + warn!("watch list error with 403: {err:?}"); + } else { + debug!("watch list error: {err:?}"); + } + (Some(Err(err).map_err(Error::InitialListFailed)), State::Empty) + } + } +} + +/// Helper for `step_trampolined` to itialize the watch with a LIST +/// +/// Used by closures that are returned from factories to provide indirection and +/// specialization where needed. +fn step_watch( + watch_event: kube_client::Result>> + Send + 'static>, + resource_version: String, +) -> (Option>>, State) +where + K: Resource + Clone + DeserializeOwned + Debug + Send + 'static, +{ + match watch_event { + Ok(stream) => (None, State::Watching { + resource_version, + stream: stream.boxed(), + }), + Err(err) => { + if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { + warn!("watch initlist error with 403: {err:?}"); + } else { + debug!("watch initlist error: {err:?}"); + } + ( + Some(Err(err).map_err(Error::WatchStartFailed)), + State::InitListed { + resource_version: resource_version.to_string(), + }, + ) + } + } +} + /// Watches a Kubernetes Resource for changes continuously /// /// Compared to [`Api::watch`], this automatically tries to recover the stream upon errors. @@ -309,6 +430,70 @@ pub fn watcher( ) } +/// Watches a Kubernetes Resource for changes continuously and receives only the +/// metadata +/// +/// Compared to [`Api::watch`], this automatically tries to recover the stream upon errors. +/// +/// Errors from the underlying watch are propagated, after which the stream will go into recovery mode on the next poll. +/// You can apply your own backoff by not polling the stream for a duration after errors. +/// Keep in mind that some [`TryStream`](futures::TryStream) combinators (such as +/// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat)) +/// will terminate eagerly as soon as they receive an [`Err`]. +/// +/// This is intended to provide a safe and atomic input interface for a state store like a [`reflector`]. +/// Direct users may want to flatten composite events via [`WatchStreamExt`]: +/// +/// ```no_run +/// use kube::{ +/// api::{Api, ListParams, ResourceExt}, Client, +/// runtime::{watcher, WatchStreamExt} +/// }; +/// use k8s_openapi::api::core::v1::Pod; +/// use futures::{StreamExt, TryStreamExt}; +/// #[tokio::main] +/// async fn main() -> Result<(), watcher::Error> { +/// let client = Client::try_default().await.unwrap(); +/// let pods: Api = Api::namespaced(client, "apps"); +/// +/// metadata_watcher(pods, ListParams::default()).applied_objects() +/// .try_for_each(|p| async move { +/// println!("Applied: {}", p.name_any()); +/// Ok(()) +/// }) +/// .await?; +/// Ok(()) +/// } +/// ``` +/// [`WatchStreamExt`]: super::WatchStreamExt +/// [`reflector`]: super::reflector::reflector +/// [`Api::watch`]: kube_client::Api::watch +/// +/// # Recovery +/// +/// The stream will attempt to be recovered on the next poll after an [`Err`] is returned. +/// This will normally happen immediately, but you can use [`StreamBackoff`](crate::utils::StreamBackoff) +/// to introduce an artificial delay. [`default_backoff`] returns a suitable default set of parameters. +/// +/// If the watch connection is interrupted, then `watcher` will attempt to restart the watch using the last +/// [resource version](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes) +/// that we have seen on the stream. If this is successful then the stream is simply resumed from where it left off. +/// If this fails because the resource version is no longer valid then we start over with a new stream, starting with +/// an [`Event::Restarted`]. The internals mechanics of recovery should be considered an implementation detail. + +pub fn metadata_watcher( + api: Api, + list_params: ListParams, +) -> impl Stream>> + Send { + futures::stream::unfold( + (api, list_params, State::Empty), + |(api, list_params, state)| async { + let (event, state) = step_metadata(&api, &list_params, state).await; + Some((event, (api, list_params, state))) + }, + ) +} + /// Watch a single named object for updates /// /// Emits `None` if the object is deleted (or not found), and `Some` if an object is updated (or created/found). From 1de4cb67895e78ab18f5941d10b4f5b5cd3ac578 Mon Sep 17 00:00:00 2001 From: Matei David Date: Sun, 19 Feb 2023 19:25:39 +0000 Subject: [PATCH 02/12] Run clippy Signed-off-by: Matei David --- kube-runtime/src/watcher.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 2bcca3d61..c217e6831 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -360,9 +360,7 @@ where } ( Some(Err(err).map_err(Error::WatchStartFailed)), - State::InitListed { - resource_version: resource_version.to_string(), - }, + State::InitListed { resource_version }, ) } } @@ -447,7 +445,7 @@ pub fn watcher( /// ```no_run /// use kube::{ /// api::{Api, ListParams, ResourceExt}, Client, -/// runtime::{watcher, WatchStreamExt} +/// runtime::{watch_metadata, WatchStreamExt} /// }; /// use k8s_openapi::api::core::v1::Pod; /// use futures::{StreamExt, TryStreamExt}; @@ -456,7 +454,7 @@ pub fn watcher( /// let client = Client::try_default().await.unwrap(); /// let pods: Api = Api::namespaced(client, "apps"); /// -/// metadata_watcher(pods, ListParams::default()).applied_objects() +/// watch_metadata(pods, ListParams::default()).applied_objects() /// .try_for_each(|p| async move { /// println!("Applied: {}", p.name_any()); /// Ok(()) @@ -481,7 +479,7 @@ pub fn watcher( /// If this fails because the resource version is no longer valid then we start over with a new stream, starting with /// an [`Event::Restarted`]. The internals mechanics of recovery should be considered an implementation detail. -pub fn metadata_watcher( +pub fn watch_metadata( api: Api, list_params: ListParams, ) -> impl Stream>> + Send { From 535bf298328ef20a3115836d22194be1d2276bac Mon Sep 17 00:00:00 2001 From: Matei David Date: Sun, 19 Feb 2023 19:49:42 +0000 Subject: [PATCH 03/12] Make closure arg generic Signed-off-by: Matei David --- kube-runtime/src/watcher.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index c217e6831..4ad3de352 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -134,8 +134,8 @@ enum State { } /// Helper to express nested `impl` return types in factories -trait AsyncFn: - Fn(String) -> Self::Future +trait AsyncFn: + Fn(A) -> Self::Future { type Future: Future>>, State)> + Send; } @@ -144,10 +144,11 @@ trait AsyncFn: /// machine /// /// Closures may take any argument and must return an (event, state) -impl AsyncFn for F +impl AsyncFn for F where + A: Send, K: Resource + Clone + DeserializeOwned + Debug + Send + 'static, - F: Fn(String) -> Fut, + F: Fn(A) -> Fut, Fut: Future>>, State)> + Send, { type Future = Fut; @@ -160,7 +161,7 @@ where fn make_step_api<'a, K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>( api: &'a Api, list_params: &'a ListParams, -) -> (impl AsyncFn + 'a, impl AsyncFn + 'a) { +) -> (impl AsyncFn<(), K> + 'a, impl AsyncFn + 'a) { let list = move |_| async { let list = api.list(list_params).await; step_list(list) @@ -183,8 +184,8 @@ fn make_step_metadata_api<'a, K: Resource + Clone + DeserializeOwned + Debug + S api: &'a Api, list_params: &'a ListParams, ) -> ( - impl AsyncFn + 'a, - impl AsyncFn + 'a, + impl AsyncFn<(), PartialObjectMeta> + 'a, + impl AsyncFn + 'a, ) { let list = move |_| async { let list = api.list_metadata(list_params).await; @@ -204,12 +205,12 @@ fn make_step_metadata_api<'a, K: Resource + Clone + DeserializeOwned + Debug + S /// This function should be trampolined: if event == `None` /// then the function should be called again until it returns a Some. async fn step_trampolined( - step_list_fn: impl AsyncFn, - step_watch_fn: impl AsyncFn, + step_list_fn: impl AsyncFn<(), K>, + step_watch_fn: impl AsyncFn, state: State, ) -> (Option>>, State) { match state { - State::Empty => step_list_fn(String::new()).await, + State::Empty => step_list_fn(()).await, State::InitListed { resource_version } => step_watch_fn(resource_version).await, State::Watching { resource_version, From 1d60615d7a44935a823974e4eef4299090b55d71 Mon Sep 17 00:00:00 2001 From: Matei David Date: Sun, 19 Feb 2023 19:55:25 +0000 Subject: [PATCH 04/12] Fix doc test Signed-off-by: Matei David --- kube-runtime/src/watcher.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 4ad3de352..bbbdaf16f 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -446,7 +446,7 @@ pub fn watcher( /// ```no_run /// use kube::{ /// api::{Api, ListParams, ResourceExt}, Client, -/// runtime::{watch_metadata, WatchStreamExt} +/// runtime::{watcher, watcher::watch_metadata, WatchStreamExt} /// }; /// use k8s_openapi::api::core::v1::Pod; /// use futures::{StreamExt, TryStreamExt}; From 757266b019750b0596e7e1b54713d45a96b9a916 Mon Sep 17 00:00:00 2001 From: Matei David Date: Mon, 20 Feb 2023 17:50:16 +0000 Subject: [PATCH 05/12] Bump MSRV to 1.63.0 Signed-off-by: Matei David --- .devcontainer/Dockerfile | 2 +- README.md | 2 +- kube-client/Cargo.toml | 2 +- kube-core/Cargo.toml | 2 +- kube-derive/Cargo.toml | 2 +- kube-runtime/Cargo.toml | 2 +- kube/Cargo.toml | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index 473356f57..046d2ccaf 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -1,4 +1,4 @@ -FROM docker.io/rust:1.60.0-bullseye +FROM docker.io/rust:1.63.0-bullseye ENV DEBIAN_FRONTEND=noninteractive RUN apt update && apt upgrade -y diff --git a/README.md b/README.md index 352973884..fdd9c18de 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # kube-rs [![Crates.io](https://img.shields.io/crates/v/kube.svg)](https://crates.io/crates/kube) -[![Rust 1.60](https://img.shields.io/badge/MSRV-1.60-dea584.svg)](https://github.com/rust-lang/rust/releases/tag/1.60.0) +[![Rust 1.63](https://img.shields.io/badge/MSRV-1.63-dea584.svg)](https://github.com/rust-lang/rust/releases/tag/1.63.0) [![Tested against Kubernetes v1_21 and above](https://img.shields.io/badge/MK8SV-v1_21-326ce5.svg)](https://kube.rs/kubernetes-version) [![Best Practices](https://bestpractices.coreinfrastructure.org/projects/5413/badge)](https://bestpractices.coreinfrastructure.org/projects/5413) [![Discord chat](https://img.shields.io/discord/500028886025895936.svg?logo=discord&style=plastic)](https://discord.gg/tokio) diff --git a/kube-client/Cargo.toml b/kube-client/Cargo.toml index 1c33fcb72..869c5c757 100644 --- a/kube-client/Cargo.toml +++ b/kube-client/Cargo.toml @@ -12,7 +12,7 @@ repository = "https://github.com/kube-rs/kube" readme = "../README.md" keywords = ["kubernetes", "client",] categories = ["web-programming::http-client", "configuration", "network-programming", "api-bindings"] -rust-version = "1.60.0" +rust-version = "1.63.0" edition = "2021" [features] diff --git a/kube-core/Cargo.toml b/kube-core/Cargo.toml index 112fc67c0..421d744e1 100644 --- a/kube-core/Cargo.toml +++ b/kube-core/Cargo.toml @@ -7,7 +7,7 @@ authors = [ "kazk ", ] edition = "2021" -rust-version = "1.60.0" +rust-version = "1.63.0" license = "Apache-2.0" keywords = ["kubernetes", "apimachinery"] categories = ["api-bindings", "encoding", "parser-implementations"] diff --git a/kube-derive/Cargo.toml b/kube-derive/Cargo.toml index 1a5c1baa4..185aba3b7 100644 --- a/kube-derive/Cargo.toml +++ b/kube-derive/Cargo.toml @@ -7,7 +7,7 @@ authors = [ "kazk ", ] edition = "2021" -rust-version = "1.60.0" +rust-version = "1.63.0" license = "Apache-2.0" repository = "https://github.com/kube-rs/kube" readme = "../README.md" diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index 30b184746..9439d1177 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -11,7 +11,7 @@ repository = "https://github.com/kube-rs/kube" readme = "../README.md" keywords = ["kubernetes", "runtime", "reflector", "watcher", "controller"] categories = ["web-programming::http-client", "caching", "network-programming"] -rust-version = "1.60.0" +rust-version = "1.63.0" edition = "2021" [package.metadata.docs.rs] diff --git a/kube/Cargo.toml b/kube/Cargo.toml index a1aab2a62..ae3ee3395 100644 --- a/kube/Cargo.toml +++ b/kube/Cargo.toml @@ -12,7 +12,7 @@ repository = "https://github.com/kube-rs/kube" readme = "../README.md" keywords = ["kubernetes", "client", "runtime", "cncf"] categories = ["network-programming", "caching", "api-bindings", "configuration", "encoding"] -rust-version = "1.60.0" +rust-version = "1.63.0" edition = "2021" [features] From a834d9e51920724a151d3071cc3719f9019a844a Mon Sep 17 00:00:00 2001 From: Matei David Date: Mon, 20 Feb 2023 17:52:59 +0000 Subject: [PATCH 06/12] Rename AsyncFn to StepFn Signed-off-by: Matei David --- kube-runtime/src/watcher.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index bbbdaf16f..c493ae58f 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -134,7 +134,7 @@ enum State { } /// Helper to express nested `impl` return types in factories -trait AsyncFn: +trait StepFn: Fn(A) -> Self::Future { type Future: Future>>, State)> + Send; @@ -144,7 +144,7 @@ trait AsyncFn AsyncFn for F +impl StepFn for F where A: Send, K: Resource + Clone + DeserializeOwned + Debug + Send + 'static, @@ -161,7 +161,7 @@ where fn make_step_api<'a, K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>( api: &'a Api, list_params: &'a ListParams, -) -> (impl AsyncFn<(), K> + 'a, impl AsyncFn + 'a) { +) -> (impl StepFn<(), K> + 'a, impl StepFn + 'a) { let list = move |_| async { let list = api.list(list_params).await; step_list(list) @@ -184,8 +184,8 @@ fn make_step_metadata_api<'a, K: Resource + Clone + DeserializeOwned + Debug + S api: &'a Api, list_params: &'a ListParams, ) -> ( - impl AsyncFn<(), PartialObjectMeta> + 'a, - impl AsyncFn + 'a, + impl StepFn<(), PartialObjectMeta> + 'a, + impl StepFn + 'a, ) { let list = move |_| async { let list = api.list_metadata(list_params).await; @@ -205,8 +205,8 @@ fn make_step_metadata_api<'a, K: Resource + Clone + DeserializeOwned + Debug + S /// This function should be trampolined: if event == `None` /// then the function should be called again until it returns a Some. async fn step_trampolined( - step_list_fn: impl AsyncFn<(), K>, - step_watch_fn: impl AsyncFn, + step_list_fn: impl StepFn<(), K>, + step_watch_fn: impl StepFn, state: State, ) -> (Option>>, State) { match state { From e60f2f4767ebac8b62813715efb55b944340f863 Mon Sep 17 00:00:00 2001 From: Matei David Date: Mon, 20 Feb 2023 19:00:05 +0000 Subject: [PATCH 07/12] Add a compile-time typecheck and a meta example to dynamic watcher Signed-off-by: Matei David --- examples/dynamic_watcher.rs | 35 ++++++++++++++++++++++++------ kube-runtime/src/controller/mod.rs | 29 ++++++++++++++++++++++--- kube-runtime/src/watcher.rs | 1 - 3 files changed, 54 insertions(+), 11 deletions(-) diff --git a/examples/dynamic_watcher.rs b/examples/dynamic_watcher.rs index 2b669283a..7bc21a152 100644 --- a/examples/dynamic_watcher.rs +++ b/examples/dynamic_watcher.rs @@ -14,6 +14,12 @@ async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); let client = Client::try_default().await?; + // If set will receive only the metadata for watched resources + let should_watch_meta = { + let v = env::var("WATCH_METADATA").unwrap_or_else(|_| "false".into()); + v.parse::().ok().unwrap_or_else(|| false) + }; + // Take dynamic resource identifiers: let group = env::var("GROUP").unwrap_or_else(|_| "clux.dev".into()); let version = env::var("VERSION").unwrap_or_else(|_| "v1".into()); @@ -28,13 +34,28 @@ async fn main() -> anyhow::Result<()> { let api = Api::::all_with(client, &ar); // Fully compatible with kube-runtime - let mut items = watcher(api, ListParams::default()).applied_objects().boxed(); - while let Some(p) = items.try_next().await? { - if caps.scope == Scope::Cluster { - info!("saw {}", p.name_any()); - } else { - info!("saw {} in {}", p.name_any(), p.namespace().unwrap()); + if should_watch_meta { + let mut items = watcher::watch_metadata(api, ListParams::default()) + .applied_objects() + .boxed(); + + while let Some(p) = items.try_next().await? { + if caps.scope == Scope::Cluster { + info!("saw {}", p.name_any()); + } else { + info!("saw {} in {}", p.name_any(), p.namespace().unwrap()); + } } - } + } else { + let mut items = watcher(api, ListParams::default()).applied_objects().boxed(); + while let Some(p) = items.try_next().await? { + if caps.scope == Scope::Cluster { + info!("saw {}", p.name_any()); + } else { + info!("saw {} in {}", p.name_any(), p.namespace().unwrap()); + } + } + }; + Ok(()) } diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 21ec5f9d9..c00c42be8 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -895,17 +895,29 @@ mod tests { use crate::{ applier, reflector::{self, ObjectRef}, - watcher, Controller, + watcher::{self, watch_metadata, watcher, Event}, + Controller, }; - use futures::{pin_mut, StreamExt, TryStreamExt}; + use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; use k8s_openapi::api::core::v1::ConfigMap; - use kube_client::{core::ObjectMeta, Api}; + use kube_client::{core::ObjectMeta, Api, Resource}; + use serde::de::DeserializeOwned; use tokio::time::timeout; fn assert_send(x: T) -> T { x } + // Used to typecheck that a type T is a generic type that implements Stream + // and returns a WatchEvent generic over a resource `K` + fn assert_stream(x: T) -> T + where + T: Stream>> + Send, + K: Resource + Clone + DeserializeOwned + std::fmt::Debug + Send + 'static, + { + x + } + fn mock_type() -> T { unimplemented!( "mock_type is not supposed to be called, only used for filling holes in type assertions" @@ -924,6 +936,17 @@ mod tests { ); } + // not #[test] because we don't want to actually run it, we just want to + // assert that it typechecks + // + // will check return types for `watcher` and `watch_metadata` do not drift + // given an arbitrary K that implements `Resource` (e.g ConfigMap) + #[allow(dead_code, unused_must_use)] + fn test_watcher_stream_type_drift() { + assert_stream(watcher(mock_type::>(), Default::default())); + assert_stream(watch_metadata(mock_type::>(), Default::default())); + } + #[tokio::test] async fn applier_must_not_deadlock_if_reschedule_buffer_fills() { // This tests that `applier` handles reschedule queue backpressure correctly, by trying to flood it with no-op reconciles diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index c493ae58f..f98c41bd5 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -479,7 +479,6 @@ pub fn watcher( /// that we have seen on the stream. If this is successful then the stream is simply resumed from where it left off. /// If this fails because the resource version is no longer valid then we start over with a new stream, starting with /// an [`Event::Restarted`]. The internals mechanics of recovery should be considered an implementation detail. - pub fn watch_metadata( api: Api, list_params: ListParams, From 89c0154592cae76a2ab0a01a3b67fae6455cd1be Mon Sep 17 00:00:00 2001 From: Matei David Date: Tue, 21 Feb 2023 10:15:10 +0000 Subject: [PATCH 08/12] Rename watch_metadata to metadata_watcher and allow module rep Signed-off-by: Matei David --- kube-runtime/src/controller/mod.rs | 7 +++++-- kube-runtime/src/lib.rs | 2 +- kube-runtime/src/watcher.rs | 7 ++++--- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index c00c42be8..b03e9c577 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -895,7 +895,7 @@ mod tests { use crate::{ applier, reflector::{self, ObjectRef}, - watcher::{self, watch_metadata, watcher, Event}, + watcher::{self, metadata_watcher, watcher, Event}, Controller, }; use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; @@ -944,7 +944,10 @@ mod tests { #[allow(dead_code, unused_must_use)] fn test_watcher_stream_type_drift() { assert_stream(watcher(mock_type::>(), Default::default())); - assert_stream(watch_metadata(mock_type::>(), Default::default())); + assert_stream(metadata_watcher( + mock_type::>(), + Default::default(), + )); } #[tokio::test] diff --git a/kube-runtime/src/lib.rs b/kube-runtime/src/lib.rs index 7296ac019..f5e0beb56 100644 --- a/kube-runtime/src/lib.rs +++ b/kube-runtime/src/lib.rs @@ -32,4 +32,4 @@ pub use finalizer::finalizer; pub use reflector::reflector; pub use scheduler::scheduler; pub use utils::WatchStreamExt; -pub use watcher::watcher; +pub use watcher::{metadata_watcher, watcher}; diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index f98c41bd5..9f717b04b 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -446,7 +446,7 @@ pub fn watcher( /// ```no_run /// use kube::{ /// api::{Api, ListParams, ResourceExt}, Client, -/// runtime::{watcher, watcher::watch_metadata, WatchStreamExt} +/// runtime::{watcher, metadata_watcher, WatchStreamExt} /// }; /// use k8s_openapi::api::core::v1::Pod; /// use futures::{StreamExt, TryStreamExt}; @@ -455,7 +455,7 @@ pub fn watcher( /// let client = Client::try_default().await.unwrap(); /// let pods: Api = Api::namespaced(client, "apps"); /// -/// watch_metadata(pods, ListParams::default()).applied_objects() +/// metadata_watcher(pods, ListParams::default()).applied_objects() /// .try_for_each(|p| async move { /// println!("Applied: {}", p.name_any()); /// Ok(()) @@ -479,7 +479,8 @@ pub fn watcher( /// that we have seen on the stream. If this is successful then the stream is simply resumed from where it left off. /// If this fails because the resource version is no longer valid then we start over with a new stream, starting with /// an [`Event::Restarted`]. The internals mechanics of recovery should be considered an implementation detail. -pub fn watch_metadata( +#[allow(clippy::module_name_repetitions)] +pub fn metadata_watcher( api: Api, list_params: ListParams, ) -> impl Stream>> + Send { From f7d112f7202c69b7975a61ec7d667571ff362fbc Mon Sep 17 00:00:00 2001 From: Matei David Date: Tue, 21 Feb 2023 19:42:04 +0000 Subject: [PATCH 09/12] Add trait to specialize Api calls instead of relying on closures Signed-off-by: Matei David --- kube-runtime/Cargo.toml | 1 + kube-runtime/src/watcher.rs | 354 ++++++++++++++++-------------------- 2 files changed, 156 insertions(+), 199 deletions(-) diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index 9439d1177..a7cbd453d 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -35,6 +35,7 @@ json-patch = "0.3.0" serde_json = "1.0.68" thiserror = "1.0.29" backoff = "0.4.0" +async-trait = "0.1.64" [dependencies.k8s-openapi] version = "0.17.0" diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 9f717b04b..e13a3d631 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -3,9 +3,10 @@ //! See [`watcher`] for the primary entry point. use crate::utils::ResetTimerBackoff; +use async_trait::async_trait; use backoff::{backoff::Backoff, ExponentialBackoff}; use derivative::Derivative; -use futures::{stream::BoxStream, Future, Stream, StreamExt}; +use futures::{stream::BoxStream, Stream, StreamExt}; use kube_client::{ api::{ListParams, Resource, ResourceExt, WatchEvent}, core::{metadata::PartialObjectMeta, ObjectList}, @@ -115,7 +116,7 @@ impl Event { #[derive(Derivative)] #[derivative(Debug)] /// The internal finite state machine driving the [`watcher`] -enum State { +enum State { /// The Watcher is empty, and the next [`poll`](Stream::poll_next) will start the initial LIST to get all existing objects Empty, /// The initial LIST was successful, so we should move on to starting the actual watch. @@ -133,236 +134,191 @@ enum State { }, } -/// Helper to express nested `impl` return types in factories -trait StepFn: - Fn(A) -> Self::Future -{ - type Future: Future>>, State)> + Send; -} +/// Used to control whether the watcher receives the full object, or only the +/// metadata +#[async_trait] +trait ApiMode { + type Value: Clone; -/// Allows factories to return closures that are used to drive the watcher's state -/// machine -/// -/// Closures may take any argument and must return an (event, state) -impl StepFn for F -where - A: Send, - K: Resource + Clone + DeserializeOwned + Debug + Send + 'static, - F: Fn(A) -> Fut, - Fut: Future>>, State)> + Send, -{ - type Future = Fut; + async fn list(&self, lp: &ListParams) -> kube_client::Result>; + async fn watch( + &self, + lp: &ListParams, + version: &str, + ) -> kube_client::Result>>>; } -/// Factory that returns two closures used to drive the watcher's state machine -/// -/// Used as an indirection mechanism to call `list` and `watch` on the -/// underlying Api type. -fn make_step_api<'a, K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>( +/// A wrapper around the `Api` of a `Resource` type that when used by the +/// watcher will return the entire (full) object +struct FullObject<'a, K> { api: &'a Api, - list_params: &'a ListParams, -) -> (impl StepFn<(), K> + 'a, impl StepFn + 'a) { - let list = move |_| async { - let list = api.list(list_params).await; - step_list(list) - }; +} - let watch = move |resource_version: String| async { - let watch = api.watch(list_params, &resource_version).await; - step_watch(watch, resource_version) - }; +#[async_trait] +impl ApiMode for FullObject<'_, K> +where + K: Clone + Debug + DeserializeOwned + Send + 'static, +{ + type Value = K; - (list, watch) + async fn list(&self, lp: &ListParams) -> kube_client::Result> { + self.api.list(lp).await + } + + async fn watch( + &self, + lp: &ListParams, + version: &str, + ) -> kube_client::Result>>> { + self.api.watch(lp, version).await.map(StreamExt::boxed) + } } -/// Factory that returns two closures used to drive the watcher's state machine -/// -/// Used as an indirection mechanism to call `list_metadata` and -/// `watch_metadata` on the underlying Api type. Closures returned are -/// specialized for use with metadata only. -fn make_step_metadata_api<'a, K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>( +/// A wrapper around the `Api` of a `Resource` type that when used by the +/// watcher will return only the metadata associated with an object +struct MetaOnly<'a, K> { api: &'a Api, - list_params: &'a ListParams, -) -> ( - impl StepFn<(), PartialObjectMeta> + 'a, - impl StepFn + 'a, -) { - let list = move |_| async { - let list = api.list_metadata(list_params).await; - step_list::(list) - }; +} - let watch = move |resource_version: String| async { - let watch = api.watch_metadata(list_params, &resource_version).await; - step_watch::(watch, resource_version) - }; +#[async_trait] +impl ApiMode for MetaOnly<'_, K> +where + K: Clone + Debug + DeserializeOwned + Send + 'static, +{ + type Value = PartialObjectMeta; + + async fn list(&self, lp: &ListParams) -> kube_client::Result> { + self.api.list_metadata(lp).await + } - (list, watch) + async fn watch( + &self, + lp: &ListParams, + version: &str, + ) -> kube_client::Result>>> { + self.api.watch_metadata(lp, version).await.map(StreamExt::boxed) + } } /// Progresses the watcher a single step, returning (event, state) /// /// This function should be trampolined: if event == `None` /// then the function should be called again until it returns a Some. -async fn step_trampolined( - step_list_fn: impl StepFn<(), K>, - step_watch_fn: impl StepFn, - state: State, -) -> (Option>>, State) { - match state { - State::Empty => step_list_fn(()).await, - State::InitListed { resource_version } => step_watch_fn(resource_version).await, - State::Watching { - resource_version, - stream, - } => step_next(resource_version, stream).await, - } -} - -/// Trampoline helper for `step_trampolined` -async fn step( - api: &Api, +async fn step_trampolined( + api: &A, list_params: &ListParams, - mut state: State, -) -> (Result>, State) { - loop { - let (list_fn, watch_fn) = make_step_api(api, list_params); - match step_trampolined(list_fn, watch_fn, state).await { - (Some(result), new_state) => return (result, new_state), - (None, new_state) => state = new_state, - } - } -} - -/// Trampoline helper for `step_trampolined` that returns a concrete type -async fn step_metadata( - api: &Api, - list_params: &ListParams, - mut state: State, -) -> (Result>, State) { - loop { - let (list_fn, watch_fn) = make_step_metadata_api(api, list_params); - match step_trampolined::(list_fn, watch_fn, state).await { - (Some(result), new_state) => return (result, new_state), - (None, new_state) => state = new_state, - } - } -} - -/// Helper for `step_trampolined` to process the next element in a watch stream -async fn step_next( - resource_version: String, - mut stream: BoxStream<'static, kube_client::Result>>, -) -> (Option>>, State) + state: State, +) -> (Option>>, State) where - K: Resource + Clone + DeserializeOwned + Debug + Send + 'static, + A: ApiMode, + A::Value: Resource + 'static, { - match stream.next().await { - Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => { - let resource_version = obj.resource_version().unwrap(); - (Some(Ok(Event::Applied(obj))), State::Watching { - resource_version, - stream, - }) - } - Some(Ok(WatchEvent::Deleted(obj))) => { - let resource_version = obj.resource_version().unwrap(); - (Some(Ok(Event::Deleted(obj))), State::Watching { + match state { + State::Empty => match api.list(list_params).await { + Ok(list) => { + if let Some(resource_version) = list.metadata.resource_version { + (Some(Ok(Event::Restarted(list.items))), State::InitListed { + resource_version, + }) + } else { + (Some(Err(Error::NoResourceVersion)), State::Empty) + } + } + Err(err) => { + if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { + warn!("watch list error with 403: {err:?}"); + } else { + debug!("watch list error: {err:?}"); + } + (Some(Err(err).map_err(Error::InitialListFailed)), State::Empty) + } + }, + State::InitListed { resource_version } => match api.watch(list_params, &resource_version).await { + Ok(stream) => (None, State::Watching { resource_version, stream, - }) - } - Some(Ok(WatchEvent::Bookmark(bm))) => (None, State::Watching { - resource_version: bm.metadata.resource_version, - stream, - }), - Some(Ok(WatchEvent::Error(err))) => { - // HTTP GONE, means we have desynced and need to start over and re-list :( - let new_state = if err.code == 410 { - State::Empty - } else { - State::Watching { + }), + Err(err) => { + if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { + warn!("watch initlist error with 403: {err:?}"); + } else { + debug!("watch initlist error: {err:?}"); + } + ( + Some(Err(err).map_err(Error::WatchStartFailed)), + State::InitListed { resource_version }, + ) + } + }, + State::Watching { + resource_version, + mut stream, + } => match stream.next().await { + Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => { + let resource_version = obj.resource_version().unwrap(); + (Some(Ok(Event::Applied(obj))), State::Watching { resource_version, stream, - } - }; - if err.code == 403 { - warn!("watcher watchevent error 403: {err:?}"); - } else { - debug!("error watchevent error: {err:?}"); + }) } - (Some(Err(err).map_err(Error::WatchError)), new_state) - } - Some(Err(err)) => { - if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { - warn!("watcher error 403: {err:?}"); - } else { - debug!("watcher error: {err:?}"); + Some(Ok(WatchEvent::Deleted(obj))) => { + let resource_version = obj.resource_version().unwrap(); + (Some(Ok(Event::Deleted(obj))), State::Watching { + resource_version, + stream, + }) } - (Some(Err(err).map_err(Error::WatchFailed)), State::Watching { - resource_version, + Some(Ok(WatchEvent::Bookmark(bm))) => (None, State::Watching { + resource_version: bm.metadata.resource_version, stream, - }) - } - None => (None, State::InitListed { resource_version }), - } -} - -/// Helper for `step_trampolined` to initialize the watch with a LIST -/// -/// Used by closures that are returned from factories to provide indirection and -/// specialization where needed. -fn step_list(list_result: kube_client::Result>) -> (Option>>, State) -where - K: Resource + Clone + DeserializeOwned + Debug + Send + 'static, -{ - match list_result { - Ok(list) => { - if let Some(resource_version) = list.metadata.resource_version { - (Some(Ok(Event::Restarted(list.items))), State::InitListed { + }), + Some(Ok(WatchEvent::Error(err))) => { + // HTTP GONE, means we have desynced and need to start over and re-list :( + let new_state = if err.code == 410 { + State::Empty + } else { + State::Watching { + resource_version, + stream, + } + }; + if err.code == 403 { + warn!("watcher watchevent error 403: {err:?}"); + } else { + debug!("error watchevent error: {err:?}"); + } + (Some(Err(err).map_err(Error::WatchError)), new_state) + } + Some(Err(err)) => { + if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { + warn!("watcher error 403: {err:?}"); + } else { + debug!("watcher error: {err:?}"); + } + (Some(Err(err).map_err(Error::WatchFailed)), State::Watching { resource_version, + stream, }) - } else { - (Some(Err(Error::NoResourceVersion)), State::Empty) - } - } - Err(err) => { - if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { - warn!("watch list error with 403: {err:?}"); - } else { - debug!("watch list error: {err:?}"); } - (Some(Err(err).map_err(Error::InitialListFailed)), State::Empty) - } + None => (None, State::InitListed { resource_version }), + }, } } -/// Helper for `step_trampolined` to itialize the watch with a LIST -/// -/// Used by closures that are returned from factories to provide indirection and -/// specialization where needed. -fn step_watch( - watch_event: kube_client::Result>> + Send + 'static>, - resource_version: String, -) -> (Option>>, State) +/// Trampoline helper for `step_trampolined` +async fn step( + api: &A, + list_params: &ListParams, + mut state: State, +) -> (Result>, State) where - K: Resource + Clone + DeserializeOwned + Debug + Send + 'static, + A: ApiMode, + A::Value: Resource + 'static, { - match watch_event { - Ok(stream) => (None, State::Watching { - resource_version, - stream: stream.boxed(), - }), - Err(err) => { - if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { - warn!("watch initlist error with 403: {err:?}"); - } else { - debug!("watch initlist error: {err:?}"); - } - ( - Some(Err(err).map_err(Error::WatchStartFailed)), - State::InitListed { resource_version }, - ) + loop { + match step_trampolined(api, list_params, state).await { + (Some(result), new_state) => return (result, new_state), + (None, new_state) => state = new_state, } } } @@ -423,7 +379,7 @@ pub fn watcher( futures::stream::unfold( (api, list_params, State::Empty), |(api, list_params, state)| async { - let (event, state) = step(&api, &list_params, state).await; + let (event, state) = step(&FullObject { api: &api }, &list_params, state).await; Some((event, (api, list_params, state))) }, ) @@ -487,7 +443,7 @@ pub fn metadata_watcher Date: Tue, 21 Feb 2023 22:05:45 +0000 Subject: [PATCH 10/12] Change meta watcher fn name in example Signed-off-by: Matei David --- examples/dynamic_watcher.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/dynamic_watcher.rs b/examples/dynamic_watcher.rs index 7bc21a152..21788cb05 100644 --- a/examples/dynamic_watcher.rs +++ b/examples/dynamic_watcher.rs @@ -2,7 +2,7 @@ use futures::{StreamExt, TryStreamExt}; use kube::{ api::{Api, DynamicObject, GroupVersionKind, ListParams, ResourceExt}, discovery::{self, Scope}, - runtime::{watcher, WatchStreamExt}, + runtime::{metadata_watcher, watcher, WatchStreamExt}, Client, }; use tracing::*; @@ -35,7 +35,7 @@ async fn main() -> anyhow::Result<()> { // Fully compatible with kube-runtime if should_watch_meta { - let mut items = watcher::watch_metadata(api, ListParams::default()) + let mut items = metadata_watcher(api, ListParams::default()) .applied_objects() .boxed(); From 7b183187f58b60759ee101620f39a5125755ce51 Mon Sep 17 00:00:00 2001 From: Matei David Date: Tue, 21 Feb 2023 22:07:11 +0000 Subject: [PATCH 11/12] Parse evar as 1 Signed-off-by: Matei David --- examples/dynamic_watcher.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/examples/dynamic_watcher.rs b/examples/dynamic_watcher.rs index 21788cb05..818e38b21 100644 --- a/examples/dynamic_watcher.rs +++ b/examples/dynamic_watcher.rs @@ -15,10 +15,7 @@ async fn main() -> anyhow::Result<()> { let client = Client::try_default().await?; // If set will receive only the metadata for watched resources - let should_watch_meta = { - let v = env::var("WATCH_METADATA").unwrap_or_else(|_| "false".into()); - v.parse::().ok().unwrap_or_else(|| false) - }; + let watch_metadata = env::var("WATCH_METADATA").map(|s| s == "1").unwrap_or(false); // Take dynamic resource identifiers: let group = env::var("GROUP").unwrap_or_else(|_| "clux.dev".into()); @@ -34,7 +31,7 @@ async fn main() -> anyhow::Result<()> { let api = Api::::all_with(client, &ar); // Fully compatible with kube-runtime - if should_watch_meta { + if watch_metadata { let mut items = metadata_watcher(api, ListParams::default()) .applied_objects() .boxed(); From 72e0b77c199c6b488f4947e0db4e84069e0c5a63 Mon Sep 17 00:00:00 2001 From: Matei David Date: Wed, 22 Feb 2023 13:00:17 +0000 Subject: [PATCH 12/12] Refactor dynamic_watcher example Signed-off-by: Matei David --- examples/dynamic_watcher.rs | 47 +++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/examples/dynamic_watcher.rs b/examples/dynamic_watcher.rs index 818e38b21..2592a0892 100644 --- a/examples/dynamic_watcher.rs +++ b/examples/dynamic_watcher.rs @@ -1,13 +1,14 @@ -use futures::{StreamExt, TryStreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; use kube::{ api::{Api, DynamicObject, GroupVersionKind, ListParams, ResourceExt}, - discovery::{self, Scope}, + discovery::{self, ApiCapabilities, Scope}, runtime::{metadata_watcher, watcher, WatchStreamExt}, Client, }; +use serde::de::DeserializeOwned; use tracing::*; -use std::env; +use std::{env, fmt::Debug}; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -30,29 +31,29 @@ async fn main() -> anyhow::Result<()> { // Use the full resource info to create an Api with the ApiResource as its DynamicType let api = Api::::all_with(client, &ar); - // Fully compatible with kube-runtime + // Start a metadata or a full resource watch if watch_metadata { - let mut items = metadata_watcher(api, ListParams::default()) - .applied_objects() - .boxed(); - - while let Some(p) = items.try_next().await? { - if caps.scope == Scope::Cluster { - info!("saw {}", p.name_any()); - } else { - info!("saw {} in {}", p.name_any(), p.namespace().unwrap()); - } - } + handle_events(metadata_watcher(api, ListParams::default()), caps).await? } else { - let mut items = watcher(api, ListParams::default()).applied_objects().boxed(); - while let Some(p) = items.try_next().await? { - if caps.scope == Scope::Cluster { - info!("saw {}", p.name_any()); - } else { - info!("saw {} in {}", p.name_any(), p.namespace().unwrap()); - } + handle_events(watcher(api, ListParams::default()), caps).await? + } + + Ok(()) +} + +async fn handle_events( + stream: impl Stream>> + Send + 'static, + api_caps: ApiCapabilities, +) -> anyhow::Result<()> { + // Fully compatible with kube-runtime + let mut items = stream.applied_objects().boxed(); + while let Some(p) = items.try_next().await? { + if api_caps.scope == Scope::Cluster { + info!("saw {}", p.name_any()); + } else { + info!("saw {} in {}", p.name_any(), p.namespace().unwrap()); } - }; + } Ok(()) }