Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configure GoodJob via Rails.application.config instead of recommending GoodJob::Adapter.new #199

Merged
merged 2 commits into from
Jan 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 47 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ For more of the story of GoodJob, read the [introductory blog post](https://isla
- [Command-line options](#command-line-options)
- [`good_job start`](#good_job-start)
- [`good_job cleanup_preserved_jobs`](#good_job-cleanup_preserved_jobs)
- [Adapter options](#adapter-options)
- [Global options](#global-options)
- [Configuration options](#configuration-options)
- [Global options](#global-options)pter
- [Dashboard](#dashboard)
- [Go deeper](#go-deeper)
- [Exceptions, retries, and reliability](#exceptions-retries-and-reliability)
Expand Down Expand Up @@ -189,9 +189,31 @@ If you are preserving job records this way, use this command regularly
to delete old records and preserve space in your database.
```

### Adapter options
### Configuration options

To use GoodJob, you can set `config.active_job.queue_adapter` to a `:good_job` or to an instance of `GoodJob::Adapter`, which you can configure with several options:
To use GoodJob, you can set `config.active_job.queue_adapter` to a `:good_job`.

Additional configuration can be provided via `config.good_job.OPTION = ...` for example:

```ruby
# config/application.rb

config.active_job.queue_adapter = :good_job

# Configure options individually...
config.good_job.execution_mode = :async
config.good_job.max_threads = 5
config.good_job.poll_interval = 30 # seconds

# ...or all at once.
config.good_job = {
execution_mode: :async,
max_threads: 5,
poll_interval: 30,
}
```

Available configuration options are:

- `execution_mode` (symbol) specifies how and where jobs should be executed. You can also set this with the environment variable `GOOD_JOB_EXECUTION_MODE`. It can be any one of:
- `:inline` executes jobs immediately in whatever process queued them (usually the web server process). This should only be used in test and development environments.
Expand All @@ -201,17 +223,21 @@ To use GoodJob, you can set `config.active_job.queue_adapter` to a `:good_job` o
- `queues` (string) determines which queues to execute jobs from when `execution_mode` is set to `:async`. See the description of `good_job start` for more details on the format of this string. You can also set this with the environment variable `GOOD_JOB_QUEUES`.
- `poll_interval` (integer) sets the number of seconds between polls for jobs when `execution_mode` is set to `:async`. You can also set this with the environment variable `GOOD_JOB_POLL_INTERVAL`.

Using the symbol instead of explicitly configuring the options above (i.e. setting `config.active_job.queue_adapter = :good_job`) is equivalent to:
By default, GoodJob configures the following execution modes per environment:

```ruby

# config/environments/development.rb
config.active_job.queue_adapter = GoodJob::Adapter.new(execution_mode: :inline)
config.active_job.queue_adapter = :good_job
config.good_job.execution_mode = :inline

# config/environments/test.rb
config.active_job.queue_adapter = GoodJob::Adapter.new(execution_mode: :inline)
config.active_job.queue_adapter = :good_job
config.good_job.execution_mode = :inline

# config/environments/production.rb
config.active_job.queue_adapter = GoodJob::Adapter.new(execution_mode: :external)
config.active_job.queue_adapter = :good_job
config.good_job.execution_mode = :external
```

### Global options
Expand Down Expand Up @@ -442,14 +468,24 @@ pool: <%= [ENV.fetch("RAILS_MAX_THREADS", 5).to_i, ENV.fetch("GOOD_JOB_MAX_THREA

GoodJob can execute jobs "async" in the same process as the webserver (e.g. `bin/rail s`). GoodJob's async execution mode offers benefits of economy by not requiring a separate job worker process, but with the tradeoff of increased complexity. Async mode can be configured in two ways:

- Directly configure the ActiveJob adapter:
- Via Rails configuration:

```ruby
# config/environments/production.rb
config.active_job.queue_adapter = GoodJob::Adapter.new(execution_mode: :async, max_threads: 4, poll_interval: 30)
config.active_job.queue_adapter = :good_job

# To change the execution mode
config.good_job.execution_mode = :async

# Or with more configuration
config.good_job = {
execution_mode: :async,
max_threads: 4,
poll_interval: 30
}
```

- Or, when using `...queue_adapter = :good_job`, via environment variables:
- Or, with environment variables:

```bash
$ GOOD_JOB_EXECUTION_MODE=async GOOD_JOB_MAX_THREADS=4 GOOD_JOB_POLL_INTERVAL=30 bin/rails server
Expand Down
6 changes: 3 additions & 3 deletions lib/active_job/queue_adapters/good_job_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ module ActiveJob # :nodoc:
module QueueAdapters # :nodoc:
# See {GoodJob::Adapter} for details.
class GoodJobAdapter < GoodJob::Adapter
def initialize(execution_mode: nil, max_threads: nil, poll_interval: nil, scheduler: nil, inline: false)
configuration = GoodJob::Configuration.new({ execution_mode: execution_mode }, env: ENV)
super(execution_mode: configuration.rails_execution_mode, max_threads: max_threads, poll_interval: poll_interval, scheduler: scheduler, inline: inline)
def initialize(**options)
configuration = GoodJob::Configuration.new(options, env: ENV)
super(**options.merge(execution_mode: configuration.rails_execution_mode))
end
end
end
Expand Down
33 changes: 18 additions & 15 deletions lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,22 @@ class Adapter
# @param max_threads [nil, Integer] sets the number of threads per scheduler to use when +execution_mode+ is set to +:async+. The +queues+ parameter can specify a number of threads for each group of queues which will override this value. You can also set this with the environment variable +GOOD_JOB_MAX_THREADS+. Defaults to +5+.
# @param queues [nil, String] determines which queues to execute jobs from when +execution_mode+ is set to +:async+. See {file:README.md#optimize-queues-threads-and-processes} for more details on the format of this string. You can also set this with the environment variable +GOOD_JOB_QUEUES+. Defaults to +"*"+.
# @param poll_interval [nil, Integer] sets the number of seconds between polls for jobs when +execution_mode+ is set to +:async+. You can also set this with the environment variable +GOOD_JOB_POLL_INTERVAL+. Defaults to +1+.
# @param scheduler [nil, Scheduler] (deprecated) a scheduler to be managed by the adapter
# @param notifier [nil, Notifier] (deprecated) a notifier to be managed by the adapter
# @param inline [nil, Boolean] (deprecated) whether to run in inline execution mode
def initialize(execution_mode: nil, queues: nil, max_threads: nil, poll_interval: nil, scheduler: nil, notifier: nil, inline: false)
if inline && execution_mode.nil?
ActiveSupport::Deprecation.warn('GoodJob::Adapter#new(inline: true) is deprecated; use GoodJob::Adapter.new(execution_mode: :inline) instead')
execution_mode = :inline
def initialize(execution_mode: nil, queues: nil, max_threads: nil, poll_interval: nil)
if caller[0..4].find { |c| c.include?("/config/application.rb") || c.include?("/config/environments/") }
ActiveSupport::Deprecation.warn(<<~DEPRECATION)
GoodJob no longer recommends creating a GoodJob::Adapter instance:

config.active_job.queue_adapter = GoodJob::Adapter.new...

Instead, configure GoodJob through configuration:

config.active_job.queue_adapter = :good_job
config.good_job.execution_mode = :#{execution_mode}
config.good_job.max_threads = #{max_threads}
config.good_job.poll_interval = #{poll_interval}
# etc...

DEPRECATION
end

configuration = GoodJob::Configuration.new(
Expand All @@ -42,9 +51,9 @@ def initialize(execution_mode: nil, queues: nil, max_threads: nil, poll_interval
raise ArgumentError, "execution_mode: must be one of #{EXECUTION_MODES.join(', ')}." unless EXECUTION_MODES.include?(@execution_mode)

if @execution_mode == :async # rubocop:disable Style/GuardClause
@notifier = notifier || GoodJob::Notifier.new
@notifier = GoodJob::Notifier.new
@poller = GoodJob::Poller.new(poll_interval: configuration.poll_interval)
@scheduler = scheduler || GoodJob::Scheduler.from_configuration(configuration)
@scheduler = GoodJob::Scheduler.from_configuration(configuration)
@notifier.recipients << [@scheduler, :create_thread]
@poller.recipients << [@scheduler, :create_thread]
end
Expand Down Expand Up @@ -108,11 +117,5 @@ def execute_externally?
def execute_inline?
@execution_mode == :inline
end

# (deprecated) Whether in +:inline+ execution mode.
def inline?
ActiveSupport::Deprecation.warn('GoodJob::Adapter::inline? is deprecated; use GoodJob::Adapter::execute_inline? instead')
execute_inline?
end
end
end
12 changes: 12 additions & 0 deletions lib/good_job/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ def initialize(options, env: ENV)
def execution_mode(default: :external)
if options[:execution_mode]
options[:execution_mode]
elsif rails_config[:execution_mode]
rails_config[:execution_mode]
elsif env['GOOD_JOB_EXECUTION_MODE'].present?
env['GOOD_JOB_EXECUTION_MODE'].to_sym
else
Expand Down Expand Up @@ -72,6 +74,7 @@ def rails_execution_mode
def max_threads
(
options[:max_threads] ||
rails_config[:max_threads] ||
env['GOOD_JOB_MAX_THREADS'] ||
env['RAILS_MAX_THREADS'] ||
DEFAULT_MAX_THREADS
Expand All @@ -85,6 +88,7 @@ def max_threads
# @return [String]
def queue_string
options[:queues] ||
rails_config[:queues] ||
env['GOOD_JOB_QUEUES'] ||
'*'
end
Expand All @@ -96,6 +100,7 @@ def queue_string
def poll_interval
(
options[:poll_interval] ||
rails_config[:poll_interval] ||
env['GOOD_JOB_POLL_INTERVAL'] ||
DEFAULT_POLL_INTERVAL
).to_i
Expand All @@ -107,9 +112,16 @@ def poll_interval
def cleanup_preserved_jobs_before_seconds_ago
(
options[:before_seconds_ago] ||
rails_config[:cleanup_preserved_jobs_before_seconds_ago] ||
env['GOOD_JOB_CLEANUP_PRESERVED_JOBS_BEFORE_SECONDS_AGO'] ||
DEFAULT_CLEANUP_PRESERVED_JOBS_BEFORE_SECONDS_AGO
).to_i
end

private

def rails_config
Rails.application.config.good_job
end
end
end
63 changes: 63 additions & 0 deletions lib/good_job/job_performer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
require 'concurrent/delay'

module GoodJob
#
# JobPerformer queries the database for jobs and performs them on behalf of a
# {Scheduler}. It mainly functions as glue between a {Scheduler} and the jobs
# it should be executing.
#
# The JobPerformer must be safe to execute across multiple threads.
#
class JobPerformer
# @param queue_string [String] Queues to execute jobs from
def initialize(queue_string)
@queue_string = queue_string

@job_query = Concurrent::Delay.new { GoodJob::Job.queue_string(queue_string) }
@parsed_queues = Concurrent::Delay.new { GoodJob::Job.queue_parser(queue_string) }
end

# A meaningful name to identify the performer in logs and for debugging.
# @return [String] The queues from which Jobs are worked
def name
@queue_string
end

# Perform the next eligible job
# @return [nil, Object] Returns job result or +nil+ if no job was found
def next
job_query.perform_with_advisory_lock
end

# Tests whether this performer should be used in GoodJob's current state.
#
# For example, state will be a LISTEN/NOTIFY message that is passed down
# from the Notifier to the Scheduler. The Scheduler is able to ask
# its performer "does this message relate to you?", and if not, ignore it
# to minimize thread wake-ups, database queries, and thundering herds.
#
# @return [Boolean] whether the performer's {#next} method should be
# called in the current state.
def next?(state = {})
if parsed_queues[:exclude]
parsed_queues[:exclude].exclude?(state[:queue_name])
elsif parsed_queues[:include]
parsed_queues[:include].include?(state[:queue_name])
else
true
end
end

private

attr_reader :queue_string

def job_query
@job_query.value
end

def parsed_queues
@parsed_queues.value
end
end
end
60 changes: 0 additions & 60 deletions lib/good_job/performer.rb

This file was deleted.

8 changes: 6 additions & 2 deletions lib/good_job/railtie.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
module GoodJob
# Ruby on Rails integration.
class Railtie < ::Rails::Railtie
initializer "good_job.logger" do
ActiveSupport.on_load(:good_job) { self.logger = ::Rails.logger }
config.good_job = ActiveSupport::OrderedOptions.new

initializer "good_job.logger" do |_app|
ActiveSupport.on_load(:good_job) do
self.logger = ::Rails.logger
end
GoodJob::LogSubscriber.attach_to :good_job
end

Expand Down
16 changes: 2 additions & 14 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,7 @@ def self.from_configuration(configuration)
queue_string, max_threads = queue_string_and_max_threads.split(':')
max_threads = (max_threads || configuration.max_threads).to_i

job_query = GoodJob::Job.queue_string(queue_string)
parsed = GoodJob::Job.queue_parser(queue_string)
job_filter = proc do |state|
if parsed[:exclude]
parsed[:exclude].exclude?(state[:queue_name])
elsif parsed[:include]
parsed[:include].include? state[:queue_name]
else
true
end
end
job_performer = GoodJob::Performer.new(job_query, :perform_with_advisory_lock, name: queue_string, filter: job_filter)

job_performer = GoodJob::JobPerformer.new(queue_string)
GoodJob::Scheduler.new(job_performer, max_threads: max_threads)
end

Expand All @@ -63,7 +51,7 @@ def self.from_configuration(configuration)
end
end

# @param performer [GoodJob::Performer]
# @param performer [GoodJob::JobPerformer]
# @param max_threads [Numeric, nil] number of seconds between polls for jobs
def initialize(performer, max_threads: nil)
raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)
Expand Down
Loading