Skip to content

Commit

Permalink
Add global enable_listen_notify configuration to disable both notify …
Browse files Browse the repository at this point in the history
…and listen (#810)
  • Loading branch information
mitchellhenke authored Feb 6, 2023
1 parent 834c805 commit 2fb51f5
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 20 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ Options:
[--max-cache=COUNT] # Maximum number of scheduled jobs to cache in memory (env var: GOOD_JOB_MAX_CACHE, default: 10000)
[--shutdown-timeout=SECONDS] # Number of seconds to wait for jobs to finish when shutting down before stopping the thread. (env var: GOOD_JOB_SHUTDOWN_TIMEOUT, default: -1 (forever))
[--enable-cron] # Whether to run cron process (default: false)
[--enable-listen-notify] # Whether to enqueue and read jobs with Postgres LISTEN/NOTIFY (default: true)
[--daemonize] # Run as a background daemon (default: false)
[--pidfile=PIDFILE] # Path to write daemonized Process ID (env var: GOOD_JOB_PIDFILE, default: tmp/pids/good_job.pid)
[--probe-port=PORT] # Port for http health check (env var: GOOD_JOB_PROBE_PORT, default: nil)
Expand Down Expand Up @@ -272,6 +273,7 @@ Available configuration options are:
- `max_cache` (integer) sets the maximum number of scheduled jobs that will be stored in memory to reduce execution latency when also polling for scheduled jobs. Caching 10,000 scheduled jobs uses approximately 20MB of memory. You can also set this with the environment variable `GOOD_JOB_MAX_CACHE`.
- `shutdown_timeout` (integer) number of seconds to wait for jobs to finish when shutting down before stopping the thread. Defaults to forever: `-1`. You can also set this with the environment variable `GOOD_JOB_SHUTDOWN_TIMEOUT`.
- `enable_cron` (boolean) whether to run cron process. Defaults to `false`. You can also set this with the environment variable `GOOD_JOB_ENABLE_CRON`.
- `enable_listen_notify` (boolean) whether to enqueue and read jobs with Postgres LISTEN/NOTIFY. Defaults to `true`. You can also set this with the environment variable `GOOD_JOB_ENABLE_LISTEN_NOTIFY`.
- `cron` (hash) cron configuration. Defaults to `{}`. You can also set this as a JSON string with the environment variable `GOOD_JOB_CRON`
- `cleanup_discarded_jobs` (boolean) whether to destroy discarded jobs when cleaning up preserved jobs using the `$ good_job cleanup_preserved_jobs` CLI command or calling `GoodJob.cleanup_preserved_jobs`. Defaults to `true`. Can also be set with the environment variable `GOOD_JOB_CLEANUP_DISCARDED_JOBS`. _This configuration is only used when {GoodJob.preserve_job_records} is `true`._
- `cleanup_preserved_jobs_before_seconds_ago` (integer) number of seconds to preserve jobs when using the `$ good_job cleanup_preserved_jobs` CLI command or calling `GoodJob.cleanup_preserved_jobs`. Defaults to `1209600` (14 days). Can also be set with the environment variable `GOOD_JOB_CLEANUP_PRESERVED_JOBS_BEFORE_SECONDS_AGO`. _This configuration is only used when {GoodJob.preserve_job_records} is `true`._
Expand Down
3 changes: 2 additions & 1 deletion lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def execute_inline?
def start_async
return unless execute_async?

@notifier = GoodJob::Notifier.new
@notifier = GoodJob::Notifier.new(enable_listening: GoodJob.configuration.enable_listen_notify)
@poller = GoodJob::Poller.new(poll_interval: GoodJob.configuration.poll_interval)
@scheduler = GoodJob::Scheduler.from_configuration(GoodJob.configuration, warm_cache_on_initialize: true)
@notifier.recipients << [@scheduler, :create_thread]
Expand Down Expand Up @@ -235,6 +235,7 @@ def in_server_process?

def send_notify?(active_job)
return true unless active_job.respond_to?(:good_job_notify)
return false unless GoodJob.configuration.enable_listen_notify

!(active_job.good_job_notify == false || (active_job.class.good_job_notify == false && active_job.good_job_notify.nil?))
end
Expand Down
2 changes: 1 addition & 1 deletion lib/good_job/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def start

Daemon.new(pidfile: configuration.pidfile).daemonize if configuration.daemonize?

notifier = GoodJob::Notifier.new
notifier = GoodJob::Notifier.new(enable_listening: GoodJob.configuration.enable_listen_notify)
poller = GoodJob::Poller.new(poll_interval: configuration.poll_interval)
scheduler = GoodJob::Scheduler.from_configuration(configuration, warm_cache_on_initialize: true)
notifier.recipients << [scheduler, :create_thread]
Expand Down
10 changes: 10 additions & 0 deletions lib/good_job/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class Configuration
DEFAULT_SHUTDOWN_TIMEOUT = -1
# Default to not running cron
DEFAULT_ENABLE_CRON = false
# Default to enabling LISTEN/NOTIFY
DEFAULT_ENABLE_LISTEN_NOTIFY = true

def self.validate_execution_mode(execution_mode)
raise ArgumentError, "GoodJob execution mode must be one of #{EXECUTION_MODES.join(', ')}. It was '#{execution_mode}' which is not valid." unless execution_mode.in?(EXECUTION_MODES)
Expand Down Expand Up @@ -331,6 +333,14 @@ def probe_port
env['GOOD_JOB_PROBE_PORT']
end

def enable_listen_notify
return options[:enable_listen_notify] unless options[:enable_listen_notify].nil?
return rails_config[:enable_listen_notify] unless rails_config[:enable_listen_notify].nil?
return ActiveModel::Type::Boolean.new.cast(env['GOOD_JOB_ENABLE_LISTEN_NOTIFY']) unless env['GOOD_JOB_ENABLE_LISTEN_NOTIFY'].nil?

DEFAULT_ENABLE_LISTEN_NOTIFY
end

private

def rails_config
Expand Down
37 changes: 26 additions & 11 deletions lib/good_job/notifier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,28 @@ def self.notify(message)
attr_reader :recipients

# @param recipients [Array<#call, Array(Object, Symbol)>]
def initialize(*recipients)
# @param enable_listening [true, false]
def initialize(*recipients, enable_listening: true)
@recipients = Concurrent::Array.new(recipients)
@connected = Concurrent::AtomicBoolean.new(false)
@listening = Concurrent::AtomicBoolean.new(false)
@connection_errors_count = Concurrent::AtomicFixnum.new(0)
@connection_errors_reported = Concurrent::AtomicBoolean.new(false)
@enable_listening = enable_listening

self.class.instances << self

create_executor
listen
end

# Tests whether the notifier is active and listening for new messages.
# Tests whether the notifier is active and has acquired a dedicated database connection.
# @return [true, false, nil]
def connected?
@connected.true?
end

# Tests whether the notifier is listening for new messages.
# @return [true, false, nil]
def listening?
@listening.true?
Expand Down Expand Up @@ -165,15 +174,17 @@ def create_executor
end

def listen(delay: 0)
future = Concurrent::ScheduledTask.new(delay, args: [@recipients, executor, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_listening|
future = Concurrent::ScheduledTask.new(delay, args: [@recipients, executor, @enable_listening, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_enable_listening, thr_listening|
with_connection do
begin
Rails.application.executor.wrap do
run_callbacks :listen do
ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
connection.execute("LISTEN #{CHANNEL}")
if thr_enable_listening
ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
connection.execute("LISTEN #{CHANNEL}")
thr_listening.make_true
end
end
thr_listening.make_true
end
end

Expand All @@ -195,9 +206,11 @@ def listen(delay: 0)
ensure
Rails.application.executor.wrap do
run_callbacks :unlisten do
thr_listening.make_false
ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
connection.execute("UNLISTEN *")
if thr_enable_listening
ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
thr_listening.make_false
connection.execute("UNLISTEN *")
end
end
end
end
Expand All @@ -215,20 +228,22 @@ def with_connection
end
end
connection.execute("SET application_name = #{connection.quote(self.class.name)}")
@connected.make_true

yield
ensure
@connected.make_false
connection&.disconnect!
self.connection = nil
end

def wait_for_notify
raw_connection = connection.raw_connection
if raw_connection.respond_to?(:wait_for_notify)
if @enable_listening && raw_connection.respond_to?(:wait_for_notify)
raw_connection.wait_for_notify(WAIT_INTERVAL) do |channel, _pid, payload|
yield(channel, payload)
end
elsif raw_connection.respond_to?(:jdbc_connection)
elsif @enable_listening && raw_connection.respond_to?(:jdbc_connection)
raw_connection.execute_query("SELECT 1")
notifications = raw_connection.jdbc_connection.getNotifications
Array(notifications).each do |notification|
Expand Down
14 changes: 14 additions & 0 deletions spec/lib/good_job/configuration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -257,4 +257,18 @@
end
end
end

describe 'enable_listen_notify' do
it 'defaults to true' do
configuration = described_class.new({})
expect(configuration.enable_listen_notify).to be true
end

it 'can set false with 0 from ENV' do
stub_const 'ENV', ENV.to_hash.merge({ 'GOOD_JOB_ENABLE_LISTEN_NOTIFY' => '0' })

configuration = described_class.new({})
expect(configuration.enable_listen_notify).to be false
end
end
end
36 changes: 30 additions & 6 deletions spec/lib/good_job/notifier_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
it 'contains all registered instances' do
notifier = nil
expect do
notifier = described_class.new
notifier = described_class.new(enable_listening: true)
end.to change { described_class.instances.size }.by(1)

expect(described_class.instances).to include notifier
Expand All @@ -19,13 +19,24 @@
end
end

describe '#connected?' do
it 'becomes true when the notifier is connected' do
notifier = described_class.new(enable_listening: true)
sleep_until(max: 5, increments_of: 0.5) { notifier.connected? }

expect do
notifier.shutdown
end.to change(notifier, :connected?).from(true).to(false)
end
end

describe '#listen' do
it 'loops until it receives a command' do
stub_const 'RECEIVED_MESSAGE', Concurrent::AtomicBoolean.new(false)

recipient = proc { |_payload| RECEIVED_MESSAGE.make_true }

notifier = described_class.new(recipient)
notifier = described_class.new(recipient, enable_listening: true)
sleep_until(max: 5, increments_of: 0.5) { notifier.listening? }
described_class.notify(true)
sleep_until(max: 5, increments_of: 0.5) { RECEIVED_MESSAGE.true? }
Expand All @@ -34,13 +45,26 @@
expect(RECEIVED_MESSAGE.true?).to be true
end

it 'loops but does not receive a command if listening is not enabled' do
stub_const 'RECEIVED_MESSAGE', Concurrent::AtomicBoolean.new(false)

recipient = proc { |_payload| RECEIVED_MESSAGE.make_true }
notifier = described_class.new(recipient, enable_listening: false)
expect(notifier.listening?).to be false
described_class.notify(true)
sleep_until(max: 1, increments_of: 0.5) { RECEIVED_MESSAGE.false? }
notifier.shutdown

expect(RECEIVED_MESSAGE.false?).to be true
end

it 'raises exception to GoodJob.on_thread_error' do
stub_const('ExpectedError', Class.new(StandardError))
on_thread_error = instance_double(Proc, call: nil)
allow(GoodJob).to receive(:on_thread_error).and_return(on_thread_error)
allow(JSON).to receive(:parse).and_raise ExpectedError

notifier = described_class.new
notifier = described_class.new(enable_listening: true)
sleep_until(max: 5, increments_of: 0.5) { notifier.listening? }

described_class.notify(true)
Expand All @@ -56,7 +80,7 @@
allow(GoodJob).to receive(:on_thread_error).and_return(on_thread_error)
allow(JSON).to receive(:parse).and_raise ExpectedError

notifier = described_class.new
notifier = described_class.new(enable_listening: true)
sleep_until(max: 5, increments_of: 0.5) { notifier.listening? }

described_class.notify(true)
Expand All @@ -68,7 +92,7 @@

describe 'Process tracking' do
it 'creates and destroys a new Process record' do
notifier = described_class.new
notifier = described_class.new(enable_listening: true)

wait_until { expect(GoodJob::Process.count).to eq 1 }

Expand All @@ -83,7 +107,7 @@
context 'when, for some reason, the process already exists' do
it 'does not create a new process' do
process = GoodJob::Process.register
notifier = described_class.new
notifier = described_class.new(enable_listening: true)

wait_until { expect(notifier).to be_listening }
expect(GoodJob::Process.count).to eq 1
Expand Down
2 changes: 1 addition & 1 deletion spec/lib/good_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
describe GoodJob do
let(:configuration) { GoodJob::Configuration.new({ queues: 'mice:1', poll_interval: -1 }) }
let!(:scheduler) { GoodJob::Scheduler.from_configuration(configuration) }
let!(:notifier) { GoodJob::Notifier.new([scheduler, :create_thread]) }
let!(:notifier) { GoodJob::Notifier.new([scheduler, :create_thread], enable_listening: true) }

describe '.shutdown' do
it 'shuts down all scheduler and notifier instances' do
Expand Down

0 comments on commit 2fb51f5

Please sign in to comment.