diff --git a/app/models/manageiq/providers/base_manager/metrics_capture.rb b/app/models/manageiq/providers/base_manager/metrics_capture.rb index 3fcb5220762..314c5c91196 100644 --- a/app/models/manageiq/providers/base_manager/metrics_capture.rb +++ b/app/models/manageiq/providers/base_manager/metrics_capture.rb @@ -26,13 +26,20 @@ def perf_capture queue_captures(targets, target_options) end + # target is an ExtManagementSystem + def perf_capture_gap(start_time, end_time) + targets = Metric::Targets.capture_ems_targets(ems, :exclude_storages => true) + target_options = Hash.new { |_n, _v| {:start_time => start_time.utc, :end_time => end_time.utc, :interval => 'historical'} } + queue_captures(targets, target_options) + end + # @param targets [Array] list of the targets for capture (from `capture_ems_targets`) # @param target_options [ Hash{Object => Hash{Symbol => Object}}] list of options indexed by target def queue_captures(targets, target_options) targets.each do |target| options = target_options[target] || {} interval_name = options[:interval] || perf_target_to_interval_name(target) - target.perf_capture_queue(interval_name, options) + perf_capture_queue(target, interval_name, options) rescue => err _log.warn("Failed to queue perf_capture for target [#{target.class.name}], [#{target.id}], [#{target.name}]: #{err}") end @@ -112,7 +119,7 @@ def calc_target_options(targets_by_rollup_parent) task_end_time = Time.now.utc.iso8601 default_task_start_time = 1.hour.ago.utc.iso8601 - target_options = Hash.new { |h, k| h[k] = {:zone => zone} } + target_options = Hash.new { |h, k| h[k] = {} } # Create a new task for each rollup parent # mark each target with the rollup parent targets_by_rollup_parent.each_with_object(target_options) do |(parent, targets), h| @@ -141,12 +148,107 @@ def calc_target_options(targets_by_rollup_parent) h[target] = { :task_id => task.id, :force => true, # Force collection since we've already verified that capture should be done now - :zone => zone, } end end end + def perf_capture_queue(target, interval_name, options = {}) + # for gap, interval_name = historical, start and end time present. + start_time = options[:start_time] + end_time = options[:end_time] + priority = options[:priority] || Metric::Capture.interval_priority(interval_name) + task_id = options[:task_id] + + # cb is the task used to group cluster realtime metrics + cb = {:class_name => target.class.name, :instance_id => target.id, :method_name => :perf_capture_callback, :args => [[task_id]]} if task_id && interval_name == 'realtime' + items = queue_items_for_interval(target, interval_name, start_time, end_time) + + # Queue up the actual items + queue_item = { + :class_name => target.class.name, + :instance_id => target.id, + :role => 'ems_metrics_collector', + :queue_name => ems.metrics_collector_queue_name, + :zone => my_zone, + :state => ['ready', 'dequeue'], + } + + messages = MiqQueue.where.not(:method_name => 'perf_capture_realtime').where(queue_item).index_by(&:args) + items.each do |item_interval, *start_and_end_time| + # Should both interval name and args (dates) be part of uniqueness query? + queue_item_options = queue_item.merge(:method_name => "perf_capture_#{item_interval}") + queue_item_options[:args] = start_and_end_time if start_and_end_time.present? + next if item_interval != 'realtime' && messages[start_and_end_time].try(:priority) == priority + + MiqQueue.put_or_update(queue_item_options) do |msg, qi| + # reason for setting MiqQueue#miq_task_id is to initializes MiqTask.started_on column when message delivered. + qi[:miq_task_id] = task_id if task_id && item_interval == "realtime" + if msg.nil? + qi[:priority] = priority + qi.delete(:state) + if cb && item_interval == "realtime" + qi[:miq_callback] = cb + end + qi + elsif msg.state == "ready" && (task_id || MiqQueue.higher_priority?(priority, msg.priority)) + qi[:priority] = priority + # rerun the job (either with new task or higher priority) + qi.delete(:state) + if task_id && item_interval == "realtime" + existing_tasks = ((msg.miq_callback || {})[:args] || []).first || [] + qi[:miq_callback] = cb.merge(:args => [existing_tasks + [task_id]]) + end + qi + else + interval = qi[:method_name].sub("perf_capture_", "") + _log.debug("Skipping capture of #{target.log_target} - Performance capture for interval #{interval} is still running") + # NOTE: do not update the message queue + nil + end + end + end + end + + def split_capture_intervals(interval_name, start_time, end_time, threshold = 1.day) + # Create an array of ordered pairs from start_time and end_time so that each ordered pair is contained + # within the threshold. Then, reverse it so the newest ordered pair is first: + # start_time = 2017/01/01 12:00:00, end_time = 2017/01/04 12:00:00 + # [[interval_name, 2017-01-03 12:00:00 UTC, 2017-01-04 12:00:00 UTC], + # [interval_name, 2017-01-02 12:00:00 UTC, 2017-01-03 12:00:00 UTC], + # [interval_name, 2017-01-01 12:00:00 UTC, 2017-01-02 12:00:00 UTC]] + (start_time.utc..end_time.utc).step_value(threshold).each_cons(2).collect do |s_time, e_time| + [interval_name, s_time, e_time] + end.reverse + end + + def queue_items_for_interval(target, interval_name, start_time, end_time) + if interval_name == 'historical' + start_time = Metric::Capture.historical_start_time if start_time.nil? + end_time ||= 1.day.from_now.utc.beginning_of_day # Ensure no more than one historical collection is queue up in the same day + split_capture_intervals(interval_name, start_time, end_time) + else + # if last_perf_capture_on is earlier than 4.hour.ago.beginning_of_day, + # then create *one* realtime capture for start_time = 4.hours.ago.beginning_of_day (no end_time) + # and create historical captures for each day from last_perf_capture_on until 4.hours.ago.beginning_of_day + realtime_cut_off = 4.hours.ago.utc.beginning_of_day + if target.last_perf_capture_on.nil? + # for initial refresh of non-Storage objects, also go back historically + if !target.kind_of?(Storage) && Metric::Capture.historical_days != 0 + [[interval_name, realtime_cut_off]] + + split_capture_intervals("historical", Metric::Capture.historical_start_time, 1.day.from_now.utc.beginning_of_day) + else + [[interval_name, realtime_cut_off]] + end + elsif target.last_perf_capture_on < realtime_cut_off + [[interval_name, realtime_cut_off]] + + split_capture_intervals("historical", target.last_perf_capture_on, realtime_cut_off) + else + [interval_name] + end + end + end + def perf_target_to_interval_name(target) case target when Host, VmOrTemplate then "realtime" diff --git a/app/models/metric/capture.rb b/app/models/metric/capture.rb index e3b32e4c0e8..3a3af27de0e 100644 --- a/app/models/metric/capture.rb +++ b/app/models/metric/capture.rb @@ -69,9 +69,7 @@ def self.perf_capture_gap(start_time, end_time, zone_id = nil, ems_id = nil) end emses.each do |ems| pco = ems.perf_capture_object - targets = Metric::Targets.capture_ems_targets(ems, :exclude_storages => true) - target_options = Hash.new { |_n, _v| {:start_time => start_time.utc, :end_time => end_time.utc, :zone => ems.zone, :interval => 'historical'} } - pco.queue_captures(targets, target_options) + pco.perf_capture_gap(start_time, end_time) end _log.info("Queueing performance capture for range: [#{start_time} - #{end_time}]...Complete") diff --git a/app/models/metric/ci_mixin/capture.rb b/app/models/metric/ci_mixin/capture.rb index 8a21c6a4edb..753f354d7a1 100644 --- a/app/models/metric/ci_mixin/capture.rb +++ b/app/models/metric/ci_mixin/capture.rb @@ -9,116 +9,6 @@ def perf_capture_object delegate :perf_collect_metrics, :to => :perf_capture_object - def ems_for_capture_target - if self.kind_of?(ExtManagementSystem) - self - elsif respond_to?(:ext_management_system) && ext_management_system.present? - ext_management_system - end - end - - def split_capture_intervals(interval_name, start_time, end_time, threshold = 1.day) - # Create an array of ordered pairs from start_time and end_time so that each ordered pair is contained - # within the threshold. Then, reverse it so the newest ordered pair is first: - # start_time = 2017/01/01 12:00:00, end_time = 2017/01/04 12:00:00 - # [[interval_name, 2017-01-03 12:00:00 UTC, 2017-01-04 12:00:00 UTC], - # [interval_name, 2017-01-02 12:00:00 UTC, 2017-01-03 12:00:00 UTC], - # [interval_name, 2017-01-01 12:00:00 UTC, 2017-01-02 12:00:00 UTC]] - (start_time.utc..end_time.utc).step_value(threshold).each_cons(2).collect do |s_time, e_time| - [interval_name, s_time, e_time] - end.reverse - end - private :split_capture_intervals - - def perf_capture_queue(interval_name, options = {}) - start_time = options[:start_time] - end_time = options[:end_time] - priority = options[:priority] || Metric::Capture.interval_priority(interval_name) - task_id = options[:task_id] - zone = options[:zone] || my_zone - zone = zone.name if zone.respond_to?(:name) - ems = ems_for_capture_target - - raise ArgumentError, "invalid interval_name '#{interval_name}'" unless Metric::Capture::VALID_CAPTURE_INTERVALS.include?(interval_name) - raise ArgumentError, "target does not have an ExtManagementSystem" if ems.nil? - - # cb is the task used to group cluster realtime metrics - cb = {:class_name => self.class.name, :instance_id => id, :method_name => :perf_capture_callback, :args => [[task_id]]} if task_id && interval_name == 'realtime' - items = queue_items_for_interval(interval_name, start_time, end_time) - - # Queue up the actual items - queue_item = { - :class_name => self.class.name, - :instance_id => id, - :role => 'ems_metrics_collector', - :queue_name => ems.metrics_collector_queue_name, - :zone => zone, - :state => ['ready', 'dequeue'], - } - - messages = MiqQueue.where.not(:method_name => 'perf_capture_realtime').where(queue_item).index_by(&:args) - items.each do |item_interval, *start_and_end_time| - # Should both interval name and args (dates) be part of uniqueness query? - queue_item_options = queue_item.merge(:method_name => "perf_capture_#{item_interval}") - queue_item_options[:args] = start_and_end_time if start_and_end_time.present? - next if item_interval != 'realtime' && messages[start_and_end_time].try(:priority) == priority - MiqQueue.put_or_update(queue_item_options) do |msg, qi| - # reason for setting MiqQueue#miq_task_id is to initializes MiqTask.started_on column when message delivered. - qi[:miq_task_id] = task_id if task_id && item_interval == "realtime" - if msg.nil? - qi[:priority] = priority - qi.delete(:state) - if cb && item_interval == "realtime" - qi[:miq_callback] = cb - end - qi - elsif msg.state == "ready" && (task_id || MiqQueue.higher_priority?(priority, msg.priority)) - qi[:priority] = priority - # rerun the job (either with new task or higher priority) - qi.delete(:state) - if task_id && item_interval == "realtime" - existing_tasks = (((msg.miq_callback || {})[:args] || []).first) || [] - qi[:miq_callback] = cb.merge(:args => [existing_tasks + [task_id]]) - end - qi - else - interval = qi[:method_name].sub("perf_capture_", "") - _log.debug("Skipping capture of #{log_target} - Performance capture for interval #{interval} is still running") - # NOTE: do not update the message queue - nil - end - end - end - end - - def queue_items_for_interval(interval_name, start_time, end_time) - if interval_name == 'historical' - start_time = Metric::Capture.historical_start_time if start_time.nil? - end_time ||= 1.day.from_now.utc.beginning_of_day # Ensure no more than one historical collection is queue up in the same day - split_capture_intervals(interval_name, start_time, end_time) - else - # if last_perf_capture_on is earlier than 4.hour.ago.beginning_of_day, - # then create *one* realtime capture for start_time = 4.hours.ago.beginning_of_day (no end_time) - # and create historical captures for each day from last_perf_capture_on until 4.hours.ago.beginning_of_day - realtime_cut_off = 4.hours.ago.utc.beginning_of_day - if last_perf_capture_on.nil? - # for initial refresh of non-Storage objects, also go back historically - if !kind_of?(Storage) && Metric::Capture.historical_days != 0 - [[interval_name, realtime_cut_off]] + - split_capture_intervals("historical", Metric::Capture.historical_start_time, 1.day.from_now.utc.beginning_of_day) - else - [[interval_name, realtime_cut_off]] - end - elsif last_perf_capture_on < realtime_cut_off - [[interval_name, realtime_cut_off]] + - split_capture_intervals("historical", last_perf_capture_on, realtime_cut_off) - else - [interval_name] - end - end - end - - def perf_capture_realtime(*args) perf_capture('realtime', *args) end @@ -280,6 +170,6 @@ def perf_capture_realtime_now # For UI to enable refresh of realtime charts on demand _log.info("Realtime capture requested for #{log_target}") - perf_capture_queue('realtime', :priority => MiqQueue::HIGH_PRIORITY) + perf_capture_object.queue_captures([self], self => {:interval => 'realtime', :priority => MiqQueue::HIGH_PRIORITY}) end end diff --git a/spec/models/metric/ci_mixin/capture_spec.rb b/spec/models/metric/ci_mixin/capture_spec.rb index 616b882a527..543b4383e1c 100644 --- a/spec/models/metric/ci_mixin/capture_spec.rb +++ b/spec/models/metric/ci_mixin/capture_spec.rb @@ -288,7 +288,7 @@ def verify_historical_queue_item(queue_item, expected_start_time, expected_end_t def verify_perf_capture_queue(last_perf_capture_on, total_queue_items) Timecop.freeze do vm.last_perf_capture_on = last_perf_capture_on - vm.perf_capture_queue("realtime") + ems_openstack.perf_capture_object.queue_captures([vm], vm => {:interval => "realtime"}) expect(MiqQueue.count).to eq total_queue_items # make sure the queue items are in the correct order @@ -357,7 +357,7 @@ def verify_perf_capture_queue(last_perf_capture_on, total_queue_items) it "links supplied miq_task with queued item which allow to initialize MiqTask#started_on attribute" do MiqQueue.delete_all task = FactoryBot.create(:miq_task) - vm.perf_capture_queue("realtime", :task_id => task.id) + ems_openstack.perf_capture_object.queue_captures([vm], vm => {:interval => "realtime", :task_id => task.id}) expect(MiqQueue.first.miq_task_id).to eq task.id end end @@ -366,7 +366,7 @@ def verify_perf_capture_queue(last_perf_capture_on, total_queue_items) context "with capture days > 0 and multiple attempts" do def verify_perf_capture_queue_historical(last_perf_capture_on, total_queue_items) vm.last_perf_capture_on = last_perf_capture_on - vm.perf_capture_queue("historical") + ems_openstack.perf_capture_object.queue_captures([vm], vm => {:interval => "historical"}) expect(MiqQueue.count).to eq total_queue_items end @@ -414,31 +414,6 @@ def verify_perf_capture_queue_historical(last_perf_capture_on, total_queue_items end end - context "handles archived container entities" do - it "get the correct queue name and zone from archived container entities" do - ems = FactoryBot.create(:ems_openshift, :name => 'OpenShiftProvider') - group = FactoryBot.create(:container_group, :name => "group", :ext_management_system => ems) - container = FactoryBot.create(:container, - :name => "container", - :container_group => group, - :ext_management_system => ems) - project = FactoryBot.create(:container_project, - :name => "project", - :ext_management_system => ems) - container.disconnect_inv - group.disconnect_inv - project.disconnect_inv - - expect(container.ems_for_capture_target).to eq ems - expect(group.ems_for_capture_target).to eq ems - expect(project.ems_for_capture_target).to eq ems - - expect(container.my_zone).to eq ems.my_zone - expect(group.my_zone).to eq ems.my_zone - expect(project.my_zone).to eq ems.my_zone - end - end - describe ".perf_capture_realtime_now" do context "with enabled and disabled targets", :with_enabled_disabled_vmware do context "executing perf_capture_realtime_now" do