Skip to content

Commit

Permalink
feat(config_etcd): use a single long http connection to watch all res…
Browse files Browse the repository at this point in the history
…ources (#9456)
  • Loading branch information
kingluo authored May 23, 2023
1 parent 7bbdf0c commit 4377d05
Show file tree
Hide file tree
Showing 4 changed files with 275 additions and 111 deletions.
312 changes: 267 additions & 45 deletions apisix/core/config_etcd.lua
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ local json = require("apisix.core.json")
local etcd_apisix = require("apisix.core.etcd")
local core_str = require("apisix.core.string")
local new_tab = require("table.new")
local inspect = require("inspect")
local errlog = require("ngx.errlog")
local log_level = errlog.get_sys_filter_level()
local NGX_INFO = ngx.INFO
local check_schema = require("apisix.core.schema").check
local exiting = ngx.worker.exiting
local insert_tab = table.insert
Expand All @@ -43,9 +47,14 @@ local xpcall = xpcall
local debug = debug
local string = string
local error = error
local pairs = pairs
local next = next
local assert = assert
local rand = math.random
local constants = require("apisix.constants")
local health_check = require("resty.etcd.health_check")
local semaphore = require("ngx.semaphore")
local tablex = require("pl.tablex")


local is_http = ngx.config.subsystem == "http"
Expand All @@ -58,6 +67,7 @@ if not is_http then
end
local created_obj = {}
local loaded_configuration = {}
local watch_ctx


local _M = {
Expand All @@ -75,6 +85,208 @@ local mt = {
}


-- Lazily create the etcd client used for syncing and reuse it afterwards.
-- Returns the cached client once one exists; otherwise returns whatever
-- etcd_apisix.get_etcd_syncer() produced as `client, err`.
local get_etcd
do
    local cached_cli

    get_etcd = function()
        if cached_cli == nil then
            -- the second return value of get_etcd_syncer() is not needed here
            local _, err
            cached_cli, _, err = etcd_apisix.get_etcd_syncer()
            return cached_cli, err
        end

        return cached_cli
    end
end


-- Ask the shared etcd client to cancel the watch bound to `http_cli`
-- and log the outcome. watchcancel() is treated as successful only
-- when it returns 1.
local function cancel_watch(http_cli)
    local status, cancel_err = watch_ctx.cli:watchcancel(http_cli)
    if status ~= 1 then
        log.error("cancel watch failed: ", cancel_err)
        return
    end

    log.info("cancel watch connection success")
end


-- Push one watch outcome (a response, or an error marker) onto the shared
-- result queue, then wake every consumer currently parked on a semaphore.
-- The registered semaphores are dropped afterwards; a consumer registers
-- again the next time it has to wait.
local function produce_res(res, err)
    if log_level >= NGX_INFO then
        -- inspect() is only evaluated when info logging is enabled
        log.info("append res: ", inspect(res), ", err: ", inspect(err))
    end

    local entry = {res = res, err = err}
    insert_tab(watch_ctx.res, entry)

    local waiters = watch_ctx.sema
    for _, waiter in pairs(waiters) do
        waiter:post()
    end
    table.clear(waiters)
end


-- Timer callback that owns the single long-lived etcd watch connection
-- shared by all config objects.
--
-- Steps:
--   1. resolve the etcd key prefix and the etcd client;
--   2. pick the revision to start watching from: the "X-Etcd-Index" header
--      of an entry in `loaded_configuration` when one exists, otherwise the
--      current revision fetched from etcd (retried every 3 seconds);
--   3. mark the watcher as started and wake everybody blocked in
--      init_watch_ctx();
--   4. loop forever: open a watch on the prefix, queue every received
--      response through produce_res(), and reopen the watch after an error,
--      a timeout or a cancellation.
--
-- @param premature  true when the timer fires because the worker is
--                   exiting; nothing is done in that case.
-- Raises an error when the local configuration or the etcd client cannot
-- be obtained.
local function run_watch(premature)
    if premature then
        return
    end

    local local_conf, err = config_local.local_conf()
    if not local_conf then
        error("no local conf: " .. err)
    end
    watch_ctx.prefix = local_conf.etcd.prefix .. "/"

    watch_ctx.cli, err = get_etcd()
    if not watch_ctx.cli then
        -- `string` is the library table and cannot be called;
        -- tostring() also copes with a nil err
        error("failed to create etcd instance: " .. tostring(err))
    end

    local rev = 0
    if loaded_configuration then
        local _, res = next(loaded_configuration)
        if res then
            rev = tonumber(res.headers["X-Etcd-Index"])
            assert(rev > 0, 'invalid res.headers["X-Etcd-Index"]')
        end
    end

    if rev == 0 then
        -- nothing was preloaded: ask etcd for its current revision
        while true do
            local res, err = watch_ctx.cli:get(watch_ctx.prefix)
            if not res then
                log.error("etcd get: ", err)
                ngx_sleep(3)
            else
                -- keep the fetched value in `rev` so that the assignment
                -- below does not throw it away and restart from revision 1
                rev = tonumber(res.body.header.revision) or 0
                break
            end
        end
    end

    -- watch from the revision right after the one already known
    watch_ctx.rev = rev + 1
    watch_ctx.started = true

    log.warn("main etcd watcher started, revision=", watch_ctx.rev)
    for _, sema in pairs(watch_ctx.wait_init) do
        sema:post()
    end
    watch_ctx.wait_init = nil

    local opts = {}
    opts.timeout = 50 -- second
    opts.need_cancel = true

    ::restart_watch::
    while true do
        opts.start_revision = watch_ctx.rev
        log.info("restart watchdir: start_revision=", opts.start_revision)
        local res_func, err, http_cli = watch_ctx.cli:watchdir(watch_ctx.prefix, opts)
        if not res_func then
            log.error("watchdir: ", err)
            ngx_sleep(3)
            goto restart_watch
        end

        ::watch_event::
        while true do
            local res, err = res_func()
            if log_level >= NGX_INFO then
                log.info("res_func: ", inspect(res))
            end

            if not res then
                -- these three errors are not reported at error level
                if err ~= "closed" and
                    err ~= "timeout" and
                    err ~= "broken pipe"
                then
                    log.error("wait watch event: ", err)
                end
                cancel_watch(http_cli)
                break
            end

            if res.error then
                log.error("wait watch event: ", inspect(res.error))
                cancel_watch(http_cli)
                break
            end

            -- the watch-created acknowledgement is skipped, not queued
            if res.result.created then
                goto watch_event
            end

            if res.result.canceled then
                log.warn("watch canceled by etcd, res: ", inspect(res))
                if res.result.compact_revision then
                    -- restart from the compact revision and tell the
                    -- consumers about the compaction
                    watch_ctx.rev = tonumber(res.result.compact_revision)
                    log.warn("etcd compacted, compact_revision=", watch_ctx.rev)
                    produce_res(nil, "compacted")
                end
                cancel_watch(http_cli)
                break
            end

            -- cleanup: find the smallest read position among all consumers;
            -- every queue entry before it has been consumed by everybody
            local min_idx = 0
            for _, idx in pairs(watch_ctx.idx) do
                if (min_idx == 0) or (idx < min_idx) then
                    min_idx = idx
                end
            end

            -- release consumed entries but keep the slots, so the
            -- consumers' indexes stay valid
            for i = 1, min_idx - 1 do
                watch_ctx.res[i] = false
            end

            if min_idx > 100 then
                -- shift every consumer index down by the number of
                -- entries about to be removed
                for k, idx in pairs(watch_ctx.idx) do
                    watch_ctx.idx[k] = idx - min_idx + 1
                end
                -- trim the res table
                for i = 1, min_idx - 1 do
                    table.remove(watch_ctx.res, 1)
                end
            end

            local rev = tonumber(res.result.header.revision)
            if rev > watch_ctx.rev then
                watch_ctx.rev = rev + 1
            end
            produce_res(res)
        end
    end
end


-- Make sure the shared watch context exists and that the main watcher
-- has started before the caller goes on.
--
-- The first caller creates `watch_ctx` and schedules run_watch() on a
-- zero-delay timer. While `watch_ctx.started` is still false, the caller
-- registers a semaphore under `key` and blocks on it, logging an error
-- and waiting again each time a 60 second wait fails.
--
-- @param key  identifier of the calling config object
local function init_watch_ctx(key)
    if not watch_ctx then
        local ctx = {
            idx = {},
            res = {},
            sema = {},
            wait_init = {},
            started = false,
        }
        watch_ctx = ctx
        ngx_timer_at(0, run_watch)
    end

    if watch_ctx.started ~= false then
        return
    end

    -- wait until the main watcher is started
    local sema, new_err = semaphore.new()
    if not sema then
        error(new_err)
    end
    watch_ctx.wait_init[key] = sema

    local ok, wait_err = sema:wait(60)
    while not ok do
        log.error("wait main watcher to start, key: ", key, ", err: ", wait_err)
        ok, wait_err = sema:wait(60)
    end
end


local function getkey(etcd_cli, key)
if not etcd_cli then
return nil, "not inited"
Expand Down Expand Up @@ -157,45 +369,67 @@ local function flush_watching_streams(self)
end


-- Return the next queued watch response that concerns `key`, waiting for
-- the main watcher to produce one when the queue holds nothing new.
--
-- Every consumer keeps its own read position in `watch_ctx.idx[key]`.
-- Queue entries already released by the watcher (stored as `false`) are
-- skipped, and so are responses whose header revision is not greater than
-- `self.prev_index`.
--
-- @param self            config object; `prev_index` is read, and
--                        `watch_sema` is created on first use
-- @param etcd_cli        unused here, kept so the signature stays parallel
--                        to the grpc_waitdir() call in waitdir()
-- @param key             etcd key prefix this consumer is interested in
-- @param modified_index  unused here (see etcd_cli)
-- @param timeout         seconds to wait for new events, 60 when nil
-- @return a deep copy of the response holding only the events under `key`;
--         or the queued `res, err` pair when the watcher queued an error
--         (e.g. "compacted"); or `nil, err` when the wait fails or times out
local function http_waitdir(self, etcd_cli, key, modified_index, timeout)
    if not watch_ctx.idx[key] then
        watch_ctx.idx[key] = 1
    end

    ::iterate_events::
    for i = watch_ctx.idx[key], #watch_ctx.res do
        -- advance first, so that every exit path resumes after this entry
        watch_ctx.idx[key] = i + 1

        local item = watch_ctx.res[i]
        if item == false then
            goto iterate_events
        end

        local res, err = item.res, item.err
        if err then
            return res, err
        end

        -- ignore res with revision smaller than self.prev_index
        if tonumber(res.result.header.revision) > self.prev_index then
            local res2
            for _, evt in ipairs(res.result.events) do
                -- plain find: `key` is a literal prefix, not a Lua pattern,
                -- so magic characters in it must not be interpreted
                if evt.kv.key:find(key, 1, true) == 1 then
                    if not res2 then
                        -- copy lazily; the queued response is shared with
                        -- the other consumers and must stay untouched
                        res2 = tablex.deepcopy(res)
                        table.clear(res2.result.events)
                    end
                    insert_tab(res2.result.events, evt)
                end
            end

            if res2 then
                if log_level >= NGX_INFO then
                    log.info("http_waitdir: ", inspect(res2))
                end
                return res2
            end
        end
    end

    -- if no events, wait via semaphore
    if not self.watch_sema then
        local sema, err = semaphore.new()
        if not sema then
            error(err)
        end
        self.watch_sema = sema
    end

    -- register so that produce_res() can wake this consumer
    watch_ctx.sema[key] = self.watch_sema
    local ok, err = self.watch_sema:wait(timeout or 60)
    watch_ctx.sema[key] = nil
    if ok then
        goto iterate_events
    else
        if err ~= "timeout" then
            log.error("wait watch event, key=", key, ", err: ", err)
        end
        return nil, err
    end
end


Expand All @@ -213,7 +447,7 @@ local function waitdir(self)
if etcd_cli.use_grpc then
res, err = grpc_waitdir(self, etcd_cli, key, modified_index, timeout)
else
res, err = http_waitdir(etcd_cli, key, modified_index, timeout)
res, err = http_waitdir(self, etcd_cli, key, modified_index, timeout)
end

if not res then
Expand Down Expand Up @@ -359,6 +593,10 @@ local function sync_data(self)
return nil, "missing 'key' arguments"
end

if not self.etcd_cli.use_grpc then
init_watch_ctx(self.key)
end

if self.need_reload then
flush_watching_streams(self)

Expand Down Expand Up @@ -555,22 +793,6 @@ function _M.getkey(self, key)
end


-- Return the etcd client used for syncing, creating it on first use.
-- The client is kept in the `etcd_cli` upvalue, private to this do-block.
-- Returns `etcd_cli` alone when it is already set, otherwise `etcd_cli, err`
-- as produced by etcd_apisix.get_etcd_syncer().
local get_etcd
do
local etcd_cli

function get_etcd()
-- reuse the client created by an earlier call
if etcd_cli ~= nil then
return etcd_cli
end

-- the second return value of get_etcd_syncer() is discarded
local _, err
etcd_cli, _, err = etcd_apisix.get_etcd_syncer()
return etcd_cli, err
end
end


local function _automatic_fetch(premature, self)
if premature then
return
Expand Down
2 changes: 1 addition & 1 deletion ci/linux_openresty_runner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@


export OPENRESTY_VERSION=source
export TEST_CI_USE_GRPC=true
#export TEST_CI_USE_GRPC=true
. ./ci/linux_openresty_common_runner.sh
Loading

0 comments on commit 4377d05

Please sign in to comment.