Skip to content

Commit

Permalink
Completely remove the MiqVimBrokerWorker
Browse files Browse the repository at this point in the history
  • Loading branch information
agrare committed Jan 3, 2020
1 parent 6bf7f94 commit 070edd1
Show file tree
Hide file tree
Showing 13 changed files with 16 additions and 118 deletions.
11 changes: 0 additions & 11 deletions app/models/job_proxy_dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ def dispatch

# Skip work if there are no jobs to dispatch
if jobs_to_dispatch.length > 0
broker_available, = Benchmark.realtime_block(:miq_vim_broker_available) { MiqVimBrokerWorker.available_in_zone?(@zone) }
logged_broker_unavailable = false

Benchmark.realtime_block(:active_vm_scans) { active_vm_scans_by_zone }
Expand Down Expand Up @@ -52,16 +51,6 @@ def dispatch
next
end

if @vm.kind_of?(ManageIQ::Providers::Vmware::InfraManager::Vm) || @vm.kind_of?(ManageIQ::Providers::Vmware::InfraManager::Template)
unless broker_available
unless logged_broker_unavailable
_log.warn("Skipping dispatch because broker is currently unavailable")
logged_broker_unavailable = true
end
next
end
end

proxy = nil
if @all_busy_by_host_id_storage_id["#{@vm.host_id}_#{@vm.storage_id}"]
_log.debug("Skipping job id [#{job.id}] guid [#{job.guid}] for vm: [#{@vm.id}] in this dispatch since a prior job with the same host [#{@vm.host_id}] and storage [#{@vm.storage_id}] determined that all resources are busy.")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
class ManageIQ::Providers::BaseManager::MetricsCollectorWorker::Runner < ::MiqQueueWorkerBase::Runner
self.delay_startup_for_vim_broker = true # NOTE: For ems_metrics_collector role, TODO: only for VMware
end
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
class ManageIQ::Providers::BaseManager::RefreshWorker::Runner < ::MiqQueueWorkerBase::Runner
self.delay_startup_for_vim_broker = true # NOTE: For ems_inventory role, TODO: only for VMware

def after_initialize
@emss = ExtManagementSystem.find([@cfg[:ems_id]])
@emss.each do |ems|
Expand Down
1 change: 0 additions & 1 deletion app/models/miq_generic_worker/runner.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
class MiqGenericWorker::Runner < MiqQueueWorkerBase::Runner
self.delay_startup_for_vim_broker = true # NOTE: For ems_operations and smartstate roles
end
2 changes: 1 addition & 1 deletion app/models/miq_queue_worker_base/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def get_message
end

def message_delivery_suspended?
self.class.delay_queue_delivery_for_vim_broker? && !MiqVimBrokerWorker.available?
false
end

def deliver_queue_message(msg, &block)
Expand Down
1 change: 0 additions & 1 deletion app/models/miq_server/worker_management/monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ def worker_not_responding(w)
_log.warn(msg)
MiqEvent.raise_evm_event_queue(w.miq_server, "evm_worker_killed", :event_details => msg, :type => w.class.name)
w.kill
MiqVimBrokerWorker.cleanup_for_pid(w.pid)
end

def sync_workers
Expand Down
2 changes: 0 additions & 2 deletions app/models/miq_smart_proxy_worker/runner.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
class MiqSmartProxyWorker::Runner < MiqQueueWorkerBase::Runner
self.delay_startup_for_vim_broker = true # NOTE: For smartproxy role

def do_before_work_loop
@tid = start_heartbeat_thread
end
Expand Down
34 changes: 0 additions & 34 deletions app/models/miq_worker/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,29 +90,6 @@ def worker_monitor_drb
end
end

###############################
# VimBrokerWorker Methods
###############################

def self.delay_startup_for_vim_broker?
!!@delay_startup_for_vim_broker
end

class << self
attr_writer :delay_startup_for_vim_broker
end

def self.delay_queue_delivery_for_vim_broker?
!!@delay_queue_delivery_for_vim_broker
end

class << self
attr_writer :delay_queue_delivery_for_vim_broker

alias require_vim_broker? delay_queue_delivery_for_vim_broker?
alias require_vim_broker= delay_queue_delivery_for_vim_broker=
end

def start
prepare
run
Expand All @@ -128,7 +105,6 @@ def prepare
set_database_application_name
ObjectSpace.garbage_collect
started_worker_record
do_delay_startup_for_vim_broker if self.class.delay_startup_for_vim_broker? && MiqVimBrokerWorker.workers > 0
do_before_work_loop
self
end
Expand Down Expand Up @@ -284,16 +260,6 @@ def do_work
raise NotImplementedError, _("must be implemented in a subclass")
end

def do_delay_startup_for_vim_broker
_log.info("#{log_prefix} Checking that VIM Broker has started before doing work")
loop do
break if MiqVimBrokerWorker.available?
heartbeat
sleep 3
end
_log.info("#{log_prefix} Starting work since VIM Broker has started")
end

def do_work_loop
warn_about_heartbeat_skipping if skip_heartbeat?
loop do
Expand Down
17 changes: 1 addition & 16 deletions app/models/vm_or_template/operations/snapshot.rb
Original file line number Diff line number Diff line change
Expand Up @@ -116,22 +116,7 @@ def raw_remove_snapshot_by_description(description, refresh = false)
end

def remove_snapshot_by_description(description, refresh = false, retry_time = nil)
if (ext_management_system.kind_of?(ManageIQ::Providers::Vmware::InfraManager) && ManageIQ::Providers::Vmware::InfraManager.use_vim_broker? && MiqVimBrokerWorker.available?) || host.nil? || host.state == "on"
raw_remove_snapshot_by_description(description, refresh)
else
if retry_time.nil?
raise _("The VM's Host system is unavailable to remove the snapshot. VM id:[%{id}] Snapshot description:[%{description}]") %
{:id => id, :descrption => description}
end
# If the host is off re-queue the action based on the retry_time
MiqQueue.put(:class_name => self.class.name,
:instance_id => id,
:method_name => 'remove_snapshot_by_description',
:args => [description, refresh, retry_time],
:deliver_on => Time.now.utc + retry_time,
:role => "smartstate",
:zone => my_zone)
end
raw_remove_snapshot_by_description(description, refresh)
end

def raw_remove_all_snapshots
Expand Down
36 changes: 4 additions & 32 deletions app/models/vm_scan.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@ def load_transitions
{
:initializing => {'initialize' => 'waiting_to_start'},
:snapshot_delete => {'scanning' => 'snapshot_delete'},
:broker_unavailable => {'snapshot_create' => 'wait_for_broker'},
:scan_retry => {'scanning' => 'scanning'},
:abort_retry => {'scanning' => 'scanning'},
:abort_job => {'*' => 'aborting'},
:cancel => {'*' => 'canceling'},
:finish => {'*' => 'finished'},
:error => {'*' => '*'},
:start => {'waiting_to_start' => 'wait_for_policy'},
:start_snapshot => {'wait_for_policy' => 'snapshot_create',
'wait_for_broker' => 'snapshot_create'},
:start_snapshot => {'wait_for_policy' => 'snapshot_create'},
:snapshot_complete => {'snapshot_create' => 'scanning',
'snapshot_delete' => 'synchronizing'},
:data => {'snapshot_create' => 'scanning',
Expand Down Expand Up @@ -108,13 +106,6 @@ def call_snapshot_create
elsif vm.require_snapshot_for_scan?
proxy = MiqServer.find(miq_server_id)

# Check if the broker is available
if MiqServer.use_broker_for_embedded_proxy? && !MiqVimBrokerWorker.available?
_log.warn("VimBroker is not available")
signal(:broker_unavailable)
return
end

if proxy && proxy.forceVmScan
options[:snapshot] = :smartProxy
_log.info("Skipping snapshot creation, it will be performed by the SmartProxy")
Expand Down Expand Up @@ -143,19 +134,6 @@ def call_snapshot_create
end
end

def wait_for_vim_broker
_log.info("Enter")
i = 0
loop do
set_status("Waiting for VimBroker to become available (#{i += 1})")
sleep(60)
_log.info("Checking VimBroker connection status. Count=[#{i}]")
break if MiqVimBrokerWorker.available?
end

signal(:start_snapshot)
end

def call_scan
_log.info("Enter")

Expand Down Expand Up @@ -427,13 +405,8 @@ def delete_snapshot_by_description(mor, vm)
server = miqVimHost[:hostname] || miqVimHost[:ipaddress]
begin
password_decrypt = ManageIQ::Password.decrypt(miqVimHost[:password])
if MiqServer.use_broker_for_embedded_proxy?(ems_type)
$vim_broker_client ||= MiqVimBroker.new(:client, MiqVimBrokerWorker.drb_port)
miqVim = $vim_broker_client.getMiqVim(server, miqVimHost[:username], password_decrypt)
else
require 'VMwareWebService/MiqVim'
miqVim = MiqVim.new(server, miqVimHost[:username], password_decrypt)
end
require 'VMwareWebService/MiqVim'
miqVim = MiqVim.new(server, miqVimHost[:username], password_decrypt)

vimVm = miqVim.getVimVm(vm.path)
vimVm.removeSnapshotByDescription(mor, true) unless vimVm.nil?
Expand Down Expand Up @@ -559,7 +532,6 @@ def abort_retry(*args)
alias_method :start, :call_check_policy
alias_method :start_snapshot, :call_snapshot_create
alias_method :snapshot_delete, :call_snapshot_delete
alias_method :broker_unavailable, :wait_for_vim_broker
alias_method :abort_job, :process_abort
alias_method :cancel, :process_cancel
alias_method :finish, :process_finished
Expand All @@ -579,7 +551,7 @@ def create_snapshot(vm)
rescue Exception => err
msg = "Failed to create evm snapshot with EMS. Error: [#{err.class.name}]: [#{err}]"
_log.error(msg)
err.kind_of?(MiqException::MiqVimBrokerUnavailable) ? signal(:broker_unavailable) : signal(:abort, msg, "error")
signal(:abort, msg, "error")
return false
end
context[:snapshot_mor] = sn
Expand Down
2 changes: 0 additions & 2 deletions spec/models/job_proxy_dispatcher_embedded_scan_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ def assert_at_most_x_scan_jobs_per_y_resource(x_scans, y_resource)

context "and a scan job for each vm" do
before do
allow(MiqVimBrokerWorker).to receive(:available_in_zone?).and_return(true)

@jobs = @vms.collect(&:raw_scan)
end

Expand Down
5 changes: 0 additions & 5 deletions spec/models/job_proxy_dispatcher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@
context "with a vm without a storage" do
before do
# Test a vm without a storage (ie, removed from VC but retained in the VMDB)
allow(MiqVimBrokerWorker).to receive(:available_in_zone?).and_return(true)
@vm = @vms.first
@vm.storage = nil
@vm.save
Expand All @@ -103,7 +102,6 @@
context "with a Microsoft vm without a storage" do
before do
# Test a Microsoft vm without a storage
allow(MiqVimBrokerWorker).to receive(:available_in_zone?).and_return(true)
@vm = @vms.first
@vm.storage = nil
@vm.vendor = "microsoft"
Expand All @@ -119,7 +117,6 @@
context "with a Microsoft vm with a Microsoft storage" do
before do
# Test a Microsoft vm without a storage
allow(MiqVimBrokerWorker).to receive(:available_in_zone?).and_return(true)
@vm = @vms.first
@vm.storage.store_type = "CSVFS"
@vm.vendor = "microsoft"
Expand All @@ -135,7 +132,6 @@
context "with a Microsoft vm with an invalid storage" do
before do
# Test a Microsoft vm without a storage
allow(MiqVimBrokerWorker).to receive(:available_in_zone?).and_return(true)
@vm = @vms.first
@vm.storage.store_type = "XFS"
@vm.vendor = "microsoft"
Expand All @@ -151,7 +147,6 @@

context "with jobs, a default smartproxy for repo scanning" do
before do
allow(MiqVimBrokerWorker).to receive(:available?).and_return(true)
@repo_proxy = @proxies.last
if @repo_proxy
@repo_proxy.name = "repo_proxy"
Expand Down
20 changes: 10 additions & 10 deletions spec/models/miq_action_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
tenant = FactoryBot.create(:tenant)
group = FactoryBot.create(:miq_group, :tenant => tenant)
@user = FactoryBot.create(:user, :userid => "test", :miq_groups => [group])
@vm = FactoryBot.create(:vm_vmware, :evm_owner => @user, :miq_group => group)
@vm = FactoryBot.create(:vm_infra, :evm_owner => @user, :miq_group => group)
@action = FactoryBot.create(:miq_action)
expect(@action).not_to be_nil
@action.options = {:ae_request => "test_custom_automation"}
Expand Down Expand Up @@ -64,9 +64,9 @@

context "#action_evm_event" do
it "for Vm" do
ems = FactoryBot.create(:ems_vmware)
host = FactoryBot.create(:host_vmware)
vm = FactoryBot.create(:vm_vmware, :host => host, :ext_management_system => ems)
ems = FactoryBot.create(:ems_infra)
host = FactoryBot.create(:host)
vm = FactoryBot.create(:vm_infra, :host => host, :ext_management_system => ems)
action = FactoryBot.create(:miq_action)
res = action.action_evm_event(action, vm, :policy => FactoryBot.create(:miq_policy))

Expand All @@ -86,7 +86,7 @@

context "#raise_automation_event" do
before do
@vm = FactoryBot.create(:vm_vmware)
@vm = FactoryBot.create(:vm_infra)
allow(@vm).to receive(:my_zone).and_return("vm_zone")
FactoryBot.create(:miq_event_definition, :name => "raise_automation_event")
FactoryBot.create(:miq_event_definition, :name => "vm_start")
Expand Down Expand Up @@ -156,9 +156,9 @@
context "#action_vm_stop" do
before do
@zone = FactoryBot.create(:zone)
@ems = FactoryBot.create(:ems_vmware, :zone => @zone)
@host = FactoryBot.create(:host_vmware)
@vm = FactoryBot.create(:vm_vmware, :host => @host, :ext_management_system => @ems)
@ems = FactoryBot.create(:ems_infra, :zone => @zone)
@host = FactoryBot.create(:host)
@vm = FactoryBot.create(:vm_infra, :host => @host, :ext_management_system => @ems)
@action = FactoryBot.create(:miq_action, :name => "vm_stop")
end

Expand Down Expand Up @@ -186,7 +186,7 @@

context "#action_vm_retire" do
before do
@vm = FactoryBot.create(:vm_vmware)
@vm = FactoryBot.create(:vm_infra)
allow(@vm).to receive(:my_zone).and_return("vm_zone")
@event = FactoryBot.create(:miq_event_definition, :name => "assigned_company_tag")
@action = FactoryBot.create(:miq_action, :name => "vm_retire")
Expand Down Expand Up @@ -509,7 +509,7 @@ def stub_csv(data)
let(:tenant) { FactoryBot.create(:tenant) }
let(:group) { FactoryBot.create(:miq_group, :tenant => tenant) }
let(:user) { FactoryBot.create(:user, :userid => "test", :miq_groups => [group]) }
let(:vm) { FactoryBot.create(:vm_vmware, :evm_owner => user, :miq_group => group, :hardware => hardware) }
let(:vm) { FactoryBot.create(:vm_infra, :evm_owner => user, :miq_group => group, :hardware => hardware) }
let(:action) { FactoryBot.create(:miq_action, :name => "run_ansible_playbook", :options => action_options) }
let(:stap) { FactoryBot.create(:service_template_ansible_playbook) }
let(:ip1) { "1.1.1.94" }
Expand Down

0 comments on commit 070edd1

Please sign in to comment.