Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arc wrap watcher output #1266

Closed
wants to merge 4 commits into from

Conversation

mateiidavid
Copy link
Contributor

Motivation

Controller machinery has been changed to support arbitrary streams; users may instantiate controllers with already established watches, or already created stores. A subscriber interface has also been added to allow for stream sharing; due to the implementation details of the subscriber interface, the output is incompatible with what the controller expects.

This change started as a proof of concept based on #1189. It attempts to bridge the two APIs by Arcing the output from watchers. Since this is done at a very low level in the codepath, both the controller and other dependent helpers (e.g. the reflector or event flattener) have been changed to both read and write arc'd streams. As an added benefit, arced streams should relieve some memory pressure by avoiding clones on k8s objects.

Solution

  1. Changed step_trampolined to arc the object. This is propagated all the way up to the watcher().
  2. Reflectors now clone the Arc instead of doing their own Arcs.
  3. Event flatten now unwraps the inner value of an event and returns it as an Arc
  4. Controller now internally works on Arc'd streams. A few associated type bounds have been changed in the triggers (mostly for ok values of TryStreams). Mappers have also had their signatures changed.

Signed-off-by: Matei David <matei.david.35@gmail.com>
Signed-off-by: Matei David <matei.david.35@gmail.com>
@clux clux added the changelog-change changelog change category for prs label Jul 30, 2023
Copy link
Member

@clux clux left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks a lot for doing this! having seen this play out, i personally think this approach makes more sense than opting into Arc at a later level.

if we get this to pass tests, then there a few things that would be good to have;

there is at least one place that it probably can be propagated into on the controller side (passing into user reconciler):

reconciler_span
.in_scope(|| reconciler(Arc::clone(&obj), context.clone()))
.into_future()

and there's also probably room for a stream sharing controller example (maybe a dumb thing with two co-hosted Controller instances re-using the same crd stream?) to verify that it all works.

kube-runtime/src/reflector/store.rs Show resolved Hide resolved
kube-runtime/src/wait.rs Show resolved Hide resolved
@clux clux linked an issue Jul 31, 2023 that may be closed by this pull request
Signed-off-by: Matei David <matei.david.35@gmail.com>
@codecov
Copy link

codecov bot commented Aug 4, 2023

Codecov Report

Merging #1266 (54c2508) into main (c3fbe25) will increase coverage by 0.44%.
The diff coverage is 94.80%.

❗ Current head 54c2508 differs from pull request most recent head f29e0ae. Consider uploading reports for the commit f29e0ae to get more accurate results

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1266      +/-   ##
==========================================
+ Coverage   72.31%   72.76%   +0.44%     
==========================================
  Files          75       75              
  Lines        6343     6179     -164     
==========================================
- Hits         4587     4496      -91     
+ Misses       1756     1683      -73     
Files Coverage Δ
kube-runtime/src/reflector/mod.rs 100.00% <100.00%> (ø)
kube-runtime/src/reflector/store.rs 99.08% <100.00%> (+0.13%) ⬆️
kube-runtime/src/utils/event_flatten.rs 92.85% <100.00%> (+0.54%) ⬆️
kube-runtime/src/utils/event_modify.rs 96.00% <100.00%> (+0.16%) ⬆️
kube-runtime/src/utils/reflect.rs 100.00% <100.00%> (ø)
kube-runtime/src/utils/watch_ext.rs 11.76% <ø> (ø)
kube-runtime/src/wait.rs 75.47% <100.00%> (-2.08%) ⬇️
kube-runtime/src/watcher.rs 49.70% <100.00%> (+11.56%) ⬆️
kube/src/lib.rs 89.04% <100.00%> (+0.07%) ⬆️
kube/src/mock_tests.rs 97.87% <100.00%> (ø)
... and 2 more

... and 18 files with indirect coverage changes

@mateiidavid
Copy link
Contributor Author

mateiidavid commented Aug 4, 2023

@clux my pleasure! Thanks for having a look. I made some changes to get CI to pass. I'm surprised the change came out as small as it did. All in all, having worked on it, there are some nice things about the API change (particularly around the reflector, using shared memory to pass all of the state around, etc.), but I do think the change is pretty significant and scary. I'm happy to investigate more if either you or @nightkr want to see what other alternatives look like.

Problems:

  • Event::modify has to change to support arc wrapping objects in the watcher. To get it to pass, I ended up using Arc::get_mut. If we end up having an event wrapping an arc, we could potentially modify it before the object is stored, or before the stream is shared. IIUC, we'd had only one reference at that point in time. As long as this contract holds, and it is properly documented, then all's well. However, it also exposes a potential footgun for users. We can mitigate it through an error, or logging, or something else, or we could potentially solve this problem in an entirely different way. My goal here was to get this to compile. It's one of these things that makes me feel uncomfortably worried :D
  • Having a objects arc wrapped so early in the codepath means users that are not interested in stream sharing, or even the reflector interface, have to deal with Arc. This is also going to be a breaking change. Is there anything we can do here to make the API easier to understand, or to at least communicate what has to be done for people to upgrade safely?
  • Some tests are a bit more awkward now (particularly around using matches!). Will definitely need a thorough look to make sure we're not sacrificing any quality there.
  • stream_subscribe interface: since it deals with a stream of watch results, it ends up arc wrapping everything. One of my early ideas was to change the interface itself to take a stream of objects. Perhaps we could also use a TryStream? I have to admit this is the point where I genuinely feel a bit stuck. If either of you have an idea here, please let me know :) (disclaimer: I haven't worked on this part at all this week, felt more important to actually get the branch to pass the tests)

Still left to do:

  • Pass stream in the reconciler (as mentioned in comment)
  • Write an example for stream sharing.
  • Docs, if we're moving ahead.

cc @Dav1dde, I know you've also worked on parts of this and would love it if you could provide some feedback. No pressure!

@mateiidavid mateiidavid requested a review from clux August 4, 2023 18:36
Copy link
Member

@clux clux left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some comments on two of the issues; user facing changes, and the modify() problem.

Comment on lines 30 to 35
fn handle_event(ev: Arc<Event>) -> anyhow::Result<()> {
info!(
"Event: \"{}\" via {} {}",
ev.message.unwrap().trim(),
ev.involved_object.kind.unwrap(),
ev.involved_object.name.unwrap()
ev.message.as_ref().unwrap().trim(),
ev.involved_object.kind.as_ref().unwrap(),
ev.involved_object.name.as_ref().unwrap()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like some of the more awkward user facing interactions can be simplified a bit by making fns like these take a &Event (say) and call as_ref() earlier as you've done in for wait.rs

Comment on lines +115 to +119
Event::Applied(obj) | Event::Deleted(obj) => {
if let Some(obj) = Arc::get_mut(obj) {
(f)(obj)
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you've noted, this is likely one of the more awkward problems that falls out of this. If this works like you are saying, i.e.

  • watcherstream.modify(f).reflect(writer) modifies and works
  • watcherstream.reflect(writer).modify(f) does not modify because the store has a ref to the arc

then this really highlights the fundamental pros and cons this PR is trying to make; we are fundamentally passing the same object around everywhere, and rely on users to clone if they need to (say via arc's clone-on-write behaviour on Arc::make_mut).

Given that reconcilers are already Arcd (and they are the main stream-end consumers) I think this is not too bad on the user side, plus it makes reconcilers and direct watch stream consumers the same, which is nice.

For the particular store into modify case, there are two options as i see:

  1. make modification after storing an error via Arc::try_unwrap - propagating that error from modify
  2. use Arc::make_mut in modify to allow the separation, while pasing the unmodified object around if chosen, having a copy shunted through the remains of the watcher stream.

I personally lead toward 1 because if people have a fn to modify, they can just apply that at the end of their controller stack in a reconciler helper that ultimately does a short term clone, rather than force the clone early on (when the object might be queued for minutes).

There is also a third option; type enforcement of the order (e.g. have reflector modify the output type in a way that makes it incompatible with modify). Although I am not sure how to do that cleanly.

@clux
Copy link
Member

clux commented Aug 7, 2023

About StreamSubscribe, looking more closely at its signature, it does look like it requires an unflattened stream in ::steam_subscribe which feels wrong to me. I'd have expected a usage kind of like:

let w1 = watcher(api, cfg).reflect(writer); // a flattened stream of all events
let w2 = w1.subscribe().predicate_filter(p); // a copy of the flattened stream with more specific filters applied

and pass w1 and w2 to different controllers, so that they'd reuse the reflector, and filter more specifically after. as it stands, if the stream is unflattened, then both sides might have to make a reflector IIUC.

I don't remember the reason for this, might have to dig in the original PR at #1131, but this feels wrong looking at it now.

Signed-off-by: Matei David <matei.david.35@gmail.com>
@mateiidavid
Copy link
Contributor Author

Coming back to this. I merged the latest main (the merge was pretty pain free actually). I'm going to prototype everything again in a different branch just so that I can reload the context into my brain and get acquainted with some of the controller logic.

There are still 2 issues:

  • StreamSubscribe API last time looked a bit weird, going to look at it separately.
  • Check up on the comments, there was an issue with watcher::modify which made the arc use a bit awkward, will need to check what I wrote, the answer, and whether I was correct the first time around to begin with.

Will keep folks up to date! Thanks for bearing with me :)

@mateiidavid
Copy link
Contributor Author

The time has come to finally retire this. Superseded by #1449.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
changelog-change changelog change category for prs
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Shareable Controller stream interfaces
2 participants