Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow setting default :as for the whole client #299

Merged
merged 1 commit into from
Feb 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion lib/kubeclient.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
require 'kubeclient/resource'
require 'kubeclient/resource_not_found_error'
require 'kubeclient/version'
require 'kubeclient/watch_notice'
require 'kubeclient/watch_stream'

module Kubeclient
Expand Down
57 changes: 32 additions & 25 deletions lib/kubeclient/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ def initialize_client(
auth_options: DEFAULT_AUTH_OPTIONS,
socket_options: DEFAULT_SOCKET_OPTIONS,
timeouts: DEFAULT_TIMEOUTS,
http_proxy_uri: DEFAULT_HTTP_PROXY_URI
http_proxy_uri: DEFAULT_HTTP_PROXY_URI,
as: :ros
)
validate_auth_options(auth_options)
handle_uri(uri, path)
Expand All @@ -72,6 +73,7 @@ def initialize_client(
# @timeouts[:foo] == nil resulting in infinite timeout.
@timeouts = DEFAULT_TIMEOUTS.merge(timeouts)
@http_proxy_uri = http_proxy_uri ? http_proxy_uri.to_s : nil
@as = as

if auth_options[:bearer_token]
bearer_token(@auth_options[:bearer_token])
Expand Down Expand Up @@ -242,7 +244,11 @@ def watch_entities(resource_name, options = {})
WATCH_ARGUMENTS.each { |k, v| params[k] = options[v] if options[v] }
uri.query = URI.encode_www_form(params) if params.any?

Kubeclient::Common::WatchStream.new(uri, http_options(uri), as: options[:as] || :ros)
Kubeclient::Common::WatchStream.new(
uri,
http_options(uri),
formatter: ->(value) { format_response(options[:as] || @as, value) }
)
end

# Accepts the following options:
Expand All @@ -261,19 +267,7 @@ def get_entities(entity_type, resource_name, options = {})
rest_client[ns_prefix + resource_name]
.get({ 'params' => params }.merge(@headers))
end
return response.body if options[:as] == :raw

result = JSON.parse(response)

resource_version =
result.fetch('resourceVersion') do
result.fetch('metadata', {}).fetch('resourceVersion', nil)
end

# result['items'] might be nil due to https://github.com/kubernetes/kubernetes/issues/13096
collection = result['items'].to_a.map { |item| Kubeclient::Resource.new(item) }

Kubeclient::Common::EntityList.new(entity_type, resource_version, collection)
format_response(options[:as] || @as, response.body, entity_type)
end

# Accepts the following options:
Expand All @@ -286,7 +280,7 @@ def get_entity(resource_name, name, namespace = nil, options = {})
rest_client[ns_prefix + resource_name + "/#{name}"]
.get(@headers)
end
format_response(options[:as], response)
format_response(options[:as] || @as, response.body)
end

# delete_options are passed as a JSON payload in the delete request
Expand All @@ -305,7 +299,7 @@ def delete_entity(resource_name, name, namespace = nil, delete_options: {})
)
)
end
format_response(:ros, response)
format_response(@as, response.body)
end

def create_entity(entity_type, resource_name, entity_config)
Expand All @@ -326,7 +320,7 @@ def create_entity(entity_type, resource_name, entity_config)
rest_client[ns_prefix + resource_name]
.post(hash.to_json, { 'Content-Type' => 'application/json' }.merge(@headers))
end
format_response(:ros, response)
format_response(@as, response.body)
end

def update_entity(resource_name, entity_config)
Expand All @@ -336,7 +330,7 @@ def update_entity(resource_name, entity_config)
rest_client[ns_prefix + resource_name + "/#{name}"]
.put(entity_config.to_h.to_json, { 'Content-Type' => 'application/json' }.merge(@headers))
end
format_response(:ros, response)
format_response(@as, response.body)
end

def patch_entity(resource_name, name, patch, namespace = nil)
Expand All @@ -348,7 +342,7 @@ def patch_entity(resource_name, name, patch, namespace = nil)
{ 'Content-Type' => 'application/strategic-merge-patch+json' }.merge(@headers)
)
end
format_response(:ros, response)
format_response(@as, response.body)
end

def all_entities(options = {})
Expand Down Expand Up @@ -389,7 +383,7 @@ def watch_pod_log(pod_name, namespace, container: nil)
uri.path += "/#{@api_version}/#{ns}pods/#{pod_name}/log"
uri.query = URI.encode_www_form(params)

Kubeclient::Common::WatchStream.new(uri, http_options(uri), as: :raw)
Kubeclient::Common::WatchStream.new(uri, http_options(uri), formatter: ->(value) { value })
end

def proxy_url(kind, name, port, namespace = '')
Expand Down Expand Up @@ -432,11 +426,24 @@ def api

private

def format_response(as, response)
return response.body if as == :raw
def format_response(as, body, list_type = nil)
return body if as == :raw

result = JSON.parse(response)
Kubeclient::Resource.new(result)
result = JSON.parse(body)

if list_type
resource_version =
result.fetch('resourceVersion') do
result.fetch('metadata', {}).fetch('resourceVersion', nil)
end

# result['items'] might be nil due to https://github.com/kubernetes/kubernetes/issues/13096
collection = result['items'].to_a.map { |item| Kubeclient::Resource.new(item) }

Kubeclient::Common::EntityList.new(list_type, resource_version, collection)
else
Kubeclient::Resource.new(result)
end
end

def load_entities
Expand Down
12 changes: 0 additions & 12 deletions lib/kubeclient/watch_notice.rb

This file was deleted.

12 changes: 3 additions & 9 deletions lib/kubeclient/watch_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ module Kubeclient
module Common
# HTTP Stream used to watch changes on entities
class WatchStream
def initialize(uri, http_options, as:)
def initialize(uri, http_options, formatter:)
@uri = uri
@http_client = nil
@http_options = http_options
@as = as
@formatter = formatter
end

def each
Expand All @@ -24,13 +24,7 @@ def each
response.body.each do |chunk|
buffer << chunk
while (line = buffer.slice!(/.+\n/))
result =
case @as
when :ros then WatchNotice.new(JSON.parse(line))
when :raw then line.chomp
else raise NotImplementedError, "Unsupported as #{@as.inspect}"
end
yield(result)
yield @formatter.call(line.chomp)
end
end
rescue IOError, Errno::EBADF
Expand Down
15 changes: 2 additions & 13 deletions test/test_watch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def test_watch_pod_success
client = Kubeclient::Client.new('http://localhost:8080/api/', 'v1')

client.watch_pods.to_enum.with_index do |notice, index|
assert_instance_of(Kubeclient::Common::WatchNotice, notice)
assert_instance_of(Kubeclient::Resource, notice)
assert_equal(expected[index]['type'], notice.type)
assert_equal('Pod', notice.object.kind)
assert_equal('php', notice.object.metadata.name)
Expand Down Expand Up @@ -62,24 +62,13 @@ def test_watch_stream_text
.to_return(body: open_test_file('pod_log.txt'),
status: 200)

stream = Kubeclient::Common::WatchStream.new(URI.parse(url), {}, as: :raw)
stream = Kubeclient::Common::WatchStream.new(URI.parse(url), {}, formatter: ->(v) { v })
stream.to_enum.with_index do |line, index|
assert_instance_of(String, line)
assert_equal(expected_lines[index], line)
end
end

# Ensure that WatchStream respects a format that's not JSON
def test_watch_stream_unknown
url = 'http://www.example.com/foobar'
stub_request(:get, url)
.to_return(body: open_test_file('pod_log.txt'),
status: 200)

stream = Kubeclient::Common::WatchStream.new(URI.parse(url), {}, as: :foo)
assert_raises(NotImplementedError) { stream.each {} }
end

def test_watch_with_resource_version
api_host = 'http://localhost:8080/api'
version = '1995'
Expand Down
37 changes: 0 additions & 37 deletions test/test_watch_notice.rb

This file was deleted.