Skip to content

Commit

Permalink
feat: add consumer group (#7980)
Browse files Browse the repository at this point in the history
  • Loading branch information
kingluo authored Oct 11, 2022
1 parent c25e112 commit 7e2cc4f
Show file tree
Hide file tree
Showing 20 changed files with 1,488 additions and 4 deletions.
195 changes: 195 additions & 0 deletions apisix/admin/consumer_group.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local consumers = require("apisix.consumer").consumers
local utils = require("apisix.admin.utils")
local schema_plugin = require("apisix.admin.plugins").check_schema
local type = type
local tostring = tostring
local ipairs = ipairs


local _M = {
need_v3_filter = true,
}


local function check_conf(id, conf, need_id)
if not conf then
return nil, {error_msg = "missing configurations"}
end

id = id or conf.id
if need_id and not id then
return nil, {error_msg = "missing id"}
end

if not need_id and id then
return nil, {error_msg = "wrong id, do not need it"}
end

if need_id and conf.id and tostring(conf.id) ~= tostring(id) then
return nil, {error_msg = "wrong id"}
end

conf.id = id

core.log.info("conf: ", core.json.delay_encode(conf))
local ok, err = core.schema.check(core.schema.consumer_group, conf)
if not ok then
return nil, {error_msg = "invalid configuration: " .. err}
end

local ok, err = schema_plugin(conf.plugins)
if not ok then
return nil, {error_msg = err}
end

return true
end


function _M.put(id, conf)
local ok, err = check_conf(id, conf, true)
if not ok then
return 400, err
end

local key = "/consumer_groups/" .. id

local ok, err = utils.inject_conf_with_prev_conf("consumer_group", key, conf)
if not ok then
return 503, {error_msg = err}
end

local res, err = core.etcd.set(key, conf)
if not res then
core.log.error("failed to put consumer group[", key, "]: ", err)
return 503, {error_msg = err}
end

return res.status, res.body
end


function _M.get(id)
local key = "/consumer_groups"
if id then
key = key .. "/" .. id
end
local res, err = core.etcd.get(key, not id)
if not res then
core.log.error("failed to get consumer group[", key, "]: ", err)
return 503, {error_msg = err}
end

utils.fix_count(res.body, id)
return res.status, res.body
end


function _M.delete(id)
if not id then
return 400, {error_msg = "missing consumer group id"}
end

local consumers, consumers_ver = consumers()
if consumers_ver and consumers then
for _, consumer in ipairs(consumers) do
if type(consumer) == "table" and consumer.value
and consumer.value.group_id
and tostring(consumer.value.group_id) == id then
return 400, {error_msg = "can not delete this consumer group,"
.. " consumer [" .. consumer.value.id
.. "] is still using it now"}
end
end
end

local key = "/consumer_groups/" .. id
local res, err = core.etcd.delete(key)
if not res then
core.log.error("failed to delete consumer group[", key, "]: ", err)
return 503, {error_msg = err}
end


return res.status, res.body
end


function _M.patch(id, conf, sub_path)
if not id then
return 400, {error_msg = "missing consumer group id"}
end

if not conf then
return 400, {error_msg = "missing new configuration"}
end

if not sub_path or sub_path == "" then
if type(conf) ~= "table" then
return 400, {error_msg = "invalid configuration"}
end
end

local key = "/consumer_groups/" .. id
local res_old, err = core.etcd.get(key)
if not res_old then
core.log.error("failed to get consumer group [", key, "]: ", err)
return 503, {error_msg = err}
end

if res_old.status ~= 200 then
return res_old.status, res_old.body
end
core.log.info("key: ", key, " old value: ",
core.json.delay_encode(res_old, true))

local node_value = res_old.body.node.value
local modified_index = res_old.body.node.modifiedIndex

if sub_path and sub_path ~= "" then
local code, err, node_val = core.table.patch(node_value, sub_path, conf)
node_value = node_val
if code then
return code, err
end
utils.inject_timestamp(node_value, nil, true)
else
node_value = core.table.merge(node_value, conf)
utils.inject_timestamp(node_value, nil, conf)
end

core.log.info("new conf: ", core.json.delay_encode(node_value, true))

local ok, err = check_conf(id, node_value, true)
if not ok then
return 400, err
end

local res, err = core.etcd.atomic_set(key, node_value, nil, modified_index)
if not res then
core.log.error("failed to set new consumer group[", key, "]: ", err)
return 503, {error_msg = err}
end

return res.status, res.body
end


return _M
16 changes: 16 additions & 0 deletions apisix/admin/consumers.lua
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,22 @@ local function check_conf(username, conf)
end
end

if conf.group_id then
local key = "/consumer_groups/" .. conf.group_id
local res, err = core.etcd.get(key)
if not res then
return nil, {error_msg = "failed to fetch consumer group info by "
.. "consumer group id [" .. conf.group_id .. "]: "
.. err}
end

if res.status ~= 200 then
return nil, {error_msg = "failed to fetch consumer group info by "
.. "consumer group id [" .. conf.group_id .. "], "
.. "response code: " .. res.status}
end
end

return conf.username
end

Expand Down
1 change: 1 addition & 0 deletions apisix/admin/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ local resources = {
stream_routes = require("apisix.admin.stream_routes"),
plugin_metadata = require("apisix.admin.plugin_metadata"),
plugin_configs = require("apisix.admin.plugin_config"),
consumer_groups = require("apisix.admin.consumer_group"),
}


Expand Down
1 change: 1 addition & 0 deletions apisix/constants.lua
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ return {
["/global_rules"] = true,
["/protos"] = true,
["/plugin_configs"] = true,
["/consumer_groups"] = true,
},
STREAM_ETCD_DIRECTORY = {
["/upstreams"] = true,
Expand Down
1 change: 1 addition & 0 deletions apisix/consumer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ end
function _M.attach_consumer(ctx, consumer, conf)
ctx.consumer = consumer
ctx.consumer_name = consumer.consumer_name
ctx.consumer_group_id = consumer.group_id
ctx.consumer_ver = conf.conf_version
end

Expand Down
47 changes: 47 additions & 0 deletions apisix/consumer_group.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local plugin_checker = require("apisix.plugin").plugin_checker
local error = error


local consumer_groups


local _M = {
}


function _M.init_worker()
local err
consumer_groups, err = core.config.new("/consumer_groups", {
automatic = true,
item_schema = core.schema.consumer_group,
checker = plugin_checker,
})
if not consumer_groups then
error("failed to sync /consumer_groups: " .. err)
end
end


function _M.get(id)
return consumer_groups:get(id)
end


return _M
1 change: 1 addition & 0 deletions apisix/core/ctx.lua
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ do
local apisix_var_names = {
balancer_ip = true,
balancer_port = true,
consumer_group_id = true,
consumer_name = true,
route_id = true,
route_name = true,
Expand Down
14 changes: 14 additions & 0 deletions apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ local core = require("apisix.core")
local conf_server = require("apisix.conf_server")
local plugin = require("apisix.plugin")
local plugin_config = require("apisix.plugin_config")
local consumer_group = require("apisix.consumer_group")
local script = require("apisix.script")
local service_fetch = require("apisix.http.service").get
local admin_init = require("apisix.admin.init")
Expand Down Expand Up @@ -147,6 +148,7 @@ function _M.http_init_worker()
require("apisix.http.service").init_worker()
plugin_config.init_worker()
require("apisix.consumer").init_worker()
consumer_group.init_worker()

apisix_upstream.init_worker()
require("apisix.plugins.ext-plugin.init").init_worker()
Expand Down Expand Up @@ -446,9 +448,21 @@ function _M.http_access_phase()
plugin.run_plugin("rewrite", plugins, api_ctx)
if api_ctx.consumer then
local changed
local group_conf

if api_ctx.consumer.group_id then
group_conf = consumer_group.get(api_ctx.consumer.group_id)
if not group_conf then
core.log.error("failed to fetch consumer group config by ",
"id: ", api_ctx.consumer.group_id)
return core.response.exit(503)
end
end

route, changed = plugin.merge_consumer_route(
route,
api_ctx.consumer,
group_conf,
api_ctx
)

Expand Down
Loading

0 comments on commit 7e2cc4f

Please sign in to comment.