Skip to content

Commit

Permalink
Extract Scheduler polling behavior to its own object
Browse files Browse the repository at this point in the history
  • Loading branch information
bensheldon committed Sep 18, 2020
1 parent 9b187e9 commit 75bc5e1
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 66 deletions.
3 changes: 3 additions & 0 deletions lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ def initialize(execution_mode: nil, queues: nil, max_threads: nil, poll_interval

if @execution_mode == :async # rubocop:disable Style/GuardClause
@notifier = notifier || GoodJob::Notifier.new
@poller = GoodJob::Poller.new(poll_interval: configuration.poll_interval)
@scheduler = scheduler || GoodJob::Scheduler.from_configuration(configuration)
@notifier.recipients << [@scheduler, :create_thread]
@poller.recipients << [@scheduler, :create_thread]
end
end

Expand Down Expand Up @@ -88,6 +90,7 @@ def enqueue_at(active_job, timestamp)
# @return [void]
def shutdown(wait: true)
@notifier&.shutdown(wait: wait)
@poller&.shutdown(wait: wait)
@scheduler&.shutdown(wait: wait)
end

Expand Down
5 changes: 4 additions & 1 deletion lib/good_job/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ class CLI < Thor
desc: "Interval between polls for available jobs in seconds (env var: GOOD_JOB_POLL_INTERVAL, default: 1)"
def start
set_up_application!
configuration = GoodJob::Configuration.new(options)

notifier = GoodJob::Notifier.new
configuration = GoodJob::Configuration.new(options)
poller = GoodJob::Poller.new(poll_interval: configuration.poll_interval)
scheduler = GoodJob::Scheduler.from_configuration(configuration)
notifier.recipients << [scheduler, :create_thread]
poller.recipients << [scheduler, :create_thread]

@stop_good_job_executable = false
%w[INT TERM].each do |signal|
Expand All @@ -62,6 +64,7 @@ def start
end

notifier.shutdown
poller.shutdown
scheduler.shutdown
end

Expand Down
5 changes: 2 additions & 3 deletions lib/good_job/log_subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,13 @@ def job_finished(event)
end

# @macro notification_responder
def scheduler_create_pools(event)
def scheduler_create_pool(event)
max_threads = event.payload[:max_threads]
poll_interval = event.payload[:poll_interval]
performer_name = event.payload[:performer_name]
process_id = event.payload[:process_id]

info(tags: [process_id]) do
"GoodJob started scheduler with queues=#{performer_name} max_threads=#{max_threads} poll_interval=#{poll_interval}."
"GoodJob started scheduler with queues=#{performer_name} max_threads=#{max_threads}."
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/good_job/notifier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def restart(wait: true)
# If +wait+ is +true+, the notifier will wait for background thread to shutdown.
# If +wait+ is +false+, this method will return immediately even though threads may still be running.
# Use {#shutdown?} to determine whether threads have stopped.
# @param wait [Boolean] Wait for actively executing jobs to finish
# @param wait [Boolean] Wait for actively executing threads to finish
# @return [void]
def shutdown(wait: true)
return unless @pool.running?
Expand Down
95 changes: 95 additions & 0 deletions lib/good_job/poller.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
require 'concurrent/atomic/atomic_boolean'

module GoodJob # :nodoc:
#
# Pollers regularly wake up execution threads to check for new work.
#
class Poller
# Default poll interval.
DEFAULT_POLL_INTERVAL = 1

# Defaults for instance of Concurrent::TimerTask.
# The timer controls how and when sleeping threads check for new work.
DEFAULT_TIMER_OPTIONS = {
execution_interval: DEFAULT_POLL_INTERVAL,
timeout_interval: 1,
run_now: true,
}.freeze

# @!attribute [r] instances
# @!scope class
# List of all instantiated Pollers in the current process.
# @return [array<GoodJob:Poller>]
cattr_reader :instances, default: [], instance_reader: false

def self.from_configuration(configuration)
GoodJob::Poller.new(poll_interval: configuration.poll_interval)
end

# List of recipients that will receive notifications.
# @return [Array<#call, Array(Object, Symbol)>]
attr_reader :recipients

# @param recipients [Array<#call, Array(Object, Symbol)>]
# @param timer_options [Hash] Options to instantiate a Concurrent::TimerTask
def initialize(*recipients, poll_interval: DEFAULT_POLL_INTERVAL)
@recipients = Concurrent::Array.new(recipients)
@timer_options = DEFAULT_TIMER_OPTIONS.merge(execution_interval: poll_interval)

self.class.instances << self

create_pool
end

# Shut down the poller.
# If +wait+ is +true+, the poller will wait for background thread to shutdown.
# If +wait+ is +false+, this method will return immediately even though threads may still be running.
# Use {#shutdown?} to determine whether threads have stopped.
# @param wait [Boolean] Wait for actively executing threads to finish
# @return [void]
def shutdown(wait: true)
return unless @timer&.running?

@timer.shutdown
@timer.wait_for_termination if wait
end

# Tests whether the poller is shutdown.
# @return [true, false, nil]
def shutdown?
!@timer&.running?
end

# Restart the poller.
# When shutdown, start; or shutdown and start.
# @param wait [Boolean] Wait for background thread to finish
# @return [void]
def restart(wait: true)
shutdown(wait: wait)
create_pool
end

# Invoked on completion of TimerTask task.
# @!visibility private
# @return [void]
def timer_observer(time, executed_task, thread_error)
GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call)
instrument("finished_timer_task", { result: executed_task, error: thread_error, time: time })
end

private

def create_pool
return if @timer_options[:execution_interval] <= 0

@timer = Concurrent::TimerTask.new(@timer_options) do
recipients.each do |recipient|
target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
target.send(method_name)
end
end
@timer.add_observer(self, :timer_observer)
@timer.execute
end
end
end
51 changes: 12 additions & 39 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class Scheduler
cattr_reader :instances, default: [], instance_reader: false

# Creates GoodJob::Scheduler(s) and Performers from a GoodJob::Configuration instance.
# TODO: move this to GoodJob::Configuration
# @param configuration [GoodJob::Configuration]
# @return [GoodJob::Scheduler, GoodJob::MultiScheduler]
def self.from_configuration(configuration)
Expand All @@ -62,7 +61,7 @@ def self.from_configuration(configuration)
end
job_performer = GoodJob::Performer.new(job_query, :perform_with_advisory_lock, name: queue_string, filter: job_filter)

GoodJob::Scheduler.new(job_performer, max_threads: max_threads, poll_interval: configuration.poll_interval)
GoodJob::Scheduler.new(job_performer, max_threads: max_threads)
end

if schedulers.size > 1
Expand All @@ -74,22 +73,18 @@ def self.from_configuration(configuration)

# @param performer [GoodJob::Performer]
# @param max_threads [Numeric, nil] the number of execution threads to use
# @param poll_interval [Numeric, nil] the number of seconds between polls for jobs
def initialize(performer, max_threads: nil, poll_interval: nil)
def initialize(performer, max_threads: nil)
raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)

self.class.instances << self

@performer = performer

@timer_options = DEFAULT_TIMER_OPTIONS.dup
@timer_options[:execution_interval] = poll_interval if poll_interval.present?

@pool_options = DEFAULT_POOL_OPTIONS.dup
@pool_options[:max_threads] = max_threads if max_threads.present?
@pool_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@pool_options[:max_threads]} poll_interval=#{@timer_options[:execution_interval]})"
@pool_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@pool_options[:max_threads]})"

create_pools
create_pool
end

# Shut down the scheduler.
Expand All @@ -100,28 +95,20 @@ def initialize(performer, max_threads: nil, poll_interval: nil)
# @param wait [Boolean] Wait for actively executing jobs to finish
# @return [void]
def shutdown(wait: true)
@_shutdown = true
return unless @pool&.running?

instrument("scheduler_shutdown_start", { wait: wait })
instrument("scheduler_shutdown", { wait: wait }) do
if @timer&.running?
@timer.shutdown
@timer.wait_for_termination if wait
# TODO: Should be killed if wait is not true
end

if @pool&.running?
@pool.shutdown
@pool.wait_for_termination if wait
# TODO: Should be killed if wait is not true
end
@pool.shutdown
@pool.wait_for_termination if wait
# TODO: Should be killed if wait is not true
end
end

# Tests whether the scheduler is shutdown.
# @return [true, false, nil]
def shutdown?
@_shutdown
!@pool&.running?
end

# Restart the Scheduler.
Expand All @@ -131,8 +118,7 @@ def shutdown?
def restart(wait: true)
instrument("scheduler_restart_pools") do
shutdown(wait: wait) unless shutdown?
create_pools
@_shutdown = false
create_pool
end
end

Expand All @@ -157,14 +143,6 @@ def create_thread(state = nil)
true
end

# Invoked on completion of TimerTask task.
# @!visibility private
# @return [void]
def timer_observer(time, executed_task, thread_error)
GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call)
instrument("finished_timer_task", { result: executed_task, error: thread_error, time: time })
end

# Invoked on completion of ThreadPoolExecutor task
# @!visibility private
# @return [void]
Expand All @@ -176,14 +154,9 @@ def task_observer(time, output, thread_error)

private

def create_pools
instrument("scheduler_create_pools", { performer_name: @performer.name, max_threads: @pool_options[:max_threads], poll_interval: @timer_options[:execution_interval] }) do
def create_pool
instrument("scheduler_create_pool", { performer_name: @performer.name, max_threads: @pool_options[:max_threads] }) do
@pool = ThreadPoolExecutor.new(@pool_options)
next unless @timer_options[:execution_interval].positive?

@timer = Concurrent::TimerTask.new(@timer_options) { create_thread }
@timer.add_observer(self, :timer_observer)
@timer.execute
end
end

Expand Down
6 changes: 5 additions & 1 deletion spec/integration/scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,11 @@ def perform(*args, **kwargs)
end

it 'pops items off of the queue and runs them' do
max_threads = 5

performer = GoodJob::Performer.new(GoodJob::Job.all, :perform_with_advisory_lock)
scheduler = GoodJob::Scheduler.new(performer)
scheduler = GoodJob::Scheduler.new(performer, max_threads: max_threads)
max_threads.times { scheduler.create_thread }

sleep_until(max: 20, increments_of: 0.5) { GoodJob::Job.count == 0 }

Expand Down Expand Up @@ -87,6 +90,7 @@ def perform(*args, **kwargs)
it "handles and retries jobs with errors" do
performer = GoodJob::Performer.new(GoodJob::Job.all, :perform_with_advisory_lock)
scheduler = GoodJob::Scheduler.new(performer)
scheduler.create_thread

sleep_until(max: 5, increments_of: 0.5) { GoodJob::Job.count == 0 }

Expand Down
2 changes: 1 addition & 1 deletion spec/lib/good_job/adapter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
allow(GoodJob::Job).to receive(:enqueue).and_return(good_job)

scheduler = instance_double(GoodJob::Scheduler, shutdown: nil, create_thread: nil)
adapter = described_class.new(execution_mode: :async, scheduler: scheduler)
adapter = described_class.new(execution_mode: :async, scheduler: scheduler, poll_interval: -1)

adapter.enqueue(active_job)

Expand Down
2 changes: 1 addition & 1 deletion spec/lib/good_job/cli_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
allow(ActiveRecord::Base.connection_pool).to receive(:size).and_return(1)

cli.start
expect(GoodJob::Scheduler).to have_received(:new).with(a_kind_of(GoodJob::Performer), max_threads: 4, poll_interval: 1)
expect(GoodJob::Scheduler).to have_received(:new).with(a_kind_of(GoodJob::Performer), max_threads: 4)
end
end

Expand Down
36 changes: 36 additions & 0 deletions spec/lib/good_job/poller_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
require 'rails_helper'

RSpec.describe GoodJob::Poller do
describe '.instances' do
it 'contains all registered instances' do
poller = nil
expect do
poller = described_class.new
end.to change { described_class.instances.size }.by(1)

expect(described_class.instances).to include poller
end
end

describe '#initialize' do
it 'accepts a zero or negative poll_interval to disable TimerTask' do
poller = described_class.new(poll_interval: 0)

expect(poller.instance_variable_get(:@task)).to be_nil
end
end

describe '#recipients' do
it 'polls recipients method' do
stub_const 'POLL_COUNT', Concurrent::AtomicFixnum.new(0)

recipient = proc { |_payload| POLL_COUNT.increment }

poller = described_class.new(recipient)
sleep_until(max: 5, increments_of: 0.5) { POLL_COUNT.value > 2 }
poller.shutdown

expect(POLL_COUNT.value).to be > 2
end
end
end
Loading

0 comments on commit 75bc5e1

Please sign in to comment.