Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Event::modify + reflector::store helpers #907

Merged
merged 11 commits into from
May 12, 2022
5 changes: 2 additions & 3 deletions examples/node_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ async fn main() -> anyhow::Result<()> {
.labels("kubernetes.io/arch=amd64") // filter instances by label
.timeout(10); // short watch timeout in this example

let store = reflector::store::Writer::<Node>::default();
let reader = store.as_reader();
let rf = reflector(store, watcher(nodes, lp));
let (reader, writer) = reflector::store();
let rf = reflector(writer, watcher(nodes, lp));

// Periodically read our state in the background
tokio::spawn(async move {
Expand Down
44 changes: 32 additions & 12 deletions examples/pod_reflector.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use futures::prelude::*;
use futures::TryStreamExt;
use k8s_openapi::api::core::v1::Pod;
use kube::{
api::{Api, ListParams},
runtime::{reflector, watcher},
Client,
runtime::{reflector, watcher, WatchStreamExt},
Client, ResourceExt,
};
use tracing::*;

Expand All @@ -13,15 +13,35 @@ async fn main() -> anyhow::Result<()> {
let client = Client::try_default().await?;

let api: Api<Pod> = Api::default_namespaced(client);
let store_w = reflector::store::Writer::default();
let store = store_w.as_reader();
let reflector = reflector(store_w, watcher(api, ListParams::default()));
// Use try_for_each to fail on first error, use for_each to keep retrying
reflector
.try_for_each(|_event| async {
info!("Current pod count: {}", store.state().len());
Ok(())
let (reader, writer) = reflector::store::<Pod>();

tokio::spawn(async move {
// Show state every 5 seconds of watching
loop {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
info!("Current pod count: {}", reader.state().len());
// full information with debug logs
for p in reader.state() {
let yaml = serde_yaml::to_string(p.as_ref()).unwrap();
debug!("Pod {}: \n{}", p.name(), yaml);
}
}
});

let stream = watcher(api, ListParams::default()).map_ok(|ev| {
ev.modify(|pod| {
// memory optimization for our store - we don't care about fields/annotations/status
pod.managed_fields_mut().clear();
pod.annotations_mut().clear();
pod.status = None;
})
.await?;
});

let rf = reflector(writer, stream).applied_objects();
futures::pin_mut!(rf);

while let Some(pod) = rf.try_next().await? {
info!("saw {}", pod.name());
}
Ok(())
}
6 changes: 3 additions & 3 deletions kube-runtime/src/reflector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,21 @@ use crate::watcher;
use futures::{Stream, TryStreamExt};
use kube_client::Resource;
use std::hash::Hash;
pub use store::Store;
pub use store::{store, Store};

/// Caches objects from `watcher::Event`s to a local `Store`
///
/// Keep in mind that the `Store` is just a cache, and may be out of date.
///
/// Note: It is a bad idea to feed a single `reflector` from multiple `watcher`s, since
/// the whole `Store` will be cleared whenever any of them emits a `Restarted` event.
pub fn reflector<K, W>(mut store: store::Writer<K>, stream: W) -> impl Stream<Item = W::Item>
pub fn reflector<K, W>(mut writer: store::Writer<K>, stream: W) -> impl Stream<Item = W::Item>
where
K: Resource + Clone,
K::DynamicType: Eq + Hash + Clone,
W: Stream<Item = watcher::Result<watcher::Event<K>>>,
{
stream.inspect_ok(move |event| store.apply_watcher_event(event))
stream.inspect_ok(move |event| writer.apply_watcher_event(event))
}

#[cfg(test)]
Expand Down
24 changes: 20 additions & 4 deletions kube-runtime/src/reflector/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,26 @@ where
}
}


/// Create a (Reader, Writer) for a `Store<K>` for a typed resource `K`
///
/// The `Writer` should be passed to a [`reflector()`],
/// and the [`Store`] is a read-only handle.
#[must_use]
pub fn store<K>() -> (Store<K>, Writer<K>)
where
K: Resource + Clone + 'static,
K::DynamicType: Eq + Hash + Clone + Default,
{
let w = Writer::<K>::default();
let r = w.as_reader();
(r, w)
}


#[cfg(test)]
mod tests {
use super::Writer;
use super::{store, Writer};
use crate::{reflector::ObjectRef, watcher};
use k8s_openapi::api::core::v1::ConfigMap;
use kube_client::api::ObjectMeta;
Expand Down Expand Up @@ -180,9 +197,8 @@ mod tests {
},
..ConfigMap::default()
};
let mut store_w = Writer::default();
store_w.apply_watcher_event(&watcher::Event::Applied(cm.clone()));
let store = store_w.as_reader();
let (store, mut writer) = store();
writer.apply_watcher_event(&watcher::Event::Applied(cm.clone()));
assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
}

Expand Down
30 changes: 30 additions & 0 deletions kube-runtime/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,36 @@ impl<K> Event<K> {
}
.into_iter()
}

/// Map each object in an event through a mutator fn
///
/// This allows for memory optimizations in watch streams.
/// If you are chaining a watch stream into a reflector as an in memory state store,
/// you can control the space used by each object by dropping fields.
///
/// ```no_run
/// use k8s_openapi::api::core::v1::Pod;
/// use kube::ResourceExt;
/// # use kube::runtime::watcher::Event;
/// # let event: Event<Pod> = todo!();
/// event.modify(|pod| {
/// pod.managed_fields_mut().clear();
/// pod.annotations_mut().clear();
/// pod.status = None;
/// });
/// ```
#[must_use]
pub fn modify(mut self, mut f: impl FnMut(&mut K)) -> Self {
match &mut self {
Event::Applied(obj) | Event::Deleted(obj) => (f)(obj),
Event::Restarted(objs) => {
for k in objs {
(f)(k)
}
}
}
self
}
}

#[derive(Derivative)]
Expand Down