-
-
Notifications
You must be signed in to change notification settings - Fork 476
/
elastic_rest.rb
286 lines (251 loc) · 8.48 KB
/
elastic_rest.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# frozen_string_literal: true
require 'json'
require 'net/http'
require 'openssl'
# Parent class encapsulating general-use functions for children REST-based
# providers.
class Puppet::Provider::ElasticREST < Puppet::Provider
class << self
attr_accessor :api_discovery_uri, :api_resource_style, :api_uri, :discrete_resource_creation, :metadata, :metadata_pipeline, :query_string
end
# Fetch arbitrary metadata for the class from an instance object.
#
# @return String
def metadata
self.class.metadata
end
# Retrieve the class query_string variable
#
# @return String
def query_string
self.class.query_string
end
# Perform a REST API request against the indicated endpoint.
#
# @return Net::HTTPResponse
def self.rest(http,
req,
timeout = 10,
username = nil,
password = nil,
validate_tls: true)
if username && password
req.basic_auth username, password
elsif username || password
Puppet.warning(
'username and password must both be defined, skipping basic auth'
)
end
req['Accept'] = 'application/json'
http.read_timeout = timeout
http.open_timeout = timeout
http.verify_mode = OpenSSL::SSL::VERIFY_NONE unless validate_tls
begin
http.request req
rescue EOFError => e
# Because the provider attempts a best guess at API access, we
# only fail when HTTP operations fail for mutating methods.
unless %w[GET OPTIONS HEAD].include? req.method
raise Puppet::Error,
"Received '#{e}' from the Elasticsearch API. Are your API settings correct?"
end
end
end
# Helper to format a remote URL request for Elasticsearch which takes into
# account path ordering, et cetera.
def self.format_uri(resource_path, property_flush = {})
return api_uri if resource_path.nil? || api_resource_style == :bare
if discrete_resource_creation && !property_flush[:ensure].nil?
resource_path
else
case api_resource_style
when :prefix
"#{resource_path}/#{api_uri}"
else
"#{api_uri}/#{resource_path}"
end
end
end
# Fetch Elasticsearch API objects. Accepts a variety of argument functions
# dictating how to connect to the Elasticsearch API.
#
# @return Array
# an array of Hashes representing the found API objects, whether they be
# templates, pipelines, et cetera.
def self.api_objects(protocol = 'http',
host = 'localhost',
port = 9200,
timeout = 10,
username = nil,
password = nil,
ca_file = nil,
ca_path = nil,
validate_tls = true)
uri = URI("#{protocol}://#{host}:#{port}/#{format_uri(api_discovery_uri)}")
http = Net::HTTP.new uri.host, uri.port
req = Net::HTTP::Get.new uri.request_uri
http.use_ssl = uri.scheme == 'https'
[[ca_file, :ca_file=], [ca_path, :ca_path=]].each do |arg, method|
http.send method, arg if arg && http.respond_to?(method)
end
response = rest http, req, timeout, username, password, validate_tls: validate_tls
results = []
results = process_body(response.body) if response.respond_to?(:code) && response.code.to_i == 200
results
end
# Process the JSON response body
def self.process_body(body)
JSON.parse(body).map do |object_name, api_object|
{
:name => object_name,
:ensure => :present,
metadata => process_metadata(api_object),
:provider => name
}
end
end
# Passes API objects through arbitrary Procs/lambdas in order to postprocess
# API responses.
def self.process_metadata(raw_metadata)
if metadata_pipeline.is_a?(Array) && !metadata_pipeline.empty?
metadata_pipeline.reduce(raw_metadata) do |md, processor|
processor.call md
end
else
raw_metadata
end
end
# Fetch an array of provider objects from the Elasticsearch API.
def self.instances
api_objects.map { |resource| new resource }
end
# Unlike a typical #prefetch, which just ties discovered #instances to the
# correct resources, we need to quantify all the ways the resources in the
# catalog know about Elasticsearch API access and use those settings to
# fetch any templates we can before associating resources and providers.
def self.prefetch(resources)
# Get all relevant API access methods from the resources we know about
res = resources.map do |_, resource|
p = resource.parameters
[
p[:protocol].value,
p[:host].value,
p[:port].value,
p[:timeout].value,
(p.key?(:username) ? p[:username].value : nil),
(p.key?(:password) ? p[:password].value : nil),
(p.key?(:ca_file) ? p[:ca_file].value : nil),
(p.key?(:ca_path) ? p[:ca_path].value : nil),
(p.key?(:validate_tls) ? p[:validate_tls].value : true),
]
# Deduplicate identical settings, and fetch templates
end.uniq
res = res.map do |api|
api_objects(*api)
# Flatten and deduplicate the array, instantiate providers, and do the
# typical association dance
end
res.flatten.uniq.map { |resource| new resource }.each do |prov|
if (resource = resources[prov.name])
resource.provider = prov
end
end
end
def initialize(value = {})
super(value)
@property_flush = {}
end
# Generate a request body
def generate_body
JSON.generate(
if metadata != :content && @property_flush[:ensure] == :present
{ metadata.to_s => resource[metadata] }
else
resource[metadata]
end
)
end
# Call Elasticsearch's REST API to appropriately PUT/DELETE/or otherwise
# update any managed API objects.
def flush
Puppet.debug('Got to flush')
uri = URI(
format(
'%s://%s:%d/%s',
resource[:protocol],
resource[:host],
resource[:port],
self.class.format_uri(resource[:name], @property_flush)
)
)
uri.query = URI.encode_www_form query_string if query_string
Puppet.debug("Generated URI = #{uri.inspect}")
case @property_flush[:ensure]
when :absent
req = Net::HTTP::Delete.new uri.request_uri
else
req = Net::HTTP::Put.new uri.request_uri
req.body = generate_body
Puppet.debug("Generated body looks like: #{req.body.inspect}")
# As of Elasticsearch 6.x, required when requesting with a payload (so we
# set it always to be safe)
req['Content-Type'] = 'application/json' if req['Content-Type'].nil?
end
http = Net::HTTP.new uri.host, uri.port
http.use_ssl = uri.scheme == 'https'
%i[ca_file ca_path].each do |arg|
http.send "#{arg}=".to_sym, resource[arg] if !resource[arg].nil? && http.respond_to?(arg)
end
response = self.class.rest(
http,
req,
resource[:timeout],
resource[:username],
resource[:password],
validate_tls: resource[:validate_tls]
)
# Attempt to return useful error output
unless response.code.to_i == 200
Puppet.debug("Non-OK reponse: Body = #{response.body.inspect}")
json = JSON.parse(response.body)
err_msg = if json.key? 'error'
if json['error'].is_a?(Hash) \
&& json['error'].key?('root_cause')
# Newer versions have useful output
json['error']['root_cause'].first['reason']
else
# Otherwise fallback to old-style error messages
json['error']
end
else
# As a last resort, return the response error code
"HTTP #{response.code}"
end
raise Puppet::Error, "Elasticsearch API responded with: #{err_msg}"
end
@property_hash = self.class.api_objects(
resource[:protocol],
resource[:host],
resource[:port],
resource[:timeout],
resource[:username],
resource[:password],
resource[:ca_file],
resource[:ca_path],
resource[:validate_tls].nil? ? true : resource[:validate_tls]
).find do |t|
t[:name] == resource[:name]
end
end
# Set this provider's `:ensure` property to `:present`.
def create
@property_flush[:ensure] = :present
end
def exists?
@property_hash[:ensure] == :present
end
# Set this provider's `:ensure` property to `:absent`.
def destroy
@property_flush[:ensure] = :absent
end
end