Skip to content

Commit

Permalink
Merge pull request #494 from grosser/grosser/informer
Browse files Browse the repository at this point in the history
informer library for shared watching with retries and updated list
  • Loading branch information
cben authored Mar 30, 2021
2 parents e175e95 + e8493a9 commit 19a5d14
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,5 @@ Style/AccessorGrouping:
EnforcedStyle: separated
Style/NegatedIfElseCondition:
Enabled: false
Style/Semicolon:
Exclude: ["test/**/*.rb"]
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,20 @@ to be substituted. Note that for a required parameter that does not provide a ge
client.process_template template
```

### Informer

A list that is always updated because it is it kept in sync by a watch in the background.
Can also share a list+watch with multiple threads.

```ruby
client = Kubeclient::Client.new('http://localhost:8080/api/', 'v1')
informer = Kubeclient::Informer.new(client, "pods", reconcile_timeout: 15 * 60, logger: Logger.new(STDOUT))
informer.start_worker

informer.list # all current pods
informer.watch { |notice| } # watch for changes (hides restarts and errors)
```

## Contributing

1. Fork it ( https://github.com/[my-github-username]/kubeclient/fork )
Expand Down
1 change: 1 addition & 0 deletions lib/kubeclient.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
require_relative 'kubeclient/exec_credentials'
require_relative 'kubeclient/gcp_auth_provider'
require_relative 'kubeclient/http_error'
require_relative 'kubeclient/informer'
require_relative 'kubeclient/missing_kind_compatibility'
require_relative 'kubeclient/oidc_auth_provider'
require_relative 'kubeclient/resource'
Expand Down
93 changes: 93 additions & 0 deletions lib/kubeclient/informer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
module Kubeclient
# caches results for multiple consumers to share and keeps them updated with a watch
class Informer
def initialize(client, resource_name, reconcile_timeout: 15 * 60, logger: nil)
@client = client
@resource_name = resource_name
@reconcile_timeout = reconcile_timeout
@logger = logger
@cache = nil
@started = nil
@watching = []
end

def list
@cache.values
end

def watch(&block)
with_watching(&block)
end

# not implicit so users know they have to `stop`
def start_worker
@worker = Thread.new do
loop do
fill_cache
watch_to_update_cache
rescue StandardError => e
# need to keep retrying since we work in the background
@logger&.error("ignoring error during background work #{e}")
ensure
sleep(1) # do not overwhelm the api-server if we are somehow broken
end
end
sleep(0.01) until @cache
end

def stop_worker
@worker&.kill # TODO: be nicer ?
end

private

def with_watching
queue = Queue.new
@watching << queue
loop do
x = queue.pop
yield(x)
end
ensure
@watching.delete(queue)
end

def cache_key(resource)
resource.dig(:metadata, :uid)
end

def fill_cache
reply = @client.get_entities(nil, @resource_name, raw: true, resource_version: '0')
@cache = reply[:items].each_with_object({}) do |item, h|
h[cache_key(item)] = item
end
@started = reply.dig(:metadata, :resourceVersion)
end

def watch_to_update_cache
watcher = @client.watch_entities(@resource_name, watch: true, resource_version: @started)
stop_reason = 'disconnect'

# stop watcher without using timeout
Thread.new do
sleep(@reconcile_timeout)
stop_reason = 'reconcile'
watcher.finish
end

watcher.each do |notice|
case notice[:type]
when 'ADDED', 'MODIFIED' then @cache[cache_key(notice[:object])] = notice[:object]
when 'DELETED' then @cache.delete(cache_key(notice[:object]))
when 'ERROR'
stop_reason = 'error'
break
else
@logger&.error("Unsupported event type #{notice[:type]}")
end
@watching.each { |q| q << notice }
end
@logger&.info("watch restarted: #{stop_reason}")
end
end
end
163 changes: 163 additions & 0 deletions test/test_informer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
# frozen_string_literal: true

require_relative 'test_helper'

# tests with_retries in common.rb
class RetryTest < MiniTest::Test
def setup
super
@slept = []
stub_core_api_list
end

# prevent leftover threads from causing trouble
def teardown
(Thread.list - [Thread.current]).each(&:kill)
super
end

def test_lists_at_start
list = stub_list
watch = stub_request(:get, %r{/v1/watch/pods}).to_return(body: '', status: 200)
with_worker do
assert_equal(['a'], informer.list.map { |p| p.metadata.name })
end
assert_requested(list, times: 1)
assert_requested(watch, times: 1)
end

def test_watches_for_updates
lock = Mutex.new
lock.lock
list = stub_list
watch = stub_request(:get, %r{/v1/watch/pods}).with { lock.lock }.to_return(
body: {
type: 'MODIFIED', object: { metadata: { name: 'b', uid: 'id1' } }
}.to_json << "\n",
status: 200
)

with_worker do
assert_equal(['a'], informer.list.map { |p| p.metadata.name })
lock.unlock # trigger watch
sleep(0.02) # wait for watch to finish
assert_equal(['b'], informer.list.map { |p| p.metadata.name })
end

assert_requested(list, times: 1)
assert_requested(watch, times: 1)
end

def test_watches_for_add
stub_list
stub_request(:get, %r{/v1/watch/pods}).to_return(
body: {
type: 'ADDED', object: { metadata: { name: 'b', uid: 'id2' } }
}.to_json << "\n",
status: 200
)

with_worker do
assert_equal(['a', 'b'], informer.list.map { |p| p.metadata.name })
end
end

def test_watches_for_delete
stub_list
stub_request(:get, %r{/v1/watch/pods}).to_return(
body: {
type: 'DELETED', object: { metadata: { name: 'b', uid: 'id1' } }
}.to_json << "\n",
status: 200
)

with_worker do
assert_equal([], informer.list.map { |p| p.metadata.name })
end
end

def test_restarts_on_error
list = stub_list
watch = stub_request(:get, %r{/v1/watch/pods}).to_return(
body: { type: 'ERROR' }.to_json << "\n",
status: 200
)
slept = []
informer.stubs(:sleep).with { |x| slept << x; sleep(0.01) }

with_worker do
assert_equal(['a'], informer.list.map { |p| p.metadata.name })
sleep(0.05)
end

assert slept.size >= 2, slept
assert_requested(list, at_least_times: 2)
assert_requested(watch, at_least_times: 2)
end

def test_can_watch_watches
skip if RUBY_ENGINE == 'truffleruby' # TODO: some race condition in truffle-ruby

list = stub_list
watch = stub_request(:get, %r{/v1/watch/pods}).to_return(
body: {
type: 'ADDED', object: { metadata: { name: 'b', uid: 'id2' } }
}.to_json << "\n",
status: 200
)

seen1 = []
seen2 = []
seeer1 = Thread.new { informer.watch { |n| seen1 << n; break } }
seeer2 = Thread.new { informer.watch { |n| seen2 << n; break } }
sleep(0.01) until informer.instance_variable_get(:@watching).size == 2

with_worker do
assert_equal([['ADDED'], ['ADDED']], [seen1.map(&:type), seen2.map(&:type)])
end

assert_requested(list, times: 1)
assert_requested(watch, times: 1)
ensure
seeer1&.kill
seeer2&.kill
end

def test_timeout
timeout = 0.1
informer.instance_variable_set(:@reconcile_timeout, timeout)
stub_list
Kubeclient::Common::WatchStream.any_instance.expects(:finish)
stub_request(:get, %r{/v1/watch/pods})
with_worker { sleep(timeout * 1.9) }
end

private

def with_worker
informer.start_worker
sleep(0.03) # wait for worker to watch
yield
ensure
informer.stop_worker
end

def stub_list
stub_request(:get, %r{/v1/pods}).to_return(body: pods_reply.to_json, status: 200)
end

def client
@client ||= Kubeclient::Client.new('http://localhost:8080/api/', 'v1')
end

def informer
@informer ||= Kubeclient::Informer.new(client, 'pods')
end

def pods_reply
@pods_reply ||= {
metadata: { resourceVersion: 1 },
items: [{ metadata: { name: 'a', uid: 'id1' } }]
}
end
end

0 comments on commit 19a5d14

Please sign in to comment.