Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finish the watch stream connection when exiting #278

Merged
merged 1 commit into from
Aug 23, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module ManageIQ::Providers::Kubernetes::ContainerManager::StreamingRefreshMixin
include Vmdb::Logging

attr_accessor :ems, :finish, :initial, :resource_versions, :watch_threads, :queue
attr_accessor :ems, :initial, :resource_versions, :watch_streams, :watch_threads, :queue

def after_initialize
super
Expand All @@ -26,11 +26,11 @@ def before_exit(_message, _exit_code)
private

def setup_streaming_refresh
self.finish = Concurrent::AtomicBoolean.new(false)
self.initial = true
self.queue = Queue.new
self.resource_versions = {}
self.watch_threads = {}
self.watch_streams = Concurrent::Map.new
self.watch_threads = Concurrent::Map.new
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reminder, not necessarily in this PR: you also planned resource_versions to be Concurrent::Map.

is this https://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Hash.html ?

locks against the object itself for every method call, ensuring only one thread can be reading or writing at a time

will lock contention be a problem?
I'd guess this is minor compared to overhead of reading notices from network.
anyway I'm cool with erring on side of safety and profiling later.

end

def do_work_streaming_refresh
Expand Down Expand Up @@ -108,7 +108,12 @@ def ensure_watch_threads
def stop_watch_threads
safe_log("#{log_header} Stopping watch threads...")

finish.value = true
# First call WatchStream#finish to forcibly terminate the loop, this
# closes the HTTP connection and will cause the #each method to raise an
Copy link
Contributor

@cben cben Aug 23, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be precise, the #each method crashes and rescues internally.
it's awkward implementation, but should be invisible.
if you see any exception leaking outside it, it's kubeclient bug (we already escalated the rescue couple times, ManageIQ/kubeclient#280 and ManageIQ/kubeclient#315)

oh, I see, I think manageiq is still using old kubeclient 2.5.2 where some are uncatched :-(
includes 280 but not 315.
bumping kubeclient to 3.x / 4.x is still blocked on several things that are basically ready, I just need to test image scanning.
I could also release 2.5.3 with 315 backported.

ok to catch HTTP::ConnectionError for now but let's have a more precise comment and do nag me to bump or backport.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay added that PR to the comment re: the exception

# exception (until https://github.com/abonas/kubeclient/pull/315 is applied).
watch_streams.each_value(&:finish)

# Next loop through each thread and join them cleanly
watch_threads.each_value { |thread| thread.join(10) }

safe_log("#{log_header} Stopping watch threads...Complete")
Expand All @@ -122,9 +127,11 @@ def watch_thread(entity_type)
_log.info("#{log_header} #{entity_type} watch thread started")

resource_version = resource_versions[entity_type] || "0"
watch_stream = start_watch(entity_type, resource_version)

until finished?
watch_stream = start_watch(entity_type, resource_version)
watch_streams[entity_type] = watch_stream

begin
watch_stream.each do |notice|
# Update the collection resourceVersion to be the most recent
# object's resourceVersion so that if this watch has to be restarted
Expand All @@ -134,6 +141,8 @@ def watch_thread(entity_type)

queue.push(notice)
end
rescue HTTP::ConnectionError
# This is raised when #finish is called on a WatchStream
end

_log.info("#{log_header} #{entity_type} watch thread exiting")
Expand All @@ -146,10 +155,6 @@ def start_watch(entity_type, resource_version = "0")
connection_for_entity(entity_type).send(watch_method, :resource_version => resource_version)
end

def finished?
finish.value
end

def connection_for_entity(_entity_type)
kubernetes_connection
end
Expand Down