Skip to content

Commit

Permalink
feat: load etcd configuration when apisix starts
Browse files Browse the repository at this point in the history
Fix #3370
Signed-off-by: spacewander <spacewanderlzx@gmail.com>
  • Loading branch information
spacewander committed Mar 10, 2021
1 parent 8609cd5 commit a625b48
Show file tree
Hide file tree
Showing 12 changed files with 541 additions and 18 deletions.
14 changes: 10 additions & 4 deletions apisix/cli/etcd.lua
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

local base64_encode = require("base64").encode
local dkjson = require("dkjson")
local constants = require("apisix.constants")
local util = require("apisix.cli.util")
local file = require("apisix.cli.file")
local http = require("socket.http")
Expand All @@ -25,6 +26,7 @@ local ltn12 = require("ltn12")

local type = type
local ipairs = ipairs
local pairs = pairs
local print = print
local tonumber = tonumber
local str_format = string.format
Expand Down Expand Up @@ -248,11 +250,15 @@ function _M.init(env, args)
end


for _, dir_name in ipairs({"/routes", "/upstreams", "/services",
"/plugins", "/consumers", "/ssl",
"/global_rules", "/stream_routes", "/proto",
"/plugin_metadata", "/plugin_configs"}) do
local dirs = {}
for name in pairs(constants.HTTP_ETCD_DIRECTORY) do
dirs[name] = true
end
for name in pairs(constants.STREAM_ETCD_DIRECTORY) do
dirs[name] = true
end

for dir_name in pairs(dirs) do
local key = (etcd_conf.prefix or "") .. dir_name .. "/"

local put_url = host .. "/v3/kv/put"
Expand Down
38 changes: 38 additions & 0 deletions apisix/constants.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
return {
HTTP_ETCD_DIRECTORY = {
["/upstreams"] = true,
["/plugins"] = true,
["/ssl"] = true,
["/stream_routes"] = true,
["/plugin_metadata"] = true,
["/routes"] = true,
["/services"] = true,
["/consumers"] = true,
["/global_rules"] = true,
["/proto"] = true,
["/plugin_configs"] = true,
},
STREAM_ETCD_DIRECTORY = {
["/upstreams"] = true,
["/plugins"] = true,
["/ssl"] = true,
["/stream_routes"] = true,
["/plugin_metadata"] = true,
},
}
192 changes: 190 additions & 2 deletions apisix/core/config_etcd.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ local config_local = require("apisix.core.config_local")
local log = require("apisix.core.log")
local json = require("apisix.core.json")
local etcd_apisix = require("apisix.core.etcd")
local core_str = require("apisix.core.string")
local etcd = require("resty.etcd")
local new_tab = require("table.new")
local clone_tab = require("table.clone")
Expand All @@ -39,7 +40,12 @@ local xpcall = xpcall
local debug = debug
local error = error
local rand = math.random
local constants = require("apisix.constants")


local is_http = ngx.config.subsystem == "http"
local created_obj = {}
local loaded_configuration = {}


local _M = {
Expand Down Expand Up @@ -81,7 +87,7 @@ local function getkey(etcd_cli, key)
end


local function readdir(etcd_cli, key)
local function readdir(etcd_cli, key, formatter)
if not etcd_cli then
return nil, "not inited"
end
Expand All @@ -96,7 +102,7 @@ local function readdir(etcd_cli, key)
return nil, "failed to read etcd dir"
end

res, err = etcd_apisix.get_format(res, key .. '/', true)
res, err = etcd_apisix.get_format(res, key .. '/', true, formatter)
if not res then
return nil, err
end
Expand Down Expand Up @@ -608,6 +614,113 @@ function _M.new(key, opts)
return nil, "missing `key` argument"
end

local self = obj
if loaded_configuration[key] then
local res = loaded_configuration[key]
loaded_configuration[key] = nil -- tried to load

local dir_res, headers = res.body, res.headers
local changed = false

if self.single_item then
self.values = new_tab(1, 0)
self.values_hash = new_tab(0, 1)

local item = dir_res
local data_valid = item.value ~= nil

if data_valid and self.item_schema then
data_valid, err = check_schema(self.item_schema, item.value)
if not data_valid then
log.error("failed to check item data of [", self.key,
"] err:", err, " ,val: ", json.encode(item.value))
end
end

if data_valid and self.checker then
data_valid, err = self.checker(item.value)
if not data_valid then
log.error("failed to check item data of [", self.key,
"] err:", err, " ,val: ", json.delay_encode(item.value))
end
end

if data_valid then
changed = true
insert_tab(self.values, item)
self.values_hash[self.key] = #self.values

item.clean_handlers = {}

if self.filter then
self.filter(item)
end
end

self:upgrade_version(item.modifiedIndex)

else
if not dir_res.nodes then
dir_res.nodes = {}
end

self.values = new_tab(#dir_res.nodes, 0)
self.values_hash = new_tab(0, #dir_res.nodes)

for _, item in ipairs(dir_res.nodes) do
local key = short_key(self, item.key)
local data_valid = true
if type(item.value) ~= "table" then
data_valid = false
log.error("invalid item data of [", self.key .. "/" .. key,
"], val: ", item.value,
", it should be an object")
end

if data_valid and self.item_schema then
data_valid, err = check_schema(self.item_schema, item.value)
if not data_valid then
log.error("failed to check item data of [", self.key,
"] err:", err, " ,val: ", json.encode(item.value))
end
end

if data_valid and self.checker then
data_valid, err = self.checker(item.value)
if not data_valid then
log.error("failed to check item data of [", self.key,
"] err:", err, " ,val: ", json.delay_encode(item.value))
end
end

if data_valid then
changed = true
insert_tab(self.values, item)
self.values_hash[key] = #self.values

item.value.id = key
item.clean_handlers = {}

if self.filter then
self.filter(item)
end
end

self:upgrade_version(item.modifiedIndex)
end
end

if headers then
self:upgrade_version(headers["X-Etcd-Index"])
end

if changed then
self.conf_version = self.conf_version + 1
end

self.need_reload = false
end

ngx_timer_at(0, _automatic_fetch, obj)

else
Expand Down Expand Up @@ -665,4 +778,79 @@ function _M.server_version(self)
end


local function create_formatter(prefix)
return function (res)
res.body.nodes = {}

local dirs
if is_http then
dirs = constants.HTTP_ETCD_DIRECTORY
else
dirs = constants.STREAM_ETCD_DIRECTORY
end

local curr_dir_data
local curr_key
for _, item in ipairs(res.body.kvs) do
if curr_dir_data then
if core_str.has_prefix(item.key, curr_key) then
table.insert(curr_dir_data, etcd_apisix.kvs_to_node(item))
goto CONTINUE
end

curr_dir_data = nil
end

local key = sub_str(item.key, #prefix + 1)
if dirs[key] then
-- single item
loaded_configuration[key] = {
body = etcd_apisix.kvs_to_node(item),
headers = res.headers,
}
else
local key = sub_str(item.key, #prefix + 1, #item.key - 1)
-- ensure the same key hasn't been handled as single item
if dirs[key] and not loaded_configuration[key] then
loaded_configuration[key] = {
body = {
nodes = {},
},
headers = res.headers,
}
curr_dir_data = loaded_configuration[key].body.nodes
curr_key = item.key
end
end

::CONTINUE::
end

return res
end
end


function _M.init()
local etcd_cli, err = get_etcd()
if not etcd_cli then
return nil, "failed to start a etcd instance: " .. err
end

local local_conf, err = config_local.local_conf()
if not local_conf then
return nil, err
end

local etcd_conf = local_conf.etcd
local prefix = etcd_conf.prefix
local res, err = readdir(etcd_cli, prefix, create_formatter(prefix))
if not res then
return nil, err
end

return true
end


return _M
1 change: 1 addition & 0 deletions apisix/core/config_yaml.lua
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ end

function _M.init()
read_apisix_yaml()
return true
end


Expand Down
8 changes: 7 additions & 1 deletion apisix/core/etcd.lua
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ end
_M.new = new


-- convert ETCD v3 entry to v2 one
local function kvs_to_node(kvs)
local node = {}
node.key = kvs.key
Expand All @@ -63,6 +64,7 @@ local function kvs_to_node(kvs)
node.modifiedIndex = tonumber(kvs.mod_revision)
return node
end
_M.kvs_to_node = kvs_to_node

local function kvs_to_nodes(res)
res.body.node.dir = true
Expand All @@ -84,7 +86,7 @@ end

-- When `is_dir` is true, returns the value of both the dir key and its descendants.
-- Otherwise, return the value of key only.
function _M.get_format(res, real_key, is_dir)
function _M.get_format(res, real_key, is_dir, formatter)
if res.body.error == "etcdserver: user name is empty" then
return nil, "insufficient credentials code: 401"
end
Expand All @@ -97,6 +99,10 @@ function _M.get_format(res, real_key, is_dir)

res.body.action = "get"

if formatter then
return formatter(res)
end

if not is_dir then
local key = res.body.kvs[1].key
if key ~= real_key then
Expand Down
15 changes: 11 additions & 4 deletions apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
-- limitations under the License.
--
local require = require
require("apisix.patch").patch()
local core = require("apisix.core")
local plugin = require("apisix.plugin")
local plugin_config = require("apisix.plugin_config")
Expand Down Expand Up @@ -80,8 +81,11 @@ function _M.http_init(args)
core.log.error("failed to enable privileged_agent: ", err)
end

if core.config == require("apisix.core.config_yaml") then
core.config.init()
if core.config.init then
local ok, err = core.config.init()
if not ok then
core.log.error("failed to load the configuration: ", err)
end
end
end

Expand Down Expand Up @@ -699,8 +703,11 @@ end
function _M.stream_init()
core.log.info("enter stream_init")

if core.config == require("apisix.core.config_yaml") then
core.config.init()
if core.config.init then
local ok, err = core.config.init()
if not ok then
core.log.error("failed to load the configuration: ", err)
end
end
end

Expand Down
Loading

0 comments on commit a625b48

Please sign in to comment.