diff --git a/lib/good_job/job.rb b/lib/good_job/job.rb index 9ec214975..a02fe2c78 100644 --- a/lib/good_job/job.rb +++ b/lib/good_job/job.rb @@ -56,13 +56,12 @@ def self.perform_with_advisory_lock result = nil error = nil - connection_pool.with_connection do - unfinished.priority_ordered.only_scheduled.limit(1).with_advisory_lock do |good_jobs| - good_job = good_jobs.first - break unless good_job + unfinished.priority_ordered.only_scheduled.limit(1).with_advisory_lock do |good_jobs| + good_job = good_jobs.first + # TODO: Determine why some records are fetched without an advisory lock at all + break unless good_job&.owns_advisory_lock? - result, error = good_job.perform - end + result, error = good_job.perform end [good_job, result, error] if good_job diff --git a/lib/good_job/scheduler.rb b/lib/good_job/scheduler.rb index d86d7279c..4106a285d 100644 --- a/lib/good_job/scheduler.rb +++ b/lib/good_job/scheduler.rb @@ -138,7 +138,11 @@ def create_thread(state = nil) return false unless @performer.next?(state) end - future = Concurrent::Future.new(executor: @pool, &@performer.method(:next)) + future = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer| + output = nil + Rails.application.executor.wrap { output = performer.next } + output + end future.add_observer(self, :task_observer) future.execute diff --git a/spec/integration/scheduler_spec.rb b/spec/integration/scheduler_spec.rb index 4f3308609..24d7ebce9 100644 --- a/spec/integration/scheduler_spec.rb +++ b/spec/integration/scheduler_spec.rb @@ -41,7 +41,7 @@ def perform(*args, **kwargs) let(:adapter) { GoodJob::Adapter.new } context 'when there are a large number of jobs' do - let(:number_of_jobs) { 250 } + let(:number_of_jobs) { 1000 } let!(:good_jobs) do number_of_jobs.times do |i| @@ -53,7 +53,7 @@ def perform(*args, **kwargs) performer = GoodJob::Performer.new(GoodJob::Job.all, :perform_with_advisory_lock) scheduler = GoodJob::Scheduler.new(performer) - sleep_until(max: 5, increments_of: 0.5) { GoodJob::Job.count == 0 } + sleep_until(max: 20, increments_of: 0.5) { GoodJob::Job.count == 0 } rerun_jobs = {} diff --git a/spec/support/sleep_helper.rb b/spec/support/sleep_helper.rb index 2941e9893..6f41eabf7 100644 --- a/spec/support/sleep_helper.rb +++ b/spec/support/sleep_helper.rb @@ -1,12 +1,19 @@ module SleepHelper + TooSlowError = Class.new(StandardError) + def sleep_until(max: 5, increments_of: 0.1) so_many = (max.to_f / increments_of).ceil.to_i - so_many.times do - break if yield + finished = catch(:finished) do + so_many.times do + throw(:finished, true) if yield - sleep increments_of + sleep increments_of + end + false end + + raise TooSlowError unless finished end end diff --git a/spec/test_app/config/puma.rb b/spec/test_app/config/puma.rb index f13b3ea6d..a44965701 100644 --- a/spec/test_app/config/puma.rb +++ b/spec/test_app/config/puma.rb @@ -35,20 +35,20 @@ preload_app! if ENV["PRELOAD_APP"] before_fork do - GoodJob::Scheduler.instances.each { |s| s.shutdown } + GoodJob.shutdown end on_worker_boot do - GoodJob::Scheduler.instances.each { |s| s.restart } + GoodJob.restart end on_worker_shutdown do - GoodJob::Scheduler.instances.each { |s| s.shutdown } + GoodJob.shutdown end MAIN_PID = Process.pid at_exit do - GoodJob::Scheduler.instances.each { |s| s.shutdown } if Process.pid == MAIN_PID + GoodJob.shutdown if Process.pid == MAIN_PID end # Allow puma to be restarted by `rails restart` command.