Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cap&U Extract logic to determine queue items #19448

Merged
merged 7 commits into from
Nov 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 3 additions & 15 deletions app/models/metric/capture.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ def self.historical_start_time
historical_days.days.ago.utc.beginning_of_day
end

def self.targets_archived_from
archived_for_setting = Settings.performance.targets.archived_for
archived_for_setting.to_i_with_method.seconds.ago.utc
end

def self.concurrent_requests(interval_name)
requests = ::Settings.performance.concurrent_requests[interval_name]
requests = 20 if requests < 20 && interval_name == 'realtime'
Expand Down Expand Up @@ -204,17 +199,10 @@ def self.queue_captures(targets, target_options)

targets.each do |target|
interval_name = perf_target_to_interval_name(target)

options = target_options[target]

begin
target.perf_capture_queue(interval_name, options)
if !target.kind_of?(Storage) && use_historical && target.last_perf_capture_on.nil?
target.perf_capture_queue('historical')
end
rescue => err
_log.warn("Failed to queue perf_capture for target [#{target.class.name}], [#{target.id}], [#{target.name}]: #{err}")
end
target.perf_capture_queue(interval_name, options)
rescue => err
_log.warn("Failed to queue perf_capture for target [#{target.class.name}], [#{target.id}], [#{target.name}]: #{err}")
end
end
private_class_method :queue_captures
Expand Down
66 changes: 35 additions & 31 deletions app/models/metric/ci_mixin/capture.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ def split_capture_intervals(interval_name, start_time, end_time, threshold = 1.d
private :split_capture_intervals

def perf_capture_queue(interval_name, options = {})
start_time = options[:start_time]
end_time = options[:end_time]
start_time = options[:start_time]&.utc
end_time = options[:end_time]&.utc
priority = options[:priority] || Metric::Capture.const_get("#{interval_name.upcase}_PRIORITY")
task_id = options[:task_id]
zone = options[:zone] || my_zone
Expand All @@ -40,33 +40,9 @@ def perf_capture_queue(interval_name, options = {})
raise ArgumentError, "end_time cannot be specified if start_time is nil" if start_time.nil? && !end_time.nil?
raise ArgumentError, "target does not have an ExtManagementSystem" if ems.nil?

start_time = start_time.utc unless start_time.nil?
end_time = end_time.utc unless end_time.nil?

# Determine the items to queue up
# cb is the task used to group cluster realtime metrics
cb = nil
if interval_name == 'historical'
start_time = Metric::Capture.historical_start_time if start_time.nil?
end_time ||= 1.day.from_now.utc.beginning_of_day # Ensure no more than one historical collection is queue up in the same day
items = split_capture_intervals(interval_name, start_time, end_time)
else
# if last_perf_capture_on is earlier than 4.hour.ago.beginning_of_day,
# then create *one* realtime capture for start_time = 4.hours.ago.beginning_of_day (no end_time)
# and create historical captures for each day from last_perf_capture_on until 4.hours.ago.beginning_of_day
realtime_cut_off = 4.hours.ago.utc.beginning_of_day
items =
if last_perf_capture_on.nil?
[[interval_name, realtime_cut_off]]
elsif last_perf_capture_on < realtime_cut_off
[[interval_name, realtime_cut_off]] +
split_capture_intervals("historical", last_perf_capture_on, realtime_cut_off)
else
[interval_name]
end

cb = {:class_name => self.class.name, :instance_id => id, :method_name => :perf_capture_callback, :args => [[task_id]]} if task_id
end
cb = {:class_name => self.class.name, :instance_id => id, :method_name => :perf_capture_callback, :args => [[task_id]]} if task_id && interval_name == 'realtime'
items = queue_items_for_interval(interval_name, start_time, end_time)

# Queue up the actual items
queue_item = {
Expand All @@ -86,7 +62,7 @@ def perf_capture_queue(interval_name, options = {})
next if item_interval != 'realtime' && messages[start_and_end_time].try(:priority) == priority
MiqQueue.put_or_update(queue_item_options) do |msg, qi|
# reason for setting MiqQueue#miq_task_id is to initializes MiqTask.started_on column when message delivered.
qi[:miq_task_id] = task_id if task_id
qi[:miq_task_id] = task_id if task_id && item_interval == "realtime"
if msg.nil?
qi[:priority] = priority
qi.delete(:state)
Expand All @@ -98,9 +74,9 @@ def perf_capture_queue(interval_name, options = {})
qi[:priority] = priority
# rerun the job (either with new task or higher priority)
qi.delete(:state)
if task_id
if task_id && item_interval == "realtime"
existing_tasks = (((msg.miq_callback || {})[:args] || []).first) || []
qi[:miq_callback] = cb.merge(:args => [existing_tasks + [task_id]]) if item_interval == "realtime"
qi[:miq_callback] = cb.merge(:args => [existing_tasks + [task_id]])
end
qi
else
Expand All @@ -113,6 +89,34 @@ def perf_capture_queue(interval_name, options = {})
end
end

def queue_items_for_interval(interval_name, start_time, end_time)
if interval_name == 'historical'
start_time = Metric::Capture.historical_start_time if start_time.nil?
end_time ||= 1.day.from_now.utc.beginning_of_day # Ensure no more than one historical collection is queue up in the same day
split_capture_intervals(interval_name, start_time, end_time)
else
# if last_perf_capture_on is earlier than 4.hour.ago.beginning_of_day,
# then create *one* realtime capture for start_time = 4.hours.ago.beginning_of_day (no end_time)
# and create historical captures for each day from last_perf_capture_on until 4.hours.ago.beginning_of_day
realtime_cut_off = 4.hours.ago.utc.beginning_of_day
if last_perf_capture_on.nil?
# for initial refresh of non-Storage objects, also go back historically
if !kind_of?(Storage) && Metric::Capture.historical_days != 0
[[interval_name, realtime_cut_off]] +
split_capture_intervals("historical", Metric::Capture.historical_start_time, 1.day.from_now.utc.beginning_of_day)
else
[[interval_name, realtime_cut_off]]
end
elsif last_perf_capture_on < realtime_cut_off
[[interval_name, realtime_cut_off]] +
split_capture_intervals("historical", last_perf_capture_on, realtime_cut_off)
else
[interval_name]
end
end
end


def perf_capture_realtime(*args)
perf_capture('realtime', *args)
end
Expand Down
7 changes: 6 additions & 1 deletion app/models/metric/targets.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ def self.perf_capture_always=(options)
MiqRegion.my_region.perf_capture_always = options
end

def self.targets_archived_from
archived_for_setting = Settings.performance.targets.archived_for
archived_for_setting.to_i_with_method.seconds.ago.utc
end

def self.capture_ems_targets(ems, options = {})
case ems
when EmsCloud then capture_cloud_targets([ems], options)
Expand Down Expand Up @@ -49,7 +54,7 @@ def self.capture_cloud_targets(emses, options = {})

def self.with_archived(scope)
# We will look also for freshly archived entities, if the entity was short-lived or even sub-hour
archived_from = Metric::Capture.targets_archived_from
archived_from = targets_archived_from
scope.where(:deleted_on => nil).or(scope.where(:deleted_on => (archived_from..Time.now.utc)))
end

Expand Down