diff --git a/examples/Cargo.toml b/examples/Cargo.toml index b49535e95..22579b28b 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -138,6 +138,10 @@ path = "node_reflector.rs" name = "node_watcher" path = "node_watcher.rs" +[[example]] +name = "dynamic_watcher" +path = "dynamic_watcher.rs" + [[example]] name = "secret_reflector" path = "secret_reflector.rs" diff --git a/examples/configmapgen_controller.rs b/examples/configmapgen_controller.rs index a41061d55..2626a2b0e 100644 --- a/examples/configmapgen_controller.rs +++ b/examples/configmapgen_controller.rs @@ -36,10 +36,10 @@ struct ConfigMapGeneratorSpec { content: String, } -fn object_to_owner_reference(meta: ObjectMeta) -> Result { +fn object_to_owner_reference>(meta: ObjectMeta) -> Result { Ok(OwnerReference { - api_version: K::API_VERSION.to_string(), - kind: K::KIND.to_string(), + api_version: K::api_version(&()).to_string(), + kind: K::kind(&()).to_string(), name: meta.name.context(MissingObjectKey { name: ".metadata.name", })?, diff --git a/examples/dynamic_watcher.rs b/examples/dynamic_watcher.rs new file mode 100644 index 000000000..18ca661c9 --- /dev/null +++ b/examples/dynamic_watcher.rs @@ -0,0 +1,39 @@ +// This example shows how to use kube with dynamically known resource kinds. + +use color_eyre::Result; +use futures::prelude::*; +use kube::{ + api::{DynamicObject, GroupVersionKind, ListParams, Meta}, + Api, Client, +}; +use kube_runtime::{utils::try_flatten_applied, watcher}; + +#[tokio::main] +async fn main() -> Result<()> { + std::env::set_var("RUST_LOG", "info,kube=debug"); + env_logger::init(); + + // alternatively, you can: + // 1) take them CLI arguments + // 2) use dynamic discovery apis on kube::Client and e.g. watch all + // resources in cluster. + let group = std::env::var("GROUP").expect("GROUP not set"); + let group = group.trim(); + let version = std::env::var("VERSION").expect("VERSION not set"); + let version = version.trim(); + let kind = std::env::var("KIND").expect("KIND not set"); + let kind = kind.trim(); + + let gvk = GroupVersionKind::from_dynamic_gvk(group, version, kind); + + let client = Client::try_default().await?; + let api = Api::::all_with(client, &gvk); + let watcher = watcher(api, ListParams::default()); + try_flatten_applied(watcher) + .try_for_each(|p| async move { + log::info!("Applied: {}", Meta::name(&p)); + Ok(()) + }) + .await?; + Ok(()) +} diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 49a42f07e..e2f83f5fc 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -5,7 +5,7 @@ use crate::{ reflector::{ reflector, store::{Store, Writer}, - ErasedResource, ObjectRef, + ObjectRef, }, scheduler::{self, scheduler, ScheduleRequest}, utils::{try_flatten_applied, try_flatten_touched, trystream_try_via, CancelableJoinHandle}, @@ -17,10 +17,10 @@ use futures::{ stream::{self, SelectAll}, FutureExt, SinkExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt, }; -use kube::api::{Api, ListParams, Meta}; +use kube::api::{Api, DynamicObject, ListParams, Meta}; use serde::de::DeserializeOwned; use snafu::{futures::TryStreamExt as SnafuTryStreamExt, Backtrace, ResultExt, Snafu}; -use std::{fmt::Debug, sync::Arc, time::Duration}; +use std::{fmt::Debug, hash::Hash, sync::Arc, time::Duration}; use stream::BoxStream; use tokio::{runtime::Handle, time::Instant}; @@ -30,7 +30,7 @@ mod runner; #[derive(Snafu, Debug)] pub enum Error { ObjectNotFound { - obj_ref: ObjectRef, + obj_ref: ObjectRef, backtrace: Backtrace, }, ReconcilerFailed { @@ -66,6 +66,7 @@ where S: TryStream, I: IntoIterator>, K: Meta, + ::Family: Debug + Eq + Hash + Clone, { stream .map_ok(move |obj| stream::iter(mapper(obj).into_iter().map(Ok))) @@ -73,28 +74,38 @@ where } /// Enqueues the object itself for reconciliation -pub fn trigger_self(stream: S) -> impl Stream, S::Error>> +pub fn trigger_self(stream: S, family: F) -> impl Stream, S::Error>> where S: TryStream, - S::Ok: Meta, + S::Ok: Meta, + F: Debug + Eq + Hash + Clone, { - trigger_with(stream, |obj| Some(ObjectRef::from_obj(&obj))) + trigger_with(stream, move |obj| { + Some(ObjectRef::from_obj_with(&obj, family.clone())) + }) } /// Enqueues any owners of type `KOwner` for reconciliation -pub fn trigger_owners(stream: S) -> impl Stream, S::Error>> +pub fn trigger_owners( + stream: S, + // family: F, + owner_family: FOwner, +) -> impl Stream, S::Error>> where S: TryStream, - S::Ok: Meta, - KOwner: Meta, + S::Ok: Meta, + F: Debug + Eq + Hash + Clone, + KOwner: Meta, + FOwner: Debug + Eq + Hash + Clone, { - trigger_with(stream, |obj| { + trigger_with(stream, move |obj| { let meta = obj.meta().clone(); let ns = meta.namespace; + let owner_family = owner_family.clone(); meta.owner_references .into_iter() .flatten() - .flat_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner)) + .flat_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner, owner_family.clone())) }) } @@ -147,6 +158,7 @@ pub fn applier( ) -> impl Stream, ReconcilerAction), Error>> where K: Clone + Meta + 'static, + ::Family: Debug + Eq + Hash + Clone + Unpin, ReconcilerFut: TryFuture + Unpin, ReconcilerFut::Error: std::error::Error + 'static, QueueStream: TryStream>, @@ -177,7 +189,13 @@ where // to them separately .map(|res| Ok((obj_ref, res))) .left_future(), - None => future::err(ObjectNotFound { obj_ref }.build()).right_future(), + None => future::err( + ObjectNotFound { + obj_ref: obj_ref.erase(), + } + .build(), + ) + .right_future(), } }) .context(SchedulerDequeueFailed) @@ -278,16 +296,19 @@ where pub struct Controller where K: Clone + Meta + Debug + 'static, + ::Family: Debug + Eq + Hash + Clone, { // NB: Need to Unpin for stream::select_all // TODO: get an arbitrary std::error::Error in here? selector: SelectAll, watcher::Error>>>, + family: K::Family, reader: Store, } impl Controller where K: Clone + Meta + DeserializeOwned + Debug + Send + Sync + 'static, + ::Family: Debug + Eq + Hash + Clone + Default, { /// Create a Controller on a type `K` /// @@ -295,13 +316,37 @@ where /// for the correct `Api` scope (cluster/all/namespaced), or `ListParams` subset #[must_use] pub fn new(owned_api: Api, lp: ListParams) -> Self { - let writer = Writer::::default(); + Self::new_with(owned_api, lp, Default::default()) + } +} + +impl Controller +where + K: Clone + Meta + DeserializeOwned + Debug + Send + Sync + 'static, + ::Family: Debug + Eq + Hash + Clone, +{ + /// Create a Controller on a type `K` + /// + /// Configure `ListParams` and `Api` so you only get reconcile events + /// for the correct `Api` scope (cluster/all/namespaced), or `ListParams` subset + /// + /// Unlike `new`, this function accepts `K::Family` so it can be used with dynamic + /// resources. + pub fn new_with(owned_api: Api, lp: ListParams, family: K::Family) -> Self { + let writer = Writer::::new(family.clone()); let reader = writer.as_reader(); let mut selector = stream::SelectAll::new(); - let self_watcher = - trigger_self(try_flatten_applied(reflector(writer, watcher(owned_api, lp)))).boxed(); + let self_watcher = trigger_self( + try_flatten_applied(reflector(writer, watcher(owned_api, lp))), + family.clone(), + ) + .boxed(); selector.push(self_watcher); - Self { selector, reader } + Self { + selector, + reader, + family, + } } /// Retrieve a copy of the reader before starting the controller @@ -321,8 +366,11 @@ where mut self, api: Api, lp: ListParams, - ) -> Self { - let child_watcher = trigger_owners(try_flatten_touched(watcher(api, lp))); + ) -> Self + where + ::Family: Debug + Eq + Hash + Clone, + { + let child_watcher = trigger_owners(try_flatten_touched(watcher(api, lp)), self.family.clone()); self.selector.push(child_watcher.boxed()); self } @@ -360,6 +408,7 @@ where ) -> impl Stream, ReconcilerAction), Error>> where K: Clone + Meta + 'static, + ::Family: Eq + Hash + Clone + Unpin, ReconcilerFut: TryFuture + Send + 'static, ReconcilerFut::Error: std::error::Error + Send + 'static, { diff --git a/kube-runtime/src/reflector/mod.rs b/kube-runtime/src/reflector/mod.rs index e433ad4db..34e01a225 100644 --- a/kube-runtime/src/reflector/mod.rs +++ b/kube-runtime/src/reflector/mod.rs @@ -3,10 +3,11 @@ mod object_ref; pub mod store; -pub use self::object_ref::{ErasedResource, ObjectRef, RuntimeResource}; +pub use self::object_ref::ObjectRef; use crate::watcher; use futures::{Stream, TryStreamExt}; use kube::api::Meta; +use std::{fmt::Debug, hash::Hash}; pub use store::Store; /// Caches objects from `watcher::Event`s to a local `Store` @@ -22,6 +23,7 @@ pub use store::Store; pub fn reflector(mut store: store::Writer, stream: W) -> impl Stream where K: Meta + Clone, + ::Family: Debug + Eq + Hash + Clone, W: Stream>>, { stream.inspect_ok(move |event| store.apply_watcher_event(event)) diff --git a/kube-runtime/src/reflector/object_ref.rs b/kube-runtime/src/reflector/object_ref.rs index 6a88b0296..5e2a3fe33 100644 --- a/kube-runtime/src/reflector/object_ref.rs +++ b/kube-runtime/src/reflector/object_ref.rs @@ -1,6 +1,6 @@ use derivative::Derivative; -use k8s_openapi::{apimachinery::pkg::apis::meta::v1::OwnerReference, Resource}; -use kube::api::Meta; +use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference; +use kube::api::{DynamicObject, Meta}; use std::{ fmt::{Debug, Display}, hash::Hash, @@ -10,20 +10,23 @@ use std::{ #[derivative(Debug, PartialEq, Eq, Hash, Clone)] /// A typed and namedspaced (if relevant) reference to a Kubernetes object /// -/// `K` may be either the object type or `ErasedResource`, in which case the +/// `K` may be either the object type or `DynamicObject`, in which case the /// type is stored at runtime. Erased `ObjectRef`s pointing to different types /// are still considered different. /// /// ``` -/// use kube_runtime::reflector::{ErasedResource, ObjectRef}; +/// use kube_runtime::reflector::ObjectRef; /// use k8s_openapi::api::core::v1::{ConfigMap, Secret}; /// assert_ne!( -/// ObjectRef::::from(ObjectRef::::new("a")), -/// ObjectRef::::from(ObjectRef::::new("a")), +/// ObjectRef::::new("a").erase(), +/// ObjectRef::::new("a").erase(), /// ); /// ``` -pub struct ObjectRef { - kind: K::State, +pub struct ObjectRef +where + ::Family: Debug + Eq + Hash + Clone, +{ + family: K::Family, /// The name of the object pub name: String, /// The namespace of the object @@ -40,11 +43,32 @@ pub struct ObjectRef { pub namespace: Option, } -impl ObjectRef { +impl ObjectRef +where + ::Family: Debug + Eq + Hash + Clone + Default, +{ #[must_use] pub fn new(name: &str) -> Self { + Self::new_with(name, Default::default()) + } + + #[must_use] + pub fn from_obj(obj: &K) -> Self + where + K: Meta, + { + Self::from_obj_with(obj, Default::default()) + } +} + +impl ObjectRef +where + ::Family: Debug + Eq + Hash + Clone, +{ + #[must_use] + pub fn new_with(name: &str, family: K::Family) -> Self { Self { - kind: (), + family, name: name.into(), namespace: None, } @@ -57,12 +81,12 @@ impl ObjectRef { } #[must_use] - pub fn from_obj(obj: &K) -> Self + pub fn from_obj_with(obj: &K, f: K::Family) -> Self where K: Meta, { Self { - kind: (), + family: f, name: obj.name(), namespace: obj.namespace(), } @@ -72,10 +96,14 @@ impl ObjectRef { /// Create an `ObjectRef` from an `OwnerReference` /// /// Returns `None` if the types do not match. - pub fn from_owner_ref(namespace: Option<&str>, owner: &OwnerReference) -> Option { - if owner.api_version == K::API_VERSION && owner.kind == K::KIND { + pub fn from_owner_ref( + namespace: Option<&str>, + owner: &OwnerReference, + family: K::Family, + ) -> Option { + if owner.api_version == K::api_version(&family) && owner.kind == K::kind(&family) { Some(Self { - kind: (), + family, name: owner.name.clone(), namespace: namespace.map(String::from), }) @@ -89,23 +117,41 @@ impl ObjectRef { /// Note that no checking is done on whether this conversion makes sense. For example, every `Service` /// has a corresponding `Endpoints`, but it wouldn't make sense to convert a `Pod` into a `Deployment`. #[must_use] - pub fn into_kind_unchecked(self) -> ObjectRef { + pub fn into_kind_unchecked(self, f2: K2::Family) -> ObjectRef + where + ::Family: Debug + Eq + Hash + Clone, + { ObjectRef { - kind: (), + family: f2, + name: self.name, + namespace: self.namespace, + } + } + + pub fn erase(self) -> ObjectRef { + ObjectRef { + family: kube::api::GroupVersionKind::from_dynamic_gvk( + K::group(&self.family).as_ref(), + K::version(&self.family).as_ref(), + K::kind(&self.family).as_ref(), + ), name: self.name, namespace: self.namespace, } } } -impl Display for ObjectRef { +impl Display for ObjectRef +where + ::Family: Debug + Eq + Hash + Clone, +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, "{}.{}.{}/{}", - K::kind(&self.kind), - K::version(&self.kind), - K::group(&self.kind), + K::kind(&self.family), + K::version(&self.family), + K::group(&self.family), self.name )?; if let Some(namespace) = &self.namespace { @@ -115,82 +161,9 @@ impl Display for ObjectRef { } } -/// A Kubernetes type that is known at runtime -pub trait RuntimeResource { - type State: Debug + PartialEq + Eq + Hash + Clone; - fn group(state: &Self::State) -> &str; - fn version(state: &Self::State) -> &str; - fn kind(state: &Self::State) -> &str; -} - -/// All `Resource`s are also known at runtime -impl RuntimeResource for K { - /// All required state is provided at build time - type State = (); - - fn group(_state: &Self::State) -> &str { - K::GROUP - } - - fn version(_state: &Self::State) -> &str { - K::VERSION - } - - fn kind(_state: &Self::State) -> &str { - K::KIND - } -} - -/// Marker for indicating that the `ObjectRef`'s type is only known at runtime -// ! is still unstable: https://github.com/rust-lang/rust/issues/35121 -#[allow(clippy::empty_enum)] -pub enum ErasedResource {} - -#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] -pub struct ErasedResourceState { - group: &'static str, - version: &'static str, - kind: &'static str, -} -impl RuntimeResource for ErasedResource { - type State = ErasedResourceState; - - fn group(state: &Self::State) -> &str { - &state.group - } - - fn version(state: &Self::State) -> &str { - &state.version - } - - fn kind(state: &Self::State) -> &str { - &state.kind - } -} - -impl ErasedResource { - fn erase() -> ErasedResourceState { - ErasedResourceState { - group: K::GROUP, - version: K::VERSION, - kind: K::KIND, - } - } -} - -impl From> for ObjectRef { - fn from(old: ObjectRef) -> Self { - ObjectRef { - kind: ErasedResource::erase::(), - name: old.name, - namespace: old.namespace, - } - } -} - #[cfg(test)] mod tests { - use super::{ErasedResource, ObjectRef}; + use super::ObjectRef; use k8s_openapi::api::{ apps::v1::Deployment, core::v1::{Node, Pod}, @@ -218,19 +191,10 @@ mod tests { #[test] fn display_should_be_transparent_to_representation() { let pod_ref = ObjectRef::::new("my-pod").within("my-namespace"); - assert_eq!( - format!("{}", pod_ref), - format!("{}", ObjectRef::::from(pod_ref)) - ); + assert_eq!(format!("{}", pod_ref), format!("{}", pod_ref.erase())); let deploy_ref = ObjectRef::::new("my-deploy").within("my-namespace"); - assert_eq!( - format!("{}", deploy_ref), - format!("{}", ObjectRef::::from(deploy_ref)) - ); + assert_eq!(format!("{}", deploy_ref), format!("{}", deploy_ref.erase())); let node_ref = ObjectRef::::new("my-node"); - assert_eq!( - format!("{}", node_ref), - format!("{}", ObjectRef::::from(node_ref)) - ); + assert_eq!(format!("{}", node_ref), format!("{}", node_ref.erase())); } } diff --git a/kube-runtime/src/reflector/store.rs b/kube-runtime/src/reflector/store.rs index 77e9a11f2..f1f2afd36 100644 --- a/kube-runtime/src/reflector/store.rs +++ b/kube-runtime/src/reflector/store.rs @@ -2,21 +2,38 @@ use super::ObjectRef; use crate::watcher; use dashmap::DashMap; use derivative::Derivative; -use k8s_openapi::Resource; use kube::api::Meta; -use std::{collections::HashMap, fmt::Debug, sync::Arc}; +use std::{collections::HashMap, fmt::Debug, hash::Hash, sync::Arc}; /// A writable Store handle /// /// This is exclusive since it's not safe to share a single `Store` between multiple reflectors. /// In particular, `Restarted` events will clobber the state of other connected reflectors. #[derive(Debug, Derivative)] -#[derivative(Default(bound = ""))] -pub struct Writer { +#[derivative(Default(bound = "::Family: Default"))] +pub struct Writer +where + ::Family: Debug + Eq + Hash + Clone, +{ store: Arc, K>>, + family: K::Family, } -impl Writer { +impl Writer +where + ::Family: Debug + Eq + Hash + Clone, +{ + /// Creates a new Writer with the specified family. + /// + /// If family is default-able (for example when writer is used with + /// `k8s_openapi` types) you can use `Default` instead. + pub fn new(family: K::Family) -> Self { + Writer { + store: Default::default(), + family, + } + } + /// Return a read handle to the store /// /// Multiple read handles may be obtained, by either calling `as_reader` multiple times, @@ -32,15 +49,17 @@ impl Writer { pub fn apply_watcher_event(&mut self, event: &watcher::Event) { match event { watcher::Event::Applied(obj) => { - self.store.insert(ObjectRef::from_obj(&obj), obj.clone()); + self.store + .insert(ObjectRef::from_obj_with(&obj, self.family.clone()), obj.clone()); } watcher::Event::Deleted(obj) => { - self.store.remove(&ObjectRef::from_obj(&obj)); + self.store + .remove(&ObjectRef::from_obj_with(&obj, self.family.clone())); } watcher::Event::Restarted(new_objs) => { let new_objs = new_objs .iter() - .map(|obj| (ObjectRef::from_obj(obj), obj)) + .map(|obj| (ObjectRef::from_obj_with(obj, self.family.clone()), obj)) .collect::>(); // We can't do do the whole replacement atomically, but we should at least not delete objects that still exist self.store.retain(|key, _old_value| new_objs.contains_key(key)); @@ -60,11 +79,17 @@ impl Writer { /// use `Writer::as_reader()` instead. #[derive(Debug, Derivative)] #[derivative(Clone)] -pub struct Store { +pub struct Store +where + ::Family: Debug + Eq + Hash + Clone, +{ store: Arc, K>>, } -impl Store { +impl Store +where + ::Family: Debug + Eq + Hash + Clone, +{ /// Retrieve a `clone()` of the entry referred to by `key`, if it is in the cache. /// /// `key.namespace` is ignored for cluster-scoped resources. diff --git a/kube/src/api/dynamic.rs b/kube/src/api/dynamic.rs index 4fd536c12..e5f214dd1 100644 --- a/kube/src/api/dynamic.rs +++ b/kube/src/api/dynamic.rs @@ -1,9 +1,9 @@ use crate::{ - api::{typed::Api, Resource}, + api::{typed::Api, Meta, Resource}, Client, Error, Result, }; -use k8s_openapi::apimachinery::pkg::apis::meta::v1::APIResource; -use std::convert::TryFrom; +use k8s_openapi::apimachinery::pkg::apis::meta::v1::{APIResource, ObjectMeta}; +use std::{borrow::Cow, convert::TryFrom, sync::Arc}; use inflector::{cases::pascalcase::is_pascal_case, string::pluralize::to_plural}; @@ -122,7 +122,7 @@ impl DynamicResource { /// /// Note this crashes on invalid group/version/kinds. /// Use [`try_into_api`](Self::try_into_api) to handle the errors. - pub fn into_api(self, client: Client) -> Api { + pub fn into_api(self, client: Client) -> Api { let resource = Resource::try_from(self).unwrap(); Api { client, @@ -139,7 +139,7 @@ impl DynamicResource { } /// Consume the `DynamicResource` and and attempt to convert to an `Api` object. - pub fn try_into_api(self, client: Client) -> Result> { + pub fn try_into_api(self, client: Client) -> Result> { let resource = Resource::try_from(self)?; Ok(Api { client, @@ -188,6 +188,115 @@ impl TryFrom for Resource { } } +/// Cheaply `Clone`-able string. +// `String`: cloning requires additional allocation. +// `StringRef::Dynamic`: cloning is one atomic operation. +// `StringRef::Static`: cloning is essentially free. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +enum StringRef { + /// Should be used when string is known during compilation. + Static(&'static str), + /// Should be used when string is only known at runtime + Dynamic(Arc), +} + +impl StringRef { + fn as_str(&self) -> &str { + match self { + StringRef::Static(s) => *s, + StringRef::Dynamic(s) => &*s, + } + } +} + +/// Represents a type-erased object kind +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct GroupVersionKind { + /// API group + group: StringRef, + /// Version + version: StringRef, + /// Kind + kind: StringRef, + /// Concatenation of group and version + api_version: StringRef, +} + +impl GroupVersionKind { + /// Creates `GroupVersionKind` from group, version and kind. + /// For `core` resources, group should be empty. + /// `api_version` will be created based on group and version + pub fn from_dynamic_gvk(group: &str, version: &str, kind: &str) -> Self { + let api_version = if group.is_empty() { + version.to_string() + } else { + format!("{}/{}", group, version) + }; + GroupVersionKind { + group: StringRef::Dynamic(group.into()), + version: StringRef::Dynamic(version.into()), + kind: StringRef::Dynamic(kind.into()), + api_version: StringRef::Dynamic(api_version.into()), + } + } + + /// Create `GroupVersionKind` for statically known resource. + pub fn for_resource() -> Self { + GroupVersionKind { + group: StringRef::Static(K::GROUP), + version: StringRef::Static(K::VERSION), + kind: StringRef::Static(K::KIND), + api_version: StringRef::Static(K::API_VERSION), + } + } +} + +/// The most generic representation of a single Kubernetes resource. +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] +pub struct DynamicObject { + /// standard metadata + pub metadata: ObjectMeta, + /// All other data. Meaning of this field depends on specific object. + #[serde(flatten)] + pub data: serde_json::Value, +} + +impl Meta for DynamicObject { + type Family = GroupVersionKind; + + fn group<'a>(f: &'a GroupVersionKind) -> Cow<'a, str> { + f.group.as_str().into() + } + + fn version<'a>(f: &'a GroupVersionKind) -> Cow<'a, str> { + f.version.as_str().into() + } + + fn kind<'a>(f: &'a GroupVersionKind) -> Cow<'a, str> { + f.kind.as_str().into() + } + + fn api_version<'a>(f: &'a GroupVersionKind) -> Cow<'a, str> { + f.api_version.as_str().into() + } + + fn meta(&self) -> &ObjectMeta { + &self.metadata + } + + fn name(&self) -> String { + self.metadata.name.clone().expect("missing name") + } + + fn namespace(&self) -> Option { + self.metadata.namespace.clone() + } + + fn resource_ver(&self) -> Option { + self.metadata.resource_version.clone() + } +} + #[cfg(test)] mod test { use crate::{ diff --git a/kube/src/api/metadata.rs b/kube/src/api/metadata.rs index bd8e269c8..18d3b844b 100644 --- a/kube/src/api/metadata.rs +++ b/kube/src/api/metadata.rs @@ -1,6 +1,7 @@ pub use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ListMeta, ObjectMeta}; use k8s_openapi::Metadata; use serde::{Deserialize, Serialize}; +use std::borrow::Cow; /// An accessor trait for Metadata. /// @@ -14,7 +15,28 @@ use serde::{Deserialize, Serialize}; /// - `.metadata.resource_version` /// /// This avoids a bunch of the unnecessary unwrap mechanics for apps. -pub trait Meta: Metadata { +pub trait Meta { + /// Types that know their metadata at compile time should select `Family = ()`. + /// Types that require some information at runtime should select `Family` + /// as type of this information. + type Family: Send + Sync + 'static; + /// Returns kind of this object + fn kind<'a>(f: &'a Self::Family) -> Cow<'a, str>; + /// Returns group of this object + fn group<'a>(f: &'a Self::Family) -> Cow<'a, str>; + /// Returns version of this object + fn version<'a>(f: &'a Self::Family) -> Cow<'a, str>; + /// Returns apiVersion of this object + fn api_version<'a>(f: &'a Self::Family) -> Cow<'a, str> { + let group = Self::group(f); + if group.is_empty() { + return Self::version(f); + } + let mut group = group.into_owned(); + group.push('/'); + group.push_str(&Self::version(f)); + group.into() + } /// Metadata that all persisted resources must have fn meta(&self) -> &ObjectMeta; /// The name of the resource @@ -30,6 +52,24 @@ impl Meta for K where K: Metadata, { + type Family = (); + + fn kind<'a>(_: &'a ()) -> Cow<'a, str> { + K::KIND.into() + } + + fn group<'a>(_: &'a ()) -> Cow<'a, str> { + K::GROUP.into() + } + + fn version<'a>(_: &'a ()) -> Cow<'a, str> { + K::VERSION.into() + } + + fn api_version<'a>(_: &'a ()) -> Cow<'a, str> { + K::API_VERSION.into() + } + fn meta(&self) -> &ObjectMeta { self.metadata() } diff --git a/kube/src/api/mod.rs b/kube/src/api/mod.rs index e47e4f3cd..3d44f90f2 100644 --- a/kube/src/api/mod.rs +++ b/kube/src/api/mod.rs @@ -20,7 +20,7 @@ pub(crate) mod typed; pub use typed::Api; mod dynamic; -pub use dynamic::DynamicResource; +pub use dynamic::{DynamicObject, DynamicResource, GroupVersionKind}; #[cfg(feature = "ws")] mod remote_command; #[cfg(feature = "ws")] pub use remote_command::AttachedProcess; diff --git a/kube/src/api/object.rs b/kube/src/api/object.rs index b370dedbb..095ff67fb 100644 --- a/kube/src/api/object.rs +++ b/kube/src/api/object.rs @@ -1,5 +1,5 @@ use crate::{ - api::metadata::{ListMeta, Meta, ObjectMeta, TypeMeta}, + api::metadata::{ListMeta, ObjectMeta, TypeMeta}, error::ErrorResponse, }; use serde::{Deserialize, Serialize}; @@ -10,10 +10,7 @@ use std::fmt::Debug; /// Note that a watch query returns many of these as newline separated JSON. #[derive(Deserialize, Serialize, Clone)] #[serde(tag = "type", content = "object", rename_all = "UPPERCASE")] -pub enum WatchEvent -where - K: Clone + Meta, -{ +pub enum WatchEvent { /// Resource was added Added(K), /// Resource was modified @@ -30,10 +27,7 @@ where Error(ErrorResponse), } -impl Debug for WatchEvent -where - K: Clone + Meta, -{ +impl Debug for WatchEvent { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match &self { WatchEvent::Added(_) => write!(f, "Added event"), diff --git a/kube/src/api/resource.rs b/kube/src/api/resource.rs index 7fd165780..86928cd0e 100644 --- a/kube/src/api/resource.rs +++ b/kube/src/api/resource.rs @@ -1,5 +1,8 @@ use super::params::{DeleteParams, ListParams, Patch, PatchParams, PostParams}; -use crate::{api::DynamicResource, Error, Result}; +use crate::{ + api::{DynamicResource, Meta}, + Error, Result, +}; use inflector::string::pluralize::to_plural; /// A Kubernetes resource that can be accessed through the API @@ -32,33 +35,50 @@ pub struct Resource { impl Resource { /// Cluster level resources, or resources viewed across all namespaces - pub fn all() -> Self { + pub fn all_with(f: &K::Family) -> Self { Self { - api_version: K::API_VERSION.to_string(), - kind: K::KIND.to_string(), - group: K::GROUP.to_string(), - version: K::VERSION.to_string(), + api_version: K::api_version(f).into_owned(), + kind: K::kind(f).into_owned(), + group: K::group(f).into_owned(), + version: K::version(f).into_owned(), namespace: None, } } /// Namespaced resource within a given namespace - pub fn namespaced(ns: &str) -> Self { - match K::KIND { + pub fn namespaced_with(ns: &str, f: &K::Family) -> Self { + let kind = K::kind(f); + match kind.as_ref() { "Node" | "Namespace" | "ClusterRole" | "CustomResourceDefinition" => { - panic!("{} is not a namespace scoped resource", K::KIND) + panic!("{} is not a namespace scoped resource", kind) } _ => {} } Self { - api_version: K::API_VERSION.to_string(), - kind: K::KIND.to_string(), - group: K::GROUP.to_string(), - version: K::VERSION.to_string(), + api_version: K::api_version(f).into_owned(), + kind: kind.into_owned(), + group: K::group(f).into_owned(), + version: K::version(f).into_owned(), namespace: Some(ns.to_string()), } } + /// Cluster level resources, or resources viewed across all namespaces + pub fn all() -> Self + where + ::Family: Default, + { + Self::all_with::(&Default::default()) + } + + /// Namespaced resource within a given namespace + pub fn namespaced(ns: &str) -> Self + where + ::Family: Default, + { + Self::namespaced_with::(ns, &Default::default()) + } + /// Manually configured resource or custom resource /// /// This is the only entrypoint to `Resource` that bypasses [`k8s_openapi`] entirely. diff --git a/kube/src/api/typed.rs b/kube/src/api/typed.rs index 4e95dd175..23eb5b4b3 100644 --- a/kube/src/api/typed.rs +++ b/kube/src/api/typed.rs @@ -32,11 +32,30 @@ pub struct Api { /// Expose same interface as Api for controlling scope/group/versions/ns impl Api where - K: k8s_openapi::Resource, + K: Meta, + ::Family: Default, { /// Cluster level resources, or resources viewed across all namespaces pub fn all(client: Client) -> Self { - let resource = Resource::all::(); + Self::all_with(client, &Default::default()) + } + + /// Namespaced resource within a given namespace + pub fn namespaced(client: Client, ns: &str) -> Self { + Self::namespaced_with(client, ns, &Default::default()) + } +} + +/// Expose same interface as Api for controlling scope/group/versions/ns +impl Api +where + K: Meta, +{ + /// Cluster level resources, or resources viewed across all namespaces + /// + /// This function accepts `K::Family` so it can be used with dynamic resources. + pub fn all_with(client: Client, family: &K::Family) -> Self { + let resource = Resource::all_with::(family); Self { resource, client, @@ -45,8 +64,10 @@ where } /// Namespaced resource within a given namespace - pub fn namespaced(client: Client, ns: &str) -> Self { - let resource = Resource::namespaced::(ns); + /// + /// This function accepts `K::Family` so it can be used with dynamic resources. + pub fn namespaced_with(client: Client, ns: &str, family: &K::Family) -> Self { + let resource = Resource::namespaced_with::(ns, family); Self { resource, client, @@ -54,6 +75,13 @@ where } } + /// Returns reference to the underlying `Resource` object. + /// It can be used to make low-level requests or as a `Family` + /// for a `DynamicObject`. + pub fn resource(&self) -> &Resource { + &self.resource + } + /// Consume self and return the [`Client`] pub fn into_client(self) -> Client { self.into() diff --git a/kube/src/client/mod.rs b/kube/src/client/mod.rs index 1cb53b835..514b9b40e 100644 --- a/kube/src/client/mod.rs +++ b/kube/src/client/mod.rs @@ -6,13 +6,7 @@ //! the [`Api`][crate::api::Api] type for more structured //! interaction with the kuberneres API. -use crate::{ - api::{Meta, WatchEvent}, - config::Config, - error::ErrorResponse, - service::Service, - Error, Result, -}; +use crate::{api::WatchEvent, config::Config, error::ErrorResponse, service::Service, Error, Result}; #[cfg(feature = "ws")] use tokio_tungstenite::{tungstenite as ws, WebSocketStream}; @@ -201,12 +195,12 @@ impl Client { } /// Perform a raw request and get back a stream of [`WatchEvent`] objects - pub async fn request_events( + pub async fn request_events( &self, request: Request>, ) -> Result>>> where - T: DeserializeOwned, + T: Clone + DeserializeOwned, { let res = self.send(request.map(Body::from)).await?; // trace!("Streaming from {} -> {}", res.url(), res.status().as_str());