Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,17 +208,47 @@ Finally, run the migrations:
$ bin/rails db:migrate
```

## Lifecycle hooks

In Solid queue, you can hook into two different points in the supervisor's life:
- `start`: after the supervisor has finished booting and right before it forks workers and dispatchers.
- `stop`: after receiving a signal (`TERM`, `INT` or `QUIT`) and right before starting graceful or immediate shutdown.

And into two different points in a worker's life:
- `worker_start`: after the worker has finished booting and right before it starts the polling loop.
- `worker_stop`: after receiving a signal (`TERM`, `INT` or `QUIT`) and right before starting graceful or immediate shutdown (which is just `exit!`).

You can use the following methods with a block to do this:
```ruby
SolidQueue.on_start
SolidQueue.on_stop

SolidQueue.on_worker_start
SolidQueue.on_worker_stop
```

For example:
```ruby
SolidQueue.on_start { start_metrics_server }
SolidQueue.on_stop { stop_metrics_server }
```

These can be called several times to add multiple hooks, but it needs to happen before Solid Queue is started. An initializer would be a good place to do this.


### Other configuration settings
_Note_: The settings in this section should be set in your `config/application.rb` or your environment config like this: `config.solid_queue.silence_polling = true`

There are several settings that control how Solid Queue works that you can set as well:
- `logger`: the logger you want Solid Queue to use. Defaults to the app logger.
- `app_executor`: the [Rails executor](https://guides.rubyonrails.org/threading_and_code_execution.html#executor) used to wrap asynchronous operations, defaults to the app executor
- `on_thread_error`: custom lambda/Proc to call when there's an error within a thread that takes the exception raised as argument. Defaults to
- `on_thread_error`: custom lambda/Proc to call when there's an error within a Solid Queue thread that takes the exception raised as argument. Defaults to

```ruby
-> (exception) { Rails.error.report(exception, handled: false) }
```
**This is not used for errors raised within a job execution**. Errors happening in jobs are handled by Active Job's `retry_on` or `discard_on`, and ultimately will result in [failed jobs](#failed-jobs-and-retries). This is for errors happening within Solid Queue itself.

- `use_skip_locked`: whether to use `FOR UPDATE SKIP LOCKED` when performing locking reads. This will be automatically detected in the future, and for now, you'd only need to set this to `false` if your database doesn't support it. For MySQL, that'd be versions < 8, and for PostgreSQL, versions < 9.5. If you use SQLite, this has no effect, as writes are sequential.
- `process_heartbeat_interval`: the heartbeat interval that all processes will follow—defaults to 60 seconds.
- `process_alive_threshold`: how long to wait until a process is considered dead after its last heartbeat—defaults to 5 minutes.
Expand Down
10 changes: 10 additions & 0 deletions lib/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ module SolidQueue
mattr_accessor :clear_finished_jobs_after, default: 1.day
mattr_accessor :default_concurrency_control_period, default: 3.minutes

delegate :on_start, :on_stop, to: Supervisor

def on_worker_start(...)
Worker.on_start(...)
end

def on_worker_stop(...)
Worker.on_stop(...)
end

def supervisor?
supervisor
end
Expand Down
43 changes: 43 additions & 0 deletions lib/solid_queue/lifecycle_hooks.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# frozen_string_literal: true

module SolidQueue
module LifecycleHooks
extend ActiveSupport::Concern

included do
mattr_reader :lifecycle_hooks, default: { start: [], stop: [] }
end

class_methods do
def on_start(&block)
self.lifecycle_hooks[:start] << block
end

def on_stop(&block)
self.lifecycle_hooks[:stop] << block
end

def clear_hooks
self.lifecycle_hooks[:start] = []
self.lifecycle_hooks[:stop] = []
end
end

private
def run_start_hooks
run_hooks_for :start
end

def run_stop_hooks
run_hooks_for :stop
end

def run_hooks_for(event)
self.class.lifecycle_hooks.fetch(event, []).each do |block|
block.call
rescue Exception => exception
handle_thread_error(exception)
end
end
end
end
22 changes: 10 additions & 12 deletions lib/solid_queue/processes/runnable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ module Runnable
attr_writer :mode

def start
@stopped = false

SolidQueue.instrument(:start_process, process: self) do
run_callbacks(:boot) { boot }
end
boot

if running_async?
@thread = create_thread { run }
Expand All @@ -25,10 +21,6 @@ def stop
@thread&.join
end

def alive?
!running_async? || @thread.alive?
end

private
DEFAULT_MODE = :async

Expand All @@ -37,9 +29,15 @@ def mode
end

def boot
if running_as_fork?
register_signal_handlers
set_procline
SolidQueue.instrument(:start_process, process: self) do
run_callbacks(:boot) do
@stopped = false

if running_as_fork?
register_signal_handlers
set_procline
end
end
end
end

Expand Down
3 changes: 3 additions & 0 deletions lib/solid_queue/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

module SolidQueue
class Supervisor < Processes::Base
include LifecycleHooks
include Maintenance, Signals, Pidfiled

class << self
Expand All @@ -27,6 +28,7 @@ def initialize(configuration)

def start
boot
run_start_hooks

start_processes
launch_maintenance_task
Expand All @@ -36,6 +38,7 @@ def start

def stop
@stopped = true
run_stop_hooks
end

private
Expand Down
5 changes: 5 additions & 0 deletions lib/solid_queue/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

module SolidQueue
class Worker < Processes::Poller
include LifecycleHooks

after_boot :run_start_hooks
before_shutdown :run_stop_hooks

attr_accessor :queues, :pool

def initialize(**options)
Expand Down
52 changes: 52 additions & 0 deletions test/integration/lifecycle_hooks_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# frozen_string_literal: true

require "test_helper"

class LifecycleHooksTest < ActiveSupport::TestCase
self.use_transactional_tests = false

test "run lifecycle hooks" do
SolidQueue.on_start { JobResult.create!(status: :hook_called, value: :start) }
SolidQueue.on_stop { JobResult.create!(status: :hook_called, value: :stop) }

SolidQueue.on_worker_start { JobResult.create!(status: :hook_called, value: :worker_start) }
SolidQueue.on_worker_stop { JobResult.create!(status: :hook_called, value: :worker_stop) }

pid = run_supervisor_as_fork(load_configuration_from: { workers: [ { queues: "*" } ] })
wait_for_registered_processes(4)

terminate_process(pid)
wait_for_registered_processes(0)

results = skip_active_record_query_cache do
assert_equal 4, JobResult.count
JobResult.last(4)
end

assert_equal "hook_called", results.map(&:status).first
assert_equal [ "start", "stop", "worker_start", "worker_stop" ], results.map(&:value).sort
ensure
SolidQueue::Supervisor.clear_hooks
SolidQueue::Worker.clear_hooks
end

test "handle errors on lifecycle hooks" do
previous_on_thread_error, SolidQueue.on_thread_error = SolidQueue.on_thread_error, ->(error) { JobResult.create!(status: :error, value: error.message) }
SolidQueue.on_start { raise RuntimeError, "everything is broken" }

pid = run_supervisor_as_fork
wait_for_registered_processes(4)

terminate_process(pid)
wait_for_registered_processes(0)

result = skip_active_record_query_cache { JobResult.last }

assert_equal "error", result.status
assert_equal "everything is broken", result.value
ensure
SolidQueue.on_thread_error = previous_on_thread_error
SolidQueue::Supervisor.clear_hooks
SolidQueue::Worker.clear_hooks
end
end
File renamed without changes.