Skip to content

Commit

Permalink
Merge pull request #285 from grosser/grosser/raw-watch
Browse files Browse the repository at this point in the history
implement as: :raw for watch
  • Loading branch information
Mooli Tayer authored Jan 15, 2018
2 parents f53c50d + 19d8742 commit 8098547
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 27 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,8 @@ It is possible to interrupt the watcher from another thread with:
watcher.finish
```

Pass `as: :raw` to `watch_*` get raw replies.

#### Watch events for a particular object
You can use the `field_selector` option as part of the watch methods.

Expand Down
6 changes: 4 additions & 2 deletions lib/kubeclient/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ def rest_client
# :label_selector (string) - a selector to restrict the list of returned objects by labels.
# :field_selector (string) - a selector to restrict the list of returned objects by fields.
# :resource_version (string) - shows changes that occur after passed version of a resource.
# Default response type will return an entity as a RecursiveOpenStruct
# (:ros) object, unless `:as` is passed with `:raw`.
def watch_entities(resource_name, options = {})
ns = build_namespace_prefix(options[:namespace])

Expand All @@ -258,7 +260,7 @@ def watch_entities(resource_name, options = {})
WATCH_ARGUMENTS.each { |k, v| params[k] = options[v] if options[v] }
uri.query = URI.encode_www_form(params) if params.any?

Kubeclient::Common::WatchStream.new(uri, http_options(uri))
Kubeclient::Common::WatchStream.new(uri, http_options(uri), as: options[:as] || :ros)
end

# Accepts the following options:
Expand Down Expand Up @@ -411,7 +413,7 @@ def watch_pod_log(pod_name, namespace, container: nil)
uri.path += "/#{@api_version}/#{ns}pods/#{pod_name}/log"
uri.query = URI.encode_www_form(params)

Kubeclient::Common::WatchStream.new(uri, http_options(uri), format: :text)
Kubeclient::Common::WatchStream.new(uri, http_options(uri), as: :raw)
end

def proxy_url(kind, name, port, namespace = '')
Expand Down
12 changes: 9 additions & 3 deletions lib/kubeclient/watch_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ module Kubeclient
module Common
# HTTP Stream used to watch changes on entities
class WatchStream
def initialize(uri, http_options, format: :json)
def initialize(uri, http_options, as:)
@uri = uri
@http_client = nil
@http_options = http_options
@format = format
@as = as
end

def each
Expand All @@ -24,7 +24,13 @@ def each
response.body.each do |chunk|
buffer << chunk
while (line = buffer.slice!(/.+\n/))
yield(@format == :json ? WatchNotice.new(JSON.parse(line)) : line.chomp)
result =
case @as
when :ros then WatchNotice.new(JSON.parse(line))
when :raw then line.chomp
else raise NotImplementedError, "Unsupported as #{@as.inspect}"
end
yield(result)
end
end
rescue IOError, Errno::EBADF
Expand Down
67 changes: 45 additions & 22 deletions test/test_watch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@
# Watch entity tests
class TestWatch < MiniTest::Test
def test_watch_pod_success
stub_resource_list

expected = [
{ 'type' => 'ADDED', 'resourceVersion' => '1389' },
{ 'type' => 'MODIFIED', 'resourceVersion' => '1390' },
{ 'type' => 'DELETED', 'resourceVersion' => '1398' }
]

stub_request(:get, %r{/api/v1$})
.to_return(body: open_test_file('core_api_resource_list.json'),
status: 200)

stub_request(:get, %r{.*\/watch/pods})
stub_request(:get, %r{/watch/pods})
.to_return(body: open_test_file('watch_stream.json'),
status: 200)

Expand All @@ -29,11 +27,24 @@ def test_watch_pod_success
end
end

def test_watch_pod_raw
stub_resource_list

stub_request(:get, %r{/watch/pods}).to_return(
body: open_test_file('watch_stream.json'),
status: 200
)

client = Kubeclient::Client.new('http://localhost:8080/api/', 'v1')

got = nil
client.watch_pods(as: :raw).each { |notice| got = notice }
assert_match(/\A{"type":"DELETED"/, got)
end

def test_watch_pod_failure
stub_request(:get, %r{/api/v1$})
.to_return(body: open_test_file('core_api_resource_list.json'),
status: 200)
stub_request(:get, %r{.*\/watch/pods}).to_return(status: 404)
stub_resource_list
stub_request(:get, %r{/watch/pods}).to_return(status: 404)

client = Kubeclient::Client.new('http://localhost:8080/api/', 'v1')
assert_raises(Kubeclient::HttpError) do
Expand All @@ -51,19 +62,28 @@ def test_watch_stream_text
.to_return(body: open_test_file('pod_log.txt'),
status: 200)

stream = Kubeclient::Common::WatchStream.new(URI.parse(url), {}, format: :txt)
stream = Kubeclient::Common::WatchStream.new(URI.parse(url), {}, as: :raw)
stream.to_enum.with_index do |line, index|
assert_instance_of(String, line)
assert_equal(expected_lines[index], line)
end
end

# Ensure that WatchStream respects a format that's not JSON
def test_watch_stream_unknown
url = 'http://www.example.com/foobar'
stub_request(:get, url)
.to_return(body: open_test_file('pod_log.txt'),
status: 200)

stream = Kubeclient::Common::WatchStream.new(URI.parse(url), {}, as: :foo)
assert_raises(NotImplementedError) { stream.each {} }
end

def test_watch_with_resource_version
api_host = 'http://localhost:8080/api'
version = '1995'
stub_request(:get, %r{/api/v1$})
.to_return(body: open_test_file('core_api_resource_list.json'),
status: 200)
stub_resource_list
stub_request(:get, %r{.*\/watch/events})
.to_return(body: open_test_file('watch_stream.json'),
status: 200)
Expand All @@ -81,9 +101,7 @@ def test_watch_with_label_selector
api_host = 'http://localhost:8080/api'
selector = 'name=redis-master'

stub_request(:get, %r{/api/v1$})
.to_return(body: open_test_file('core_api_resource_list.json'),
status: 200)
stub_resource_list
stub_request(:get, %r{.*\/watch/events})
.to_return(body: open_test_file('watch_stream.json'),
status: 200)
Expand All @@ -101,9 +119,7 @@ def test_watch_with_field_selector
api_host = 'http://localhost:8080/api'
selector = 'involvedObject.kind=Pod'

stub_request(:get, %r{/api/v1$})
.to_return(body: open_test_file('core_api_resource_list.json'),
status: 200)
stub_resource_list
stub_request(:get, %r{.*\/watch/events})
.to_return(body: open_test_file('watch_stream.json'),
status: 200)
Expand All @@ -120,9 +136,7 @@ def test_watch_with_field_selector
def test_watch_with_finish_and_ebadf
api_host = 'http://localhost:8080/api'

stub_request(:get, %r{/api/v1$})
.to_return(body: open_test_file('core_api_resource_list.json'),
status: 200)
stub_resource_list
stub_request(:get, %r{.*\/watch/events})
.to_return(body: open_test_file('watch_stream.json'), status: 200)

Expand All @@ -137,4 +151,13 @@ def test_watch_with_finish_and_ebadf

assert_requested(:get, "#{api_host}/v1/watch/events", times: 1)
end

private

def stub_resource_list
stub_request(:get, %r{/api/v1$}).to_return(
body: open_test_file('core_api_resource_list.json'),
status: 200
)
end
end

0 comments on commit 8098547

Please sign in to comment.