Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support config stream_route upstream in service #10298

Merged
merged 7 commits into from
Oct 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions apisix/admin/services.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
--
local core = require("apisix.core")
local get_routes = require("apisix.router").http_routes
local get_stream_routes = require("apisix.router").stream_routes
local apisix_upstream = require("apisix.upstream")
local resource = require("apisix.admin.resource")
local schema_plugin = require("apisix.admin.plugins").check_schema
Expand Down Expand Up @@ -99,6 +100,21 @@ local function delete_checker(id)
end
end

local stream_routes, stream_routes_ver = get_stream_routes()
core.log.info("stream_routes: ", core.json.delay_encode(stream_routes, true))
core.log.info("stream_routes_ver: ", stream_routes_ver)
if stream_routes_ver and stream_routes then
for _, route in ipairs(stream_routes) do
if type(route) == "table" and route.value
and route.value.service_id
and tostring(route.value.service_id) == id then
return 400, {error_msg = "can not delete this service directly,"
.. " stream_route [" .. route.value.id
.. "] is still using it now"}
end
end
end

return nil, nil
end

Expand Down
17 changes: 17 additions & 0 deletions apisix/admin/stream_routes.lua
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,23 @@ local function check_conf(id, conf, need_id, schema)
end
end

local service_id = conf.service_id
if service_id then
local key = "/services/" .. service_id
local res, err = core.etcd.get(key)
if not res then
return nil, {error_msg = "failed to fetch service info by "
.. "service id [" .. service_id .. "]: "
.. err}
end

if res.status ~= 200 then
return nil, {error_msg = "failed to fetch service info by "
.. "service id [" .. service_id .. "], "
.. "response code: " .. res.status}
end
end

local ok, err = stream_route_checker(conf, true)
if not ok then
return nil, {error_msg = err}
Expand Down
1 change: 1 addition & 0 deletions apisix/constants.lua
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ return {
},
STREAM_ETCD_DIRECTORY = {
["/upstreams"] = true,
["/services"] = true,
["/plugins"] = true,
["/ssls"] = true,
["/stream_routes"] = true,
Expand Down
2 changes: 1 addition & 1 deletion apisix/http/service.lua
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ function _M.init_worker()
filter = filter,
})
if not services then
error("failed to create etcd instance for fetching upstream: " .. err)
error("failed to create etcd instance for fetching /services: " .. err)
shreemaan-abhishek marked this conversation as resolved.
Show resolved Hide resolved
return
end
end
Expand Down
29 changes: 29 additions & 0 deletions apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,7 @@ function _M.stream_init_worker()
plugin.init_worker()
xrpc.init_worker()
router.stream_init_worker()
require("apisix.http.service").init_worker()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the service.lua be moved out of the http directory?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, obviously service is designed for http

apisix_upstream.init_worker()

local we = require("resty.worker.events")
Expand Down Expand Up @@ -1078,6 +1079,34 @@ function _M.stream_preread_phase()

api_ctx.matched_upstream = upstream

elseif matched_route.value.service_id then
local service = service_fetch(matched_route.value.service_id)
if not service then
core.log.error("failed to fetch service configuration by ",
"id: ", matched_route.value.service_id)
return core.response.exit(404)
end

matched_route = plugin.merge_service_stream_route(service, matched_route)
api_ctx.matched_route = matched_route
api_ctx.conf_type = "stream_route&service"
api_ctx.conf_version = matched_route.modifiedIndex .. "&" .. service.modifiedIndex
api_ctx.conf_id = matched_route.value.id .. "&" .. service.value.id
api_ctx.service_id = service.value.id
api_ctx.service_name = service.value.name
api_ctx.matched_upstream = matched_route.value.upstream
if matched_route.value.upstream_id and not matched_route.value.upstream then
local upstream = apisix_upstream.get_by_id(matched_route.value.upstream_id)
if not upstream then
if is_http then
return core.response.exit(502)
end

return ngx_exit(1)
lingsamuel marked this conversation as resolved.
Show resolved Hide resolved
end

api_ctx.matched_upstream = upstream
end
else
if matched_route.has_domain then
local err
Expand Down
46 changes: 46 additions & 0 deletions apisix/plugin.lua
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ local stream_local_plugins_hash = core.table.new(0, 32)
local merged_route = core.lrucache.new({
ttl = 300, count = 512
})
local merged_stream_route = core.lrucache.new({
ttl = 300, count = 512
})
local expr_lrucache = core.lrucache.new({
ttl = 300, count = 512
})
Expand Down Expand Up @@ -637,6 +640,49 @@ function _M.merge_service_route(service_conf, route_conf)
end


local function merge_service_stream_route(service_conf, route_conf)
-- because many fields in Service are not supported by stream route,
-- so we copy the stream route as base object
local new_conf = core.table.deepcopy(route_conf)
if service_conf.value.plugins then
for name, conf in pairs(service_conf.value.plugins) do
if not new_conf.value.plugins then
new_conf.value.plugins = {}
end

if not new_conf.value.plugins[name] then
new_conf.value.plugins[name] = conf
end
end
end

new_conf.value.service_id = nil

if not new_conf.value.upstream and service_conf.value.upstream then
new_conf.value.upstream = service_conf.value.upstream
end

if not new_conf.value.upstream_id and service_conf.value.upstream_id then
new_conf.value.upstream_id = service_conf.value.upstream_id
end

return new_conf
end


function _M.merge_service_stream_route(service_conf, route_conf)
lingsamuel marked this conversation as resolved.
Show resolved Hide resolved
core.log.info("service conf: ", core.json.delay_encode(service_conf, true))
core.log.info(" stream route conf: ", core.json.delay_encode(route_conf, true))

local version = route_conf.modifiedIndex .. "#" .. service_conf.modifiedIndex
local route_service_key = route_conf.value.id .. "#"
.. version
return merged_stream_route(route_service_key, version,
merge_service_stream_route,
service_conf, route_conf)
end


local function merge_consumer_route(route_conf, consumer_conf, consumer_group_conf)
if not consumer_conf.plugins or
core.table.nkeys(consumer_conf.plugins) == 0
Expand Down
1 change: 1 addition & 0 deletions apisix/schema_def.lua
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,7 @@ _M.stream_route = {
},
upstream = upstream_schema,
upstream_id = id_schema,
service_id = id_schema,
plugins = plugins_schema,
protocol = xrpc_protocol_schema,
},
Expand Down
4 changes: 4 additions & 0 deletions apisix/stream/router/ip_port.lua
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ do
for _, route in ipairs(items) do
local hit = match_addrs(route, vars)
if hit then
route.value.remote_addr_matcher = nil
route.value.server_addr_matcher = nil
ctx.matched_route = route
return true
end
Expand Down Expand Up @@ -175,6 +177,8 @@ do
for _, route in ipairs(other_routes) do
local hit = match_addrs(route, api_ctx.var)
if hit then
route.value.remote_addr_matcher = nil
route.value.server_addr_matcher = nil
api_ctx.matched_route = route
return true
end
Expand Down
1 change: 1 addition & 0 deletions docs/en/latest/admin-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1419,6 +1419,7 @@ Stream Route resource request address: /apisix/admin/stream_routes/{id}
| ----------- | -------- | -------- | ------------------------------------------------------------------- | ----------------------------- |
| upstream | False | Upstream | Configuration of the [Upstream](./terminology/upstream.md). | |
| upstream_id | False | Upstream | Id of the [Upstream](terminology/upstream.md) service. | |
| service_id | False | String | Id of the [Service](terminology/service.md) service. | |
| remote_addr | False | IPv4, IPv4 CIDR, IPv6 | Filters Upstream forwards by matching with client IP. | "127.0.0.1" or "127.0.0.1/32" or "::1" |
| server_addr | False | IPv4, IPv4 CIDR, IPv6 | Filters Upstream forwards by matching with APISIX Server IP. | "127.0.0.1" or "127.0.0.1/32" or "::1" |
| server_port | False | Integer | Filters Upstream forwards by matching with APISIX Server port. | 9090 |
Expand Down
1 change: 1 addition & 0 deletions docs/zh/latest/admin-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,7 @@ Plugin 资源请求地址:/apisix/admin/stream_routes/{id}
| ---------------- | ------| -------- | ------------------------------------------------------------------------------| ------ |
| upstream | 否 | Upstream | Upstream 配置,详细信息请参考 [Upstream](terminology/upstream.md)。 | |
| upstream_id | 否 | Upstream | 需要使用的 Upstream id,详细信息请 [Upstream](terminology/upstream.md)。 | |
| service_id | 否 | String | 需要使用的 [Service](terminology/service.md) id. | |
| remote_addr | 否 | IPv4, IPv4 CIDR, IPv6 | 过滤选项:如果客户端 IP 匹配,则转发到上游 | "127.0.0.1" 或 "127.0.0.1/32" 或 "::1" |
| server_addr | 否 | IPv4, IPv4 CIDR, IPv6 | 过滤选项:如果 APISIX 服务器的 IP 与 `server_addr` 匹配,则转发到上游。 | "127.0.0.1" 或 "127.0.0.1/32" 或 "::1" |
| server_port | 否 | 整数 | 过滤选项:如果 APISIX 服务器的端口 与 `server_port` 匹配,则转发到上游。 | 9090 |
Expand Down
Loading
Loading