Skip to content

Commit

Permalink
Add LeakyBucket-based throttling
Browse files Browse the repository at this point in the history
Using the leaky bucket algorithm, you can get a better distrubution that
allows a more human behaviour and that should (in theory) decrease
false-positives.

It does that because it throttles differently. Let's take the old one,
and concider you have a limit of 3 for 5 mins. If the user then did 3
requests, it'll have to wait for the next 5-min slot to do more
requests.

Using the leaky bucket algorithm, doing 3 requests adds 3 drops in the
bucket, and the bucket is full. The bucket then leaks consistently, and
when leaked enough, it'll allow 1 more request (and then it will be full
again).

[Shopify uses and explains this too here](https://help.shopify.com/api/guides/api-call-limit).

We might want to add a custom throttled_response for the leaky bucket
algorithm, so that it has the limits in there too.
  • Loading branch information
jeroenvisser101 committed Oct 28, 2016
1 parent d85d318 commit bf74dcb
Show file tree
Hide file tree
Showing 5 changed files with 340 additions and 17 deletions.
50 changes: 33 additions & 17 deletions lib/rack/attack.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@
require 'forwardable'

class Rack::Attack
autoload :Cache, 'rack/attack/cache'
autoload :PathNormalizer, 'rack/attack/path_normalizer'
autoload :Check, 'rack/attack/check'
autoload :Throttle, 'rack/attack/throttle'
autoload :Safelist, 'rack/attack/safelist'
autoload :Blocklist, 'rack/attack/blocklist'
autoload :Track, 'rack/attack/track'
autoload :StoreProxy, 'rack/attack/store_proxy'
autoload :DalliProxy, 'rack/attack/store_proxy/dalli_proxy'
autoload :MemCacheProxy, 'rack/attack/store_proxy/mem_cache_proxy'
autoload :RedisStoreProxy, 'rack/attack/store_proxy/redis_store_proxy'
autoload :Fail2Ban, 'rack/attack/fail2ban'
autoload :Allow2Ban, 'rack/attack/allow2ban'
autoload :Request, 'rack/attack/request'
autoload :Cache, 'rack/attack/cache'
autoload :PathNormalizer, 'rack/attack/path_normalizer'
autoload :Check, 'rack/attack/check'
autoload :Throttle, 'rack/attack/throttle'
autoload :ThrottleWithLeakyBucket, 'rack/attack/throttle_with_leaky_bucket'
autoload :LeakyBucket, 'rack/attack/leaky_bucket'
autoload :Safelist, 'rack/attack/safelist'
autoload :Blocklist, 'rack/attack/blocklist'
autoload :Track, 'rack/attack/track'
autoload :StoreProxy, 'rack/attack/store_proxy'
autoload :DalliProxy, 'rack/attack/store_proxy/dalli_proxy'
autoload :MemCacheProxy, 'rack/attack/store_proxy/mem_cache_proxy'
autoload :RedisStoreProxy, 'rack/attack/store_proxy/redis_store_proxy'
autoload :Fail2Ban, 'rack/attack/fail2ban'
autoload :Allow2Ban, 'rack/attack/allow2ban'
autoload :Request, 'rack/attack/request'

class << self

Expand Down Expand Up @@ -43,14 +45,19 @@ def throttle(name, options, &block)
self.throttles[name] = Throttle.new(name, options, block)
end

def throttle_with_leaky_bucket(name, options, &block)
self.throttles_with_leaky_bucket[name] = ThrottleWithLeakyBucket.new(name, options, block)
end

def track(name, options = {}, &block)
self.tracks[name] = Track.new(name, options, block)
end

def safelists; @safelists ||= {}; end
def blocklists; @blocklists ||= {}; end
def throttles; @throttles ||= {}; end
def tracks; @tracks ||= {}; end
def throttles; @throttles ||= {}; end
def throttles_with_leaky_bucket; @throttles_with_leaky_bucket ||= {}; end
def tracks; @tracks ||= {}; end

def whitelists
warn "[DEPRECATION] 'Rack::Attack.whitelists' is deprecated. Please use 'safelists' instead."
Expand Down Expand Up @@ -90,6 +97,12 @@ def throttled?(req)
end
end

def throttled_with_leaky_bucket?(req)
throttles_with_leaky_bucket.any? do |_, throttle_with_leaky_bucket|
throttle_with_leaky_bucket[req]
end
end

def tracked?(req)
tracks.each_value do |tracker|
tracker[req]
Expand All @@ -105,7 +118,7 @@ def cache
end

def clear!
@safelists, @blocklists, @throttles, @tracks = {}, {}, {}, {}
@safelists, @blocklists, @throttles, @tracks, @throttles_with_leaky_bucket = {}, {}, {}, {}, {}
end

def blacklisted_response=(res)
Expand Down Expand Up @@ -142,6 +155,8 @@ def call(env)
self.class.blocklisted_response.call(env)
elsif throttled?(req)
self.class.throttled_response.call(env)
elsif throttled_with_leaky_bucket?(req)
self.class.throttled_response.call(env)
else
tracked?(req)
@app.call(env)
Expand All @@ -152,5 +167,6 @@ def call(env)
def_delegators self, :safelisted?,
:blocklisted?,
:throttled?,
:throttled_with_leaky_bucket?,
:tracked?
end
56 changes: 56 additions & 0 deletions lib/rack/attack/leaky_bucket.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
module Rack
class Attack
class LeakyBucket
attr_reader :value, :capacity, :leak, :last_updated_at

def initialize(capacity, leak, last_updated_at, value = 0)
raise ArgumentError, "wrong value for `leak`, must be larger than zero" unless leak > 0
raise ArgumentError, "wrong value for `capacity`, must be larger than zero" unless capacity > 0

@capacity = capacity.to_i
@leak = leak.to_f
@last_updated_at = (last_updated_at.to_f > 0 ? last_updated_at : Time.now).to_f
@value = value.to_f > 0 ? value.to_f : 0
@updated = false
end

def update_leak!
@value = current_value
@last_updated_at = Time.now.to_f
end

def current_value
seconds_since_last_update = Time.now.to_f - @last_updated_at
value = @value - (@leak * seconds_since_last_update)
value > 0 ? value : 0
end

def seconds_until_drained
current_value / @leak
end

def add(value_to_add)
update_leak!
@updated = true
@value += value_to_add
end

def full?
current_value + 1 > @capacity
end

def updated?
@updated
end

def serialize
"#{@value.to_f}|#{@last_updated_at.to_f}"
end

def self.unserialize(bucket_data, capacity, leak)
value, last_updated_at = (bucket_data || "0|#{Time.now.to_f}").split("|", 2)
new(capacity, leak, last_updated_at, value)
end
end
end
end
68 changes: 68 additions & 0 deletions lib/rack/attack/throttle_with_leaky_bucket.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
module Rack
class Attack
class ThrottleWithLeakyBucket
MANDATORY_OPTIONS = [:capacity, :leak]
attr_reader :name, :capacity, :leak, :block, :type

def initialize(name, options, block)
@name, @block = name, block
MANDATORY_OPTIONS.each do |opt|
raise ArgumentError.new("Must pass #{opt.inspect} option") unless options[opt]
end
@capacity = options[:capacity]
@leak = options[:leak]
@type = options.fetch(:type, :throttle_with_leaky_bucket)
end

def cache
Rack::Attack.cache
end

def [](req)
discriminator = block[req]
return false unless discriminator

# Normalize blocks to values
current_capacity = normalize_block(capacity, req)
current_leak = normalize_block(leak, req)

# Read the bucket data and unserialize it. We only update the bucket data
# if we've changed the value. We don't write to update the leaked amount
# since that can be calculated and since the TTL will remove the item when
# it has drained.
key = "#{name}:#{discriminator}"
bucket = LeakyBucket.unserialize(cache.read(key), current_capacity, current_leak)
throttled = bucket.full?
bucket.add(1) unless bucket.full?
store_bucket(key, bucket) if bucket.updated?

data = {
:bucket => bucket,
:leak => current_leak,
:capacity => current_capacity
}
(req.env['rack.attack.throttle_with_leaky_bucket_data'] ||= {})[name] = data

if throttled
req.env['rack.attack.matched'] = name
req.env['rack.attack.match_discriminator'] = discriminator
req.env['rack.attack.match_type'] = type
req.env['rack.attack.match_data'] = data
Rack::Attack.instrument(req)
end

throttled
end

private

def store_bucket(key, bucket)
cache.write(key, bucket.serialize, bucket.seconds_until_drained.ceil)
end

def normalize_block(value_or_block, *args_for_block)
value_or_block.respond_to?(:call) ? value_or_block.call(*args_for_block) : value_or_block
end
end
end
end
69 changes: 69 additions & 0 deletions spec/leaky_bucket_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
require_relative "spec_helper"
require "active_support/core_ext/numeric/time"

describe "Rack::Attack::LeakyBucket" do
describe ".new(1, 1, Time.now, 0), empty bucket" do
it "isn't full" do
bucket = Rack::Attack::LeakyBucket.new(1, 1, Time.now, 0)
assert !bucket.full?, "Empty bucket reports as full"
end

it "becomes full when 1 is added" do
bucket = Rack::Attack::LeakyBucket.new(1, 1, Time.now, 0)
bucket.add(1)
assert bucket.full?, "Bucket that has value set to it's capacity should be full"
end
end

describe ".new(1, 1, Time.now, 1), full bucket" do
it "reports seconds_to_drain as 1" do
Time.stub :now, Time.now do
bucket = Rack::Attack::LeakyBucket.new(1, 1, Time.now, 1)
assert bucket.seconds_until_drained == 1.0
end
end

it "becomes empty after 1 second" do
bucket = Rack::Attack::LeakyBucket.new(1, 1, Time.now, 1)
Time.stub :now, 1.second.from_now do
assert !bucket.full?, "Bucket wasn't empty, instead had value = #{bucket.value}"
assert bucket.seconds_until_drained == 0, "Bucket reports seconds_until_drained = #{bucket.seconds_until_drained}"
end
end
end

describe ".unserialize" do
it "unserializes raw data correctly" do
Time.stub :now, Time.now do
bucket = Rack::Attack::LeakyBucket.unserialize("1|#{Time.now.to_f}", 1, 1)
assert_equal bucket.value, 1
assert_equal bucket.last_updated_at, Time.now.to_f
assert_equal bucket.leak, 1
assert_equal bucket.capacity, 1
assert bucket.full?, "Bucket isn't full"
end
end

it "handles nils correctly" do
Time.stub :now, Time.now do
bucket = Rack::Attack::LeakyBucket.unserialize(nil, 1, 1)
assert_equal bucket.value, 0
assert_equal bucket.last_updated_at, Time.now.to_f
assert_equal bucket.leak, 1
assert_equal bucket.capacity, 1
assert !bucket.full?
end
end

it "handles wrong values correctly" do
Time.stub :now, Time.now do
bucket = Rack::Attack::LeakyBucket.unserialize("-1|-132", 1, 1)
assert_equal bucket.value, 0
assert_equal bucket.last_updated_at, Time.now.to_f
assert_equal bucket.leak, 1
assert_equal bucket.capacity, 1
assert !bucket.full?
end
end
end
end
Loading

0 comments on commit bf74dcb

Please sign in to comment.