Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(xds): using data written by xds to control dp behavior #6759

Merged
merged 16 commits into from
Apr 7, 2022
3 changes: 2 additions & 1 deletion apisix/cli/ngx_tpl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@ http {
{% end %}

{% if config_center == "xds" then %}
lua_shared_dict xds-route-config 10m;
lua_shared_dict xds-config 10m;
lua_shared_dict xds-config-version 1m;
{% end %}

# for custom shared dict
Expand Down
280 changes: 268 additions & 12 deletions apisix/core/config_xds.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,46 +15,72 @@
-- limitations under the License.
--

--- Get configuration form ngx.shared.DICT.
--- Get configuration form ngx.shared.DICT
--
-- @module core.config_xds

local base = require("resty.core.base")
local config_local = require("apisix.core.config_local")
local string = require("apisix.core.string")
local log = require("apisix.core.log")
local json = require("apisix.core.json")
local ngx_sleep = require("apisix.core.utils").sleep
local check_schema = require("apisix.core.schema").check
local new_tab = require("table.new")
local table = table
local insert_tab = table.insert
local error = error
local is_http = ngx.config.subsystem == "http"
local pcall = pcall
local tostring = tostring
local setmetatable = setmetatable
local io = io
local io_open = io.open
local io_close = io.close
local package = package
local new_tab = base.new_tab
local ipairs = ipairs
local type = type
local sub_str = string.sub
local ffi = require ("ffi")
local C = ffi.C
local route_config = ngx.shared["xds-route-config"]
local config = ngx.shared["xds-config"]
local conf_ver = ngx.shared["xds-config-version"]
local is_http = ngx.config.subsystem == "http"
local ngx_re_match = ngx.re.match
local ngx_re_gmatch = ngx.re.gmatch
local ngx_timer_every = ngx.timer.every
local ngx_timer_at = ngx.timer.at
local exiting = ngx.worker.exiting
local ngx_time = ngx.time

local xds_lib_name = "libxds.so"

local xds_lib_name = "libxds.so"

local process
if is_http then
process = require("ngx.process")
end


ffi.cdef[[
extern void initial(void* route_zone_ptr);
typedef unsigned int useconds_t;

extern void initial(void* config_zone, void* version_zone);
int usleep(useconds_t usec);
]]

local created_obj = {}

local _M = {
version = 0.1,
local_conf = config_local.local_conf,
}


local mt = {
__index = _M,
__tostring = function(self)
return " xds key: " .. self.key
end
}


-- todo: refactor this function in chash.lua and radixtree.lua
local function load_shared_lib(lib_name)
local cpath = package.cpath
Expand Down Expand Up @@ -101,18 +127,248 @@ local function load_libxds(lib_name)
table.concat(tried_paths, '\r\n', 1, #tried_paths))
end

local route_zone = C.ngx_http_lua_ffi_shdict_udata_to_zone(route_config[1])
local route_shd_cdata = ffi.cast("void*", route_zone)
xdsagent.initial(route_shd_cdata)
local config_zone = C.ngx_http_lua_ffi_shdict_udata_to_zone(config[1])
local config_shd_cdata = ffi.cast("void*", config_zone)

local conf_ver_zone = C.ngx_http_lua_ffi_shdict_udata_to_zone(conf_ver[1])
local conf_ver_shd_cdata = ffi.cast("void*", conf_ver_zone)

xdsagent.initial(config_shd_cdata, conf_ver_shd_cdata)
end


local latest_version
local function sync_data(self)
if self.conf_version == latest_version then
return true
end

if self.values then
for _, val in ipairs(self.values) do
if val and val.clean_handlers then
for _, clean_handler in ipairs(val.clean_handlers) do
clean_handler(val)
end
val.clean_handlers = nil
end
end
self.values = nil
self.values_hash = nil
end

local keys = config:get_keys(0)

if not keys or #keys <= 0 then
-- xds did not write any data to shdict
return false, "no keys"
end

self.values = new_tab(#keys, 0)
self.values_hash = new_tab(0, #keys)

for _, key in ipairs(keys) do
if string.has_prefix(key, self.key) then
local data_valid = true
local conf_str = config:get(key, 0)
local conf, err = json.decode(conf_str)
if not conf then
data_valid = false
log.error("decode the conf of [", key, "] failed, err: ", err,
", conf_str: ", conf_str)
end

if not self.single_item and type(conf) ~= "table" then
data_valid = false
log.error("invalid conf of [", key, "], conf: ", conf,
", it should be an object")
end

if data_valid and self.item_schema then
local ok, err = check_schema(self.item_schema, conf)
if not ok then
data_valid = false
log.error("failed to check the conf of [", key, "] err:", err)
end
end

if data_valid and self.checker then
local ok, err = self.checker(conf)
if not ok then
data_valid = false
log.error("failed to check the conf of [", key, "] err:", err)
end
end

if data_valid then
if not conf.id then
conf.id = sub_str(key, #self.key + 2, #key + 1)
log.warn("the id of [", key, "] is nil, use the id: ", conf.id)
end

local conf_item = {value = conf, modifiedIndex = latest_version,
key = key}
insert_tab(self.values, conf_item)
self.values_hash[conf.id] = #self.values
conf_item.clean_handlers = {}

if self.filter then
self.filter(conf_item)
end
end
end
end

self.conf_version = latest_version
return true
end


local function _automatic_fetch(premature, self)
if premature then
return
end

local i = 0
while not exiting() and self.running and i <= 32 do
i = i + 1
local ok, ok2, err = pcall(sync_data, self)
if not ok then
err = ok2
log.error("failed to fetch data from xds: ",
err, ", ", tostring(self))
ngx_sleep(3)
break
elseif not ok2 and err then
-- todo: handler other error
if err ~= "wait for more time" and err ~= "no keys" and self.last_err ~= err then
log.error("failed to fetch data from xds, ", err, ", ", tostring(self))
end

if err ~= self.last_err then
self.last_err = err
self.last_err_time = ngx_time()
else
if ngx_time() - self.last_err_time >= 30 then
self.last_err = nil
end
end
ngx_sleep(0.5)
elseif not ok2 then
ngx_sleep(0.05)
else
ngx_sleep(0.1)
end
end

if not exiting() and self.running then
ngx_timer_at(0, _automatic_fetch, self)
end
end


local function fetch_version(premature)
if premature then
return
end

local version = conf_ver:get("version")

if not version then
return
end

if version ~= latest_version then
latest_version = version
end
end


function _M.new(key, opts)
local automatic = opts and opts.automatic
local item_schema = opts and opts.item_schema
local filter_fun = opts and opts.filter
local single_item = opts and opts.single_item
local checker = opts and opts.checker


local obj = setmetatable({
automatic = automatic,
item_schema = item_schema,
checker = checker,
sync_times = 0,
running = true,
conf_version = 0,
values = nil,
routes_hash = nil,
prev_index = nil,
last_err = nil,
last_err_time = nil,
key = key,
single_item = single_item,
filter = filter_fun,
}, mt)

if automatic then
if not key then
return nil, "missing `key` argument"
end

-- blocking until xds completes initial configuration
while true do
C.usleep(0.1)
fetch_version()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need a sleep here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ngx.sleep can not used in init_worker_by_lua, is there another way?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use another sleep?

int usleep(useconds_t usec);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix

if latest_version then
break
end
end

local ok, ok2, err = pcall(sync_data, obj)
if not ok then
err = ok2
end

if err then
log.error("failed to fetch data from xds ",
err, ", ", key)
end

ngx_timer_at(0, _automatic_fetch, obj)
end

if key then
created_obj[key] = obj
end

return obj
end


function _M.get(self, key)
if not self.values_hash then
return
end

local arr_idx = self.values_hash[tostring(key)]
if not arr_idx then
return nil
end

return self.values[arr_idx]
end


function _M.fetch_created_obj(key)
return created_obj[key]
end


function _M.init_worker()
if process.type() == "privileged agent" then
load_libxds(xds_lib_name)
end

ngx_timer_every(1, fetch_version)

return true
end

Expand Down
12 changes: 6 additions & 6 deletions apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,6 @@ function _M.http_init_worker()

require("apisix.debug").init_worker()

plugin.init_worker()
router.http_init_worker()
require("apisix.http.service").init_worker()
plugin_config.init_worker()
require("apisix.consumer").init_worker()

if core.config.init_worker then
local ok, err = core.config.init_worker()
if not ok then
Expand All @@ -130,6 +124,12 @@ function _M.http_init_worker()
end
end

plugin.init_worker()
router.http_init_worker()
require("apisix.http.service").init_worker()
plugin_config.init_worker()
require("apisix.consumer").init_worker()

apisix_upstream.init_worker()
require("apisix.plugins.ext-plugin.init").init_worker()

Expand Down
3 changes: 2 additions & 1 deletion t/APISIX.pm
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,8 @@ _EOC_
lua_shared_dict ext-plugin 1m;
lua_shared_dict kubernetes 1m;
lua_shared_dict tars 1m;
lua_shared_dict xds-route-config 1m;
lua_shared_dict xds-config 1m;
lua_shared_dict xds-config-version 1m;

proxy_ssl_name \$upstream_host;
proxy_ssl_server_name on;
Expand Down
Loading