Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return to using executor.wrap around Scheduler execution task #99

Merged
merged 3 commits into from
Aug 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions lib/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,12 @@ def self.perform_with_advisory_lock
result = nil
error = nil

connection_pool.with_connection do
unfinished.priority_ordered.only_scheduled.limit(1).with_advisory_lock do |good_jobs|
good_job = good_jobs.first
break unless good_job
unfinished.priority_ordered.only_scheduled.limit(1).with_advisory_lock do |good_jobs|
good_job = good_jobs.first
# TODO: Determine why some records are fetched without an advisory lock at all
break unless good_job&.owns_advisory_lock?

result, error = good_job.perform
end
result, error = good_job.perform
end

[good_job, result, error] if good_job
Expand Down
6 changes: 5 additions & 1 deletion lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,11 @@ def create_thread(state = nil)
return false unless @performer.next?(state)
end

future = Concurrent::Future.new(executor: @pool, &@performer.method(:next))
future = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer|
output = nil
Rails.application.executor.wrap { output = performer.next }
output
end
future.add_observer(self, :task_observer)
future.execute

Expand Down
4 changes: 2 additions & 2 deletions spec/integration/scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def perform(*args, **kwargs)
let(:adapter) { GoodJob::Adapter.new }

context 'when there are a large number of jobs' do
let(:number_of_jobs) { 250 }
let(:number_of_jobs) { 1000 }

let!(:good_jobs) do
number_of_jobs.times do |i|
Expand All @@ -53,7 +53,7 @@ def perform(*args, **kwargs)
performer = GoodJob::Performer.new(GoodJob::Job.all, :perform_with_advisory_lock)
scheduler = GoodJob::Scheduler.new(performer)

sleep_until(max: 5, increments_of: 0.5) { GoodJob::Job.count == 0 }
sleep_until(max: 20, increments_of: 0.5) { GoodJob::Job.count == 0 }

rerun_jobs = {}

Expand Down
13 changes: 10 additions & 3 deletions spec/support/sleep_helper.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
module SleepHelper
TooSlowError = Class.new(StandardError)

def sleep_until(max: 5, increments_of: 0.1)
so_many = (max.to_f / increments_of).ceil.to_i

so_many.times do
break if yield
finished = catch(:finished) do
so_many.times do
throw(:finished, true) if yield

sleep increments_of
sleep increments_of
end
false
end

raise TooSlowError unless finished
end
end

Expand Down
8 changes: 4 additions & 4 deletions spec/test_app/config/puma.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,20 @@
preload_app! if ENV["PRELOAD_APP"]

before_fork do
GoodJob::Scheduler.instances.each { |s| s.shutdown }
GoodJob.shutdown
end

on_worker_boot do
GoodJob::Scheduler.instances.each { |s| s.restart }
GoodJob.restart
end

on_worker_shutdown do
GoodJob::Scheduler.instances.each { |s| s.shutdown }
GoodJob.shutdown
end

MAIN_PID = Process.pid
at_exit do
GoodJob::Scheduler.instances.each { |s| s.shutdown } if Process.pid == MAIN_PID
GoodJob.shutdown if Process.pid == MAIN_PID
end

# Allow puma to be restarted by `rails restart` command.
Expand Down