Skip to content

Commit

Permalink
feat: support config stream_route upstream in service (#10298)
Browse files Browse the repository at this point in the history
* feat: support config stream_route upstream ain service

Signed-off-by: Ling Samuel (WSL) <lingsamuelgrace@gmail.com>
  • Loading branch information
lingsamuel authored Oct 15, 2023
1 parent 6fa5a89 commit 0b38ea2
Show file tree
Hide file tree
Showing 11 changed files with 411 additions and 1 deletion.
16 changes: 16 additions & 0 deletions apisix/admin/services.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
--
local core = require("apisix.core")
local get_routes = require("apisix.router").http_routes
local get_stream_routes = require("apisix.router").stream_routes
local apisix_upstream = require("apisix.upstream")
local resource = require("apisix.admin.resource")
local schema_plugin = require("apisix.admin.plugins").check_schema
Expand Down Expand Up @@ -99,6 +100,21 @@ local function delete_checker(id)
end
end

local stream_routes, stream_routes_ver = get_stream_routes()
core.log.info("stream_routes: ", core.json.delay_encode(stream_routes, true))
core.log.info("stream_routes_ver: ", stream_routes_ver)
if stream_routes_ver and stream_routes then
for _, route in ipairs(stream_routes) do
if type(route) == "table" and route.value
and route.value.service_id
and tostring(route.value.service_id) == id then
return 400, {error_msg = "can not delete this service directly,"
.. " stream_route [" .. route.value.id
.. "] is still using it now"}
end
end
end

return nil, nil
end

Expand Down
17 changes: 17 additions & 0 deletions apisix/admin/stream_routes.lua
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,23 @@ local function check_conf(id, conf, need_id, schema)
end
end

local service_id = conf.service_id
if service_id then
local key = "/services/" .. service_id
local res, err = core.etcd.get(key)
if not res then
return nil, {error_msg = "failed to fetch service info by "
.. "service id [" .. service_id .. "]: "
.. err}
end

if res.status ~= 200 then
return nil, {error_msg = "failed to fetch service info by "
.. "service id [" .. service_id .. "], "
.. "response code: " .. res.status}
end
end

local ok, err = stream_route_checker(conf, true)
if not ok then
return nil, {error_msg = err}
Expand Down
1 change: 1 addition & 0 deletions apisix/constants.lua
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ return {
},
STREAM_ETCD_DIRECTORY = {
["/upstreams"] = true,
["/services"] = true,
["/plugins"] = true,
["/ssls"] = true,
["/stream_routes"] = true,
Expand Down
2 changes: 1 addition & 1 deletion apisix/http/service.lua
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ function _M.init_worker()
filter = filter,
})
if not services then
error("failed to create etcd instance for fetching upstream: " .. err)
error("failed to create etcd instance for fetching /services: " .. err)
return
end
end
Expand Down
29 changes: 29 additions & 0 deletions apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,7 @@ function _M.stream_init_worker()
plugin.init_worker()
xrpc.init_worker()
router.stream_init_worker()
require("apisix.http.service").init_worker()
apisix_upstream.init_worker()

local we = require("resty.worker.events")
Expand Down Expand Up @@ -1078,6 +1079,34 @@ function _M.stream_preread_phase()

api_ctx.matched_upstream = upstream

elseif matched_route.value.service_id then
local service = service_fetch(matched_route.value.service_id)
if not service then
core.log.error("failed to fetch service configuration by ",
"id: ", matched_route.value.service_id)
return core.response.exit(404)
end

matched_route = plugin.merge_service_stream_route(service, matched_route)
api_ctx.matched_route = matched_route
api_ctx.conf_type = "stream_route&service"
api_ctx.conf_version = matched_route.modifiedIndex .. "&" .. service.modifiedIndex
api_ctx.conf_id = matched_route.value.id .. "&" .. service.value.id
api_ctx.service_id = service.value.id
api_ctx.service_name = service.value.name
api_ctx.matched_upstream = matched_route.value.upstream
if matched_route.value.upstream_id and not matched_route.value.upstream then
local upstream = apisix_upstream.get_by_id(matched_route.value.upstream_id)
if not upstream then
if is_http then
return core.response.exit(502)
end

return ngx_exit(1)
end

api_ctx.matched_upstream = upstream
end
else
if matched_route.has_domain then
local err
Expand Down
46 changes: 46 additions & 0 deletions apisix/plugin.lua
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ local stream_local_plugins_hash = core.table.new(0, 32)
local merged_route = core.lrucache.new({
ttl = 300, count = 512
})
local merged_stream_route = core.lrucache.new({
ttl = 300, count = 512
})
local expr_lrucache = core.lrucache.new({
ttl = 300, count = 512
})
Expand Down Expand Up @@ -637,6 +640,49 @@ function _M.merge_service_route(service_conf, route_conf)
end


local function merge_service_stream_route(service_conf, route_conf)
-- because many fields in Service are not supported by stream route,
-- so we copy the stream route as base object
local new_conf = core.table.deepcopy(route_conf)
if service_conf.value.plugins then
for name, conf in pairs(service_conf.value.plugins) do
if not new_conf.value.plugins then
new_conf.value.plugins = {}
end

if not new_conf.value.plugins[name] then
new_conf.value.plugins[name] = conf
end
end
end

new_conf.value.service_id = nil

if not new_conf.value.upstream and service_conf.value.upstream then
new_conf.value.upstream = service_conf.value.upstream
end

if not new_conf.value.upstream_id and service_conf.value.upstream_id then
new_conf.value.upstream_id = service_conf.value.upstream_id
end

return new_conf
end


function _M.merge_service_stream_route(service_conf, route_conf)
core.log.info("service conf: ", core.json.delay_encode(service_conf, true))
core.log.info(" stream route conf: ", core.json.delay_encode(route_conf, true))

local version = route_conf.modifiedIndex .. "#" .. service_conf.modifiedIndex
local route_service_key = route_conf.value.id .. "#"
.. version
return merged_stream_route(route_service_key, version,
merge_service_stream_route,
service_conf, route_conf)
end


local function merge_consumer_route(route_conf, consumer_conf, consumer_group_conf)
if not consumer_conf.plugins or
core.table.nkeys(consumer_conf.plugins) == 0
Expand Down
1 change: 1 addition & 0 deletions apisix/schema_def.lua
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,7 @@ _M.stream_route = {
},
upstream = upstream_schema,
upstream_id = id_schema,
service_id = id_schema,
plugins = plugins_schema,
protocol = xrpc_protocol_schema,
},
Expand Down
4 changes: 4 additions & 0 deletions apisix/stream/router/ip_port.lua
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ do
for _, route in ipairs(items) do
local hit = match_addrs(route, vars)
if hit then
route.value.remote_addr_matcher = nil
route.value.server_addr_matcher = nil
ctx.matched_route = route
return true
end
Expand Down Expand Up @@ -175,6 +177,8 @@ do
for _, route in ipairs(other_routes) do
local hit = match_addrs(route, api_ctx.var)
if hit then
route.value.remote_addr_matcher = nil
route.value.server_addr_matcher = nil
api_ctx.matched_route = route
return true
end
Expand Down
1 change: 1 addition & 0 deletions docs/en/latest/admin-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1419,6 +1419,7 @@ Stream Route resource request address: /apisix/admin/stream_routes/{id}
| ----------- | -------- | -------- | ------------------------------------------------------------------- | ----------------------------- |
| upstream | False | Upstream | Configuration of the [Upstream](./terminology/upstream.md). | |
| upstream_id | False | Upstream | Id of the [Upstream](terminology/upstream.md) service. | |
| service_id | False | String | Id of the [Service](terminology/service.md) service. | |
| remote_addr | False | IPv4, IPv4 CIDR, IPv6 | Filters Upstream forwards by matching with client IP. | "127.0.0.1" or "127.0.0.1/32" or "::1" |
| server_addr | False | IPv4, IPv4 CIDR, IPv6 | Filters Upstream forwards by matching with APISIX Server IP. | "127.0.0.1" or "127.0.0.1/32" or "::1" |
| server_port | False | Integer | Filters Upstream forwards by matching with APISIX Server port. | 9090 |
Expand Down
1 change: 1 addition & 0 deletions docs/zh/latest/admin-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,7 @@ Plugin 资源请求地址:/apisix/admin/stream_routes/{id}
| ---------------- | ------| -------- | ------------------------------------------------------------------------------| ------ |
| upstream | 否 | Upstream | Upstream 配置,详细信息请参考 [Upstream](terminology/upstream.md)。 | |
| upstream_id | 否 | Upstream | 需要使用的 Upstream id,详细信息请 [Upstream](terminology/upstream.md)。 | |
| service_id | 否 | String | 需要使用的 [Service](terminology/service.md) id. | |
| remote_addr | 否 | IPv4, IPv4 CIDR, IPv6 | 过滤选项:如果客户端 IP 匹配,则转发到上游 | "127.0.0.1" 或 "127.0.0.1/32" 或 "::1" |
| server_addr | 否 | IPv4, IPv4 CIDR, IPv6 | 过滤选项:如果 APISIX 服务器的 IP 与 `server_addr` 匹配,则转发到上游。 | "127.0.0.1" 或 "127.0.0.1/32" 或 "::1" |
| server_port | 否 | 整数 | 过滤选项:如果 APISIX 服务器的端口 与 `server_port` 匹配,则转发到上游。 | 9090 |
Expand Down
Loading

0 comments on commit 0b38ea2

Please sign in to comment.