Streaming refresh for kubernetes using watches #271
Conversation
Is there a common pattern here (and from other streaming providers) that can be extracted to a Mixin or a base class?
Yeah, probably; this is similar to the vmware and kubevirt streaming refreshes. The BaseManager::Refresher class would probably even work: check if ems.supports_streaming_refresh? and call out to methods to set up the threads/etc...
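For illustration, a rough sketch of what that shared branch point might look like; only ems.supports_streaming_refresh? comes from the discussion above, the other method names are hypothetical:

# Hypothetical sketch of a branch point in a shared Refresher/base class;
# refresh_via_streaming and refresh_via_queue are illustrative names.
def refresh(ems)
  if ems.supports_streaming_refresh?
    refresh_via_streaming(ems) # set up the watch threads and drain their notices
  else
    refresh_via_queue(ems)     # existing queued full/targeted refresh path
  end
end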
Checked commits agrare/manageiq-providers-kubernetes@f32af19~...f638a37 with ruby 2.3.3, rubocop 0.52.1, haml-lint 0.20.0, and yamllint 1.10.0
app/models/manageiq/providers/kubernetes/container_manager/streaming_refresh_mixin.rb
def start_watch_threads
  _log.info("#{log_header} Starting watch threads...")

  entity_types.each do |entity_type|
hm I remember there was an issue that the watch gets disconnected every hour or so? Is that still the case @cben?
So we would need logic to check if the watch died?
afaik yes, ManageIQ/kubeclient#273, not solved.
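Until that kubeclient issue is resolved, the watch thread itself could reconnect when the server drops it. A minimal sketch, assuming kubeclient's options-hash watch API and the connection/resource_versions/finish/queue objects used in this mixin:

# Sketch: reconnect a dropped watch, resuming from the last resourceVersion seen.
def watch_entity(entity_type)
  until finish.value
    stream = connection.send("watch_#{entity_type}", :resource_version => resource_versions[entity_type])
    stream.each do |notice|
      resource_versions[entity_type] = notice.object.metadata.resourceVersion
      queue.push(notice)
    end
    # .each returning means the server closed the connection; loop and reconnect
  end
end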
def save_resource_versions(inventory)
  entity_types.each do |entity_type|
    resource_versions[entity_type] = inventory.collector.send(entity_type).resourceVersion
Going forward, we should probably store this in some table?
The point would be to not have full refresh and streaming coupled, since we want to start streaming right away and not wait half an hour (or hours) for the full refresh.
Can we get the 'whole collection resourceVersion' from watches? We should add a new table where we store the latest resource versions from the watches.
Hopefully, we should be able to check that the last 'whole collection resourceVersion' is still available in the watches, so we can avoid a full refresh even when the worker is restarted.
So we would do a full refresh only if we detect a gap.
Let's first get this working well with streaming only after full refresh.
Can we get the 'whole collection resourceVersion' from watches?
Not directly, but the last received version of an individual object is the thing to use as the collection version to watch from.
We'll need to figure out "version skew" between components: when all parts are busy and the queue is not empty, do we want the last received version / last dequeued version / last persisted version?
(Not an issue here, as save_resource_versions is only called in a quiet moment after a full refresh.)
So we would do a full refresh only if we detect a gap.
k8s says that with etcd3 they only keep history ~5min back, so not sure this matters much; if we were down, we'll frequently have a gap...
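A hedged sketch of that resume-or-full-refresh idea; last_persisted_resource_version and full_refresh are made-up names, and the 410 Gone check reflects how Kubernetes reports an expired resourceVersion:

# Sketch: resume a watch from a stored resourceVersion, falling back to a
# full refresh when the server says that version is gone (i.e. a gap).
def resume_or_full_refresh(entity_type)
  version = last_persisted_resource_version(entity_type) # e.g. read from a new table
  return full_refresh if version.nil?

  connection.send("watch_#{entity_type}", :resource_version => version).each do |notice|
    if notice.type == "ERROR" && notice.object&.code == 410
      return full_refresh # history no longer reaches back to `version`
    end
    queue.push(notice)
  end
end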
Looks like a great start 👍
I have a few comments that we should solve in follow-up PRs.
Still reviewing, I like what I've seen so far...
I like how the full flow is visible in do_work_streaming_refresh ❤️
An in-process queue is simpler in so many ways than MiqQueue... :)
@@ -26,6 +30,10 @@ def supports_metrics?
  endpoints.where(:role => METRICS_ROLES).exists?
end

def streaming_refresh_enabled?
  Settings.ems_refresh[emstype.to_sym]&.streaming_refresh
end
nit: can you move this nearer supports :streaming_refresh (or inline it)?
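For reference, inlining it might look roughly like this, assuming the block form of ManageIQ's SupportsFeatureMixin in use at the time:

# Sketch: inline the settings check into the supports declaration.
supports :streaming_refresh do
  unless Settings.ems_refresh[emstype.to_sym]&.streaming_refresh
    unsupported_reason_add(:streaming_refresh, "Streaming refresh is not enabled in settings")
  end
end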
Ok, so it looks fine overall, I am going to merge.
We'll fix the pending issues in follow-ups. (I am gonna build the service catalog refresh on this.)
until finish.value
  watch_stream.each { |notice| queue.push(notice) }
end
This until is ineffective. watch_stream.each {...} may run infinitely! In practice it sometimes stops, but on the order of an hour, not the 10sec that stop_watch_threads hopes for...
And if it stops, you can't restart the same watch_stream; you need a new start_watch connection.
You could move the check inside the each, something like this:
watch_stream.each do |notice|
queue.push(notice)
break if finish.value
end
(untested, not sure kubeclient cleans up correctly with break)
However, this too only stops after the next watch notice!
You could call watch_stream.finish from stop_watch_threads to stop faster.
It's a violent kludge (it closes the underlying http connection, letting each crash and hopefully rescue 💥) but IMO good enough.
Yeah I noticed that this always ends up exiting only after the timeout and killing the threads.
I pulled the logic from KubeVirt https://github.com/ManageIQ/manageiq-providers-kubevirt/blob/master/app/models/manageiq/providers/kubevirt/infra_manager/refresh_worker/runner.rb#L215-L219 so someone should tell them 😄
not sure kubeclient cleans up correctly with break
Probably no worse than the process exiting out from under the threads because they failed to join! 😆
However, this too only stops after the next watch notice!
Yeah, I tried moving to this approach with my last PR, but if no notices were delivered it wasn't much better. You could definitely argue that on a production system changes would arrive often enough that this wouldn't be an issue, though.
You could call watch_stream.finish from stop_watch_threads to stop faster.
I'll give this a try, thanks! Definitely sounds like the best bad approach.
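A sketch of what that could look like; the watch_streams/watch_threads hashes keyed by entity type are assumptions, only the finish flag and Kubeclient's WatchStream#finish come from the thread above:

# Sketch: signal the loops, then force each blocked .each to return by
# closing its HTTP connection, and give the threads a bounded join.
def stop_watch_threads
  finish.value = true                 # assumes the flag exposes a writer (e.g. Concurrent::AtomicBoolean)
  watch_streams.each_value(&:finish)  # WatchStream#finish closes the connection, so .each returns
  watch_threads.each_value { |thread| thread.join(10) }
end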
def save_resource_versions(inventory)
  entity_types.each do |entity_type|
    resource_versions[entity_type] = inventory.collector.send(entity_type).resourceVersion
Q: how does this line work? What is, say, collector.pods.resourceVersion?
AFAICT, collector.pods will return an array. Did you mean send(entity_type).last.resourceVersion?
we should use the resource version of the collection, not of an individual object, right?
Ah, my bad, I initially thought collector is a Kubernetes::Inventory::Collector::Watches and only later realized it's the full refresh one. I see, the get_pods result is not just an array, it has .resourceVersion 👍
Yeah, I want to try to compartmentalize the collection resource_version logic in the collector, so that this could do collector.pods_resource_version. The full collector would just use the resourceVersion from the collection, and the watches collector would populate it from the last entity's resourceVersion. Right now this logic is in this mixin, which IMO is less than ideal but works for now. It is on my list of refactorings :)
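As a rough illustration of that refactoring idea (the method bodies below are hypothetical; only collector.pods_resource_version is named above):

# In the full-refresh collector: the collection returned by get_pods
# carries the resourceVersion directly.
def pods_resource_version
  pods.resourceVersion
end

# In the watches collector: take the resourceVersion of the last received entity.
def pods_resource_version
  pods.last&.metadata&.resourceVersion
end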
Add the ability to use watches in the RefreshWorker in place of the standard queued full/targeted refreshes.
Tasks still pending:
- Since there is no indication if a watch started with resource_version => "0" has "completed", we need to do an initial full refresh, get the resourceVersion of the collection, then start the watch.
- At present this will only add/edit entities using the TargetedCollection persister.

Depends on: ManageIQ/manageiq#17531
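For orientation, a hedged sketch of the flow those tasks describe; only do_work_streaming_refresh, save_resource_versions, start_watch_threads and the targeted persister are named in this PR, the other helpers are illustrative:

# Sketch: initial full refresh, then stream changes via watches.
def do_work_streaming_refresh
  unless @initial_refresh_done
    inventory = full_refresh            # 1. initial full refresh
    save_resource_versions(inventory)   # 2. remember each collection's resourceVersion
    start_watch_threads                 # 3. start watching from those versions
    @initial_refresh_done = true
  end

  # 4. drain the notices pushed by the watch threads and persist them
  #    with the targeted persister
  notices = []
  notices << queue.pop until queue.empty?
  persist_targeted(notices) unless notices.empty?
end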