forked from ManageIQ/manageiq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
miq_worker.rb
572 lines (462 loc) · 16.6 KB
/
miq_worker.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
require 'io/wait'
class MiqWorker < ApplicationRecord
include UuidMixin
# Before the record is destroyed, detach/log this worker's queue messages
# (see #log_destroy_of_worker_messages).
before_destroy :log_destroy_of_worker_messages
belongs_to :miq_server
# Queue messages whose polymorphic handler is this worker.
has_many :messages, :as => :handler, :class_name => 'MiqQueue'
# Messages in "dequeue" state, i.e. taken off the queue by this worker.
has_many :active_messages, -> { where(["state = ?", "dequeue"]) }, :as => :handler, :class_name => 'MiqQueue'
# Messages assigned to this worker that are still in "ready" state.
has_many :ready_messages, -> { where(["state = ?", "ready"]) }, :as => :handler, :class_name => 'MiqQueue'
# Every message not in "ready" state; destroyed together with the worker.
has_many :processed_messages, -> { where(["state != ?", "ready"]) }, :as => :handler, :class_name => 'MiqQueue', :dependent => :destroy
# Computed attributes exposed as virtual columns (#friendly_name, #uri_or_queue_name).
virtual_column :friendly_name, :type => :string
virtual_column :uri_or_queue_name, :type => :string
# Lifecycle values stored in the status column.
STATUS_CREATING = 'creating'.freeze
STATUS_STARTING = 'starting'.freeze
STATUS_STARTED = 'started'.freeze
STATUS_READY = 'ready'.freeze
STATUS_WORKING = 'working'.freeze
STATUS_STOPPING = 'stopping'.freeze
STATUS_STOPPED = 'stopped'.freeze
STATUS_KILLED = 'killed'.freeze
STATUS_ABORTED = 'aborted'.freeze
# Status groupings used by the finders and predicates below.
STATUSES_STARTING = [STATUS_CREATING, STATUS_STARTING]
STATUSES_CURRENT = [STATUS_STARTED, STATUS_READY, STATUS_WORKING]
STATUSES_STOPPED = [STATUS_STOPPED, STATUS_KILLED, STATUS_ABORTED]
STATUSES_CURRENT_OR_STARTING = STATUSES_CURRENT + STATUSES_STARTING
# Everything that is not yet stopped, including workers that are stopping.
STATUSES_ALIVE = STATUSES_CURRENT_OR_STARTING + [STATUS_STOPPING]
# Keys kept from MiqProcess.processInfo by #status_update when refreshing
# this record (:priority is stored as os_priority).
PROCESS_INFO_FIELDS = %i(priority memory_usage percent_memory percent_cpu memory_size cpu_time proportional_set_size)
# NOTE(review): not referenced in this file; presumably the prefix used for
# worker process titles — confirm at the call sites.
PROCESS_TITLE_PREFIX = "MIQ:".freeze
# Shutdown hook: asks every worker on this server to stop.
def self.atShutdown
  stop_all_workers
end

class << self
  # Explicit override for .workers: an Integer count, or a Proc that is
  # called each time the count is needed.
  attr_writer :workers
end

# Number of workers of this type this server should run.
#
# Precedence:
#   1. Minimal environment (when this class checks for the minimal role):
#      1 if one of the required roles was requested through the
#      minimal-env options, otherwise 0.
#   2. An override assigned through .workers= (called if it is a Proc).
#   3. The configured count, see .workers_configured_count.
def self.workers
  if MiqServer.minimal_env? && check_for_minimal_role
    return has_minimal_env_option? ? 1 : 0
  end

  case @workers
  when Proc then @workers.call
  when nil  then workers_configured_count
  else           @workers
  end
end

# The :count from this worker's settings, capped at maximum_workers_count
# when that class attribute holds an Integer.
def self.workers_configured_count
  configured = worker_settings[:count]
  cap = maximum_workers_count
  return configured unless cap.kind_of?(Integer)

  cap < configured ? cap : configured
end
# True when any of this worker's required roles was named in the server's
# minimal-environment options.
#
# +required_roles+ may be a Proc (called to obtain the roles), a single role
# String, or an Array of role names. Returns false when there are no
# minimal-env options or the worker has no required roles.
#
# Raises RuntimeError naming the offending class when the roles resolve to
# any other type.
def self.has_minimal_env_option?
  roles = required_roles.kind_of?(Proc) ? required_roles.call : required_roles
  return false if MiqServer.minimal_env_options.empty? || roles.blank?

  roles = Array(roles) if roles.kind_of?(String)
  # Interpolate the resolved class name so the error says which type was found.
  raise _("Unexpected type: <%{type}>") % {:type => roles.class.name} unless roles.kind_of?(Array)

  roles.any? { |role| MiqServer.minimal_env_options.include?(role) }
end
# Per-class settings that worker subclasses can override:
#   check_for_minimal_role   - whether .workers applies the minimal-environment role check
#   default_queue_name       - queue_name given to new worker records when none is passed
#   required_roles           - role name(s) (String, Array, or Proc) a server needs to run this worker
#   maximum_workers_count    - optional Integer cap on the configured worker count
#   include_stopping_workers_on_synchronize - count stopping workers as running in .sync_workers
class_attribute :check_for_minimal_role, :default_queue_name, :required_roles, :maximum_workers_count, :include_stopping_workers_on_synchronize
self.include_stopping_workers_on_synchronize = false
self.check_for_minimal_role = true
# No required roles: .has_required_role? is true on every server.
self.required_roles = []
# Relation of workers belonging to a single server.
#
# When the current scope already constrains miq_server_id it is returned
# as-is. Otherwise +server_id+ falls back to MiqServer.my_server's id; with
# no current server the filter is on a nil miq_server_id.
def self.server_scope(server_id = nil)
  scoped = current_scope
  return scoped if scoped && scoped.where_values_hash.include?('miq_server_id')

  if server_id.nil?
    server = MiqServer.my_server
    server_id = server.id if server
  end
  where(:miq_server_id => server_id)
end

# Conditions selecting workers in a "current" status (started/ready/working).
CONDITION_CURRENT = {:status => STATUSES_CURRENT}

# Current workers on one server (defaults to this server).
def self.find_current(server_id = nil)
  server_scope(server_id)
    .where(CONDITION_CURRENT)
end

# Current workers on any server in +region+.
def self.find_current_in_region(region)
  in_region(region)
    .where(CONDITION_CURRENT)
end

# Current workers on any server in this region.
def self.find_current_in_my_region
  in_my_region
    .where(CONDITION_CURRENT)
end

# Current workers on any server of the zone with id +zone_id+, as an Array.
def self.find_current_in_zone(zone_id)
  servers = Zone.find(zone_id).miq_servers
  where(CONDITION_CURRENT.merge(:miq_server_id => servers)).to_a
end

# Current workers on any server of this server's zone, as an Array.
def self.find_current_in_my_zone
  servers = MiqServer.my_server.zone.miq_servers
  where(CONDITION_CURRENT.merge(:miq_server_id => servers)).to_a
end

# Workers still being created or started on one server.
def self.find_starting(server_id = nil)
  server_scope(server_id)
    .where(:status => STATUSES_STARTING)
end

# Workers that are current or still starting on one server.
def self.find_current_or_starting(server_id = nil)
  server_scope(server_id)
    .where(:status => STATUSES_CURRENT_OR_STARTING)
end

# Workers in any non-stopped status (including stopping) on one server.
def self.find_alive(server_id = nil)
  server_scope(server_id)
    .where(:status => STATUSES_ALIVE)
end
# True when this server holds at least one of the worker's required roles,
# or when the worker requires no role at all.
#
# +required_roles+ may be a Proc (called to obtain the roles), a single role
# String, or an Array of role names.
#
# Raises RuntimeError naming the offending class when the roles resolve to
# any other type.
def self.has_required_role?
  roles = required_roles.kind_of?(Proc) ? required_roles.call : required_roles
  return true if roles.blank?

  roles = Array(roles) if roles.kind_of?(String)
  # Interpolate the resolved class name so the error says which type was found.
  raise _("Unexpected type: <%{type}>") % {:type => roles.class.name} unless roles.kind_of?(Array)

  roles.any? { |role| MiqServer.my_server.has_active_role?(role) }
end

# Delegates to this server's check for whether it has enough resources to
# start one more worker of this class.
def self.enough_resource_to_start_worker?
  MiqServer.my_server.enough_resource_to_start_worker?(self)
end
# Reconciles the running workers of this type with the desired count.
#
# "Running" means current-or-starting workers, plus stopping ones when
# include_stopping_workers_on_synchronize is set. The desired count is
# .workers when this server holds a required role, otherwise 0. Missing
# workers are started only if the server reports enough resources; surplus
# workers are asked to stop.
#
# Returns {:adds => [pids started], :deletes => [pids stopped]}.
def self.sync_workers
  running = include_stopping_workers_on_synchronize ? find_alive : find_current_or_starting
  current = running.length
  desired = has_required_role? ? workers : 0
  result  = {:adds => [], :deletes => []}
  return result if current == desired

  _log.info("Workers are being synchronized: Current #: [#{current}], Desired #: [#{desired}]")

  if desired > current
    return result unless enough_resource_to_start_worker?
    (desired - current).times { result[:adds] << start_worker.pid }
  else
    surplus = running.to_a
    (current - desired).times do
      victim = surplus.pop
      result[:deletes] << victim.pid
      victim.stop
    end
  end

  result
end
# Settings key for this worker class, derived from its class name
# (e.g. MiqGenericWorker => :generic_worker):
#   * MiqWorker itself                          => :worker_base
#   * a class whose parent module has a short_token
#                                               => :"<normalized_type>_<short_token underscored>"
#                                                  (e.g. :generic_worker_infra, :generic_worker_vmware)
#   * any other worker                          => normalized_type as a Symbol
def self.settings_name
  @settings_name ||=
    case
    when self == MiqWorker
      :worker_base
    when parent.try(:short_token)
      :"#{normalized_type}_#{parent.short_token.underscore}"
    else
      normalized_type.to_sym
    end
end

# settings_name of every class from MiqWorker down to this one, root first;
# this is the path into the nested :workers configuration.
def self.path_to_my_worker_settings
  @path_to_my_worker_settings ||= begin
    lineage = ancestors.select { |mod| mod.kind_of?(Class) && mod <= MiqWorker }
    lineage.reverse.map(&:settings_name)
  end
end
# Resolves the effective settings for this worker class from a server's
# "vmdb" configuration.
#
# Walks config[:workers] along path_to_my_worker_settings, merging each
# level's :defaults hash in order, then merges the innermost section over
# the result. Unless options[:raw] is true, values such as "30.seconds" or
# "10.megabytes" are converted to plain integers via to_i_with_method.
#
# miq_server - server whose config is read; nil yields {}.
# options    - :config overrides the server's "vmdb" config (an object that
#              responds to #config is unwrapped first); :raw => true skips
#              the numeric conversion.
#
# Returns a Hash (empty when there is no server or no :workers section).
# Raises RuntimeError when a section along the path is missing.
def self.fetch_worker_settings_from_server(miq_server, options = {})
  settings = {}
  return settings if miq_server.nil?

  config = options[:config] || miq_server.get_config("vmdb")
  config = config.config if config.respond_to?(:config)

  section = config[:workers]
  return settings if section.nil?

  path_to_my_worker_settings.each do |key|
    section = section[key]
    raise _("Missing config section %{section_name}") % {:section_name => key} if section.nil?
    defaults = section[:defaults]
    settings.merge!(defaults) unless defaults.nil?
  end
  settings.merge!(section)

  unless options[:raw] == true
    settings.keys.each do |key|
      value = settings[key]
      next unless value.respond_to?(:to_i_with_method) && value.number_with_method?
      settings[key] = value.to_i_with_method
    end
  end

  settings
end

# Worker settings as seen by the server this worker record belongs to.
def worker_settings(options = {})
  self.class.fetch_worker_settings_from_server(miq_server, options)
end

# Heartbeat file path for this worker (memoized; derived from its guid).
def heartbeat_file
  @heartbeat_file ||= Workers::MiqDefaults.heartbeat_file(guid)
end

# Worker settings as seen by the server this process runs on.
def self.worker_settings(options = {})
  fetch_worker_settings_from_server(MiqServer.my_server, options)
end
# Starts .workers new workers of this type, if this server holds a required role.
def self.start_workers
  workers.times { start_worker } if has_required_role?
end

# Asks every worker on the given server (default: this one) to stop.
def self.stop_workers(server_id = nil)
  server_scope(server_id).each { |worker| worker.stop }
end

# Restarts every current worker on the given server (default: this one).
def self.restart_workers(server_id = nil)
  find_current(server_id).each { |worker| worker.restart }
end

# Refreshes process statistics for every current worker on this server.
def self.status_update
  find_current.each { |worker| worker.status_update }
end

# Logs the status line of every current worker on this server at +level+.
def self.log_status(level = :info)
  find_current.each { |worker| worker.log_status(level) }
end
# Builds and persists a worker record of this class on the current server,
# in STATUS_CREATING and with a fresh UTC heartbeat.
#
# Accepts an optional attributes Hash as its first argument; any other
# first argument is ignored. :queue_name defaults to default_queue_name
# when that is set and the caller did not supply one.
#
# The caller's Hash is copied rather than modified, so callers that reuse a
# Hash do not see :status/:last_heartbeat/:queue_name leak into it.
def self.create_worker_record(*params)
  attrs = params.first
  attrs = attrs.kind_of?(Hash) ? attrs.dup : {}
  attrs[:queue_name] = default_queue_name unless attrs.key?(:queue_name) || default_queue_name.nil?
  attrs[:status] = STATUS_CREATING
  attrs[:last_heartbeat] = Time.now.utc
  server_scope.create(attrs)
end
# Creates a worker record (see .create_worker_record), starts it, and returns it.
def self.start_worker(*params)
  create_worker_record(*params).tap(&:start)
end

# NOTE(review): cache_with_timeout is defined outside this file; presumably it
# exposes a cached my_worker lookup of the record whose pid is this process.
cache_with_timeout(:my_worker) { server_scope.find_by(:pid => Process.pid) }

# The *_all helpers below always act on MiqWorker (every worker type),
# whichever subclass they are called on.

def self.find_all_current(server_id = nil)
  MiqWorker.find_current(server_id)
end

def self.stop_all_workers(server_id = nil)
  MiqWorker.stop_workers(server_id)
end

def self.restart_all_workers(server_id = nil)
  MiqWorker.restart_workers(server_id)
end

def self.status_update_all
  MiqWorker.status_update
end

def self.log_status_all(level = :info)
  MiqWorker.log_status(level)
end
# Looks up the worker with id +wid+ and forwards +message+ (with +args+) to
# its server's worker monitor.
#
# Raises RuntimeError when no worker has that id.
def self.send_message_to_worker_monitor(wid, message, *args)
  worker = MiqWorker.find_by(:id => wid)
  raise _("Worker with id=<%{id}> does not exist") % {:id => wid} unless worker
  worker.send_message_to_worker_monitor(message, *args)
end

# Queues a call to MiqServer#message_for_worker(id, message, *args) on this
# worker's server, using the 'miq_server' queue in that server's zone.
def send_message_to_worker_monitor(message, *args)
  server = miq_server
  MiqQueue.put_deprecated(
    :class_name  => 'MiqServer',
    :instance_id => server.id,
    :method_name => 'message_for_worker',
    :args        => [id, message, *args],
    :queue_name  => 'miq_server',
    :zone        => server.zone.name,
    :server_guid => server.guid
  )
end
# Runs in the parent just before forking a worker: calls the class's
# preload_for_worker_role hook when it defines one.
def self.before_fork
  return unless respond_to?(:preload_for_worker_role)
  preload_for_worker_role
end

# Runs in the freshly forked child: closes the PG sockets inherited from the
# parent, stops the DRb service, and renices this process (see .renice).
def self.after_fork
  close_pg_sockets_inherited_from_parent
  DRb.stop_service
  renice(Process.pid)
end
# After a fork, the child inherits the parent's file descriptors, including
# the raw PostgreSQL sockets behind the parent's ActiveRecord connections,
# so the child closes those inherited sockets here.
#
# NOTE(review): this reads the private @owner_to_pool ivar of the Rails
# connection handler and assumes it is keyed by the owning process's PID
# (it is indexed with Process.ppid). That is Rails-version specific; re-check
# on Rails upgrades. If there is no entry for the parent PID and the map
# supplies no default, `nil.values` raises NoMethodError.
def self.close_pg_sockets_inherited_from_parent
owner_to_pool = ActiveRecord::Base.connection_handler.instance_variable_get(:@owner_to_pool)
# Pools registered for the parent process; nil entries are skipped.
owner_to_pool[Process.ppid].values.compact.each do |pool|
pool.connections.each do |conn|
# File descriptor of the underlying connection (presumably a PG::Connection).
socket = conn.raw_connection.socket
_log.info("Closing socket: #{socket}")
# Wrap the inherited descriptor in an IO only so it can be closed.
IO.for_fd(socket).close
end
end
end
# queue_name holds either a single queue name or, for workers that serve
# several queues, a JSON-encoded array of names. Decode the array form.
# Fall back to the raw column value when it is not valid JSON (a plain
# name) or is nil.
def queue_name
  JSON.parse(self[:queue_name])
rescue JSON::ParserError, TypeError
  self[:queue_name]
end
# Launches the worker process and returns its PID. A fresh Ruby process is
# spawned when MIQ_SPAWN_WORKERS is set or the platform cannot fork;
# otherwise the current process forks.
def start_runner
  spawn_instead = ENV['MIQ_SPAWN_WORKERS'] || !Process.respond_to?(:fork)
  spawn_instead ? start_runner_via_spawn : start_runner_via_fork
end

# Forks a child that runs this class's Runner with worker_options and then
# leaves via exit! (skipping at_exit handlers). The child is detached so it
# is reaped automatically. Returns the child PID.
def start_runner_via_fork
  self.class.before_fork
  child_pid = fork(:cow_friendly => true) do
    self.class.after_fork
    self.class::Runner.start_worker(worker_options)
    exit!
  end
  Process.detach(child_pid)
  child_pid
end

# Shell command that runs a single worker of this class with +guid+.
# When ENV['APPLIANCE'] is set, the command is prefixed with
# `nice <nice_increment>`.
def self.build_command_line(guid)
  command = "#{Gem.ruby} #{runner_script} --heartbeat --guid=#{guid} #{name}"
  return command unless ENV['APPLIANCE']
  "nice #{nice_increment} #{command}"
end

# Path of the single-worker runner script; raises if the file is missing.
def self.runner_script
  ManageIQ.root.join("lib/workers/bin/run_single_worker.rb").tap do |script|
    raise "script not found: #{script}" unless File.exist?(script)
  end
end

# Spawns build_command_line with stdout and stderr appended to log/evm.log,
# detaches the child, and returns its PID.
def start_runner_via_spawn
  command = self.class.build_command_line(guid)
  child_pid = Kernel.spawn(command, [:out, :err] => [Rails.root.join("log", "evm.log"), "a"])
  Process.detach(child_pid)
  child_pid
end
# Launches the worker process, saves its PID, raises an "evm_worker_start"
# event and logs the start. Returns self.
def start
  self.pid = start_runner
  save

  message = "Worker started: ID [#{id}], PID [#{pid}], GUID [#{guid}]"
  MiqEvent.raise_evm_event_queue(miq_server, "evm_worker_start", :event_details => message, :type => self.class.name)
  _log.info(message)
  self
end

# Asks this worker's server to stop it (via stop_worker_queue).
def stop
  miq_server.stop_worker_queue(self)
end

# Restarting is just stopping: the worker monitor then starts a replacement.
alias_method :restart, :stop

# Sends signal 9 (KILL) to the worker's process when it has a PID. Failures
# are logged rather than raised. The record is then destroyed.
def kill
  if pid
    begin
      _log.info("Killing worker: ID [#{id}], PID [#{pid}], GUID [#{guid}], status [#{status}]")
      Process.kill(9, pid)
    rescue Errno::ESRCH
      # No such process: it is already gone.
      _log.warn("Worker ID [#{id}] PID [#{pid}] GUID [#{guid}] has been killed")
    rescue => err
      _log.warn("Worker ID [#{id}] PID [#{pid}] GUID [#{guid}] has been killed, but with the following error: #{err}")
    end
  end
  destroy
end
# Status-based predicates. is_alive? and enabled_or_running? also ask the
# OS, through actually_running?, whether the process exists.

def is_current?
  STATUSES_CURRENT.include?(status)
end

def is_alive?
  return false unless STATUSES_ALIVE.include?(status)
  actually_running?
end

def is_stopped?
  STATUSES_STOPPED.include?(status)
end

def started?
  status == STATUS_STARTED
end

# Whether MiqProcess recognizes this record's PID as a worker process.
def actually_running?
  MiqProcess.is_worker?(pid)
end

# False only when the worker is in a stopped status and its process is gone.
def enabled_or_running?
  return true unless is_stopped?
  actually_running?
end
# True when the worker has been in STATUS_STOPPING longer than allowed.
#
# A stopping worker still heartbeats over DRb but no longer to the database,
# so last_heartbeat tells how long it has been stopping. The allowance is the
# timeout of its current message (0 if none) plus the :stopping_timeout
# setting, falling back to Workers::MiqDefaults.stopping_timeout.
def stopping_for_too_long?
  # Read the setting before the status check, as before, so a broken
  # configuration still surfaces here.
  timeout = self.class.worker_settings[:stopping_timeout] || Workers::MiqDefaults.stopping_timeout
  return false unless status == MiqWorker::STATUS_STOPPING

  deadline = last_heartbeat + current_timeout.to_i
  deadline < timeout.seconds.ago
end
# Checks every message this worker has dequeued for a timeout, passing the
# logger's prefix to MiqQueue#check_for_timeout.
def validate_active_messages
  active_messages.each do |msg|
    msg.check_for_timeout(_log.prefix)
  end
end

# Marks every dequeued message as delivered in error, logging each one.
def clean_active_messages
  active_messages.each do |msg|
    _log.warn("Message id: [#{msg.id}] Setting state to 'error'")
    msg.delivered_in_error('Clean Active Messages')
  end
end

# before_destroy callback. Detaches ready messages from this worker by
# clearing handler_id/handler_type. Logs the processed messages that are
# about to be destroyed with it. Best effort: any StandardError from
# logging or updating an individual message is swallowed.
def log_destroy_of_worker_messages
  ready_messages.each do |msg|
    begin
      _log.warn("Nullifying: #{MiqQueue.format_full_log_msg(msg)}")
    rescue StandardError
      nil
    end
    begin
      msg.update_attributes(:handler_id => nil, :handler_type => nil)
    rescue StandardError
      nil
    end
  end

  processed_messages.each do |msg|
    begin
      _log.warn("Destroying: #{MiqQueue.format_full_log_msg(msg)}")
    rescue StandardError
      nil
    end
  end
end
# Refreshes this record's process statistics from MiqProcess.processInfo.
#
# If the process no longer exists, the worker is marked aborted. Any other
# lookup error is only logged. On success, only PROCESS_INFO_FIELDS are kept,
# :priority is stored as :os_priority, and the record is saved with
# update_attributes!.
def status_update
  pinfo = MiqProcess.processInfo(pid)
rescue Errno::ESRCH
  update(:status => STATUS_ABORTED)
  _log.warn("No such process [#{friendly_name}] with PID=[#{pid}], aborting worker.")
rescue => err
  _log.warn("Unexpected error: #{err.message}, while requesting process info for [#{friendly_name}] with PID=[#{pid}]")
else
  # Drop everything except the columns this table stores.
  pinfo.slice!(*PROCESS_INFO_FIELDS)
  pinfo[:os_priority] = pinfo.delete(:priority)
  update_attributes!(pinfo)
end

# Logs a one-line summary of this worker's identity, heartbeat and process
# statistics by calling the logger method named +level+ (e.g. :info).
def log_status(level = :info)
  summary = "[#{friendly_name}] Worker ID [#{id}], PID [#{pid}], GUID [#{guid}], Last Heartbeat [#{last_heartbeat}], Process Info: Memory Usage [#{memory_usage}], Memory Size [#{memory_size}], Proportional Set Size: [#{proportional_set_size}], Memory % [#{percent_memory}], CPU Time [#{cpu_time}], CPU % [#{percent_cpu}], Priority [#{os_priority}]"
  _log.send(level, summary)
end
# Timeout of the first message this worker has dequeued, or nil if none.
def current_timeout
  msg = active_messages.first
  msg && msg.msg_timeout
end

# The worker's URI when it has one, otherwise its queue name(s).
def uri_or_queue_name
  uri || queue_name
end

# Human-readable type, e.g. "Generic Worker" for normalized_type "generic_worker".
def friendly_name
  normalized_type.titleize
end

delegate :normalized_type, :to => :class

# STI type without a leading "ManageIQ::Providers::" namespace.
def abbreviated_class_name
  type.sub(/^ManageIQ::Providers::/, "")
end

# abbreviated_class_name with the first "Miq" and the first "Worker" removed.
def minimal_class_name
  abbreviated_class_name.sub(/Miq/, "").sub(/Worker/, "")
end

# Pipe-separated identifier for this worker's database connection, cut to
# 64 characters:
#   MIQ|<pid>|<server id>|<worker id>|<zone id>|<minimal class name>|<zone name>
# The zone comes from MiqServer.my_server, not from this record's server.
def database_application_name
  zone = MiqServer.my_server.zone
  label = "MIQ|#{Process.pid}|#{miq_server.compressed_id}|#{compressed_id}|#{zone.compressed_id}|#{minimal_class_name}|#{zone.name}"
  label.truncate(64)
end

# Log-friendly identification of this worker (class, id, pid, guid).
def format_full_log_msg
  "Worker [#{self.class}] with ID: [#{id}], PID: [#{pid}], GUID: [#{guid}]"
end

# Short log-friendly identification of this worker (id only).
def format_short_log_msg
  "Worker ID: [#{id}]"
end
# Returns this thread's database connection to the pool, if connected.
def self.release_db_connection
  return unless ActiveRecord::Base.connected?
  ActiveRecord::Base.connection_pool.release_connection
end

# Stamps last_heartbeat with the current UTC time via update_attribute.
def update_heartbeat
  update_attribute(:last_heartbeat, Time.now.utc)
end

# Settings path for this worker class: [:workers, *path_to_my_worker_settings].
def self.config_settings_path
  @config_settings_path ||= [:workers, *path_to_my_worker_settings]
end

class << self
  attr_writer :config_settings_path
end

# Assigns sql_spid (without saving); defaults to the current connection's spid.
def update_spid(spid = ActiveRecord::Base.connection.spid)
  self.sql_spid = spid
end

# Like update_spid, but also saves, and only when the value actually changes.
def update_spid!(spid = ActiveRecord::Base.connection.spid)
  return if sql_spid == spid
  self.sql_spid = spid
  save
end

# Options handed to Runner.start_worker when the worker is forked.
def worker_options
  {:guid => guid}
end
# Class name as a snake_case type, e.g. MiqGenericWorker => "generic_worker".
# Top-level classes drop a leading "Miq"; namespaced classes use only their
# own (demodulized) name.
def self.normalized_type
  @normalized_type ||= begin
    base = parent == Object ? name.sub(/^Miq/, '') : name.demodulize
    base.underscore
  end
end

# Runs `renice` on +pid+ with nice_increment.
def self.renice(pid)
  AwesomeSpawn.run("renice", :params => {:n => nice_increment, :p => pid})
end

# Niceness increment for worker processes, always as a String: the
# :nice_delta setting when it is an Integer, otherwise "+10".
def self.nice_increment
  delta = worker_settings[:nice_delta]
  return "+10" unless delta.kind_of?(Integer)
  delta.to_s
end
private_class_method :nice_increment
end