diff --git a/apisix/init.lua b/apisix/init.lua index e0903779c964..6225fccfa255 100644 --- a/apisix/init.lua +++ b/apisix/init.lua @@ -42,12 +42,16 @@ local str_byte = string.byte local str_sub = string.sub local tonumber = tonumber local control_api_router + +local is_http = false if ngx.config.subsystem == "http" then + is_http = true control_api_router = require("apisix.control.router") end + local load_balancer local local_conf -local ver_header = "APISIX/" .. core.version.VERSION +local ver_header = "APISIX/" .. core.version.VERSION local _M = {version = 0.4} @@ -118,7 +122,7 @@ function _M.http_init_worker() end require("apisix.debug").init_worker() - require("apisix.upstream").init_worker() + apisix_upstream.init_worker() require("apisix.plugins.ext-plugin.init").init_worker() local_conf = core.config.local_conf() @@ -254,6 +258,38 @@ local function set_upstream_host(api_ctx) end +local function get_upstream_by_id(up_id) + local upstreams = core.config.fetch_created_obj("/upstreams") + if upstreams then + local upstream = upstreams:get(tostring(up_id)) + if not upstream then + core.log.error("failed to find upstream by id: " .. up_id) + if is_http then + return core.response.exit(502) + end + + return ngx_exit(1) + end + + if upstream.has_domain then + local err + upstream, err = parse_domain_in_up(upstream) + if err then + core.log.error("failed to get resolved upstream: ", err) + if is_http then + return core.response.exit(500) + end + + return ngx_exit(1) + end + end + + core.log.info("parsed upstream: ", core.json.delay_encode(upstream)) + return upstream.dns_value or upstream.value + end +end + + function _M.http_access_phase() local ngx_ctx = ngx.ctx @@ -385,30 +421,12 @@ function _M.http_access_phase() end if up_id then - local upstreams = core.config.fetch_created_obj("/upstreams") - if upstreams then - local upstream = upstreams:get(tostring(up_id)) - if not upstream then - core.log.error("failed to find upstream by id: " .. up_id) - return core.response.exit(502) - end - - if upstream.has_domain then - local err - upstream, err = parse_domain_in_up(upstream) - if err then - core.log.error("failed to get resolved upstream: ", err) - return core.response.exit(500) - end - end - - if upstream.value.pass_host then - api_ctx.pass_host = upstream.value.pass_host - api_ctx.upstream_host = upstream.value.upstream_host - end + local upstream = get_upstream_by_id(up_id) + api_ctx.matched_upstream = upstream - core.log.info("parsed upstream: ", core.json.delay_encode(upstream)) - api_ctx.matched_upstream = upstream.dns_value or upstream.value + if upstream and upstream.pass_host then + api_ctx.pass_host = upstream.pass_host + api_ctx.upstream_host = upstream.upstream_host end else @@ -722,6 +740,7 @@ function _M.stream_init_worker() plugin.init_worker() router.stream_init_worker() + apisix_upstream.init_worker() if core.config == require("apisix.core.config_yaml") then core.config.init_worker() @@ -756,11 +775,18 @@ function _M.stream_preread_phase() return ngx_exit(1) end + + local up_id = matched_route.value.upstream_id + if up_id then + api_ctx.matched_upstream = get_upstream_by_id(up_id) + else + api_ctx.matched_upstream = matched_route.value.upstream + end + local plugins = core.tablepool.fetch("plugins", 32, 0) api_ctx.plugins = plugin.stream_filter(matched_route, plugins) -- core.log.info("valid plugins: ", core.json.delay_encode(plugins, true)) - api_ctx.matched_upstream = matched_route.value.upstream api_ctx.conf_type = "stream/route" api_ctx.conf_version = matched_route.modifiedIndex api_ctx.conf_id = matched_route.value.id diff --git a/t/stream-node/sanity.t b/t/stream-node/sanity.t index 7eaf06e0bb50..cd4e0197de74 100644 --- a/t/stream-node/sanity.t +++ b/t/stream-node/sanity.t @@ -163,3 +163,70 @@ GET /t passed --- no_error_log [error] + + + +=== TEST 7: set upstream (id: 1) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/upstreams/1', + ngx.HTTP_PUT, + [[{ + "nodes": { + "127.0.0.1:1995": 1 + }, + "type": "roundrobin" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 8: set stream route (id: 1) which uses upstream_id +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + [[{ + "remote_addr": "127.0.0.1", + "upstream_id": "1" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 9: hit route +--- stream_enable +--- stream_request eval +mmm +--- stream_response +hello world +--- no_error_log +[error]