From bf74dcbec713654b68b0a9d5b85d673ddf18875b Mon Sep 17 00:00:00 2001 From: Jeroen Visser Date: Fri, 28 Oct 2016 11:28:44 +0200 Subject: [PATCH] Add LeakyBucket-based throttling Using the leaky bucket algorithm, you can get a better distrubution that allows a more human behaviour and that should (in theory) decrease false-positives. It does that because it throttles differently. Let's take the old one, and concider you have a limit of 3 for 5 mins. If the user then did 3 requests, it'll have to wait for the next 5-min slot to do more requests. Using the leaky bucket algorithm, doing 3 requests adds 3 drops in the bucket, and the bucket is full. The bucket then leaks consistently, and when leaked enough, it'll allow 1 more request (and then it will be full again). [Shopify uses and explains this too here](https://help.shopify.com/api/guides/api-call-limit). We might want to add a custom throttled_response for the leaky bucket algorithm, so that it has the limits in there too. --- lib/rack/attack.rb | 50 +++++--- lib/rack/attack/leaky_bucket.rb | 56 +++++++++ lib/rack/attack/throttle_with_leaky_bucket.rb | 68 +++++++++++ spec/leaky_bucket_spec.rb | 69 +++++++++++ ..._attack_throttle_with_leaky_bucket_spec.rb | 114 ++++++++++++++++++ 5 files changed, 340 insertions(+), 17 deletions(-) create mode 100644 lib/rack/attack/leaky_bucket.rb create mode 100644 lib/rack/attack/throttle_with_leaky_bucket.rb create mode 100644 spec/leaky_bucket_spec.rb create mode 100644 spec/rack_attack_throttle_with_leaky_bucket_spec.rb diff --git a/lib/rack/attack.rb b/lib/rack/attack.rb index 0f375677..05e337d3 100644 --- a/lib/rack/attack.rb +++ b/lib/rack/attack.rb @@ -2,20 +2,22 @@ require 'forwardable' class Rack::Attack - autoload :Cache, 'rack/attack/cache' - autoload :PathNormalizer, 'rack/attack/path_normalizer' - autoload :Check, 'rack/attack/check' - autoload :Throttle, 'rack/attack/throttle' - autoload :Safelist, 'rack/attack/safelist' - autoload :Blocklist, 'rack/attack/blocklist' - autoload :Track, 'rack/attack/track' - autoload :StoreProxy, 'rack/attack/store_proxy' - autoload :DalliProxy, 'rack/attack/store_proxy/dalli_proxy' - autoload :MemCacheProxy, 'rack/attack/store_proxy/mem_cache_proxy' - autoload :RedisStoreProxy, 'rack/attack/store_proxy/redis_store_proxy' - autoload :Fail2Ban, 'rack/attack/fail2ban' - autoload :Allow2Ban, 'rack/attack/allow2ban' - autoload :Request, 'rack/attack/request' + autoload :Cache, 'rack/attack/cache' + autoload :PathNormalizer, 'rack/attack/path_normalizer' + autoload :Check, 'rack/attack/check' + autoload :Throttle, 'rack/attack/throttle' + autoload :ThrottleWithLeakyBucket, 'rack/attack/throttle_with_leaky_bucket' + autoload :LeakyBucket, 'rack/attack/leaky_bucket' + autoload :Safelist, 'rack/attack/safelist' + autoload :Blocklist, 'rack/attack/blocklist' + autoload :Track, 'rack/attack/track' + autoload :StoreProxy, 'rack/attack/store_proxy' + autoload :DalliProxy, 'rack/attack/store_proxy/dalli_proxy' + autoload :MemCacheProxy, 'rack/attack/store_proxy/mem_cache_proxy' + autoload :RedisStoreProxy, 'rack/attack/store_proxy/redis_store_proxy' + autoload :Fail2Ban, 'rack/attack/fail2ban' + autoload :Allow2Ban, 'rack/attack/allow2ban' + autoload :Request, 'rack/attack/request' class << self @@ -43,14 +45,19 @@ def throttle(name, options, &block) self.throttles[name] = Throttle.new(name, options, block) end + def throttle_with_leaky_bucket(name, options, &block) + self.throttles_with_leaky_bucket[name] = ThrottleWithLeakyBucket.new(name, options, block) + end + def track(name, options = {}, &block) self.tracks[name] = Track.new(name, options, block) end def safelists; @safelists ||= {}; end def blocklists; @blocklists ||= {}; end - def throttles; @throttles ||= {}; end - def tracks; @tracks ||= {}; end + def throttles; @throttles ||= {}; end + def throttles_with_leaky_bucket; @throttles_with_leaky_bucket ||= {}; end + def tracks; @tracks ||= {}; end def whitelists warn "[DEPRECATION] 'Rack::Attack.whitelists' is deprecated. Please use 'safelists' instead." @@ -90,6 +97,12 @@ def throttled?(req) end end + def throttled_with_leaky_bucket?(req) + throttles_with_leaky_bucket.any? do |_, throttle_with_leaky_bucket| + throttle_with_leaky_bucket[req] + end + end + def tracked?(req) tracks.each_value do |tracker| tracker[req] @@ -105,7 +118,7 @@ def cache end def clear! - @safelists, @blocklists, @throttles, @tracks = {}, {}, {}, {} + @safelists, @blocklists, @throttles, @tracks, @throttles_with_leaky_bucket = {}, {}, {}, {}, {} end def blacklisted_response=(res) @@ -142,6 +155,8 @@ def call(env) self.class.blocklisted_response.call(env) elsif throttled?(req) self.class.throttled_response.call(env) + elsif throttled_with_leaky_bucket?(req) + self.class.throttled_response.call(env) else tracked?(req) @app.call(env) @@ -152,5 +167,6 @@ def call(env) def_delegators self, :safelisted?, :blocklisted?, :throttled?, + :throttled_with_leaky_bucket?, :tracked? end diff --git a/lib/rack/attack/leaky_bucket.rb b/lib/rack/attack/leaky_bucket.rb new file mode 100644 index 00000000..36d1a25a --- /dev/null +++ b/lib/rack/attack/leaky_bucket.rb @@ -0,0 +1,56 @@ +module Rack + class Attack + class LeakyBucket + attr_reader :value, :capacity, :leak, :last_updated_at + + def initialize(capacity, leak, last_updated_at, value = 0) + raise ArgumentError, "wrong value for `leak`, must be larger than zero" unless leak > 0 + raise ArgumentError, "wrong value for `capacity`, must be larger than zero" unless capacity > 0 + + @capacity = capacity.to_i + @leak = leak.to_f + @last_updated_at = (last_updated_at.to_f > 0 ? last_updated_at : Time.now).to_f + @value = value.to_f > 0 ? value.to_f : 0 + @updated = false + end + + def update_leak! + @value = current_value + @last_updated_at = Time.now.to_f + end + + def current_value + seconds_since_last_update = Time.now.to_f - @last_updated_at + value = @value - (@leak * seconds_since_last_update) + value > 0 ? value : 0 + end + + def seconds_until_drained + current_value / @leak + end + + def add(value_to_add) + update_leak! + @updated = true + @value += value_to_add + end + + def full? + current_value + 1 > @capacity + end + + def updated? + @updated + end + + def serialize + "#{@value.to_f}|#{@last_updated_at.to_f}" + end + + def self.unserialize(bucket_data, capacity, leak) + value, last_updated_at = (bucket_data || "0|#{Time.now.to_f}").split("|", 2) + new(capacity, leak, last_updated_at, value) + end + end + end +end diff --git a/lib/rack/attack/throttle_with_leaky_bucket.rb b/lib/rack/attack/throttle_with_leaky_bucket.rb new file mode 100644 index 00000000..90b227c8 --- /dev/null +++ b/lib/rack/attack/throttle_with_leaky_bucket.rb @@ -0,0 +1,68 @@ +module Rack + class Attack + class ThrottleWithLeakyBucket + MANDATORY_OPTIONS = [:capacity, :leak] + attr_reader :name, :capacity, :leak, :block, :type + + def initialize(name, options, block) + @name, @block = name, block + MANDATORY_OPTIONS.each do |opt| + raise ArgumentError.new("Must pass #{opt.inspect} option") unless options[opt] + end + @capacity = options[:capacity] + @leak = options[:leak] + @type = options.fetch(:type, :throttle_with_leaky_bucket) + end + + def cache + Rack::Attack.cache + end + + def [](req) + discriminator = block[req] + return false unless discriminator + + # Normalize blocks to values + current_capacity = normalize_block(capacity, req) + current_leak = normalize_block(leak, req) + + # Read the bucket data and unserialize it. We only update the bucket data + # if we've changed the value. We don't write to update the leaked amount + # since that can be calculated and since the TTL will remove the item when + # it has drained. + key = "#{name}:#{discriminator}" + bucket = LeakyBucket.unserialize(cache.read(key), current_capacity, current_leak) + throttled = bucket.full? + bucket.add(1) unless bucket.full? + store_bucket(key, bucket) if bucket.updated? + + data = { + :bucket => bucket, + :leak => current_leak, + :capacity => current_capacity + } + (req.env['rack.attack.throttle_with_leaky_bucket_data'] ||= {})[name] = data + + if throttled + req.env['rack.attack.matched'] = name + req.env['rack.attack.match_discriminator'] = discriminator + req.env['rack.attack.match_type'] = type + req.env['rack.attack.match_data'] = data + Rack::Attack.instrument(req) + end + + throttled + end + + private + + def store_bucket(key, bucket) + cache.write(key, bucket.serialize, bucket.seconds_until_drained.ceil) + end + + def normalize_block(value_or_block, *args_for_block) + value_or_block.respond_to?(:call) ? value_or_block.call(*args_for_block) : value_or_block + end + end + end +end diff --git a/spec/leaky_bucket_spec.rb b/spec/leaky_bucket_spec.rb new file mode 100644 index 00000000..a2cd052e --- /dev/null +++ b/spec/leaky_bucket_spec.rb @@ -0,0 +1,69 @@ +require_relative "spec_helper" +require "active_support/core_ext/numeric/time" + +describe "Rack::Attack::LeakyBucket" do + describe ".new(1, 1, Time.now, 0), empty bucket" do + it "isn't full" do + bucket = Rack::Attack::LeakyBucket.new(1, 1, Time.now, 0) + assert !bucket.full?, "Empty bucket reports as full" + end + + it "becomes full when 1 is added" do + bucket = Rack::Attack::LeakyBucket.new(1, 1, Time.now, 0) + bucket.add(1) + assert bucket.full?, "Bucket that has value set to it's capacity should be full" + end + end + + describe ".new(1, 1, Time.now, 1), full bucket" do + it "reports seconds_to_drain as 1" do + Time.stub :now, Time.now do + bucket = Rack::Attack::LeakyBucket.new(1, 1, Time.now, 1) + assert bucket.seconds_until_drained == 1.0 + end + end + + it "becomes empty after 1 second" do + bucket = Rack::Attack::LeakyBucket.new(1, 1, Time.now, 1) + Time.stub :now, 1.second.from_now do + assert !bucket.full?, "Bucket wasn't empty, instead had value = #{bucket.value}" + assert bucket.seconds_until_drained == 0, "Bucket reports seconds_until_drained = #{bucket.seconds_until_drained}" + end + end + end + + describe ".unserialize" do + it "unserializes raw data correctly" do + Time.stub :now, Time.now do + bucket = Rack::Attack::LeakyBucket.unserialize("1|#{Time.now.to_f}", 1, 1) + assert_equal bucket.value, 1 + assert_equal bucket.last_updated_at, Time.now.to_f + assert_equal bucket.leak, 1 + assert_equal bucket.capacity, 1 + assert bucket.full?, "Bucket isn't full" + end + end + + it "handles nils correctly" do + Time.stub :now, Time.now do + bucket = Rack::Attack::LeakyBucket.unserialize(nil, 1, 1) + assert_equal bucket.value, 0 + assert_equal bucket.last_updated_at, Time.now.to_f + assert_equal bucket.leak, 1 + assert_equal bucket.capacity, 1 + assert !bucket.full? + end + end + + it "handles wrong values correctly" do + Time.stub :now, Time.now do + bucket = Rack::Attack::LeakyBucket.unserialize("-1|-132", 1, 1) + assert_equal bucket.value, 0 + assert_equal bucket.last_updated_at, Time.now.to_f + assert_equal bucket.leak, 1 + assert_equal bucket.capacity, 1 + assert !bucket.full? + end + end + end +end diff --git a/spec/rack_attack_throttle_with_leaky_bucket_spec.rb b/spec/rack_attack_throttle_with_leaky_bucket_spec.rb new file mode 100644 index 00000000..d8fabfcf --- /dev/null +++ b/spec/rack_attack_throttle_with_leaky_bucket_spec.rb @@ -0,0 +1,114 @@ +require_relative 'spec_helper' + +describe 'Rack::Attack.throttle_with_leaky_bucket' do + before do + Rack::Attack.cache.store = ActiveSupport::Cache::MemoryStore.new + Rack::Attack.throttle_with_leaky_bucket('ip/sec', :capacity => 1, :leak => 1) { |req| req.ip } + end + + it('should have a throttle') { Rack::Attack.throttles_with_leaky_bucket.key?('ip/sec') } + allow_ok_requests + + describe 'a single request' do + before { get '/', {}, 'REMOTE_ADDR' => '1.2.3.4' } + it 'should set the counter for one request' do + key = "rack::attack:ip/sec:1.2.3.4" + serialized_data = last_request.env['rack.attack.throttle_with_leaky_bucket_data']['ip/sec'][:bucket].serialize + Rack::Attack.cache.store.read(key).must_equal serialized_data + end + + it 'should populate throttle data' do + throttle_data = last_request.env['rack.attack.throttle_with_leaky_bucket_data']['ip/sec'] + assert throttle_data[:leak] == 1 + assert throttle_data[:capacity] == 1 + assert throttle_data[:bucket].value == 1 + throttle_data[:bucket].must_be_instance_of Rack::Attack::LeakyBucket + end + end + describe "with 2 requests" do + before do + 2.times { get '/', {}, 'REMOTE_ADDR' => '1.2.3.4' } + end + it 'should block the last request' do + last_response.status.must_equal 429 + end + it 'should tag the env' do + match_data = last_request.env['rack.attack.match_data'] + assert match_data[:leak] == 1 + assert match_data[:capacity] == 1 + assert match_data[:bucket].value == 1 + assert match_data[:bucket].full? + + last_request.env['rack.attack.matched'].must_equal 'ip/sec' + last_request.env['rack.attack.match_type'].must_equal :throttle_with_leaky_bucket + last_request.env['rack.attack.match_discriminator'].must_equal('1.2.3.4') + end + end +end + +describe 'Rack::Attack.throttle_with_leaky_bucket with leak as proc' do + before do + Rack::Attack.cache.store = ActiveSupport::Cache::MemoryStore.new + Rack::Attack.throttle_with_leaky_bucket('ip/sec', :leak => lambda { |req| 1 }, :capacity => 1) { |req| req.ip } + end + + allow_ok_requests + + describe 'a single request' do + before { get '/', {}, 'REMOTE_ADDR' => '1.2.3.4' } + it 'should set the counter for one request' do + key = "rack::attack:ip/sec:1.2.3.4" + serialized_data = last_request.env['rack.attack.throttle_with_leaky_bucket_data']['ip/sec'][:bucket].serialize + Rack::Attack.cache.store.read(key).must_equal serialized_data + end + + it 'should populate throttle data' do + throttle_data = last_request.env['rack.attack.throttle_with_leaky_bucket_data']['ip/sec'] + assert throttle_data[:leak] == 1 + end + end +end + +describe 'Rack::Attack.throttle_with_leaky_bucket with capacity as proc' do + before do + Rack::Attack.cache.store = ActiveSupport::Cache::MemoryStore.new + Rack::Attack.throttle_with_leaky_bucket('ip/sec', :capacity => lambda { |req| 1 }, :leak => 1) { |req| req.ip } + end + + allow_ok_requests + + describe 'a single request' do + before { get '/', {}, 'REMOTE_ADDR' => '1.2.3.4' } + it 'should set the counter for one request' do + key = "rack::attack:ip/sec:1.2.3.4" + serialized_data = last_request.env['rack.attack.throttle_with_leaky_bucket_data']['ip/sec'][:bucket].serialize + Rack::Attack.cache.store.read(key).must_equal serialized_data + end + + it 'should populate throttle data' do + throttle_data = last_request.env['rack.attack.throttle_with_leaky_bucket_data']['ip/sec'] + assert throttle_data[:capacity] == 1 + end + end +end + +describe 'Rack::Attack.throttle_with_leaky_bucket with block retuning nil' do + before do + Rack::Attack.cache.store = ActiveSupport::Cache::MemoryStore.new + Rack::Attack.throttle_with_leaky_bucket('ip/sec', :leak => 1, :capacity => 1) { |_| nil } + end + + allow_ok_requests + + describe 'a single request' do + before { get '/', {}, 'REMOTE_ADDR' => '1.2.3.4' } + it 'should not set the counter' do + key = "rack::attack:ip/sec:1.2.3.4" + Rack::Attack.cache.store.read(key).must_equal nil + end + + it 'should not populate throttle data' do + last_request.env['rack.attack.throttle_with_leaky_bucket_data'].must_equal nil + end + end +end