Skip to content

Commit

Permalink
Merge pull request ManageIQ#18648 from agrare/use_systemd_for_worker_…
Browse files Browse the repository at this point in the history
…management

Allow use of systemd for worker management
  • Loading branch information
carbonin authored May 30, 2019
2 parents 57a1406 + 9ba6c3a commit c95298f
Show file tree
Hide file tree
Showing 11 changed files with 254 additions and 8 deletions.
7 changes: 7 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ require File.join(Bundler::Plugin.index.load_paths("bundler-inject")[0], "bundle
#

gem "manageiq-gems-pending", ">0", :require => 'manageiq-gems-pending', :git => "https://github.com/ManageIQ/manageiq-gems-pending.git", :branch => "master"
gem "manageiq-loggers", ">0", :require => false, :git => "https://github.com/ManageIQ/manageiq-loggers", :branch => "master"

# Modified gems for gems-pending. Setting sources here since they are git references
gem "handsoap", "~>0.2.5", :require => false, :git => "https://github.com/ManageIQ/handsoap.git", :tag => "v0.2.5-5"

Expand Down Expand Up @@ -130,6 +132,11 @@ group :qpid_proton, :optional => true do
gem "qpid_proton", "~>0.26.0", :require => false
end

group :systemd, :optional => true do
gem "dbus-systemd", "~>1.1.0", :require => false
gem "systemd-journal", "~>1.4.0", :require => false
end

group :openshift, :manageiq_default do
manageiq_plugin "manageiq-providers-openshift"
end
Expand Down
1 change: 1 addition & 0 deletions app/models/miq_server/worker_management/monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def sync_workers
self.class.monitor_class_names.each do |class_name|
begin
c = class_name.constantize
c.ensure_systemd_files if c.systemd_worker?
result[c.name] = c.sync_workers
result[c.name][:adds].each { |pid| worker_add(pid) unless pid.nil? }
rescue => error
Expand Down
8 changes: 7 additions & 1 deletion app/models/miq_server/worker_management/monitor/quiesce.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ def quiesce_workers_loop
worker_monitor_poll = (@worker_monitor_settings[:poll] || 1.seconds).to_i_with_method

miq_workers.each do |w|
MiqEnvironment::Command.is_podified? && w.containerized_worker? ? w.delete_container_objects : stop_worker(w)
if w.containerized_worker?
w.delete_container_objects
elsif w.systemd_worker?
w.stop_systemd_worker
else
stop_worker(w)
end
end

loop do
Expand Down
2 changes: 2 additions & 0 deletions app/models/miq_server/worker_management/monitor/stop.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ def stop_worker(worker, monitor_status = :waiting_for_stop, monitor_reason = nil

if w.containerized_worker?
w.stop_container
elsif w.systemd_worker?
w.stop_systemd_worker
elsif w.respond_to?(:terminate)
w.terminate
else
Expand Down
33 changes: 30 additions & 3 deletions app/models/miq_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

class MiqWorker < ApplicationRecord
include_concern 'ContainerCommon'
include_concern 'SystemdCommon'
include UuidMixin

before_destroy :log_destroy_of_worker_messages
Expand Down Expand Up @@ -52,6 +53,14 @@ def self.workers
workers_configured_count
end

def self.scalable?
maximum_workers_count.nil? || maximum_workers_count > 1
end

def scalable?
self.class.scalable?
end

def self.workers_configured_count
count = worker_settings[:count]
if maximum_workers_count.kind_of?(Integer)
Expand Down Expand Up @@ -358,9 +367,19 @@ def containerized_worker?
self.class.containerized_worker?
end

def self.systemd_worker?
MiqEnvironment::Command.supports_systemd? && supports_systemd?
end

def systemd_worker?
self.class.systemd_worker?
end

def start_runner
if ENV['MIQ_SPAWN_WORKERS'] || !Process.respond_to?(:fork)
start_runner_via_spawn
elsif systemd_worker?
start_systemd_worker
elsif containerized_worker?
start_runner_via_container
else
Expand Down Expand Up @@ -416,7 +435,7 @@ def start_runner_via_spawn

def start
self.pid = start_runner
save unless containerized_worker?
save if !containerized_worker? && !systemd_worker?

msg = "Worker started: ID [#{id}], PID [#{pid}], GUID [#{guid}]"
MiqEvent.raise_evm_event_queue(miq_server || MiqServer.my_server, "evm_worker_start", :event_details => msg, :type => self.class.name)
Expand Down Expand Up @@ -540,16 +559,24 @@ def friendly_name

delegate :normalized_type, :to => :class

def self.abbreviated_class_name
name.sub(/^ManageIQ::Providers::/, "")
end

def abbreviated_class_name
type.sub(/^ManageIQ::Providers::/, "")
self.class.abbreviated_class_name
end

def minimal_class_name
def self.minimal_class_name
abbreviated_class_name
.sub(/Miq/, "")
.sub(/Worker/, "")
end

def minimal_class_name
self.class.minimal_class_name
end

def database_application_name
zone = MiqServer.my_server.zone
"MIQ|#{Process.pid}|#{miq_server.compressed_id}|#{compressed_id}|#{zone.compressed_id}|#{minimal_class_name}|#{zone.name}".truncate(64)
Expand Down
178 changes: 178 additions & 0 deletions app/models/miq_worker/systemd_common.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
class MiqWorker
module SystemdCommon
extend ActiveSupport::Concern

class_methods do
def supports_systemd?
return unless worker_settings[:systemd_enabled]
require "dbus/systemd"
rescue LoadError
false
end

def ensure_systemd_files
target_file_path.write(target_file) unless target_file_path.exist?
service_file_path.write(unit_file) unless service_file_path.exist?
end

def service_base_name
minimal_class_name.underscore.tr("/", "_")
end

def slice_base_name
"miq"
end

def service_name
scalable? ? service_base_name : "#{service_base_name}@"
end

def service_file_name
"#{service_name}.service"
end

def slice_name
"#{slice_base_name}-#{service_base_name}.slice"
end

def service_file_path
systemd_unit_dir.join(service_file_name)
end

def target_file_name
"#{service_base_name}.target"
end

def target_file_path
systemd_unit_dir.join(target_file_name)
end

def systemd_unit_dir
Pathname.new("/etc/systemd/system")
end

def target_file
<<~TARGET_FILE
[Unit]
PartOf=miq.target
TARGET_FILE
end

def unit_file
<<~UNIT_FILE
[Unit]
PartOf=#{target_file_name}
[Install]
WantedBy=#{target_file_name}
[Service]
WorkingDirectory=#{working_directory}
ExecStart=/bin/bash -lc '#{exec_start}'
Restart=always
Slice=#{slice_name}
UNIT_FILE
end

def working_directory
Rails.root
end

def exec_start
"exec ruby lib/workers/bin/run_single_worker.rb #{name} #{run_single_worker_args}"
end

def run_single_worker_args
"--heartbeat --guid=%i"
end
end

def start_systemd_worker
enable_systemd_unit
write_unit_settings_file
start_systemd_unit
end

def stop_systemd_worker
stop_systemd_unit
cleanup_unit_settings_file
disable_systemd_unit
end

def enable_systemd_unit(runtime: false, replace: true)
systemd.EnableUnitFiles([unit_name], runtime, replace)
end

def disable_systemd_unit(runtime: false)
systemd.DisableUnitFiles([unit_name], runtime)
end

def start_systemd_unit(mode: "replace")
systemd.StartUnit(unit_name, mode)
end

def stop_systemd_unit(mode: "replace")
systemd.StopUnit(unit_name, mode)
end

private

def systemd
@systemd ||= begin
require "dbus/systemd"
DBus::Systemd::Manager.new
end
end

def service_base_name
self.class.service_base_name
end

def unit_name
"#{service_base_name}#{unit_instance}.service"
end

def unit_instance
scalable? ? "" : "@#{guid}"
end

def write_unit_settings_file
FileUtils.mkdir_p(unit_config_path) unless unit_config_path.exist?
unit_config_file_path.write(unit_config_file) unless unit_config_file_path.exist?
end

def cleanup_unit_settings_file
unit_config_file_path.delete if unit_config_file_path.exist?
unit_config_path.delete if unit_config_path.exist?
end

def unit_config_name
"#{unit_name}.d"
end

def unit_config_path
self.class.systemd_unit_dir.join(unit_config_name)
end

def unit_config_file_path
unit_config_path.join("override.conf")
end

def unit_config_file
# Override this in a sub-class if the specific instance needs
# any additional config
<<~UNIT_CONFIG_FILE
[Service]
MemoryHigh=#{worker_settings[:memory_threshold].bytes}
TimeoutStartSec=#{worker_settings[:starting_timeout]}
TimeoutStopSec=#{worker_settings[:stopping_timeout]}
#{unit_environment_variables.map { |env_var| "Environment=#{env_var}" }.join("\n")}
UNIT_CONFIG_FILE
end

def unit_environment_variables
# Override this in a child class to add env vars
[
"HOME=/root"
]
end
end
end
4 changes: 4 additions & 0 deletions app/models/mixins/per_ems_worker_mixin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,8 @@ def ext_management_system
def worker_options
super.merge(:ems_id => ems_id)
end

def unit_environment_variables
super << "EMS_ID=#{ems_id}"
end
end
1 change: 1 addition & 0 deletions config/settings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1149,6 +1149,7 @@
:restart_interval: 0.hours
:starting_timeout: 10.minutes
:stopping_timeout: 10.minutes
:systemd_enabled: false
:embedded_ansible_worker:
:starting_timeout: 20.minutes
:poll: 10.seconds
Expand Down
7 changes: 6 additions & 1 deletion lib/miq_environment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module MiqEnvironment
class Command
EVM_KNOWN_COMMANDS = %w( memcached memcached-tool service apachectl nohup)
EVM_KNOWN_COMMANDS = %w[apachectl memcached memcached-tool nohup service systemctl].freeze

def self.supports_memcached?
return @supports_memcached unless @supports_memcached.nil?
Expand All @@ -14,6 +14,11 @@ def self.supports_apache?
@supports_apache = is_appliance? && supports_command?('apachectl')
end

def self.supports_systemd?
return @supports_systemd unless @supports_systemd.nil?
@supports_systemd = is_appliance? && supports_command?('systemctl')
end

def self.supports_nohup_and_backgrounding?
return @supports_nohup unless @supports_nohup.nil?
@supports_nohup = is_appliance? && supports_command?('nohup')
Expand Down
14 changes: 14 additions & 0 deletions lib/vmdb/loggers.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
require 'manageiq'
require 'manageiq-loggers'
require 'miq_environment'
require 'util/vmdb-logger'

module Vmdb
Expand All @@ -19,6 +21,7 @@ def self.init

def self.apply_config(config)
apply_config_value(config, $log, :level)
apply_config_value(config, $journald_log, :level) if $journald_log
apply_config_value(config, $rails_log, :level_rails)
apply_config_value(config, $ansible_tower_log, :level_ansible_tower)
apply_config_value(config, $api_log, :level_api)
Expand All @@ -45,6 +48,7 @@ def self.create_loggers

$audit_log = AuditLogger.new(path_dir.join("audit.log"))
$container_log = ContainerLogger.new
$journald_log = create_journald_logger
$log = create_multicast_logger(path_dir.join("evm.log"))
$rails_log = create_multicast_logger(path_dir.join("#{Rails.env}.log"))
$api_log = create_multicast_logger(path_dir.join("api.log"))
Expand Down Expand Up @@ -73,10 +77,20 @@ def self.create_loggers
def self.create_multicast_logger(log_file_path, logger_class = VMDBLogger)
logger_class.new(log_file_path).tap do |logger|
logger.extend(ActiveSupport::Logger.broadcast($container_log)) if ENV["CONTAINER"]
logger.extend(ActiveSupport::Logger.broadcast($journald_log)) if $journald_log
end
end
private_class_method :create_multicast_logger

private_class_method def self.create_journald_logger
return unless MiqEnvironment::Command.supports_systemd?

require "manageiq/loggers/journald"
ManageIQ::Loggers::Journald.new
rescue LoadError
nil
end

def self.configure_external_loggers
require 'awesome_spawn'
AwesomeSpawn.logger = $log
Expand Down
Loading

0 comments on commit c95298f

Please sign in to comment.