From 873081ed70364278d2944bee4be0fa81aaf3c44b Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Thu, 30 Mar 2023 14:13:42 +0800 Subject: [PATCH 01/27] consul health endpoint --- apisix/discovery/consul/init.lua | 313 ++++++++++++++++++++++--------- 1 file changed, 220 insertions(+), 93 deletions(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index 686ab4120f18..5742aef1a607 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -32,6 +32,10 @@ local ngx_timer_every = ngx.timer.every local log = core.log local json_delay_encode = core.json.delay_encode local ngx_worker_id = ngx.worker.id +local thread_spawn = ngx.thread.spawn +local thread_wait = ngx.thread.wait +local thread_kill = ngx.thread.kill +local math_random = math.random local all_services = core.table.new(0, 5) local default_service @@ -44,6 +48,9 @@ local events_list local consul_services local default_skip_services = {"consul"} +local default_random_seed = 5 +local default_catalog_error_index = -1 +local default_health_error_index = -2 local _M = { version = 0.2, @@ -175,138 +182,255 @@ local function get_retry_delay(retry_delay) return retry_delay end +local function watch_catalog(consul_server) + local c = resty_consul:new({ + host = consul_server.host, + port = consul_server.port, + connect_timeout = consul_server.connect_timeout, + read_timeout = consul_server.read_timeout, + default_args = consul_server.default_catalog_args, + }) + + local watch_result, watch_err = c:get(consul_server.consul_watch_catalog_url) + local watch_error_info = (watch_err ~= nil and watch_err) + or ((watch_result ~= nil and watch_result.status ~= 200) + and watch_result.status) + if watch_error_info then + log.error("connect consul: ", consul_server.consul_server_url, + " by sub url: ", consul_server.consul_watch_catalog_url, + ", got watch result: ", json_delay_encode(watch_result, true), + ", with error: ", watch_error_info) + + return default_catalog_error_index + end + log.info("<------> watch_catalog: ", json_delay_encode(watch_result, true)) + return watch_result.headers['X-Consul-Index'] +end + +local function watch_health(consul_server) + local c = resty_consul:new({ + host = consul_server.host, + port = consul_server.port, + connect_timeout = consul_server.connect_timeout, + read_timeout = consul_server.read_timeout, + default_args = consul_server.default_health_args, + }) + + local watch_result, watch_err = c:get(consul_server.consul_watch_health_url) + local watch_error_info = (watch_err ~= nil and watch_err) + or ((watch_result ~= nil and watch_result.status ~= 200) + and watch_result.status) + if watch_error_info then + log.error("connect consul: ", consul_server.consul_server_url, + " by sub url: ", consul_server.consul_watch_health_url, + ", got watch result: ", json_delay_encode(watch_result, true), + ", with error: ", watch_error_info) + + return default_health_error_index + end + log.info("------> watch_health: ", json_delay_encode(watch_result, true)) + return watch_result.headers['X-Consul-Index'] +end + +local function check_keepalive(consul_server, retry_delay) + if consul_server.keepalive then + local ok, err = ngx_timer_at(0, _M.connect, consul_server, retry_delay) + if not ok then + log.error("create ngx_timer_at got error: ", err) + return + end + end +end + + +local function update_index(consul_server, catalog_index, health_index) + local c_index = 0 + local h_index = 0 + if catalog_index ~= nil then + c_index = tonumber(catalog_index) + end + + if health_index ~= nil then + h_index = tonumber(health_index) + end + + if c_index > 0 then + consul_server.catalog_index = c_index + -- only long connect type use index + if consul_server.keepalive then + consul_server.default_catalog_args.index = c_index + end + end + + if h_index > 0 then + consul_server.health_index = h_index + -- only long connect type use index + if consul_server.keepalive then + consul_server.default_health_args.index = h_index + end + end +end function _M.connect(premature, consul_server, retry_delay) if premature then return end + local catalog_thread, spawn_catalog_err = thread_spawn(watch_catalog, consul_server) + if not catalog_thread then + log.error("failed to spawn thread watch catalog: ", spawn_catalog_err) + local random_delay = math_random(default_random_seed) + log.warn("failed to spawn thread watch catalog, retry connecting consul after ", random_delay, " seconds") + core_sleep(random_delay) + + check_keepalive(consul_server, retry_delay) + return + end + + local health_thread, err = thread_spawn(watch_health, consul_server) + if not health_thread then + log.error("failed to spawn thread watch health: ", err) + local random_delay = math_random(default_random_seed) + log.warn("failed to spawn thread watch health, retry connecting consul after ", random_delay, " seconds") + core_sleep(random_delay) + + check_keepalive(consul_server, retry_delay) + return + end + + local thread_wait_ok, catalog_index, health_index = thread_wait(catalog_thread, health_thread) + thread_kill(health_thread) + thread_kill(catalog_thread) + if not thread_wait_ok then + log.error("failed to wait thread: ", err, ", catalog_index: ", catalog_index, ", catalog_index: ", catalog_index) + local random_delay = math_random(default_random_seed) + log.warn("failed to wait thread, retry connecting consul after ", random_delay, " seconds") + core_sleep(random_delay) + + check_keepalive(consul_server, retry_delay) + return + end + local consul_client = resty_consul:new({ host = consul_server.host, port = consul_server.port, connect_timeout = consul_server.connect_timeout, read_timeout = consul_server.read_timeout, - default_args = consul_server.default_args, }) - log.info("consul_server: ", json_delay_encode(consul_server, true)) - local watch_result, watch_err = consul_client:get(consul_server.consul_watch_sub_url) - local watch_error_info = (watch_err ~= nil and watch_err) - or ((watch_result ~= nil and watch_result.status ~= 200) - and watch_result.status) + local catalog_res, catalog_err = consul_client:get(consul_server.consul_watch_catalog_url) + local watch_error_info = (catalog_err ~= nil and catalog_err) + or ((catalog_res ~= nil and catalog_res.status ~= 200) + and catalog_res.status) if watch_error_info then log.error("connect consul: ", consul_server.consul_server_url, - " by sub url: ", consul_server.consul_watch_sub_url, - ", got watch result: ", json_delay_encode(watch_result, true), - ", with error: ", watch_error_info) + " by sub url: ", consul_server.consul_watch_catalog_url, + ", got catalog result: ", json_delay_encode(catalog_res, true), + ", with error: ", watch_error_info) retry_delay = get_retry_delay(retry_delay) - log.warn("retry connecting consul after ", retry_delay, " seconds") + log.warn("get all svcs got err, retry connecting consul after ", retry_delay, " seconds") core_sleep(retry_delay) - goto ERR + check_keepalive(consul_server, retry_delay) + return end log.info("connect consul: ", consul_server.consul_server_url, - ", watch_result status: ", watch_result.status, - ", watch_result.headers.index: ", watch_result.headers['X-Consul-Index'], - ", consul_server.index: ", consul_server.index, - ", consul_server: ", json_delay_encode(consul_server, true)) - - -- if current index different last index then update service - if consul_server.index ~= watch_result.headers['X-Consul-Index'] then - local up_services = core.table.new(0, #watch_result.body) - local consul_client_svc = resty_consul:new({ - host = consul_server.host, - port = consul_server.port, - connect_timeout = consul_server.connect_timeout, - read_timeout = consul_server.read_timeout, - }) - for service_name, _ in pairs(watch_result.body) do - -- check if the service_name is 'skip service' - if skip_service_map[service_name] then - goto CONTINUE - end - -- get node from service - local svc_url = consul_server.consul_sub_url .. "/" .. service_name - local result, err = consul_client_svc:get(svc_url) - local error_info = (err ~= nil and err) or - ((result ~= nil and result.status ~= 200) and result.status) - if error_info then - log.error("connect consul: ", consul_server.consul_server_url, + ", catalog_result status: ", catalog_res.status, + ", catalog_result.headers.index: ", catalog_res.headers['X-Consul-Index'], + ", consul_server.index: ", consul_server.index, + ", consul_server: ", json_delay_encode(consul_server, true)) + + local up_services = core.table.new(0, #catalog_res.body) + for service_name, _ in pairs(catalog_res.body) do + -- check if the service_name is 'skip service' + if skip_service_map[service_name] then + goto CONTINUE + end + -- get node from service + local svc_url = consul_server.consul_sub_url .. "/" .. service_name + if consul_client == nil then + consul_client = resty_consul:new({ + host = consul_server.host, + port = consul_server.port, + connect_timeout = consul_server.connect_timeout, + read_timeout = consul_server.read_timeout, + }) + end + local result, get_err = consul_client:get(svc_url) + local error_info = (get_err ~= nil and get_err) or + ((result ~= nil and result.status ~= 200) and result.status) + if error_info then + log.error("connect consul: ", consul_server.consul_server_url, ", by service url: ", svc_url, ", with error: ", error_info) - goto CONTINUE - end + goto CONTINUE + end - -- decode body, decode json, update service, error handling - if result.body then - log.notice("service url: ", svc_url, + -- decode body, decode json, update service, error handling + if result.body then + log.notice("service url: ", svc_url, ", header: ", json_delay_encode(result.headers, true), ", body: ", json_delay_encode(result.body, true)) - -- add services to table - local nodes = up_services[service_name] - for _, node in ipairs(result.body) do - local svc_address, svc_port = node.ServiceAddress, node.ServicePort - if not svc_address then - svc_address = node.Address - end - -- if nodes is nil, new nodes table and set to up_services - if not nodes then - nodes = core.table.new(1, 0) - up_services[service_name] = nodes - end - -- add node to nodes table - core.table.insert(nodes, { - host = svc_address, - port = tonumber(svc_port), - weight = default_weight, - }) + -- add services to table + local nodes = up_services[service_name] + for _, node in ipairs(result.body) do + if not node.Service then + goto CONTINUE + end + local svc_address, svc_port = node.Service.Address, node.Service.Port + if not svc_address then + svc_address = node.Address end - up_services[service_name] = nodes + -- if nodes is nil, new nodes table and set to up_services + if not nodes then + nodes = core.table.new(1, 0) + up_services[service_name] = nodes + end + -- add node to nodes table + core.table.insert(nodes, { + host = svc_address, + port = tonumber(svc_port), + weight = default_weight, + }) end - :: CONTINUE :: + up_services[service_name] = nodes end + :: CONTINUE :: + end - update_all_services(consul_server.consul_server_url, up_services) - - --update events - local ok, post_err = events.post(events_list._source, events_list.updating, all_services) - if not ok then - log.error("post_event failure with ", events_list._source, - ", update all services error: ", post_err) - end + update_all_services(consul_server.consul_server_url, up_services) - if dump_params then - ngx_timer_at(0, write_dump_services) - end + --update events + local post_ok, post_err = events.post(events_list._source, events_list.updating, all_services) + if not post_ok then + log.error("post_event failure with ", events_list._source, ", update all services error: ", post_err) + end - consul_server.index = watch_result.headers['X-Consul-Index'] - -- only long connect type use index - if consul_server.keepalive then - consul_server.default_args.index = watch_result.headers['X-Consul-Index'] - end + if dump_params then + ngx_timer_at(0, write_dump_services) end + update_index(consul_server, catalog_res.headers['X-Consul-Index'], health_index) + :: ERR :: - local keepalive = consul_server.keepalive - if keepalive then - local ok, err = ngx_timer_at(0, _M.connect, consul_server, retry_delay) - if not ok then - log.error("create ngx_timer_at got error: ", err) - return - end - end + check_keepalive(consul_server, retry_delay) end local function format_consul_params(consul_conf) local consul_server_list = core.table.new(0, #consul_conf.servers) - local args + local catalog_args, health_args if consul_conf.keepalive == false then - args = {} + catalog_args = {} + health_args = {} elseif consul_conf.keepalive then - args = { + catalog_args = { + wait = consul_conf.timeout.wait, --blocked wait!=0; unblocked by wait=0 + index = 0, + } + health_args = { wait = consul_conf.timeout.wait, --blocked wait!=0; unblocked by wait=0 index = 0, } @@ -325,13 +449,16 @@ local function format_consul_params(consul_conf) port = port, connect_timeout = consul_conf.timeout.connect, read_timeout = consul_conf.timeout.read, - consul_sub_url = "/catalog/service", - consul_watch_sub_url = "/catalog/services", + consul_watch_catalog_url = "/catalog/services", + consul_sub_url = "/health/service", + consul_watch_health_url = "/health/state/any", consul_server_url = v .. "/v1", weight = consul_conf.weight, keepalive = consul_conf.keepalive, - default_args = args, - index = 0, + default_catalog_args = catalog_args, + default_health_args = health_args, + health_index = 0, + catalog_index = 0, fetch_interval = consul_conf.fetch_interval -- fetch interval to next connect consul }) end From 6e828ce40a965d29c9d22de4d12372961bc1a28f Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Thu, 30 Mar 2023 15:12:31 +0800 Subject: [PATCH 02/27] consul health endpoint --- apisix/discovery/consul/init.lua | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index 5742aef1a607..9202fbadbb29 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -203,7 +203,6 @@ local function watch_catalog(consul_server) return default_catalog_error_index end - log.info("<------> watch_catalog: ", json_delay_encode(watch_result, true)) return watch_result.headers['X-Consul-Index'] end @@ -228,7 +227,6 @@ local function watch_health(consul_server) return default_health_error_index end - log.info("------> watch_health: ", json_delay_encode(watch_result, true)) return watch_result.headers['X-Consul-Index'] end @@ -298,11 +296,11 @@ function _M.connect(premature, consul_server, retry_delay) return end - local thread_wait_ok, catalog_index, health_index = thread_wait(catalog_thread, health_thread) + local thread_wait_ok, wait_res = thread_wait(catalog_thread, health_thread) thread_kill(health_thread) thread_kill(catalog_thread) if not thread_wait_ok then - log.error("failed to wait thread: ", err, ", catalog_index: ", catalog_index, ", catalog_index: ", catalog_index) + log.error("failed to wait thread: ", err, ", wait_res: ", wait_res) local random_delay = math_random(default_random_seed) log.warn("failed to wait thread, retry connecting consul after ", random_delay, " seconds") core_sleep(random_delay) @@ -411,6 +409,12 @@ function _M.connect(premature, consul_server, retry_delay) ngx_timer_at(0, write_dump_services) end + -- get health index + local health_res, health_err = consul_client:get(consul_server.consul_watch_health_url) + local health_index + if (health_err == nil ) and (health_res ~= nil and health_res.status == 200) then + health_index = health_res.headers['X-Consul-Index'] + end update_index(consul_server, catalog_res.headers['X-Consul-Index'], health_index) :: ERR :: From 7e220b932a332f4dc0a6e99e58c0710c35959477 Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Thu, 30 Mar 2023 15:13:00 +0800 Subject: [PATCH 03/27] consul health endpoint --- apisix/discovery/consul/init.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index 9202fbadbb29..323756a2339a 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -53,7 +53,7 @@ local default_catalog_error_index = -1 local default_health_error_index = -2 local _M = { - version = 0.2, + version = 0.3, } From 0c51f423ebc33ca334737b4d51a2a575a8e25abf Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Thu, 30 Mar 2023 15:35:31 +0800 Subject: [PATCH 04/27] consul health endpoint --- apisix/discovery/consul/init.lua | 10 +++--- t/discovery/consul_dump.t | 52 ++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 4 deletions(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index 323756a2339a..91f6c5684acd 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -278,7 +278,8 @@ function _M.connect(premature, consul_server, retry_delay) if not catalog_thread then log.error("failed to spawn thread watch catalog: ", spawn_catalog_err) local random_delay = math_random(default_random_seed) - log.warn("failed to spawn thread watch catalog, retry connecting consul after ", random_delay, " seconds") + log.warn("failed to spawn thread watch catalog, retry connecting consul after ", + random_delay, " seconds") core_sleep(random_delay) check_keepalive(consul_server, retry_delay) @@ -289,7 +290,8 @@ function _M.connect(premature, consul_server, retry_delay) if not health_thread then log.error("failed to spawn thread watch health: ", err) local random_delay = math_random(default_random_seed) - log.warn("failed to spawn thread watch health, retry connecting consul after ", random_delay, " seconds") + log.warn("failed to spawn thread watch health, retry connecting consul after ", + random_delay, " seconds") core_sleep(random_delay) check_keepalive(consul_server, retry_delay) @@ -402,7 +404,8 @@ function _M.connect(premature, consul_server, retry_delay) --update events local post_ok, post_err = events.post(events_list._source, events_list.updating, all_services) if not post_ok then - log.error("post_event failure with ", events_list._source, ", update all services error: ", post_err) + log.error("post_event failure with ", events_list._source, + ", update all services error: ", post_err) end if dump_params then @@ -417,7 +420,6 @@ function _M.connect(premature, consul_server, retry_delay) end update_index(consul_server, catalog_res.headers['X-Consul-Index'], health_index) - :: ERR :: check_keepalive(consul_server, retry_delay) end diff --git a/t/discovery/consul_dump.t b/t/discovery/consul_dump.t index 366c76a989fb..27864f34a60e 100644 --- a/t/discovery/consul_dump.t +++ b/t/discovery/consul_dump.t @@ -451,3 +451,55 @@ discovery: GET /bonjour --- response_body {"service_a":[{"host":"127.0.0.1","port":30511,"weight":1}],"service_b":[{"host":"127.0.0.1","port":30517,"weight":1}]} + + + +=== TEST 16: prepare nodes with consul health check +--- config +location /v1/agent { + proxy_pass http://127.0.0.1:8500; +} +--- request eval +[ + "PUT /v1/agent/service/deregister/service_a1", + "PUT /v1/agent/service/deregister/service_a2", + "PUT /v1/agent/service/deregister/service_b1", + "PUT /v1/agent/service/deregister/service_b2", + "PUT /v1/agent/service/register\n" . "{\"Checks\": [{\"http\": \"http://baidu.com\",\"interval\": \"1s\"}],\"ID\":\"service_a1\",\"Name\":\"service_a\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30511,\"Meta\":{\"service_a_version\":\"4.0\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}", + "PUT /v1/agent/service/register\n" . "{\"Checks\": [{\"http\": \"http://127.0.0.1:8002\",\"interval\": \"1s\"}],\"ID\":\"service_b1\",\"Name\":\"service_b\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":8002,\"Meta\":{\"service_b_version\":\"4.1\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}", +] +--- response_body eval +--- error_code eval +[200, 200, 200, 200, 200, 200] + + + +=== TEST 17: show dump services with consul health check +--- yaml_config +apisix: + node_listen: 1984 + enable_control: true +discovery: + consul: + servers: + - "http://127.0.0.1:8500" + dump: + path: "consul.dump" + load_on_init: false +--- config + location /t { + content_by_lua_block { + local json = require("toolkit.json") + local t = require("lib.test_admin") + ngx.sleep(2) + local code, body, res = t.test('/v1/discovery/consul/show_dump_file', + ngx.HTTP_GET) + local entity = json.decode(res) + ngx.say(json.encode(entity.services)) + } + } +--- timeout: 3 +--- request +GET /t +--- response_body +{"service_a":[{"host":"127.0.0.1","port":30511,"weight":1}],"service_b":[{"host":"127.0.0.1","port":8002,"weight":1}]} From 8bb557ebc5a35984dcf3e690169acd926acf07ad Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Thu, 30 Mar 2023 18:47:16 +0800 Subject: [PATCH 05/27] consul health endpoint --- apisix/discovery/consul/init.lua | 33 ++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index 91f6c5684acd..33c7dba9fd8b 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -197,9 +197,9 @@ local function watch_catalog(consul_server) and watch_result.status) if watch_error_info then log.error("connect consul: ", consul_server.consul_server_url, - " by sub url: ", consul_server.consul_watch_catalog_url, - ", got watch result: ", json_delay_encode(watch_result, true), - ", with error: ", watch_error_info) + " by sub url: ", consul_server.consul_watch_catalog_url, + ", got watch result: ", json_delay_encode(watch_result, true), + ", with error: ", watch_error_info) return default_catalog_error_index end @@ -221,9 +221,9 @@ local function watch_health(consul_server) and watch_result.status) if watch_error_info then log.error("connect consul: ", consul_server.consul_server_url, - " by sub url: ", consul_server.consul_watch_health_url, - ", got watch result: ", json_delay_encode(watch_result, true), - ", with error: ", watch_error_info) + " by sub url: ", consul_server.consul_watch_health_url, + ", got watch result: ", json_delay_encode(watch_result, true), + ", with error: ", watch_error_info) return default_health_error_index end @@ -288,6 +288,7 @@ function _M.connect(premature, consul_server, retry_delay) local health_thread, err = thread_spawn(watch_health, consul_server) if not health_thread then + thread_kill(catalog_thread) log.error("failed to spawn thread watch health: ", err) local random_delay = math_random(default_random_seed) log.warn("failed to spawn thread watch health, retry connecting consul after ", @@ -324,9 +325,9 @@ function _M.connect(premature, consul_server, retry_delay) and catalog_res.status) if watch_error_info then log.error("connect consul: ", consul_server.consul_server_url, - " by sub url: ", consul_server.consul_watch_catalog_url, - ", got catalog result: ", json_delay_encode(catalog_res, true), - ", with error: ", watch_error_info) + " by sub url: ", consul_server.consul_watch_catalog_url, + ", got catalog result: ", json_delay_encode(catalog_res, true), + ", with error: ", watch_error_info) retry_delay = get_retry_delay(retry_delay) log.warn("get all svcs got err, retry connecting consul after ", retry_delay, " seconds") @@ -337,10 +338,10 @@ function _M.connect(premature, consul_server, retry_delay) end log.info("connect consul: ", consul_server.consul_server_url, - ", catalog_result status: ", catalog_res.status, - ", catalog_result.headers.index: ", catalog_res.headers['X-Consul-Index'], - ", consul_server.index: ", consul_server.index, - ", consul_server: ", json_delay_encode(consul_server, true)) + ", catalog_result status: ", catalog_res.status, + ", catalog_result.headers.index: ", catalog_res.headers['X-Consul-Index'], + ", consul_server.index: ", consul_server.index, + ", consul_server: ", json_delay_encode(consul_server, true)) local up_services = core.table.new(0, #catalog_res.body) for service_name, _ in pairs(catalog_res.body) do @@ -363,15 +364,15 @@ function _M.connect(premature, consul_server, retry_delay) ((result ~= nil and result.status ~= 200) and result.status) if error_info then log.error("connect consul: ", consul_server.consul_server_url, - ", by service url: ", svc_url, ", with error: ", error_info) + ", by service url: ", svc_url, ", with error: ", error_info) goto CONTINUE end -- decode body, decode json, update service, error handling if result.body then log.notice("service url: ", svc_url, - ", header: ", json_delay_encode(result.headers, true), - ", body: ", json_delay_encode(result.body, true)) + ", header: ", json_delay_encode(result.headers, true), + ", body: ", json_delay_encode(result.body, true)) -- add services to table local nodes = up_services[service_name] for _, node in ipairs(result.body) do From 0c95c0b9af4d121fcad49922c5b55284638dc42f Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Thu, 30 Mar 2023 20:33:10 +0800 Subject: [PATCH 06/27] consul health endpoint --- apisix/discovery/consul/init.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index 33c7dba9fd8b..aa44ea40f149 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -359,7 +359,7 @@ function _M.connect(premature, consul_server, retry_delay) read_timeout = consul_server.read_timeout, }) end - local result, get_err = consul_client:get(svc_url) + local result, get_err = consul_client:get(svc_url, {passing = true}) local error_info = (get_err ~= nil and get_err) or ((result ~= nil and result.status ~= 200) and result.status) if error_info then From 1f913f1367f0117cf7ca5fc6c36d2b79f8325990 Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Thu, 30 Mar 2023 21:18:48 +0800 Subject: [PATCH 07/27] consul health endpoint --- t/discovery/consul_dump.t | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/t/discovery/consul_dump.t b/t/discovery/consul_dump.t index 27864f34a60e..c74e5b1500bd 100644 --- a/t/discovery/consul_dump.t +++ b/t/discovery/consul_dump.t @@ -502,4 +502,4 @@ discovery: --- request GET /t --- response_body -{"service_a":[{"host":"127.0.0.1","port":30511,"weight":1}],"service_b":[{"host":"127.0.0.1","port":8002,"weight":1}]} +{"service_a":[{"host":"127.0.0.1","port":30511,"weight":1}]} From c1c71e3506bb71191ec31ad51e9dc96e56f62ffb Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Fri, 31 Mar 2023 08:52:19 +0800 Subject: [PATCH 08/27] consul health endpoint --- apisix/discovery/consul/init.lua | 140 ++++++++++++++++--------------- 1 file changed, 72 insertions(+), 68 deletions(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index aa44ea40f149..67a69183f80f 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -343,83 +343,87 @@ function _M.connect(premature, consul_server, retry_delay) ", consul_server.index: ", consul_server.index, ", consul_server: ", json_delay_encode(consul_server, true)) - local up_services = core.table.new(0, #catalog_res.body) - for service_name, _ in pairs(catalog_res.body) do - -- check if the service_name is 'skip service' - if skip_service_map[service_name] then - goto CONTINUE - end - -- get node from service - local svc_url = consul_server.consul_sub_url .. "/" .. service_name - if consul_client == nil then - consul_client = resty_consul:new({ - host = consul_server.host, - port = consul_server.port, - connect_timeout = consul_server.connect_timeout, - read_timeout = consul_server.read_timeout, - }) - end - local result, get_err = consul_client:get(svc_url, {passing = true}) - local error_info = (get_err ~= nil and get_err) or - ((result ~= nil and result.status ~= 200) and result.status) - if error_info then - log.error("connect consul: ", consul_server.consul_server_url, - ", by service url: ", svc_url, ", with error: ", error_info) - goto CONTINUE - end + -- if current index different last index then update service + if consul_server.catalog_index ~= catalog_res.headers['X-Consul-Index'] then + local up_services = core.table.new(0, #catalog_res.body) + for service_name, _ in pairs(catalog_res.body) do + -- check if the service_name is 'skip service' + if skip_service_map[service_name] then + goto CONTINUE + end + -- get node from service + local svc_url = consul_server.consul_sub_url .. "/" .. service_name + if consul_client == nil then + consul_client = resty_consul:new({ + host = consul_server.host, + port = consul_server.port, + connect_timeout = consul_server.connect_timeout, + read_timeout = consul_server.read_timeout, + }) + end + local result, get_err = consul_client:get(svc_url, {passing = true}) + local error_info = (get_err ~= nil and get_err) or + ((result ~= nil and result.status ~= 200) and result.status) + if error_info then + log.error("connect consul: ", consul_server.consul_server_url, + ", by service url: ", svc_url, ", with error: ", error_info) + goto CONTINUE + end - -- decode body, decode json, update service, error handling - if result.body then - log.notice("service url: ", svc_url, - ", header: ", json_delay_encode(result.headers, true), - ", body: ", json_delay_encode(result.body, true)) - -- add services to table - local nodes = up_services[service_name] - for _, node in ipairs(result.body) do - if not node.Service then - goto CONTINUE - end - local svc_address, svc_port = node.Service.Address, node.Service.Port - if not svc_address then - svc_address = node.Address + -- decode body, decode json, update service, error handling + if result.body then + log.notice("service url: ", svc_url, + ", header: ", json_delay_encode(result.headers, true), + ", body: ", json_delay_encode(result.body, true)) + -- add services to table + local nodes = up_services[service_name] + for _, node in ipairs(result.body) do + if not node.Service then + goto CONTINUE + end + local svc_address, svc_port = node.Service.Address, node.Service.Port + if not svc_address then + svc_address = node.Address + end + -- if nodes is nil, new nodes table and set to up_services + if not nodes then + nodes = core.table.new(1, 0) + up_services[service_name] = nodes + end + -- add node to nodes table + core.table.insert(nodes, { + host = svc_address, + port = tonumber(svc_port), + weight = default_weight, + }) end - -- if nodes is nil, new nodes table and set to up_services - if not nodes then - nodes = core.table.new(1, 0) - up_services[service_name] = nodes - end - -- add node to nodes table - core.table.insert(nodes, { - host = svc_address, - port = tonumber(svc_port), - weight = default_weight, - }) + up_services[service_name] = nodes end - up_services[service_name] = nodes + :: CONTINUE :: end - :: CONTINUE :: - end - update_all_services(consul_server.consul_server_url, up_services) + update_all_services(consul_server.consul_server_url, up_services) - --update events - local post_ok, post_err = events.post(events_list._source, events_list.updating, all_services) - if not post_ok then - log.error("post_event failure with ", events_list._source, - ", update all services error: ", post_err) - end + --update events + local post_ok, post_err = events.post(events_list._source, events_list.updating, all_services) + if not post_ok then + log.error("post_event failure with ", events_list._source, + ", update all services error: ", post_err) + end - if dump_params then - ngx_timer_at(0, write_dump_services) - end + if dump_params then + ngx_timer_at(0, write_dump_services) + end - -- get health index - local health_res, health_err = consul_client:get(consul_server.consul_watch_health_url) - local health_index - if (health_err == nil ) and (health_res ~= nil and health_res.status == 200) then - health_index = health_res.headers['X-Consul-Index'] + -- get health index + local health_res, health_err = consul_client:get(consul_server.consul_watch_health_url) + local health_index + if health_err == nil and (health_res ~= nil and health_res.status == 200) + and (consul_server.health_index ~= health_res.headers['X-Consul-Index']) then + health_index = health_res.headers['X-Consul-Index'] + end + update_index(consul_server, catalog_res.headers['X-Consul-Index'], health_index) end - update_index(consul_server, catalog_res.headers['X-Consul-Index'], health_index) check_keepalive(consul_server, retry_delay) end From d51afaee55451c0c3d0ab4e603802feb9b782a71 Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Fri, 31 Mar 2023 08:58:08 +0800 Subject: [PATCH 09/27] consul health endpoint --- apisix/discovery/consul/init.lua | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index 67a69183f80f..90793fc1a42d 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -353,28 +353,20 @@ function _M.connect(premature, consul_server, retry_delay) end -- get node from service local svc_url = consul_server.consul_sub_url .. "/" .. service_name - if consul_client == nil then - consul_client = resty_consul:new({ - host = consul_server.host, - port = consul_server.port, - connect_timeout = consul_server.connect_timeout, - read_timeout = consul_server.read_timeout, - }) - end local result, get_err = consul_client:get(svc_url, {passing = true}) local error_info = (get_err ~= nil and get_err) or ((result ~= nil and result.status ~= 200) and result.status) if error_info then log.error("connect consul: ", consul_server.consul_server_url, - ", by service url: ", svc_url, ", with error: ", error_info) + ", by service url: ", svc_url, ", with error: ", error_info) goto CONTINUE end -- decode body, decode json, update service, error handling if result.body then log.notice("service url: ", svc_url, - ", header: ", json_delay_encode(result.headers, true), - ", body: ", json_delay_encode(result.body, true)) + ", header: ", json_delay_encode(result.headers, true), + ", body: ", json_delay_encode(result.body, true)) -- add services to table local nodes = up_services[service_name] for _, node in ipairs(result.body) do @@ -408,7 +400,7 @@ function _M.connect(premature, consul_server, retry_delay) local post_ok, post_err = events.post(events_list._source, events_list.updating, all_services) if not post_ok then log.error("post_event failure with ", events_list._source, - ", update all services error: ", post_err) + ", update all services error: ", post_err) end if dump_params then From 3e7589a1d8eb0d2c7e2638589bfe4474bf2d0d2d Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Fri, 31 Mar 2023 09:03:47 +0800 Subject: [PATCH 10/27] consul health endpoint --- apisix/discovery/consul/init.lua | 123 +++++++++++++++---------------- 1 file changed, 60 insertions(+), 63 deletions(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index 90793fc1a42d..795d16bd18d5 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -344,78 +344,75 @@ function _M.connect(premature, consul_server, retry_delay) ", consul_server: ", json_delay_encode(consul_server, true)) -- if current index different last index then update service - if consul_server.catalog_index ~= catalog_res.headers['X-Consul-Index'] then - local up_services = core.table.new(0, #catalog_res.body) - for service_name, _ in pairs(catalog_res.body) do - -- check if the service_name is 'skip service' - if skip_service_map[service_name] then - goto CONTINUE - end - -- get node from service - local svc_url = consul_server.consul_sub_url .. "/" .. service_name - local result, get_err = consul_client:get(svc_url, {passing = true}) - local error_info = (get_err ~= nil and get_err) or - ((result ~= nil and result.status ~= 200) and result.status) - if error_info then - log.error("connect consul: ", consul_server.consul_server_url, - ", by service url: ", svc_url, ", with error: ", error_info) - goto CONTINUE - end + local up_services = core.table.new(0, #catalog_res.body) + for service_name, _ in pairs(catalog_res.body) do + -- check if the service_name is 'skip service' + if skip_service_map[service_name] then + goto CONTINUE + end + -- get node from service + local svc_url = consul_server.consul_sub_url .. "/" .. service_name + local result, get_err = consul_client:get(svc_url, {passing = true}) + local error_info = (get_err ~= nil and get_err) or + ((result ~= nil and result.status ~= 200) and result.status) + if error_info then + log.error("connect consul: ", consul_server.consul_server_url, + ", by service url: ", svc_url, ", with error: ", error_info) + goto CONTINUE + end - -- decode body, decode json, update service, error handling - if result.body then - log.notice("service url: ", svc_url, - ", header: ", json_delay_encode(result.headers, true), - ", body: ", json_delay_encode(result.body, true)) - -- add services to table - local nodes = up_services[service_name] - for _, node in ipairs(result.body) do - if not node.Service then - goto CONTINUE - end - local svc_address, svc_port = node.Service.Address, node.Service.Port - if not svc_address then - svc_address = node.Address - end - -- if nodes is nil, new nodes table and set to up_services - if not nodes then - nodes = core.table.new(1, 0) - up_services[service_name] = nodes - end - -- add node to nodes table - core.table.insert(nodes, { - host = svc_address, - port = tonumber(svc_port), - weight = default_weight, - }) + -- decode body, decode json, update service, error handling + if result.body then + log.notice("service url: ", svc_url, + ", header: ", json_delay_encode(result.headers, true), + ", body: ", json_delay_encode(result.body, true)) + -- add services to table + local nodes = up_services[service_name] + for _, node in ipairs(result.body) do + if not node.Service then + goto CONTINUE + end + local svc_address, svc_port = node.Service.Address, node.Service.Port + if not svc_address then + svc_address = node.Address + end + -- if nodes is nil, new nodes table and set to up_services + if not nodes then + nodes = core.table.new(1, 0) + up_services[service_name] = nodes end - up_services[service_name] = nodes + -- add node to nodes table + core.table.insert(nodes, { + host = svc_address, + port = tonumber(svc_port), + weight = default_weight, + }) end - :: CONTINUE :: + up_services[service_name] = nodes end + :: CONTINUE :: + end - update_all_services(consul_server.consul_server_url, up_services) + update_all_services(consul_server.consul_server_url, up_services) - --update events - local post_ok, post_err = events.post(events_list._source, events_list.updating, all_services) - if not post_ok then - log.error("post_event failure with ", events_list._source, - ", update all services error: ", post_err) - end + --update events + local post_ok, post_err = events.post(events_list._source, events_list.updating, all_services) + if not post_ok then + log.error("post_event failure with ", events_list._source, + ", update all services error: ", post_err) + end - if dump_params then - ngx_timer_at(0, write_dump_services) - end + if dump_params then + ngx_timer_at(0, write_dump_services) + end - -- get health index - local health_res, health_err = consul_client:get(consul_server.consul_watch_health_url) - local health_index - if health_err == nil and (health_res ~= nil and health_res.status == 200) - and (consul_server.health_index ~= health_res.headers['X-Consul-Index']) then - health_index = health_res.headers['X-Consul-Index'] - end - update_index(consul_server, catalog_res.headers['X-Consul-Index'], health_index) + -- get health index + local health_res, health_err = consul_client:get(consul_server.consul_watch_health_url) + local health_index + if health_err == nil and (health_res ~= nil and health_res.status == 200) then + health_index = health_res.headers['X-Consul-Index'] end + update_index(consul_server, catalog_res.headers['X-Consul-Index'], health_index) check_keepalive(consul_server, retry_delay) end From 8f89e97a26d58aae0af3c0cb7061865c123bf92d Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Sat, 1 Apr 2023 15:33:00 +0800 Subject: [PATCH 11/27] add consul catalog change unit test case --- apisix/discovery/consul/init.lua | 13 +++--- t/discovery/consul.t | 75 ++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 5 deletions(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index 795d16bd18d5..6d20ef0d7a3f 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -79,7 +79,7 @@ function _M.nodes(service_name) local resp_list = all_services[service_name] if not resp_list then - log.error("fetch nodes failed by ", service_name, ", return default service") + log.error("fetch nodes failed by ", service_name, ", return default service, all svcs: ", json_delay_encode(all_services, true)) return default_service and {default_service} end @@ -156,19 +156,22 @@ local function write_dump_services() if not succ then log.error("write dump into file got error: ", err) end + log.warn("_______ write dump file successfully: ", data) end local function show_dump_file() if not dump_params then + log.error("$$$$$$$$ dump_params is nil") return 503, "dump params is nil" end local data, err = util.read_file(dump_params.path) if not data then + log.error("$$$$$$$$ read dump file failed: ", err) return 503, err end - + log.error("$$$$$ dump file successfully") return 200, data end @@ -203,6 +206,8 @@ local function watch_catalog(consul_server) return default_catalog_error_index end + + log.warn("CCCCCCCCCCCCCC watch_catalog: ", watch_result.headers['X-Consul-Index']) return watch_result.headers['X-Consul-Index'] end @@ -227,6 +232,7 @@ local function watch_health(consul_server) return default_health_error_index end + log.warn("HHHHHHHHHHHHHHHH watch_result: ", watch_result.headers['X-Consul-Index']) return watch_result.headers['X-Consul-Index'] end @@ -373,9 +379,6 @@ function _M.connect(premature, consul_server, retry_delay) goto CONTINUE end local svc_address, svc_port = node.Service.Address, node.Service.Port - if not svc_address then - svc_address = node.Address - end -- if nodes is nil, new nodes table and set to up_services if not nodes then nodes = core.table.new(1, 0) diff --git a/t/discovery/consul.t b/t/discovery/consul.t index 39c5ab287b50..b1d1e03d22ca 100644 --- a/t/discovery/consul.t +++ b/t/discovery/consul.t @@ -576,3 +576,78 @@ GET /thc [{"host":"127.0.0.1","port":30513,"priority":0,"weight":1},{"host":"127.0.0.1","port":30514,"priority":0,"weight":1}] [{"host":"127.0.0.1","port":30513,"priority":0,"weight":1},{"host":"127.0.0.1","port":30514,"priority":0,"weight":1}] --- ignore_error_log + + + +=== TEST 13: test consul catalog service change +--- yaml_config +apisix: + node_listen: 1984 +deployment: + role: data_plane + role_data_plane: + config_provider: yaml +discovery: + consul: + servers: + - "http://127.0.0.1:8500" + keepalive: false + fetch_interval: 3 + default_service: + host: "127.0.0.1" + port: 20999 +#END +--- apisix_yaml +routes: + - + uri: /* + upstream: + service_name: service_a + discovery_type: consul + type: roundrobin +#END +--- config +location /v1/agent { + proxy_pass http://127.0.0.1:8500; +} + +location /sleep { + content_by_lua_block { + local args = ngx.req.get_uri_args() + local sec = args.sec or "2" + ngx.sleep(tonumber(sec)) + ngx.say("ok") + } +} +--- timeout: 6 +--- request eval +[ + "PUT /v1/agent/service/deregister/service_a1", + "GET /sleep?sec=3", + "GET /hello", + "PUT /v1/agent/service/register\n" . "{\"Checks\": [{\"http\": \"http://baidu.com\",\"interval\": \"1s\", \"checkID\":\"service_a1_check\",\"name\":\"service_a_check_name\"}],\"ID\":\"service_a1\",\"Name\":\"service_a\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30511,\"Meta\":{\"service_a_version\":\"4.0\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}", + "GET /sleep?sec=5", + "GET /hello", + "PUT /v1/agent/service/register\n" . "{\"Checks\": [{\"http\": \"http://127.0.0.1:32400\",\"interval\": \"1s\", \"checkID\":\"service_a1_check\",\"name\":\"service_a_check_name\"}],\"ID\":\"service_a1\",\"Name\":\"service_a\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30511,\"Meta\":{\"service_a_version\":\"4.0\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}", + "GET /sleep?sec=5", + "GET /hello", + "PUT /v1/agent/service/register\n" . "{\"Checks\": [{\"http\": \"http://127.0.0.1:8500\",\"interval\": \"1s\", \"checkID\":\"service_a1_check\",\"name\":\"service_a_check_name\"}],\"ID\":\"service_a1\",\"Name\":\"service_a\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30511,\"Meta\":{\"service_a_version\":\"4.0\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}", + "GET /sleep?sec=5", + "GET /hello", +] +--- response_body_like eval +[ + qr//, + qr/ok\n/, + qr/missing consul services\n/, + qr//, + qr/ok\n/, + qr/server 1\n/, + qr//, + qr/ok\n/, + qr/missing consul services\n/, + qr//, + qr/ok\n/, + qr/server 1\n/, +] +--- ignore_error_log From d4c37e47b7cfb1c016cccdb7bd0e3f4ddd7239d8 Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Sun, 2 Apr 2023 15:19:19 +0800 Subject: [PATCH 12/27] consul health endpoint --- apisix/discovery/consul/init.lua | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index 6d20ef0d7a3f..6621d3e61142 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -79,7 +79,7 @@ function _M.nodes(service_name) local resp_list = all_services[service_name] if not resp_list then - log.error("fetch nodes failed by ", service_name, ", return default service, all svcs: ", json_delay_encode(all_services, true)) + log.error("fetch nodes failed by ", service_name, ", return default service") return default_service and {default_service} end @@ -156,22 +156,18 @@ local function write_dump_services() if not succ then log.error("write dump into file got error: ", err) end - log.warn("_______ write dump file successfully: ", data) end local function show_dump_file() if not dump_params then - log.error("$$$$$$$$ dump_params is nil") return 503, "dump params is nil" end local data, err = util.read_file(dump_params.path) if not data then - log.error("$$$$$$$$ read dump file failed: ", err) return 503, err end - log.error("$$$$$ dump file successfully") return 200, data end @@ -207,7 +203,6 @@ local function watch_catalog(consul_server) return default_catalog_error_index end - log.warn("CCCCCCCCCCCCCC watch_catalog: ", watch_result.headers['X-Consul-Index']) return watch_result.headers['X-Consul-Index'] end @@ -232,7 +227,6 @@ local function watch_health(consul_server) return default_health_error_index end - log.warn("HHHHHHHHHHHHHHHH watch_result: ", watch_result.headers['X-Consul-Index']) return watch_result.headers['X-Consul-Index'] end From 9e3f4ca41534208ca4e63dccfacba7de4a8437c8 Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Wed, 5 Apr 2023 00:00:05 +0800 Subject: [PATCH 13/27] consul health endpoint --- apisix/discovery/consul/init.lua | 140 ++++++++++++++++++------------- 1 file changed, 80 insertions(+), 60 deletions(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index 6621d3e61142..cb5b30e1841b 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -227,6 +227,7 @@ local function watch_health(consul_server) return default_health_error_index end + return watch_result.headers['X-Consul-Index'] end @@ -320,14 +321,14 @@ function _M.connect(premature, consul_server, retry_delay) }) local catalog_res, catalog_err = consul_client:get(consul_server.consul_watch_catalog_url) - local watch_error_info = (catalog_err ~= nil and catalog_err) + local catalog_error_info = (catalog_err ~= nil and catalog_err) or ((catalog_res ~= nil and catalog_res.status ~= 200) and catalog_res.status) - if watch_error_info then + if catalog_error_info then log.error("connect consul: ", consul_server.consul_server_url, " by sub url: ", consul_server.consul_watch_catalog_url, ", got catalog result: ", json_delay_encode(catalog_res, true), - ", with error: ", watch_error_info) + ", with error: ", catalog_error_info) retry_delay = get_retry_delay(retry_delay) log.warn("get all svcs got err, retry connecting consul after ", retry_delay, " seconds") @@ -337,6 +338,24 @@ function _M.connect(premature, consul_server, retry_delay) return end + -- get health index + local health_res, health_err = consul_client:get(consul_server.consul_watch_health_url) + local health_error_info = (health_err ~= nil and health_err) + or ((health_res ~= nil and health_res.status ~= 200) + and health_res.status) + if health_error_info then + log.error("connect consul: ", consul_server.consul_server_url, + " by sub url: ", consul_server.consul_watch_health_url, + ", got health result: ", json_delay_encode(health_res, true), + ", with error: ", health_error_info) + + retry_delay = get_retry_delay(retry_delay) + log.warn("get all svcs got err, retry connecting consul after ", retry_delay, " seconds") + core_sleep(retry_delay) + + goto ERROR + end + log.info("connect consul: ", consul_server.consul_server_url, ", catalog_result status: ", catalog_res.status, ", catalog_result.headers.index: ", catalog_res.headers['X-Consul-Index'], @@ -344,73 +363,74 @@ function _M.connect(premature, consul_server, retry_delay) ", consul_server: ", json_delay_encode(consul_server, true)) -- if current index different last index then update service - local up_services = core.table.new(0, #catalog_res.body) - for service_name, _ in pairs(catalog_res.body) do - -- check if the service_name is 'skip service' - if skip_service_map[service_name] then - goto CONTINUE - end - -- get node from service - local svc_url = consul_server.consul_sub_url .. "/" .. service_name - local result, get_err = consul_client:get(svc_url, {passing = true}) - local error_info = (get_err ~= nil and get_err) or - ((result ~= nil and result.status ~= 200) and result.status) - if error_info then - log.error("connect consul: ", consul_server.consul_server_url, - ", by service url: ", svc_url, ", with error: ", error_info) - goto CONTINUE - end + if (consul_server.catalog_index ~= catalog_res.headers['X-Consul-Index']) + or (consul_server.health_index ~= health_res.headers['X-Consul-Index']) then + local up_services = core.table.new(0, #catalog_res.body) + for service_name, _ in pairs(catalog_res.body) do + -- check if the service_name is 'skip service' + if skip_service_map[service_name] then + goto CONTINUE + end + -- get node from service + local svc_url = consul_server.consul_sub_url .. "/" .. service_name + local result, get_err = consul_client:get(svc_url, {passing = true}) + local error_info = (get_err ~= nil and get_err) or + ((result ~= nil and result.status ~= 200) and result.status) + if error_info then + log.error("connect consul: ", consul_server.consul_server_url, + ", by service url: ", svc_url, ", with error: ", error_info) + goto CONTINUE + end - -- decode body, decode json, update service, error handling - if result.body then - log.notice("service url: ", svc_url, - ", header: ", json_delay_encode(result.headers, true), - ", body: ", json_delay_encode(result.body, true)) - -- add services to table - local nodes = up_services[service_name] - for _, node in ipairs(result.body) do - if not node.Service then - goto CONTINUE - end - local svc_address, svc_port = node.Service.Address, node.Service.Port - -- if nodes is nil, new nodes table and set to up_services - if not nodes then - nodes = core.table.new(1, 0) - up_services[service_name] = nodes + -- decode body, decode json, update service, error handling + if result.body then + log.notice("service url: ", svc_url, + ", header: ", json_delay_encode(result.headers, true), + ", body: ", json_delay_encode(result.body, true)) + -- add services to table + local nodes = up_services[service_name] + for _, node in ipairs(result.body) do + if not node.Service then + goto CONTINUE + end + local svc_address, svc_port = node.Service.Address, node.Service.Port + -- if nodes is nil, new nodes table and set to up_services + if not nodes then + nodes = core.table.new(1, 0) + up_services[service_name] = nodes + end + -- add node to nodes table + core.table.insert(nodes, { + host = svc_address, + port = tonumber(svc_port), + weight = default_weight, + }) end - -- add node to nodes table - core.table.insert(nodes, { - host = svc_address, - port = tonumber(svc_port), - weight = default_weight, - }) + up_services[service_name] = nodes end - up_services[service_name] = nodes + :: CONTINUE :: end - :: CONTINUE :: - end - update_all_services(consul_server.consul_server_url, up_services) + update_all_services(consul_server.consul_server_url, up_services) - --update events - local post_ok, post_err = events.post(events_list._source, events_list.updating, all_services) - if not post_ok then - log.error("post_event failure with ", events_list._source, - ", update all services error: ", post_err) - end + --update events + local post_ok, post_err = events.post(events_list._source, + events_list.updating, all_services) + if not post_ok then + log.error("post_event failure with ", events_list._source, + ", update all services error: ", post_err) + end - if dump_params then - ngx_timer_at(0, write_dump_services) - end + if dump_params then + ngx_timer_at(0, write_dump_services) + end - -- get health index - local health_res, health_err = consul_client:get(consul_server.consul_watch_health_url) - local health_index - if health_err == nil and (health_res ~= nil and health_res.status == 200) then - health_index = health_res.headers['X-Consul-Index'] + update_index(consul_server, + catalog_res.headers['X-Consul-Index'], + health_res.headers['X-Consul-Index']) end - update_index(consul_server, catalog_res.headers['X-Consul-Index'], health_index) + :: ERROR :: check_keepalive(consul_server, retry_delay) end From 138fd3772d559421dbf0fd635aa153e45dce34a1 Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Wed, 5 Apr 2023 22:37:11 +0800 Subject: [PATCH 14/27] consul health endpoint --- apisix/discovery/consul/init.lua | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index cb5b30e1841b..6c3d6cf7fe05 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -363,8 +363,8 @@ function _M.connect(premature, consul_server, retry_delay) ", consul_server: ", json_delay_encode(consul_server, true)) -- if current index different last index then update service - if (consul_server.catalog_index ~= catalog_res.headers['X-Consul-Index']) - or (consul_server.health_index ~= health_res.headers['X-Consul-Index']) then + if (consul_server.catalog_index ~= tonumber(catalog_res.headers['X-Consul-Index'])) + or (consul_server.health_index ~= tonumber(health_res.headers['X-Consul-Index'])) then local up_services = core.table.new(0, #catalog_res.body) for service_name, _ in pairs(catalog_res.body) do -- check if the service_name is 'skip service' From c2d87fc93f9179c15702cf0efded3877d39a7093 Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Fri, 7 Apr 2023 08:01:29 +0000 Subject: [PATCH 15/27] add pcall --- apisix/discovery/consul/init.lua | 44 +++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index 6c3d6cf7fe05..755ebaa51b38 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -36,6 +36,7 @@ local thread_spawn = ngx.thread.spawn local thread_wait = ngx.thread.wait local thread_kill = ngx.thread.kill local math_random = math.random +local pcall = pcall local all_services = core.table.new(0, 5) local default_service @@ -51,6 +52,7 @@ local default_skip_services = {"consul"} local default_random_seed = 5 local default_catalog_error_index = -1 local default_health_error_index = -2 +local max_retry_time = 256 local _M = { version = 0.3, @@ -172,7 +174,7 @@ local function show_dump_file() end local function get_retry_delay(retry_delay) - if not retry_delay then + if not retry_delay or retry_delay >= max_retry_time then retry_delay = 1 else retry_delay = retry_delay * 4 @@ -202,7 +204,6 @@ local function watch_catalog(consul_server) return default_catalog_error_index end - return watch_result.headers['X-Consul-Index'] end @@ -227,7 +228,6 @@ local function watch_health(consul_server) return default_health_error_index end - return watch_result.headers['X-Consul-Index'] end @@ -293,7 +293,7 @@ function _M.connect(premature, consul_server, retry_delay) log.error("failed to spawn thread watch health: ", err) local random_delay = math_random(default_random_seed) log.warn("failed to spawn thread watch health, retry connecting consul after ", - random_delay, " seconds") + random_delay, " seconds") core_sleep(random_delay) check_keepalive(consul_server, retry_delay) @@ -301,8 +301,8 @@ function _M.connect(premature, consul_server, retry_delay) end local thread_wait_ok, wait_res = thread_wait(catalog_thread, health_thread) - thread_kill(health_thread) thread_kill(catalog_thread) + thread_kill(health_thread) if not thread_wait_ok then log.error("failed to wait thread: ", err, ", wait_res: ", wait_res) local random_delay = math_random(default_random_seed) @@ -320,11 +320,20 @@ function _M.connect(premature, consul_server, retry_delay) read_timeout = consul_server.read_timeout, }) - local catalog_res, catalog_err = consul_client:get(consul_server.consul_watch_catalog_url) + local catalog_success, catalog_res, catalog_err = pcall(function() + return consul_client:get(consul_server.consul_watch_catalog_url) + end) + if not catalog_success then + log.error("connect consul: ", consul_server.consul_server_url, + " by sub url: ", consul_server.consul_watch_catalog_url, + ", got catalog result: ", json_delay_encode(catalog_res, true)) + check_keepalive(consul_server, retry_delay) + return + end local catalog_error_info = (catalog_err ~= nil and catalog_err) or ((catalog_res ~= nil and catalog_res.status ~= 200) and catalog_res.status) - if catalog_error_info then + if catalog_error_info then log.error("connect consul: ", consul_server.consul_server_url, " by sub url: ", consul_server.consul_watch_catalog_url, ", got catalog result: ", json_delay_encode(catalog_res, true), @@ -339,7 +348,16 @@ function _M.connect(premature, consul_server, retry_delay) end -- get health index - local health_res, health_err = consul_client:get(consul_server.consul_watch_health_url) + local success, health_res, health_err = pcall(function() + return consul_client:get(consul_server.consul_watch_health_url) + end) + if not success then + log.error("connect consul: ", consul_server.consul_server_url, + " by sub url: ", consul_server.consul_watch_health_url, + ", got health result: ", json_delay_encode(health_res, true)) + check_keepalive(consul_server, retry_delay) + return + end local health_error_info = (health_err ~= nil and health_err) or ((health_res ~= nil and health_res.status ~= 200) and health_res.status) @@ -373,10 +391,12 @@ function _M.connect(premature, consul_server, retry_delay) end -- get node from service local svc_url = consul_server.consul_sub_url .. "/" .. service_name - local result, get_err = consul_client:get(svc_url, {passing = true}) + local svc_success, result, get_err = pcall(function() + return consul_client:get(svc_url, {passing = true}) + end) local error_info = (get_err ~= nil and get_err) or ((result ~= nil and result.status ~= 200) and result.status) - if error_info then + if not svc_success or error_info then log.error("connect consul: ", consul_server.consul_server_url, ", by service url: ", svc_url, ", with error: ", error_info) goto CONTINUE @@ -418,7 +438,7 @@ function _M.connect(premature, consul_server, retry_delay) events_list.updating, all_services) if not post_ok then log.error("post_event failure with ", events_list._source, - ", update all services error: ", post_err) + ", update all services error: ", post_err) end if dump_params then @@ -562,4 +582,4 @@ function _M.control_api() end -return _M +return _M \ No newline at end of file From ceabd236bf2907aca49b6e2af4cc9fd8f8fc874d Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Fri, 7 Apr 2023 17:19:41 +0800 Subject: [PATCH 16/27] remove space --- apisix/discovery/consul/init.lua | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index 755ebaa51b38..f436fc31ae65 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -137,7 +137,7 @@ local function read_dump_services() local now_time = ngx.time() log.info("dump file last_update: ", entity.last_update, ", dump_params.expire: ", dump_params.expire, ", now_time: ", now_time) - if dump_params.expire ~= 0 and (entity.last_update + dump_params.expire) < now_time then + if dump_params.expire ~= 0 and (entity.last_update + dump_params.expire) < now_time then log.warn("dump file: ", dump_params.path, " had expired, ignored it") return end @@ -154,7 +154,7 @@ local function write_dump_services() expire = dump_params.expire, -- later need handle it } local data = core.json.encode(entity) - local succ, err = util.write_file(dump_params.path, data) + local succ, err = util.write_file(dump_params.path, data) if not succ then log.error("write dump into file got error: ", err) end @@ -270,6 +270,16 @@ local function update_index(consul_server, catalog_index, health_index) end end +local function is_not_empty(value) + if value == nil or value == null + or (type(value) == "table" and not next(value)) + or (type(value) == "string" and value == "") then + return false + end + + return true +end + function _M.connect(premature, consul_server, retry_delay) if premature then return @@ -333,7 +343,7 @@ function _M.connect(premature, consul_server, retry_delay) local catalog_error_info = (catalog_err ~= nil and catalog_err) or ((catalog_res ~= nil and catalog_res.status ~= 200) and catalog_res.status) - if catalog_error_info then + if catalog_error_info then log.error("connect consul: ", consul_server.consul_server_url, " by sub url: ", consul_server.consul_watch_catalog_url, ", got catalog result: ", json_delay_encode(catalog_res, true), @@ -403,13 +413,14 @@ function _M.connect(premature, consul_server, retry_delay) end -- decode body, decode json, update service, error handling - if result.body then + -- check result body is not nil and not empty + if is_not_empty(result.body) then log.notice("service url: ", svc_url, ", header: ", json_delay_encode(result.headers, true), ", body: ", json_delay_encode(result.body, true)) -- add services to table local nodes = up_services[service_name] - for _, node in ipairs(result.body) do + for _, node in ipairs(result.body) do if not node.Service then goto CONTINUE end @@ -582,4 +593,4 @@ function _M.control_api() end -return _M \ No newline at end of file +return _M From f43a7c8685fa044f33a660e49de1551f90f379b4 Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Fri, 7 Apr 2023 17:24:11 +0800 Subject: [PATCH 17/27] remove space --- apisix/discovery/consul/init.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index f436fc31ae65..2d052965c189 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -449,7 +449,7 @@ function _M.connect(premature, consul_server, retry_delay) events_list.updating, all_services) if not post_ok then log.error("post_event failure with ", events_list._source, - ", update all services error: ", post_err) + ", update all services error: ", post_err) end if dump_params then From 39ebd4b08702dcbbcbafc7be75d593dd55b838e3 Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Wed, 12 Apr 2023 23:52:06 +0800 Subject: [PATCH 18/27] add double check index wether has change --- apisix/discovery/consul/init.lua | 149 ++++++++++++++++++++----------- 1 file changed, 95 insertions(+), 54 deletions(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index 2d052965c189..65ee1852dbf7 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -37,6 +37,7 @@ local thread_wait = ngx.thread.wait local thread_kill = ngx.thread.kill local math_random = math.random local pcall = pcall +local null = ngx.null local all_services = core.table.new(0, 5) local default_service @@ -52,6 +53,8 @@ local default_skip_services = {"consul"} local default_random_seed = 5 local default_catalog_error_index = -1 local default_health_error_index = -2 +local watch_type_catalog = 1 +local watch_type_health = 2 local max_retry_time = 256 local _M = { @@ -184,15 +187,30 @@ local function get_retry_delay(retry_delay) end local function watch_catalog(consul_server) - local c = resty_consul:new({ - host = consul_server.host, - port = consul_server.port, - connect_timeout = consul_server.connect_timeout, - read_timeout = consul_server.read_timeout, - default_args = consul_server.default_catalog_args, - }) + local opts + if consul_server.keepalive then + opts = { + host = consul_server.host, + port = consul_server.port, + connect_timeout = consul_server.connect_timeout, + read_timeout = consul_server.read_timeout, + default_args = { + wait = consul_server.wait_timeout, --blocked wait!=0; unblocked by wait=0 + index = consul_server.catalog_index, + }, + } + else + opts = { + host = consul_server.host, + port = consul_server.port, + connect_timeout = consul_server.connect_timeout, + read_timeout = consul_server.read_timeout, + } + end + local client = resty_consul:new(opts) - local watch_result, watch_err = c:get(consul_server.consul_watch_catalog_url) + ::RETRY:: + local watch_result, watch_err = client:get(consul_server.consul_watch_catalog_url) local watch_error_info = (watch_err ~= nil and watch_err) or ((watch_result ~= nil and watch_result.status ~= 200) and watch_result.status) @@ -202,21 +220,43 @@ local function watch_catalog(consul_server) ", got watch result: ", json_delay_encode(watch_result, true), ", with error: ", watch_error_info) - return default_catalog_error_index + return watch_type_catalog, default_catalog_error_index + end + if consul_server.catalog_index > 0 + and consul_server.catalog_index == tonumber(watch_result.headers['X-Consul-Index']) then + local random_delay = math_random(default_random_seed) + log.warn("watch catalog has no change, retry call consul after ", random_delay, " seconds") + core_sleep(random_delay) + goto RETRY end - return watch_result.headers['X-Consul-Index'] + return watch_type_catalog, watch_result.headers['X-Consul-Index'] end local function watch_health(consul_server) - local c = resty_consul:new({ - host = consul_server.host, - port = consul_server.port, - connect_timeout = consul_server.connect_timeout, - read_timeout = consul_server.read_timeout, - default_args = consul_server.default_health_args, - }) + local opts + if consul_server.keepalive then + opts = { + host = consul_server.host, + port = consul_server.port, + connect_timeout = consul_server.connect_timeout, + read_timeout = consul_server.read_timeout, + default_args = { + wait = consul_server.wait_timeout, --blocked wait!=0; unblocked by wait=0 + index = consul_server.health_index, + }, + } + else + opts = { + host = consul_server.host, + port = consul_server.port, + connect_timeout = consul_server.connect_timeout, + read_timeout = consul_server.read_timeout, + } + end + local client = resty_consul:new(opts) - local watch_result, watch_err = c:get(consul_server.consul_watch_health_url) + ::RETRY:: + local watch_result, watch_err = client:get(consul_server.consul_watch_health_url) local watch_error_info = (watch_err ~= nil and watch_err) or ((watch_result ~= nil and watch_result.status ~= 200) and watch_result.status) @@ -226,9 +266,16 @@ local function watch_health(consul_server) ", got watch result: ", json_delay_encode(watch_result, true), ", with error: ", watch_error_info) - return default_health_error_index + return watch_type_health, default_health_error_index + end + if consul_server.health_index > 0 + and consul_server.health_index == tonumber(watch_result.headers['X-Consul-Index']) then + local random_delay = math_random(default_random_seed) + log.warn("watch health has no change, retry call consul after ", random_delay, " seconds") + core_sleep(random_delay) + goto RETRY end - return watch_result.headers['X-Consul-Index'] + return watch_type_health, watch_result.headers['X-Consul-Index'] end local function check_keepalive(consul_server, retry_delay) @@ -241,7 +288,6 @@ local function check_keepalive(consul_server, retry_delay) end end - local function update_index(consul_server, catalog_index, health_index) local c_index = 0 local h_index = 0 @@ -255,18 +301,10 @@ local function update_index(consul_server, catalog_index, health_index) if c_index > 0 then consul_server.catalog_index = c_index - -- only long connect type use index - if consul_server.keepalive then - consul_server.default_catalog_args.index = c_index - end end if h_index > 0 then consul_server.health_index = h_index - -- only long connect type use index - if consul_server.keepalive then - consul_server.default_health_args.index = h_index - end end end @@ -280,6 +318,24 @@ local function is_not_empty(value) return true end +local function watch_result_is_valid(watch_type, index, catalog_index, health_index) + if index <= 0 then + return false + end + + if watch_type == watch_type_catalog then + if index == catalog_index then + return false + end + else + if index == health_index then + return false + end + end + + return true +end + function _M.connect(premature, consul_server, retry_delay) if premature then return @@ -310,11 +366,11 @@ function _M.connect(premature, consul_server, retry_delay) return end - local thread_wait_ok, wait_res = thread_wait(catalog_thread, health_thread) + local thread_wait_ok, watch_type, index = thread_wait(catalog_thread, health_thread) thread_kill(catalog_thread) thread_kill(health_thread) if not thread_wait_ok then - log.error("failed to wait thread: ", err, ", wait_res: ", wait_res) + log.error("failed to wait thread: ", watch_type) local random_delay = math_random(default_random_seed) log.warn("failed to wait thread, retry connecting consul after ", random_delay, " seconds") core_sleep(random_delay) @@ -323,13 +379,19 @@ function _M.connect(premature, consul_server, retry_delay) return end + -- double check index has changed + if not watch_result_is_valid(tonumber(watch_type), + tonumber(index), consul_server.catalog_index, consul_server.health_index) then + check_keepalive(consul_server, retry_delay) + return + end + local consul_client = resty_consul:new({ host = consul_server.host, port = consul_server.port, connect_timeout = consul_server.connect_timeout, read_timeout = consul_server.read_timeout, }) - local catalog_success, catalog_res, catalog_err = pcall(function() return consul_client:get(consul_server.consul_watch_catalog_url) end) @@ -415,9 +477,6 @@ function _M.connect(premature, consul_server, retry_delay) -- decode body, decode json, update service, error handling -- check result body is not nil and not empty if is_not_empty(result.body) then - log.notice("service url: ", svc_url, - ", header: ", json_delay_encode(result.headers, true), - ", body: ", json_delay_encode(result.body, true)) -- add services to table local nodes = up_services[service_name] for _, node in ipairs(result.body) do @@ -468,21 +527,6 @@ end local function format_consul_params(consul_conf) local consul_server_list = core.table.new(0, #consul_conf.servers) - local catalog_args, health_args - - if consul_conf.keepalive == false then - catalog_args = {} - health_args = {} - elseif consul_conf.keepalive then - catalog_args = { - wait = consul_conf.timeout.wait, --blocked wait!=0; unblocked by wait=0 - index = 0, - } - health_args = { - wait = consul_conf.timeout.wait, --blocked wait!=0; unblocked by wait=0 - index = 0, - } - end for _, v in pairs(consul_conf.servers) do local scheme, host, port, path = unpack(http.parse_uri(nil, v)) @@ -491,26 +535,23 @@ local function format_consul_params(consul_conf) elseif path ~= "/" or core.string.has_suffix(v, '/') then return nil, "invalid consul server address, the valid format: http://address:port" end - core.table.insert(consul_server_list, { host = host, port = port, connect_timeout = consul_conf.timeout.connect, read_timeout = consul_conf.timeout.read, + wait_timeout = consul_conf.timeout.wait, consul_watch_catalog_url = "/catalog/services", consul_sub_url = "/health/service", consul_watch_health_url = "/health/state/any", consul_server_url = v .. "/v1", weight = consul_conf.weight, keepalive = consul_conf.keepalive, - default_catalog_args = catalog_args, - default_health_args = health_args, health_index = 0, catalog_index = 0, fetch_interval = consul_conf.fetch_interval -- fetch interval to next connect consul }) end - return consul_server_list, nil end From 1db097a9a6abc93a144c211ceac4a21f41f4ffda Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Thu, 13 Apr 2023 03:04:23 +0000 Subject: [PATCH 19/27] fix lint error --- apisix/discovery/consul/init.lua | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index 65ee1852dbf7..f75fd648d767 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -38,6 +38,8 @@ local thread_kill = ngx.thread.kill local math_random = math.random local pcall = pcall local null = ngx.null +local type = type +local next = next local all_services = core.table.new(0, 5) local default_service From 72412fd8da7c8150577d20251891bf27dd69ad53 Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Mon, 17 Apr 2023 23:28:20 +0800 Subject: [PATCH 20/27] fix test unit error --- apisix/discovery/consul/init.lua | 5 +++++ t/discovery/consul.t | 6 +++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index f75fd648d767..cbbf52912822 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -384,6 +384,11 @@ function _M.connect(premature, consul_server, retry_delay) -- double check index has changed if not watch_result_is_valid(tonumber(watch_type), tonumber(index), consul_server.catalog_index, consul_server.health_index) then + + retry_delay = get_retry_delay(retry_delay) + log.warn("get all svcs got err, retry connecting consul after ", retry_delay, " seconds") + core_sleep(retry_delay) + check_keepalive(consul_server, retry_delay) return end diff --git a/t/discovery/consul.t b/t/discovery/consul.t index ed0e0cdad4cc..57a6ab5b4fbd 100644 --- a/t/discovery/consul.t +++ b/t/discovery/consul.t @@ -631,13 +631,13 @@ location /sleep { "PUT /v1/agent/service/deregister/service_a1", "GET /sleep?sec=3", "GET /hello", - "PUT /v1/agent/service/register\n" . "{\"Checks\": [{\"http\": \"http://baidu.com\",\"interval\": \"1s\", \"checkID\":\"service_a1_check\",\"name\":\"service_a_check_name\"}],\"ID\":\"service_a1\",\"Name\":\"service_a\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30511,\"Meta\":{\"service_a_version\":\"4.0\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}", + "PUT /v1/agent/service/register\n" . "{\"ID\":\"service_a1\",\"Name\":\"service_a\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30511,\"Meta\":{\"service_a_version\":\"4.0\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}", "GET /sleep?sec=5", "GET /hello", - "PUT /v1/agent/service/register\n" . "{\"Checks\": [{\"http\": \"http://127.0.0.1:32400\",\"interval\": \"1s\", \"checkID\":\"service_a1_check\",\"name\":\"service_a_check_name\"}],\"ID\":\"service_a1\",\"Name\":\"service_a\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30511,\"Meta\":{\"service_a_version\":\"4.0\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}", + "PUT /v1/agent/service/deregister/service_a1", "GET /sleep?sec=5", "GET /hello", - "PUT /v1/agent/service/register\n" . "{\"Checks\": [{\"http\": \"http://127.0.0.1:8500\",\"interval\": \"1s\", \"checkID\":\"service_a1_check\",\"name\":\"service_a_check_name\"}],\"ID\":\"service_a1\",\"Name\":\"service_a\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30511,\"Meta\":{\"service_a_version\":\"4.0\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}", + "PUT /v1/agent/service/register\n" . "{\"ID\":\"service_a1\",\"Name\":\"service_a\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30511,\"Meta\":{\"service_a_version\":\"4.0\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}", "GET /sleep?sec=5", "GET /hello", ] From b5485ecd76d753df311b815142d6459a1bbb707c Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Wed, 26 Apr 2023 05:48:45 +0000 Subject: [PATCH 21/27] fix cr --- apisix/discovery/consul/init.lua | 70 ++++++++++++-------------------- 1 file changed, 27 insertions(+), 43 deletions(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index cbbf52912822..da27ebbb9d10 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -188,28 +188,33 @@ local function get_retry_delay(retry_delay) return retry_delay end -local function watch_catalog(consul_server) - local opts +local function get_opts(consul_server, is_catalog) + local opts = { + host = consul_server.host, + port = consul_server.port, + connect_timeout = consul_server.connect_timeout, + read_timeout = consul_server.read_timeout, + } if consul_server.keepalive then - opts = { - host = consul_server.host, - port = consul_server.port, - connect_timeout = consul_server.connect_timeout, - read_timeout = consul_server.read_timeout, - default_args = { + if is_catalog then + opts.default_args = { wait = consul_server.wait_timeout, --blocked wait!=0; unblocked by wait=0 index = consul_server.catalog_index, }, - } - else - opts = { - host = consul_server.host, - port = consul_server.port, - connect_timeout = consul_server.connect_timeout, - read_timeout = consul_server.read_timeout, - } + else + opts.default_args = { + wait = consul_server.wait_timeout, --blocked wait!=0; unblocked by wait=0 + index = consul_server.health_index, + }, + end end - local client = resty_consul:new(opts) + + return opts +end + +local function watch_catalog(consul_server) + + local client = resty_consul:new(get_opts(consul_server, true)) ::RETRY:: local watch_result, watch_err = client:get(consul_server.consul_watch_catalog_url) @@ -235,27 +240,8 @@ local function watch_catalog(consul_server) end local function watch_health(consul_server) - local opts - if consul_server.keepalive then - opts = { - host = consul_server.host, - port = consul_server.port, - connect_timeout = consul_server.connect_timeout, - read_timeout = consul_server.read_timeout, - default_args = { - wait = consul_server.wait_timeout, --blocked wait!=0; unblocked by wait=0 - index = consul_server.health_index, - }, - } - else - opts = { - host = consul_server.host, - port = consul_server.port, - connect_timeout = consul_server.connect_timeout, - read_timeout = consul_server.read_timeout, - } - end - local client = resty_consul:new(opts) + + local client = resty_consul:new(get_opts(consul_server, false)) ::RETRY:: local watch_result, watch_err = client:get(consul_server.consul_watch_health_url) @@ -345,9 +331,8 @@ function _M.connect(premature, consul_server, retry_delay) local catalog_thread, spawn_catalog_err = thread_spawn(watch_catalog, consul_server) if not catalog_thread then - log.error("failed to spawn thread watch catalog: ", spawn_catalog_err) local random_delay = math_random(default_random_seed) - log.warn("failed to spawn thread watch catalog, retry connecting consul after ", + log.error("failed to spawn thread watch catalog: ", spawn_catalog_err, ", retry connecting consul after ", random_delay, " seconds") core_sleep(random_delay) @@ -358,10 +343,9 @@ function _M.connect(premature, consul_server, retry_delay) local health_thread, err = thread_spawn(watch_health, consul_server) if not health_thread then thread_kill(catalog_thread) - log.error("failed to spawn thread watch health: ", err) local random_delay = math_random(default_random_seed) - log.warn("failed to spawn thread watch health, retry connecting consul after ", - random_delay, " seconds") + log.error("failed to spawn thread watch health: ", err, ", retry connecting consul after ", + random_delay, " seconds") core_sleep(random_delay) check_keepalive(consul_server, retry_delay) From 715a360a09418ba7a97ca1c8ef01c5ed867f66a3 Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Wed, 26 Apr 2023 05:50:58 +0000 Subject: [PATCH 22/27] fix cr --- apisix/discovery/consul/init.lua | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index da27ebbb9d10..7d167400d082 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -232,7 +232,7 @@ local function watch_catalog(consul_server) if consul_server.catalog_index > 0 and consul_server.catalog_index == tonumber(watch_result.headers['X-Consul-Index']) then local random_delay = math_random(default_random_seed) - log.warn("watch catalog has no change, retry call consul after ", random_delay, " seconds") + log.info("watch catalog has no change, re-watch consul after ", random_delay, " seconds") core_sleep(random_delay) goto RETRY end @@ -259,7 +259,7 @@ local function watch_health(consul_server) if consul_server.health_index > 0 and consul_server.health_index == tonumber(watch_result.headers['X-Consul-Index']) then local random_delay = math_random(default_random_seed) - log.warn("watch health has no change, retry call consul after ", random_delay, " seconds") + log.info("watch health has no change, re-watch consul after ", random_delay, " seconds") core_sleep(random_delay) goto RETRY end From d7ce821ab2b684686673015df2678b280f16e7ff Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Thu, 27 Apr 2023 17:46:09 +0800 Subject: [PATCH 23/27] fix cr --- apisix/discovery/consul/init.lua | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index 7d167400d082..c275a445e0cc 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -200,12 +200,12 @@ local function get_opts(consul_server, is_catalog) opts.default_args = { wait = consul_server.wait_timeout, --blocked wait!=0; unblocked by wait=0 index = consul_server.catalog_index, - }, + } else opts.default_args = { wait = consul_server.wait_timeout, --blocked wait!=0; unblocked by wait=0 index = consul_server.health_index, - }, + } end end @@ -213,7 +213,7 @@ local function get_opts(consul_server, is_catalog) end local function watch_catalog(consul_server) - + local client = resty_consul:new(get_opts(consul_server, true)) ::RETRY:: @@ -332,8 +332,8 @@ function _M.connect(premature, consul_server, retry_delay) local catalog_thread, spawn_catalog_err = thread_spawn(watch_catalog, consul_server) if not catalog_thread then local random_delay = math_random(default_random_seed) - log.error("failed to spawn thread watch catalog: ", spawn_catalog_err, ", retry connecting consul after ", - random_delay, " seconds") + log.error("failed to spawn thread watch catalog: ", spawn_catalog_err, + ", retry connecting consul after ", random_delay, " seconds") core_sleep(random_delay) check_keepalive(consul_server, retry_delay) From 1c50e52c663d83c483d3ec8581c58ba1893bf43e Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Sun, 30 Apr 2023 11:44:49 +0800 Subject: [PATCH 24/27] fix cr --- apisix/discovery/consul/init.lua | 2 -- 1 file changed, 2 deletions(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index c275a445e0cc..2580506ebfbb 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -213,7 +213,6 @@ local function get_opts(consul_server, is_catalog) end local function watch_catalog(consul_server) - local client = resty_consul:new(get_opts(consul_server, true)) ::RETRY:: @@ -240,7 +239,6 @@ local function watch_catalog(consul_server) end local function watch_health(consul_server) - local client = resty_consul:new(get_opts(consul_server, false)) ::RETRY:: From e35577e261d5b3f5a0f8a461a909e8f819973e44 Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Wed, 3 May 2023 18:11:49 +0800 Subject: [PATCH 25/27] fix cr --- apisix/discovery/consul/init.lua | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index 2580506ebfbb..e29b443576f8 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -178,6 +178,7 @@ local function show_dump_file() return 200, data end + local function get_retry_delay(retry_delay) if not retry_delay or retry_delay >= max_retry_time then retry_delay = 1 @@ -188,6 +189,7 @@ local function get_retry_delay(retry_delay) return retry_delay end + local function get_opts(consul_server, is_catalog) local opts = { host = consul_server.host, @@ -212,6 +214,7 @@ local function get_opts(consul_server, is_catalog) return opts end + local function watch_catalog(consul_server) local client = resty_consul:new(get_opts(consul_server, true)) @@ -228,6 +231,7 @@ local function watch_catalog(consul_server) return watch_type_catalog, default_catalog_error_index end + if consul_server.catalog_index > 0 and consul_server.catalog_index == tonumber(watch_result.headers['X-Consul-Index']) then local random_delay = math_random(default_random_seed) @@ -235,9 +239,11 @@ local function watch_catalog(consul_server) core_sleep(random_delay) goto RETRY end + return watch_type_catalog, watch_result.headers['X-Consul-Index'] end + local function watch_health(consul_server) local client = resty_consul:new(get_opts(consul_server, false)) @@ -254,6 +260,7 @@ local function watch_health(consul_server) return watch_type_health, default_health_error_index end + if consul_server.health_index > 0 and consul_server.health_index == tonumber(watch_result.headers['X-Consul-Index']) then local random_delay = math_random(default_random_seed) @@ -261,9 +268,11 @@ local function watch_health(consul_server) core_sleep(random_delay) goto RETRY end + return watch_type_health, watch_result.headers['X-Consul-Index'] end + local function check_keepalive(consul_server, retry_delay) if consul_server.keepalive then local ok, err = ngx_timer_at(0, _M.connect, consul_server, retry_delay) @@ -274,6 +283,7 @@ local function check_keepalive(consul_server, retry_delay) end end + local function update_index(consul_server, catalog_index, health_index) local c_index = 0 local h_index = 0 @@ -294,6 +304,7 @@ local function update_index(consul_server, catalog_index, health_index) end end + local function is_not_empty(value) if value == nil or value == null or (type(value) == "table" and not next(value)) @@ -304,6 +315,7 @@ local function is_not_empty(value) return true end + local function watch_result_is_valid(watch_type, index, catalog_index, health_index) if index <= 0 then return false @@ -322,6 +334,7 @@ local function watch_result_is_valid(watch_type, index, catalog_index, health_in return true end + function _M.connect(premature, consul_server, retry_delay) if premature then return @@ -354,9 +367,9 @@ function _M.connect(premature, consul_server, retry_delay) thread_kill(catalog_thread) thread_kill(health_thread) if not thread_wait_ok then - log.error("failed to wait thread: ", watch_type) local random_delay = math_random(default_random_seed) - log.warn("failed to wait thread, retry connecting consul after ", random_delay, " seconds") + log.error("failed to wait thread: ", watch_type, ", retry connecting consul after ", + random_delay, " seconds") core_sleep(random_delay) check_keepalive(consul_server, retry_delay) @@ -366,7 +379,6 @@ function _M.connect(premature, consul_server, retry_delay) -- double check index has changed if not watch_result_is_valid(tonumber(watch_type), tonumber(index), consul_server.catalog_index, consul_server.health_index) then - retry_delay = get_retry_delay(retry_delay) log.warn("get all svcs got err, retry connecting consul after ", retry_delay, " seconds") core_sleep(retry_delay) @@ -432,7 +444,8 @@ function _M.connect(premature, consul_server, retry_delay) log.warn("get all svcs got err, retry connecting consul after ", retry_delay, " seconds") core_sleep(retry_delay) - goto ERROR + check_keepalive(consul_server, retry_delay) + return end log.info("connect consul: ", consul_server.consul_server_url, @@ -441,7 +454,7 @@ function _M.connect(premature, consul_server, retry_delay) ", consul_server.index: ", consul_server.index, ", consul_server: ", json_delay_encode(consul_server, true)) - -- if current index different last index then update service + -- if the current index is different from the last index, then update the service if (consul_server.catalog_index ~= tonumber(catalog_res.headers['X-Consul-Index'])) or (consul_server.health_index ~= tonumber(health_res.headers['X-Consul-Index'])) then local up_services = core.table.new(0, #catalog_res.body) @@ -450,6 +463,7 @@ function _M.connect(premature, consul_server, retry_delay) if skip_service_map[service_name] then goto CONTINUE end + -- get node from service local svc_url = consul_server.consul_sub_url .. "/" .. service_name local svc_success, result, get_err = pcall(function() @@ -472,6 +486,7 @@ function _M.connect(premature, consul_server, retry_delay) if not node.Service then goto CONTINUE end + local svc_address, svc_port = node.Service.Address, node.Service.Port -- if nodes is nil, new nodes table and set to up_services if not nodes then @@ -509,7 +524,6 @@ function _M.connect(premature, consul_server, retry_delay) health_res.headers['X-Consul-Index']) end - :: ERROR :: check_keepalive(consul_server, retry_delay) end From a590840fb9661444cae4d05886e72e9f4311188d Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Thu, 4 May 2023 09:54:23 +0800 Subject: [PATCH 26/27] fix cr --- apisix/discovery/consul/init.lua | 48 +++++++++++++++++--------------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index e29b443576f8..fd274f8621f5 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -175,6 +175,7 @@ local function show_dump_file() if not data then return 503, err end + return 200, data end @@ -197,18 +198,20 @@ local function get_opts(consul_server, is_catalog) connect_timeout = consul_server.connect_timeout, read_timeout = consul_server.read_timeout, } - if consul_server.keepalive then - if is_catalog then - opts.default_args = { - wait = consul_server.wait_timeout, --blocked wait!=0; unblocked by wait=0 - index = consul_server.catalog_index, - } - else - opts.default_args = { - wait = consul_server.wait_timeout, --blocked wait!=0; unblocked by wait=0 - index = consul_server.health_index, - } - end + if not consul_server.keepalive then + return opts + end + + if is_catalog then + opts.default_args = { + wait = consul_server.wait_timeout, --blocked wait!=0; unblocked by wait=0 + index = consul_server.catalog_index, + } + else + opts.default_args = { + wait = consul_server.wait_timeout, --blocked wait!=0; unblocked by wait=0 + index = consul_server.health_index, + } end return opts @@ -221,12 +224,12 @@ local function watch_catalog(consul_server) ::RETRY:: local watch_result, watch_err = client:get(consul_server.consul_watch_catalog_url) local watch_error_info = (watch_err ~= nil and watch_err) - or ((watch_result ~= nil and watch_result.status ~= 200) - and watch_result.status) + or ((watch_result ~= nil and watch_result.status ~= 200) + and watch_result.status) if watch_error_info then log.error("connect consul: ", consul_server.consul_server_url, " by sub url: ", consul_server.consul_watch_catalog_url, - ", got watch result: ", json_delay_encode(watch_result, true), + ", got watch result: ", json_delay_encode(watch_result), ", with error: ", watch_error_info) return watch_type_catalog, default_catalog_error_index @@ -255,7 +258,7 @@ local function watch_health(consul_server) if watch_error_info then log.error("connect consul: ", consul_server.consul_server_url, " by sub url: ", consul_server.consul_watch_health_url, - ", got watch result: ", json_delay_encode(watch_result, true), + ", got watch result: ", json_delay_encode(watch_result), ", with error: ", watch_error_info) return watch_type_health, default_health_error_index @@ -308,7 +311,8 @@ end local function is_not_empty(value) if value == nil or value == null or (type(value) == "table" and not next(value)) - or (type(value) == "string" and value == "") then + or (type(value) == "string" and value == "") + then return false end @@ -399,7 +403,7 @@ function _M.connect(premature, consul_server, retry_delay) if not catalog_success then log.error("connect consul: ", consul_server.consul_server_url, " by sub url: ", consul_server.consul_watch_catalog_url, - ", got catalog result: ", json_delay_encode(catalog_res, true)) + ", got catalog result: ", json_delay_encode(catalog_res)) check_keepalive(consul_server, retry_delay) return end @@ -409,7 +413,7 @@ function _M.connect(premature, consul_server, retry_delay) if catalog_error_info then log.error("connect consul: ", consul_server.consul_server_url, " by sub url: ", consul_server.consul_watch_catalog_url, - ", got catalog result: ", json_delay_encode(catalog_res, true), + ", got catalog result: ", json_delay_encode(catalog_res), ", with error: ", catalog_error_info) retry_delay = get_retry_delay(retry_delay) @@ -427,7 +431,7 @@ function _M.connect(premature, consul_server, retry_delay) if not success then log.error("connect consul: ", consul_server.consul_server_url, " by sub url: ", consul_server.consul_watch_health_url, - ", got health result: ", json_delay_encode(health_res, true)) + ", got health result: ", json_delay_encode(health_res)) check_keepalive(consul_server, retry_delay) return end @@ -437,7 +441,7 @@ function _M.connect(premature, consul_server, retry_delay) if health_error_info then log.error("connect consul: ", consul_server.consul_server_url, " by sub url: ", consul_server.consul_watch_health_url, - ", got health result: ", json_delay_encode(health_res, true), + ", got health result: ", json_delay_encode(health_res), ", with error: ", health_error_info) retry_delay = get_retry_delay(retry_delay) @@ -452,7 +456,7 @@ function _M.connect(premature, consul_server, retry_delay) ", catalog_result status: ", catalog_res.status, ", catalog_result.headers.index: ", catalog_res.headers['X-Consul-Index'], ", consul_server.index: ", consul_server.index, - ", consul_server: ", json_delay_encode(consul_server, true)) + ", consul_server: ", json_delay_encode(consul_server)) -- if the current index is different from the last index, then update the service if (consul_server.catalog_index ~= tonumber(catalog_res.headers['X-Consul-Index'])) From 7f4611394bf021a85519bd0669208c01fab31100 Mon Sep 17 00:00:00 2001 From: Fabriceli Date: Thu, 4 May 2023 09:56:23 +0800 Subject: [PATCH 27/27] fix cr --- apisix/discovery/consul/init.lua | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index fd274f8621f5..ae1e4c64cc9c 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -52,7 +52,7 @@ local events_list local consul_services local default_skip_services = {"consul"} -local default_random_seed = 5 +local default_random_range = 5 local default_catalog_error_index = -1 local default_health_error_index = -2 local watch_type_catalog = 1 @@ -237,7 +237,7 @@ local function watch_catalog(consul_server) if consul_server.catalog_index > 0 and consul_server.catalog_index == tonumber(watch_result.headers['X-Consul-Index']) then - local random_delay = math_random(default_random_seed) + local random_delay = math_random(default_random_range) log.info("watch catalog has no change, re-watch consul after ", random_delay, " seconds") core_sleep(random_delay) goto RETRY @@ -266,7 +266,7 @@ local function watch_health(consul_server) if consul_server.health_index > 0 and consul_server.health_index == tonumber(watch_result.headers['X-Consul-Index']) then - local random_delay = math_random(default_random_seed) + local random_delay = math_random(default_random_range) log.info("watch health has no change, re-watch consul after ", random_delay, " seconds") core_sleep(random_delay) goto RETRY @@ -346,7 +346,7 @@ function _M.connect(premature, consul_server, retry_delay) local catalog_thread, spawn_catalog_err = thread_spawn(watch_catalog, consul_server) if not catalog_thread then - local random_delay = math_random(default_random_seed) + local random_delay = math_random(default_random_range) log.error("failed to spawn thread watch catalog: ", spawn_catalog_err, ", retry connecting consul after ", random_delay, " seconds") core_sleep(random_delay) @@ -358,7 +358,7 @@ function _M.connect(premature, consul_server, retry_delay) local health_thread, err = thread_spawn(watch_health, consul_server) if not health_thread then thread_kill(catalog_thread) - local random_delay = math_random(default_random_seed) + local random_delay = math_random(default_random_range) log.error("failed to spawn thread watch health: ", err, ", retry connecting consul after ", random_delay, " seconds") core_sleep(random_delay) @@ -371,7 +371,7 @@ function _M.connect(premature, consul_server, retry_delay) thread_kill(catalog_thread) thread_kill(health_thread) if not thread_wait_ok then - local random_delay = math_random(default_random_seed) + local random_delay = math_random(default_random_range) log.error("failed to wait thread: ", watch_type, ", retry connecting consul after ", random_delay, " seconds") core_sleep(random_delay)