Skip to content

Commit b89c29c

Browse files
committed
Workers now yield self to lifecycle hooks
1 parent 9cd6bc3 commit b89c29c

File tree

6 files changed

+58
-17
lines changed

6 files changed

+58
-17
lines changed

README.md

+11
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,9 @@ And into two different points in the worker's, dispatcher's and scheduler's life
379379
- `(worker|dispatcher|scheduler)_start`: after the worker/dispatcher/scheduler has finished booting and right before it starts the polling loop or loading the recurring schedule.
380380
- `(worker|dispatcher|scheduler)_stop`: after receiving a signal (`TERM`, `INT` or `QUIT`) and right before starting graceful or immediate shutdown (which is just `exit!`).
381381

382+
The hooks for workers will have the worker instance yielded to the block so that you may read its configuration
383+
for logging or other metrics reporting purposes.
384+
382385
You can use the following methods with a block to do this:
383386
```ruby
384387
SolidQueue.on_start
@@ -398,6 +401,14 @@ For example:
398401
```ruby
399402
SolidQueue.on_start { start_metrics_server }
400403
SolidQueue.on_stop { stop_metrics_server }
404+
405+
SolidQueue.on_worker_start do |worker|
406+
Rails.logger.info "Worker #{worker.name} started with queues: #{worker.queues.join(',')}"
407+
end
408+
409+
SolidQueue.on_worker_stop do |worker|
410+
Rails.logger.info "Worker #{worker.name} stopped with queues: #{worker.queues.join(',')}"
411+
end
401412
```
402413

403414
These can be called several times to add multiple hooks, but it needs to happen before Solid Queue is started. An initializer would be a good place to do this.

lib/solid_queue.rb

+3-3
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,15 @@ module SolidQueue
4545

4646
[ Dispatcher, Scheduler, Worker ].each do |process|
4747
define_singleton_method(:"on_#{process.name.demodulize.downcase}_start") do |&block|
48-
process.on_start { block.call }
48+
process.on_start(&block)
4949
end
5050

5151
define_singleton_method(:"on_#{process.name.demodulize.downcase}_stop") do |&block|
52-
process.on_stop { block.call }
52+
process.on_stop(&block)
5353
end
5454

5555
define_singleton_method(:"on_#{process.name.demodulize.downcase}_exit") do |&block|
56-
process.on_exit { block.call }
56+
process.on_exit(&block)
5757
end
5858
end
5959

lib/solid_queue/lifecycle_hooks.rb

+9-1
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,18 @@ def run_exit_hooks
4343

4444
def run_hooks_for(event)
4545
self.class.lifecycle_hooks.fetch(event, []).each do |block|
46-
block.call
46+
if yield_self_to_hooks?
47+
block.call(self)
48+
else
49+
block.call
50+
end
4751
rescue Exception => exception
4852
handle_thread_error(exception)
4953
end
5054
end
55+
56+
def yield_self_to_hooks?
57+
false
58+
end
5159
end
5260
end

lib/solid_queue/worker.rb

+10-2
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ class Worker < Processes::Poller
88
before_shutdown :run_stop_hooks
99
after_shutdown :run_exit_hooks
1010

11-
attr_accessor :queues, :pool
11+
attr_reader :queues
1212

1313
def initialize(**options)
1414
options = options.dup.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS)
1515

16-
@queues = Array(options[:queues])
16+
# Ensure that the queues array is deep frozen to prevent accidental modification
17+
@queues = Array(options[:queues]).map(&:freeze).freeze
18+
1719
@pool = Pool.new(options[:threads], on_idle: -> { wake_up })
1820

1921
super(**options)
@@ -24,6 +26,8 @@ def metadata
2426
end
2527

2628
private
29+
attr_reader :pool
30+
2731
def poll
2832
claim_executions.then do |executions|
2933
executions.each do |execution|
@@ -54,5 +58,9 @@ def all_work_completed?
5458
def set_procline
5559
procline "waiting for jobs in #{queues.join(",")}"
5660
end
61+
62+
def yield_self_to_hooks?
63+
true
64+
end
5765
end
5866
end

test/integration/lifecycle_hooks_test.rb

+22-8
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,17 @@ class LifecycleHooksTest < ActiveSupport::TestCase
1010
SolidQueue.on_stop { JobResult.create!(status: :hook_called, value: :stop) }
1111
SolidQueue.on_exit { JobResult.create!(status: :hook_called, value: :exit) }
1212

13-
SolidQueue.on_worker_start { JobResult.create!(status: :hook_called, value: :worker_start) }
14-
SolidQueue.on_worker_stop { JobResult.create!(status: :hook_called, value: :worker_stop) }
15-
SolidQueue.on_worker_exit { JobResult.create!(status: :hook_called, value: :worker_exit) }
13+
SolidQueue.on_worker_start do |w|
14+
JobResult.create!(status: :hook_called, value: "worker_#{w.queues.join}_start")
15+
end
16+
17+
SolidQueue.on_worker_stop do |w|
18+
JobResult.create!(status: :hook_called, value: "worker_#{w.queues.join}_stop")
19+
end
20+
21+
SolidQueue.on_worker_exit do |w|
22+
JobResult.create!(status: :hook_called, value: "worker_#{w.queues.join}_exit")
23+
end
1624

1725
SolidQueue.on_dispatcher_start { JobResult.create!(status: :hook_called, value: :dispatcher_start) }
1826
SolidQueue.on_dispatcher_stop { JobResult.create!(status: :hook_called, value: :dispatcher_stop) }
@@ -22,23 +30,29 @@ class LifecycleHooksTest < ActiveSupport::TestCase
2230
SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_stop) }
2331
SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_exit) }
2432

25-
pid = run_supervisor_as_fork(workers: [ { queues: "*" } ], dispatchers: [ { batch_size: 100 } ], skip_recurring: false)
26-
wait_for_registered_processes(4)
33+
pid = run_supervisor_as_fork(
34+
workers: [ { queues: "first_queue" }, { queues: "second_queue", processes: 1 } ],
35+
dispatchers: [ { batch_size: 100 } ],
36+
skip_recurring: false
37+
)
38+
39+
wait_for_registered_processes(5)
2740

2841
terminate_process(pid)
2942
wait_for_registered_processes(0)
3043

3144

3245
results = skip_active_record_query_cache do
3346
job_results = JobResult.where(status: :hook_called)
34-
assert_equal 12, job_results.count
47+
assert_equal 15, job_results.count
3548
job_results
3649
end
3750

38-
assert_equal({ "hook_called" => 12 }, results.map(&:status).tally)
51+
assert_equal({ "hook_called" => 15 }, results.map(&:status).tally)
3952
assert_equal %w[
4053
start stop exit
41-
worker_start worker_stop worker_exit
54+
worker_first_queue_start worker_first_queue_stop worker_first_queue_exit
55+
worker_second_queue_start worker_second_queue_stop worker_second_queue_exit
4256
dispatcher_start dispatcher_stop dispatcher_exit
4357
scheduler_start scheduler_stop scheduler_exit
4458
].sort, results.map(&:value).sort

test/unit/worker_test.rb

+3-3
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ class WorkerTest < ActiveSupport::TestCase
156156
@worker.start
157157
wait_for_registered_processes(1, timeout: 1.second)
158158

159-
assert_not @worker.pool.shutdown?
159+
assert_not @worker.instance_variable_get(:@pool).shutdown?
160160

161161
process = SolidQueue::Process.first
162162
assert_equal "Worker", process.kind
@@ -165,8 +165,8 @@ class WorkerTest < ActiveSupport::TestCase
165165

166166
# And now just wait until the worker tries to heartbeat and realises
167167
# it needs to stop
168-
wait_while_with_timeout(2) { !@worker.pool.shutdown? }
169-
assert @worker.pool.shutdown?
168+
wait_while_with_timeout(2) { !@worker.instance_variable_get(:@pool).shutdown? }
169+
assert @worker.instance_variable_get(:@pool).shutdown?
170170
ensure
171171
SolidQueue.process_heartbeat_interval = old_heartbeat_interval
172172
end

0 commit comments

Comments
 (0)