Skip to content

Commit

Permalink
Decouple Meta from k8s-openapi
Browse files Browse the repository at this point in the history
  • Loading branch information
MikailBag committed Mar 15, 2021
1 parent 03f64fe commit 6b95f20
Show file tree
Hide file tree
Showing 14 changed files with 450 additions and 182 deletions.
4 changes: 4 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ path = "node_reflector.rs"
name = "node_watcher"
path = "node_watcher.rs"

[[example]]
name = "generic_watcher"
path = "generic_watcher.rs"

[[example]]
name = "secret_reflector"
path = "secret_reflector.rs"
Expand Down
6 changes: 3 additions & 3 deletions examples/configmapgen_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ struct ConfigMapGeneratorSpec {
content: String,
}

fn object_to_owner_reference<K: Meta>(meta: ObjectMeta) -> Result<OwnerReference, Error> {
fn object_to_owner_reference<K: Meta<Family = ()>>(meta: ObjectMeta) -> Result<OwnerReference, Error> {
Ok(OwnerReference {
api_version: K::API_VERSION.to_string(),
kind: K::KIND.to_string(),
api_version: K::api_version(&()).to_string(),
kind: K::kind(&()).to_string(),
name: meta.name.context(MissingObjectKey {
name: ".metadata.name",
})?,
Expand Down
39 changes: 39 additions & 0 deletions examples/generic_watcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// This example shows how to use kube with dynamically known resource kinds.

use color_eyre::Result;
use futures::prelude::*;
use kube::{
api::{DynamicObject, GroupVersionKind, ListParams, Meta},
Api, Client,
};
use kube_runtime::{utils::try_flatten_applied, watcher};

#[tokio::main]
async fn main() -> Result<()> {
std::env::set_var("RUST_LOG", "info,kube=debug");
env_logger::init();

// alternatively, you can:
// 1) take them CLI arguments
// 2) use dynamic discovery apis on kube::Client and e.g. watch all
// resources in cluster.
let group = std::env::var("GROUP").expect("GROUP not set");
let group = group.trim();
let version = std::env::var("VERSION").expect("VERSION not set");
let version = version.trim();
let kind = std::env::var("KIND").expect("KIND not set");
let kind = kind.trim();

let gvk = GroupVersionKind::from_dynamic_gvk(group, version, kind);

let client = Client::try_default().await?;
let api = Api::<DynamicObject>::all_with(client, &gvk);
let watcher = watcher(api, ListParams::default());
try_flatten_applied(watcher)
.try_for_each(|p| async move {
log::info!("Applied: {}", Meta::name(&p));
Ok(())
})
.await?;
Ok(())
}
87 changes: 68 additions & 19 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
reflector::{
reflector,
store::{Store, Writer},
ErasedResource, ObjectRef,
ObjectRef,
},
scheduler::{self, scheduler, ScheduleRequest},
utils::{try_flatten_applied, try_flatten_touched, trystream_try_via, CancelableJoinHandle},
Expand All @@ -17,10 +17,10 @@ use futures::{
stream::{self, SelectAll},
FutureExt, SinkExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt,
};
use kube::api::{Api, ListParams, Meta};
use kube::api::{Api, DynamicObject, ListParams, Meta};
use serde::de::DeserializeOwned;
use snafu::{futures::TryStreamExt as SnafuTryStreamExt, Backtrace, ResultExt, Snafu};
use std::{fmt::Debug, sync::Arc, time::Duration};
use std::{fmt::Debug, hash::Hash, sync::Arc, time::Duration};
use stream::BoxStream;
use tokio::{runtime::Handle, time::Instant};

Expand All @@ -30,7 +30,7 @@ mod runner;
#[derive(Snafu, Debug)]
pub enum Error<ReconcilerErr: std::error::Error + 'static, QueueErr: std::error::Error + 'static> {
ObjectNotFound {
obj_ref: ObjectRef<ErasedResource>,
obj_ref: ObjectRef<DynamicObject>,
backtrace: Backtrace,
},
ReconcilerFailed {
Expand Down Expand Up @@ -66,35 +66,46 @@ where
S: TryStream<Ok = T>,
I: IntoIterator<Item = ObjectRef<K>>,
K: Meta,
<K as Meta>::Family: Debug + Eq + Hash + Clone,
{
stream
.map_ok(move |obj| stream::iter(mapper(obj).into_iter().map(Ok)))
.try_flatten()
}

/// Enqueues the object itself for reconciliation
pub fn trigger_self<S>(stream: S) -> impl Stream<Item = Result<ObjectRef<S::Ok>, S::Error>>
pub fn trigger_self<S, F>(stream: S, family: F) -> impl Stream<Item = Result<ObjectRef<S::Ok>, S::Error>>
where
S: TryStream,
S::Ok: Meta,
S::Ok: Meta<Family = F>,
F: Debug + Eq + Hash + Clone,
{
trigger_with(stream, |obj| Some(ObjectRef::from_obj(&obj)))
trigger_with(stream, move |obj| {
Some(ObjectRef::from_obj_with(&obj, family.clone()))
})
}

/// Enqueues any owners of type `KOwner` for reconciliation
pub fn trigger_owners<KOwner, S>(stream: S) -> impl Stream<Item = Result<ObjectRef<KOwner>, S::Error>>
pub fn trigger_owners<KOwner, S, F, FOwner>(
stream: S,
// family: F,
owner_family: FOwner,
) -> impl Stream<Item = Result<ObjectRef<KOwner>, S::Error>>
where
S: TryStream,
S::Ok: Meta,
KOwner: Meta,
S::Ok: Meta<Family = F>,
F: Debug + Eq + Hash + Clone,
KOwner: Meta<Family = FOwner>,
FOwner: Debug + Eq + Hash + Clone,
{
trigger_with(stream, |obj| {
trigger_with(stream, move |obj| {
let meta = obj.meta().clone();
let ns = meta.namespace;
let owner_family = owner_family.clone();
meta.owner_references
.into_iter()
.flatten()
.flat_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner))
.flat_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner, owner_family.clone()))
})
}

Expand Down Expand Up @@ -147,6 +158,7 @@ pub fn applier<K, QueueStream, ReconcilerFut, T>(
) -> impl Stream<Item = Result<(ObjectRef<K>, ReconcilerAction), Error<ReconcilerFut::Error, QueueStream::Error>>>
where
K: Clone + Meta + 'static,
<K as Meta>::Family: Debug + Eq + Hash + Clone + Unpin,
ReconcilerFut: TryFuture<Ok = ReconcilerAction> + Unpin,
ReconcilerFut::Error: std::error::Error + 'static,
QueueStream: TryStream<Ok = ObjectRef<K>>,
Expand Down Expand Up @@ -177,7 +189,13 @@ where
// to them separately
.map(|res| Ok((obj_ref, res)))
.left_future(),
None => future::err(ObjectNotFound { obj_ref }.build()).right_future(),
None => future::err(
ObjectNotFound {
obj_ref: obj_ref.erase(),
}
.build(),
)
.right_future(),
}
})
.context(SchedulerDequeueFailed)
Expand Down Expand Up @@ -278,30 +296,57 @@ where
pub struct Controller<K>
where
K: Clone + Meta + Debug + 'static,
<K as Meta>::Family: Debug + Eq + Hash + Clone,
{
// NB: Need to Unpin for stream::select_all
// TODO: get an arbitrary std::error::Error in here?
selector: SelectAll<BoxStream<'static, Result<ObjectRef<K>, watcher::Error>>>,
family: K::Family,
reader: Store<K>,
}

impl<K> Controller<K>
where
K: Clone + Meta + DeserializeOwned + Debug + Send + Sync + 'static,
<K as Meta>::Family: Debug + Eq + Hash + Clone + Default,
{
/// Create a Controller on a type `K`
///
/// Configure `ListParams` and `Api` so you only get reconcile events
/// for the correct `Api` scope (cluster/all/namespaced), or `ListParams` subset
#[must_use]
pub fn new(owned_api: Api<K>, lp: ListParams) -> Self {
let writer = Writer::<K>::default();
Self::new_with(owned_api, lp, Default::default())
}
}

impl<K> Controller<K>
where
K: Clone + Meta + DeserializeOwned + Send + Sync + 'static,
<K as Meta>::Family: Debug + Eq + Hash + Clone,
{
/// Create a Controller on a type `K`
///
/// Configure `ListParams` and `Api` so you only get reconcile events
/// for the correct `Api` scope (cluster/all/namespaced), or `ListParams` subset
///
/// Unlike `new`, this function accepts `K::Family` so it can be used with dynamic
/// resources.
pub fn new_with(owned_api: Api<K>, lp: ListParams, family: K::Family) -> Self {
let writer = Writer::<K>::new(family.clone());
let reader = writer.as_reader();
let mut selector = stream::SelectAll::new();
let self_watcher =
trigger_self(try_flatten_applied(reflector(writer, watcher(owned_api, lp)))).boxed();
let self_watcher = trigger_self(
try_flatten_applied(reflector(writer, watcher(owned_api, lp))),
family.clone(),
)
.boxed();
selector.push(self_watcher);
Self { selector, reader }
Self {
selector,
reader,
family,
}
}

/// Retrieve a copy of the reader before starting the controller
Expand All @@ -321,8 +366,11 @@ where
mut self,
api: Api<Child>,
lp: ListParams,
) -> Self {
let child_watcher = trigger_owners(try_flatten_touched(watcher(api, lp)));
) -> Self
where
<Child as Meta>::Family: Debug + Eq + Hash + Clone,
{
let child_watcher = trigger_owners(try_flatten_touched(watcher(api, lp)), self.family.clone());
self.selector.push(child_watcher.boxed());
self
}
Expand Down Expand Up @@ -360,6 +408,7 @@ where
) -> impl Stream<Item = Result<(ObjectRef<K>, ReconcilerAction), Error<ReconcilerFut::Error, watcher::Error>>>
where
K: Clone + Meta + 'static,
<K as Meta>::Family: Eq + Hash + Clone + Unpin,
ReconcilerFut: TryFuture<Ok = ReconcilerAction> + Send + 'static,
ReconcilerFut::Error: std::error::Error + Send + 'static,
{
Expand Down
4 changes: 3 additions & 1 deletion kube-runtime/src/reflector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
mod object_ref;
pub mod store;

pub use self::object_ref::{ErasedResource, ObjectRef, RuntimeResource};
pub use self::object_ref::ObjectRef;
use crate::watcher;
use futures::{Stream, TryStreamExt};
use kube::api::Meta;
use std::{fmt::Debug, hash::Hash};
pub use store::Store;

/// Caches objects from `watcher::Event`s to a local `Store`
Expand All @@ -22,6 +23,7 @@ pub use store::Store;
pub fn reflector<K, W>(mut store: store::Writer<K>, stream: W) -> impl Stream<Item = W::Item>
where
K: Meta + Clone,
<K as Meta>::Family: Debug + Eq + Hash + Clone,
W: Stream<Item = watcher::Result<watcher::Event<K>>>,
{
stream.inspect_ok(move |event| store.apply_watcher_event(event))
Expand Down
Loading

0 comments on commit 6b95f20

Please sign in to comment.