Skip to content

Commit

Permalink
Merge pull request #15377 from gtanzillo/rearch-heartbeat-to-file
Browse files Browse the repository at this point in the history
Support  worker heartbeat to a local file instead of Drb.
  • Loading branch information
jrafanie authored Jun 23, 2017
2 parents d5b9dd4 + 34a11ab commit a0b650f
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 12 deletions.
1 change: 1 addition & 0 deletions app/models/miq_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ def start
#############################################################
# Start all the configured workers
#############################################################
clean_heartbeat_files
sync_config
start_drb_server
sync_workers
Expand Down
18 changes: 15 additions & 3 deletions app/models/miq_server/worker_management/heartbeat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def post_message_for_workers(class_name = nil, resync_needed = false, sync_messa

# Get the latest heartbeat between the SQL and memory (updated via DRb)
def validate_heartbeat(w)
last_heartbeat = workers_last_heartbeat(w.pid)
last_heartbeat = workers_last_heartbeat(w)

if w.last_heartbeat.nil?
last_heartbeat ||= Time.now.utc
Expand All @@ -91,11 +91,23 @@ def validate_heartbeat(w)
end
end

def clean_heartbeat_files
Dir.glob(Rails.root.join("tmp", "*.hb")).each { |f| File.delete(f) }
end

private

def workers_last_heartbeat(pid)
def workers_last_heartbeat(w)
ENV["WORKER_HEARTBEAT_METHOD"] == "file" ? workers_last_heartbeat_to_file(w) : workers_last_heartbeat_to_drb(w)
end

def workers_last_heartbeat_to_drb(w)
@workers_lock.synchronize(:SH) do
@workers.fetch_path(pid, :last_heartbeat)
@workers.fetch_path(w.pid, :last_heartbeat)
end
end

def workers_last_heartbeat_to_file(w)
File.exist?(w.heartbeat_file) ? File.mtime(w.heartbeat_file).utc : Time.now.utc
end
end
4 changes: 4 additions & 0 deletions app/models/miq_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ def worker_settings(options = {})
self.class.fetch_worker_settings_from_server(miq_server, options)
end

def heartbeat_file
@heartbeat_file ||= ENV["WORKER_HEARTBEAT_FILE"] || Rails.root.join("tmp", "#{guid}.hb")
end

def self.worker_settings(options = {})
fetch_worker_settings_from_server(MiqServer.my_server, options)
end
Expand Down
19 changes: 14 additions & 5 deletions app/models/miq_worker/runner.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require 'miq-process'
require 'thread'
require 'fileutils'

class MiqWorker::Runner
class TemporaryFailure < RuntimeError
Expand Down Expand Up @@ -361,6 +362,15 @@ def do_work_loop
end

def heartbeat
ENV["WORKER_HEARTBEAT_METHOD"] == "file" ? heartbeat_to_file : heartbeat_to_drb
do_heartbeat_work
rescue SystemExit, SignalException
raise
rescue Exception => err
do_exit("Error heartbeating because #{err.class.name}: #{err.message}\n#{err.backtrace.join('\n')}", 1)
end

def heartbeat_to_drb
# Disable heartbeat check. Useful if a worker is running in isolation
# without the oversight of MiqServer::WorkerManagement
return if skip_heartbeat?
Expand All @@ -371,13 +381,12 @@ def heartbeat
messages = worker_monitor_drb.worker_heartbeat(@worker.pid, @worker.class.name, @worker.queue_name)
@last_hb = now
messages.each { |msg, *args| process_message(msg, *args) }
do_heartbeat_work
rescue DRb::DRbError => err
do_exit("Error heartbeating to MiqServer because #{err.class.name}: #{err.message}", 1)
rescue SystemExit, SignalException
raise
rescue Exception => err
do_exit("Error heartbeating because #{err.class.name}: #{err.message}\n#{err.backtrace.join('\n')}", 1)
end

def heartbeat_to_file
FileUtils.touch(@worker.heartbeat_file)
end

def do_gc
Expand Down
32 changes: 28 additions & 4 deletions spec/models/miq_server/worker_monitor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -352,10 +352,34 @@
server.setup_drb_variables
end

it "should mark not responding if not recently heartbeated" do
worker.update(:last_heartbeat => 20.minutes.ago)
expect(server.validate_worker(worker)).to be_falsey
expect(worker.reload.status).to eq(MiqWorker::STATUS_STOPPING)
context "for heartbeat" do
it "should mark not responding if not recently heartbeated via Drb" do
worker.update(:last_heartbeat => 20.minutes.ago)
expect(server.validate_worker(worker)).to be_falsey
expect(worker.reload.status).to eq(MiqWorker::STATUS_STOPPING)
end

context "to a file" do
before do
ENV["WORKER_HEARTBEAT_METHOD"] = "file"
end

after do
ENV["WORKER_HEARTBEAT_METHOD"] = nil
end

it "should mark not responding if not recently heartbeated via file" do
allow(server).to receive(:workers_last_heartbeat_to_file).and_return(20.minutes.ago)

expect(server.validate_worker(worker)).to be_falsey
expect(worker.reload.status).to eq(MiqWorker::STATUS_STOPPING)
end

it "should detect responding if recently heartbeated via file" do
expect(server.validate_worker(worker)).to be_truthy
expect(worker.reload.status).to eq(MiqWorker::STATUS_READY)
end
end
end

context "for excessive memory" do
Expand Down

0 comments on commit a0b650f

Please sign in to comment.