Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple Meta from k8s-openapi #385

Merged
merged 2 commits into from
Mar 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ path = "node_reflector.rs"
name = "node_watcher"
path = "node_watcher.rs"

[[example]]
name = "dynamic_watcher"
path = "dynamic_watcher.rs"

[[example]]
name = "secret_reflector"
path = "secret_reflector.rs"
Expand Down
6 changes: 3 additions & 3 deletions examples/configmapgen_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ struct ConfigMapGeneratorSpec {
content: String,
}

fn object_to_owner_reference<K: Meta>(meta: ObjectMeta) -> Result<OwnerReference, Error> {
fn object_to_owner_reference<K: Meta<Family = ()>>(meta: ObjectMeta) -> Result<OwnerReference, Error> {
Ok(OwnerReference {
api_version: K::API_VERSION.to_string(),
kind: K::KIND.to_string(),
api_version: K::api_version(&()).to_string(),
kind: K::kind(&()).to_string(),
name: meta.name.context(MissingObjectKey {
name: ".metadata.name",
})?,
Expand Down
39 changes: 39 additions & 0 deletions examples/dynamic_watcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// This example shows how to use kube with dynamically known resource kinds.

use color_eyre::Result;
use futures::prelude::*;
use kube::{
api::{DynamicObject, GroupVersionKind, ListParams, Meta},
Api, Client,
};
use kube_runtime::{utils::try_flatten_applied, watcher};

#[tokio::main]
async fn main() -> Result<()> {
std::env::set_var("RUST_LOG", "info,kube=debug");
env_logger::init();

// alternatively, you can:
// 1) take them CLI arguments
// 2) use dynamic discovery apis on kube::Client and e.g. watch all
// resources in cluster.
let group = std::env::var("GROUP").expect("GROUP not set");
let group = group.trim();
let version = std::env::var("VERSION").expect("VERSION not set");
let version = version.trim();
let kind = std::env::var("KIND").expect("KIND not set");
let kind = kind.trim();

let gvk = GroupVersionKind::from_dynamic_gvk(group, version, kind);

let client = Client::try_default().await?;
let api = Api::<DynamicObject>::all_with(client, &gvk);
let watcher = watcher(api, ListParams::default());
try_flatten_applied(watcher)
.try_for_each(|p| async move {
log::info!("Applied: {}", Meta::name(&p));
Ok(())
})
.await?;
Ok(())
}
87 changes: 68 additions & 19 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
reflector::{
reflector,
store::{Store, Writer},
ErasedResource, ObjectRef,
ObjectRef,
},
scheduler::{self, scheduler, ScheduleRequest},
utils::{try_flatten_applied, try_flatten_touched, trystream_try_via, CancelableJoinHandle},
Expand All @@ -17,10 +17,10 @@ use futures::{
stream::{self, SelectAll},
FutureExt, SinkExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt,
};
use kube::api::{Api, ListParams, Meta};
use kube::api::{Api, DynamicObject, ListParams, Meta};
use serde::de::DeserializeOwned;
use snafu::{futures::TryStreamExt as SnafuTryStreamExt, Backtrace, ResultExt, Snafu};
use std::{fmt::Debug, sync::Arc, time::Duration};
use std::{fmt::Debug, hash::Hash, sync::Arc, time::Duration};
use stream::BoxStream;
use tokio::{runtime::Handle, time::Instant};

Expand All @@ -30,7 +30,7 @@ mod runner;
#[derive(Snafu, Debug)]
pub enum Error<ReconcilerErr: std::error::Error + 'static, QueueErr: std::error::Error + 'static> {
ObjectNotFound {
obj_ref: ObjectRef<ErasedResource>,
obj_ref: ObjectRef<DynamicObject>,
backtrace: Backtrace,
},
ReconcilerFailed {
Expand Down Expand Up @@ -66,35 +66,46 @@ where
S: TryStream<Ok = T>,
I: IntoIterator<Item = ObjectRef<K>>,
K: Meta,
<K as Meta>::Family: Debug + Eq + Hash + Clone,
{
stream
.map_ok(move |obj| stream::iter(mapper(obj).into_iter().map(Ok)))
.try_flatten()
}

/// Enqueues the object itself for reconciliation
pub fn trigger_self<S>(stream: S) -> impl Stream<Item = Result<ObjectRef<S::Ok>, S::Error>>
pub fn trigger_self<S, F>(stream: S, family: F) -> impl Stream<Item = Result<ObjectRef<S::Ok>, S::Error>>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is slight breakage in a low-level API. Do you think it's OK not to provide separate trigger_self and trigger_self_with functions?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, yes. I think this is ok.

where
S: TryStream,
S::Ok: Meta,
S::Ok: Meta<Family = F>,
F: Debug + Eq + Hash + Clone,
{
trigger_with(stream, |obj| Some(ObjectRef::from_obj(&obj)))
trigger_with(stream, move |obj| {
Some(ObjectRef::from_obj_with(&obj, family.clone()))
})
}

/// Enqueues any owners of type `KOwner` for reconciliation
pub fn trigger_owners<KOwner, S>(stream: S) -> impl Stream<Item = Result<ObjectRef<KOwner>, S::Error>>
pub fn trigger_owners<KOwner, S, F, FOwner>(
stream: S,
// family: F,
owner_family: FOwner,
) -> impl Stream<Item = Result<ObjectRef<KOwner>, S::Error>>
where
S: TryStream,
S::Ok: Meta,
KOwner: Meta,
S::Ok: Meta<Family = F>,
F: Debug + Eq + Hash + Clone,
KOwner: Meta<Family = FOwner>,
FOwner: Debug + Eq + Hash + Clone,
{
trigger_with(stream, |obj| {
trigger_with(stream, move |obj| {
let meta = obj.meta().clone();
let ns = meta.namespace;
let owner_family = owner_family.clone();
meta.owner_references
.into_iter()
.flatten()
.flat_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner))
.flat_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner, owner_family.clone()))
})
}

Expand Down Expand Up @@ -147,6 +158,7 @@ pub fn applier<K, QueueStream, ReconcilerFut, T>(
) -> impl Stream<Item = Result<(ObjectRef<K>, ReconcilerAction), Error<ReconcilerFut::Error, QueueStream::Error>>>
where
K: Clone + Meta + 'static,
<K as Meta>::Family: Debug + Eq + Hash + Clone + Unpin,
ReconcilerFut: TryFuture<Ok = ReconcilerAction> + Unpin,
ReconcilerFut::Error: std::error::Error + 'static,
QueueStream: TryStream<Ok = ObjectRef<K>>,
Expand Down Expand Up @@ -177,7 +189,13 @@ where
// to them separately
.map(|res| Ok((obj_ref, res)))
.left_future(),
None => future::err(ObjectNotFound { obj_ref }.build()).right_future(),
None => future::err(
ObjectNotFound {
obj_ref: obj_ref.erase(),
}
.build(),
)
.right_future(),
}
})
.context(SchedulerDequeueFailed)
Expand Down Expand Up @@ -278,30 +296,57 @@ where
pub struct Controller<K>
where
K: Clone + Meta + Debug + 'static,
<K as Meta>::Family: Debug + Eq + Hash + Clone,
{
// NB: Need to Unpin for stream::select_all
// TODO: get an arbitrary std::error::Error in here?
selector: SelectAll<BoxStream<'static, Result<ObjectRef<K>, watcher::Error>>>,
family: K::Family,
reader: Store<K>,
}

impl<K> Controller<K>
where
K: Clone + Meta + DeserializeOwned + Debug + Send + Sync + 'static,
<K as Meta>::Family: Debug + Eq + Hash + Clone + Default,
{
/// Create a Controller on a type `K`
///
/// Configure `ListParams` and `Api` so you only get reconcile events
/// for the correct `Api` scope (cluster/all/namespaced), or `ListParams` subset
#[must_use]
pub fn new(owned_api: Api<K>, lp: ListParams) -> Self {
let writer = Writer::<K>::default();
Self::new_with(owned_api, lp, Default::default())
}
}

impl<K> Controller<K>
where
K: Clone + Meta + DeserializeOwned + Debug + Send + Sync + 'static,
<K as Meta>::Family: Debug + Eq + Hash + Clone,
clux marked this conversation as resolved.
Show resolved Hide resolved
{
/// Create a Controller on a type `K`
///
/// Configure `ListParams` and `Api` so you only get reconcile events
/// for the correct `Api` scope (cluster/all/namespaced), or `ListParams` subset
///
/// Unlike `new`, this function accepts `K::Family` so it can be used with dynamic
/// resources.
pub fn new_with(owned_api: Api<K>, lp: ListParams, family: K::Family) -> Self {
let writer = Writer::<K>::new(family.clone());
let reader = writer.as_reader();
let mut selector = stream::SelectAll::new();
let self_watcher =
trigger_self(try_flatten_applied(reflector(writer, watcher(owned_api, lp)))).boxed();
let self_watcher = trigger_self(
try_flatten_applied(reflector(writer, watcher(owned_api, lp))),
family.clone(),
)
.boxed();
selector.push(self_watcher);
Self { selector, reader }
Self {
selector,
reader,
family,
}
}

/// Retrieve a copy of the reader before starting the controller
Expand All @@ -321,8 +366,11 @@ where
mut self,
api: Api<Child>,
lp: ListParams,
) -> Self {
let child_watcher = trigger_owners(try_flatten_touched(watcher(api, lp)));
) -> Self
where
<Child as Meta>::Family: Debug + Eq + Hash + Clone,
{
let child_watcher = trigger_owners(try_flatten_touched(watcher(api, lp)), self.family.clone());
self.selector.push(child_watcher.boxed());
self
}
Expand Down Expand Up @@ -360,6 +408,7 @@ where
) -> impl Stream<Item = Result<(ObjectRef<K>, ReconcilerAction), Error<ReconcilerFut::Error, watcher::Error>>>
where
K: Clone + Meta + 'static,
<K as Meta>::Family: Eq + Hash + Clone + Unpin,
ReconcilerFut: TryFuture<Ok = ReconcilerAction> + Send + 'static,
ReconcilerFut::Error: std::error::Error + Send + 'static,
{
Expand Down
4 changes: 3 additions & 1 deletion kube-runtime/src/reflector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
mod object_ref;
pub mod store;

pub use self::object_ref::{ErasedResource, ObjectRef, RuntimeResource};
pub use self::object_ref::ObjectRef;
use crate::watcher;
use futures::{Stream, TryStreamExt};
use kube::api::Meta;
use std::{fmt::Debug, hash::Hash};
pub use store::Store;

/// Caches objects from `watcher::Event`s to a local `Store`
Expand All @@ -22,6 +23,7 @@ pub use store::Store;
pub fn reflector<K, W>(mut store: store::Writer<K>, stream: W) -> impl Stream<Item = W::Item>
where
K: Meta + Clone,
<K as Meta>::Family: Debug + Eq + Hash + Clone,
W: Stream<Item = watcher::Result<watcher::Event<K>>>,
{
stream.inspect_ok(move |event| store.apply_watcher_event(event))
Expand Down
Loading