Skip to content

Commit

Permalink
fix: fix fetch all service info from consul (#8651)
Browse files Browse the repository at this point in the history
Co-authored-by: Fabriceli <li842162578@gmail.com>
  • Loading branch information
Fabriceli and Fabriceli authored Feb 8, 2023
1 parent 5984321 commit bbff579
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 83 deletions.
159 changes: 82 additions & 77 deletions apisix/discovery/consul/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,18 @@ local events
local events_list
local consul_services

local default_skip_services = {"consul"}

local _M = {
version = 0.1,
version = 0.2,
}


local function discovery_consul_callback(data, event, source, pid)
all_services = data
log.notice("update local variable all_services, event is: ", event,
"source: ", source, "server pid:", pid,
", all services: ", json_delay_encode(all_services, true))
"source: ", source, "server pid:", pid,
", all services: ", json_delay_encode(all_services, true))
end


Expand All @@ -75,44 +77,15 @@ function _M.nodes(service_name)
end

log.info("process id: ", ngx_worker_id(), ", all_services[", service_name, "] = ",
json_delay_encode(resp_list, true))
json_delay_encode(resp_list, true))

return resp_list
end


local function parse_instance(node)
local service_name, host, port = node.Service, node.Address, node.Port
-- if exist, skip special service name
if service_name and skip_service_map[service_name] then
return false
end
-- "" means metadata of the service
return true, host, tonumber(port), "", service_name
end


local function update_all_services(server_name_prefix, data)
local up_services = core.table.new(0, #data)
local weight = default_weight
for _, node in pairs(data) do
local succ, ip, port, metadata, server_name = parse_instance(node)
if succ then
local nodes = up_services[server_name]
if not nodes then
nodes = core.table.new(1, 0)
up_services[server_name] = nodes
end
core.table.insert(nodes, {
host = ip,
port = port,
weight = metadata and metadata.weight or weight,
})
end
end

local function update_all_services(consul_server_url, up_services)
-- clean old unused data
local old_services = consul_services[server_name_prefix] or {}
local old_services = consul_services[consul_server_url] or {}
for k, _ in pairs(old_services) do
all_services[k] = nil
end
Expand All @@ -121,7 +94,7 @@ local function update_all_services(server_name_prefix, data)
for k, v in pairs(up_services) do
all_services[k] = v
end
consul_services[server_name_prefix] = up_services
consul_services[consul_server_url] = up_services

log.info("update all services: ", json_delay_encode(all_services, true))
end
Expand Down Expand Up @@ -154,7 +127,7 @@ local function read_dump_services()

local now_time = ngx.time()
log.info("dump file last_update: ", entity.last_update, ", dump_params.expire: ",
dump_params.expire, ", now_time: ", now_time)
dump_params.expire, ", now_time: ", now_time)
if dump_params.expire ~= 0 and (entity.last_update + dump_params.expire) < now_time then
log.warn("dump file: ", dump_params.path, " had expired, ignored it")
return
Expand Down Expand Up @@ -223,9 +196,9 @@ function _M.connect(premature, consul_server, retry_delay)
and watch_result.status)
if watch_error_info then
log.error("connect consul: ", consul_server.consul_server_url,
" by sub url: ", consul_server.consul_watch_sub_url,
", got watch result: ", json_delay_encode(watch_result, true),
", with error: ", watch_error_info)
" by sub url: ", consul_server.consul_watch_sub_url,
", got watch result: ", json_delay_encode(watch_result, true),
", with error: ", watch_error_info)

retry_delay = get_retry_delay(retry_delay)
log.warn("retry connecting consul after ", retry_delay, " seconds")
Expand All @@ -235,55 +208,83 @@ function _M.connect(premature, consul_server, retry_delay)
end

log.info("connect consul: ", consul_server.consul_server_url,
", watch_result status: ", watch_result.status,
", watch_result.headers.index: ", watch_result.headers['X-Consul-Index'],
", consul_server.index: ", consul_server.index,
", consul_server: ", json_delay_encode(consul_server, true))
", watch_result status: ", watch_result.status,
", watch_result.headers.index: ", watch_result.headers['X-Consul-Index'],
", consul_server.index: ", consul_server.index,
", consul_server: ", json_delay_encode(consul_server, true))

-- if current index different last index then update service
if consul_server.index ~= watch_result.headers['X-Consul-Index'] then
local up_services = core.table.new(0, #watch_result.body)
local consul_client_svc = resty_consul:new({
host = consul_server.host,
port = consul_server.port,
connect_timeout = consul_server.connect_timeout,
read_timeout = consul_server.read_timeout,
})
for service_name, _ in pairs(watch_result.body) do
-- check if the service_name is 'skip service'
if skip_service_map[service_name] then
goto CONTINUE
end
-- get node from service
local svc_url = consul_server.consul_sub_url .. "/" .. service_name
local result, err = consul_client_svc:get(svc_url)
local error_info = (err ~= nil and err) or
((result ~= nil and result.status ~= 200) and result.status)
if error_info then
log.error("connect consul: ", consul_server.consul_server_url,
", by service url: ", svc_url, ", with error: ", error_info)
goto CONTINUE
end

-- fetch all services info
local result, err = consul_client:get(consul_server.consul_sub_url)

local error_info = (err ~= nil and err) or
((result ~= nil and result.status ~= 200) and result.status)
-- decode body, decode json, update service, error handling
if result.body then
log.notice("service url: ", svc_url,
", header: ", json_delay_encode(result.headers, true),
", body: ", json_delay_encode(result.body, true))
-- add services to table
local nodes = up_services[service_name]
for _, node in ipairs(result.body) do
local svc_address, svc_port = node.ServiceAddress, node.ServicePort
if not svc_address then
svc_address = node.Address
end
-- if nodes is nil, new nodes table and set to up_services
if not nodes then
nodes = core.table.new(1, 0)
up_services[service_name] = nodes
end
-- add node to nodes table
core.table.insert(nodes, {
host = svc_address,
port = tonumber(svc_port),
weight = default_weight,
})
end
up_services[service_name] = nodes
end
:: CONTINUE ::
end

if error_info then
log.error("connect consul: ", consul_server.consul_server_url,
" by sub url: ", consul_server.consul_sub_url,
", got result: ", json_delay_encode(result, true),
", with error: ", error_info)
update_all_services(consul_server.consul_server_url, up_services)

retry_delay = get_retry_delay(retry_delay)
log.warn("retry connecting consul after ", retry_delay, " seconds")
core_sleep(retry_delay)
--update events
local ok, post_err = events.post(events_list._source, events_list.updating, all_services)
if not ok then
log.error("post_event failure with ", events_list._source,
", update all services error: ", post_err)
end

goto ERR
if dump_params then
ngx_timer_at(0, write_dump_services)
end

consul_server.index = watch_result.headers['X-Consul-Index']
-- only long connect type use index
if consul_server.keepalive then
consul_server.default_args.index = watch_result.headers['X-Consul-Index']
end
-- decode body, decode json, update service, error handling
if result.body then
log.notice("server_name: ", consul_server.consul_server_url,
", header: ", json_delay_encode(result.headers, true),
", body: ", json_delay_encode(result.body, true))
update_all_services(consul_server.consul_server_url, result.body)
--update events
local ok, err = events.post(events_list._source, events_list.updating, all_services)
if not ok then
log.error("post_event failure with ", events_list._source,
", update all services error: ", err)
end

if dump_params then
ngx_timer_at(0, write_dump_services)
end
end
end

:: ERR ::
Expand Down Expand Up @@ -324,7 +325,7 @@ local function format_consul_params(consul_conf)
port = port,
connect_timeout = consul_conf.timeout.connect,
read_timeout = consul_conf.timeout.read,
consul_sub_url = "/agent/services",
consul_sub_url = "/catalog/service",
consul_watch_sub_url = "/catalog/services",
consul_server_url = v .. "/v1",
weight = consul_conf.weight,
Expand Down Expand Up @@ -375,10 +376,14 @@ function _M.init_worker()
skip_service_map[v] = true
end
end
-- set up default skip service
for _, v in ipairs(default_skip_services) do
skip_service_map[v] = true
end

local consul_servers_list, err = format_consul_params(consul_conf)
if err then
error(err)
error("format consul config got error: " .. err)
end
log.info("consul_server_list: ", json_delay_encode(consul_servers_list, true))

Expand Down
7 changes: 4 additions & 3 deletions conf/config-default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,11 @@ nginx_config: # config for render the template to generate n
# path: "logs/consul_kv.dump"
# expire: 2592000 # unit sec, here is 30 day
# consul:
# servers:
# - "http://127.0.0.1:8500"
# servers: # make sure service name is unique in these consul servers
# - "http://127.0.0.1:8500" # `http://127.0.0.1:8500` and `http://127.0.0.1:8600` are different clusters
# - "http://127.0.0.1:8600"
# skip_services: # if you need to skip special services
# - "service_a"
# - "service_a" # `consul` service is default skip service
# timeout:
# connect: 2000 # default 2000 ms
# read: 2000 # default 2000 ms
Expand All @@ -327,6 +327,7 @@ nginx_config: # config for render the template to generate n
# dump: # if you need, when registered nodes updated can dump into file
# path: "logs/consul.dump"
# expire: 2592000 # unit sec, here is 30 day
# load_on_init: true # default true, load the consul dump file on init
# kubernetes:
# ### kubernetes service discovery both support single-cluster and multi-cluster mode
# ### applicable to the case where the service is distributed in a single or multiple kubernetes clusters.
Expand Down
4 changes: 2 additions & 2 deletions docs/en/latest/discovery/consul.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ First of all, we need to add following configuration in `conf/config.yaml` :
discovery:
consul:
servers: # make sure service name is unique in these consul servers
- "http://127.0.0.1:8500"
- "http://127.0.0.1:8600" # `http://127.0.0.1:8500` and `http://127.0.0.1:8600` are different clusters
- "http://127.0.0.1:8500" # `http://127.0.0.1:8500` and `http://127.0.0.1:8600` are different clusters
- "http://127.0.0.1:8600" # `consul` service is default skip service
skip_services: # if you need to skip special services
- "service_a"
timeout:
Expand Down
2 changes: 1 addition & 1 deletion t/discovery/consul_dump.t
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ discovery:
- "http://127.0.0.1:8500"
dump:
path: "consul.dump"
load_on_init: true
load_on_init: false
--- config
location /t {
content_by_lua_block {
Expand Down

0 comments on commit bbff579

Please sign in to comment.