Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arc wrap watcher output #1266

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 34 additions & 32 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ impl Action {
/// Helper for building custom trigger filters, see the implementations of [`trigger_self`] and [`trigger_owners`] for some examples.
pub fn trigger_with<T, K, I, S>(
stream: S,
mapper: impl Fn(T) -> I,
mapper: impl Fn(Arc<T>) -> I,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
S: TryStream<Ok = T>,
S: TryStream<Ok = Arc<T>>,
I: IntoIterator,
I::Item: Into<ReconcileRequest<K>>,
K: Resource,
Expand All @@ -110,29 +110,29 @@ pub fn trigger_self<K, S>(
dyntype: K::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
S: TryStream<Ok = K>,
S: TryStream<Ok = Arc<K>>,
K: Resource,
K::DynamicType: Clone,
{
trigger_with(stream, move |obj| {
Some(ReconcileRequest {
obj_ref: ObjectRef::from_obj_with(&obj, dyntype.clone()),
obj_ref: ObjectRef::from_obj_with(obj.as_ref(), dyntype.clone()),
reason: ReconcileReason::ObjectUpdated,
})
})
}

/// Enqueues any mapper returned `K` types for reconciliation
fn trigger_others<S, K, I>(
fn trigger_others<T, S, K, I>(
stream: S,
mapper: impl Fn(S::Ok) -> I + Sync + Send + 'static,
dyntype: <S::Ok as Resource>::DynamicType,
mapper: impl Fn(Arc<T>) -> I + Sync + Send + 'static,
dyntype: T::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
// Input stream has items as some Resource (via Controller::watches)
S: TryStream,
S::Ok: Resource,
<S::Ok as Resource>::DynamicType: Clone,
S: TryStream<Ok = Arc<T>>,
T: Resource,
T::DynamicType: Clone,
// Output stream is requests for the root type K
K: Resource,
K::DynamicType: Clone,
Expand All @@ -141,7 +141,7 @@ where
I::IntoIter: Send,
{
trigger_with(stream, move |obj| {
let watch_ref = ObjectRef::from_obj_with(&obj, dyntype.clone()).erase();
let watch_ref = ObjectRef::from_obj_with(obj.as_ref(), dyntype.clone()).erase();
mapper(obj)
.into_iter()
.map(move |mapped_obj_ref| ReconcileRequest {
Expand All @@ -154,19 +154,19 @@ where
}

/// Enqueues any owners of type `KOwner` for reconciliation
pub fn trigger_owners<KOwner, S>(
pub fn trigger_owners<KOwner, S, T>(
stream: S,
owner_type: KOwner::DynamicType,
child_type: <S::Ok as Resource>::DynamicType,
child_type: T::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<KOwner>, S::Error>>
where
S: TryStream,
S::Ok: Resource,
<S::Ok as Resource>::DynamicType: Clone,
S: TryStream<Ok = Arc<T>>,
T: Resource,
T::DynamicType: Clone,
KOwner: Resource,
KOwner::DynamicType: Clone,
{
let mapper = move |obj: S::Ok| {
let mapper = move |obj: Arc<T>| {
let meta = obj.meta().clone();
let ns = meta.namespace;
let owner_type = owner_type.clone();
Expand Down Expand Up @@ -605,7 +605,7 @@ where
/// Prefer [`Controller::new`] if you do not need to share the stream, or do not need pre-filtering.
#[cfg(feature = "unstable-runtime-stream-control")]
pub fn for_stream(
trigger: impl Stream<Item = Result<K, watcher::Error>> + Send + 'static,
trigger: impl Stream<Item = Result<Arc<K>, watcher::Error>> + Send + 'static,
reader: Store<K>,
) -> Self
where
Expand All @@ -629,7 +629,7 @@ where
/// [`dynamic`]: kube_client::core::dynamic
#[cfg(feature = "unstable-runtime-stream-control")]
pub fn for_stream_with(
trigger: impl Stream<Item = Result<K, watcher::Error>> + Send + 'static,
trigger: impl Stream<Item = Result<Arc<K>, watcher::Error>> + Send + 'static,
reader: Store<K>,
dyntype: K::DynamicType,
) -> Self {
Expand Down Expand Up @@ -680,7 +680,9 @@ where
///
/// [`OwnerReference`]: k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference
#[must_use]
pub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>(
pub fn owns<
Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static + Sync,
>(
self,
api: Api<Child>,
wc: Config,
Expand All @@ -692,7 +694,7 @@ where
///
/// Same as [`Controller::owns`], but accepts a `DynamicType` so it can be used with dynamic resources.
#[must_use]
pub fn owns_with<Child: Clone + Resource + DeserializeOwned + Debug + Send + 'static>(
pub fn owns_with<Child: Clone + Resource + DeserializeOwned + Debug + Send + 'static + Sync>(
mut self,
api: Api<Child>,
dyntype: Child::DynamicType,
Expand Down Expand Up @@ -750,7 +752,7 @@ where
#[must_use]
pub fn owns_stream<Child: Resource<DynamicType = ()> + Send + 'static>(
self,
trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
trigger: impl Stream<Item = Result<Arc<Child>, watcher::Error>> + Send + 'static,
) -> Self {
self.owns_stream_with(trigger, ())
}
Expand All @@ -768,7 +770,7 @@ where
#[must_use]
pub fn owns_stream_with<Child: Resource + Send + 'static>(
mut self,
trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
trigger: impl Stream<Item = Result<Arc<Child>, watcher::Error>> + Send + 'static,
dyntype: Child::DynamicType,
) -> Self
where
Expand Down Expand Up @@ -848,10 +850,10 @@ where
self,
api: Api<Other>,
wc: Config,
mapper: impl Fn(Other) -> I + Sync + Send + 'static,
mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
) -> Self
where
Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static + Sync,
Other::DynamicType: Default + Debug + Clone + Eq + Hash,
I: 'static + IntoIterator<Item = ObjectRef<K>>,
I::IntoIter: Send,
Expand All @@ -868,10 +870,10 @@ where
api: Api<Other>,
dyntype: Other::DynamicType,
wc: Config,
mapper: impl Fn(Other) -> I + Sync + Send + 'static,
mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
) -> Self
where
Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static + Sync,
I: 'static + IntoIterator<Item = ObjectRef<K>>,
I::IntoIter: Send,
Other::DynamicType: Debug + Clone + Eq + Hash,
Expand Down Expand Up @@ -923,8 +925,8 @@ where
#[must_use]
pub fn watches_stream<Other, I>(
self,
trigger: impl Stream<Item = Result<Other, watcher::Error>> + Send + 'static,
mapper: impl Fn(Other) -> I + Sync + Send + 'static,
trigger: impl Stream<Item = Result<Arc<Other>, watcher::Error>> + Send + 'static,
mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
) -> Self
where
Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
Expand All @@ -948,8 +950,8 @@ where
#[must_use]
pub fn watches_stream_with<Other, I>(
mut self,
trigger: impl Stream<Item = Result<Other, watcher::Error>> + Send + 'static,
mapper: impl Fn(Other) -> I + Sync + Send + 'static,
trigger: impl Stream<Item = Result<Arc<Other>, watcher::Error>> + Send + 'static,
mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
dyntype: Other::DynamicType,
) -> Self
where
Expand Down Expand Up @@ -1205,7 +1207,7 @@ where
applier(
move |obj, ctx| {
CancelableJoinHandle::spawn(
reconciler(obj, ctx).into_future().in_current_span(),
reconciler(obj.clone(), ctx).into_future().in_current_span(),
&Handle::current(),
)
},
Expand Down
4 changes: 2 additions & 2 deletions kube-runtime/src/reflector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub use self::object_ref::{Extra as ObjectRefExtra, ObjectRef};
use crate::watcher;
use futures::{Stream, TryStreamExt};
use kube_client::Resource;
use std::hash::Hash;
use std::{hash::Hash, sync::Arc};
pub use store::{store, Store};

/// Cache objects from a [`watcher()`] stream into a local [`Store`]
Expand Down Expand Up @@ -93,7 +93,7 @@ pub fn reflector<K, W>(mut writer: store::Writer<K>, stream: W) -> impl Stream<I
where
K: Resource + Clone,
K::DynamicType: Eq + Hash + Clone,
W: Stream<Item = watcher::Result<watcher::Event<K>>>,
W: Stream<Item = watcher::Result<watcher::Event<Arc<K>>>>,
{
stream.inspect_ok(move |event| writer.apply_watcher_event(event))
}
Expand Down
13 changes: 6 additions & 7 deletions kube-runtime/src/reflector/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,24 +58,23 @@ where
}

/// Applies a single watcher event to the store
pub fn apply_watcher_event(&mut self, event: &watcher::Event<K>) {
pub fn apply_watcher_event(&mut self, event: &watcher::Event<Arc<K>>) {
match event {
watcher::Event::Applied(obj) => {
let key = ObjectRef::from_obj_with(obj, self.dyntype.clone());
let obj = Arc::new(obj.clone());
self.store.write().insert(key, obj);
let key = ObjectRef::from_obj_with(obj.as_ref(), self.dyntype.clone());
self.store.write().insert(key, obj.clone());
clux marked this conversation as resolved.
Show resolved Hide resolved
}
watcher::Event::Deleted(obj) => {
let key = ObjectRef::from_obj_with(obj, self.dyntype.clone());
let key = ObjectRef::from_obj_with(obj.as_ref(), self.dyntype.clone());
self.store.write().remove(&key);
}
watcher::Event::Restarted(new_objs) => {
let new_objs = new_objs
.iter()
.map(|obj| {
(
ObjectRef::from_obj_with(obj, self.dyntype.clone()),
Arc::new(obj.clone()),
ObjectRef::from_obj_with(obj.as_ref(), self.dyntype.clone()),
obj.clone(),
)
})
.collect::<AHashMap<_, _>>();
Expand Down
9 changes: 5 additions & 4 deletions kube-runtime/src/utils/event_flatten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use core::{
};
use futures::{ready, Stream, TryStream};
use pin_project::pin_project;
use std::sync::Arc;

#[pin_project]
/// Stream returned by the [`applied_objects`](super::WatchStreamExt::applied_objects) and [`touched_objects`](super::WatchStreamExt::touched_objects) method.
Expand All @@ -13,9 +14,9 @@ pub struct EventFlatten<St, K> {
#[pin]
stream: St,
emit_deleted: bool,
queue: std::vec::IntoIter<K>,
queue: std::vec::IntoIter<Arc<K>>,
}
impl<St: TryStream<Ok = Event<K>>, K> EventFlatten<St, K> {
impl<St: TryStream<Ok = Event<Arc<K>>>, K> EventFlatten<St, K> {
pub(super) fn new(stream: St, emit_deleted: bool) -> Self {
Self {
stream,
Expand All @@ -26,9 +27,9 @@ impl<St: TryStream<Ok = Event<K>>, K> EventFlatten<St, K> {
}
impl<St, K> Stream for EventFlatten<St, K>
where
St: Stream<Item = Result<Event<K>, Error>>,
St: Stream<Item = Result<Event<Arc<K>>, Error>>,
{
type Item = Result<K, Error>;
type Item = Result<Arc<K>, Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut me = self.project();
Expand Down
5 changes: 3 additions & 2 deletions kube-runtime/src/utils/reflect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use core::{
pin::Pin,
task::{Context, Poll},
};
use std::sync::Arc;

use futures::{Stream, TryStream};
use pin_project::pin_project;
Expand Down Expand Up @@ -39,9 +40,9 @@ impl<St, K> Stream for Reflect<St, K>
where
K: Resource + Clone,
K::DynamicType: Eq + std::hash::Hash + Clone,
St: Stream<Item = Result<Event<K>, Error>>,
St: Stream<Item = Result<Event<Arc<K>>, Error>>,
{
type Item = Result<Event<K>, Error>;
type Item = Result<Event<Arc<K>>, Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut me = self.project();
Expand Down
6 changes: 4 additions & 2 deletions kube-runtime/src/utils/watch_ext.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

#[cfg(feature = "unstable-runtime-predicates")]
use crate::utils::predicate::{Predicate, PredicateFilter};
#[cfg(feature = "unstable-runtime-subscribe")]
Expand Down Expand Up @@ -40,7 +42,7 @@ pub trait WatchStreamExt: Stream {
/// All Added/Modified events are passed through, and critical errors bubble up.
fn applied_objects<K>(self) -> EventFlatten<Self, K>
where
Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
Self: Stream<Item = Result<watcher::Event<Arc<K>>, watcher::Error>> + Sized,
{
EventFlatten::new(self, false)
}
Expand All @@ -50,7 +52,7 @@ pub trait WatchStreamExt: Stream {
/// All Added/Modified/Deleted events are passed through, and critical errors bubble up.
fn touched_objects<K>(self) -> EventFlatten<Self, K>
where
Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
Self: Stream<Item = Result<watcher::Event<Arc<K>>, watcher::Error>> + Sized,
{
EventFlatten::new(self, true)
}
Expand Down
10 changes: 7 additions & 3 deletions kube-runtime/src/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use futures::TryStreamExt;
use kube_client::{Api, Resource};
use serde::de::DeserializeOwned;
use std::fmt::Debug;
use std::{fmt::Debug, sync::Arc};
use thiserror::Error;

use crate::watcher::{self, watch_object};
Expand Down Expand Up @@ -47,13 +47,17 @@ pub enum Error {
/// # }
/// ```
#[allow(clippy::missing_panics_doc)] // watch never actually terminates, expect cannot fail
pub async fn await_condition<K>(api: Api<K>, name: &str, cond: impl Condition<K>) -> Result<Option<K>, Error>
pub async fn await_condition<K>(
api: Api<K>,
name: &str,
cond: impl Condition<K>,
) -> Result<Option<Arc<K>>, Error>
where
K: Clone + Debug + Send + DeserializeOwned + Resource + 'static,
{
// Skip updates until the condition is satisfied.
let stream = watch_object(api, name).try_skip_while(|obj| {
let matches = cond.matches_object(obj.as_ref());
let matches = cond.matches_object(obj.as_ref().map(Arc::as_ref));
clux marked this conversation as resolved.
Show resolved Hide resolved
futures::future::ok(!matches)
});
futures::pin_mut!(stream);
Expand Down
Loading