Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up type bounds after Metapocalypse #469

Merged
merged 2 commits into from
Mar 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 15 additions & 19 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,37 +66,34 @@ where
S: TryStream<Ok = T>,
I: IntoIterator<Item = ObjectRef<K>>,
K: Meta,
<K as Meta>::Family: Debug + Eq + Hash + Clone,
{
stream
.map_ok(move |obj| stream::iter(mapper(obj).into_iter().map(Ok)))
.try_flatten()
}

/// Enqueues the object itself for reconciliation
pub fn trigger_self<S, F>(stream: S, family: F) -> impl Stream<Item = Result<ObjectRef<S::Ok>, S::Error>>
pub fn trigger_self<K, S>(stream: S, family: K::Family) -> impl Stream<Item = Result<ObjectRef<K>, S::Error>>
where
S: TryStream,
S::Ok: Meta<Family = F>,
F: Debug + Eq + Hash + Clone,
S: TryStream<Ok = K>,
K: Meta,
K::Family: Clone,
{
trigger_with(stream, move |obj| {
Some(ObjectRef::from_obj_with(&obj, family.clone()))
})
}

/// Enqueues any owners of type `KOwner` for reconciliation
pub fn trigger_owners<KOwner, S, F, FOwner>(
pub fn trigger_owners<KOwner, S>(
stream: S,
// family: F,
owner_family: FOwner,
owner_family: KOwner::Family,
) -> impl Stream<Item = Result<ObjectRef<KOwner>, S::Error>>
where
S: TryStream,
S::Ok: Meta<Family = F>,
F: Debug + Eq + Hash + Clone,
KOwner: Meta<Family = FOwner>,
FOwner: Debug + Eq + Hash + Clone,
S::Ok: Meta,
KOwner: Meta,
KOwner::Family: Clone,
{
trigger_with(stream, move |obj| {
let meta = obj.meta().clone();
Expand Down Expand Up @@ -158,7 +155,7 @@ pub fn applier<K, QueueStream, ReconcilerFut, T>(
) -> impl Stream<Item = Result<(ObjectRef<K>, ReconcilerAction), Error<ReconcilerFut::Error, QueueStream::Error>>>
where
K: Clone + Meta + 'static,
<K as Meta>::Family: Debug + Eq + Hash + Clone + Unpin,
K::Family: Debug + Eq + Hash + Clone + Unpin,
ReconcilerFut: TryFuture<Ok = ReconcilerAction> + Unpin,
ReconcilerFut::Error: std::error::Error + 'static,
QueueStream: TryStream<Ok = ObjectRef<K>>,
Expand Down Expand Up @@ -296,7 +293,7 @@ where
pub struct Controller<K>
where
K: Clone + Meta + Debug + 'static,
<K as Meta>::Family: Debug + Eq + Hash + Clone,
K::Family: Eq + Hash,
{
// NB: Need to Unpin for stream::select_all
// TODO: get an arbitrary std::error::Error in here?
Expand All @@ -308,7 +305,7 @@ where
impl<K> Controller<K>
where
K: Clone + Meta + DeserializeOwned + Debug + Send + Sync + 'static,
<K as Meta>::Family: Debug + Eq + Hash + Clone + Default,
K::Family: Eq + Hash + Clone + Default,
{
/// Create a Controller on a type `K`
///
Expand All @@ -323,7 +320,7 @@ where
impl<K> Controller<K>
where
K: Clone + Meta + DeserializeOwned + Debug + Send + Sync + 'static,
<K as Meta>::Family: Debug + Eq + Hash + Clone,
K::Family: Eq + Hash + Clone,
{
/// Create a Controller on a type `K`
///
Expand Down Expand Up @@ -368,7 +365,7 @@ where
lp: ListParams,
) -> Self
where
<Child as Meta>::Family: Debug + Eq + Hash + Clone,
Child::Family: Debug + Eq + Hash + Clone,
{
let child_watcher = trigger_owners(try_flatten_touched(watcher(api, lp)), self.family.clone());
self.selector.push(child_watcher.boxed());
Expand Down Expand Up @@ -407,8 +404,7 @@ where
context: Context<T>,
) -> impl Stream<Item = Result<(ObjectRef<K>, ReconcilerAction), Error<ReconcilerFut::Error, watcher::Error>>>
where
K: Clone + Meta + 'static,
<K as Meta>::Family: Eq + Hash + Clone + Unpin,
K::Family: Debug + Unpin,
ReconcilerFut: TryFuture<Ok = ReconcilerAction> + Send + 'static,
ReconcilerFut::Error: std::error::Error + Send + 'static,
{
Expand Down
1 change: 1 addition & 0 deletions kube-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#![allow(clippy::pub_enum_variant_names)]
// Triggered by many derive macros (kube-derive, derivative)
#![allow(clippy::default_trait_access)]
#![allow(clippy::type_repetition_in_bounds)]

pub mod controller;
pub mod reflector;
Expand Down
4 changes: 2 additions & 2 deletions kube-runtime/src/reflector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub use self::object_ref::ObjectRef;
use crate::watcher;
use futures::{Stream, TryStreamExt};
use kube::api::Meta;
use std::{fmt::Debug, hash::Hash};
use std::hash::Hash;
pub use store::Store;

/// Caches objects from `watcher::Event`s to a local `Store`
Expand All @@ -19,7 +19,7 @@ pub use store::Store;
pub fn reflector<K, W>(mut store: store::Writer<K>, stream: W) -> impl Stream<Item = W::Item>
where
K: Meta + Clone,
<K as Meta>::Family: Debug + Eq + Hash + Clone,
K::Family: Eq + Hash + Clone,
W: Stream<Item = watcher::Result<watcher::Event<K>>>,
{
stream.inspect_ok(move |event| store.apply_watcher_event(event))
Expand Down
30 changes: 12 additions & 18 deletions kube-runtime/src/reflector/object_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@ use std::{
};

#[derive(Derivative)]
#[derivative(Debug, PartialEq, Eq, Hash, Clone)]
#[derivative(
Debug(bound = "K::Family: Debug"),
PartialEq(bound = "K::Family: PartialEq"),
Eq(bound = "K::Family: Eq"),
Hash(bound = "K::Family: Hash"),
Clone(bound = "K::Family: Clone")
)]
/// A typed and namedspaced (if relevant) reference to a Kubernetes object
///
/// `K` may be either the object type or `DynamicObject`, in which case the
Expand All @@ -22,10 +28,7 @@ use std::{
/// ObjectRef::<Secret>::new("a").erase(),
/// );
/// ```
pub struct ObjectRef<K: Meta>
where
<K as Meta>::Family: Debug + Eq + Hash + Clone,
{
pub struct ObjectRef<K: Meta> {
family: K::Family,
/// The name of the object
pub name: String,
Expand All @@ -45,7 +48,7 @@ where

impl<K: Meta> ObjectRef<K>
where
<K as Meta>::Family: Debug + Eq + Hash + Clone + Default,
K::Family: Default,
{
#[must_use]
pub fn new(name: &str) -> Self {
Expand All @@ -61,10 +64,7 @@ where
}
}

impl<K: Meta> ObjectRef<K>
where
<K as Meta>::Family: Debug + Eq + Hash + Clone,
{
impl<K: Meta> ObjectRef<K> {
#[must_use]
pub fn new_with(name: &str, family: K::Family) -> Self {
Self {
Expand Down Expand Up @@ -117,10 +117,7 @@ where
/// Note that no checking is done on whether this conversion makes sense. For example, every `Service`
/// has a corresponding `Endpoints`, but it wouldn't make sense to convert a `Pod` into a `Deployment`.
#[must_use]
pub fn into_kind_unchecked<K2: Meta>(self, f2: K2::Family) -> ObjectRef<K2>
where
<K2 as Meta>::Family: Debug + Eq + Hash + Clone,
{
pub fn into_kind_unchecked<K2: Meta>(self, f2: K2::Family) -> ObjectRef<K2> {
ObjectRef {
family: f2,
name: self.name,
Expand All @@ -141,10 +138,7 @@ where
}
}

impl<K: Meta> Display for ObjectRef<K>
where
<K as Meta>::Family: Debug + Eq + Hash + Clone,
{
impl<K: Meta> Display for ObjectRef<K> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
Expand Down
19 changes: 11 additions & 8 deletions kube-runtime/src/reflector/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ use std::{collections::HashMap, fmt::Debug, hash::Hash, sync::Arc};
/// This is exclusive since it's not safe to share a single `Store` between multiple reflectors.
/// In particular, `Restarted` events will clobber the state of other connected reflectors.
#[derive(Debug, Derivative)]
#[derivative(Default(bound = "<K as Meta>::Family: Default"))]
#[derivative(Default(bound = "K::Family: Default"))]
pub struct Writer<K: 'static + Meta>
where
<K as Meta>::Family: Debug + Eq + Hash + Clone,
K::Family: Eq + Hash,
{
store: Arc<DashMap<ObjectRef<K>, K>>,
family: K::Family,
}

impl<K: 'static + Meta + Clone> Writer<K>
where
<K as Meta>::Family: Debug + Eq + Hash + Clone,
K::Family: Eq + Hash,
{
/// Creates a new Writer with the specified family.
///
Expand All @@ -46,7 +46,10 @@ where
}

/// Applies a single watcher event to the store
pub fn apply_watcher_event(&mut self, event: &watcher::Event<K>) {
pub fn apply_watcher_event(&mut self, event: &watcher::Event<K>)
where
K::Family: Clone,
{
match event {
watcher::Event::Applied(obj) => {
self.store
Expand Down Expand Up @@ -77,18 +80,18 @@ where
///
/// Cannot be constructed directly since one writer handle is required,
/// use `Writer::as_reader()` instead.
#[derive(Debug, Derivative)]
#[derivative(Clone)]
#[derive(Derivative)]
#[derivative(Debug(bound = "K: Debug, K::Family: Debug"), Clone)]
pub struct Store<K: 'static + Meta>
where
<K as Meta>::Family: Debug + Eq + Hash + Clone,
K::Family: Hash + Eq,
{
store: Arc<DashMap<ObjectRef<K>, K>>,
}

impl<K: 'static + Clone + Meta> Store<K>
where
<K as Meta>::Family: Debug + Eq + Hash + Clone,
K::Family: Eq + Hash + Clone,
{
/// Retrieve a `clone()` of the entry referred to by `key`, if it is in the cache.
///
Expand Down