Skip to content

Commit

Permalink
Replace Performer with JobPerformer to clearly defer references to Go…
Browse files Browse the repository at this point in the history
…odJob::Job

Necessary because there is a circular dependency in Rails 6.1 when ActiveJob initializes GoodJob, which references ActiveRecord, which tries to initialize ActiveJob (for the destroy associations async feature)
  • Loading branch information
bensheldon committed Jan 16, 2021
1 parent 13a059e commit 14310e0
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 113 deletions.
61 changes: 61 additions & 0 deletions lib/good_job/job_performer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
module GoodJob
#
# JobPerformer queries the database for jobs and performs them on behalf of a
# {Scheduler}. It mainly functions as glue between a {Scheduler} and the jobs
# it should be executing.
#
# The JobPerformer must be safe to execute across multiple threads.
#
class JobPerformer
# @param queue_string [String] Queues to execute jobs from
def initialize(queue_string)
@queue_string = queue_string

@job_query = Concurrent::Delay.new { GoodJob::Job.queue_string(queue_string) }
@parsed_queues = Concurrent::Delay.new { GoodJob::Job.queue_parser(queue_string) }
end

# A meaningful name to identify the performer in logs and for debugging.
# @return [String] The queues from which Jobs are worked
def name
@queue_string
end

# Perform the next eligible job
# @return [nil, Object] Returns job result or +nil+ if no job was found
def next
job_query.perform_with_advisory_lock
end

# Tests whether this performer should be used in GoodJob's current state.
#
# For example, state will be a LISTEN/NOTIFY message that is passed down
# from the Notifier to the Scheduler. The Scheduler is able to ask
# its performer "does this message relate to you?", and if not, ignore it
# to minimize thread wake-ups, database queries, and thundering herds.
#
# @return [Boolean] whether the performer's {#next} method should be
# called in the current state.
def next?(state = {})
if parsed_queues[:exclude]
parsed_queues[:exclude].exclude?(state[:queue_name])
elsif parsed_queues[:include]
parsed_queues[:include].include?(state[:queue_name])
else
true
end
end

private

attr_reader :queue_string

def job_query
@job_query.value
end

def parsed_queues
@parsed_queues.value
end
end
end
60 changes: 0 additions & 60 deletions lib/good_job/performer.rb

This file was deleted.

16 changes: 2 additions & 14 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,7 @@ def self.from_configuration(configuration)
queue_string, max_threads = queue_string_and_max_threads.split(':')
max_threads = (max_threads || configuration.max_threads).to_i

job_query = GoodJob::Job.queue_string(queue_string)
parsed = GoodJob::Job.queue_parser(queue_string)
job_filter = proc do |state|
if parsed[:exclude]
parsed[:exclude].exclude?(state[:queue_name])
elsif parsed[:include]
parsed[:include].include? state[:queue_name]
else
true
end
end
job_performer = GoodJob::Performer.new(job_query, :perform_with_advisory_lock, name: queue_string, filter: job_filter)

job_performer = GoodJob::JobPerformer.new(queue_string)
GoodJob::Scheduler.new(job_performer, max_threads: max_threads)
end

Expand All @@ -63,7 +51,7 @@ def self.from_configuration(configuration)
end
end

# @param performer [GoodJob::Performer]
# @param performer [GoodJob::JobPerformer]
# @param max_threads [Numeric, nil] number of seconds between polls for jobs
def initialize(performer, max_threads: nil)
raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)
Expand Down
4 changes: 2 additions & 2 deletions spec/integration/scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def perform(*args, **kwargs)
end

it 'pops items off of the queue and runs them' do
performer = GoodJob::Performer.new(GoodJob::Job.all, :perform_with_advisory_lock)
performer = GoodJob::JobPerformer.new('*')
scheduler = GoodJob::Scheduler.new(performer, max_threads: max_threads)
max_threads.times { scheduler.create_thread }

Expand All @@ -76,7 +76,7 @@ def perform(*args, **kwargs)
let!(:jobs) { ErrorJob.perform_later }

it "handles and retries jobs with errors" do
performer = GoodJob::Performer.new(GoodJob::Job.all, :perform_with_advisory_lock)
performer = GoodJob::JobPerformer.new('*')
scheduler = GoodJob::Scheduler.new(performer)
scheduler.create_thread

Expand Down
6 changes: 3 additions & 3 deletions spec/lib/good_job/cli_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
allow(ActiveRecord::Base.connection_pool).to receive(:size).and_return(1)

cli.start
expect(GoodJob::Scheduler).to have_received(:new).with(a_kind_of(GoodJob::Performer), max_threads: 4)
expect(GoodJob::Scheduler).to have_received(:new).with(a_kind_of(GoodJob::JobPerformer), max_threads: 4)
end
end

Expand All @@ -64,9 +64,9 @@
end

cli.start
expect(GoodJob::Scheduler).to have_received(:new).with(a_kind_of(GoodJob::Performer), a_kind_of(Hash))
expect(GoodJob::Scheduler).to have_received(:new).with(a_kind_of(GoodJob::JobPerformer), a_kind_of(Hash))

performer_query = performer.instance_variable_get(:@target)
performer_query = performer.send(:job_query)
expect(performer_query.to_sql).to eq GoodJob::Job.where(queue_name: %w[mice elephant]).to_sql
end
end
Expand Down
40 changes: 40 additions & 0 deletions spec/lib/good_job/job_performer_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
require 'rails_helper'

RSpec.describe GoodJob::JobPerformer do
describe '#name' do
it 'displays the queues' do
queue_string = 'mice, elephants'
job_performer = described_class.new(queue_string)
expect(job_performer.name).to eq queue_string
end
end

describe '#next' do
it 'calls GoodJob.perform_with_advisory_lock' do
allow(GoodJob::Job).to receive(:perform_with_advisory_lock)

job_performer = described_class.new('*')
job_performer.next

expect(GoodJob::Job).to have_received(:perform_with_advisory_lock)
end
end

describe 'next?' do
it 'filters on queue name' do
state = { queue_name: 'elephants' }

result = described_class.new('*').next?(state)
expect(result).to eq true

result = described_class.new('elephants').next?(state)
expect(result).to eq true

result = described_class.new('-elephants').next?(state)
expect(result).to eq false

result = described_class.new('mice').next?(state)
expect(result).to eq false
end
end
end
33 changes: 0 additions & 33 deletions spec/lib/good_job/performer_spec.rb

This file was deleted.

2 changes: 1 addition & 1 deletion spec/lib/good_job/scheduler_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
require 'rails_helper'

RSpec.describe GoodJob::Scheduler do
let(:performer) { instance_double(GoodJob::Performer, next: nil, name: '') }
let(:performer) { instance_double(GoodJob::JobPerformer, next: nil, name: '') }

after do
described_class.instances.each(&:shutdown)
Expand Down

0 comments on commit 14310e0

Please sign in to comment.