From 2cf004bdbed58bbff27217cf097a825d0d58790a Mon Sep 17 00:00:00 2001 From: Ben Sheldon Date: Wed, 5 Aug 2020 08:06:37 -0700 Subject: [PATCH] Move all stdout to LogSubscriber - Renames instrumentation `scheduler_start_shutdown.good_job` to `scheduler_shutdown_start.good_job` - Adds `Performer#name` --- lib/good_job.rb | 4 +- lib/good_job/cli.rb | 13 ++-- lib/good_job/log_subscriber.rb | 98 ++++++++++++++++++++++++++++++ lib/good_job/logging.rb | 70 --------------------- lib/good_job/performer.rb | 5 +- lib/good_job/railtie.rb | 1 + lib/good_job/scheduler.rb | 20 +++--- spec/good_job/cli_spec.rb | 16 ++--- spec/good_job/performer_spec.rb | 21 +++++++ spec/good_job/scheduler_spec.rb | 6 +- spec/integration/scheduler_spec.rb | 4 ++ 11 files changed, 157 insertions(+), 101 deletions(-) create mode 100644 lib/good_job/log_subscriber.rb delete mode 100644 lib/good_job/logging.rb create mode 100644 spec/good_job/performer_spec.rb diff --git a/lib/good_job.rb b/lib/good_job.rb index cbbdb6b30..53f54a166 100644 --- a/lib/good_job.rb +++ b/lib/good_job.rb @@ -1,7 +1,7 @@ require "rails" require 'good_job/railtie' -require 'good_job/logging' +require 'good_job/log_subscriber' require 'good_job/lockable' require 'good_job/job' require 'good_job/scheduler' @@ -12,9 +12,9 @@ require 'active_job/queue_adapters/good_job_adapter' module GoodJob + cattr_accessor :logger, default: ActiveSupport::TaggedLogging.new(ActiveSupport::Logger.new(STDOUT)) mattr_accessor :preserve_job_records, default: false mattr_accessor :reperform_jobs_on_standard_error, default: true - include Logging ActiveSupport.run_load_hooks(:good_job, self) end diff --git a/lib/good_job/cli.rb b/lib/good_job/cli.rb index c1a67f2ba..722186fe7 100644 --- a/lib/good_job/cli.rb +++ b/lib/good_job/cli.rb @@ -37,9 +37,7 @@ def start ).to_i job_query = GoodJob::Job.queue_string(queue_string) - job_performer = GoodJob::Performer.new(job_query, :perform_with_advisory_lock) - - $stdout.puts "GoodJob worker starting with max_threads=#{max_threads} on queues=#{queue_string}" + job_performer = GoodJob::Performer.new(job_query, :perform_with_advisory_lock, name: queue_string) timer_options = {} timer_options[:execution_interval] = poll_interval if poll_interval.positive? @@ -60,9 +58,7 @@ def start break if @stop_good_job_executable || scheduler.shutdown? end - $stdout.puts "\nFinishing GoodJob's current jobs before exiting..." scheduler.shutdown - $stdout.puts "GoodJob's jobs finished, exiting..." end desc :cleanup_preserved_jobs, "Delete preserved job records" @@ -74,8 +70,11 @@ def cleanup_preserved_jobs require RAILS_ENVIRONMENT_RB timestamp = Time.current - options[:before_seconds_ago] - result = GoodJob::Job.finished(timestamp).delete_all - $stdout.puts "Deleted #{result} preserved #{'job'.pluralize(result)} finished before #{timestamp}." + ActiveSupport::Notifications.instrument("cleanup_preserved_jobs.good_job", { before_seconds_ago: options[:before_seconds_ago], timestamp: timestamp }) do |payload| + deleted_records_count = GoodJob::Job.finished(timestamp).delete_all + + payload[:deleted_records_count] = deleted_records_count + end end default_task :start diff --git a/lib/good_job/log_subscriber.rb b/lib/good_job/log_subscriber.rb new file mode 100644 index 000000000..222f45882 --- /dev/null +++ b/lib/good_job/log_subscriber.rb @@ -0,0 +1,98 @@ +module GoodJob + class LogSubscriber < ActiveSupport::LogSubscriber + def create(event) + good_job = event.payload[:good_job] + + debug do + "GoodJob created job resource with id #{good_job.id}" + end + end + + def timer_task_finished(event) + exception = event.payload[:error] + return unless exception + + error do + "GoodJob error: #{exception}\n #{exception.backtrace}" + end + end + + def job_finished(event) + exception = event.payload[:error] + return unless exception + + error do + "GoodJob error: #{exception}\n #{exception.backtrace}" + end + end + + def scheduler_create_pools(event) + max_threads = event.payload[:max_threads] + poll_interval = event.payload[:poll_interval] + performer_name = event.payload[:performer_name] + + info_and_stdout do + "GoodJob started scheduler with queues=#{performer_name} max_threads=#{max_threads} poll_interval=#{poll_interval}." + end + end + + def scheduler_shutdown_start(_event) + info_and_stdout do + "GoodJob shutting down scheduler..." + end + end + + def scheduler_shutdown(_event) + info_and_stdout do + "GoodJob scheduler is shut down." + end + end + + def scheduler_restart_pools(_event) + info_and_stdout do + "GoodJob scheduler has restarted." + end + end + + def cleanup_preserved_jobs(event) + timestamp = event.payload[:timestamp] + deleted_records_count = event.payload[:deleted_records_count] + + info_and_stdout do + "GoodJob deleted #{deleted_records_count} preserved #{'job'.pluralize(deleted_records_count)} finished before #{timestamp}." + end + end + + private + + def logger + GoodJob.logger + end + + %w(info debug warn error fatal unknown).each do |level| + class_eval <<-METHOD, __FILE__, __LINE__ + 1 + def #{level}(progname = nil, &block) + return unless logger + + if logger.respond_to?(:tagged) + logger.tagged('GoodJob') do + logger.#{level}(progname, &block) + end + else + logger.#{level}(progname, &block) + end + end + METHOD + end + + def info_and_stdout(progname = nil, &block) + $stdout.puts yield unless ActiveSupport::Logger.logger_outputs_to?(logger, STDOUT) + + info(progname, &block) + end + + def thread_name + Thread.current.name || Thread.current.object_id + end + end +end diff --git a/lib/good_job/logging.rb b/lib/good_job/logging.rb deleted file mode 100644 index e0bf1e68e..000000000 --- a/lib/good_job/logging.rb +++ /dev/null @@ -1,70 +0,0 @@ -module GoodJob - module Logging - extend ActiveSupport::Concern - - included do - cattr_accessor :logger, default: ActiveSupport::TaggedLogging.new(ActiveSupport::Logger.new(STDOUT)) - - def self.tag_logger(*tags) - if logger.respond_to?(:tagged) - tags.unshift "GoodJob" unless logger.formatter.current_tags.include?("GoodJob") - logger.tagged(*tags) { yield } - else - yield - end - end - end - - class LogSubscriber < ActiveSupport::LogSubscriber - def create(event) - good_job = event.payload[:good_job] - - debug do - "Created GoodJob resource with id #{good_job.id}" - end - end - - def timer_task_finished(event) - exception = event.payload[:error] - return unless exception - - error do - "ERROR: #{exception}\n #{exception.backtrace}" - end - end - - def job_finished(event) - exception = event.payload[:error] - return unless exception - - error do - "ERROR: #{exception}\n #{exception.backtrace}" - end - end - - def scheduler_start_shutdown(_event) - info do - "Shutting down scheduler..." - end - end - - def scheduler_shutdown(_event) - info do - "Scheduler is shut down." - end - end - - private - - def logger - GoodJob.logger - end - - def thread_name - Thread.current.name || Thread.current.object_id - end - end - end -end - -GoodJob::Logging::LogSubscriber.attach_to :good_job diff --git a/lib/good_job/performer.rb b/lib/good_job/performer.rb index 78f2abadf..d4ec32710 100644 --- a/lib/good_job/performer.rb +++ b/lib/good_job/performer.rb @@ -1,8 +1,11 @@ module GoodJob class Performer - def initialize(target, method_name) + attr_reader :name + + def initialize(target, method_name, name: nil) @target = target @method_name = method_name + @name = name end def next diff --git a/lib/good_job/railtie.rb b/lib/good_job/railtie.rb index 5bda3e39d..cc95229dc 100644 --- a/lib/good_job/railtie.rb +++ b/lib/good_job/railtie.rb @@ -2,6 +2,7 @@ module GoodJob class Railtie < ::Rails::Railtie initializer "good_job.logger" do ActiveSupport.on_load(:good_job) { self.logger = ::Rails.logger } + GoodJob::LogSubscriber.attach_to :good_job end end end diff --git a/lib/good_job/scheduler.rb b/lib/good_job/scheduler.rb index 9425e9b75..44acefe7d 100644 --- a/lib/good_job/scheduler.rb +++ b/lib/good_job/scheduler.rb @@ -33,7 +33,7 @@ def initialize(performer, timer_options: {}, pool_options: {}) def shutdown(wait: true) @_shutdown = true - ActiveSupport::Notifications.instrument("scheduler_start_shutdown.good_job", { wait: wait }) + ActiveSupport::Notifications.instrument("scheduler_shutdown_start.good_job", { wait: wait }) ActiveSupport::Notifications.instrument("scheduler_shutdown.good_job", { wait: wait }) do if @timer&.running? @timer.shutdown @@ -52,8 +52,10 @@ def shutdown? end def restart(wait: true) - shutdown(wait: wait) unless shutdown? - create_pools + ActiveSupport::Notifications.instrument("scheduler_restart_pools.good_job") do + shutdown(wait: wait) unless shutdown? + create_pools + end end def create_thread @@ -92,12 +94,14 @@ def ready_worker_count private def create_pools - @pool = ThreadPoolExecutor.new(@pool_options) - return unless @timer_options[:execution_interval].positive? + ActiveSupport::Notifications.instrument("scheduler_create_pools.good_job", { performer_name: @performer.name, max_threads: @pool_options[:max_threads], poll_interval: @timer_options[:execution_interval] }) do + @pool = ThreadPoolExecutor.new(@pool_options) + next unless @timer_options[:execution_interval].positive? - @timer = Concurrent::TimerTask.new(@timer_options) { create_thread } - @timer.add_observer(self, :timer_observer) - @timer.execute + @timer = Concurrent::TimerTask.new(@timer_options) { create_thread } + @timer.add_observer(self, :timer_observer) + @timer.execute + end end end end diff --git a/spec/good_job/cli_spec.rb b/spec/good_job/cli_spec.rb index 56286c31d..bba3de918 100644 --- a/spec/good_job/cli_spec.rb +++ b/spec/good_job/cli_spec.rb @@ -29,12 +29,7 @@ it 'can gracefully shut down on INT signal' do cli = described_class.new([], {}, {}) - cli_thread = Concurrent::Promises.future do - expect do - cli.start - end.to output(/finished, exiting/).to_stdout - end - + cli_thread = Concurrent::Promises.future { cli.start } sleep_until { cli.instance_variable_get(:@stop_good_job_executable) == false } Process.kill 'INT', Process.pid # Send the signal to ourselves @@ -53,10 +48,7 @@ stub_const 'ENV', ENV.to_hash.merge({ 'RAILS_MAX_THREADS' => 3, 'GOOD_JOB_MAX_THREADS' => 2 }) allow(ActiveRecord::Base.connection_pool).to receive(:size).and_return(1) - expect do - cli.start - end.to output.to_stdout - + cli.start expect(GoodJob::Scheduler).to have_received(:new).with(a_kind_of(GoodJob::Performer), pool_options: { max_threads: 4 }, timer_options: {}) end end @@ -76,7 +68,7 @@ scheduler_mock end - expect { cli.start }.to output.to_stdout + cli.start expect(GoodJob::Scheduler).to have_received(:new).with(a_kind_of(GoodJob::Performer), a_kind_of(Hash)) performer_query = performer.instance_variable_get(:@target) @@ -92,7 +84,7 @@ it 'deletes finished jobs' do cli = described_class.new([], { before_seconds_ago: 24.hours.to_i }, {}) - expect { cli.cleanup_preserved_jobs }.to output(/Deleted 1 preserved job/).to_stdout + expect { cli.cleanup_preserved_jobs }.to output(/GoodJob deleted 1 preserved job/).to_stdout expect { recent_job.reload }.not_to raise_error expect { old_unfinished_job.reload }.not_to raise_error diff --git a/spec/good_job/performer_spec.rb b/spec/good_job/performer_spec.rb new file mode 100644 index 000000000..a49a802d5 --- /dev/null +++ b/spec/good_job/performer_spec.rb @@ -0,0 +1,21 @@ +require 'rails_helper' + +RSpec.describe GoodJob::Performer do + let(:target) { double('The Target', the_method: nil) } # rubocop:disable RSpec/VerifiedDoubles + + describe '#next' do + it 'delegates to target#method_name' do + performer = described_class.new(target, :the_method) + performer.next + + expect(target).to have_received(:the_method) + end + end + + describe '#name' do + it 'is assignable' do + performer = described_class.new(target, :the_method, name: 'test-performer') + expect(performer.name).to eq 'test-performer' + end + end +end diff --git a/spec/good_job/scheduler_spec.rb b/spec/good_job/scheduler_spec.rb index d1b5c5250..290fbaa7f 100644 --- a/spec/good_job/scheduler_spec.rb +++ b/spec/good_job/scheduler_spec.rb @@ -1,7 +1,11 @@ require 'rails_helper' RSpec.describe GoodJob::Scheduler do - let(:performer) { instance_double(GoodJob::Performer, next: nil) } + let(:performer) { instance_double(GoodJob::Performer, next: nil, name: '') } + + around do |example| + expect { example.run }.to output.to_stdout # rubocop:disable RSpec/ExpectInHook + end describe '#shutdown' do it 'shuts down the theadpools' do diff --git a/spec/integration/scheduler_spec.rb b/spec/integration/scheduler_spec.rb index 4f3308609..c949a78f8 100644 --- a/spec/integration/scheduler_spec.rb +++ b/spec/integration/scheduler_spec.rb @@ -40,6 +40,10 @@ def perform(*args, **kwargs) let(:adapter) { GoodJob::Adapter.new } + around do |example| + expect { example.run }.to output.to_stdout # rubocop:disable RSpec/ExpectInHook + end + context 'when there are a large number of jobs' do let(:number_of_jobs) { 250 }