Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(workflow): support limit count action #7771

Merged
merged 11 commits into from
Aug 29, 2022
255 changes: 4 additions & 251 deletions apisix/plugins/limit-count.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,271 +14,24 @@
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local limit_local_new = require("resty.limit.count").new
local core = require("apisix.core")
local apisix_plugin = require("apisix.plugin")
local tab_insert = table.insert
local ipairs = ipairs
local pairs = pairs

local limit_count = require("apisix.plugins.limit-count.init")

local plugin_name = "limit-count"
local limit_redis_cluster_new
local limit_redis_new
do
local redis_src = "apisix.plugins.limit-count.limit-count-redis"
limit_redis_new = require(redis_src).new

local cluster_src = "apisix.plugins.limit-count.limit-count-redis-cluster"
limit_redis_cluster_new = require(cluster_src).new
end
local lrucache = core.lrucache.new({
type = 'plugin', serial_creating = true,
})
local group_conf_lru = core.lrucache.new({
type = 'plugin',
})


local policy_to_additional_properties = {
redis = {
properties = {
redis_host = {
type = "string", minLength = 2
},
redis_port = {
type = "integer", minimum = 1, default = 6379,
},
redis_password = {
type = "string", minLength = 0,
},
redis_database = {
type = "integer", minimum = 0, default = 0,
},
redis_timeout = {
type = "integer", minimum = 1, default = 1000,
},
},
required = {"redis_host"},
},
["redis-cluster"] = {
properties = {
redis_cluster_nodes = {
type = "array",
minItems = 2,
items = {
type = "string", minLength = 2, maxLength = 100
},
},
redis_password = {
type = "string", minLength = 0,
},
redis_timeout = {
type = "integer", minimum = 1, default = 1000,
},
redis_cluster_name = {
type = "string",
},
},
required = {"redis_cluster_nodes", "redis_cluster_name"},
},
}
local schema = {
type = "object",
properties = {
count = {type = "integer", exclusiveMinimum = 0},
time_window = {type = "integer", exclusiveMinimum = 0},
group = {type = "string"},
key = {type = "string", default = "remote_addr"},
key_type = {type = "string",
enum = {"var", "var_combination", "constant"},
default = "var",
},
rejected_code = {
type = "integer", minimum = 200, maximum = 599, default = 503
},
rejected_msg = {
type = "string", minLength = 1
},
policy = {
type = "string",
enum = {"local", "redis", "redis-cluster"},
default = "local",
},
allow_degradation = {type = "boolean", default = false},
show_limit_quota_header = {type = "boolean", default = true}
},
required = {"count", "time_window"},
["if"] = {
properties = {
policy = {
enum = {"redis"},
},
},
},
["then"] = policy_to_additional_properties.redis,
["else"] = {
["if"] = {
properties = {
policy = {
enum = {"redis-cluster"},
},
},
},
["then"] = policy_to_additional_properties["redis-cluster"],
}
}

local schema_copy = core.table.deepcopy(schema)

local _M = {
version = 0.4,
priority = 1002,
name = plugin_name,
schema = schema,
schema = limit_count.schema,
}


local function group_conf(conf)
return conf
end


function _M.check_schema(conf)
local ok, err = core.schema.check(schema, conf)
if not ok then
return false, err
end

if conf.group then
local fields = {}
-- When the goup field is configured,
-- we will use schema_copy to get the whitelist of properties,
-- so that we can avoid getting injected properties.
for k in pairs(schema_copy.properties) do
tab_insert(fields, k)
end
local extra = policy_to_additional_properties[conf.policy]
if extra then
for k in pairs(extra.properties) do
tab_insert(fields, k)
end
end

local prev_conf = group_conf_lru(conf.group, "", group_conf, conf)

for _, field in ipairs(fields) do
if not core.table.deep_eq(prev_conf[field], conf[field]) then
core.log.error("previous limit-conn group ", prev_conf.group,
" conf: ", core.json.encode(prev_conf))
core.log.error("current limit-conn group ", conf.group,
" conf: ", core.json.encode(conf))
return false, "group conf mismatched"
end
end
end

return true
end


local function create_limit_obj(conf)
core.log.info("create new limit-count plugin instance")

if not conf.policy or conf.policy == "local" then
return limit_local_new("plugin-" .. plugin_name, conf.count,
conf.time_window)
end

if conf.policy == "redis" then
return limit_redis_new("plugin-" .. plugin_name,
conf.count, conf.time_window, conf)
end

if conf.policy == "redis-cluster" then
return limit_redis_cluster_new("plugin-" .. plugin_name, conf.count,
conf.time_window, conf)
end

return nil
return limit_count.check_schema(conf)
end


function _M.access(conf, ctx)
core.log.info("ver: ", ctx.conf_version)

local lim, err
if not conf.group then
lim, err = core.lrucache.plugin_ctx(lrucache, ctx, conf.policy, create_limit_obj, conf)
else
lim, err = lrucache(conf.group, "", create_limit_obj, conf)
end

if not lim then
core.log.error("failed to fetch limit.count object: ", err)
if conf.allow_degradation then
return
end
return 500
end

local conf_key = conf.key
local key
if conf.key_type == "var_combination" then
local err, n_resolved
key, err, n_resolved = core.utils.resolve_var(conf_key, ctx.var)
if err then
core.log.error("could not resolve vars in ", conf_key, " error: ", err)
end

if n_resolved == 0 then
key = nil
end
elseif conf.key_type == "constant" then
key = conf_key
else
key = ctx.var[conf_key]
end

if key == nil then
core.log.info("The value of the configured key is empty, use client IP instead")
-- When the value of key is empty, use client IP instead
key = ctx.var["remote_addr"]
end

-- here we add a separator ':' to mark the boundary of the prefix and the key itself
if not conf.group then
-- Here we use plugin-level conf version to prevent the counter from being resetting
-- because of the change elsewhere.
-- A route which reuses a previous route's ID will inherits its counter.
key = ctx.conf_type .. ctx.conf_id .. ':' .. apisix_plugin.conf_version(conf) .. ':' .. key
else
key = conf.group .. ':' .. key
end

core.log.info("limit key: ", key)

local delay, remaining = lim:incoming(key, true)
if not delay then
local err = remaining
if err == "rejected" then
if conf.rejected_msg then
return conf.rejected_code, { error_msg = conf.rejected_msg }
end
return conf.rejected_code
end

core.log.error("failed to limit count: ", err)
if conf.allow_degradation then
return
end
return 500, {error_msg = "failed to limit count"}
end

if conf.show_limit_quota_header then
core.response.set_header("X-RateLimit-Limit", conf.count,
"X-RateLimit-Remaining", remaining)
end
return limit_count.rate_limit(conf, ctx)
end


Expand Down
Loading