Skip to content

Commit

Permalink
Ease the bound for reflector to only request identifying metadata
Browse files Browse the repository at this point in the history
Fixes #1389
  • Loading branch information
SOF3 committed Jan 15, 2024
1 parent c7054b5 commit ceccb8c
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 56 deletions.
2 changes: 1 addition & 1 deletion kube-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub use request::Request;
mod resource;
pub use resource::{
ClusterResourceScope, DynamicResourceScope, NamespaceResourceScope, Resource, ResourceExt, ResourceScope,
SubResourceScope,
SubResourceScope, api_version_from_group_version,
};

pub mod response;
Expand Down
21 changes: 13 additions & 8 deletions kube-core/src/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,7 @@ pub trait Resource {
fn version(dt: &Self::DynamicType) -> Cow<'_, str>;
/// Returns apiVersion of this object
fn api_version(dt: &Self::DynamicType) -> Cow<'_, str> {
let group = Self::group(dt);
if group.is_empty() {
return Self::version(dt);
}
let mut group = group.into_owned();
group.push('/');
group.push_str(&Self::version(dt));
group.into()
api_version_from_group_version(Self::group(dt), Self::version(dt))
}
/// Returns the plural name of the kind
///
Expand Down Expand Up @@ -115,6 +108,18 @@ pub trait Resource {
}
}

/// Helper function that creates the `apiVersion` field from the group and version strings.
pub fn api_version_from_group_version<'a>(group: Cow<'a, str>, version: Cow<'a, str>) -> Cow<'a, str> {
if group.is_empty() {
return version;
}

let mut output = group;
output.to_mut().push('/');
output.to_mut().push_str(&version);
output
}

/// Implement accessor trait for any ObjectMeta-using Kubernetes Resource
impl<K, S> Resource for K
where
Expand Down
5 changes: 2 additions & 3 deletions kube-runtime/src/reflector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
mod object_ref;
pub mod store;

pub use self::object_ref::{Extra as ObjectRefExtra, ObjectRef};
pub use self::object_ref::{Extra as ObjectRefExtra, ObjectRef, ToObjectRef};
use crate::watcher;
use futures::{Stream, TryStreamExt};
use kube_client::Resource;
use std::hash::Hash;
pub use store::{store, Store};

Expand Down Expand Up @@ -91,7 +90,7 @@ pub use store::{store, Store};
/// Additionally, only `labels`, `annotations` and `managed_fields` are safe to drop from `ObjectMeta`.
pub fn reflector<K, W>(mut writer: store::Writer<K>, stream: W) -> impl Stream<Item = W::Item>
where
K: Resource + Clone,
K: ToObjectRef + Clone,
K::DynamicType: Eq + Hash + Clone,
W: Stream<Item = watcher::Result<watcher::Event<K>>>,
{
Expand Down
135 changes: 107 additions & 28 deletions kube-runtime/src/reflector/object_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,102 @@ use derivative::Derivative;
use k8s_openapi::{api::core::v1::ObjectReference, apimachinery::pkg::apis::meta::v1::OwnerReference};
use kube_client::{
api::{DynamicObject, Resource},
core::ObjectMeta,
ResourceExt,
core::{api_version_from_group_version, ObjectMeta},
};
use std::{
borrow::Cow,
fmt::{Debug, Display},
hash::Hash,
};

/// An object that can be indexed by a [reflector store](super::Store).
///
/// This trait is blanket-implemented for all [`Resource`] objects.
#[allow(clippy::module_name_repetitions)]
pub trait ToObjectRef {
/// Type information for types that do not know their resource information at compile time.
/// This is equivalent to [`Resource::DynamicType`].
type DynamicType;

/// The [kind](Resource::kind) for this object.
fn kind(dyntype: &Self::DynamicType) -> Cow<'_, str>;

/// The [group](Resource::group) for this object.
fn group(dyntype: &Self::DynamicType) -> Cow<'_, str>;

/// The [version](Resource::version) for this object.
fn version(dyntype: &Self::DynamicType) -> Cow<'_, str>;

/// The [apiVersion](Resource::_version) for this object.
fn api_version(dyntype: &Self::DynamicType) -> Cow<'_, str> {
api_version_from_group_version(Self::group(dyntype), Self::version(dyntype))
}

/// The [plural](Resource::plural) for this object.
fn plural(dyntype: &Self::DynamicType) -> Cow<'_, str>;

/// The [name](ObjectMeta#structfield.name) of the object.
fn name(&self) -> Option<Cow<'_, str>>;

/// The [namespace](ObjectMeta#structfield.namespace) of the object.
fn namespace(&self) -> Option<Cow<'_, str>>;

/// The [resource version](ObjectMeta#structfield.resource_version) of the object.
fn resource_version(&self) -> Option<Cow<'_, str>>;

/// The [UID](ObjectMeta#structfield.uid) of the object.
fn uid(&self) -> Option<Cow<'_, str>>;

/// Constructs an [`ObjectRef`] for this object.
fn to_object_ref(&self, dyntype: Self::DynamicType) -> ObjectRef<Self> {
ObjectRef {
dyntype,
name: self.name().expect(".metadata.name missing").into_owned(),
namespace: self.namespace().map(Cow::into_owned),
extra: Extra {
resource_version: self.resource_version().map(Cow::into_owned),
uid: self.uid().map(Cow::into_owned),
},
}
}
}

impl<K: Resource> ToObjectRef for K {
type DynamicType = K::DynamicType;

fn kind(dyntype: &Self::DynamicType) -> Cow<'_, str> {
K::kind(dyntype)
}

fn version(dyntype: &Self::DynamicType) -> Cow<'_, str> {
K::version(dyntype)
}

fn group(dyntype: &Self::DynamicType) -> Cow<'_, str> {
K::group(dyntype)
}

fn plural(dyntype: &Self::DynamicType) -> Cow<'_, str> {
K::plural(dyntype)
}

fn name(&self) -> Option<Cow<'_, str>> {
self.meta().name.as_deref().map(Cow::Borrowed)
}

fn namespace(&self) -> Option<Cow<'_, str>> {
self.meta().namespace.as_deref().map(Cow::Borrowed)
}

fn resource_version(&self) -> Option<Cow<'_, str>> {
self.meta().resource_version.as_deref().map(Cow::Borrowed)
}

fn uid(&self) -> Option<Cow<'_, str>> {
self.meta().uid.as_deref().map(Cow::Borrowed)
}
}

#[derive(Derivative)]
#[derivative(
Debug(bound = "K::DynamicType: Debug"),
Expand All @@ -33,7 +121,7 @@ use std::{
/// );
/// ```
#[non_exhaustive]
pub struct ObjectRef<K: Resource> {
pub struct ObjectRef<K: ToObjectRef + ?Sized> {
pub dyntype: K::DynamicType,
/// The name of the object
pub name: String,
Expand Down Expand Up @@ -69,7 +157,7 @@ pub struct Extra {
pub uid: Option<String>,
}

impl<K: Resource> ObjectRef<K>
impl<K: ToObjectRef> ObjectRef<K>
where
K::DynamicType: Default,
{
Expand All @@ -81,13 +169,13 @@ where
#[must_use]
pub fn from_obj(obj: &K) -> Self
where
K: Resource,
K: ToObjectRef,
{
Self::from_obj_with(obj, Default::default())
obj.to_object_ref(Default::default())
}
}

impl<K: Resource> ObjectRef<K> {
impl<K: ToObjectRef> ObjectRef<K> {
#[must_use]
pub fn new_with(name: &str, dyntype: K::DynamicType) -> Self {
Self {
Expand All @@ -108,15 +196,9 @@ impl<K: Resource> ObjectRef<K> {
#[must_use]
pub fn from_obj_with(obj: &K, dyntype: K::DynamicType) -> Self
where
K: Resource,
K: ToObjectRef,
{
let meta = obj.meta();
Self {
dyntype,
name: obj.name_unchecked(),
namespace: meta.namespace.clone(),
extra: Extra::from_obj_meta(meta),
}
obj.to_object_ref(dyntype)
}

/// Create an `ObjectRef` from an `OwnerReference`
Expand Down Expand Up @@ -148,7 +230,7 @@ impl<K: Resource> ObjectRef<K> {
/// Note that no checking is done on whether this conversion makes sense. For example, every `Service`
/// has a corresponding `Endpoints`, but it wouldn't make sense to convert a `Pod` into a `Deployment`.
#[must_use]
pub fn into_kind_unchecked<K2: Resource>(self, dt2: K2::DynamicType) -> ObjectRef<K2> {
pub fn into_kind_unchecked<K2: ToObjectRef>(self, dt2: K2::DynamicType) -> ObjectRef<K2> {
ObjectRef {
dyntype: dt2,
name: self.name,
Expand All @@ -159,15 +241,21 @@ impl<K: Resource> ObjectRef<K> {

pub fn erase(self) -> ObjectRef<DynamicObject> {
ObjectRef {
dyntype: kube_client::api::ApiResource::erase::<K>(&self.dyntype),
dyntype: kube_client::api::ApiResource {
group: K::group(&self.dyntype).to_string(),
version: K::version(&self.dyntype).to_string(),
api_version: K::api_version(&self.dyntype).to_string(),
kind: K::kind(&self.dyntype).to_string(),
plural: K::plural(&self.dyntype).to_string(),
},
name: self.name,
namespace: self.namespace,
extra: self.extra,
}
}
}

impl<K: Resource> From<ObjectRef<K>> for ObjectReference {
impl<K: ToObjectRef> From<ObjectRef<K>> for ObjectReference {
fn from(val: ObjectRef<K>) -> Self {
let ObjectRef {
dyntype: dt,
Expand All @@ -190,7 +278,7 @@ impl<K: Resource> From<ObjectRef<K>> for ObjectReference {
}
}

impl<K: Resource> Display for ObjectRef<K> {
impl<K: ToObjectRef> Display for ObjectRef<K> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
Expand All @@ -207,15 +295,6 @@ impl<K: Resource> Display for ObjectRef<K> {
}
}

impl Extra {
fn from_obj_meta(obj_meta: &ObjectMeta) -> Self {
Self {
resource_version: obj_meta.resource_version.clone(),
uid: obj_meta.uid.clone(),
}
}
}

#[cfg(test)]
mod tests {
use std::{
Expand Down
26 changes: 10 additions & 16 deletions kube-runtime/src/reflector/store.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use super::ObjectRef;
use super::{ObjectRef, ToObjectRef};
use crate::{
utils::delayed_init::{self, DelayedInit},
watcher,
};
use ahash::AHashMap;
use derivative::Derivative;
use kube_client::Resource;
use parking_lot::RwLock;
use std::{fmt::Debug, hash::Hash, sync::Arc};
use thiserror::Error;
Expand All @@ -17,7 +16,7 @@ type Cache<K> = Arc<RwLock<AHashMap<ObjectRef<K>, Arc<K>>>>;
/// This is exclusive since it's not safe to share a single `Store` between multiple reflectors.
/// In particular, `Restarted` events will clobber the state of other connected reflectors.
#[derive(Debug)]
pub struct Writer<K: 'static + Resource>
pub struct Writer<K: 'static + ToObjectRef>
where
K::DynamicType: Eq + Hash,
{
Expand All @@ -27,7 +26,7 @@ where
ready_rx: Arc<DelayedInit<()>>,
}

impl<K: 'static + Resource + Clone> Writer<K>
impl<K: 'static + ToObjectRef + Clone> Writer<K>
where
K::DynamicType: Eq + Hash + Clone,
{
Expand Down Expand Up @@ -61,23 +60,18 @@ where
pub fn apply_watcher_event(&mut self, event: &watcher::Event<K>) {
match event {
watcher::Event::Applied(obj) => {
let key = ObjectRef::from_obj_with(obj, self.dyntype.clone());
let key = obj.to_object_ref(self.dyntype.clone());
let obj = Arc::new(obj.clone());
self.store.write().insert(key, obj);
}
watcher::Event::Deleted(obj) => {
let key = ObjectRef::from_obj_with(obj, self.dyntype.clone());
let key = obj.to_object_ref(self.dyntype.clone());
self.store.write().remove(&key);
}
watcher::Event::Restarted(new_objs) => {
let new_objs = new_objs
.iter()
.map(|obj| {
(
ObjectRef::from_obj_with(obj, self.dyntype.clone()),
Arc::new(obj.clone()),
)
})
.map(|obj| (obj.to_object_ref(self.dyntype.clone()), Arc::new(obj.clone())))
.collect::<AHashMap<_, _>>();
*self.store.write() = new_objs;
}
Expand All @@ -91,7 +85,7 @@ where
}
impl<K> Default for Writer<K>
where
K: Resource + Clone + 'static,
K: ToObjectRef + Clone + 'static,
K::DynamicType: Default + Eq + Hash + Clone,
{
fn default() -> Self {
Expand All @@ -107,7 +101,7 @@ where
/// use `Writer::as_reader()` instead.
#[derive(Derivative)]
#[derivative(Debug(bound = "K: Debug, K::DynamicType: Debug"), Clone)]
pub struct Store<K: 'static + Resource>
pub struct Store<K: 'static + ToObjectRef>
where
K::DynamicType: Hash + Eq,
{
Expand All @@ -119,7 +113,7 @@ where
#[error("writer was dropped before store became ready")]
pub struct WriterDropped(delayed_init::InitDropped);

impl<K: 'static + Clone + Resource> Store<K>
impl<K: 'static + Clone + ToObjectRef> Store<K>
where
K::DynamicType: Eq + Hash + Clone,
{
Expand Down Expand Up @@ -201,7 +195,7 @@ where
#[must_use]
pub fn store<K>() -> (Store<K>, Writer<K>)
where
K: Resource + Clone + 'static,
K: ToObjectRef + Clone + 'static,
K::DynamicType: Eq + Hash + Clone + Default,
{
let w = Writer::<K>::default();
Expand Down

0 comments on commit ceccb8c

Please sign in to comment.