Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move all stdout to LogSubscriber #67

Merged
merged 1 commit into from
Aug 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/good_job.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
require "rails"
require 'good_job/railtie'

require 'good_job/logging'
require 'good_job/log_subscriber'
require 'good_job/lockable'
require 'good_job/job'
require 'good_job/scheduler'
Expand All @@ -12,9 +12,9 @@
require 'active_job/queue_adapters/good_job_adapter'

module GoodJob
cattr_accessor :logger, default: ActiveSupport::TaggedLogging.new(ActiveSupport::Logger.new(STDOUT))
mattr_accessor :preserve_job_records, default: false
mattr_accessor :reperform_jobs_on_standard_error, default: true
include Logging

ActiveSupport.run_load_hooks(:good_job, self)
end
13 changes: 6 additions & 7 deletions lib/good_job/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ def start
).to_i

job_query = GoodJob::Job.queue_string(queue_string)
job_performer = GoodJob::Performer.new(job_query, :perform_with_advisory_lock)

$stdout.puts "GoodJob worker starting with max_threads=#{max_threads} on queues=#{queue_string}"
job_performer = GoodJob::Performer.new(job_query, :perform_with_advisory_lock, name: queue_string)

timer_options = {}
timer_options[:execution_interval] = poll_interval if poll_interval.positive?
Expand All @@ -60,9 +58,7 @@ def start
break if @stop_good_job_executable || scheduler.shutdown?
end

$stdout.puts "\nFinishing GoodJob's current jobs before exiting..."
scheduler.shutdown
$stdout.puts "GoodJob's jobs finished, exiting..."
end

desc :cleanup_preserved_jobs, "Delete preserved job records"
Expand All @@ -74,8 +70,11 @@ def cleanup_preserved_jobs
require RAILS_ENVIRONMENT_RB

timestamp = Time.current - options[:before_seconds_ago]
result = GoodJob::Job.finished(timestamp).delete_all
$stdout.puts "Deleted #{result} preserved #{'job'.pluralize(result)} finished before #{timestamp}."
ActiveSupport::Notifications.instrument("cleanup_preserved_jobs.good_job", { before_seconds_ago: options[:before_seconds_ago], timestamp: timestamp }) do |payload|
deleted_records_count = GoodJob::Job.finished(timestamp).delete_all

payload[:deleted_records_count] = deleted_records_count
end
end

default_task :start
Expand Down
98 changes: 98 additions & 0 deletions lib/good_job/log_subscriber.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
module GoodJob
class LogSubscriber < ActiveSupport::LogSubscriber
def create(event)
good_job = event.payload[:good_job]

debug do
"GoodJob created job resource with id #{good_job.id}"
end
end

def timer_task_finished(event)
exception = event.payload[:error]
return unless exception

error do
"GoodJob error: #{exception}\n #{exception.backtrace}"
end
end

def job_finished(event)
exception = event.payload[:error]
return unless exception

error do
"GoodJob error: #{exception}\n #{exception.backtrace}"
end
end

def scheduler_create_pools(event)
max_threads = event.payload[:max_threads]
poll_interval = event.payload[:poll_interval]
performer_name = event.payload[:performer_name]

info_and_stdout do
"GoodJob started scheduler with queues=#{performer_name} max_threads=#{max_threads} poll_interval=#{poll_interval}."
end
end

def scheduler_shutdown_start(_event)
info_and_stdout do
"GoodJob shutting down scheduler..."
end
end

def scheduler_shutdown(_event)
info_and_stdout do
"GoodJob scheduler is shut down."
end
end

def scheduler_restart_pools(_event)
info_and_stdout do
"GoodJob scheduler has restarted."
end
end

def cleanup_preserved_jobs(event)
timestamp = event.payload[:timestamp]
deleted_records_count = event.payload[:deleted_records_count]

info_and_stdout do
"GoodJob deleted #{deleted_records_count} preserved #{'job'.pluralize(deleted_records_count)} finished before #{timestamp}."
end
end

private

def logger
GoodJob.logger
end

%w(info debug warn error fatal unknown).each do |level|
class_eval <<-METHOD, __FILE__, __LINE__ + 1
def #{level}(progname = nil, &block)
return unless logger

if logger.respond_to?(:tagged)
logger.tagged('GoodJob') do
logger.#{level}(progname, &block)
end
else
logger.#{level}(progname, &block)
end
end
METHOD
end

def info_and_stdout(progname = nil, &block)
$stdout.puts yield unless ActiveSupport::Logger.logger_outputs_to?(logger, STDOUT)

info(progname, &block)
end

def thread_name
Thread.current.name || Thread.current.object_id
end
end
end
70 changes: 0 additions & 70 deletions lib/good_job/logging.rb

This file was deleted.

5 changes: 4 additions & 1 deletion lib/good_job/performer.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
module GoodJob
class Performer
def initialize(target, method_name)
attr_reader :name

def initialize(target, method_name, name: nil)
@target = target
@method_name = method_name
@name = name
end

def next
Expand Down
1 change: 1 addition & 0 deletions lib/good_job/railtie.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module GoodJob
class Railtie < ::Rails::Railtie
initializer "good_job.logger" do
ActiveSupport.on_load(:good_job) { self.logger = ::Rails.logger }
GoodJob::LogSubscriber.attach_to :good_job
end
end
end
20 changes: 12 additions & 8 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def initialize(performer, timer_options: {}, pool_options: {})
def shutdown(wait: true)
@_shutdown = true

ActiveSupport::Notifications.instrument("scheduler_start_shutdown.good_job", { wait: wait })
ActiveSupport::Notifications.instrument("scheduler_shutdown_start.good_job", { wait: wait })
ActiveSupport::Notifications.instrument("scheduler_shutdown.good_job", { wait: wait }) do
if @timer&.running?
@timer.shutdown
Expand All @@ -52,8 +52,10 @@ def shutdown?
end

def restart(wait: true)
shutdown(wait: wait) unless shutdown?
create_pools
ActiveSupport::Notifications.instrument("scheduler_restart_pools.good_job") do
shutdown(wait: wait) unless shutdown?
create_pools
end
end

def create_thread
Expand Down Expand Up @@ -92,12 +94,14 @@ def ready_worker_count
private

def create_pools
@pool = ThreadPoolExecutor.new(@pool_options)
return unless @timer_options[:execution_interval].positive?
ActiveSupport::Notifications.instrument("scheduler_create_pools.good_job", { performer_name: @performer.name, max_threads: @pool_options[:max_threads], poll_interval: @timer_options[:execution_interval] }) do
@pool = ThreadPoolExecutor.new(@pool_options)
next unless @timer_options[:execution_interval].positive?

@timer = Concurrent::TimerTask.new(@timer_options) { create_thread }
@timer.add_observer(self, :timer_observer)
@timer.execute
@timer = Concurrent::TimerTask.new(@timer_options) { create_thread }
@timer.add_observer(self, :timer_observer)
@timer.execute
end
end
end
end
16 changes: 4 additions & 12 deletions spec/good_job/cli_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,7 @@
it 'can gracefully shut down on INT signal' do
cli = described_class.new([], {}, {})

cli_thread = Concurrent::Promises.future do
expect do
cli.start
end.to output(/finished, exiting/).to_stdout
end

cli_thread = Concurrent::Promises.future { cli.start }
sleep_until { cli.instance_variable_get(:@stop_good_job_executable) == false }

Process.kill 'INT', Process.pid # Send the signal to ourselves
Expand All @@ -53,10 +48,7 @@
stub_const 'ENV', ENV.to_hash.merge({ 'RAILS_MAX_THREADS' => 3, 'GOOD_JOB_MAX_THREADS' => 2 })
allow(ActiveRecord::Base.connection_pool).to receive(:size).and_return(1)

expect do
cli.start
end.to output.to_stdout

cli.start
expect(GoodJob::Scheduler).to have_received(:new).with(a_kind_of(GoodJob::Performer), pool_options: { max_threads: 4 }, timer_options: {})
end
end
Expand All @@ -76,7 +68,7 @@
scheduler_mock
end

expect { cli.start }.to output.to_stdout
cli.start
expect(GoodJob::Scheduler).to have_received(:new).with(a_kind_of(GoodJob::Performer), a_kind_of(Hash))

performer_query = performer.instance_variable_get(:@target)
Expand All @@ -92,7 +84,7 @@

it 'deletes finished jobs' do
cli = described_class.new([], { before_seconds_ago: 24.hours.to_i }, {})
expect { cli.cleanup_preserved_jobs }.to output(/Deleted 1 preserved job/).to_stdout
expect { cli.cleanup_preserved_jobs }.to output(/GoodJob deleted 1 preserved job/).to_stdout

expect { recent_job.reload }.not_to raise_error
expect { old_unfinished_job.reload }.not_to raise_error
Expand Down
21 changes: 21 additions & 0 deletions spec/good_job/performer_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
require 'rails_helper'

RSpec.describe GoodJob::Performer do
let(:target) { double('The Target', the_method: nil) } # rubocop:disable RSpec/VerifiedDoubles

describe '#next' do
it 'delegates to target#method_name' do
performer = described_class.new(target, :the_method)
performer.next

expect(target).to have_received(:the_method)
end
end

describe '#name' do
it 'is assignable' do
performer = described_class.new(target, :the_method, name: 'test-performer')
expect(performer.name).to eq 'test-performer'
end
end
end
6 changes: 5 additions & 1 deletion spec/good_job/scheduler_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
require 'rails_helper'

RSpec.describe GoodJob::Scheduler do
let(:performer) { instance_double(GoodJob::Performer, next: nil) }
let(:performer) { instance_double(GoodJob::Performer, next: nil, name: '') }

around do |example|
expect { example.run }.to output.to_stdout # rubocop:disable RSpec/ExpectInHook
end

describe '#shutdown' do
it 'shuts down the theadpools' do
Expand Down
4 changes: 4 additions & 0 deletions spec/integration/scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ def perform(*args, **kwargs)

let(:adapter) { GoodJob::Adapter.new }

around do |example|
expect { example.run }.to output.to_stdout # rubocop:disable RSpec/ExpectInHook
end

context 'when there are a large number of jobs' do
let(:number_of_jobs) { 250 }

Expand Down