Skip to content

Commit

Permalink
adds after_exceed callback
Browse files Browse the repository at this point in the history
  • Loading branch information
msxavi committed Oct 25, 2019
1 parent 882a72e commit 06dce2e
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 11 deletions.
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,32 @@ The following options can be overrided.
| shutdown_wait | 30 seconds | when the grace time expires, still running jobs get 30 seconds to terminate. After that, kill signal is triggered. |
| kill_signal | SIGKILL | Signal to use kill Sidekiq process if it doesn't terminate. |
| gc | true | Try to run garbage collection before Sidekiq process terminate in case of max_rss exceeded |
| after_exceed | proc { false } | A callback option to be executed after max_rss exceeded but before shutdown is triggered |


## Callbacks

### after_exceed

This is offered to execute some logic after max_rss exceeded. Check out https://github.com/mperham/sidekiq/wiki/Middleware#server-middleware for details on available variables.

*skip_shutdown* is an option you can set dynamically if don't want to trigger the shutdown for a particular scenario.

```ruby
require 'sidekiq/worker_killer'

Sidekiq.configure_server do |config|
config.server_middleware do |chain|
chain.add Sidekiq::WorkerKiller, max_rss: 480, after_exceed: ->(middleware) do
# you have access to worker, job and queue;
# check https://github.com/mperham/sidekiq/wiki/Middleware#server-middleware for details
middleware.worker && middleware.job && middleware.queue
# additional feature exposes a skip_shutdown option
middleware.skip_shutdown = true if middleware.worker.to_s = 'LongWorker'
end
end
end
```

## Development

Expand Down
22 changes: 19 additions & 3 deletions lib/sidekiq/worker_killer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def initialize(options = {})
@shutdown_wait = options.fetch(:shutdown_wait, 30)
@kill_signal = options.fetch(:kill_signal, "SIGKILL")
@gc = options.fetch(:gc, true)
@skip_shutdown = options.fetch(:skip_shutdown_if, nil)
@after_exceed = options.fetch(:after_exceed, proc { false })
end

def call(worker, job, queue)
Expand All @@ -25,8 +25,12 @@ def call(worker, job, queue)
return unless current_rss > @max_rss
GC.start(full_mark: true, immediate_sweep: true) if @gc
return unless current_rss > @max_rss
if @skip_shutdown && @skip_shutdown.call(worker, job, queue)
warn "#{worker.class} exceeds maximum RSS #{@max_rss}, however shutdown will be ignored"

middleware = MiddlewareAdapter.new(worker, job, queue)
@after_exceed.call(middleware)

if middleware.skip_shutdown?
warn "current RSS #{current_rss} exceeds maximum RSS #{@max_rss}, however shutdown will be ignored"
return
end

Expand All @@ -35,6 +39,17 @@ def call(worker, job, queue)
request_shutdown
end

class MiddlewareAdapter
attr_reader :worker, :job, :queue
# accessible options
attr_accessor :skip_shutdown
alias :skip_shutdown? :skip_shutdown

def initialize(worker, job, queue)
@worker, @job, @queue = worker, job, queue
end
end

private

def request_shutdown
Expand All @@ -49,6 +64,7 @@ def request_shutdown
def shutdown
warn "sending #{quiet_signal} to #{identity}"
signal(quiet_signal, pid)
sleep(5) # gives Sidekiq API 5 seconds to update ProcessSet

warn "shutting down #{identity} in #{@grace_time} seconds"
wait_job_finish_in_grace_time
Expand Down
28 changes: 20 additions & 8 deletions spec/sidekiq/worker_killer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,32 +30,44 @@
subject.call(worker, job, queue){}
end

context "when skip_shutdown_if is given" do
subject{ described_class.new(max_rss: 2, skip_shutdown_if: skip_shutdown_proc) }
context "when after_exceed callback is given" do
subject{ described_class.new(max_rss: 2, after_exceed: after_exceed_proc) }

context "and skip_shutdown_if is a proc" do
let(:skip_shutdown_proc) { proc { |worker| true } }
context "and after_exceed is a proc" do
let(:after_exceed_proc) { proc { |middleware| middleware.skip_shutdown = true } }
it "should NOT request shutdown" do
expect(subject).not_to receive(:request_shutdown)
subject.call(worker, job, queue){}
end
end

context "and skip_shutdown_if is a lambda" do
let(:skip_shutdown_proc) { ->(worker, job, queue) { true } }
context "and after_exceed is a lambda" do
let(:after_exceed_proc) { ->(middleware) { middleware.skip_shutdown = true } }
it "should NOT request shutdown" do
expect(subject).not_to receive(:request_shutdown)
subject.call(worker, job, queue){}
end
end

context "and skip_shutdown_if returns false" do
let(:skip_shutdown_proc) { proc { |worker, job, queue| false } }
context "and after_exceed returns false" do
let(:after_exceed_proc) { ->(middleware) { middleware.skip_shutdown = false } }
it "should still request shutdown" do
expect(subject).to receive(:request_shutdown)
subject.call(worker, job, queue){}
end
end

context "and worker, job and queue are accessible" do
let(:after_exceed_proc) do
->(middleware) do
middleware.worker && middleware.job && middleware.queue
end
end
it "should request shutdown" do
expect(subject).to receive(:request_shutdown)
subject.call(worker, job, queue){}
end
end
end

context "when gc is false" do
Expand Down

0 comments on commit 06dce2e

Please sign in to comment.