From 6ecb349afec090293e1a6bfda551dc2131d3b066 Mon Sep 17 00:00:00 2001 From: Pablo Ubal <134373651+pubalokta@users.noreply.github.com> Date: Wed, 16 Oct 2024 10:45:38 +0200 Subject: [PATCH] feat: add fixed_window bucket option (#79) * feat: add fixed_window bucket option * activate fixed_window through param so we can control the rollout using feature flags * add fixed window on ERL * disabled CI as docker-compose is erroring out * adjust reset_ms based on fixed_window * fixed CI to use docker compose instead of docker-compose * fix take_elevated for fixed_window * tests to make sure the last_drip ain't modified within the same interval in fixed window * fixed window param takes precedence over bucket config * added fixed_window to README.md * improved description in readme * fixed returning modified current_timestamp and reset_ms correctly calculated for fixed window in takeeleavated * added isFixedWindowEnabled tests and clarified README.md * fixed flaky test * clarified README.md and added tests * missing trail line * nit:trailing comma --------- Co-authored-by: panga --- README.md | 28 ++++++ lib/db.js | 6 +- lib/take.lua | 15 ++- lib/take_elevated.lua | 46 +++++++-- lib/utils.js | 23 ++++- package.json | 1 + test/db.standalonemode.tests.js | 1 - test/db.tests.js | 169 +++++++++++++++++++++++++++++++- test/utils.tests.js | 41 +++++++- 9 files changed, 312 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 49fb2aa..a8d79c6 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ It's a fork from [LimitDB](https://github.com/limitd/limitdb). - [Breaking changes from `Limitdb`](#breaking-changes-from-limitdb) - [TAKE](#take) - [TAKEELEVATED](#takeelevated) + - [Use of fixed window on Take and TakeElevated](#use-of-fixed-window-on-take-and-takeelevated) - [PUT](#put) - [Overriding Configuration at Runtime](#overriding-configuration-at-runtime) - [Overriding Configuration at Runtime with ERL](#overriding-configuration-at-runtime-with-erl) @@ -82,6 +83,7 @@ const limitd = new Limitd({ - `unlimited` (boolean = false): unlimited requests (skip take). - `skip_n_calls` (number): take will go to redis every `n` calls instead of going in every take. - `elevated_limits` (object): elevated limits configuration that kicks in when the bucket is empty. Please refer to the [ERL section](#ERL-Elevated-Rate-Limits) for more details. +- `fixed_window` (boolean = false): refill at specified interval instead of granular. You can also define your rates using `per_second`, `per_minute`, `per_hour`, `per_day`. So `per_second: 1` is equivalent to `per_interval: 1, interval: 1000`. @@ -320,6 +322,32 @@ if erl_triggered // quota left in the quotaKey bucket if !erl_triggered // ERL wasn't triggered in this call, so we haven't identified the remaining quota. ``` +### Use of fixed window in Take and TakeElevated +By default, the bucket uses the sliding window algorithm to refill tokens. For example, if the bucket is set to 100 tokens per second, it refills 1 token every 10 milliseconds (1000ms / 100 tokens per second). + +With the fixed window algorithm, the bucket refills at the specified interval. For instance, if set to 100 tokens per second, it refills 100 tokens every second. + +To use the fixed window algorithm on `Take` or `TakeElevated`, set the `fixed_window` property in the bucket configuration to `true` (default is `false`). This will refill the bucket at the specified interval + +Additionally, you can use the `fixed_window` flag in the configOverride parameter. This acts as a feature flag for safe deployment, but it cannot activate the fixed window algorithm if the bucket configuration is set to false. + +Both the bucket configuration and the configOverride parameter must be set to true to activate the fixed window algorithm. If the configOverride parameter is not provided, it defaults to true, and the activation depends on the bucket configuration. + +The following table describes how the fixed window bucket configuration and the fixed window param interact to activate the fixed window algorithm. + +| fixed_window bucket config | fixed_window param | Fixed Window Enabled | +|----------------------------|--------------------|----------------------| +| true | true | Yes | +| true | false | No | +| true | not provided | Yes | +| false | true | No | +| false | false | No | +| false | not provided | No | +| not provided | true | No | +| not provided | false | No | +| not provided | not provided | No | + + ## PUT You can manually reset a fill a bucket using PUT: diff --git a/lib/db.js b/lib/db.js index f7aaefc..2a8a749 100644 --- a/lib/db.js +++ b/lib/db.js @@ -7,7 +7,7 @@ const utils = require('./utils'); const Redis = require('ioredis'); const { validateParams, validateERLParams } = require('./validation'); const DBPing = require("./db_ping"); -const { calculateQuotaExpiration, resolveElevatedParams } = require('./utils'); +const { calculateQuotaExpiration, resolveElevatedParams, isFixedWindowEnabled } = require('./utils'); const EventEmitter = require('events').EventEmitter; const TAKE_LUA = fs.readFileSync(`${__dirname}/take.lua`, "utf8"); @@ -264,12 +264,14 @@ class LimitDBRedis extends EventEmitter { */ take(params, callback) { this._doTake(params, callback, (key, bucketKeyConfig, count) => { + const useFixedWindow = isFixedWindowEnabled(bucketKeyConfig.fixed_window, params.fixed_window); this.redis.take(key, bucketKeyConfig.ms_per_interval || 0, bucketKeyConfig.size, count, Math.ceil(bucketKeyConfig.ttl || this.globalTTL), bucketKeyConfig.drip_interval || 0, + useFixedWindow ? bucketKeyConfig.interval : 0, (err, results) => { if (err) { return callback(err); @@ -308,12 +310,14 @@ class LimitDBRedis extends EventEmitter { this._doTake(params, callback, (key, bucketKeyConfig, count) => { const elevated_limits = resolveElevatedParams(erlParams, bucketKeyConfig, key, this.prefix); const erl_quota_expiration = calculateQuotaExpiration(elevated_limits); + const useFixedWindow = isFixedWindowEnabled(bucketKeyConfig.fixed_window, params.fixed_window); this.redis.takeElevated(key, elevated_limits.erl_is_active_key, elevated_limits.erl_quota_key, bucketKeyConfig.ms_per_interval || 0, bucketKeyConfig.size, count, Math.ceil(bucketKeyConfig.ttl || this.globalTTL), bucketKeyConfig.drip_interval || 0, + useFixedWindow ? bucketKeyConfig.interval : 0, elevated_limits.ms_per_interval, elevated_limits.size, elevated_limits.erl_activation_period_seconds, diff --git a/lib/take.lua b/lib/take.lua index 14e8d7f..0099d65 100644 --- a/lib/take.lua +++ b/lib/take.lua @@ -4,9 +4,11 @@ local new_content = tonumber(ARGV[2]) local tokens_to_take = tonumber(ARGV[3]) local ttl = tonumber(ARGV[4]) local drip_interval = tonumber(ARGV[5]) +local fixed_window = tonumber(ARGV[6]) local current_time = redis.call('TIME') local current_timestamp_ms = current_time[1] * 1000 + current_time[2] / 1000 +local redis_timestamp_ms = current_timestamp_ms local current = redis.pcall('HMGET', KEYS[1], 'd', 'r') @@ -18,6 +20,13 @@ if current[1] and tokens_per_ms then -- drip bucket local last_drip = current[1] local content = current[2] + + if fixed_window > 0 then + -- fixed window for granting new tokens + local interval_correction = (current_timestamp_ms - last_drip) % fixed_window + current_timestamp_ms = current_timestamp_ms - interval_correction + end + local delta_ms = math.max(current_timestamp_ms - last_drip, 0) local drip_amount = delta_ms * tokens_per_ms new_content = math.min(content + drip_amount, bucket_size) @@ -41,8 +50,10 @@ redis.call('HMSET', KEYS[1], redis.call('EXPIRE', KEYS[1], ttl) local reset_ms = 0 -if drip_interval > 0 then +if fixed_window > 0 then + reset_ms = current_timestamp_ms + fixed_window +elseif drip_interval > 0 then reset_ms = math.ceil(current_timestamp_ms + (bucket_size - new_content) * drip_interval) end -return { new_content, enough_tokens, current_timestamp_ms, reset_ms } +return { new_content, enough_tokens, redis_timestamp_ms, reset_ms } diff --git a/lib/take_elevated.lua b/lib/take_elevated.lua index 51104fb..e7b8dbb 100644 --- a/lib/take_elevated.lua +++ b/lib/take_elevated.lua @@ -3,12 +3,13 @@ local bucket_size = tonumber(ARGV[2]) local tokens_to_take = tonumber(ARGV[3]) local ttl = tonumber(ARGV[4]) local drip_interval = tonumber(ARGV[5]) -local erl_tokens_per_ms = tonumber(ARGV[6]) -local erl_bucket_size = tonumber(ARGV[7]) -local erl_activation_period_seconds = tonumber(ARGV[8]) -local erl_quota = tonumber(ARGV[9]) -local erl_quota_expiration_epoch = tonumber(ARGV[10]) -local erl_configured_for_bucket = tonumber(ARGV[11]) == 1 +local fixed_window = tonumber(ARGV[6]) +local erl_tokens_per_ms = tonumber(ARGV[7]) +local erl_bucket_size = tonumber(ARGV[8]) +local erl_activation_period_seconds = tonumber(ARGV[9]) +local erl_quota = tonumber(ARGV[10]) +local erl_quota_expiration_epoch = tonumber(ARGV[11]) +local erl_configured_for_bucket = tonumber(ARGV[12]) == 1 -- the key to use for pulling last bucket state from redis local lastBucketStateKey = KEYS[1] @@ -29,12 +30,36 @@ end -- get current time from redis, to be used in new bucket size calculations later local current_time = redis.call('TIME') local current_timestamp_ms = current_time[1] * 1000 + current_time[2] / 1000 +local redis_timestamp_ms = current_timestamp_ms + +local function adjustCurrentTimestampForFixedWindow(current_timestamp_ms) + if current[1] and tokens_per_ms then + -- drip bucket + local last_drip = current[1] + + if fixed_window > 0 then + -- fixed window for granting new tokens + local interval_correction = (current_timestamp_ms - last_drip) % fixed_window + current_timestamp_ms = current_timestamp_ms - interval_correction + end + + return current_timestamp_ms + end + return current_timestamp_ms +end local function calculateNewBucketContent(current, tokens_per_ms, bucket_size, current_timestamp_ms) if current[1] and tokens_per_ms then -- drip bucket local last_drip = current[1] local content = current[2] + + if fixed_window > 0 then + -- fixed window for granting new tokens + local interval_correction = (current_timestamp_ms - last_drip) % fixed_window + current_timestamp_ms = current_timestamp_ms - interval_correction + end + local delta_ms = math.max(current_timestamp_ms - last_drip, 0) local drip_amount = delta_ms * tokens_per_ms return math.min(content + drip_amount, bucket_size) @@ -71,6 +96,9 @@ local function takeERLQuota(erl_quota_key, erl_quota, erl_quota_expiration_epoch return previously_used_quota end +-- adjust current timestamp for fixed window +current_timestamp_ms = adjustCurrentTimestampForFixedWindow(current_timestamp_ms) + -- Enable verbatim replication to ensure redis sends script's source code to all masters -- managing the sharded database in a clustered deployment. -- https://redis.io/docs/interact/programmability/eval-intro/#:~:text=scripts%20debugger.-,Script%20replication,-In%20standalone%20deployments @@ -124,7 +152,9 @@ redis.call('HMSET', lastBucketStateKey, redis.call('EXPIRE', lastBucketStateKey, ttl) local reset_ms = 0 -if drip_interval > 0 then +if fixed_window > 0 then + reset_ms = current_timestamp_ms + fixed_window +elseif drip_interval > 0 then if is_erl_activated == 1 then reset_ms = math.ceil(current_timestamp_ms + (erl_bucket_size - bucket_content_after_take) * drip_interval) else @@ -133,4 +163,4 @@ if drip_interval > 0 then end -- Return the current quota -return { bucket_content_after_take, enough_tokens, current_timestamp_ms, reset_ms, erl_triggered, is_erl_activated, erl_quota_left } +return { bucket_content_after_take, enough_tokens, redis_timestamp_ms, reset_ms, erl_triggered, is_erl_activated, erl_quota_left } diff --git a/lib/utils.js b/lib/utils.js index a040b02..a2f31d9 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -23,7 +23,8 @@ function normalizeTemporals(params) { 'interval', 'size', 'unlimited', - 'skip_n_calls' + 'skip_n_calls', + 'fixed_window' ]); INTERVAL_SHORTCUTS.forEach(intervalShortcut => { @@ -219,6 +220,23 @@ function replicateHashtag(baseKey, prefix, key) { } } +/** isFixedWindowEnabled + * | fixed_window bucket config | fixed_window param | Fixed Window Enabled | + * |----------------------------|--------------------|----------------------| + * | true | true | Yes | + * | true | false | No | + * | true | not provided | Yes | + * | false | true | No | + * | false | false | No | + * | false | not provided | No | + * | not provided | true | No | + * | not provided | false | No | + * | not provided | not provided | No | + */ +function isFixedWindowEnabled(fixedWindowFromConfig, fixedWindowFromParam) { + return fixedWindowFromConfig === true && (fixedWindowFromParam === true || fixedWindowFromParam === undefined); +} + module.exports = { buildBuckets, buildBucket, @@ -232,5 +250,6 @@ module.exports = { endOfMonthTimestamp, calculateQuotaExpiration, resolveElevatedParams, - replicateHashtag + replicateHashtag, + isFixedWindowEnabled, }; diff --git a/package.json b/package.json index 31dc96e..bfa74d2 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,7 @@ "mocha": "^5.2.0", "mockdate": "^3.0.5", "nyc": "^14.1.1", + "sinon": "^19.0.2", "toxiproxy-node-client": "^2.0.6" } } diff --git a/test/db.standalonemode.tests.js b/test/db.standalonemode.tests.js index d662994..fb8784e 100644 --- a/test/db.standalonemode.tests.js +++ b/test/db.standalonemode.tests.js @@ -174,7 +174,6 @@ describe('when using LimitDB', () => { } if (err) { - console.log(err, err.message); done(err); } }); diff --git a/test/db.tests.js b/test/db.tests.js index fa2702e..17cd8cd 100644 --- a/test/db.tests.js +++ b/test/db.tests.js @@ -4,6 +4,7 @@ const async = require('async'); const _ = require('lodash'); const assert = require('chai').assert; const { endOfMonthTimestamp, replicateHashtag } = require('../lib/utils'); +const sinon = require('sinon'); const buckets = { ip: { @@ -39,6 +40,14 @@ const buckets = { '9.8.7.6': { size: 200, }, + '123.123.123.123': { + per_second: 1000, + fixed_window: true + }, + '124.124.124.124': { + per_second: 1000, + fixed_window: false + } } }, user: { @@ -449,7 +458,7 @@ module.exports.tests = (clientCreator) => { } assert.ok(result.conformant); assert.equal(result.remaining, 0); - assert.closeTo(result.reset, now / 1000 + 1800, 1); + assert.closeTo(result.reset, now / 1000 + 1800, 3); assert.closeTo(result.delta_reset_ms, (result.limit - result.remaining) * 3600000/buckets.ip.overrides['10.0.0.1'].per_hour, 1); assert.equal(result.limit, 1); done(); @@ -843,6 +852,162 @@ module.exports.tests = (clientCreator) => { }); }); + [ + { + name: 'take', + takeFunc: (takeParams, cb) => db.take(takeParams, cb), + takeStub: () => db.redis.take, + fixedWindowParamPosition: 6, + }, + { + name: 'takeElevated', + takeFunc: (takeParams, cb) => db.takeElevated(takeParams, cb), + takeStub: () => db.redis.takeElevated, + fixedWindowParamPosition: 8, + } + ].forEach(({ name, takeFunc, takeStub, fixedWindowParamPosition }) => { + describe(`fixed window for ${name}`, () => { + const redisHMGetPromise = (key, fields) => new Promise((resolve, reject) => { + db.redis.hmget(key, fields, (err, value) => { + if (err) { + return reject(err); + } + resolve(value); + }); + }); + + describe(`when calling the lua script`, () => { + it(`should use fixed window when asked`, (done) => { + const interval = 1000; + const key = '123.123.123.123'; + takeFunc({ type: 'ip', key, count: 1000, fixed_window: true }, (err, response) => { + if (err) return done(err); + assert.ok(response.conformant); + assert.equal(response.remaining, 0); + assert.closeTo(response.delta_reset_ms, interval, 100); + assert.equal(response.limit, 1000); + redisHMGetPromise(`ip:${key}`, ['d', 'r']).then((value) => { + const lastDrip = value[0]; + setTimeout(() => { + takeFunc({ type: 'ip', key, count: 1, fixed_window: true }, (err, response) => { + assert.notOk(response.conformant); + assert.equal(response.remaining, 0); + assert.closeTo(response.delta_reset_ms, interval/2, 100); + assert.equal(response.limit, 1000); + redisHMGetPromise(`ip:${key}`, ['d', 'r']).then((value) => { + assert.equal(value[0], lastDrip, 'last drip should not have changed'); + setTimeout(() => { + takeFunc({ type: 'ip', key, count: 1, fixed_window: true }, (err, response) => { + assert.ok(response.conformant); + assert.equal(response.remaining, 999); + assert.closeTo(response.delta_reset_ms, interval, 100); + assert.equal(response.limit, 1000); + redisHMGetPromise(`ip:${key}`, ['d', 'r']).then((value) => { + assert.notEqual(value[0], lastDrip, 'last drip should have changed'); + done(); + }); + }); + }, interval / 2); + }); + }); + }, interval / 2); + }); + }); + }); + }); + + describe('when checking the arguments used to call the script', () => { + let mockedRedis; + let realRedis; + beforeEach((done) => { + realRedis = db.redis; + const currentTime = Date.now() / 1000; + mockedRedis = { + take: sinon.stub().callsFake((key, tokensPerMs, size, count, ttl, dripInterval, fixedWindowInterval, callback) => { + callback(null, ['0', '1', currentTime.toString(), (currentTime + 100).toString()]); + }), + takeElevated: sinon.stub().callsFake((key, erlActiveKey, erlQuotaKey, tokensPerMs, size, count, ttl, dripInterval, fixedWindowInterval, erlTokensPerMs, erlSize, erlPeriod, erlQuota, erlQuotaExp, erlConfigured, callback) => { + const currentTime = Date.now() / 1000; + callback(null, ['0', '1', currentTime.toString(), (currentTime + 100).toString(), '0', '0', '0']); + }) + }; + db.redis = mockedRedis; + done(); + }); + + afterEach((done) => { + db.redis = realRedis; + done(); + }); + + describe('when fixed_window is enabled in the bucket config', () => { + it('should pass fixed window interval = 1000 when fixed_window param is true', (done) => { + const params = { type: 'ip', key: '123.123.123.123', count: 1, fixed_window: true }; + takeFunc(params, (err, response) => { + if (err) { + return done(err); + } + sinon.assert.calledOnce(takeStub()); + assert.equal(takeStub().getCall(0).args[fixedWindowParamPosition], 1000); + done(); + }); + }); + + it('should pass fixed window interval = 0 when fixed_window param is false', (done) => { + const params = { type: 'ip', key: '123.123.123.123', count: 1, fixed_window: false }; + takeFunc(params, (err, response) => { + if (err) { + return done(err); + } + sinon.assert.calledOnce(takeStub()); + assert.equal(takeStub().getCall(0).args[fixedWindowParamPosition], 0); + done(); + }); + }); + + it('should pass fixed window interval = 1000 when fixed_window param is not provided', (done) => { + const params = { type: 'ip', key: '123.123.123.123', count: 1 }; + takeFunc(params, (err, response) => { + if (err) { + return done(err); + } + sinon.assert.calledOnce(takeStub()); + assert.equal(takeStub().getCall(0).args[fixedWindowParamPosition], 1000); + done(); + }); + }); + }); + + describe('when fixed_window is disabled in the bucket config', () => { + [ + { + fixed_window: true, + }, + { + fixed_window: false, + }, + { + fixed_window: undefined, + }, + ].forEach(({ fixed_window }) => { + it(`should pass fixed window interval = 0 when fixed_window param is ${fixed_window}`, (done) => { + const params = { type: 'ip', key: '124.124.124.124', count: 1 }; + takeFunc(params, (err, response) => { + if (err) { + return done(err); + } + sinon.assert.calledOnce(takeStub()); + assert.equal(takeStub().getCall(0).args[fixedWindowParamPosition], 0); + done(); + }); + }); + }) + }); + }); + }); + }); + + describe('elevated limits specific tests', () => { const takeElevatedPromise = (params) => new Promise((resolve, reject) => { db.takeElevated(params, (err, response) => { @@ -1259,7 +1424,7 @@ module.exports.tests = (clientCreator) => { assert.isTrue(result.elevated_limits.activated); assert.isTrue(result.elevated_limits.erl_configured_for_bucket) assert.equal(result.limit, 5); - assert.equal(result.remaining, 3); + assert.isAbove(result.remaining, 1); done(); }); }); diff --git a/test/utils.tests.js b/test/utils.tests.js index 36765c2..6243814 100644 --- a/test/utils.tests.js +++ b/test/utils.tests.js @@ -5,7 +5,7 @@ const chaiExclude = require('chai-exclude'); chai.use(chaiExclude); const assert = chai.assert; -const { getERLParams, calculateQuotaExpiration, normalizeType, resolveElevatedParams, replicateHashtag } = require('../lib/utils'); +const { getERLParams, calculateQuotaExpiration, normalizeType, resolveElevatedParams, replicateHashtag, isFixedWindowEnabled } = require('../lib/utils'); const { set, reset } = require('mockdate'); const { expect } = require('chai'); @@ -393,5 +393,42 @@ describe('utils', () => { assert.equal(result, `${elevatedBucketName}:{{${key}}`); }); }); - }) + }); + + describe('isFixedWindowEnabled', () => { + it('should return true when fixed_window bucket config is true and fixed_window param is true', () => { + const result = isFixedWindowEnabled(true, true); + assert.isTrue(result); + }); + + it('should return false when fixed_window bucket config is true and fixed_window param is false', () => { + const result = isFixedWindowEnabled(true, false); + assert.isFalse(result); + }); + + it('should return true when fixed_window bucket config is true and fixed_window param is not provided', () => { + const result = isFixedWindowEnabled(true, undefined); + assert.isTrue(result); + }); + + it('should return false when fixed_window bucket config is false and fixed_window param is true', () => { + const result = isFixedWindowEnabled(false, true); + assert.isFalse(result); + }); + + it('should return false when fixed_window bucket config is false and fixed_window param is false', () => { + const result = isFixedWindowEnabled(false, false); + assert.isFalse(result); + }); + + it('should return false when fixed_window bucket config is false and fixed_window param is not provided', () => { + const result = isFixedWindowEnabled(false, undefined); + assert.isFalse(result); + }); + + it('should return false when fixed_window bucket config is not present and fixed_window param is true', () => { + const result = isFixedWindowEnabled(undefined, true); + assert.isFalse(result); + }); + }); });