Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add consumer group #7980

Merged
merged 11 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 195 additions & 0 deletions apisix/admin/consumer_group.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local consumers = require("apisix.consumer").consumers
local utils = require("apisix.admin.utils")
local schema_plugin = require("apisix.admin.plugins").check_schema
local type = type
local tostring = tostring
local ipairs = ipairs


local _M = {
need_v3_filter = true,
}


local function check_conf(id, conf, need_id)
if not conf then
return nil, {error_msg = "missing configurations"}
end

id = id or conf.id
if need_id and not id then
return nil, {error_msg = "missing id"}
end

if not need_id and id then
return nil, {error_msg = "wrong id, do not need it"}
end

if need_id and conf.id and tostring(conf.id) ~= tostring(id) then
return nil, {error_msg = "wrong id"}
end

conf.id = id

core.log.info("conf: ", core.json.delay_encode(conf))
local ok, err = core.schema.check(core.schema.consumer_group, conf)
if not ok then
return nil, {error_msg = "invalid configuration: " .. err}
end

local ok, err = schema_plugin(conf.plugins)
if not ok then
return nil, {error_msg = err}
end

return true
end


function _M.put(id, conf)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also add a check in consumers. Let's make sure consumer.group_id points to an actual consumer group.

local ok, err = check_conf(id, conf, true)
if not ok then
return 400, err
end

local key = "/consumer_groups/" .. id

local ok, err = utils.inject_conf_with_prev_conf("consumer_group", key, conf)
if not ok then
return 503, {error_msg = err}
end

local res, err = core.etcd.set(key, conf)
if not res then
core.log.error("failed to put consumer group[", key, "]: ", err)
return 503, {error_msg = err}
end

return res.status, res.body
end


function _M.get(id)
local key = "/consumer_groups"
if id then
key = key .. "/" .. id
end
local res, err = core.etcd.get(key, not id)
if not res then
core.log.error("failed to get consumer group[", key, "]: ", err)
return 503, {error_msg = err}
end

utils.fix_count(res.body, id)
return res.status, res.body
end


function _M.delete(id)
if not id then
return 400, {error_msg = "missing consumer group id"}
end

local consumers, consumers_ver = consumers()
if consumers_ver and consumers then
for _, consumer in ipairs(consumers) do
if type(consumer) == "table" and consumer.value
and consumer.value.group_id
and tostring(consumer.value.group_id) == id then
return 400, {error_msg = "can not delete this consumer group,"
.. " consumer [" .. consumer.value.id
.. "] is still using it now"}
end
end
end

local key = "/consumer_groups/" .. id
local res, err = core.etcd.delete(key)
if not res then
core.log.error("failed to delete consumer group[", key, "]: ", err)
return 503, {error_msg = err}
end


return res.status, res.body
end


function _M.patch(id, conf, sub_path)
if not id then
return 400, {error_msg = "missing consumer group id"}
end

if not conf then
return 400, {error_msg = "missing new configuration"}
end

if not sub_path or sub_path == "" then
if type(conf) ~= "table" then
return 400, {error_msg = "invalid configuration"}
end
end

local key = "/consumer_groups/" .. id
local res_old, err = core.etcd.get(key)
if not res_old then
core.log.error("failed to get consumer group [", key, "]: ", err)
return 503, {error_msg = err}
end

if res_old.status ~= 200 then
return res_old.status, res_old.body
end
core.log.info("key: ", key, " old value: ",
core.json.delay_encode(res_old, true))

local node_value = res_old.body.node.value
local modified_index = res_old.body.node.modifiedIndex

if sub_path and sub_path ~= "" then
local code, err, node_val = core.table.patch(node_value, sub_path, conf)
node_value = node_val
if code then
return code, err
end
utils.inject_timestamp(node_value, nil, true)
else
node_value = core.table.merge(node_value, conf)
utils.inject_timestamp(node_value, nil, conf)
end

core.log.info("new conf: ", core.json.delay_encode(node_value, true))

local ok, err = check_conf(id, node_value, true)
if not ok then
return 400, err
end

local res, err = core.etcd.atomic_set(key, node_value, nil, modified_index)
if not res then
core.log.error("failed to set new consumer group[", key, "]: ", err)
return 503, {error_msg = err}
end

return res.status, res.body
end


return _M
16 changes: 16 additions & 0 deletions apisix/admin/consumers.lua
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,22 @@ local function check_conf(username, conf)
end
end

if conf.group_id then
local key = "/consumer_groups/" .. conf.group_id
local res, err = core.etcd.get(key)
if not res then
return nil, {error_msg = "failed to fetch consumer group info by "
.. "consumer group id [" .. conf.group_id .. "]: "
.. err}
end

if res.status ~= 200 then
return nil, {error_msg = "failed to fetch consumer group info by "
.. "consumer group id [" .. conf.group_id .. "], "
.. "response code: " .. res.status}
end
end

return conf.username
end

Expand Down
1 change: 1 addition & 0 deletions apisix/admin/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ local resources = {
stream_routes = require("apisix.admin.stream_routes"),
plugin_metadata = require("apisix.admin.plugin_metadata"),
plugin_configs = require("apisix.admin.plugin_config"),
consumer_groups = require("apisix.admin.consumer_group"),
}


Expand Down
1 change: 1 addition & 0 deletions apisix/constants.lua
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ return {
["/global_rules"] = true,
["/protos"] = true,
["/plugin_configs"] = true,
["/consumer_groups"] = true,
},
STREAM_ETCD_DIRECTORY = {
["/upstreams"] = true,
Expand Down
1 change: 1 addition & 0 deletions apisix/consumer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ end
function _M.attach_consumer(ctx, consumer, conf)
ctx.consumer = consumer
ctx.consumer_name = consumer.consumer_name
ctx.consumer_group_id = consumer.group_id
ctx.consumer_ver = conf.conf_version
end

Expand Down
47 changes: 47 additions & 0 deletions apisix/consumer_group.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local plugin_checker = require("apisix.plugin").plugin_checker
local error = error


local consumer_groups


local _M = {
}


function _M.init_worker()
local err
consumer_groups, err = core.config.new("/consumer_groups", {
automatic = true,
item_schema = core.schema.consumer_group,
checker = plugin_checker,
})
if not consumer_groups then
error("failed to sync /consumer_groups: " .. err)
end
end


function _M.get(id)
return consumer_groups:get(id)
end


return _M
1 change: 1 addition & 0 deletions apisix/core/ctx.lua
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ do
local apisix_var_names = {
balancer_ip = true,
balancer_port = true,
consumer_group_id = true,
spacewander marked this conversation as resolved.
Show resolved Hide resolved
spacewander marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consumer_name = true,
route_id = true,
route_name = true,
Expand Down
14 changes: 14 additions & 0 deletions apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ local core = require("apisix.core")
local conf_server = require("apisix.conf_server")
local plugin = require("apisix.plugin")
local plugin_config = require("apisix.plugin_config")
local consumer_group = require("apisix.consumer_group")
local script = require("apisix.script")
local service_fetch = require("apisix.http.service").get
local admin_init = require("apisix.admin.init")
Expand Down Expand Up @@ -143,6 +144,7 @@ function _M.http_init_worker()
require("apisix.http.service").init_worker()
plugin_config.init_worker()
require("apisix.consumer").init_worker()
consumer_group.init_worker()

apisix_upstream.init_worker()
require("apisix.plugins.ext-plugin.init").init_worker()
Expand Down Expand Up @@ -442,9 +444,21 @@ function _M.http_access_phase()
plugin.run_plugin("rewrite", plugins, api_ctx)
if api_ctx.consumer then
local changed
local group_conf

if api_ctx.consumer.group_id then
group_conf = consumer_group.get(api_ctx.consumer.group_id)
if not group_conf then
core.log.error("failed to fetch consumer group config by ",
"id: ", api_ctx.consumer.group_id)
return core.response.exit(503)
end
end

route, changed = plugin.merge_consumer_route(
route,
api_ctx.consumer,
group_conf,
api_ctx
)

Expand Down
Loading