Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gc option and nil for grace_time option #4

Merged
merged 2 commits into from
Feb 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
[![Gem Version](https://badge.fury.io/rb/sidekiq-worker-killer.svg)](https://badge.fury.io/rb/sidekiq-worker-killer)
[![CircleCI](https://circleci.com/gh/klaxit/sidekiq-worker-killer.svg?style=shield&circle-token=:circle-token)](https://circleci.com/gh/klaxit/sidekiq-worker-killer)

[Sidekiq](https://github.com/mperham/sidekiq) is probably the best background processing framework today. At the same time, memory leaks are very hard to tackle in Ruby and we often find ourselves with growing memory consumption. Instead of spending herculean effort fixing leaks, why not kill your processes when they got to be too large ?
[Sidekiq](https://github.com/mperham/sidekiq) is probably the best background processing framework today. At the same time, memory leaks are very hard to tackle in Ruby and we often find ourselves with growing memory consumption. Instead of spending herculean effort fixing leaks, why not kill your processes when they got to be too large?

Highly inspired by [Gitlab Sidekiq MemoryKiller](https://gitlab.com/gitlab-org/gitlab-ce/blob/master/lib/gitlab/sidekiq_middleware/shutdown.rb) and [Noxa Sidekiq killer](https://github.com/Noxa/sidekiq-killer).

Expand Down Expand Up @@ -36,9 +36,10 @@ The following options can be overrided.
| Option | Defaults | Description |
| ------- | ------- | ----------- |
| max_rss | 0 MB (disabled) | max RSS in megabytes. Above this, shutdown will be triggered. |
| grace_time | 900 seconds | when shutdown is triggered, the Sidekiq process will not accept new job and wait at most 15 minutes for running jobs to finish. |
| grace_time | 900 seconds | when shutdown is triggered, the Sidekiq process will not accept new job and wait at most 15 minutes for running jobs to finish. If Float::INFINITY specified, will wait forever |
| shutdown_wait | 30 seconds | when the grace time expires, still running jobs get 30 seconds to terminate. After that, kill signal is triggered. |
| kill_signal | SIGKILL | Signal to use kill Sidekiq process if it doesn't terminate. |
| gc | true | Try to run garbage collection before Sidekiq process terminate in case of max_rss exceeded |

## Development

Expand Down
35 changes: 24 additions & 11 deletions lib/sidekiq/worker_killer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,20 @@ class Sidekiq::WorkerKiller
MUTEX = Mutex.new

def initialize(options = {})
@max_rss = (options[:max_rss] || 0)
@grace_time = (options[:grace_time] || 15 * 60)
@shutdown_wait = (options[:shutdown_wait] || 30)
@kill_signal = (options[:kill_signal] || "SIGKILL")
@max_rss = options.fetch(:max_rss, 0)
@grace_time = options.fetch(:grace_time, 15 * 60)
@shutdown_wait = options.fetch(:shutdown_wait, 30)
@kill_signal = options.fetch(:kill_signal, "SIGKILL")
@gc = options.fetch(:gc, true)
end

def call(_worker, _job, _queue)
yield
# Skip if the max RSS is not exceeded
return unless @max_rss > 0 && current_rss > @max_rss

GC.start(full_mark: true, immediate_sweep: true)
return unless @max_rss > 0 && current_rss > @max_rss

return unless @max_rss > 0
return unless current_rss > @max_rss
GC.start(full_mark: true, immediate_sweep: true) if @gc
return unless current_rss > @max_rss
# Launch the shutdown process
warn "current RSS #{current_rss} of #{identity} exceeds " \
"maximum RSS #{@max_rss}"
Expand Down Expand Up @@ -62,12 +62,25 @@ def shutdown
def wait_job_finish_in_grace_time
start = Time.now
loop do
break if start + @grace_time < Time.now || no_jobs_on_quiet_processes?

break if grace_time_exceeded?(start)
break if no_jobs_on_quiet_processes?
sleep(1)
end
end

def grace_time_exceeded?(start)
return false if @grace_time == Float::INFINITY

start + @grace_time < Time.now
end

def no_jobs_on_quiet_processes?
Sidekiq::ProcessSet.new.each do |process|
return false if !process["busy"].zero? && process["quiet"]
end
true
end

def no_jobs_on_quiet_processes?
Sidekiq::ProcessSet.new.each do |process|
return false if !process["busy"] == 0 && process["quiet"]
Expand Down
41 changes: 34 additions & 7 deletions spec/sidekiq/worker_killer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,19 @@
expect(subject).to receive(:request_shutdown)
subject.call(worker, job, queue){}
end
it "should call garbage collect" do
allow(subject).to receive(:request_shutdown)
expect(GC).to receive(:start).with(full_mark: true, immediate_sweep: true)
subject.call(worker, job, queue){}
end
context "when gc is false" do
subject{ described_class.new(max_rss: 2, gc: false) }
it "should not call garbage collect" do
allow(subject).to receive(:request_shutdown)
expect(GC).not_to receive(:start)
subject.call(worker, job, queue){}
end
end
context "but max rss is 0" do
subject{ described_class.new(max_rss: 0) }
it "should not request shutdown" do
Expand All @@ -35,14 +48,28 @@
end

describe "#request_shutdown" do
before { allow(subject).to receive(:shutdown){ sleep 0.01 } }
it "should call shutdown" do
expect(subject).to receive(:shutdown)
subject.send(:request_shutdown).join
context "grace time is default" do
before { allow(subject).to receive(:shutdown){ sleep 0.01 } }
it "should call shutdown" do
expect(subject).to receive(:shutdown)
subject.send(:request_shutdown).join
end
it "should not call shutdown twice when called concurrently" do
expect(subject).to receive(:shutdown).once
2.times.map{ subject.send(:request_shutdown) }.each(&:join)
end
end
it "should not call shutdown twice when called concurrently" do
expect(subject).to receive(:shutdown).once
2.times.map{ subject.send(:request_shutdown) }.each(&:join)
context "grace time is Float::INFINITY" do
subject{ described_class.new(max_rss: 2, grace_time: Float::INFINITY, shutdown_wait: 0) }
it "call signal only on jobs" do
allow(subject).to receive(:no_jobs_on_quiet_processes?).and_return(true)
allow(subject).to receive(:pid).and_return(99)
expect(Process).to receive(:kill).with('TSTP', 99)
expect(Process).to receive(:kill).with('SIGTERM', 99)
expect(Process).to receive(:kill).with('SIGKILL', 99)

subject.send(:request_shutdown).join
end
end
end

Expand Down