From 14310e01b919e76705d1fdb1416cebd7d606a0ef Mon Sep 17 00:00:00 2001 From: Ben Sheldon Date: Sat, 16 Jan 2021 09:31:58 -0800 Subject: [PATCH] Replace Performer with JobPerformer to clearly defer references to GoodJob::Job Necessary because there is a circular dependency in Rails 6.1 when ActiveJob initializes GoodJob, which references ActiveRecord, which tries to initialize ActiveJob (for the destroy associations async feature) --- lib/good_job/job_performer.rb | 61 +++++++++++++++++++++++++ lib/good_job/performer.rb | 60 ------------------------ lib/good_job/scheduler.rb | 16 +------ spec/integration/scheduler_spec.rb | 4 +- spec/lib/good_job/cli_spec.rb | 6 +-- spec/lib/good_job/job_performer_spec.rb | 40 ++++++++++++++++ spec/lib/good_job/performer_spec.rb | 33 ------------- spec/lib/good_job/scheduler_spec.rb | 2 +- 8 files changed, 109 insertions(+), 113 deletions(-) create mode 100644 lib/good_job/job_performer.rb delete mode 100644 lib/good_job/performer.rb create mode 100644 spec/lib/good_job/job_performer_spec.rb delete mode 100644 spec/lib/good_job/performer_spec.rb diff --git a/lib/good_job/job_performer.rb b/lib/good_job/job_performer.rb new file mode 100644 index 000000000..1a2ac056c --- /dev/null +++ b/lib/good_job/job_performer.rb @@ -0,0 +1,61 @@ +module GoodJob + # + # JobPerformer queries the database for jobs and performs them on behalf of a + # {Scheduler}. It mainly functions as glue between a {Scheduler} and the jobs + # it should be executing. + # + # The JobPerformer must be safe to execute across multiple threads. + # + class JobPerformer + # @param queue_string [String] Queues to execute jobs from + def initialize(queue_string) + @queue_string = queue_string + + @job_query = Concurrent::Delay.new { GoodJob::Job.queue_string(queue_string) } + @parsed_queues = Concurrent::Delay.new { GoodJob::Job.queue_parser(queue_string) } + end + + # A meaningful name to identify the performer in logs and for debugging. + # @return [String] The queues from which Jobs are worked + def name + @queue_string + end + + # Perform the next eligible job + # @return [nil, Object] Returns job result or +nil+ if no job was found + def next + job_query.perform_with_advisory_lock + end + + # Tests whether this performer should be used in GoodJob's current state. + # + # For example, state will be a LISTEN/NOTIFY message that is passed down + # from the Notifier to the Scheduler. The Scheduler is able to ask + # its performer "does this message relate to you?", and if not, ignore it + # to minimize thread wake-ups, database queries, and thundering herds. + # + # @return [Boolean] whether the performer's {#next} method should be + # called in the current state. + def next?(state = {}) + if parsed_queues[:exclude] + parsed_queues[:exclude].exclude?(state[:queue_name]) + elsif parsed_queues[:include] + parsed_queues[:include].include?(state[:queue_name]) + else + true + end + end + + private + + attr_reader :queue_string + + def job_query + @job_query.value + end + + def parsed_queues + @parsed_queues.value + end + end +end diff --git a/lib/good_job/performer.rb b/lib/good_job/performer.rb deleted file mode 100644 index a24d87026..000000000 --- a/lib/good_job/performer.rb +++ /dev/null @@ -1,60 +0,0 @@ -module GoodJob - # - # Performer queries the database for jobs and performs them on behalf of a - # {Scheduler}. It mainly functions as glue between a {Scheduler} and the jobs - # it should be executing. - # - # The Performer enforces a callable that does not rely on scoped/closure - # variables because they might not be available when executed in a different - # thread. - # - class Performer - # @!attribute [r] name - # @return [String] - # a meaningful name to identify the performer in logs and for debugging. - # This is usually set to the list of queues the performer will query, - # e.g. +"-transactional_messages,batch_processing"+. - attr_reader :name - - # @param target [Object] - # An object that can perform jobs. It must respond to +method_name+ by - # finding and performing jobs and is usually a {Job} query, - # e.g. +GoodJob::Job.where(queue_name: ['queue1', 'queue2'])+. - # @param method_name [Symbol] - # The name of a method on +target+ that finds and performs jobs. - # @param name [String] - # A name for the performer to be used in logs and for debugging. - # @param filter [#call] - # Used to determine whether the performer should be used in GoodJob's - # current state. GoodJob state is a +Hash+ that will be passed as the - # first argument to +filter+ and includes info like the current queue. - def initialize(target, method_name, name: nil, filter: nil) - @target = target - @method_name = method_name - @name = name - @filter = filter - end - - # Find and perform any eligible jobs. - def next - @target.public_send(@method_name) - end - - # Tests whether this performer should be used in GoodJob's current state by - # calling the +filter+ callable set in {#initialize}. Always returns +true+ - # if there is no filter. - # - # For example, state will be a LISTEN/NOTIFY message that is passed down - # from the Notifier to the Scheduler. The Scheduler is able to ask - # its performer "does this message relate to you?", and if not, ignore it - # to minimize thread wake-ups, database queries, and thundering herds. - # - # @return [Boolean] whether the performer's {#next} method should be - # called in the current state. - def next?(state = {}) - return true unless @filter.respond_to?(:call) - - @filter.call(state) - end - end -end diff --git a/lib/good_job/scheduler.rb b/lib/good_job/scheduler.rb index de7ea0c91..38706d20d 100644 --- a/lib/good_job/scheduler.rb +++ b/lib/good_job/scheduler.rb @@ -40,19 +40,7 @@ def self.from_configuration(configuration) queue_string, max_threads = queue_string_and_max_threads.split(':') max_threads = (max_threads || configuration.max_threads).to_i - job_query = GoodJob::Job.queue_string(queue_string) - parsed = GoodJob::Job.queue_parser(queue_string) - job_filter = proc do |state| - if parsed[:exclude] - parsed[:exclude].exclude?(state[:queue_name]) - elsif parsed[:include] - parsed[:include].include? state[:queue_name] - else - true - end - end - job_performer = GoodJob::Performer.new(job_query, :perform_with_advisory_lock, name: queue_string, filter: job_filter) - + job_performer = GoodJob::JobPerformer.new(queue_string) GoodJob::Scheduler.new(job_performer, max_threads: max_threads) end @@ -63,7 +51,7 @@ def self.from_configuration(configuration) end end - # @param performer [GoodJob::Performer] + # @param performer [GoodJob::JobPerformer] # @param max_threads [Numeric, nil] number of seconds between polls for jobs def initialize(performer, max_threads: nil) raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next) diff --git a/spec/integration/scheduler_spec.rb b/spec/integration/scheduler_spec.rb index 7f20349c4..1e0eb6c3c 100644 --- a/spec/integration/scheduler_spec.rb +++ b/spec/integration/scheduler_spec.rb @@ -51,7 +51,7 @@ def perform(*args, **kwargs) end it 'pops items off of the queue and runs them' do - performer = GoodJob::Performer.new(GoodJob::Job.all, :perform_with_advisory_lock) + performer = GoodJob::JobPerformer.new('*') scheduler = GoodJob::Scheduler.new(performer, max_threads: max_threads) max_threads.times { scheduler.create_thread } @@ -76,7 +76,7 @@ def perform(*args, **kwargs) let!(:jobs) { ErrorJob.perform_later } it "handles and retries jobs with errors" do - performer = GoodJob::Performer.new(GoodJob::Job.all, :perform_with_advisory_lock) + performer = GoodJob::JobPerformer.new('*') scheduler = GoodJob::Scheduler.new(performer) scheduler.create_thread diff --git a/spec/lib/good_job/cli_spec.rb b/spec/lib/good_job/cli_spec.rb index c5e723d6e..30e513769 100644 --- a/spec/lib/good_job/cli_spec.rb +++ b/spec/lib/good_job/cli_spec.rb @@ -44,7 +44,7 @@ allow(ActiveRecord::Base.connection_pool).to receive(:size).and_return(1) cli.start - expect(GoodJob::Scheduler).to have_received(:new).with(a_kind_of(GoodJob::Performer), max_threads: 4) + expect(GoodJob::Scheduler).to have_received(:new).with(a_kind_of(GoodJob::JobPerformer), max_threads: 4) end end @@ -64,9 +64,9 @@ end cli.start - expect(GoodJob::Scheduler).to have_received(:new).with(a_kind_of(GoodJob::Performer), a_kind_of(Hash)) + expect(GoodJob::Scheduler).to have_received(:new).with(a_kind_of(GoodJob::JobPerformer), a_kind_of(Hash)) - performer_query = performer.instance_variable_get(:@target) + performer_query = performer.send(:job_query) expect(performer_query.to_sql).to eq GoodJob::Job.where(queue_name: %w[mice elephant]).to_sql end end diff --git a/spec/lib/good_job/job_performer_spec.rb b/spec/lib/good_job/job_performer_spec.rb new file mode 100644 index 000000000..38fc49f07 --- /dev/null +++ b/spec/lib/good_job/job_performer_spec.rb @@ -0,0 +1,40 @@ +require 'rails_helper' + +RSpec.describe GoodJob::JobPerformer do + describe '#name' do + it 'displays the queues' do + queue_string = 'mice, elephants' + job_performer = described_class.new(queue_string) + expect(job_performer.name).to eq queue_string + end + end + + describe '#next' do + it 'calls GoodJob.perform_with_advisory_lock' do + allow(GoodJob::Job).to receive(:perform_with_advisory_lock) + + job_performer = described_class.new('*') + job_performer.next + + expect(GoodJob::Job).to have_received(:perform_with_advisory_lock) + end + end + + describe 'next?' do + it 'filters on queue name' do + state = { queue_name: 'elephants' } + + result = described_class.new('*').next?(state) + expect(result).to eq true + + result = described_class.new('elephants').next?(state) + expect(result).to eq true + + result = described_class.new('-elephants').next?(state) + expect(result).to eq false + + result = described_class.new('mice').next?(state) + expect(result).to eq false + end + end +end diff --git a/spec/lib/good_job/performer_spec.rb b/spec/lib/good_job/performer_spec.rb deleted file mode 100644 index 04050dbfe..000000000 --- a/spec/lib/good_job/performer_spec.rb +++ /dev/null @@ -1,33 +0,0 @@ -require 'rails_helper' - -RSpec.describe GoodJob::Performer do - subject(:performer) { described_class.new(target, :the_method) } - - let(:target) { double('The Target', the_method: nil) } # rubocop:disable RSpec/VerifiedDoubles - - describe '#next' do - it 'delegates to target#method_name' do - performer.next - expect(target).to have_received(:the_method) - end - end - - describe '#next?' do - it 'defaults to true' do - expect(performer.next?).to eq true - end - - it 'returns the result of the filter and state' do - filter = ->(state) { "more #{state}" } - performer = described_class.new(target, :the_method, filter: filter) - expect(performer.next?("state")).to eq "more state" - end - end - - describe '#name' do - it 'is assignable' do - performer = described_class.new(target, :the_method, name: 'test-performer') - expect(performer.name).to eq 'test-performer' - end - end -end diff --git a/spec/lib/good_job/scheduler_spec.rb b/spec/lib/good_job/scheduler_spec.rb index 2d6880d4b..caf400959 100644 --- a/spec/lib/good_job/scheduler_spec.rb +++ b/spec/lib/good_job/scheduler_spec.rb @@ -1,7 +1,7 @@ require 'rails_helper' RSpec.describe GoodJob::Scheduler do - let(:performer) { instance_double(GoodJob::Performer, next: nil, name: '') } + let(:performer) { instance_double(GoodJob::JobPerformer, next: nil, name: '') } after do described_class.instances.each(&:shutdown)