-
-
Notifications
You must be signed in to change notification settings - Fork 323
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Controller::for_stream
+ Controller::watches_stream
#1187
Conversation
This adds a stream interface analogue for `Controller::new`, that allows the user to create the root `watcher` and `reflector`, outside, and pass in the reader + flattened stream. We originally discussed this name as `from_stream`, but given that we need to also pass along the `reader`, it is no longer a `From` equivalent. So have instead named it along the lines of `Controller::for` and highlighted the terminology. Note that `Controller::For` is the [controller-runtime equivalent](https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/builder#Builder.For). Also fixes up bad doc setups and ensures unstable features are highlighted, even though they appear in the middle of a trait. The owns-stream unstable flagged is moved to `unstable-runtime-stream-share` which now also contains this new method. Signed-off-by: clux <sszynrae@gmail.com>
closes #1186 Tried to cherry-pick, but would have had to do a rebase on the two commits in there, so just re-did it. Thanks @suryapandian. Signed-off-by: clux <sszynrae@gmail.com>
Codecov Report
Additional details and impacted files@@ Coverage Diff @@
## main #1187 +/- ##
==========================================
- Coverage 73.48% 73.44% -0.04%
==========================================
Files 68 68
Lines 5355 5370 +15
==========================================
+ Hits 3935 3944 +9
- Misses 1420 1426 +6
|
Signed-off-by: clux <sszynrae@gmail.com>
Signed-off-by: clux <sszynrae@gmail.com>
Controller::for_stream
Controller::for_stream
+ Controller::watches_stream
Signed-off-by: clux <sszynrae@gmail.com>
since it doesn't work with stream subscribe yet Signed-off-by: clux <sszynrae@gmail.com>
Signed-off-by: clux <sszynrae@gmail.com>
This is working good enough for an unstable inclusion imo, but can benefit from a look-over. It will not work for stream sharing yet because we need consensus on whether to Arc up literally every stream interface (as noted above), but this at the very least allows configuring streams outside Pinging @Dav1dde @danrspencer as interested parties. |
Here's a diff I have used to test a larger controller with most new unstable features: let api = Api::<PartialObjectMeta<K>>::all(ctx.client.clone());
let context = Arc::new(ctx);
+ let (reader, writer) = reflector::store();
+ let stream = reflector(writer, metadata_watcher(api, watcher::Config::default()))
+ .applied_objects()
+ .predicate_filter(predicates::generation);
- Controller::new(api, ListParams::default())
+ Controller::for_stream(stream, reader)
.shutdown_on_signal()
.run(reconcile, error_policy, context)
.for_each(|_| futures::future::ready(())) |
I think |
Signed-off-by: clux <sszynrae@gmail.com>
/// Enqueues any owners of type `KOwner` for reconciliation
pub fn trigger_owners<KOwner, S>(
stream: S,
owner_type: KOwner::DynamicType,
child_type: <S::Ok as Resource>::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<KOwner>, S::Error>>
where
S: TryStream,
S::Ok: Resource,
<S::Ok as Resource>::DynamicType: Clone,
KOwner: Resource,
KOwner::DynamicType: Clone,
{
let mapper = move |obj: S::Ok| {
let meta = obj.meta().clone();
let ns = meta.namespace;
let owner_type = owner_type.clone();
meta.owner_references
.into_iter()
.flatten()
.filter_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner, owner_type.clone()))
};
trigger_others(stream, mapper, child_type)
} As diff: diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs
index 87be9e5..0c5d80a 100644
--- a/kube-runtime/src/controller/mod.rs
+++ b/kube-runtime/src/controller/mod.rs
@@ -162,22 +162,16 @@ where
KOwner: Resource,
KOwner::DynamicType: Clone,
{
- trigger_with(stream, move |obj| {
+ let mapper = move |obj: S::Ok| {
let meta = obj.meta().clone();
let ns = meta.namespace;
let owner_type = owner_type.clone();
- let child_ref = ObjectRef::from_obj_with(&obj, child_type.clone()).erase();
meta.owner_references
.into_iter()
.flatten()
.filter_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner, owner_type.clone()))
- .map(move |owner_ref| ReconcileRequest {
- obj_ref: owner_ref,
- reason: ReconcileReason::RelatedObjectUpdated {
- obj_ref: Box::new(child_ref.clone()),
- },
- })
- })
+ };
+ trigger_others(stream, mapper, child_type)
} |
Signed-off-by: clux <sszynrae@gmail.com>
That's great. Worked immediately. |
Alright windows is green here after sync despite it consistently crashing in the other PR 🙃 Will merge this at lunch and prepare a release. |
This adds a stream interface analogue for
Controller::new
, that allows the user to create the rootwatcher
andreflector
, outside, and pass in the reader + flattened stream. This is for #1180 as a building block for stream sharing #1080.In a similar vein, we also add
Controller::watches_stream
as an analogue forController::watches
. That name should be less controversial given we haveController::owns_stream
.Naming
For now, have named it along the lines of
Controller::for
and highlighted the terminology (noting thatController::For
is the controller-runtime equivalent).However, there may be some better alternatives. Previously suggested:
Controller::from_stream
- original plan, but that was before we needed to pass in thereader
- so not a cleanFrom
Controller::new_stream
- awkward, suggests we are creating a new stream?Controller::new_from_stream
- very long, and leads to the obscene::new_from_stream_with
Controller::new_with
- taken by the dynamic variantController::new_via
- also awkward in combination with::new_via_with
Feel like realistically it's between
Controller::for_stream
andController::from_stream
.Docs Fixes + Unstable Feature Consolidation
Also fixes up bad doc setups and ensures unstable features are highlighted, even though they appear in the middle of a trait (i am not convinced it will show up correctly on docs.rs so being a little careful first time).
The owns-stream unstable flagged is moved to
unstable-runtime-stream-share
, which now also contains this new method.Consequences
When using
unstable-runtime
, we can actually usemetadata_watcher
with allController
input interfaces.Tested on a controller running several controller loops on a large cluster with thousands of pods and saw a >2x memory drop (449MB -> 210MB from a smaller core controller reflector passed in), and significant bandwidth reduction as well (230kB/s -> 30kB/s TX, 20kB/s -> 13kB/s RX but that's partially from using predicates).
Stream Sharing
Currently this interface does not work with
WatchStreamExt::stream_subscribe
because of the lack ofArc<K>
in our interfaces. I want to change this long term, but this also involves addingArc
on everything internal toController
, but I think that's OK. These new interfaces are unstable, and streams are otherwise constructed internally and could be arc'd up therein.We already
Arc
-wrap before calling out toreconcile
anderror_policy
so it would be nice to just havewatcher
(or a method inWatchStreamExt
do this. Leaving this for a separate PR though.