Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cu capture move capture queue #19599

Merged
merged 8 commits into from
Dec 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 105 additions & 3 deletions app/models/manageiq/providers/base_manager/metrics_capture.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,20 @@ def perf_capture
queue_captures(targets, target_options)
end

# target is an ExtManagementSystem
# Queue historical captures to fill a gap in collected metrics.
# Storages are excluded; every target gets the same historical window.
def perf_capture_gap(start_time, end_time)
  gap_options = {:start_time => start_time.utc, :end_time => end_time.utc, :interval => 'historical'}
  # Default-block hash hands every target its own copy of the gap options.
  options_by_target = Hash.new { |_hash, _target| gap_options.dup }
  queue_captures(Metric::Targets.capture_ems_targets(ems, :exclude_storages => true), options_by_target)
end

# @param targets [Array<Object>] list of the targets for capture (from `capture_ems_targets`)
# @param target_options [ Hash{Object => Hash{Symbol => Object}}] list of options indexed by target
def queue_captures(targets, target_options)
targets.each do |target|
options = target_options[target] || {}
interval_name = options[:interval] || perf_target_to_interval_name(target)
target.perf_capture_queue(interval_name, options)
perf_capture_queue(target, interval_name, options)
rescue => err
_log.warn("Failed to queue perf_capture for target [#{target.class.name}], [#{target.id}], [#{target.name}]: #{err}")
end
Expand Down Expand Up @@ -112,7 +119,7 @@ def calc_target_options(targets_by_rollup_parent)
task_end_time = Time.now.utc.iso8601
default_task_start_time = 1.hour.ago.utc.iso8601

target_options = Hash.new { |h, k| h[k] = {:zone => zone} }
target_options = Hash.new { |h, k| h[k] = {} }
# Create a new task for each rollup parent
# mark each target with the rollup parent
targets_by_rollup_parent.each_with_object(target_options) do |(parent, targets), h|
Expand Down Expand Up @@ -141,12 +148,107 @@ def calc_target_options(targets_by_rollup_parent)
h[target] = {
:task_id => task.id,
:force => true, # Force collection since we've already verified that capture should be done now
:zone => zone,
}
end
end
end

# Queue metric collection for a single target on the provider's
# metrics-collector queue.
#
# @param target [Object] capture target (e.g. Host, VmOrTemplate)
# @param interval_name [String] "realtime" or "historical" (see
#   Metric::Capture::VALID_CAPTURE_INTERVALS)
# @param options [Hash]
# @option options [Time]    :start_time beginning of the capture window (gap/historical)
# @option options [Time]    :end_time   end of the capture window (gap/historical)
# @option options [Integer] :priority   MiqQueue priority; defaults per interval
# @option options [Integer] :task_id    MiqTask used to group cluster realtime metrics
def perf_capture_queue(target, interval_name, options = {})
  # Guard against enqueueing a nonexistent "perf_capture_<interval>" method
  # (same validation the previous per-target implementation performed).
  raise ArgumentError, "invalid interval_name '#{interval_name}'" unless Metric::Capture::VALID_CAPTURE_INTERVALS.include?(interval_name)

  # for gap, interval_name = historical, start and end time present.
  start_time = options[:start_time]
  end_time   = options[:end_time]
  priority   = options[:priority] || Metric::Capture.interval_priority(interval_name)
  task_id    = options[:task_id]

  # cb is the task used to group cluster realtime metrics
  cb = {:class_name => target.class.name, :instance_id => target.id, :method_name => :perf_capture_callback, :args => [[task_id]]} if task_id && interval_name == 'realtime'
  items = queue_items_for_interval(target, interval_name, start_time, end_time)

  # Queue up the actual items
  queue_item = {
    :class_name  => target.class.name,
    :instance_id => target.id,
    :role        => 'ems_metrics_collector',
    :queue_name  => ems.metrics_collector_queue_name,
    :zone        => my_zone,
    :state       => ['ready', 'dequeue'],
  }

  messages = MiqQueue.where.not(:method_name => 'perf_capture_realtime').where(queue_item).index_by(&:args)
  items.each do |item_interval, *start_and_end_time|
    # Should both interval name and args (dates) be part of uniqueness query?
    queue_item_options = queue_item.merge(:method_name => "perf_capture_#{item_interval}")
    queue_item_options[:args] = start_and_end_time if start_and_end_time.present?
    # Skip non-realtime work already queued at the same priority for the same window.
    next if item_interval != 'realtime' && messages[start_and_end_time].try(:priority) == priority

    MiqQueue.put_or_update(queue_item_options) do |msg, qi|
      # reason for setting MiqQueue#miq_task_id is to initialize MiqTask.started_on column when message delivered.
      qi[:miq_task_id] = task_id if task_id && item_interval == "realtime"
      if msg.nil?
        # No matching message on the queue - enqueue a fresh one.
        qi[:priority] = priority
        qi.delete(:state)
        if cb && item_interval == "realtime"
          qi[:miq_callback] = cb
        end
        qi
      elsif msg.state == "ready" && (task_id || MiqQueue.higher_priority?(priority, msg.priority))
        qi[:priority] = priority
        # rerun the job (either with new task or higher priority)
        qi.delete(:state)
        if task_id && item_interval == "realtime"
          # Merge this task into the callback's existing task-id list.
          existing_tasks = ((msg.miq_callback || {})[:args] || []).first || []
          qi[:miq_callback] = cb.merge(:args => [existing_tasks + [task_id]])
        end
        qi
      else
        interval = qi[:method_name].sub("perf_capture_", "")
        _log.debug("Skipping capture of #{target.log_target} - Performance capture for interval #{interval} is still running")
        # NOTE: do not update the message queue
        nil
      end
    end
  end
end

# Break [start_time, end_time] into threshold-sized [interval_name, start, end]
# triples, ordered newest window first:
#   start_time = 2017/01/01 12:00:00, end_time = 2017/01/04 12:00:00
#   => [[interval_name, 2017-01-03 12:00:00 UTC, 2017-01-04 12:00:00 UTC],
#       [interval_name, 2017-01-02 12:00:00 UTC, 2017-01-03 12:00:00 UTC],
#       [interval_name, 2017-01-01 12:00:00 UTC, 2017-01-02 12:00:00 UTC]]
def split_capture_intervals(interval_name, start_time, end_time, threshold = 1.day)
  boundaries = (start_time.utc..end_time.utc).step_value(threshold)
  windows = boundaries.each_cons(2).map do |window_start, window_end|
    [interval_name, window_start, window_end]
  end
  windows.reverse
end

# Build the list of capture work items for a target.
#
# Returns an array whose elements are either [interval_name, start, end]
# triples, [interval_name, start] pairs (open-ended), or a bare
# interval_name string - perf_capture_queue destructures all three shapes.
def queue_items_for_interval(target, interval_name, start_time, end_time)
  if interval_name == 'historical'
    start_time = Metric::Capture.historical_start_time if start_time.nil?
    end_time ||= 1.day.from_now.utc.beginning_of_day # Ensure no more than one historical collection is queued up in the same day
    split_capture_intervals(interval_name, start_time, end_time)
  else
    # if last_perf_capture_on is earlier than 4.hour.ago.beginning_of_day,
    # then create *one* realtime capture for start_time = 4.hours.ago.beginning_of_day (no end_time)
    # and create historical captures for each day from last_perf_capture_on until 4.hours.ago.beginning_of_day
    realtime_cut_off = 4.hours.ago.utc.beginning_of_day
    if target.last_perf_capture_on.nil?
      # for initial refresh of non-Storage objects, also go back historically
      if !target.kind_of?(Storage) && Metric::Capture.historical_days != 0
        [[interval_name, realtime_cut_off]] +
          split_capture_intervals("historical", Metric::Capture.historical_start_time, 1.day.from_now.utc.beginning_of_day)
      else
        [[interval_name, realtime_cut_off]]
      end
    elsif target.last_perf_capture_on < realtime_cut_off
      # Fill the gap since the last capture with per-day historical windows.
      [[interval_name, realtime_cut_off]] +
        split_capture_intervals("historical", target.last_perf_capture_on, realtime_cut_off)
    else
      # Captured recently enough - a single realtime capture, no explicit window.
      [interval_name]
    end
  end
end

def perf_target_to_interval_name(target)
case target
when Host, VmOrTemplate then "realtime"
Expand Down
4 changes: 1 addition & 3 deletions app/models/metric/capture.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@ def self.perf_capture_gap(start_time, end_time, zone_id = nil, ems_id = nil)
end
emses.each do |ems|
pco = ems.perf_capture_object
targets = Metric::Targets.capture_ems_targets(ems, :exclude_storages => true)
target_options = Hash.new { |_n, _v| {:start_time => start_time.utc, :end_time => end_time.utc, :zone => ems.zone, :interval => 'historical'} }
pco.queue_captures(targets, target_options)
pco.perf_capture_gap(start_time, end_time)
end

_log.info("Queueing performance capture for range: [#{start_time} - #{end_time}]...Complete")
Expand Down
112 changes: 1 addition & 111 deletions app/models/metric/ci_mixin/capture.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,116 +9,6 @@ def perf_capture_object

delegate :perf_collect_metrics, :to => :perf_capture_object

# The ExtManagementSystem responsible for capturing this target's metrics:
# the target itself when it is an EMS, otherwise its ext_management_system
# association (nil when that association is absent or blank, e.g. for
# archived entities with no provider).
def ems_for_capture_target
  return self if kind_of?(ExtManagementSystem)

  ext_management_system if respond_to?(:ext_management_system) && ext_management_system.present?
end

# Break [start_time, end_time] into threshold-sized [interval_name, start, end]
# triples, ordered newest window first:
#   start_time = 2017/01/01 12:00:00, end_time = 2017/01/04 12:00:00
#   => [[interval_name, 2017-01-03 12:00:00 UTC, 2017-01-04 12:00:00 UTC],
#       [interval_name, 2017-01-02 12:00:00 UTC, 2017-01-03 12:00:00 UTC],
#       [interval_name, 2017-01-01 12:00:00 UTC, 2017-01-02 12:00:00 UTC]]
def split_capture_intervals(interval_name, start_time, end_time, threshold = 1.day)
  boundaries = (start_time.utc..end_time.utc).step_value(threshold)
  windows = boundaries.each_cons(2).map do |window_start, window_end|
    [interval_name, window_start, window_end]
  end
  windows.reverse
end
private :split_capture_intervals

# Queue metric collection for this target on its provider's
# metrics-collector queue.
#
# @param interval_name [String] "realtime" or "historical" (see
#   Metric::Capture::VALID_CAPTURE_INTERVALS)
# @param options [Hash]
# @option options [Time]      :start_time beginning of the capture window
# @option options [Time]      :end_time   end of the capture window
# @option options [Integer]   :priority   MiqQueue priority; defaults per interval
# @option options [Integer]   :task_id    MiqTask used to group cluster realtime metrics
# @option options [Zone,String] :zone     queue zone; defaults to my_zone
# @raise [ArgumentError] on an unknown interval_name or a target with no EMS
def perf_capture_queue(interval_name, options = {})
  start_time = options[:start_time]
  end_time = options[:end_time]
  priority = options[:priority] || Metric::Capture.interval_priority(interval_name)
  task_id = options[:task_id]
  zone = options[:zone] || my_zone
  # Accept either a Zone record or a zone name string.
  zone = zone.name if zone.respond_to?(:name)
  ems = ems_for_capture_target

  raise ArgumentError, "invalid interval_name '#{interval_name}'" unless Metric::Capture::VALID_CAPTURE_INTERVALS.include?(interval_name)
  raise ArgumentError, "target does not have an ExtManagementSystem" if ems.nil?

  # cb is the task used to group cluster realtime metrics
  cb = {:class_name => self.class.name, :instance_id => id, :method_name => :perf_capture_callback, :args => [[task_id]]} if task_id && interval_name == 'realtime'
  items = queue_items_for_interval(interval_name, start_time, end_time)

  # Queue up the actual items
  queue_item = {
    :class_name => self.class.name,
    :instance_id => id,
    :role => 'ems_metrics_collector',
    :queue_name => ems.metrics_collector_queue_name,
    :zone => zone,
    :state => ['ready', 'dequeue'],
  }

  # Existing non-realtime messages, keyed by their args, for dedup below.
  messages = MiqQueue.where.not(:method_name => 'perf_capture_realtime').where(queue_item).index_by(&:args)
  items.each do |item_interval, *start_and_end_time|
    # Should both interval name and args (dates) be part of uniqueness query?
    queue_item_options = queue_item.merge(:method_name => "perf_capture_#{item_interval}")
    queue_item_options[:args] = start_and_end_time if start_and_end_time.present?
    # Skip non-realtime work already queued at the same priority for the same window.
    next if item_interval != 'realtime' && messages[start_and_end_time].try(:priority) == priority
    MiqQueue.put_or_update(queue_item_options) do |msg, qi|
      # reason for setting MiqQueue#miq_task_id is to initialize MiqTask.started_on column when message delivered.
      qi[:miq_task_id] = task_id if task_id && item_interval == "realtime"
      if msg.nil?
        # No matching message on the queue - enqueue a fresh one.
        qi[:priority] = priority
        qi.delete(:state)
        if cb && item_interval == "realtime"
          qi[:miq_callback] = cb
        end
        qi
      elsif msg.state == "ready" && (task_id || MiqQueue.higher_priority?(priority, msg.priority))
        qi[:priority] = priority
        # rerun the job (either with new task or higher priority)
        qi.delete(:state)
        if task_id && item_interval == "realtime"
          # Merge this task into the callback's existing task-id list.
          existing_tasks = (((msg.miq_callback || {})[:args] || []).first) || []
          qi[:miq_callback] = cb.merge(:args => [existing_tasks + [task_id]])
        end
        qi
      else
        interval = qi[:method_name].sub("perf_capture_", "")
        _log.debug("Skipping capture of #{log_target} - Performance capture for interval #{interval} is still running")
        # NOTE: do not update the message queue
        nil
      end
    end
  end
end

# Build the list of capture work items for this target.
#
# Returns an array whose elements are either [interval_name, start, end]
# triples, [interval_name, start] pairs (open-ended), or a bare
# interval_name string - perf_capture_queue destructures all three shapes.
def queue_items_for_interval(interval_name, start_time, end_time)
  if interval_name == 'historical'
    start_time = Metric::Capture.historical_start_time if start_time.nil?
    end_time ||= 1.day.from_now.utc.beginning_of_day # Ensure no more than one historical collection is queued up in the same day
    split_capture_intervals(interval_name, start_time, end_time)
  else
    # if last_perf_capture_on is earlier than 4.hour.ago.beginning_of_day,
    # then create *one* realtime capture for start_time = 4.hours.ago.beginning_of_day (no end_time)
    # and create historical captures for each day from last_perf_capture_on until 4.hours.ago.beginning_of_day
    realtime_cut_off = 4.hours.ago.utc.beginning_of_day
    if last_perf_capture_on.nil?
      # for initial refresh of non-Storage objects, also go back historically
      if !kind_of?(Storage) && Metric::Capture.historical_days != 0
        [[interval_name, realtime_cut_off]] +
          split_capture_intervals("historical", Metric::Capture.historical_start_time, 1.day.from_now.utc.beginning_of_day)
      else
        [[interval_name, realtime_cut_off]]
      end
    elsif last_perf_capture_on < realtime_cut_off
      # Fill the gap since the last capture with per-day historical windows.
      [[interval_name, realtime_cut_off]] +
        split_capture_intervals("historical", last_perf_capture_on, realtime_cut_off)
    else
      # Captured recently enough - a single realtime capture, no explicit window.
      [interval_name]
    end
  end
end


# Convenience wrapper: delegates to perf_capture with the 'realtime'
# interval, passing any extra args straight through.
def perf_capture_realtime(*args)
  perf_capture('realtime', *args)
end
Expand Down Expand Up @@ -280,6 +170,6 @@ def perf_capture_realtime_now
# For UI to enable refresh of realtime charts on demand
_log.info("Realtime capture requested for #{log_target}")

perf_capture_queue('realtime', :priority => MiqQueue::HIGH_PRIORITY)
perf_capture_object.queue_captures([self], self => {:interval => 'realtime', :priority => MiqQueue::HIGH_PRIORITY})
end
end
31 changes: 3 additions & 28 deletions spec/models/metric/ci_mixin/capture_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ def verify_historical_queue_item(queue_item, expected_start_time, expected_end_t
def verify_perf_capture_queue(last_perf_capture_on, total_queue_items)
Timecop.freeze do
vm.last_perf_capture_on = last_perf_capture_on
vm.perf_capture_queue("realtime")
ems_openstack.perf_capture_object.queue_captures([vm], vm => {:interval => "realtime"})
expect(MiqQueue.count).to eq total_queue_items

# make sure the queue items are in the correct order
Expand Down Expand Up @@ -357,7 +357,7 @@ def verify_perf_capture_queue(last_perf_capture_on, total_queue_items)
it "links supplied miq_task with queued item which allow to initialize MiqTask#started_on attribute" do
MiqQueue.delete_all
task = FactoryBot.create(:miq_task)
vm.perf_capture_queue("realtime", :task_id => task.id)
ems_openstack.perf_capture_object.queue_captures([vm], vm => {:interval => "realtime", :task_id => task.id})
expect(MiqQueue.first.miq_task_id).to eq task.id
end
end
Expand All @@ -366,7 +366,7 @@ def verify_perf_capture_queue(last_perf_capture_on, total_queue_items)
context "with capture days > 0 and multiple attempts" do
def verify_perf_capture_queue_historical(last_perf_capture_on, total_queue_items)
vm.last_perf_capture_on = last_perf_capture_on
vm.perf_capture_queue("historical")
ems_openstack.perf_capture_object.queue_captures([vm], vm => {:interval => "historical"})
expect(MiqQueue.count).to eq total_queue_items
end

Expand Down Expand Up @@ -414,31 +414,6 @@ def verify_perf_capture_queue_historical(last_perf_capture_on, total_queue_items
end
end

context "handles archived container entities" do
  it "get the correct queue name and zone from archived container entities" do
    ems = FactoryBot.create(:ems_openshift, :name => 'OpenShiftProvider')
    group = FactoryBot.create(:container_group, :name => "group", :ext_management_system => ems)
    container = FactoryBot.create(:container,
                                  :name => "container",
                                  :container_group => group,
                                  :ext_management_system => ems)
    project = FactoryBot.create(:container_project,
                                :name => "project",
                                :ext_management_system => ems)
    # Archive the entities by disconnecting them from inventory.
    container.disconnect_inv
    group.disconnect_inv
    project.disconnect_inv

    # Even archived, each entity should still resolve its capture EMS...
    expect(container.ems_for_capture_target).to eq ems
    expect(group.ems_for_capture_target).to eq ems
    expect(project.ems_for_capture_target).to eq ems

    # ...and the EMS's zone.
    expect(container.my_zone).to eq ems.my_zone
    expect(group.my_zone).to eq ems.my_zone
    expect(project.my_zone).to eq ems.my_zone
  end
end

describe ".perf_capture_realtime_now" do
context "with enabled and disabled targets", :with_enabled_disabled_vmware do
context "executing perf_capture_realtime_now" do
Expand Down