diff --git a/app/models/miq_queue_worker_base/runner.rb b/app/models/miq_queue_worker_base/runner.rb index 363536bacb7..5a0f45d8a20 100644 --- a/app/models/miq_queue_worker_base/runner.rb +++ b/app/models/miq_queue_worker_base/runner.rb @@ -105,6 +105,7 @@ def deliver_queue_message(msg) begin $_miq_worker_current_msg = msg Thread.current[:tracking_label] = msg.tracking_label || msg.task_id + heartbeat_message_timeout(msg) status, message, result = msg.deliver if status == MiqQueue::STATUS_TIMEOUT @@ -155,4 +156,14 @@ def do_work deliver_message(msg) end end + + private + + # Only for file based heartbeating + def heartbeat_message_timeout(message) + if ENV["WORKER_HEARTBEAT_METHOD"] == "file" && message.msg_timeout + timeout = worker_settings[:poll] + message.msg_timeout + heartbeat_to_file(timeout) + end + end end diff --git a/app/models/miq_server/worker_management/monitor/settings.rb b/app/models/miq_server/worker_management/monitor/settings.rb index 5b0e347c564..eea8e57ebaa 100644 --- a/app/models/miq_server/worker_management/monitor/settings.rb +++ b/app/models/miq_server/worker_management/monitor/settings.rb @@ -30,8 +30,8 @@ def get_worker_poll(worker) def get_time_threshold(worker) settings = @child_worker_settings[worker.class.settings_name] - heartbeat_timeout = settings[:heartbeat_timeout] || 2.minutes - starting_timeout = settings[:starting_timeout] || 10.minutes + heartbeat_timeout = settings[:heartbeat_timeout] || Workers::MiqDefaults.heartbeat_timeout + starting_timeout = settings[:starting_timeout] || Workers::MiqDefaults.starting_timeout return starting_timeout if MiqWorker::STATUSES_STARTING.include?(worker.status) diff --git a/app/models/miq_worker.rb b/app/models/miq_worker.rb index f5638acfc69..844819edafb 100644 --- a/app/models/miq_worker.rb +++ b/app/models/miq_worker.rb @@ -224,7 +224,7 @@ def worker_settings(options = {}) end def heartbeat_file - @heartbeat_file ||= ENV["WORKER_HEARTBEAT_FILE"] || Rails.root.join("tmp", "#{guid}.hb") + @heartbeat_file ||= Workers::MiqDefaults.heartbeat_file(guid) end def self.worker_settings(options = {}) @@ -441,7 +441,7 @@ def stopping_for_too_long? # Note, a 'stopping' worker heartbeats in DRb but NOT to # the database, so we can see how long it's been # 'stopping' by checking the last_heartbeat. - stopping_timeout = self.class.worker_settings[:stopping_timeout] || 10.minutes + stopping_timeout = self.class.worker_settings[:stopping_timeout] || Workers::MiqDefaults.stopping_timeout status == MiqWorker::STATUS_STOPPING && last_heartbeat < stopping_timeout.seconds.ago end diff --git a/app/models/miq_worker/runner.rb b/app/models/miq_worker/runner.rb index dfae1a128d0..36971c80754 100644 --- a/app/models/miq_worker/runner.rb +++ b/app/models/miq_worker/runner.rb @@ -1,6 +1,5 @@ require 'miq-process' require 'thread' -require 'fileutils' class MiqWorker::Runner class TemporaryFailure < RuntimeError @@ -385,8 +384,9 @@ def heartbeat_to_drb do_exit("Error heartbeating to MiqServer because #{err.class.name}: #{err.message}", 1) end - def heartbeat_to_file - FileUtils.touch(@worker.heartbeat_file) + def heartbeat_to_file(timeout = nil) + timeout ||= worker_settings[:heartbeat_timeout] || Workers::MiqDefaults.heartbeat_timeout + File.write(@worker.heartbeat_file, (Time.now.utc + timeout).to_s) end def do_gc diff --git a/lib/workers/bin/heartbeat_check.rb b/lib/workers/bin/heartbeat_check.rb new file mode 100644 index 00000000000..6659be46123 --- /dev/null +++ b/lib/workers/bin/heartbeat_check.rb @@ -0,0 +1,34 @@ +require "optparse" + +options = {} +opt_parser = OptionParser.new do |opts| + opts.banner = "usage: #{File.basename($PROGRAM_NAME, '.rb')} [HEARTBEAT_FILE]" + + opts.on("-b=HBFILE", "--heartbeat-file=HBFILE", "Heartbeat file to read (overrides arg val)") do |val| + options[:heartbeat_file] = val + end + + opts.on("-g=GUID", "--guid=GUID", "Use this guid for finding the heartbeat file") do |val| + options[:guid] = val + end + + opts.on("-v", "--[no-]verbose", "Verbose output") do |val| + options[:verbose] = val + end + + opts.on("-h", "--help", "Displays this help") do + puts opts + exit + end +end +opt_parser.parse! + +require "English" +require File.expand_path("../../heartbeat.rb", __FILE__) +heartbeat_file = options[:heartbeat_file] || ARGV[0] +heartbeat_file ||= Workers::MiqDefaults.heartbeat_file(options[:guid]) + +exit_status = Workers::Heartbeat.file_check(heartbeat_file) + +at_exit { puts $ERROR_INFO.status if options[:verbose] } +exit exit_status diff --git a/lib/workers/heartbeat.rb b/lib/workers/heartbeat.rb new file mode 100644 index 00000000000..df299c02175 --- /dev/null +++ b/lib/workers/heartbeat.rb @@ -0,0 +1,22 @@ +require_relative "miq_defaults" + +module Workers + class Heartbeat + def self.file_check(heartbeat_file = Workers::MiqDefaults.heartbeat_file) + if File.exist?(heartbeat_file) + current_time = Time.now.utc + contents = File.read(heartbeat_file) + mtime = File.mtime(heartbeat_file).utc + timeout = if contents.empty? + (mtime + Workers::MiqDefaults.heartbeat_timeout).utc + else + Time.parse(contents).utc + end + + current_time < timeout + else + false + end + end + end +end diff --git a/lib/workers/miq_defaults.rb b/lib/workers/miq_defaults.rb new file mode 100644 index 00000000000..0084589d8d8 --- /dev/null +++ b/lib/workers/miq_defaults.rb @@ -0,0 +1,26 @@ +require "active_support/core_ext/numeric/time" + +module Workers + class MiqDefaults + HEARTBEAT_TIMEOUT = 2.minutes.freeze + STARTING_TIMEOUT = 10.minutes.freeze + STOPPING_TIMEOUT = 10.minutes.freeze + + def self.heartbeat_timeout + HEARTBEAT_TIMEOUT + end + + def self.starting_timeout + STARTING_TIMEOUT + end + + def self.stopping_timeout + STOPPING_TIMEOUT + end + + def self.heartbeat_file(guid = nil) + guid ||= "miq_worker" + ENV["WORKER_HEARTBEAT_FILE"] || File.expand_path("../../../tmp/#{guid}.hb", __FILE__) + end + end +end diff --git a/spec/lib/workers/heartbeat_spec.rb b/spec/lib/workers/heartbeat_spec.rb new file mode 100644 index 00000000000..0417220a351 --- /dev/null +++ b/spec/lib/workers/heartbeat_spec.rb @@ -0,0 +1,72 @@ +require "workers/heartbeat" + +shared_examples_for "heartbeat file checker" do |heartbeat_file = nil| + # This is used instead of just passing in the `heartbeat_file` value directly + # into the method because we can splat it in the argument list and force a "no + # args" method call in each of the tests + let(:file_check_args) { [heartbeat_file].compact } + let(:calculated_hb_file) { Pathname.new(heartbeat_file || ENV["WORKER_HEARTBEAT_FILE"]) } + around do |example| + FileUtils.mkdir_p(calculated_hb_file.parent) + File.write(calculated_hb_file, "") + + example.run + + FileUtils.rm_f(calculated_hb_file.to_s) + end + + it "returns false when the heartbeat file does not exist" do + FileUtils.rm_f calculated_hb_file.to_s # Early delete + expect(Workers::Heartbeat.file_check(*file_check_args)).to eq(false) + end + + it "returns true with a newly created heartbeat file with no content" do + expect(Workers::Heartbeat.file_check(*file_check_args)).to eq(true) + end + + it "returns false with a stale heartbeat file with no content" do + to_the_future = (2.minutes + 2.seconds).from_now + Timecop.travel(to_the_future) do + expect(Workers::Heartbeat.file_check(*file_check_args)).to eq(false) + end + end + + it "returns true with a heartbeat file with content within the timeout" do + # Set timeout in heartbeat file + File.write(calculated_hb_file, 3.minutes.from_now) + + to_the_future = (2.minutes + 2.seconds).from_now + Timecop.travel(to_the_future) do + expect(Workers::Heartbeat.file_check(*file_check_args)).to eq(true) + end + end +end + +describe Workers::Heartbeat do + describe ".file_check" do + context "using the default heartbeat_file" do + let(:test_heartbeat_file) { ManageIQ.root.join("tmp", "spec", "test.hb") } + + around do |example| + # This is given the highest priority when calling + # Workers::MiqDefaults.heartbeat_file. + # + # Trying to avoid using mocks... + old_env = ENV["WORKER_HEARTBEAT_FILE"] + ENV["WORKER_HEARTBEAT_FILE"] = test_heartbeat_file.to_s + + example.run + + ENV["WORKER_HEARTBEAT_FILE"] = old_env + end + + it_should_behave_like "heartbeat file checker" + end + + context "when passing in a filepath as an argument" do + other_heartbeat_file = ManageIQ.root.join("tmp", "spec", "other.hb").to_s + + it_should_behave_like "heartbeat file checker", other_heartbeat_file + end + end +end