Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix fetch all service info from consul #8651

Merged
merged 15 commits into from
Feb 8, 2023
159 changes: 82 additions & 77 deletions apisix/discovery/consul/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,18 @@ local events
local events_list
local consul_services

local default_skip_services = {"consul"}

local _M = {
version = 0.1,
version = 0.2,
}


local function discovery_consul_callback(data, event, source, pid)
all_services = data
log.notice("update local variable all_services, event is: ", event,
"source: ", source, "server pid:", pid,
", all services: ", json_delay_encode(all_services, true))
"source: ", source, "server pid:", pid,
", all services: ", json_delay_encode(all_services, true))
end


Expand All @@ -75,44 +77,15 @@ function _M.nodes(service_name)
end

log.info("process id: ", ngx_worker_id(), ", all_services[", service_name, "] = ",
json_delay_encode(resp_list, true))
json_delay_encode(resp_list, true))

return resp_list
end


local function parse_instance(node)
local service_name, host, port = node.Service, node.Address, node.Port
-- if exist, skip special service name
if service_name and skip_service_map[service_name] then
return false
end
-- "" means metadata of the service
return true, host, tonumber(port), "", service_name
end


local function update_all_services(server_name_prefix, data)
local up_services = core.table.new(0, #data)
local weight = default_weight
for _, node in pairs(data) do
local succ, ip, port, metadata, server_name = parse_instance(node)
if succ then
local nodes = up_services[server_name]
if not nodes then
nodes = core.table.new(1, 0)
up_services[server_name] = nodes
end
core.table.insert(nodes, {
host = ip,
port = port,
weight = metadata and metadata.weight or weight,
})
end
end

local function update_all_services(consul_server_url, up_services)
-- clean old unused data
local old_services = consul_services[server_name_prefix] or {}
local old_services = consul_services[consul_server_url] or {}
for k, _ in pairs(old_services) do
all_services[k] = nil
end
Expand All @@ -121,7 +94,7 @@ local function update_all_services(server_name_prefix, data)
for k, v in pairs(up_services) do
all_services[k] = v
end
consul_services[server_name_prefix] = up_services
consul_services[consul_server_url] = up_services

log.info("update all services: ", json_delay_encode(all_services, true))
end
Expand Down Expand Up @@ -154,7 +127,7 @@ local function read_dump_services()

local now_time = ngx.time()
log.info("dump file last_update: ", entity.last_update, ", dump_params.expire: ",
dump_params.expire, ", now_time: ", now_time)
dump_params.expire, ", now_time: ", now_time)
if dump_params.expire ~= 0 and (entity.last_update + dump_params.expire) < now_time then
log.warn("dump file: ", dump_params.path, " had expired, ignored it")
return
Expand Down Expand Up @@ -223,9 +196,9 @@ function _M.connect(premature, consul_server, retry_delay)
and watch_result.status)
if watch_error_info then
log.error("connect consul: ", consul_server.consul_server_url,
" by sub url: ", consul_server.consul_watch_sub_url,
", got watch result: ", json_delay_encode(watch_result, true),
", with error: ", watch_error_info)
" by sub url: ", consul_server.consul_watch_sub_url,
", got watch result: ", json_delay_encode(watch_result, true),
", with error: ", watch_error_info)

retry_delay = get_retry_delay(retry_delay)
log.warn("retry connecting consul after ", retry_delay, " seconds")
Expand All @@ -235,55 +208,83 @@ function _M.connect(premature, consul_server, retry_delay)
end

log.info("connect consul: ", consul_server.consul_server_url,
", watch_result status: ", watch_result.status,
", watch_result.headers.index: ", watch_result.headers['X-Consul-Index'],
", consul_server.index: ", consul_server.index,
", consul_server: ", json_delay_encode(consul_server, true))
", watch_result status: ", watch_result.status,
", watch_result.headers.index: ", watch_result.headers['X-Consul-Index'],
", consul_server.index: ", consul_server.index,
", consul_server: ", json_delay_encode(consul_server, true))

-- if current index different last index then update service
if consul_server.index ~= watch_result.headers['X-Consul-Index'] then
local up_services = core.table.new(0, #watch_result.body)
local consul_client_svc = resty_consul:new({
host = consul_server.host,
port = consul_server.port,
connect_timeout = consul_server.connect_timeout,
read_timeout = consul_server.read_timeout,
Fabriceli marked this conversation as resolved.
Show resolved Hide resolved
})
for service_name, _ in pairs(watch_result.body) do
-- check if the service_name is 'skip service'
if skip_service_map[service_name] then
goto CONTINUE
end
-- get node from service
local svc_url = consul_server.consul_sub_url .. "/" .. service_name
local result, err = consul_client_svc:get(svc_url)
local error_info = (err ~= nil and err) or
((result ~= nil and result.status ~= 200) and result.status)
if error_info then
log.error("connect consul: ", consul_server.consul_server_url,
", by service url: ", svc_url, ", with error: ", error_info)
goto CONTINUE
end

-- fetch all services info
local result, err = consul_client:get(consul_server.consul_sub_url)

local error_info = (err ~= nil and err) or
((result ~= nil and result.status ~= 200) and result.status)
-- decode body, decode json, update service, error handling
if result.body then
log.notice("service url: ", svc_url,
", header: ", json_delay_encode(result.headers, true),
", body: ", json_delay_encode(result.body, true))
-- add services to table
local nodes = up_services[service_name]
for _, node in ipairs(result.body) do
local svc_address, svc_port = node.ServiceAddress, node.ServicePort
if not svc_address then
svc_address = node.Address
end
-- if nodes is nil, new nodes table and set to up_services
if not nodes then
nodes = core.table.new(1, 0)
up_services[service_name] = nodes
end
-- add node to nodes table
core.table.insert(nodes, {
host = svc_address,
port = tonumber(svc_port),
weight = default_weight,
})
end
up_services[service_name] = nodes
end
:: CONTINUE ::
end

if error_info then
log.error("connect consul: ", consul_server.consul_server_url,
" by sub url: ", consul_server.consul_sub_url,
", got result: ", json_delay_encode(result, true),
", with error: ", error_info)
update_all_services(consul_server.consul_server_url, up_services)

retry_delay = get_retry_delay(retry_delay)
log.warn("retry connecting consul after ", retry_delay, " seconds")
core_sleep(retry_delay)
--update events
local ok, post_err = events.post(events_list._source, events_list.updating, all_services)
if not ok then
log.error("post_event failure with ", events_list._source,
", update all services error: ", post_err)
end

goto ERR
if dump_params then
ngx_timer_at(0, write_dump_services)
end

consul_server.index = watch_result.headers['X-Consul-Index']
-- only long connect type use index
if consul_server.keepalive then
consul_server.default_args.index = watch_result.headers['X-Consul-Index']
end
-- decode body, decode json, update service, error handling
if result.body then
log.notice("server_name: ", consul_server.consul_server_url,
", header: ", json_delay_encode(result.headers, true),
", body: ", json_delay_encode(result.body, true))
update_all_services(consul_server.consul_server_url, result.body)
--update events
local ok, err = events.post(events_list._source, events_list.updating, all_services)
if not ok then
log.error("post_event failure with ", events_list._source,
", update all services error: ", err)
end

if dump_params then
ngx_timer_at(0, write_dump_services)
end
end
end

:: ERR ::
Expand Down Expand Up @@ -324,7 +325,7 @@ local function format_consul_params(consul_conf)
port = port,
connect_timeout = consul_conf.timeout.connect,
read_timeout = consul_conf.timeout.read,
consul_sub_url = "/agent/services",
consul_sub_url = "/catalog/service",
consul_watch_sub_url = "/catalog/services",
consul_server_url = v .. "/v1",
weight = consul_conf.weight,
Expand Down Expand Up @@ -375,10 +376,14 @@ function _M.init_worker()
skip_service_map[v] = true
end
end
-- set up default skip service
for _, v in ipairs(default_skip_services) do
skip_service_map[v] = true
end

local consul_servers_list, err = format_consul_params(consul_conf)
if err then
error(err)
error("format consul config got error: " .. err)
end
log.info("consul_server_list: ", json_delay_encode(consul_servers_list, true))

Expand Down
7 changes: 4 additions & 3 deletions conf/config-default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -318,11 +318,11 @@ nginx_config: # config for render the template to generate n
# path: "logs/consul_kv.dump"
# expire: 2592000 # unit: seconds; 2592000 is 30 days
# consul:
# servers:
# - "http://127.0.0.1:8500"
# servers: # make sure service names are unique across these consul servers
# - "http://127.0.0.1:8500" # `http://127.0.0.1:8500` and `http://127.0.0.1:8600` are different clusters
# - "http://127.0.0.1:8600"
# skip_services: # if you need to skip special services
# - "service_a"
# - "service_a" # the `consul` service is skipped by default
# timeout:
# connect: 2000 # default 2000 ms
# read: 2000 # default 2000 ms
Expand All @@ -340,6 +340,7 @@ nginx_config: # config for render the template to generate n
# dump: # optional: when registered nodes are updated, they can be dumped to a file
# path: "logs/consul.dump"
# expire: 2592000 # unit: seconds; 2592000 is 30 days
# load_on_init: true # default true, load the consul dump file on init
# kubernetes:
# ### kubernetes service discovery both support single-cluster and multi-cluster mode
# ### applicable to the case where the service is distributed in a single or multiple kubernetes clusters.
Expand Down
4 changes: 2 additions & 2 deletions docs/en/latest/discovery/consul.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ First of all, we need to add following configuration in `conf/config.yaml` :
discovery:
consul:
servers: # make sure service name is unique in these consul servers
- "http://127.0.0.1:8500"
- "http://127.0.0.1:8600" # `http://127.0.0.1:8500` and `http://127.0.0.1:8600` are different clusters
- "http://127.0.0.1:8500" # `http://127.0.0.1:8500` and `http://127.0.0.1:8600` are different clusters
- "http://127.0.0.1:8600" # the `consul` service is skipped by default
skip_services: # if you need to skip special services
- "service_a"
timeout:
Expand Down
2 changes: 1 addition & 1 deletion t/discovery/consul_dump.t
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ discovery:
- "http://127.0.0.1:8500"
dump:
path: "consul.dump"
load_on_init: true
load_on_init: false
Fabriceli marked this conversation as resolved.
Show resolved Hide resolved
--- config
location /t {
content_by_lua_block {
Expand Down