Skip to content

Commit

Permalink
Merge pull request #19599 from kbrock/cu_capture_move_capture_queue
Browse files Browse the repository at this point in the history
Cu capture move capture queue
  • Loading branch information
agrare committed Dec 5, 2019
2 parents 306b97a + 6c88b86 commit eef634d
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 145 deletions.
108 changes: 105 additions & 3 deletions app/models/manageiq/providers/base_manager/metrics_capture.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,20 @@ def perf_capture
queue_captures(targets, target_options)
end

# Queue historical captures to fill a gap in collected metrics.
# target is an ExtManagementSystem
def perf_capture_gap(start_time, end_time)
  ems_targets = Metric::Targets.capture_ems_targets(ems, :exclude_storages => true)
  # Every target gets the same gap window; the default block builds a fresh
  # options hash per lookup, exactly as queue_captures expects.
  options_for_any_target = Hash.new do |_hash, _key|
    {:start_time => start_time.utc, :end_time => end_time.utc, :interval => 'historical'}
  end
  queue_captures(ems_targets, options_for_any_target)
end

# @param targets [Array<Object>] list of the targets for capture (from `capture_ems_targets`)
# @param target_options [ Hash{Object => Hash{Symbol => Object}}] list of options indexed by target
def queue_captures(targets, target_options)
targets.each do |target|
options = target_options[target] || {}
interval_name = options[:interval] || perf_target_to_interval_name(target)
target.perf_capture_queue(interval_name, options)
perf_capture_queue(target, interval_name, options)
rescue => err
_log.warn("Failed to queue perf_capture for target [#{target.class.name}], [#{target.id}], [#{target.name}]: #{err}")
end
Expand Down Expand Up @@ -112,7 +119,7 @@ def calc_target_options(targets_by_rollup_parent)
task_end_time = Time.now.utc.iso8601
default_task_start_time = 1.hour.ago.utc.iso8601

target_options = Hash.new { |h, k| h[k] = {:zone => zone} }
target_options = Hash.new { |h, k| h[k] = {} }
# Create a new task for each rollup parent
# mark each target with the rollup parent
targets_by_rollup_parent.each_with_object(target_options) do |(parent, targets), h|
Expand Down Expand Up @@ -141,12 +148,107 @@ def calc_target_options(targets_by_rollup_parent)
h[target] = {
:task_id => task.id,
:force => true, # Force collection since we've already verified that capture should be done now
:zone => zone,
}
end
end
end

# Queue a metrics capture for a single target.
#
# Restores the interval_name validation that the pre-refactor copy of this
# method performed: without it, a bad interval silently queues a message for
# a nonexistent "perf_capture_<interval>" method that only fails at dequeue.
#
# @param target [Object] object to capture metrics for (Vm, Host, Storage, ...)
# @param interval_name [String] capture interval, e.g. "realtime" or "historical"
# @param options [Hash] capture options
# @option options [Time]    :start_time start of the capture window (historical/gap)
# @option options [Time]    :end_time   end of the capture window (historical/gap)
# @option options [Integer] :priority   MiqQueue priority (defaults per interval)
# @option options [Integer] :task_id    MiqTask id used to group cluster realtime metrics
def perf_capture_queue(target, interval_name, options = {})
  raise ArgumentError, "invalid interval_name '#{interval_name}'" unless Metric::Capture::VALID_CAPTURE_INTERVALS.include?(interval_name)

  # for gap, interval_name = historical, start and end time present.
  start_time = options[:start_time]
  end_time = options[:end_time]
  priority = options[:priority] || Metric::Capture.interval_priority(interval_name)
  task_id = options[:task_id]

  # cb is the task used to group cluster realtime metrics
  cb = {:class_name => target.class.name, :instance_id => target.id, :method_name => :perf_capture_callback, :args => [[task_id]]} if task_id && interval_name == 'realtime'
  items = queue_items_for_interval(target, interval_name, start_time, end_time)

  # Queue up the actual items
  queue_item = {
    :class_name  => target.class.name,
    :instance_id => target.id,
    :role        => 'ems_metrics_collector',
    :queue_name  => ems.metrics_collector_queue_name,
    :zone        => my_zone,
    :state       => ['ready', 'dequeue'],
  }

  # Index pending non-realtime messages by args so duplicate windows can be skipped below.
  messages = MiqQueue.where.not(:method_name => 'perf_capture_realtime').where(queue_item).index_by(&:args)
  items.each do |item_interval, *start_and_end_time|
    # Should both interval name and args (dates) be part of uniqueness query?
    queue_item_options = queue_item.merge(:method_name => "perf_capture_#{item_interval}")
    queue_item_options[:args] = start_and_end_time if start_and_end_time.present?
    # An identical non-realtime message at the same priority is already queued.
    next if item_interval != 'realtime' && messages[start_and_end_time].try(:priority) == priority

    MiqQueue.put_or_update(queue_item_options) do |msg, qi|
      # Setting MiqQueue#miq_task_id initializes the MiqTask.started_on column when the message is delivered.
      qi[:miq_task_id] = task_id if task_id && item_interval == "realtime"
      if msg.nil?
        # No existing message - queue a fresh capture.
        qi[:priority] = priority
        qi.delete(:state)
        if cb && item_interval == "realtime"
          qi[:miq_callback] = cb
        end
        qi
      elsif msg.state == "ready" && (task_id || MiqQueue.higher_priority?(priority, msg.priority))
        qi[:priority] = priority
        # rerun the job (either with new task or higher priority)
        qi.delete(:state)
        if task_id && item_interval == "realtime"
          # Accumulate this task onto the existing callback's task list.
          existing_tasks = ((msg.miq_callback || {})[:args] || []).first || []
          qi[:miq_callback] = cb.merge(:args => [existing_tasks + [task_id]])
        end
        qi
      else
        interval = qi[:method_name].sub("perf_capture_", "")
        _log.debug("Skipping capture of #{target.log_target} - Performance capture for interval #{interval} is still running")
        # NOTE: do not update the message queue
        nil
      end
    end
  end
end

# Break [start_time, end_time] into consecutive UTC windows no wider than
# +threshold+, returned newest-first as [interval_name, window_start, window_end]
# tuples. Example:
#   start_time = 2017/01/01 12:00:00, end_time = 2017/01/04 12:00:00
#   [[interval_name, 2017-01-03 12:00:00 UTC, 2017-01-04 12:00:00 UTC],
#    [interval_name, 2017-01-02 12:00:00 UTC, 2017-01-03 12:00:00 UTC],
#    [interval_name, 2017-01-01 12:00:00 UTC, 2017-01-02 12:00:00 UTC]]
def split_capture_intervals(interval_name, start_time, end_time, threshold = 1.day)
  boundaries = (start_time.utc..end_time.utc).step_value(threshold)
  windows = boundaries.each_cons(2).map do |window_start, window_end|
    [interval_name, window_start, window_end]
  end
  windows.reverse
end

# Build the list of capture work items for a single target.
#
# Each item is either [interval_name] (open-ended capture),
# [interval_name, start] (realtime from a cut-off, no end), or
# [interval_name, start, end] (a bounded window from split_capture_intervals).
#
# @param target [Object] capture target; responds to #last_perf_capture_on
# @param interval_name [String] "historical" or a realtime-style interval
# @param start_time [Time, nil] optional explicit window start (historical)
# @param end_time [Time, nil] optional explicit window end (historical)
# @return [Array<Array>] capture items to queue, newest windows first
def queue_items_for_interval(target, interval_name, start_time, end_time)
if interval_name == 'historical'
# Default to the configured historical start when none was supplied.
start_time = Metric::Capture.historical_start_time if start_time.nil?
end_time ||= 1.day.from_now.utc.beginning_of_day # Ensure no more than one historical collection is queue up in the same day
split_capture_intervals(interval_name, start_time, end_time)
else
# if last_perf_capture_on is earlier than 4.hour.ago.beginning_of_day,
# then create *one* realtime capture for start_time = 4.hours.ago.beginning_of_day (no end_time)
# and create historical captures for each day from last_perf_capture_on until 4.hours.ago.beginning_of_day
realtime_cut_off = 4.hours.ago.utc.beginning_of_day
if target.last_perf_capture_on.nil?
# for initial refresh of non-Storage objects, also go back historically
if !target.kind_of?(Storage) && Metric::Capture.historical_days != 0
[[interval_name, realtime_cut_off]] +
split_capture_intervals("historical", Metric::Capture.historical_start_time, 1.day.from_now.utc.beginning_of_day)
else
[[interval_name, realtime_cut_off]]
end
elsif target.last_perf_capture_on < realtime_cut_off
# Last capture is stale: one realtime capture from the cut-off plus
# historical windows covering the gap since the last capture.
[[interval_name, realtime_cut_off]] +
split_capture_intervals("historical", target.last_perf_capture_on, realtime_cut_off)
else
# Captured recently: plain realtime capture with no explicit window.
[interval_name]
end
end
end

def perf_target_to_interval_name(target)
case target
when Host, VmOrTemplate then "realtime"
Expand Down
4 changes: 1 addition & 3 deletions app/models/metric/capture.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@ def self.perf_capture_gap(start_time, end_time, zone_id = nil, ems_id = nil)
end
emses.each do |ems|
pco = ems.perf_capture_object
targets = Metric::Targets.capture_ems_targets(ems, :exclude_storages => true)
target_options = Hash.new { |_n, _v| {:start_time => start_time.utc, :end_time => end_time.utc, :zone => ems.zone, :interval => 'historical'} }
pco.queue_captures(targets, target_options)
pco.perf_capture_gap(start_time, end_time)
end

_log.info("Queueing performance capture for range: [#{start_time} - #{end_time}]...Complete")
Expand Down
112 changes: 1 addition & 111 deletions app/models/metric/ci_mixin/capture.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,116 +9,6 @@ def perf_capture_object

delegate :perf_collect_metrics, :to => :perf_capture_object

# Resolve the ExtManagementSystem that performs captures for this object:
# an EMS captures for itself; any other target defers to its associated
# ext_management_system. Returns nil when no EMS is available.
def ems_for_capture_target
  return self if kind_of?(ExtManagementSystem)

  ext_management_system if respond_to?(:ext_management_system) && ext_management_system.present?
end

# Split [start_time, end_time] into threshold-sized capture windows.
#
# @param interval_name [String] interval label copied into each tuple
# @param start_time [Time] window start (converted to UTC)
# @param end_time [Time] window end (converted to UTC)
# @param threshold [Object] maximum width of each window — presumably an
#   ActiveSupport::Duration (default 1.day); verify against callers
# @return [Array<Array(String, Time, Time)>] [interval_name, start, end] tuples, newest first
def split_capture_intervals(interval_name, start_time, end_time, threshold = 1.day)
# Create an array of ordered pairs from start_time and end_time so that each ordered pair is contained
# within the threshold. Then, reverse it so the newest ordered pair is first:
# start_time = 2017/01/01 12:00:00, end_time = 2017/01/04 12:00:00
# [[interval_name, 2017-01-03 12:00:00 UTC, 2017-01-04 12:00:00 UTC],
# [interval_name, 2017-01-02 12:00:00 UTC, 2017-01-03 12:00:00 UTC],
# [interval_name, 2017-01-01 12:00:00 UTC, 2017-01-02 12:00:00 UTC]]
(start_time.utc..end_time.utc).step_value(threshold).each_cons(2).collect do |s_time, e_time|
[interval_name, s_time, e_time]
end.reverse
end
private :split_capture_intervals

# Queue a metrics capture for this object (self is the capture target).
#
# @param interval_name [String] one of Metric::Capture::VALID_CAPTURE_INTERVALS
# @param options [Hash] capture options
# @option options [Time]    :start_time start of the capture window (historical/gap)
# @option options [Time]    :end_time   end of the capture window (historical/gap)
# @option options [Integer] :priority   MiqQueue priority (defaults per interval)
# @option options [Integer] :task_id    MiqTask id used to group cluster realtime metrics
# @option options [Zone, String] :zone  zone (or zone name) to queue into; defaults to my_zone
# @raise [ArgumentError] when interval_name is invalid or no EMS can be resolved
def perf_capture_queue(interval_name, options = {})
start_time = options[:start_time]
end_time = options[:end_time]
priority = options[:priority] || Metric::Capture.interval_priority(interval_name)
task_id = options[:task_id]
# Accept either a Zone object or a zone name string.
zone = options[:zone] || my_zone
zone = zone.name if zone.respond_to?(:name)
ems = ems_for_capture_target

# Fail fast on bad input before touching the queue.
raise ArgumentError, "invalid interval_name '#{interval_name}'" unless Metric::Capture::VALID_CAPTURE_INTERVALS.include?(interval_name)
raise ArgumentError, "target does not have an ExtManagementSystem" if ems.nil?

# cb is the task used to group cluster realtime metrics
cb = {:class_name => self.class.name, :instance_id => id, :method_name => :perf_capture_callback, :args => [[task_id]]} if task_id && interval_name == 'realtime'
items = queue_items_for_interval(interval_name, start_time, end_time)

# Queue up the actual items
queue_item = {
:class_name => self.class.name,
:instance_id => id,
:role => 'ems_metrics_collector',
:queue_name => ems.metrics_collector_queue_name,
:zone => zone,
:state => ['ready', 'dequeue'],
}

# Index pending non-realtime messages by args so duplicate windows can be skipped below.
messages = MiqQueue.where.not(:method_name => 'perf_capture_realtime').where(queue_item).index_by(&:args)
items.each do |item_interval, *start_and_end_time|
# Should both interval name and args (dates) be part of uniqueness query?
queue_item_options = queue_item.merge(:method_name => "perf_capture_#{item_interval}")
queue_item_options[:args] = start_and_end_time if start_and_end_time.present?
# An identical non-realtime message at the same priority is already queued.
next if item_interval != 'realtime' && messages[start_and_end_time].try(:priority) == priority
MiqQueue.put_or_update(queue_item_options) do |msg, qi|
# Setting MiqQueue#miq_task_id initializes the MiqTask.started_on column when the message is delivered.
qi[:miq_task_id] = task_id if task_id && item_interval == "realtime"
if msg.nil?
# No existing message - queue a fresh capture.
qi[:priority] = priority
qi.delete(:state)
if cb && item_interval == "realtime"
qi[:miq_callback] = cb
end
qi
elsif msg.state == "ready" && (task_id || MiqQueue.higher_priority?(priority, msg.priority))
qi[:priority] = priority
# rerun the job (either with new task or higher priority)
qi.delete(:state)
if task_id && item_interval == "realtime"
# Accumulate this task onto the existing callback's task list.
existing_tasks = (((msg.miq_callback || {})[:args] || []).first) || []
qi[:miq_callback] = cb.merge(:args => [existing_tasks + [task_id]])
end
qi
else
interval = qi[:method_name].sub("perf_capture_", "")
_log.debug("Skipping capture of #{log_target} - Performance capture for interval #{interval} is still running")
# NOTE: do not update the message queue
nil
end
end
end
end

# Build the list of capture work items for this object (self is the target).
#
# Each item is either [interval_name] (open-ended capture),
# [interval_name, start] (realtime from a cut-off, no end), or
# [interval_name, start, end] (a bounded window from split_capture_intervals).
#
# @param interval_name [String] "historical" or a realtime-style interval
# @param start_time [Time, nil] optional explicit window start (historical)
# @param end_time [Time, nil] optional explicit window end (historical)
# @return [Array<Array>] capture items to queue, newest windows first
def queue_items_for_interval(interval_name, start_time, end_time)
if interval_name == 'historical'
# Default to the configured historical start when none was supplied.
start_time = Metric::Capture.historical_start_time if start_time.nil?
end_time ||= 1.day.from_now.utc.beginning_of_day # Ensure no more than one historical collection is queue up in the same day
split_capture_intervals(interval_name, start_time, end_time)
else
# if last_perf_capture_on is earlier than 4.hour.ago.beginning_of_day,
# then create *one* realtime capture for start_time = 4.hours.ago.beginning_of_day (no end_time)
# and create historical captures for each day from last_perf_capture_on until 4.hours.ago.beginning_of_day
realtime_cut_off = 4.hours.ago.utc.beginning_of_day
if last_perf_capture_on.nil?
# for initial refresh of non-Storage objects, also go back historically
if !kind_of?(Storage) && Metric::Capture.historical_days != 0
[[interval_name, realtime_cut_off]] +
split_capture_intervals("historical", Metric::Capture.historical_start_time, 1.day.from_now.utc.beginning_of_day)
else
[[interval_name, realtime_cut_off]]
end
elsif last_perf_capture_on < realtime_cut_off
# Last capture is stale: one realtime capture from the cut-off plus
# historical windows covering the gap since the last capture.
[[interval_name, realtime_cut_off]] +
split_capture_intervals("historical", last_perf_capture_on, realtime_cut_off)
else
# Captured recently: plain realtime capture with no explicit window.
[interval_name]
end
end
end


# Convenience wrapper: run a capture with the 'realtime' interval,
# forwarding any additional arguments straight through to perf_capture.
def perf_capture_realtime(*capture_args)
  perf_capture('realtime', *capture_args)
end
Expand Down Expand Up @@ -280,6 +170,6 @@ def perf_capture_realtime_now
# For UI to enable refresh of realtime charts on demand
_log.info("Realtime capture requested for #{log_target}")

perf_capture_queue('realtime', :priority => MiqQueue::HIGH_PRIORITY)
perf_capture_object.queue_captures([self], self => {:interval => 'realtime', :priority => MiqQueue::HIGH_PRIORITY})
end
end
31 changes: 3 additions & 28 deletions spec/models/metric/ci_mixin/capture_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ def verify_historical_queue_item(queue_item, expected_start_time, expected_end_t
def verify_perf_capture_queue(last_perf_capture_on, total_queue_items)
Timecop.freeze do
vm.last_perf_capture_on = last_perf_capture_on
vm.perf_capture_queue("realtime")
ems_openstack.perf_capture_object.queue_captures([vm], vm => {:interval => "realtime"})
expect(MiqQueue.count).to eq total_queue_items

# make sure the queue items are in the correct order
Expand Down Expand Up @@ -357,7 +357,7 @@ def verify_perf_capture_queue(last_perf_capture_on, total_queue_items)
it "links supplied miq_task with queued item which allow to initialize MiqTask#started_on attribute" do
MiqQueue.delete_all
task = FactoryBot.create(:miq_task)
vm.perf_capture_queue("realtime", :task_id => task.id)
ems_openstack.perf_capture_object.queue_captures([vm], vm => {:interval => "realtime", :task_id => task.id})
expect(MiqQueue.first.miq_task_id).to eq task.id
end
end
Expand All @@ -366,7 +366,7 @@ def verify_perf_capture_queue(last_perf_capture_on, total_queue_items)
context "with capture days > 0 and multiple attempts" do
def verify_perf_capture_queue_historical(last_perf_capture_on, total_queue_items)
vm.last_perf_capture_on = last_perf_capture_on
vm.perf_capture_queue("historical")
ems_openstack.perf_capture_object.queue_captures([vm], vm => {:interval => "historical"})
expect(MiqQueue.count).to eq total_queue_items
end

Expand Down Expand Up @@ -414,31 +414,6 @@ def verify_perf_capture_queue_historical(last_perf_capture_on, total_queue_items
end
end

context "handles archived container entities" do
it "get the correct queue name and zone from archived container entities" do
ems = FactoryBot.create(:ems_openshift, :name => 'OpenShiftProvider')
group = FactoryBot.create(:container_group, :name => "group", :ext_management_system => ems)
container = FactoryBot.create(:container,
:name => "container",
:container_group => group,
:ext_management_system => ems)
project = FactoryBot.create(:container_project,
:name => "project",
:ext_management_system => ems)
container.disconnect_inv
group.disconnect_inv
project.disconnect_inv

expect(container.ems_for_capture_target).to eq ems
expect(group.ems_for_capture_target).to eq ems
expect(project.ems_for_capture_target).to eq ems

expect(container.my_zone).to eq ems.my_zone
expect(group.my_zone).to eq ems.my_zone
expect(project.my_zone).to eq ems.my_zone
end
end

describe ".perf_capture_realtime_now" do
context "with enabled and disabled targets", :with_enabled_disabled_vmware do
context "executing perf_capture_realtime_now" do
Expand Down

0 comments on commit eef634d

Please sign in to comment.