Skip to content

Commit

Permalink
feat: support to use upstream_id in stream_route (#4121)
Browse files Browse the repository at this point in the history
  • Loading branch information
tokers authored May 6, 2021
1 parent ffed438 commit 3a3874a
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 26 deletions.
78 changes: 52 additions & 26 deletions apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,16 @@ local str_byte = string.byte
local str_sub = string.sub
local tonumber = tonumber
local control_api_router

local is_http = false
if ngx.config.subsystem == "http" then
is_http = true
control_api_router = require("apisix.control.router")
end

local load_balancer
local local_conf
local ver_header = "APISIX/" .. core.version.VERSION
local ver_header = "APISIX/" .. core.version.VERSION


local _M = {version = 0.4}
Expand Down Expand Up @@ -118,7 +122,7 @@ function _M.http_init_worker()
end

require("apisix.debug").init_worker()
require("apisix.upstream").init_worker()
apisix_upstream.init_worker()
require("apisix.plugins.ext-plugin.init").init_worker()

local_conf = core.config.local_conf()
Expand Down Expand Up @@ -254,6 +258,38 @@ local function set_upstream_host(api_ctx)
end


local function get_upstream_by_id(up_id)
local upstreams = core.config.fetch_created_obj("/upstreams")
if upstreams then
local upstream = upstreams:get(tostring(up_id))
if not upstream then
core.log.error("failed to find upstream by id: " .. up_id)
if is_http then
return core.response.exit(502)
end

return ngx_exit(1)
end

if upstream.has_domain then
local err
upstream, err = parse_domain_in_up(upstream)
if err then
core.log.error("failed to get resolved upstream: ", err)
if is_http then
return core.response.exit(500)
end

return ngx_exit(1)
end
end

core.log.info("parsed upstream: ", core.json.delay_encode(upstream))
return upstream.dns_value or upstream.value
end
end


function _M.http_access_phase()
local ngx_ctx = ngx.ctx

Expand Down Expand Up @@ -385,30 +421,12 @@ function _M.http_access_phase()
end

if up_id then
local upstreams = core.config.fetch_created_obj("/upstreams")
if upstreams then
local upstream = upstreams:get(tostring(up_id))
if not upstream then
core.log.error("failed to find upstream by id: " .. up_id)
return core.response.exit(502)
end

if upstream.has_domain then
local err
upstream, err = parse_domain_in_up(upstream)
if err then
core.log.error("failed to get resolved upstream: ", err)
return core.response.exit(500)
end
end

if upstream.value.pass_host then
api_ctx.pass_host = upstream.value.pass_host
api_ctx.upstream_host = upstream.value.upstream_host
end
local upstream = get_upstream_by_id(up_id)
api_ctx.matched_upstream = upstream

core.log.info("parsed upstream: ", core.json.delay_encode(upstream))
api_ctx.matched_upstream = upstream.dns_value or upstream.value
if upstream and upstream.pass_host then
api_ctx.pass_host = upstream.pass_host
api_ctx.upstream_host = upstream.upstream_host
end

else
Expand Down Expand Up @@ -722,6 +740,7 @@ function _M.stream_init_worker()

plugin.init_worker()
router.stream_init_worker()
apisix_upstream.init_worker()

if core.config == require("apisix.core.config_yaml") then
core.config.init_worker()
Expand Down Expand Up @@ -756,11 +775,18 @@ function _M.stream_preread_phase()
return ngx_exit(1)
end


local up_id = matched_route.value.upstream_id
if up_id then
api_ctx.matched_upstream = get_upstream_by_id(up_id)
else
api_ctx.matched_upstream = matched_route.value.upstream
end

local plugins = core.tablepool.fetch("plugins", 32, 0)
api_ctx.plugins = plugin.stream_filter(matched_route, plugins)
-- core.log.info("valid plugins: ", core.json.delay_encode(plugins, true))

api_ctx.matched_upstream = matched_route.value.upstream
api_ctx.conf_type = "stream/route"
api_ctx.conf_version = matched_route.modifiedIndex
api_ctx.conf_id = matched_route.value.id
Expand Down
67 changes: 67 additions & 0 deletions t/stream-node/sanity.t
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,70 @@ GET /t
passed
--- no_error_log
[error]
=== TEST 7: set upstream (id: 1)
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/upstreams/1',
ngx.HTTP_PUT,
[[{
"nodes": {
"127.0.0.1:1995": 1
},
"type": "roundrobin"
}]]
)
if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- request
GET /t
--- response_body
passed
--- no_error_log
[error]
=== TEST 8: set stream route (id: 1) which uses upstream_id
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/stream_routes/1',
ngx.HTTP_PUT,
[[{
"remote_addr": "127.0.0.1",
"upstream_id": "1"
}]]
)
if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- request
GET /t
--- response_body
passed
--- no_error_log
[error]
=== TEST 9: hit route
--- stream_enable
--- stream_request eval
mmm
--- stream_response
hello world
--- no_error_log
[error]

0 comments on commit 3a3874a

Please sign in to comment.