Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(config_etcd): use a single long http connection to watch all resources #9456

Merged
merged 12 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
312 changes: 267 additions & 45 deletions apisix/core/config_etcd.lua
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ local json = require("apisix.core.json")
local etcd_apisix = require("apisix.core.etcd")
local core_str = require("apisix.core.string")
local new_tab = require("table.new")
local inspect = require("inspect")
local errlog = require("ngx.errlog")
local log_level = errlog.get_sys_filter_level()
local NGX_INFO = ngx.INFO
local check_schema = require("apisix.core.schema").check
local exiting = ngx.worker.exiting
local insert_tab = table.insert
Expand All @@ -43,9 +47,14 @@ local xpcall = xpcall
local debug = debug
local string = string
local error = error
local pairs = pairs
local next = next
local assert = assert
local rand = math.random
local constants = require("apisix.constants")
local health_check = require("resty.etcd.health_check")
local semaphore = require("ngx.semaphore")
local tablex = require("pl.tablex")


local is_http = ngx.config.subsystem == "http"
Expand All @@ -58,6 +67,7 @@ if not is_http then
end
local created_obj = {}
local loaded_configuration = {}
local watch_ctx


local _M = {
Expand All @@ -75,6 +85,208 @@ local mt = {
}


local get_etcd
do
local etcd_cli

function get_etcd()
if etcd_cli ~= nil then
return etcd_cli
end

local _, err
etcd_cli, _, err = etcd_apisix.get_etcd_syncer()
return etcd_cli, err
end
end


local function cancel_watch(http_cli)
local res, err = watch_ctx.cli:watchcancel(http_cli)
if res == 1 then
log.info("cancel watch connection success")
else
log.error("cancel watch failed: ", err)
end
end


-- append res to the queue and notify pending watchers
local function produce_res(res, err)
if log_level >= NGX_INFO then
log.info("append res: ", inspect(res), ", err: ", inspect(err))
end
Comment on lines +116 to +118
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use json.delay_encode here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the delay stuff has bug:

apisix/apisix/core/json.lua

Lines 116 to 120 in a943c03

function _M.delay_encode(data, force)
delay_tab.data = data
delay_tab.force = force
return delay_tab
end

It only uses a singleton table to log, but here I need two vars to log.
And inspect is more informational than json for debugging.

insert_tab(watch_ctx.res, {res=res, err=err})
for _, sema in pairs(watch_ctx.sema) do
sema:post()
end
table.clear(watch_ctx.sema)
end


local function run_watch(premature)
if premature then
return
end

local local_conf, err = config_local.local_conf()
if not local_conf then
error("no local conf: " .. err)
end
watch_ctx.prefix = local_conf.etcd.prefix .. "/"

watch_ctx.cli, err = get_etcd()
if not watch_ctx.cli then
error("failed to create etcd instance: " .. string(err))
end

local rev = 0
if loaded_configuration then
local _, res = next(loaded_configuration)
if res then
rev = tonumber(res.headers["X-Etcd-Index"])
assert(rev > 0, 'invalid res.headers["X-Etcd-Index"]')
end
end

if rev == 0 then
while true do
local res, err = watch_ctx.cli:get(watch_ctx.prefix)
if not res then
log.error("etcd get: ", err)
ngx_sleep(3)
else
watch_ctx.rev = tonumber(res.body.header.revision)
break
end
end
end

watch_ctx.rev = rev + 1
watch_ctx.started = true

log.warn("main etcd watcher started, revision=", watch_ctx.rev)
for _, sema in pairs(watch_ctx.wait_init) do
sema:post()
end
watch_ctx.wait_init = nil

local opts = {}
opts.timeout = 50 -- second
Copy link
Contributor Author

@kingluo kingluo May 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The choice of 50 seconds is to make it smaller than the default proxy_read_timeout value, 60 seconds, so that nginx will not print error logs, such as:

2023/05/10 23:43:44 [error] 3668019#3668019: *809 upstream timed out (110: Connection timed out) while reading upstream, client: unix:, server: , request: "POST /v3/watch HTTP/1.1", upstream: "http://127.0.0.1:2379/v3/watch", host: "127.0.0.1"

opts.need_cancel = true

::restart_watch::
while true do
opts.start_revision = watch_ctx.rev
log.info("restart watchdir: start_revision=", opts.start_revision)
local res_func, err, http_cli = watch_ctx.cli:watchdir(watch_ctx.prefix, opts)
if not res_func then
log.error("watchdir: ", err)
ngx_sleep(3)
goto restart_watch
end

::watch_event::
while true do
local res, err = res_func()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's very good design here, the server response is a stream, if we don't close the connection here, we could get the response event one by one

if log_level >= NGX_INFO then
log.info("res_func: ", inspect(res))
end

if not res then
if err ~= "closed" and
err ~= "timeout" and
err ~= "broken pipe"
then
log.error("wait watch event: ", err)
end
cancel_watch(http_cli)
break
end

if res.error then
log.error("wait watch event: ", inspect(res.error))
cancel_watch(http_cli)
break
end

if res.result.created then
goto watch_event
end

if res.result.canceled then
log.warn("watch canceled by etcd, res: ", inspect(res))
if res.result.compact_revision then
watch_ctx.rev = tonumber(res.result.compact_revision)
log.warn("etcd compacted, compact_revision=", watch_ctx.rev)
produce_res(nil, "compacted")
end
cancel_watch(http_cli)
break
end

-- cleanup
local min_idx = 0
for _, idx in pairs(watch_ctx.idx) do
if (min_idx == 0) or (idx < min_idx) then
min_idx = idx
end
end

for i = 1, min_idx - 1 do
watch_ctx.res[i] = false
end

if min_idx > 100 then
for k, idx in pairs(watch_ctx.idx) do
watch_ctx.idx[k] = idx - min_idx + 1
end
-- trim the res table
for i = 1, min_idx - 1 do
table.remove(watch_ctx.res, 1)
end
end

local rev = tonumber(res.result.header.revision)
if rev > watch_ctx.rev then
watch_ctx.rev = rev + 1
end
produce_res(res)
end
end
end


local function init_watch_ctx(key)
if not watch_ctx then
watch_ctx = {
idx = {},
res = {},
sema = {},
wait_init = {},
started = false,
}
ngx_timer_at(0, run_watch)
end

if watch_ctx.started == false then
-- wait until the main watcher is started
local sema, err = semaphore.new()
if not sema then
error(err)
end
watch_ctx.wait_init[key] = sema
while true do
local ok, err = sema:wait(60)
if ok then
break
end
log.error("wait main watcher to start, key: ", key, ", err: ", err)
end
end
end


local function getkey(etcd_cli, key)
if not etcd_cli then
return nil, "not inited"
Expand Down Expand Up @@ -157,45 +369,67 @@ local function flush_watching_streams(self)
end


local function http_waitdir(etcd_cli, key, modified_index, timeout)
local opts = {}
opts.start_revision = modified_index
opts.timeout = timeout
opts.need_cancel = true
local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts)
if not res_func then
return nil, func_err
local function http_waitdir(self, etcd_cli, key, modified_index, timeout)
if not watch_ctx.idx[key] then
watch_ctx.idx[key] = 1
monkeyDluffy6017 marked this conversation as resolved.
Show resolved Hide resolved
end

-- in etcd v3, the 1st res of watch is watch info, useless to us.
-- try twice to skip create info
local res, err = res_func()
if not res or not res.result or not res.result.events then
res, err = res_func()
end
::iterate_events::
for i = watch_ctx.idx[key], #watch_ctx.res do
watch_ctx.idx[key] = i + 1

if http_cli then
local res_cancel, err_cancel = etcd_cli:watchcancel(http_cli)
if res_cancel == 1 then
log.info("cancel watch connection success")
else
log.error("cancel watch failed: ", err_cancel)
local item = watch_ctx.res[i]
if item == false then
goto iterate_events
end

local res, err = item.res, item.err
if err then
return res, err
end

-- ignore res with revision smaller then self.prev_index
if tonumber(res.result.header.revision) > self.prev_index then
local res2
for _, evt in ipairs(res.result.events) do
if evt.kv.key:find(key) == 1 then
if not res2 then
res2 = tablex.deepcopy(res)
table.clear(res2.result.events)
end
insert_tab(res2.result.events, evt)
end
end

if res2 then
if log_level >= NGX_INFO then
log.info("http_waitdir: ", inspect(res2))
end
return res2
end
end
end

if not res then
return nil, err
-- if no events, wait via semaphore
if not self.watch_sema then
local sema, err = semaphore.new()
if not sema then
error(err)
end
self.watch_sema = sema
end

if type(res.result) ~= "table" then
err = "failed to wait etcd dir"
if res.error and res.error.message then
err = err .. ": " .. res.error.message
watch_ctx.sema[key] = self.watch_sema
local ok, err = self.watch_sema:wait(timeout or 60)
watch_ctx.sema[key] = nil
if ok then
goto iterate_events
else
if err ~= "timeout" then
log.error("wait watch event, key=", key, ", err: ", err)
end
return nil, err
end

return res, err
end


Expand All @@ -213,7 +447,7 @@ local function waitdir(self)
if etcd_cli.use_grpc then
res, err = grpc_waitdir(self, etcd_cli, key, modified_index, timeout)
else
res, err = http_waitdir(etcd_cli, key, modified_index, timeout)
res, err = http_waitdir(self, etcd_cli, key, modified_index, timeout)
end

if not res then
Expand Down Expand Up @@ -359,6 +593,10 @@ local function sync_data(self)
return nil, "missing 'key' arguments"
end

if not self.etcd_cli.use_grpc then
init_watch_ctx(self.key)
end

if self.need_reload then
flush_watching_streams(self)

Expand Down Expand Up @@ -555,22 +793,6 @@ function _M.getkey(self, key)
end


local get_etcd
do
local etcd_cli

function get_etcd()
if etcd_cli ~= nil then
return etcd_cli
end

local _, err
etcd_cli, _, err = etcd_apisix.get_etcd_syncer()
return etcd_cli, err
end
end


local function _automatic_fetch(premature, self)
if premature then
return
Expand Down
2 changes: 1 addition & 1 deletion ci/linux_openresty_runner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@


export OPENRESTY_VERSION=source
export TEST_CI_USE_GRPC=true
#export TEST_CI_USE_GRPC=true
Copy link
Contributor Author

@kingluo kingluo May 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're going to start focusing on etcd's http tests.

. ./ci/linux_openresty_common_runner.sh
Loading