Skip to content

Commit

Permalink
Extract GoodJob::Capsule
Browse files Browse the repository at this point in the history
  • Loading branch information
bensheldon committed Feb 24, 2023
1 parent 616e070 commit fe1ec5b
Show file tree
Hide file tree
Showing 10 changed files with 227 additions and 119 deletions.
25 changes: 11 additions & 14 deletions lib/good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

require "good_job/assignable_connection"
require "good_job/bulk"
require "good_job/capsule"
require "good_job/cleanup_tracker"
require "good_job/cli"
require "good_job/configuration"
Expand Down Expand Up @@ -91,6 +92,12 @@ module GoodJob
# @return [GoodJob::Configuration, nil]
mattr_accessor :configuration, default: GoodJob::Configuration.new({})

# @!attribute [rw] capsule
# @!scope class
# Global/default execution capsule for GoodJob.
# @return [GoodJob::Capsule, nil]
mattr_accessor :capsule, default: GoodJob::Capsule.new(configuration: configuration)

# Called with exception when a GoodJob thread raises an exception
# @param exception [Exception] Exception that was raised
# @return [void]
Expand All @@ -108,16 +115,15 @@ def self._on_thread_error(exception)
# * +-1+, the scheduler will wait until the shutdown is complete.
# * +0+, the scheduler will immediately shutdown and stop any active tasks.
# * +1..+, the scheduler will wait that many seconds before stopping any remaining active tasks.
# @param wait [Boolean] whether to wait for shutdown
# @return [void]
def self.shutdown(timeout: -1)
_shutdown_all(_executables, timeout: timeout)
_shutdown_all(Capsule.instances, timeout: timeout)
end

# Tests whether jobs have stopped executing.
# @return [Boolean] whether background threads are shut down
def self.shutdown?
_executables.all?(&:shutdown?)
Capsule.instances.all?(&:shutdown?)
end

# Stops and restarts executing jobs.
Expand All @@ -128,7 +134,7 @@ def self.shutdown?
# @param timeout [Numeric, nil] Seconds to wait for active threads to finish.
# @return [void]
def self.restart(timeout: -1)
_shutdown_all(_executables, :restart, timeout: timeout)
_shutdown_all(Capsule.instances, :restart, timeout: timeout)
end

# Sends +#shutdown+ or +#restart+ to executable objects ({GoodJob::Notifier}, {GoodJob::Poller}, {GoodJob::Scheduler}, {GoodJob::MultiScheduler}, {GoodJob::CronManager})
Expand All @@ -154,7 +160,7 @@ def self._shutdown_all(executables, method_name = :shutdown, timeout: -1)
# analyze or inspect job performance.
# If you are preserving job records this way, use this method regularly to
# destroy old records and preserve space in your database.
# @params older_than [nil,Numeric,ActiveSupport::Duration] Jobs older than this will be destroyed (default: +86400+).
# @param older_than [nil,Numeric,ActiveSupport::Duration] Jobs older than this will be destroyed (default: +86400+).
# @return [Integer] Number of job execution records and batches that were destroyed.
def self.cleanup_preserved_jobs(older_than: nil)
older_than ||= GoodJob.configuration.cleanup_preserved_jobs_before_seconds_ago
Expand Down Expand Up @@ -194,14 +200,5 @@ def self.perform_inline(queue_string = "*")
end
end

def self._executables
[].concat(
CronManager.instances,
Notifier.instances,
Poller.instances,
Scheduler.instances
)
end

ActiveSupport.run_load_hooks(:good_job, self)
end
33 changes: 10 additions & 23 deletions lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ class Adapter
# -+test+: +:inline+
# - +production+ and all other environments: +:external+
#
def initialize(execution_mode: nil)
def initialize(execution_mode: nil, capsule: GoodJob.capsule)
@_execution_mode_override = execution_mode
GoodJob::Configuration.validate_execution_mode(@_execution_mode_override) if @_execution_mode_override
@capsule = capsule

self.class.instances << self
start_async if GoodJob.async_ready?
Expand Down Expand Up @@ -98,7 +99,7 @@ def enqueue_all(active_jobs)
state = { queue_name: queue_name, count: executions_by_queue_and_scheduled_at.size }
state[:scheduled_at] = scheduled_at if scheduled_at

executed_locally = execute_async? && @scheduler&.create_thread(state)
executed_locally = execute_async? && @capsule&.create_thread(state)
unless executed_locally
state[:count] = job_id_to_active_jobs.values_at(*executions_by_queue_and_scheduled_at.map(&:active_job_id)).count { |active_job| send_notify?(active_job) }
Notifier.notify(state) unless state[:count].zero?
Expand Down Expand Up @@ -141,7 +142,7 @@ def enqueue_at(active_job, timestamp)
job_state = { queue_name: execution.queue_name }
job_state[:scheduled_at] = execution.scheduled_at if execution.scheduled_at

executed_locally = execute_async? && @scheduler&.create_thread(job_state)
executed_locally = execute_async? && @capsule&.create_thread(job_state)
Notifier.notify(job_state) if !executed_locally && send_notify?(active_job)
end

Expand All @@ -150,20 +151,13 @@ def enqueue_at(active_job, timestamp)

# Shut down the thread pool executors.
# @param timeout [nil, Numeric, Symbol] Seconds to wait for active threads.
# * +nil+, the scheduler will trigger a shutdown but not wait for it to complete.
# * +-1+, the scheduler will wait until the shutdown is complete.
# * +0+, the scheduler will immediately shutdown and stop any threads.
# * +nil+ trigger a shutdown but not wait for it to complete.
# * +-1+ wait until the shutdown is complete.
# * +0+ immediately shutdown and stop any threads.
# * A positive number will wait that many seconds before stopping any remaining active threads.
# @return [void]
def shutdown(timeout: :default)
timeout = if timeout == :default
GoodJob.configuration.shutdown_timeout
else
timeout
end

executables = [@notifier, @poller, @scheduler].compact
GoodJob._shutdown_all(executables, timeout: timeout)
@capsule&.shutdown(timeout: timeout)
@_async_started = false
end

Expand Down Expand Up @@ -199,14 +193,7 @@ def execute_inline?
def start_async
return unless execute_async?

@notifier = GoodJob::Notifier.new(enable_listening: GoodJob.configuration.enable_listen_notify)
@poller = GoodJob::Poller.new(poll_interval: GoodJob.configuration.poll_interval)
@scheduler = GoodJob::Scheduler.from_configuration(GoodJob.configuration, warm_cache_on_initialize: true)
@notifier.recipients << [@scheduler, :create_thread]
@poller.recipients << [@scheduler, :create_thread]

@cron_manager = GoodJob::CronManager.new(GoodJob.configuration.cron_entries, start_on_initialize: true) if GoodJob.configuration.enable_cron?

@capsule.start
@_async_started = true
end

Expand Down Expand Up @@ -234,8 +221,8 @@ def in_server_process?
end

def send_notify?(active_job)
return false unless GoodJob.configuration.enable_listen_notify
return true unless active_job.respond_to?(:good_job_notify)
return false unless GoodJob.configuration.enable_listen_notify

!(active_job.good_job_notify == false || (active_job.class.good_job_notify == false && active_job.good_job_notify.nil?))
end
Expand Down
83 changes: 83 additions & 0 deletions lib/good_job/capsule.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# frozen_string_literal: true
module GoodJob
# A GoodJob::Capsule contains the resources necessary to execute jobs, including
# a {GoodJob::Scheduler}, {GoodJob::Poller}, {GoodJob::Notifier}, and {GoodJob::CronManager}.
# GoodJob creates a default capsule on initialization.
class Capsule
# @!attribute [r] instances
# @!scope class
# List of all instantiated Capsules in the current process.
# @return [Array<GoodJob::Capsule>, nil]
cattr_reader :instances, default: [], instance_reader: false

# @param configuration [GoodJob::Configuration] Configuration to use for this capsule.
def initialize(configuration: GoodJob.configuration)
self.class.instances << self
@configuration = configuration

@autostart = true
@running = false
@mutex = Mutex.new
end

# Start executing jobs (if not already running).
def start
return if @running

@mutex.synchronize do
return if @running

@notifier = GoodJob::Notifier.new(enable_listening: @configuration.enable_listen_notify)
@poller = GoodJob::Poller.new(poll_interval: @configuration.poll_interval)
@scheduler = GoodJob::Scheduler.from_configuration(@configuration, warm_cache_on_initialize: true)
@notifier.recipients << [@scheduler, :create_thread]
@poller.recipients << [@scheduler, :create_thread]

@cron_manager = GoodJob::CronManager.new(@configuration.cron_entries, start_on_initialize: true) if @configuration.enable_cron?

@autostart = false
@running = true
end
end

# Shut down the thread pool executors.
# @param timeout [nil, Numeric, Symbol] Seconds to wait for active threads.
# * +-1+ will wait for all active threads to complete.
# * +0+ will interrupt active threads.
# * +N+ will wait at most N seconds and then interrupt active threads.
# * +nil+ will trigger a shutdown but not wait for it to complete.
# @return [void]
def shutdown(timeout: :default)
timeout = timeout == :default ? @configuration.shutdown_timeout : timeout
GoodJob._shutdown_all([@notifier, @poller, @scheduler, @cron_manager].compact, timeout: timeout)
@autostart = false
@running = false
end

# Shutdown and then start the capsule again.
# @param timeout [nil, Numeric, Symbol] Seconds to wait for active threads.
# @return [void]
def restart(timeout: :default)
shutdown(timeout: timeout)
start
end

# @return [Boolean] Whether the capsule is currently running.
def running?
@running
end

# @return [Boolean] Whether the capsule has been shutdown.
def shutdown?
[@notifier, @poller, @scheduler, @cron_manager].compact.all?(&:shutdown?)
end

# Creates an execution thread(s) with the given attributes.
# @param job_state [Hash, nil] See {GoodJob::Scheduler#create_thread}.
# @return [Boolean, nil] Whether work was started.
def create_thread(job_state = nil)
start if !running? && @autostart
@scheduler&.create_thread(job_state)
end
end
end
14 changes: 4 additions & 10 deletions lib/good_job/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,8 @@ def start

Daemon.new(pidfile: configuration.pidfile).daemonize if configuration.daemonize?

notifier = GoodJob::Notifier.new(enable_listening: GoodJob.configuration.enable_listen_notify)
poller = GoodJob::Poller.new(poll_interval: configuration.poll_interval)
scheduler = GoodJob::Scheduler.from_configuration(configuration, warm_cache_on_initialize: true)
notifier.recipients << [scheduler, :create_thread]
poller.recipients << [scheduler, :create_thread]

cron_manager = GoodJob::CronManager.new(configuration.cron_entries, start_on_initialize: true) if configuration.enable_cron?
capsule = GoodJob::Capsule.new
capsule.start

if configuration.probe_port
probe_server = GoodJob::ProbeServer.new(port: configuration.probe_port)
Expand All @@ -115,11 +110,10 @@ def start

Kernel.loop do
sleep 0.1
break if @stop_good_job_executable || scheduler.shutdown? || notifier.shutdown?
break if @stop_good_job_executable || capsule.shutdown?
end

executors = [notifier, poller, cron_manager, scheduler].compact
GoodJob._shutdown_all(executors, timeout: configuration.shutdown_timeout)
capsule.shutdown(timeout: configuration.shutdown_timeout)
probe_server&.stop
end

Expand Down
3 changes: 2 additions & 1 deletion spec/integration/adapter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ def perform(*_args, **_kwargs)
end

describe 'Async execution mode' do
let(:adapter) { GoodJob::Adapter.new execution_mode: :async_all }
let(:capsule) { GoodJob::Capsule.new(configuration: GoodJob::Configuration.new({ max_threads: 5, queue_string: '*' })) }
let(:adapter) { GoodJob::Adapter.new(execution_mode: :async_all, capsule: capsule) }

it 'executes the job', skip_if_java: true do
elephant_adapter = GoodJob::Adapter.new execution_mode: :async_all
Expand Down
26 changes: 8 additions & 18 deletions spec/lib/good_job/adapter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,19 @@ def perform(succeed: true)
end

context 'when async' do
it 'triggers an execution thread and the notifier' do
it 'triggers the capsule and the notifier' do
allow(GoodJob::Execution).to receive(:enqueue).and_return(good_job)
allow(GoodJob::Notifier).to receive(:notify).with({ queue_name: 'default' })

scheduler = instance_double(GoodJob::Scheduler, shutdown: nil, create_thread: nil)
allow(GoodJob::Scheduler).to receive(:new).and_return(scheduler)

poller = instance_double(GoodJob::Poller, recipients: [], shutdown: nil)
allow(GoodJob::Poller).to receive(:new).and_return(poller)
capsule = instance_double(GoodJob::Capsule, start: nil, create_thread: nil)
allow(GoodJob).to receive(:capsule).and_return(capsule)
allow(capsule).to receive(:start)

adapter = described_class.new(execution_mode: :async_all)
adapter.enqueue(active_job)

expect(scheduler).to have_received(:create_thread)
expect(capsule).to have_received(:start)
expect(capsule).to have_received(:create_thread)
expect(GoodJob::Notifier).to have_received(:notify)
end
end
Expand Down Expand Up @@ -201,17 +200,8 @@ def perform
let(:adapter) { described_class.new(execution_mode: :async_server) }

before do
scheduler = instance_double(GoodJob::Scheduler)
allow(GoodJob::Scheduler).to receive(:new).and_return(scheduler)

notifier = instance_double(GoodJob::Notifier, recipients: [])
allow(GoodJob::Notifier).to receive(:new).and_return(notifier)

poller = instance_double(GoodJob::Poller, recipients: [])
allow(GoodJob::Poller).to receive(:new).and_return(poller)

cron_manager = instance_double(GoodJob::CronManager)
allow(GoodJob::CronManager).to receive(:new).and_return(cron_manager)
capsule = instance_double(GoodJob::Capsule, start: nil, create_thread: nil)
allow(GoodJob::Capsule).to receive(:new).and_return(capsule)
end

context 'when Rails::Server is defined' do
Expand Down
69 changes: 69 additions & 0 deletions spec/lib/good_job/capsule_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# frozen_string_literal: true
require 'rails_helper'

describe GoodJob::Capsule do
describe '#initialize' do
it 'does not start' do
capsule = described_class.new
expect(capsule).not_to be_running
end
end

describe '#start' do
it 'creates execution objects' do
capsule = described_class.new
expect { capsule.start }
.to change(GoodJob::Notifier.instances, :size).by(1)
.and change(GoodJob::Scheduler.instances, :size).by(1)
.and change(GoodJob::Poller.instances, :size).by(1)
.and change(GoodJob::Poller.instances, :size).by(1)
capsule.shutdown
end

it 'is safe to call from multiple threads' do
capsule = described_class.new
Array.new(100) { Thread.new { capsule.start } }.each(&:join)
capsule.shutdown
expect(GoodJob::Scheduler.instances.size).to eq 1
end
end

describe '#shutdown' do
it 'operates if the capsule has not been started' do
capsule = described_class.new
expect { capsule.shutdown }.not_to raise_error
end
end

describe '#create_thread' do
it 'passes the job state to the scheduler' do
scheduler = instance_double(GoodJob::Scheduler, create_thread: nil, shutdown?: true, shutdown: nil)
allow(GoodJob::Scheduler).to receive(:new).and_return(scheduler)
job_state = "STATE"

capsule = described_class.new
capsule.start
capsule.create_thread(job_state)

expect(scheduler).to have_received(:create_thread).with(job_state)
end

it 'starts the capsule if it is not running' do
capsule = described_class.new
expect { capsule.create_thread }.to change(capsule, :running?).from(false).to(true)
end

it 'will not start the capsule if it has been shutdown' do
capsule = described_class.new
capsule.start
capsule.shutdown
expect { capsule.create_thread }.not_to change(capsule, :running?).from(false)
end

it 'returns nil if the capsule is not running' do
capsule = described_class.new
capsule.shutdown
expect(capsule.create_thread).to be_nil
end
end
end
Loading

0 comments on commit fe1ec5b

Please sign in to comment.