diff --git a/examples/dynamic_watcher.rs b/examples/dynamic_watcher.rs index 2b669283a..2592a0892 100644 --- a/examples/dynamic_watcher.rs +++ b/examples/dynamic_watcher.rs @@ -1,19 +1,23 @@ -use futures::{StreamExt, TryStreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; use kube::{ api::{Api, DynamicObject, GroupVersionKind, ListParams, ResourceExt}, - discovery::{self, Scope}, - runtime::{watcher, WatchStreamExt}, + discovery::{self, ApiCapabilities, Scope}, + runtime::{metadata_watcher, watcher, WatchStreamExt}, Client, }; +use serde::de::DeserializeOwned; use tracing::*; -use std::env; +use std::{env, fmt::Debug}; #[tokio::main] async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); let client = Client::try_default().await?; + // If set will receive only the metadata for watched resources + let watch_metadata = env::var("WATCH_METADATA").map(|s| s == "1").unwrap_or(false); + // Take dynamic resource identifiers: let group = env::var("GROUP").unwrap_or_else(|_| "clux.dev".into()); let version = env::var("VERSION").unwrap_or_else(|_| "v1".into()); @@ -27,14 +31,29 @@ async fn main() -> anyhow::Result<()> { // Use the full resource info to create an Api with the ApiResource as its DynamicType let api = Api::<DynamicObject>::all_with(client, &ar); + // Start a metadata or a full resource watch + if watch_metadata { + handle_events(metadata_watcher(api, ListParams::default()), caps).await? + } else { + handle_events(watcher(api, ListParams::default()), caps).await? + } + + Ok(()) +} + +async fn handle_events<K: kube::Resource + Clone + Debug + Send + DeserializeOwned + 'static>( + stream: impl Stream<Item = watcher::Result<watcher::Event<K>>> + Send + 'static, + api_caps: ApiCapabilities, +) -> anyhow::Result<()> { // Fully compatible with kube-runtime - let mut items = watcher(api, ListParams::default()).applied_objects().boxed(); + let mut items = stream.applied_objects().boxed(); while let Some(p) = items.try_next().await?
{ - if caps.scope == Scope::Cluster { + if api_caps.scope == Scope::Cluster { info!("saw {}", p.name_any()); } else { info!("saw {} in {}", p.name_any(), p.namespace().unwrap()); } } + Ok(()) } diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index 6adac2fd9..db966489b 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -39,6 +39,7 @@ json-patch = "0.3.0" serde_json = "1.0.68" thiserror = "1.0.29" backoff = "0.4.0" +async-trait = "0.1.64" [dependencies.k8s-openapi] version = "0.17.0" diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 21ec5f9d9..b03e9c577 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -895,17 +895,29 @@ mod tests { use crate::{ applier, reflector::{self, ObjectRef}, - watcher, Controller, + watcher::{self, metadata_watcher, watcher, Event}, + Controller, }; - use futures::{pin_mut, StreamExt, TryStreamExt}; + use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; use k8s_openapi::api::core::v1::ConfigMap; - use kube_client::{core::ObjectMeta, Api}; + use kube_client::{core::ObjectMeta, Api, Resource}; + use serde::de::DeserializeOwned; use tokio::time::timeout; fn assert_send<T: Send>(x: T) -> T { x } + // Used to typecheck that a type T is a generic type that implements Stream + // and returns a WatchEvent generic over a resource `K` + fn assert_stream<T, K>(x: T) -> T + where + T: Stream<Item = watcher::Result<Event<K>>> + Send, + K: Resource + Clone + DeserializeOwned + std::fmt::Debug + Send + 'static, + { + x + } + fn mock_type<T>() -> T { unimplemented!( "mock_type is not supposed to be called, only used for filling holes in type assertions" ) } @@ -924,6 +936,20 @@ mod tests { ); } + // not #[test] because we don't want to actually run it, we just want to + // assert that it typechecks + // + // will check return types for `watcher` and `watch_metadata` do not drift + // given an arbitrary K that implements `Resource` (e.g ConfigMap) + #[allow(dead_code, unused_must_use)] + fn
test_watcher_stream_type_drift() { + assert_stream(watcher(mock_type::<Api<ConfigMap>>(), Default::default())); + assert_stream(metadata_watcher( + mock_type::<Api<ConfigMap>>(), + Default::default(), + )); + } + #[tokio::test] async fn applier_must_not_deadlock_if_reschedule_buffer_fills() { // This tests that `applier` handles reschedule queue backpressure correctly, by trying to flood it with no-op reconciles diff --git a/kube-runtime/src/lib.rs b/kube-runtime/src/lib.rs index 7296ac019..f5e0beb56 100644 --- a/kube-runtime/src/lib.rs +++ b/kube-runtime/src/lib.rs @@ -32,4 +32,4 @@ pub use finalizer::finalizer; pub use reflector::reflector; pub use scheduler::scheduler; pub use utils::WatchStreamExt; -pub use watcher::watcher; +pub use watcher::{metadata_watcher, watcher}; diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 7b202f0ea..e13a3d631 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -3,11 +3,13 @@ //! See [`watcher`] for the primary entry point. use crate::utils::ResetTimerBackoff; +use async_trait::async_trait; use backoff::{backoff::Backoff, ExponentialBackoff}; use derivative::Derivative; use futures::{stream::BoxStream, Stream, StreamExt}; use kube_client::{ api::{ListParams, Resource, ResourceExt, WatchEvent}, + core::{metadata::PartialObjectMeta, ObjectList}, error::ErrorResponse, Api, Error as ClientErr, }; @@ -114,7 +116,7 @@ impl<K> Event<K> { #[derive(Derivative)] #[derivative(Debug)] /// The internal finite state machine driving the [`watcher`] -enum State<K: Resource + Clone> { +enum State<K> { /// The Watcher is empty, and the next [`poll`](Stream::poll_next) will start the initial LIST to get all existing objects Empty, /// The initial LIST was successful, so we should move on to starting the actual watch.
@@ -132,15 +134,85 @@ enum State<K: Resource + Clone> { }, } +/// Used to control whether the watcher receives the full object, or only the +/// metadata +#[async_trait] +trait ApiMode { + type Value: Clone; + + async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>>; + async fn watch( + &self, + lp: &ListParams, + version: &str, + ) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>>; +} + +/// A wrapper around the `Api` of a `Resource` type that when used by the +/// watcher will return the entire (full) object +struct FullObject<'a, K> { + api: &'a Api<K>, +} + +#[async_trait] +impl<K> ApiMode for FullObject<'_, K> +where + K: Clone + Debug + DeserializeOwned + Send + 'static, +{ + type Value = K; + + async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<K>> { + self.api.list(lp).await + } + + async fn watch( + &self, + lp: &ListParams, + version: &str, + ) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<K>>>> { + self.api.watch(lp, version).await.map(StreamExt::boxed) + } +} + +/// A wrapper around the `Api` of a `Resource` type that when used by the +/// watcher will return only the metadata associated with an object +struct MetaOnly<'a, K> { + api: &'a Api<K>, +} + +#[async_trait] +impl<K> ApiMode for MetaOnly<'_, K> +where + K: Clone + Debug + DeserializeOwned + Send + 'static, +{ + type Value = PartialObjectMeta; + + async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>> { + self.api.list_metadata(lp).await + } + + async fn watch( + &self, + lp: &ListParams, + version: &str, + ) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>> { + self.api.watch_metadata(lp, version).await.map(StreamExt::boxed) + } +} + /// Progresses the watcher a single step, returning (event, state) /// /// This function should be trampolined: if event == `None` /// then the function should be called again until it returns a Some.
-async fn step_trampolined<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>( - api: &Api<K>, +async fn step_trampolined<A>( + api: &A, list_params: &ListParams, - state: State<K>, -) -> (Option<Result<Event<K>>>, State<K>) { + state: State<A::Value>, +) -> (Option<Result<Event<A::Value>>>, State<A::Value>) +where + A: ApiMode, + A::Value: Resource + 'static, +{ match state { State::Empty => match api.list(list_params).await { Ok(list) => { @@ -164,7 +236,7 @@ async fn step_trampolined match api.watch(list_params, &resource_version).await { Ok(stream) => (None, State::Watching { resource_version, - stream: stream.boxed(), + stream, }), Err(err) => { if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { @@ -234,11 +306,15 @@ async fn step_trampolined -async fn step<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>( - api: &Api<K>, +async fn step<A>( + api: &A, list_params: &ListParams, - mut state: State<K>, -) -> (Result<Event<K>>, State<K>) { + mut state: State<A::Value>, +) -> (Result<Event<A::Value>>, State<A::Value>) +where + A: ApiMode, + A::Value: Resource + 'static, +{ loop { match step_trampolined(api, list_params, state).await { (Some(result), new_state) => return (result, new_state), (None, new_state) => state = new_state, } } } @@ -303,7 +379,71 @@ pub fn watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>( futures::stream::unfold( (api, list_params, State::Empty), |(api, list_params, state)| async { - let (event, state) = step(&api, &list_params, state).await; + let (event, state) = step(&FullObject { api: &api }, &list_params, state).await; Some((event, (api, list_params, state))) }, ) +} + +/// Watches a Kubernetes Resource for changes continuously and receives only the +/// metadata +/// +/// Compared to [`Api::watch`], this automatically tries to recover the stream upon errors. +/// +/// Errors from the underlying watch are propagated, after which the stream will go into recovery mode on the next poll. +/// You can apply your own backoff by not polling the stream for a duration after errors. +/// Keep in mind that some [`TryStream`](futures::TryStream) combinators (such as +/// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat)) +/// will terminate eagerly as soon as they receive an [`Err`].
+/// +/// This is intended to provide a safe and atomic input interface for a state store like a [`reflector`]. +/// Direct users may want to flatten composite events via [`WatchStreamExt`]: +/// +/// ```no_run +/// use kube::{ +/// api::{Api, ListParams, ResourceExt}, Client, +/// runtime::{watcher, metadata_watcher, WatchStreamExt} +/// }; +/// use k8s_openapi::api::core::v1::Pod; +/// use futures::{StreamExt, TryStreamExt}; +/// #[tokio::main] +/// async fn main() -> Result<(), watcher::Error> { +/// let client = Client::try_default().await.unwrap(); +/// let pods: Api<Pod> = Api::namespaced(client, "apps"); +/// +/// metadata_watcher(pods, ListParams::default()).applied_objects() +/// .try_for_each(|p| async move { +/// println!("Applied: {}", p.name_any()); +/// Ok(()) +/// }) +/// .await?; +/// Ok(()) +/// } +/// ``` +/// [`WatchStreamExt`]: super::WatchStreamExt +/// [`reflector`]: super::reflector::reflector +/// [`Api::watch`]: kube_client::Api::watch +/// +/// # Recovery +/// +/// The stream will attempt to be recovered on the next poll after an [`Err`] is returned. +/// This will normally happen immediately, but you can use [`StreamBackoff`](crate::utils::StreamBackoff) +/// to introduce an artificial delay. [`default_backoff`] returns a suitable default set of parameters. +/// +/// If the watch connection is interrupted, then `watcher` will attempt to restart the watch using the last +/// [resource version](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes) +/// that we have seen on the stream. If this is successful then the stream is simply resumed from where it left off. +/// If this fails because the resource version is no longer valid then we start over with a new stream, starting with +/// an [`Event::Restarted`]. The internal mechanics of recovery should be considered an implementation detail.
+#[allow(clippy::module_name_repetitions)] +pub fn metadata_watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>( + api: Api<K>, + list_params: ListParams, +) -> impl Stream<Item = Result<Event<PartialObjectMeta>>> + Send { + futures::stream::unfold( + (api, list_params, State::Empty), + |(api, list_params, state)| async { + let (event, state) = step(&MetaOnly { api: &api }, &list_params, state).await; + Some((event, (api, list_params, state))) + }, + )