Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract Scheduler polling behavior to its own object #152

Merged
merged 1 commit into from
Oct 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ def initialize(execution_mode: nil, queues: nil, max_threads: nil, poll_interval

if @execution_mode == :async # rubocop:disable Style/GuardClause
@notifier = notifier || GoodJob::Notifier.new
@poller = GoodJob::Poller.new(poll_interval: configuration.poll_interval)
@scheduler = scheduler || GoodJob::Scheduler.from_configuration(configuration)
@notifier.recipients << [@scheduler, :create_thread]
@poller.recipients << [@scheduler, :create_thread]
end
end

Expand Down Expand Up @@ -88,6 +90,7 @@ def enqueue_at(active_job, timestamp)
# @return [void]
def shutdown(wait: true)
@notifier&.shutdown(wait: wait)
@poller&.shutdown(wait: wait)
@scheduler&.shutdown(wait: wait)
end

Expand Down
5 changes: 4 additions & 1 deletion lib/good_job/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ class CLI < Thor
desc: "Interval between polls for available jobs in seconds (env var: GOOD_JOB_POLL_INTERVAL, default: 5)"
def start
set_up_application!
configuration = GoodJob::Configuration.new(options)

notifier = GoodJob::Notifier.new
configuration = GoodJob::Configuration.new(options)
poller = GoodJob::Poller.new(poll_interval: configuration.poll_interval)
scheduler = GoodJob::Scheduler.from_configuration(configuration)
notifier.recipients << [scheduler, :create_thread]
poller.recipients << [scheduler, :create_thread]

@stop_good_job_executable = false
%w[INT TERM].each do |signal|
Expand All @@ -62,6 +64,7 @@ def start
end

notifier.shutdown
poller.shutdown
scheduler.shutdown
end

Expand Down
5 changes: 2 additions & 3 deletions lib/good_job/log_subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,13 @@ def finished_job_task(event)
end

# @macro notification_responder
def scheduler_create_pools(event)
def scheduler_create_pool(event)
max_threads = event.payload[:max_threads]
poll_interval = event.payload[:poll_interval]
performer_name = event.payload[:performer_name]
process_id = event.payload[:process_id]

info(tags: [process_id]) do
"GoodJob started scheduler with queues=#{performer_name} max_threads=#{max_threads} poll_interval=#{poll_interval}."
"GoodJob started scheduler with queues=#{performer_name} max_threads=#{max_threads}."
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/good_job/notifier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def restart(wait: true)
# If +wait+ is +true+, the notifier will wait for background thread to shutdown.
# If +wait+ is +false+, this method will return immediately even though threads may still be running.
# Use {#shutdown?} to determine whether threads have stopped.
# @param wait [Boolean] Wait for actively executing jobs to finish
# @param wait [Boolean] Wait for actively executing threads to finish
# @return [void]
def shutdown(wait: true)
return unless @pool.running?
Expand Down
94 changes: 94 additions & 0 deletions lib/good_job/poller.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
require 'concurrent/atomic/atomic_boolean'

module GoodJob # :nodoc:
#
# Pollers regularly wake up execution threads to check for new work.
#
class Poller
# Defaults for instance of Concurrent::TimerTask.
# The timer controls how and when sleeping threads check for new work.
DEFAULT_TIMER_OPTIONS = {
execution_interval: Configuration::DEFAULT_POLL_INTERVAL,
timeout_interval: 1,
run_now: true,
}.freeze

# @!attribute [r] instances
# @!scope class
# List of all instantiated Pollers in the current process.
# @return [array<GoodJob:Poller>]
cattr_reader :instances, default: [], instance_reader: false

def self.from_configuration(configuration)
GoodJob::Poller.new(poll_interval: configuration.poll_interval)
end

# List of recipients that will receive notifications.
# @return [Array<#call, Array(Object, Symbol)>]
attr_reader :recipients

# @param recipients [Array<#call, Array(Object, Symbol)>]
# @param poll_interval [Hash] number of seconds between polls
def initialize(*recipients, poll_interval: nil)
@recipients = Concurrent::Array.new(recipients)

@timer_options = DEFAULT_TIMER_OPTIONS.dup
@timer_options[:execution_interval] = poll_interval if poll_interval.present?

self.class.instances << self

create_pool
end

# Shut down the poller.
# If +wait+ is +true+, the poller will wait for background thread to shutdown.
# If +wait+ is +false+, this method will return immediately even though threads may still be running.
# Use {#shutdown?} to determine whether threads have stopped.
# @param wait [Boolean] Wait for actively executing threads to finish
# @return [void]
def shutdown(wait: true)
return unless @timer&.running?

@timer.shutdown
@timer.wait_for_termination if wait
end

# Tests whether the poller is shutdown.
# @return [true, false, nil]
def shutdown?
!@timer&.running?
end

# Restart the poller.
# When shutdown, start; or shutdown and start.
# @param wait [Boolean] Wait for background thread to finish
# @return [void]
def restart(wait: true)
shutdown(wait: wait)
create_pool
end

# Invoked on completion of TimerTask task.
# @!visibility private
# @return [void]
def timer_observer(time, executed_task, thread_error)
GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call)
instrument("finished_timer_task", { result: executed_task, error: thread_error, time: time })
end

private

def create_pool
return if @timer_options[:execution_interval] <= 0

@timer = Concurrent::TimerTask.new(@timer_options) do
recipients.each do |recipient|
target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
target.send(method_name)
end
end
@timer.add_observer(self, :timer_observer)
@timer.execute
end
end
end
61 changes: 13 additions & 48 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,13 @@ class Scheduler
fallback_policy: :discard,
}.freeze

# Defaults for instance of Concurrent::TimerTask.
# The timer controls how and when sleeping threads check for new work.
DEFAULT_TIMER_OPTIONS = {
execution_interval: Configuration::DEFAULT_POLL_INTERVAL,
timeout_interval: 1,
run_now: true,
}.freeze

# @!attribute [r] instances
# @!scope class
# List of all instantiated Schedulers in the current process.
# @return [array<GoodJob:Scheduler>]
cattr_reader :instances, default: [], instance_reader: false

# Creates GoodJob::Scheduler(s) and Performers from a GoodJob::Configuration instance.
# TODO: move this to GoodJob::Configuration
# @param configuration [GoodJob::Configuration]
# @return [GoodJob::Scheduler, GoodJob::MultiScheduler]
def self.from_configuration(configuration)
Expand All @@ -62,7 +53,7 @@ def self.from_configuration(configuration)
end
job_performer = GoodJob::Performer.new(job_query, :perform_with_advisory_lock, name: queue_string, filter: job_filter)

GoodJob::Scheduler.new(job_performer, max_threads: max_threads, poll_interval: configuration.poll_interval)
GoodJob::Scheduler.new(job_performer, max_threads: max_threads)
end

if schedulers.size > 1
Expand All @@ -73,23 +64,19 @@ def self.from_configuration(configuration)
end

# @param performer [GoodJob::Performer]
# @param max_threads [Numeric, nil] the number of execution threads to use
# @param poll_interval [Numeric, nil] the number of seconds between polls for jobs
def initialize(performer, max_threads: nil, poll_interval: nil)
# @param max_threads [Numeric, nil] number of seconds between polls for jobs
def initialize(performer, max_threads: nil)
raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)

self.class.instances << self

@performer = performer

@timer_options = DEFAULT_TIMER_OPTIONS.dup
@timer_options[:execution_interval] = poll_interval if poll_interval.present?

@pool_options = DEFAULT_POOL_OPTIONS.dup
@pool_options[:max_threads] = max_threads if max_threads.present?
@pool_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@pool_options[:max_threads]} poll_interval=#{@timer_options[:execution_interval]})"
@pool_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@pool_options[:max_threads]})"

create_pools
create_pool
end

# Shut down the scheduler.
Expand All @@ -100,28 +87,20 @@ def initialize(performer, max_threads: nil, poll_interval: nil)
# @param wait [Boolean] Wait for actively executing jobs to finish
# @return [void]
def shutdown(wait: true)
@_shutdown = true
return unless @pool&.running?

instrument("scheduler_shutdown_start", { wait: wait })
instrument("scheduler_shutdown", { wait: wait }) do
if @timer&.running?
@timer.shutdown
@timer.wait_for_termination if wait
# TODO: Should be killed if wait is not true
end

if @pool&.running?
@pool.shutdown
@pool.wait_for_termination if wait
# TODO: Should be killed if wait is not true
end
@pool.shutdown
@pool.wait_for_termination if wait
# TODO: Should be killed if wait is not true
end
end

# Tests whether the scheduler is shutdown.
# @return [true, false, nil]
def shutdown?
@_shutdown
!@pool&.running?
end

# Restart the Scheduler.
Expand All @@ -131,8 +110,7 @@ def shutdown?
def restart(wait: true)
instrument("scheduler_restart_pools") do
shutdown(wait: wait) unless shutdown?
create_pools
@_shutdown = false
create_pool
end
end

Expand All @@ -157,14 +135,6 @@ def create_thread(state = nil)
true
end

# Invoked on completion of TimerTask task.
# @!visibility private
# @return [void]
def timer_observer(time, executed_task, thread_error)
GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call)
instrument("finished_timer_task", { result: executed_task, error: thread_error, time: time })
end

# Invoked on completion of ThreadPoolExecutor task
# @!visibility private
# @return [void]
Expand All @@ -176,14 +146,9 @@ def task_observer(time, output, thread_error)

private

def create_pools
instrument("scheduler_create_pools", { performer_name: @performer.name, max_threads: @pool_options[:max_threads], poll_interval: @timer_options[:execution_interval] }) do
def create_pool
instrument("scheduler_create_pool", { performer_name: @performer.name, max_threads: @pool_options[:max_threads] }) do
@pool = ThreadPoolExecutor.new(@pool_options)
next unless @timer_options[:execution_interval].positive?

@timer = Concurrent::TimerTask.new(@timer_options) { create_thread }
@timer.add_observer(self, :timer_observer)
@timer.execute
end
end

Expand Down
6 changes: 5 additions & 1 deletion spec/integration/scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,11 @@ def perform(*args, **kwargs)
end

it 'pops items off of the queue and runs them' do
max_threads = 5

performer = GoodJob::Performer.new(GoodJob::Job.all, :perform_with_advisory_lock)
scheduler = GoodJob::Scheduler.new(performer)
scheduler = GoodJob::Scheduler.new(performer, max_threads: max_threads)
max_threads.times { scheduler.create_thread }

sleep_until(max: 20, increments_of: 0.5) { GoodJob::Job.count == 0 }

Expand Down Expand Up @@ -87,6 +90,7 @@ def perform(*args, **kwargs)
it "handles and retries jobs with errors" do
performer = GoodJob::Performer.new(GoodJob::Job.all, :perform_with_advisory_lock)
scheduler = GoodJob::Scheduler.new(performer)
scheduler.create_thread

sleep_until(max: 5, increments_of: 0.5) { GoodJob::Job.count == 0 }

Expand Down
2 changes: 1 addition & 1 deletion spec/lib/good_job/adapter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
allow(GoodJob::Job).to receive(:enqueue).and_return(good_job)

scheduler = instance_double(GoodJob::Scheduler, shutdown: nil, create_thread: nil)
adapter = described_class.new(execution_mode: :async, scheduler: scheduler)
adapter = described_class.new(execution_mode: :async, scheduler: scheduler, poll_interval: -1)

adapter.enqueue(active_job)

Expand Down
2 changes: 1 addition & 1 deletion spec/lib/good_job/cli_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
allow(ActiveRecord::Base.connection_pool).to receive(:size).and_return(1)

cli.start
expect(GoodJob::Scheduler).to have_received(:new).with(a_kind_of(GoodJob::Performer), max_threads: 4, poll_interval: 5)
expect(GoodJob::Scheduler).to have_received(:new).with(a_kind_of(GoodJob::Performer), max_threads: 4)
end
end

Expand Down
36 changes: 36 additions & 0 deletions spec/lib/good_job/poller_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
require 'rails_helper'

RSpec.describe GoodJob::Poller do
describe '.instances' do
it 'contains all registered instances' do
poller = nil
expect do
poller = described_class.new
end.to change { described_class.instances.size }.by(1)

expect(described_class.instances).to include poller
end
end

describe '#initialize' do
it 'accepts a zero or negative poll_interval to disable TimerTask' do
poller = described_class.new(poll_interval: 0)

expect(poller.instance_variable_get(:@task)).to be_nil
end
end

describe '#recipients' do
it 'polls recipients method' do
stub_const 'POLL_COUNT', Concurrent::AtomicFixnum.new(0)

recipient = proc { |_payload| POLL_COUNT.increment }

poller = described_class.new(recipient, poll_interval: 1)
sleep_until(max: 5, increments_of: 0.5) { POLL_COUNT.value > 2 }
poller.shutdown

expect(POLL_COUNT.value).to be > 2
end
end
end
Loading