Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce support for persistent metadata watches #1145

Merged
merged 17 commits into from
Feb 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions examples/dynamic_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
use futures::{StreamExt, TryStreamExt};
use futures::{Stream, StreamExt, TryStreamExt};
use kube::{
api::{Api, DynamicObject, GroupVersionKind, ListParams, ResourceExt},
discovery::{self, Scope},
runtime::{watcher, WatchStreamExt},
discovery::{self, ApiCapabilities, Scope},
runtime::{metadata_watcher, watcher, WatchStreamExt},
Client,
};
use serde::de::DeserializeOwned;
use tracing::*;

use std::env;
use std::{env, fmt::Debug};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;

// If set will receive only the metadata for watched resources
let watch_metadata = env::var("WATCH_METADATA").map(|s| s == "1").unwrap_or(false);

// Take dynamic resource identifiers:
let group = env::var("GROUP").unwrap_or_else(|_| "clux.dev".into());
let version = env::var("VERSION").unwrap_or_else(|_| "v1".into());
Expand All @@ -27,14 +31,29 @@ async fn main() -> anyhow::Result<()> {
// Use the full resource info to create an Api with the ApiResource as its DynamicType
let api = Api::<DynamicObject>::all_with(client, &ar);

// Start a metadata or a full resource watch
if watch_metadata {
handle_events(metadata_watcher(api, ListParams::default()), caps).await?
} else {
handle_events(watcher(api, ListParams::default()), caps).await?
}

Ok(())
}

async fn handle_events<K: kube::Resource + Clone + Debug + Send + DeserializeOwned + 'static>(
stream: impl Stream<Item = watcher::Result<watcher::Event<K>>> + Send + 'static,
api_caps: ApiCapabilities,
) -> anyhow::Result<()> {
// Fully compatible with kube-runtime
let mut items = watcher(api, ListParams::default()).applied_objects().boxed();
let mut items = stream.applied_objects().boxed();
while let Some(p) = items.try_next().await? {
if caps.scope == Scope::Cluster {
if api_caps.scope == Scope::Cluster {
info!("saw {}", p.name_any());
} else {
info!("saw {} in {}", p.name_any(), p.namespace().unwrap());
}
}

Ok(())
}
1 change: 1 addition & 0 deletions kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ json-patch = "0.3.0"
serde_json = "1.0.68"
thiserror = "1.0.29"
backoff = "0.4.0"
async-trait = "0.1.64"

[dependencies.k8s-openapi]
version = "0.17.0"
Expand Down
32 changes: 29 additions & 3 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -895,17 +895,29 @@ mod tests {
use crate::{
applier,
reflector::{self, ObjectRef},
watcher, Controller,
watcher::{self, metadata_watcher, watcher, Event},
Controller,
};
use futures::{pin_mut, StreamExt, TryStreamExt};
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::ConfigMap;
use kube_client::{core::ObjectMeta, Api};
use kube_client::{core::ObjectMeta, Api, Resource};
use serde::de::DeserializeOwned;
use tokio::time::timeout;

fn assert_send<T: Send>(x: T) -> T {
x
}

// Used to typecheck that a type T is a generic type that implements Stream
// and returns a WatchEvent generic over a resource `K`
fn assert_stream<T, K>(x: T) -> T
where
T: Stream<Item = watcher::Result<Event<K>>> + Send,
K: Resource + Clone + DeserializeOwned + std::fmt::Debug + Send + 'static,
{
x
}

fn mock_type<T>() -> T {
unimplemented!(
"mock_type is not supposed to be called, only used for filling holes in type assertions"
Expand All @@ -924,6 +936,20 @@ mod tests {
);
}

// not #[test] because we don't want to actually run it, we just want to
// assert that it typechecks
//
// will check return types for `watcher` and `watch_metadata` do not drift
// given an arbitrary K that implements `Resource` (e.g ConfigMap)
#[allow(dead_code, unused_must_use)]
fn test_watcher_stream_type_drift() {
assert_stream(watcher(mock_type::<Api<ConfigMap>>(), Default::default()));
assert_stream(metadata_watcher(
mock_type::<Api<ConfigMap>>(),
Default::default(),
));
}

#[tokio::test]
async fn applier_must_not_deadlock_if_reschedule_buffer_fills() {
// This tests that `applier` handles reschedule queue backpressure correctly, by trying to flood it with no-op reconciles
Expand Down
2 changes: 1 addition & 1 deletion kube-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ pub use finalizer::finalizer;
pub use reflector::reflector;
pub use scheduler::scheduler;
pub use utils::WatchStreamExt;
pub use watcher::watcher;
pub use watcher::{metadata_watcher, watcher};
162 changes: 151 additions & 11 deletions kube-runtime/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
//! See [`watcher`] for the primary entry point.

use crate::utils::ResetTimerBackoff;
use async_trait::async_trait;
use backoff::{backoff::Backoff, ExponentialBackoff};
use derivative::Derivative;
use futures::{stream::BoxStream, Stream, StreamExt};
use kube_client::{
api::{ListParams, Resource, ResourceExt, WatchEvent},
core::{metadata::PartialObjectMeta, ObjectList},
error::ErrorResponse,
Api, Error as ClientErr,
};
Expand Down Expand Up @@ -114,7 +116,7 @@ impl<K> Event<K> {
#[derive(Derivative)]
#[derivative(Debug)]
/// The internal finite state machine driving the [`watcher`]
enum State<K: Resource + Clone> {
enum State<K> {
/// The Watcher is empty, and the next [`poll`](Stream::poll_next) will start the initial LIST to get all existing objects
Empty,
/// The initial LIST was successful, so we should move on to starting the actual watch.
Expand All @@ -132,15 +134,85 @@ enum State<K: Resource + Clone> {
},
}

/// Used to control whether the watcher receives the full object, or only the
/// metadata
#[async_trait]
trait ApiMode {
type Value: Clone;

async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>>;
async fn watch(
&self,
lp: &ListParams,
version: &str,
) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>>;
}

/// A wrapper around the `Api` of a `Resource` type that when used by the
/// watcher will return the entire (full) object
struct FullObject<'a, K> {
api: &'a Api<K>,
}

#[async_trait]
impl<K> ApiMode for FullObject<'_, K>
where
K: Clone + Debug + DeserializeOwned + Send + 'static,
{
type Value = K;

async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>> {
self.api.list(lp).await
}

async fn watch(
&self,
lp: &ListParams,
version: &str,
) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>> {
self.api.watch(lp, version).await.map(StreamExt::boxed)
}
}

/// A wrapper around the `Api` of a `Resource` type that when used by the
/// watcher will return only the metadata associated with an object
struct MetaOnly<'a, K> {
api: &'a Api<K>,
}

#[async_trait]
impl<K> ApiMode for MetaOnly<'_, K>
where
K: Clone + Debug + DeserializeOwned + Send + 'static,
{
type Value = PartialObjectMeta;

async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>> {
self.api.list_metadata(lp).await
}

async fn watch(
&self,
lp: &ListParams,
version: &str,
) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>> {
self.api.watch_metadata(lp, version).await.map(StreamExt::boxed)
}
}

/// Progresses the watcher a single step, returning (event, state)
///
/// This function should be trampolined: if event == `None`
/// then the function should be called again until it returns a Some.
async fn step_trampolined<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
api: &Api<K>,
async fn step_trampolined<A>(
api: &A,
list_params: &ListParams,
state: State<K>,
) -> (Option<Result<Event<K>>>, State<K>) {
state: State<A::Value>,
) -> (Option<Result<Event<A::Value>>>, State<A::Value>)
where
A: ApiMode,
A::Value: Resource + 'static,
{
match state {
State::Empty => match api.list(list_params).await {
Ok(list) => {
Expand All @@ -164,7 +236,7 @@ async fn step_trampolined<K: Resource + Clone + DeserializeOwned + Debug + Send
State::InitListed { resource_version } => match api.watch(list_params, &resource_version).await {
Ok(stream) => (None, State::Watching {
resource_version,
stream: stream.boxed(),
stream,
}),
Err(err) => {
if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
Expand Down Expand Up @@ -234,11 +306,15 @@ async fn step_trampolined<K: Resource + Clone + DeserializeOwned + Debug + Send
}

/// Trampoline helper for `step_trampolined`
async fn step<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
api: &Api<K>,
async fn step<A>(
api: &A,
list_params: &ListParams,
mut state: State<K>,
) -> (Result<Event<K>>, State<K>) {
mut state: State<A::Value>,
) -> (Result<Event<A::Value>>, State<A::Value>)
where
A: ApiMode,
A::Value: Resource + 'static,
{
loop {
match step_trampolined(api, list_params, state).await {
(Some(result), new_state) => return (result, new_state),
Expand Down Expand Up @@ -303,7 +379,71 @@ pub fn watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
futures::stream::unfold(
(api, list_params, State::Empty),
|(api, list_params, state)| async {
let (event, state) = step(&api, &list_params, state).await;
let (event, state) = step(&FullObject { api: &api }, &list_params, state).await;
Some((event, (api, list_params, state)))
},
)
}

/// Watches a Kubernetes Resource for changes continuously and receives only the
/// metadata
///
/// Compared to [`Api::watch`], this automatically tries to recover the stream upon errors.
///
/// Errors from the underlying watch are propagated, after which the stream will go into recovery mode on the next poll.
/// You can apply your own backoff by not polling the stream for a duration after errors.
/// Keep in mind that some [`TryStream`](futures::TryStream) combinators (such as
/// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat))
/// will terminate eagerly as soon as they receive an [`Err`].
///
/// This is intended to provide a safe and atomic input interface for a state store like a [`reflector`].
/// Direct users may want to flatten composite events via [`WatchStreamExt`]:
///
/// ```no_run
/// use kube::{
/// api::{Api, ListParams, ResourceExt}, Client,
/// runtime::{watcher, metadata_watcher, WatchStreamExt}
/// };
/// use k8s_openapi::api::core::v1::Pod;
/// use futures::{StreamExt, TryStreamExt};
/// #[tokio::main]
/// async fn main() -> Result<(), watcher::Error> {
/// let client = Client::try_default().await.unwrap();
/// let pods: Api<Pod> = Api::namespaced(client, "apps");
///
/// metadata_watcher(pods, ListParams::default()).applied_objects()
/// .try_for_each(|p| async move {
/// println!("Applied: {}", p.name_any());
/// Ok(())
/// })
/// .await?;
/// Ok(())
/// }
/// ```
/// [`WatchStreamExt`]: super::WatchStreamExt
/// [`reflector`]: super::reflector::reflector
/// [`Api::watch`]: kube_client::Api::watch
///
/// # Recovery
///
/// The stream will attempt to be recovered on the next poll after an [`Err`] is returned.
/// This will normally happen immediately, but you can use [`StreamBackoff`](crate::utils::StreamBackoff)
/// to introduce an artificial delay. [`default_backoff`] returns a suitable default set of parameters.
///
/// If the watch connection is interrupted, then `watcher` will attempt to restart the watch using the last
/// [resource version](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes)
/// that we have seen on the stream. If this is successful then the stream is simply resumed from where it left off.
/// If this fails because the resource version is no longer valid then we start over with a new stream, starting with
/// an [`Event::Restarted`]. The internals mechanics of recovery should be considered an implementation detail.
#[allow(clippy::module_name_repetitions)]
pub fn metadata_watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
api: Api<K>,
list_params: ListParams,
) -> impl Stream<Item = Result<Event<PartialObjectMeta>>> + Send {
futures::stream::unfold(
(api, list_params, State::Empty),
|(api, list_params, state)| async {
let (event, state) = step(&MetaOnly { api: &api }, &list_params, state).await;
Some((event, (api, list_params, state)))
},
)
Expand Down