From f256cfb819bccc012a12556afdff9b3781d89c9f Mon Sep 17 00:00:00 2001 From: kingluo Date: Thu, 11 May 2023 01:14:57 +0800 Subject: [PATCH 01/12] feat(config_etcd): use only one http connection to watch, with chunked streaming --- apisix/core/config_etcd.lua | 316 ++++++++++++++++++++++++++++++------ 1 file changed, 270 insertions(+), 46 deletions(-) diff --git a/apisix/core/config_etcd.lua b/apisix/core/config_etcd.lua index 4946cc5c22c8..e665042d0bef 100644 --- a/apisix/core/config_etcd.lua +++ b/apisix/core/config_etcd.lua @@ -46,6 +46,8 @@ local error = error local rand = math.random local constants = require("apisix.constants") local health_check = require("resty.etcd.health_check") +local semaphore = require("ngx.semaphore") +local tablex = require("pl.tablex") local is_http = ngx.config.subsystem == "http" @@ -58,6 +60,7 @@ if not is_http then end local created_obj = {} local loaded_configuration = {} +local watch_ctx local _M = { @@ -75,6 +78,223 @@ local mt = { } +local get_etcd +do + local etcd_cli + + function get_etcd() + if etcd_cli ~= nil then + return etcd_cli + end + + local _, err + etcd_cli, _, err = etcd_apisix.get_etcd_syncer() + return etcd_cli, err + end +end + + +local function cancel_watch(http_cli) + local res, err = watch_ctx.cli:watchcancel(http_cli) + if res == 1 then + log.info("cancel watch connection success") + else + log.error("cancel watch failed: ", err) + end +end + + +local function handle_compacted(err) + if err == "compacted" then + -- update current etcd revision + while true do + local res, err2 = watch_ctx.cli:get(watch_ctx.prefix) + if not res then + log.error(err2) + ngx.sleep(3) + else + watch_ctx.rev = res.body.header.revision + break + end + end + end +end + + +local function produce_res(res, err) + -- append res and notify pending watchers + --log.warn("append res, res: ", require("inspect")(res), ", err: ", require("inspect")(err)) + table.insert(watch_ctx.res, {res=res, err=err}) + for _, sema in pairs(watch_ctx.sema) do + sema:post() + end + table.clear(watch_ctx.sema) +end + + +local function run_watch(premature) + if premature then + return + end + + local local_conf, err = config_local.local_conf() + if not local_conf then + error("no local conf: " .. err) + end + local prefix = local_conf.etcd.prefix + + local cli, err = get_etcd() + if not cli then + error("failed to create etcd instance: " .. string(err)) + end + + local rev = 0 + if loaded_configuration then + local _, res = next(loaded_configuration) + rev = tonumber(res.headers["X-Etcd-Index"]) + assert(rev > 0, 'invalid res.headers["X-Etcd-Index"]') + end + + if rev == 0 then + local res, err = cli:get(prefix) + if err then + error(err) + end + rev = tonumber(res.body.header.revision) + assert(rev > 0, 'invalid res.body.header.revision') + end + + log.warn("main watcher started, revision=", rev) + watch_ctx.started = true + watch_ctx.prefix = prefix + watch_ctx.cli = cli + watch_ctx.rev = rev + + for _, sema in pairs(watch_ctx.wait_init) do + sema:post() + end + watch_ctx.wait_init = nil + + local opts = {} + opts.timeout = 60 -- second + opts.need_cancel = true + + ::restart_watch:: + while true do + opts.start_revision = watch_ctx.rev + 1 + local res_func, func_err, http_cli = watch_ctx.cli:watchdir(watch_ctx.prefix, opts) + if not res_func then + log.error("watchdir: ", func_err) + handle_compacted(func_err) + produce_res(nil, func_err) + ngx.sleep(3) + goto restart_watch + end + + while true do + local res, err = res_func() + --log.warn("res_func: ", require("inspect")(res)) + --if err then log.warn("res_func_err: ", require("inspect")(err)) end + + if err == "closed" then + break + end + + if not res then + cancel_watch(http_cli) + handle_compacted(err) + produce_res(nil, err) + break + end + + -- in etcd v3, the 1st res of watch is watch info, useless to us. + -- try twice to skip create info + if not res.result or not res.result.events then + res, err = res_func() + end + + if err == "closed" then + break + end + + if not res then + cancel_watch(http_cli) + handle_compacted(err) + produce_res(nil, err) + break + end + + if type(res.result) ~= "table" then + cancel_watch(http_cli) + err = "failed to wait etcd dir" + if res.error and res.error.message then + err = err .. ": " .. res.error.message + end + handle_compacted(err) + produce_res(nil, err) + break + end + + -- cleanup + local min_idx = 0 + for _, idx in pairs(watch_ctx.idx) do + if (min_idx == 0) or (idx < min_idx) then + min_idx = idx + end + end + if min_idx > 10 then + for k, idx in pairs(watch_ctx.idx) do + watch_ctx[k] = idx-min_idx+1 + end + -- trim the res table + for i = 1,min_idx-1 do + table.remove(watch_ctx.res, i) + end + end + + local rev = tonumber(res.result.header.revision) + if rev > watch_ctx.rev then + watch_ctx.rev = rev + end + produce_res(res) + end + end +end + + +local function init_watch_ctx(key) + if not watch_ctx then + watch_ctx = { + idx = {}, + res = {}, + sema = {}, + wait_init = {}, + } + end + + if watch_ctx.started == nil then + watch_ctx.started = false + ngx_timer_at(0, run_watch) + end + + if watch_ctx.started == false then + -- wait until the main watcher is started + local sema, err = semaphore.new() + if not sema then + error(err) + end + watch_ctx.wait_init[key] = sema + while true do + local ok, err = sema:wait(60) + if ok then + break + end + log.error("sema wait, key: ", key, ", err: ", err) + end + end +end + + local function getkey(etcd_cli, key) if not etcd_cli then return nil, "not inited" @@ -157,45 +377,63 @@ local function flush_watching_streams(self) end -local function http_waitdir(etcd_cli, key, modified_index, timeout) - local opts = {} - opts.start_revision = modified_index - opts.timeout = timeout - opts.need_cancel = true - local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts) - if not res_func then - return nil, func_err +local function http_waitdir(self, etcd_cli, key, modified_index, timeout) + if not watch_ctx.idx[key] then + watch_ctx.idx[key] = 1 end - -- in etcd v3, the 1st res of watch is watch info, useless to us. - -- try twice to skip create info - local res, err = res_func() - if not res or not res.result or not res.result.events then - res, err = res_func() - end + ::iterate_events:: + for i = watch_ctx.idx[key],#watch_ctx.res do + local item = watch_ctx.res[i] + local res, err = item.res, item.err + if err then + watch_ctx.idx[key] = i+1 + return res, err + end - if http_cli then - local res_cancel, err_cancel = etcd_cli:watchcancel(http_cli) - if res_cancel == 1 then - log.info("cancel watch connection success") - else - log.error("cancel watch failed: ", err_cancel) + local found = false + -- ignore res with revision smaller then self.prev_index + if tonumber(res.result.header.revision) > self.prev_index then + for _, evt in ipairs(res.result.events) do + if evt.kv.key:find(key) == 1 then + found = true + break + end + end end - end - if not res then - return nil, err + if found then + watch_ctx.idx[key] = i+1 + + local res2 = tablex.deepcopy(res) + table.clear(res2.result.events) + for _, evt in ipairs(res.result.events) do + if evt.kv.key:find(key) == 1 then + table.insert(res2.result.events, evt) + end + end + --log.warn("res2: ", require("inspect")(res2)) + return res2 + end end - if type(res.result) ~= "table" then - err = "failed to wait etcd dir" - if res.error and res.error.message then - err = err .. ": " .. res.error.message + -- if no events, wait via semaphore + if not self.watch_sema then + local sema, err = semaphore.new() + if not sema then + error(err) end - return nil, err + self.watch_sema = sema end - return res, err + watch_ctx.sema[key] = self.watch_sema + local ok, err = self.watch_sema:wait(60) + watch_ctx.sema[key] = nil + if ok or err == "timeout" then + goto iterate_events + else + log.error(err) + end end @@ -213,7 +451,7 @@ local function waitdir(self) if etcd_cli.use_grpc then res, err = grpc_waitdir(self, etcd_cli, key, modified_index, timeout) else - res, err = http_waitdir(etcd_cli, key, modified_index, timeout) + res, err = http_waitdir(self, etcd_cli, key, modified_index, timeout) end if not res then @@ -359,6 +597,8 @@ local function sync_data(self) return nil, "missing 'key' arguments" end + init_watch_ctx(self.key) + if self.need_reload then flush_watching_streams(self) @@ -555,22 +795,6 @@ function _M.getkey(self, key) end -local get_etcd -do - local etcd_cli - - function get_etcd() - if etcd_cli ~= nil then - return etcd_cli - end - - local _, err - etcd_cli, _, err = etcd_apisix.get_etcd_syncer() - return etcd_cli, err - end -end - - local function _automatic_fetch(premature, self) if premature then return From 4c4d64ae2cf2583d636f6f003042751e7f12a38f Mon Sep 17 00:00:00 2001 From: kingluo Date: Thu, 11 May 2023 01:55:38 +0800 Subject: [PATCH 02/12] fix PR --- apisix/core/config_etcd.lua | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apisix/core/config_etcd.lua b/apisix/core/config_etcd.lua index e665042d0bef..3d70680954a3 100644 --- a/apisix/core/config_etcd.lua +++ b/apisix/core/config_etcd.lua @@ -597,7 +597,9 @@ local function sync_data(self) return nil, "missing 'key' arguments" end - init_watch_ctx(self.key) + if not local_conf.etcd.use_grpc then + init_watch_ctx(self.key) + end if self.need_reload then flush_watching_streams(self) From 23a72878c94aa28ce79b055aaee290bfa6d86bed Mon Sep 17 00:00:00 2001 From: kingluo Date: Thu, 11 May 2023 02:04:15 +0800 Subject: [PATCH 03/12] fix PR --- apisix/core/config_etcd.lua | 2 +- ci/linux_openresty_runner.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apisix/core/config_etcd.lua b/apisix/core/config_etcd.lua index 3d70680954a3..7d25c387eb92 100644 --- a/apisix/core/config_etcd.lua +++ b/apisix/core/config_etcd.lua @@ -597,7 +597,7 @@ local function sync_data(self) return nil, "missing 'key' arguments" end - if not local_conf.etcd.use_grpc then + if not self.use_grpc then init_watch_ctx(self.key) end diff --git a/ci/linux_openresty_runner.sh b/ci/linux_openresty_runner.sh index 2cdc87b218f3..877248913368 100755 --- a/ci/linux_openresty_runner.sh +++ b/ci/linux_openresty_runner.sh @@ -18,5 +18,5 @@ export OPENRESTY_VERSION=source -export TEST_CI_USE_GRPC=true +#export TEST_CI_USE_GRPC=true . ./ci/linux_openresty_common_runner.sh From 55bb0ae648f8dc2cc0dd74216f5a48718c82e48e Mon Sep 17 00:00:00 2001 From: kingluo Date: Thu, 11 May 2023 02:17:17 +0800 Subject: [PATCH 04/12] fix PR --- apisix/core/config_etcd.lua | 24 +++++---- t/core/etcd-sync.t | 68 ++------------------------ t/plugin/error-log-logger-skywalking.t | 4 +- 3 files changed, 22 insertions(+), 74 deletions(-) diff --git a/apisix/core/config_etcd.lua b/apisix/core/config_etcd.lua index 7d25c387eb92..93c1c9f8e5db 100644 --- a/apisix/core/config_etcd.lua +++ b/apisix/core/config_etcd.lua @@ -36,6 +36,7 @@ local setmetatable = setmetatable local ngx_sleep = require("apisix.core.utils").sleep local ngx_timer_at = ngx.timer.at local ngx_time = ngx.time +local ngx_sleep = ngx.sleep local sub_str = string.sub local tostring = tostring local tonumber = tonumber @@ -43,6 +44,9 @@ local xpcall = xpcall local debug = debug local string = string local error = error +local pairs = pairs +local next = next +local assert = assert local rand = math.random local constants = require("apisix.constants") local health_check = require("resty.etcd.health_check") @@ -111,7 +115,7 @@ local function handle_compacted(err) local res, err2 = watch_ctx.cli:get(watch_ctx.prefix) if not res then log.error(err2) - ngx.sleep(3) + ngx_sleep(3) else watch_ctx.rev = res.body.header.revision break @@ -124,7 +128,7 @@ end local function produce_res(res, err) -- append res and notify pending watchers --log.warn("append res, res: ", require("inspect")(res), ", err: ", require("inspect")(err)) - table.insert(watch_ctx.res, {res=res, err=err}) + insert_tab(watch_ctx.res, {res=res, err=err}) for _, sema in pairs(watch_ctx.sema) do sema:post() end @@ -151,8 +155,10 @@ local function run_watch(premature) local rev = 0 if loaded_configuration then local _, res = next(loaded_configuration) - rev = tonumber(res.headers["X-Etcd-Index"]) - assert(rev > 0, 'invalid res.headers["X-Etcd-Index"]') + if res then + rev = tonumber(res.headers["X-Etcd-Index"]) + assert(rev > 0, 'invalid res.headers["X-Etcd-Index"]') + end end if rev == 0 then @@ -187,7 +193,7 @@ local function run_watch(premature) log.error("watchdir: ", func_err) handle_compacted(func_err) produce_res(nil, func_err) - ngx.sleep(3) + ngx_sleep(3) goto restart_watch end @@ -409,7 +415,7 @@ local function http_waitdir(self, etcd_cli, key, modified_index, timeout) table.clear(res2.result.events) for _, evt in ipairs(res.result.events) do if evt.kv.key:find(key) == 1 then - table.insert(res2.result.events, evt) + insert_tab(res2.result.events, evt) end end --log.warn("res2: ", require("inspect")(res2)) @@ -597,7 +603,7 @@ local function sync_data(self) return nil, "missing 'key' arguments" end - if not self.use_grpc then + if not self.etcd_cli.use_grpc then init_watch_ctx(self.key) end @@ -741,7 +747,7 @@ local function sync_data(self) for i = 1, #values_original do local val = values_original[i] if val then - table.insert(self.values, val) + insert_tab(self.values, val) end end @@ -1045,7 +1051,7 @@ local function create_formatter(prefix) for _, item in ipairs(res.body.kvs) do if curr_dir_data then if core_str.has_prefix(item.key, curr_key) then - table.insert(curr_dir_data, etcd_apisix.kvs_to_node(item)) + insert_tab(curr_dir_data, etcd_apisix.kvs_to_node(item)) goto CONTINUE end diff --git a/t/core/etcd-sync.t b/t/core/etcd-sync.t index e74ae19ec710..aef5e23619a9 100644 --- a/t/core/etcd-sync.t +++ b/t/core/etcd-sync.t @@ -22,65 +22,7 @@ run_tests; __DATA__ -=== TEST 1: minus timeout to watch repeatedly ---- yaml_config -deployment: - role: traditional - role_traditional: - config_provider: etcd - etcd: - # this test requires the HTTP long pull as the gRPC stream is shared and can't change - # default timeout in the fly - use_grpc: false - admin: - admin_key: null ---- config - location /t { - content_by_lua_block { - local core = require("apisix.core") - local t = require("lib.test_admin").test - - local consumers, _ = core.config.new("/consumers", { - automatic = true, - item_schema = core.schema.consumer, - timeout = 0.2 - }) - - ngx.sleep(0.6) - local idx = consumers.prev_index - - local code, body = t('/apisix/admin/consumers', - ngx.HTTP_PUT, - [[{ - "username": "jobs", - "plugins": { - "basic-auth": { - "username": "jobs", - "password": "123456" - } - } - }]]) - - ngx.sleep(2) - local new_idx = consumers.prev_index - core.log.info("idx:", idx, " new_idx: ", new_idx) - if new_idx > idx then - ngx.say("prev_index updated") - else - ngx.say("prev_index not update") - end - } - } ---- request -GET /t ---- response_body -prev_index updated ---- error_log eval -qr/(create watch stream for key|cancel watch connection success)/ - - - -=== TEST 2: using default timeout +=== TEST 1: using default timeout --- config location /t { content_by_lua_block { @@ -126,7 +68,7 @@ waitdir key -=== TEST 3: no update +=== TEST 2: no update --- config location /t { content_by_lua_block { @@ -162,7 +104,7 @@ prev_index not update -=== TEST 4: bad plugin configuration (validated via incremental sync) +=== TEST 3: bad plugin configuration (validated via incremental sync) --- config location /t { content_by_lua_block { @@ -182,7 +124,7 @@ property "uri" validation failed -=== TEST 5: bad plugin configuration (validated via full sync) +=== TEST 4: bad plugin configuration (validated via full sync) --- config location /t { content_by_lua_block { @@ -196,7 +138,7 @@ property "uri" validation failed -=== TEST 6: bad plugin configuration (validated without sync during start) +=== TEST 5: bad plugin configuration (validated without sync during start) --- extra_yaml_config disable_sync_configuration_during_start: true --- config diff --git a/t/plugin/error-log-logger-skywalking.t b/t/plugin/error-log-logger-skywalking.t index ffebf1cf4fa5..edb5003c0988 100644 --- a/t/plugin/error-log-logger-skywalking.t +++ b/t/plugin/error-log-logger-skywalking.t @@ -118,8 +118,8 @@ qr/Batch Processor\[error-log-logger\] failed to process entries: error while se --- request GET /tg --- response_body ---- error_log eval -qr/.*\[\{\"body\":\{\"text\":\{\"text\":\".*this is an error message for test.*\"\}\},\"endpoint\":\"\",\"service\":\"APISIX\",\"serviceInstance\":\"instance\".*/ +--- error_log +this is an error message for test --- wait: 5 From 2dcf9bc92d8b87c9d57962ae99aec6b98da5e6c5 Mon Sep 17 00:00:00 2001 From: kingluo Date: Thu, 11 May 2023 22:21:09 +0800 Subject: [PATCH 05/12] fix PR --- apisix/core/config_etcd.lua | 1 - t/fuzzing/public.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/apisix/core/config_etcd.lua b/apisix/core/config_etcd.lua index 93c1c9f8e5db..7ab96c0e2397 100644 --- a/apisix/core/config_etcd.lua +++ b/apisix/core/config_etcd.lua @@ -36,7 +36,6 @@ local setmetatable = setmetatable local ngx_sleep = require("apisix.core.utils").sleep local ngx_timer_at = ngx.timer.at local ngx_time = ngx.time -local ngx_sleep = ngx.sleep local sub_str = string.sub local tostring = tostring local tonumber = tonumber diff --git a/t/fuzzing/public.py b/t/fuzzing/public.py index 0897ec476bbe..2a0e38796d7b 100644 --- a/t/fuzzing/public.py +++ b/t/fuzzing/public.py @@ -41,7 +41,7 @@ def check_log(): cmds = ['cat %s | grep -a "error" | grep -v "invalid request body"'%apisix_errorlog, 'cat %s | grep -a " 500 "'%apisix_accesslog] if os.path.exists(boofuzz_log): - cmds.append('cat %s | grep -a "fail"'%boofuzz_log) + cmds.append('cat %s | grep -v "failed to fetch data from etcd: closed" | grep -a "fail"'%boofuzz_log) for cmd in cmds: r = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True) err = r.stdout.read().strip() From 6fc46134cecfa87dad9fbd6cad30d537a4162d7b Mon Sep 17 00:00:00 2001 From: kingluo Date: Thu, 11 May 2023 22:49:00 +0800 Subject: [PATCH 06/12] fix PR --- t/fuzzing/public.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/t/fuzzing/public.py b/t/fuzzing/public.py index 2a0e38796d7b..e69737f39a68 100644 --- a/t/fuzzing/public.py +++ b/t/fuzzing/public.py @@ -39,9 +39,9 @@ def check_log(): apisix_errorlog = apisix_pwd() + "/logs/error.log" apisix_accesslog = apisix_pwd() + "/logs/access.log" - cmds = ['cat %s | grep -a "error" | grep -v "invalid request body"'%apisix_errorlog, 'cat %s | grep -a " 500 "'%apisix_accesslog] + cmds = ['cat %s | grep -a "error" | grep -v "failed to fetch data from etcd: closed" | grep -v "invalid request body"'%apisix_errorlog, 'cat %s | grep -a " 500 "'%apisix_accesslog] if os.path.exists(boofuzz_log): - cmds.append('cat %s | grep -v "failed to fetch data from etcd: closed" | grep -a "fail"'%boofuzz_log) + cmds.append('cat %s | grep -a "fail"'%boofuzz_log) for cmd in cmds: r = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True) err = r.stdout.read().strip() From d09a6bbefa721c529c7e0e107fba911cb60ad9f8 Mon Sep 17 00:00:00 2001 From: kingluo Date: Thu, 11 May 2023 22:56:41 +0800 Subject: [PATCH 07/12] fix PR --- t/fuzzing/public.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/t/fuzzing/public.py b/t/fuzzing/public.py index e69737f39a68..49b095e71cc9 100644 --- a/t/fuzzing/public.py +++ b/t/fuzzing/public.py @@ -39,7 +39,10 @@ def check_log(): apisix_errorlog = apisix_pwd() + "/logs/error.log" apisix_accesslog = apisix_pwd() + "/logs/access.log" - cmds = ['cat %s | grep -a "error" | grep -v "failed to fetch data from etcd: closed" | grep -v "invalid request body"'%apisix_errorlog, 'cat %s | grep -a " 500 "'%apisix_accesslog] + cmds = ['cat %s | grep -a "error" \ + | grep -v "failed to fetch data from etcd: closed" \ + | grep -v "upstream timed out (110: Connection timed out) while reading upstream, client: unix:, server: , request: \"POST /v3/watch HTTP/1.1\"" \ + | grep -v "invalid request body"'%apisix_errorlog, 'cat %s | grep -a " 500 "'%apisix_accesslog] if os.path.exists(boofuzz_log): cmds.append('cat %s | grep -a "fail"'%boofuzz_log) for cmd in cmds: From 36e2a76cd5b193700f20f2007bde7d7ed97b9ad2 Mon Sep 17 00:00:00 2001 From: kingluo Date: Sat, 13 May 2023 23:01:08 +0800 Subject: [PATCH 08/12] fix PR --- apisix/core/config_etcd.lua | 149 ++++++++++++++++-------------------- t/fuzzing/public.py | 5 +- 2 files changed, 68 insertions(+), 86 deletions(-) diff --git a/apisix/core/config_etcd.lua b/apisix/core/config_etcd.lua index 7ab96c0e2397..56883f121c49 100644 --- a/apisix/core/config_etcd.lua +++ b/apisix/core/config_etcd.lua @@ -27,6 +27,7 @@ local json = require("apisix.core.json") local etcd_apisix = require("apisix.core.etcd") local core_str = require("apisix.core.string") local new_tab = require("table.new") +local inspect = require("inspect") local check_schema = require("apisix.core.schema").check local exiting = ngx.worker.exiting local insert_tab = table.insert @@ -107,26 +108,9 @@ local function cancel_watch(http_cli) end -local function handle_compacted(err) - if err == "compacted" then - -- update current etcd revision - while true do - local res, err2 = watch_ctx.cli:get(watch_ctx.prefix) - if not res then - log.error(err2) - ngx_sleep(3) - else - watch_ctx.rev = res.body.header.revision - break - end - end - end -end - - +-- append res to the queue and notify pending watchers local function produce_res(res, err) - -- append res and notify pending watchers - --log.warn("append res, res: ", require("inspect")(res), ", err: ", require("inspect")(err)) + log.info("append res: ", inspect(res), ", err: ", inspect(err)) insert_tab(watch_ctx.res, {res=res, err=err}) for _, sema in pairs(watch_ctx.sema) do sema:post() @@ -144,10 +128,10 @@ local function run_watch(premature) if not local_conf then error("no local conf: " .. err) end - local prefix = local_conf.etcd.prefix + watch_ctx.prefix = local_conf.etcd.prefix .. "/" - local cli, err = get_etcd() - if not cli then + watch_ctx.cli, err = get_etcd() + if not watch_ctx.cli then error("failed to create etcd instance: " .. string(err)) end @@ -161,82 +145,75 @@ local function run_watch(premature) end if rev == 0 then - local res, err = cli:get(prefix) - if err then - error(err) + while true do + local res, err = watch_ctx.cli:get(watch_ctx.prefix) + if not res then + log.error("etcd get: ", err) + ngx_sleep(3) + else + watch_ctx.rev = tonumber(res.body.header.revision) + break + end end - rev = tonumber(res.body.header.revision) - assert(rev > 0, 'invalid res.body.header.revision') end - log.warn("main watcher started, revision=", rev) + watch_ctx.rev = rev + 1 watch_ctx.started = true - watch_ctx.prefix = prefix - watch_ctx.cli = cli - watch_ctx.rev = rev + log.warn("main etcd watcher started, revision=", watch_ctx.rev) for _, sema in pairs(watch_ctx.wait_init) do sema:post() end watch_ctx.wait_init = nil local opts = {} - opts.timeout = 60 -- second + opts.timeout = 50 -- second opts.need_cancel = true ::restart_watch:: while true do - opts.start_revision = watch_ctx.rev + 1 - local res_func, func_err, http_cli = watch_ctx.cli:watchdir(watch_ctx.prefix, opts) + opts.start_revision = watch_ctx.rev + log.info("restart watchdir: start_revision=", opts.start_revision) + local res_func, err, http_cli = watch_ctx.cli:watchdir(watch_ctx.prefix, opts) if not res_func then - log.error("watchdir: ", func_err) - handle_compacted(func_err) - produce_res(nil, func_err) + log.error("watchdir: ", err) ngx_sleep(3) goto restart_watch end + ::watch_event:: while true do local res, err = res_func() - --log.warn("res_func: ", require("inspect")(res)) - --if err then log.warn("res_func_err: ", require("inspect")(err)) end - - if err == "closed" then - break - end + log.info("res_func: ", inspect(res)) if not res then + if err ~= "closed" and + err ~= "timeout" and + err ~= "broken pipe" then + log.error("wait watch event: ", err) + end cancel_watch(http_cli) - handle_compacted(err) - produce_res(nil, err) break end - -- in etcd v3, the 1st res of watch is watch info, useless to us. - -- try twice to skip create info - if not res.result or not res.result.events then - res, err = res_func() - end - - if err == "closed" then + if res.error then + log.error("wait watch event: ", inspect(res.error)) + cancel_watch(http_cli) break end - if not res then - cancel_watch(http_cli) - handle_compacted(err) - produce_res(nil, err) - break + if res.result.created then + goto watch_event end - if type(res.result) ~= "table" then - cancel_watch(http_cli) - err = "failed to wait etcd dir" - if res.error and res.error.message then - err = err .. ": " .. res.error.message + if res.result.canceled then + log.warn("watch canceled by etcd, res: ", inspect(res)) + if res.result.compact_revision then + watch_ctx.rev = tonumber(res.result.compact_revision) + log.warn("etcd compacted, compact_revision=", watch_ctx.rev) + produce_res(nil, "compacted") end - handle_compacted(err) - produce_res(nil, err) + cancel_watch(http_cli) break end @@ -247,19 +224,24 @@ local function run_watch(premature) min_idx = idx end end - if min_idx > 10 then + + for i = 1,min_idx-1 do + watch_ctx.res[i] = false + end + + if min_idx > 100 then for k, idx in pairs(watch_ctx.idx) do - watch_ctx[k] = idx-min_idx+1 + watch_ctx.idx[k] = idx-min_idx+1 end -- trim the res table for i = 1,min_idx-1 do - table.remove(watch_ctx.res, i) + table.remove(watch_ctx.res, 1) end end local rev = tonumber(res.result.header.revision) if rev > watch_ctx.rev then - watch_ctx.rev = rev + watch_ctx.rev = rev + 1 end produce_res(res) end @@ -274,11 +256,8 @@ local function init_watch_ctx(key) res = {}, sema = {}, wait_init = {}, + started = false, } - end - - if watch_ctx.started == nil then - watch_ctx.started = false ngx_timer_at(0, run_watch) end @@ -294,7 +273,7 @@ local function init_watch_ctx(key) if ok then break end - log.error("sema wait, key: ", key, ", err: ", err) + log.error("wait main watcher to start, key: ", key, ", err: ", err) end end end @@ -389,10 +368,15 @@ local function http_waitdir(self, etcd_cli, key, modified_index, timeout) ::iterate_events:: for i = watch_ctx.idx[key],#watch_ctx.res do + watch_ctx.idx[key] = i+1 + local item = watch_ctx.res[i] + if item == false then + goto iterate_events + end + local res, err = item.res, item.err if err then - watch_ctx.idx[key] = i+1 return res, err end @@ -408,8 +392,6 @@ local function http_waitdir(self, etcd_cli, key, modified_index, timeout) end if found then - watch_ctx.idx[key] = i+1 - local res2 = tablex.deepcopy(res) table.clear(res2.result.events) for _, evt in ipairs(res.result.events) do @@ -417,7 +399,7 @@ local function http_waitdir(self, etcd_cli, key, modified_index, timeout) insert_tab(res2.result.events, evt) end end - --log.warn("res2: ", require("inspect")(res2)) + log.info("http_waitdir: ", inspect(res2)) return res2 end end @@ -432,12 +414,15 @@ local function http_waitdir(self, etcd_cli, key, modified_index, timeout) end watch_ctx.sema[key] = self.watch_sema - local ok, err = self.watch_sema:wait(60) + local ok, err = self.watch_sema:wait(timeout or 60) watch_ctx.sema[key] = nil - if ok or err == "timeout" then + if ok then goto iterate_events else - log.error(err) + if err ~= "timeout" then + log.error("wait watch event, key=", key, ", err: ", err) + end + return nil, err end end @@ -746,7 +731,7 @@ local function sync_data(self) for i = 1, #values_original do local val = values_original[i] if val then - insert_tab(self.values, val) + table.insert(self.values, val) end end @@ -1050,7 +1035,7 @@ local function create_formatter(prefix) for _, item in ipairs(res.body.kvs) do if curr_dir_data then if core_str.has_prefix(item.key, curr_key) then - insert_tab(curr_dir_data, etcd_apisix.kvs_to_node(item)) + table.insert(curr_dir_data, etcd_apisix.kvs_to_node(item)) goto CONTINUE end diff --git a/t/fuzzing/public.py b/t/fuzzing/public.py index 49b095e71cc9..0897ec476bbe 100644 --- a/t/fuzzing/public.py +++ b/t/fuzzing/public.py @@ -39,10 +39,7 @@ def check_log(): apisix_errorlog = apisix_pwd() + "/logs/error.log" apisix_accesslog = apisix_pwd() + "/logs/access.log" - cmds = ['cat %s | grep -a "error" \ - | grep -v "failed to fetch data from etcd: closed" \ - | grep -v "upstream timed out (110: Connection timed out) while reading upstream, client: unix:, server: , request: \"POST /v3/watch HTTP/1.1\"" \ - | grep -v "invalid request body"'%apisix_errorlog, 'cat %s | grep -a " 500 "'%apisix_accesslog] + cmds = ['cat %s | grep -a "error" | grep -v "invalid request body"'%apisix_errorlog, 'cat %s | grep -a " 500 "'%apisix_accesslog] if os.path.exists(boofuzz_log): cmds.append('cat %s | grep -a "fail"'%boofuzz_log) for cmd in cmds: From 760f9e6272edfe9012b7074cb2d27f9ed49033c3 Mon Sep 17 00:00:00 2001 From: kingluo Date: Mon, 22 May 2023 10:34:42 +0800 Subject: [PATCH 09/12] code formating fix --- apisix/core/config_etcd.lua | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/apisix/core/config_etcd.lua b/apisix/core/config_etcd.lua index 56883f121c49..6626d364411b 100644 --- a/apisix/core/config_etcd.lua +++ b/apisix/core/config_etcd.lua @@ -189,7 +189,8 @@ local function run_watch(premature) if not res then if err ~= "closed" and err ~= "timeout" and - err ~= "broken pipe" then + err ~= "broken pipe" + then log.error("wait watch event: ", err) end cancel_watch(http_cli) @@ -225,16 +226,16 @@ local function run_watch(premature) end end - for i = 1,min_idx-1 do + for i = 1, min_idx - 1 do watch_ctx.res[i] = false end if min_idx > 100 then for k, idx in pairs(watch_ctx.idx) do - watch_ctx.idx[k] = idx-min_idx+1 + watch_ctx.idx[k] = idx - min_idx + 1 end -- trim the res table - for i = 1,min_idx-1 do + for i = 1, min_idx - 1 do table.remove(watch_ctx.res, 1) end end @@ -367,8 +368,8 @@ local function http_waitdir(self, etcd_cli, key, modified_index, timeout) end ::iterate_events:: - for i = watch_ctx.idx[key],#watch_ctx.res do - watch_ctx.idx[key] = i+1 + for i = watch_ctx.idx[key], #watch_ctx.res do + watch_ctx.idx[key] = i + 1 local item = watch_ctx.res[i] if item == false then From a9de0f2f71aeeaacbd1662891b4df80f096b85e9 Mon Sep 17 00:00:00 2001 From: kingluo Date: Mon, 22 May 2023 11:55:34 +0800 Subject: [PATCH 10/12] log inspect for ngx.INFO --- apisix/core/config_etcd.lua | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/apisix/core/config_etcd.lua b/apisix/core/config_etcd.lua index 6626d364411b..9aa7af69c143 100644 --- a/apisix/core/config_etcd.lua +++ b/apisix/core/config_etcd.lua @@ -28,6 +28,8 @@ local etcd_apisix = require("apisix.core.etcd") local core_str = require("apisix.core.string") local new_tab = require("table.new") local inspect = require("inspect") +local errlog = require("ngx.errlog") +local log_level = errlog.get_sys_filter_level() local check_schema = require("apisix.core.schema").check local exiting = ngx.worker.exiting local insert_tab = table.insert @@ -110,7 +112,9 @@ end -- append res to the queue and notify pending watchers local function produce_res(res, err) - log.info("append res: ", inspect(res), ", err: ", inspect(err)) + if log_level >= ngx.INFO then + log.info("append res: ", inspect(res), ", err: ", inspect(err)) + end insert_tab(watch_ctx.res, {res=res, err=err}) for _, sema in pairs(watch_ctx.sema) do sema:post() @@ -184,7 +188,9 @@ local function run_watch(premature) ::watch_event:: while true do local res, err = res_func() - log.info("res_func: ", inspect(res)) + if log_level >= ngx.INFO then + log.info("res_func: ", inspect(res)) + end if not res then if err ~= "closed" and @@ -400,7 +406,9 @@ local function http_waitdir(self, etcd_cli, key, modified_index, timeout) insert_tab(res2.result.events, evt) end end - log.info("http_waitdir: ", inspect(res2)) + if log_level >= ngx.INFO then + log.info("http_waitdir: ", inspect(res2)) + end return res2 end end From 4eb055b6577518d3625ec5f22d84b0050596f02a Mon Sep 17 00:00:00 2001 From: kingluo Date: Mon, 22 May 2023 11:58:40 +0800 Subject: [PATCH 11/12] fix PR --- apisix/core/config_etcd.lua | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/apisix/core/config_etcd.lua b/apisix/core/config_etcd.lua index 9aa7af69c143..bba89cc5b036 100644 --- a/apisix/core/config_etcd.lua +++ b/apisix/core/config_etcd.lua @@ -30,6 +30,7 @@ local new_tab = require("table.new") local inspect = require("inspect") local errlog = require("ngx.errlog") local log_level = errlog.get_sys_filter_level() +local NGX_INFO = ngx.INFO local check_schema = require("apisix.core.schema").check local exiting = ngx.worker.exiting local insert_tab = table.insert @@ -112,7 +113,7 @@ end -- append res to the queue and notify pending watchers local function produce_res(res, err) - if log_level >= ngx.INFO then + if log_level >= NGX_INFO then log.info("append res: ", inspect(res), ", err: ", inspect(err)) end insert_tab(watch_ctx.res, {res=res, err=err}) @@ -188,7 +189,7 @@ local function run_watch(premature) ::watch_event:: while true do local res, err = res_func() - if log_level >= ngx.INFO then + if log_level >= NGX_INFO then log.info("res_func: ", inspect(res)) end @@ -406,7 +407,7 @@ local function http_waitdir(self, etcd_cli, key, modified_index, timeout) insert_tab(res2.result.events, evt) end end - if log_level >= ngx.INFO then + if log_level >= NGX_INFO then log.info("http_waitdir: ", inspect(res2)) end return res2 From 70f643b6b39b56a8941d9fd415327953001ff6fd Mon Sep 17 00:00:00 2001 From: kingluo Date: Tue, 23 May 2023 11:56:58 +0800 Subject: [PATCH 12/12] fix PR --- apisix/core/config_etcd.lua | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/apisix/core/config_etcd.lua b/apisix/core/config_etcd.lua index bba89cc5b036..ecb76270452d 100644 --- a/apisix/core/config_etcd.lua +++ b/apisix/core/config_etcd.lua @@ -388,29 +388,25 @@ local function http_waitdir(self, etcd_cli, key, modified_index, timeout) return res, err end - local found = false -- ignore res with revision smaller then self.prev_index if tonumber(res.result.header.revision) > self.prev_index then + local res2 for _, evt in ipairs(res.result.events) do if evt.kv.key:find(key) == 1 then - found = true - break + if not res2 then + res2 = tablex.deepcopy(res) + table.clear(res2.result.events) + end + insert_tab(res2.result.events, evt) end end - end - if found then - local res2 = tablex.deepcopy(res) - table.clear(res2.result.events) - for _, evt in ipairs(res.result.events) do - if evt.kv.key:find(key) == 1 then - insert_tab(res2.result.events, evt) + if res2 then + if log_level >= NGX_INFO then + log.info("http_waitdir: ", inspect(res2)) end + return res2 end - if log_level >= NGX_INFO then - log.info("http_waitdir: ", inspect(res2)) - end - return res2 end end