
Commit a7e20e7

Merge branch 'upstream/master' into github/master
* upstream/master:
  docs: change the file name to 'create-ssl.py'. If 'ssl.py' is used as … (apache#8623)
  feat: Body transformer plugin (apache#8766)
  fix: mocking plugin panic when response_example contain $ (apache#8810) (apache#8816)
  feat: file logger plugin support response body in variable (apache#8711)
  docs(getting-started): rewrite the install section (apache#8807)
  feat: allow each logger to define custom log format in its conf (apache#8806)
  fix(etcd): reloaded data may be in res.body.node (apache#8736)
  fix: fix fetch all service info from consul (apache#8651)
  docs: fix global-rule.md wrong curl address (apache#8793)
  feat: stream subsystem support consul service discovery (apache#8696)
  chore(kafka-logger): support configuration `meta_refresh_interval` parameter (apache#8762)
  feat: ready to release 2.15.2 (apache#8783)
hongbinhsu committed Feb 13, 2023
2 parents 93942f5 + 0bc65ea commit a7e20e7
Showing 95 changed files with 3,305 additions and 243 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -26,6 +26,7 @@ title: Changelog
- [3.1.0](#310)
- [3.0.0](#300)
- [3.0.0-beta](#300-beta)
- [2.15.2](#2152)
- [2.15.1](#2151)
- [2.15.0](#2150)
- [2.14.1](#2141)
@@ -295,6 +296,12 @@ Returns multiple configurations:
- avoid error when multiple plugins associated with consumer and have rewrite phase: [#7531](https://github.com/apache/apisix/pull/7531)
- upgrade lua-resty-etcd to 1.8.3 which fixes various issues: [#7565](https://github.com/apache/apisix/pull/7565)

## 2.15.2

**This is an LTS maintenance release and you can see the CHANGELOG in the `release/2.15` branch.**

[https://github.com/apache/apisix/blob/release/2.15/CHANGELOG.md#2152](https://github.com/apache/apisix/blob/release/2.15/CHANGELOG.md#2152)

## 2.15.1

**This is an LTS maintenance release and you can see the CHANGELOG in the `release/2.15` branch.**
6 changes: 2 additions & 4 deletions apisix/core/config_etcd.lua
@@ -367,12 +367,9 @@ local function sync_data(self)
return false, err
end

local dir_res, headers = res.body.list or {}, res.headers
local dir_res, headers = res.body.list or res.body.node or {}, res.headers
log.debug("readdir key: ", self.key, " res: ",
json.delay_encode(dir_res))
if not dir_res then
return false, err
end

if self.values then
for i, val in ipairs(self.values) do
@@ -673,6 +670,7 @@ local function _automatic_fetch(premature, self)
end

-- for test
_M.test_sync_data = sync_data
_M.test_automatic_fetch = _automatic_fetch
function _M.inject_sync_data(f)
sync_data = f
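The one-line change above makes `sync_data()` tolerate both places the etcd readdir result can land in. A minimal sketch of the fallback, using hand-written response tables (an assumption for illustration, not real lua-resty-etcd objects):

```lua
-- Minimal sketch: hand-written stand-ins for the etcd readdir response,
-- not the real lua-resty-etcd objects.
local function extract_dir(res)
    -- same fallback chain as the patched line in sync_data()
    return res.body.list or res.body.node or {}
end

print(#extract_dir({ body = { list = { "r1", "r2" } } }))  -- 2 (normal readdir)
print(#extract_dir({ body = { node = { "r1" } } }))        -- 1 (reloaded data in `node`)
print(#extract_dir({ body = {} }))                         -- 0 (neither field present)
```

The removed `if not dir_res then` guard was effectively dead code, since the expression always yields a table.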
4 changes: 4 additions & 0 deletions apisix/core/ctx.lua
@@ -209,6 +209,10 @@ do
balancer_port = true,
consumer_group_id = true,
consumer_name = true,
resp_body = function(ctx)
-- only for logger and requires the logger to have a special configuration
return ctx.resp_body or ''
end,
route_id = true,
route_name = true,
service_id = true,
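In the table above, most entries are boolean flags, while the new `resp_body` entry is a function that derives its value from the request context on demand. A rough, self-contained sketch of that lookup pattern (a heavily simplified stand-in, not the real `apisix.core.ctx` metatable logic):

```lua
-- Rough stand-in for the variable lookup in apisix.core.ctx (assumption:
-- heavily simplified; the real code also caches values and reads ngx.var).
local vars = {
    route_id  = true,                                           -- flag entry (simplified here)
    resp_body = function(ctx) return ctx.resp_body or '' end,   -- computed on demand
}

local function get_var(ctx, name)
    local rule = vars[name]
    if type(rule) == "function" then
        return rule(ctx)
    elseif rule then
        return ctx[name]
    end
end

print(get_var({ resp_body = "hello" }, "resp_body"))  --> hello
print(get_var({}, "resp_body"))                       --> "" (falls back to the empty string)
```

With the per-logger custom log format from this merge (apache#8806 and apache#8711), such a variable would presumably be referenced as `$resp_body` in the configured format; that usage is an assumption here, not shown in this diff.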
159 changes: 82 additions & 77 deletions apisix/discovery/consul/init.lua
@@ -43,16 +43,18 @@ local events
local events_list
local consul_services

local default_skip_services = {"consul"}

local _M = {
version = 0.1,
version = 0.2,
}


local function discovery_consul_callback(data, event, source, pid)
all_services = data
log.notice("update local variable all_services, event is: ", event,
"source: ", source, "server pid:", pid,
", all services: ", json_delay_encode(all_services, true))
"source: ", source, "server pid:", pid,
", all services: ", json_delay_encode(all_services, true))
end


@@ -75,44 +77,15 @@ function _M.nodes(service_name)
end

log.info("process id: ", ngx_worker_id(), ", all_services[", service_name, "] = ",
json_delay_encode(resp_list, true))
json_delay_encode(resp_list, true))

return resp_list
end


local function parse_instance(node)
local service_name, host, port = node.Service, node.Address, node.Port
-- if exist, skip special service name
if service_name and skip_service_map[service_name] then
return false
end
-- "" means metadata of the service
return true, host, tonumber(port), "", service_name
end


local function update_all_services(server_name_prefix, data)
local up_services = core.table.new(0, #data)
local weight = default_weight
for _, node in pairs(data) do
local succ, ip, port, metadata, server_name = parse_instance(node)
if succ then
local nodes = up_services[server_name]
if not nodes then
nodes = core.table.new(1, 0)
up_services[server_name] = nodes
end
core.table.insert(nodes, {
host = ip,
port = port,
weight = metadata and metadata.weight or weight,
})
end
end

local function update_all_services(consul_server_url, up_services)
-- clean old unused data
local old_services = consul_services[server_name_prefix] or {}
local old_services = consul_services[consul_server_url] or {}
for k, _ in pairs(old_services) do
all_services[k] = nil
end
@@ -121,7 +94,7 @@ local function update_all_services(server_name_prefix, data)
for k, v in pairs(up_services) do
all_services[k] = v
end
consul_services[server_name_prefix] = up_services
consul_services[consul_server_url] = up_services

log.info("update all services: ", json_delay_encode(all_services, true))
end
@@ -154,7 +127,7 @@ local function read_dump_services()

local now_time = ngx.time()
log.info("dump file last_update: ", entity.last_update, ", dump_params.expire: ",
dump_params.expire, ", now_time: ", now_time)
dump_params.expire, ", now_time: ", now_time)
if dump_params.expire ~= 0 and (entity.last_update + dump_params.expire) < now_time then
log.warn("dump file: ", dump_params.path, " had expired, ignored it")
return
@@ -223,9 +196,9 @@ function _M.connect(premature, consul_server, retry_delay)
and watch_result.status)
if watch_error_info then
log.error("connect consul: ", consul_server.consul_server_url,
" by sub url: ", consul_server.consul_watch_sub_url,
", got watch result: ", json_delay_encode(watch_result, true),
", with error: ", watch_error_info)
" by sub url: ", consul_server.consul_watch_sub_url,
", got watch result: ", json_delay_encode(watch_result, true),
", with error: ", watch_error_info)

retry_delay = get_retry_delay(retry_delay)
log.warn("retry connecting consul after ", retry_delay, " seconds")
@@ -235,55 +208,83 @@ end
end

log.info("connect consul: ", consul_server.consul_server_url,
", watch_result status: ", watch_result.status,
", watch_result.headers.index: ", watch_result.headers['X-Consul-Index'],
", consul_server.index: ", consul_server.index,
", consul_server: ", json_delay_encode(consul_server, true))
", watch_result status: ", watch_result.status,
", watch_result.headers.index: ", watch_result.headers['X-Consul-Index'],
", consul_server.index: ", consul_server.index,
", consul_server: ", json_delay_encode(consul_server, true))

-- if current index different last index then update service
if consul_server.index ~= watch_result.headers['X-Consul-Index'] then
local up_services = core.table.new(0, #watch_result.body)
local consul_client_svc = resty_consul:new({
host = consul_server.host,
port = consul_server.port,
connect_timeout = consul_server.connect_timeout,
read_timeout = consul_server.read_timeout,
})
for service_name, _ in pairs(watch_result.body) do
-- check if the service_name is 'skip service'
if skip_service_map[service_name] then
goto CONTINUE
end
-- get node from service
local svc_url = consul_server.consul_sub_url .. "/" .. service_name
local result, err = consul_client_svc:get(svc_url)
local error_info = (err ~= nil and err) or
((result ~= nil and result.status ~= 200) and result.status)
if error_info then
log.error("connect consul: ", consul_server.consul_server_url,
", by service url: ", svc_url, ", with error: ", error_info)
goto CONTINUE
end

-- fetch all services info
local result, err = consul_client:get(consul_server.consul_sub_url)

local error_info = (err ~= nil and err) or
((result ~= nil and result.status ~= 200) and result.status)
-- decode body, decode json, update service, error handling
if result.body then
log.notice("service url: ", svc_url,
", header: ", json_delay_encode(result.headers, true),
", body: ", json_delay_encode(result.body, true))
-- add services to table
local nodes = up_services[service_name]
for _, node in ipairs(result.body) do
local svc_address, svc_port = node.ServiceAddress, node.ServicePort
if not svc_address then
svc_address = node.Address
end
-- if nodes is nil, new nodes table and set to up_services
if not nodes then
nodes = core.table.new(1, 0)
up_services[service_name] = nodes
end
-- add node to nodes table
core.table.insert(nodes, {
host = svc_address,
port = tonumber(svc_port),
weight = default_weight,
})
end
up_services[service_name] = nodes
end
:: CONTINUE ::
end

if error_info then
log.error("connect consul: ", consul_server.consul_server_url,
" by sub url: ", consul_server.consul_sub_url,
", got result: ", json_delay_encode(result, true),
", with error: ", error_info)
update_all_services(consul_server.consul_server_url, up_services)

retry_delay = get_retry_delay(retry_delay)
log.warn("retry connecting consul after ", retry_delay, " seconds")
core_sleep(retry_delay)
--update events
local ok, post_err = events.post(events_list._source, events_list.updating, all_services)
if not ok then
log.error("post_event failure with ", events_list._source,
", update all services error: ", post_err)
end

goto ERR
if dump_params then
ngx_timer_at(0, write_dump_services)
end

consul_server.index = watch_result.headers['X-Consul-Index']
-- only long connect type use index
if consul_server.keepalive then
consul_server.default_args.index = watch_result.headers['X-Consul-Index']
end
-- decode body, decode json, update service, error handling
if result.body then
log.notice("server_name: ", consul_server.consul_server_url,
", header: ", json_delay_encode(result.headers, true),
", body: ", json_delay_encode(result.body, true))
update_all_services(consul_server.consul_server_url, result.body)
--update events
local ok, err = events.post(events_list._source, events_list.updating, all_services)
if not ok then
log.error("post_event failure with ", events_list._source,
", update all services error: ", err)
end

if dump_params then
ngx_timer_at(0, write_dump_services)
end
end
end

:: ERR ::
@@ -324,7 +325,7 @@ local function format_consul_params(consul_conf)
port = port,
connect_timeout = consul_conf.timeout.connect,
read_timeout = consul_conf.timeout.read,
consul_sub_url = "/agent/services",
consul_sub_url = "/catalog/service",
consul_watch_sub_url = "/catalog/services",
consul_server_url = v .. "/v1",
weight = consul_conf.weight,
@@ -375,10 +376,14 @@ function _M.init_worker()
skip_service_map[v] = true
end
end
-- set up default skip service
for _, v in ipairs(default_skip_services) do
skip_service_map[v] = true
end

local consul_servers_list, err = format_consul_params(consul_conf)
if err then
error(err)
error("format consul config got error: " .. err)
end
log.info("consul_server_list: ", json_delay_encode(consul_servers_list, true))

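The reworked `_M.connect` above now watches `/catalog/services` for changes and then queries each non-skipped service through the catalog endpoint, turning each entry's `ServiceAddress`/`ServicePort` (with a fallback to `Address`) into an upstream node. A condensed sketch of just that node-building step, fed with a hand-written catalog-style response (the field names follow the Consul catalog API; the data itself is made up):

```lua
-- Condensed sketch of the node-building loop (assumption: `body` is a
-- hand-written stand-in for one /v1/catalog/service/<name> response).
local default_weight = 1

local function build_nodes(body)
    local nodes = {}
    for _, node in ipairs(body) do
        local host = node.ServiceAddress
        if not host then
            -- same fallback as the diff: use the agent address when the
            -- service did not register its own address
            host = node.Address
        end
        nodes[#nodes + 1] = {
            host   = host,
            port   = tonumber(node.ServicePort),
            weight = default_weight,
        }
    end
    return nodes
end

local body = {
    { Address = "192.168.1.10", ServicePort = 8080 },                                -- no ServiceAddress
    { Address = "192.168.1.11", ServiceAddress = "10.0.0.2", ServicePort = "8081" },
}
for _, n in ipairs(build_nodes(body)) do
    print(n.host, n.port, n.weight)
end
-- 192.168.1.10   8080   1
-- 10.0.0.2       8081   1
```

Services listed in `skip_service_map` (which, per `default_skip_services`, now always includes `consul` itself) are filtered out before this per-service fetch.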