Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix counting of available execution threads #58

Merged
merged 1 commit into from
Jul 25, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,17 @@ class Scheduler
max_threads: Concurrent.processor_count,
auto_terminate: true,
idletime: 60,
max_queue: 0,
fallback_policy: :abort, # shouldn't matter -- 0 max queue
max_queue: -1,
fallback_policy: :discard,
}.freeze

def initialize(performer, timer_options: {}, pool_options: {})
raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)

@performer = performer
@pool = Concurrent::ThreadPoolExecutor.new(DEFAULT_POOL_OPTIONS.merge(pool_options))
@pool = ThreadPoolExecutor.new(DEFAULT_POOL_OPTIONS.merge(pool_options))
@timer = Concurrent::TimerTask.new(DEFAULT_TIMER_OPTIONS.merge(timer_options)) do
idle_threads = @pool.max_length - @pool.length
create_thread if idle_threads.positive?
create_thread
end
@timer.add_observer(self, :timer_observer)
@timer.execute
Expand Down Expand Up @@ -58,6 +57,8 @@ def shutdown?
end

def create_thread
return false unless @pool.ready_worker_count.positive?

future = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer|
output = nil
Rails.application.executor.wrap { output = performer.next }
Expand All @@ -75,5 +76,17 @@ def task_observer(time, output, thread_error)
ActiveSupport::Notifications.instrument("finished_job_task.good_job", { result: output, error: thread_error, time: time })
create_thread if output
end

class ThreadPoolExecutor < Concurrent::ThreadPoolExecutor
# https://github.com/ruby-concurrency/concurrent-ruby/issues/684#issuecomment-427594437
def ready_worker_count
synchronize do
workers_still_to_be_created = @max_length - @pool.length
workers_created_but_waiting = @ready.length

workers_still_to_be_created + workers_created_but_waiting
end
end
end
end
end