Skip to content

Commit

Permalink
Merge pull request #15494 from NickLaMuro/heartbeat_check
Browse files Browse the repository at this point in the history
Add heartbeat_check script for file-based worker process heartbeating
  • Loading branch information
gtanzillo authored Jul 20, 2017
2 parents e3b4570 + 3dd93ab commit 99a3672
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 7 deletions.
11 changes: 11 additions & 0 deletions app/models/miq_queue_worker_base/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ def deliver_queue_message(msg)
begin
$_miq_worker_current_msg = msg
Thread.current[:tracking_label] = msg.tracking_label || msg.task_id
heartbeat_message_timeout(msg)
status, message, result = msg.deliver

if status == MiqQueue::STATUS_TIMEOUT
Expand Down Expand Up @@ -155,4 +156,14 @@ def do_work
deliver_message(msg)
end
end

private

# Only for file based heartbeating
def heartbeat_message_timeout(message)
if ENV["WORKER_HEARTBEAT_METHOD"] == "file" && message.msg_timeout
timeout = worker_settings[:poll] + message.msg_timeout
heartbeat_to_file(timeout)
end
end
end
4 changes: 2 additions & 2 deletions app/models/miq_server/worker_management/monitor/settings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ def get_worker_poll(worker)
def get_time_threshold(worker)
settings = @child_worker_settings[worker.class.settings_name]

heartbeat_timeout = settings[:heartbeat_timeout] || 2.minutes
starting_timeout = settings[:starting_timeout] || 10.minutes
heartbeat_timeout = settings[:heartbeat_timeout] || Workers::MiqDefaults.heartbeat_timeout
starting_timeout = settings[:starting_timeout] || Workers::MiqDefaults.starting_timeout

return starting_timeout if MiqWorker::STATUSES_STARTING.include?(worker.status)

Expand Down
4 changes: 2 additions & 2 deletions app/models/miq_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def worker_settings(options = {})
end

def heartbeat_file
@heartbeat_file ||= ENV["WORKER_HEARTBEAT_FILE"] || Rails.root.join("tmp", "#{guid}.hb")
@heartbeat_file ||= Workers::MiqDefaults.heartbeat_file(guid)
end

def self.worker_settings(options = {})
Expand Down Expand Up @@ -441,7 +441,7 @@ def stopping_for_too_long?
# Note, a 'stopping' worker heartbeats in DRb but NOT to
# the database, so we can see how long it's been
# 'stopping' by checking the last_heartbeat.
stopping_timeout = self.class.worker_settings[:stopping_timeout] || 10.minutes
stopping_timeout = self.class.worker_settings[:stopping_timeout] || Workers::MiqDefaults.stopping_timeout
status == MiqWorker::STATUS_STOPPING && last_heartbeat < stopping_timeout.seconds.ago
end

Expand Down
6 changes: 3 additions & 3 deletions app/models/miq_worker/runner.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
require 'miq-process'
require 'thread'
require 'fileutils'

class MiqWorker::Runner
class TemporaryFailure < RuntimeError
Expand Down Expand Up @@ -385,8 +384,9 @@ def heartbeat_to_drb
do_exit("Error heartbeating to MiqServer because #{err.class.name}: #{err.message}", 1)
end

def heartbeat_to_file
FileUtils.touch(@worker.heartbeat_file)
def heartbeat_to_file(timeout = nil)
timeout ||= worker_settings[:heartbeat_timeout] || Workers::MiqDefaults.heartbeat_timeout
File.write(@worker.heartbeat_file, (Time.now.utc + timeout).to_s)
end

def do_gc
Expand Down
34 changes: 34 additions & 0 deletions lib/workers/bin/heartbeat_check.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
require "optparse"

options = {}
opt_parser = OptionParser.new do |opts|
opts.banner = "usage: #{File.basename($PROGRAM_NAME, '.rb')} [HEARTBEAT_FILE]"

opts.on("-b=HBFILE", "--heartbeat-file=HBFILE", "Heartbeat file to read (overrides arg val)") do |val|
options[:heartbeat_file] = val
end

opts.on("-g=GUID", "--guid=GUID", "Use this guid for finding the heartbeat file") do |val|
options[:guid] = val
end

opts.on("-v", "--[no-]verbose", "Verbose output") do |val|
options[:verbose] = val
end

opts.on("-h", "--help", "Displays this help") do
puts opts
exit
end
end
opt_parser.parse!

require "English"
require File.expand_path("../../heartbeat.rb", __FILE__)
heartbeat_file = options[:heartbeat_file] || ARGV[0]
heartbeat_file ||= Workers::MiqDefaults.heartbeat_file(options[:guid])

exit_status = Workers::Heartbeat.file_check(heartbeat_file)

at_exit { puts $ERROR_INFO.status if options[:verbose] }
exit exit_status
22 changes: 22 additions & 0 deletions lib/workers/heartbeat.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
require_relative "miq_defaults"

module Workers
class Heartbeat
def self.file_check(heartbeat_file = Workers::MiqDefaults.heartbeat_file)
if File.exist?(heartbeat_file)
current_time = Time.now.utc
contents = File.read(heartbeat_file)
mtime = File.mtime(heartbeat_file).utc
timeout = if contents.empty?
(mtime + Workers::MiqDefaults.heartbeat_timeout).utc
else
Time.parse(contents).utc
end

current_time < timeout
else
false
end
end
end
end
26 changes: 26 additions & 0 deletions lib/workers/miq_defaults.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
require "active_support/core_ext/numeric/time"

module Workers
class MiqDefaults
HEARTBEAT_TIMEOUT = 2.minutes.freeze
STARTING_TIMEOUT = 10.minutes.freeze
STOPPING_TIMEOUT = 10.minutes.freeze

def self.heartbeat_timeout
HEARTBEAT_TIMEOUT
end

def self.starting_timeout
STARTING_TIMEOUT
end

def self.stopping_timeout
STOPPING_TIMEOUT
end

def self.heartbeat_file(guid = nil)
guid ||= "miq_worker"
ENV["WORKER_HEARTBEAT_FILE"] || File.expand_path("../../../tmp/#{guid}.hb", __FILE__)
end
end
end
72 changes: 72 additions & 0 deletions spec/lib/workers/heartbeat_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
require "workers/heartbeat"

shared_examples_for "heartbeat file checker" do |heartbeat_file = nil|
# This is used instead of just passing in the `heartbeat_file` value directly
# into the method because we can splat it in the argument list and force a "no
# args" method call in each of the tests
let(:file_check_args) { [heartbeat_file].compact }
let(:calculated_hb_file) { Pathname.new(heartbeat_file || ENV["WORKER_HEARTBEAT_FILE"]) }
around do |example|
FileUtils.mkdir_p(calculated_hb_file.parent)
File.write(calculated_hb_file, "")

example.run

FileUtils.rm_f(calculated_hb_file.to_s)
end

it "returns false when the heartbeat file does not exist" do
FileUtils.rm_f calculated_hb_file.to_s # Early delete
expect(Workers::Heartbeat.file_check(*file_check_args)).to eq(false)
end

it "returns true with a newly created heartbeat file with no content" do
expect(Workers::Heartbeat.file_check(*file_check_args)).to eq(true)
end

it "returns false with a stale heartbeat file with no content" do
to_the_future = (2.minutes + 2.seconds).from_now
Timecop.travel(to_the_future) do
expect(Workers::Heartbeat.file_check(*file_check_args)).to eq(false)
end
end

it "returns true with a heartbeat file with content within the timeout" do
# Set timeout in heartbeat file
File.write(calculated_hb_file, 3.minutes.from_now)

to_the_future = (2.minutes + 2.seconds).from_now
Timecop.travel(to_the_future) do
expect(Workers::Heartbeat.file_check(*file_check_args)).to eq(true)
end
end
end

describe Workers::Heartbeat do
describe ".file_check" do
context "using the default heartbeat_file" do
let(:test_heartbeat_file) { ManageIQ.root.join("tmp", "spec", "test.hb") }

around do |example|
# This is given the highest priority when calling
# Workers::MiqDefaults.heartbeat_file.
#
# Trying to avoid using mocks...
old_env = ENV["WORKER_HEARTBEAT_FILE"]
ENV["WORKER_HEARTBEAT_FILE"] = test_heartbeat_file.to_s

example.run

ENV["WORKER_HEARTBEAT_FILE"] = old_env
end

it_should_behave_like "heartbeat file checker"
end

context "when passing in a filepath as an argument" do
other_heartbeat_file = ManageIQ.root.join("tmp", "spec", "other.hb").to_s

it_should_behave_like "heartbeat file checker", other_heartbeat_file
end
end
end

0 comments on commit 99a3672

Please sign in to comment.