Skip to content

Commit

Permalink
perf: simple setup upstream (#8130)
Browse files Browse the repository at this point in the history
  • Loading branch information
tzssangglass authored Oct 23, 2022
1 parent ecdc209 commit 3887162
Show file tree
Hide file tree
Showing 8 changed files with 501 additions and 127 deletions.
2 changes: 1 addition & 1 deletion apisix/http/route.lua
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ function _M.create_radixtree_uri_router(routes, uri_routes, with_parameter)
end
end

event.push(event.CONST.BUILD_ROUTER, uri_routes)
event.push(event.CONST.BUILD_ROUTER, routes)
core.log.info("route items: ", core.json.delay_encode(uri_routes, true))

if with_parameter then
Expand Down
214 changes: 110 additions & 104 deletions apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,115 @@ local function common_phase(phase_name)
end



function _M.handle_upstream(api_ctx, route, enable_websocket)
local up_id = route.value.upstream_id

-- used for the traffic-split plugin
if api_ctx.upstream_id then
up_id = api_ctx.upstream_id
end

if up_id then
local upstream = apisix_upstream.get_by_id(up_id)
if not upstream then
if is_http then
return core.response.exit(502)
end

return ngx_exit(1)
end

api_ctx.matched_upstream = upstream

else
if route.has_domain then
local err
route, err = parse_domain_in_route(route)
if err then
core.log.error("failed to get resolved route: ", err)
return core.response.exit(500)
end

api_ctx.conf_version = route.modifiedIndex
api_ctx.matched_route = route
end

local route_val = route.value

api_ctx.matched_upstream = (route.dns_value and
route.dns_value.upstream)
or route_val.upstream
end

if api_ctx.matched_upstream and api_ctx.matched_upstream.tls and
api_ctx.matched_upstream.tls.client_cert_id then

local cert_id = api_ctx.matched_upstream.tls.client_cert_id
local upstream_ssl = router.router_ssl.get_by_id(cert_id)
if not upstream_ssl or upstream_ssl.type ~= "client" then
local err = upstream_ssl and
"ssl type should be 'client'" or
"ssl id [" .. cert_id .. "] not exits"
core.log.error("failed to get ssl cert: ", err)

if is_http then
return core.response.exit(502)
end

return ngx_exit(1)
end

core.log.info("matched ssl: ",
core.json.delay_encode(upstream_ssl, true))
api_ctx.upstream_ssl = upstream_ssl
end

if enable_websocket then
api_ctx.var.upstream_upgrade = api_ctx.var.http_upgrade
api_ctx.var.upstream_connection = api_ctx.var.http_connection
core.log.info("enabled websocket for route: ", route.value.id)
end

-- load balancer is not required by kafka upstream, so the upstream
-- node selection process is intercepted and left to kafka to
-- handle on its own
if api_ctx.matched_upstream and api_ctx.matched_upstream.scheme == "kafka" then
return pubsub_kafka.access(api_ctx)
end

local code, err = set_upstream(route, api_ctx)
if code then
core.log.error("failed to set upstream: ", err)
core.response.exit(code)
end

local server, err = load_balancer.pick_server(route, api_ctx)
if not server then
core.log.error("failed to pick server: ", err)
return core.response.exit(502)
end

api_ctx.picked_server = server

set_upstream_headers(api_ctx, server)

-- run the before_proxy method in access phase first to avoid always reinit request
common_phase("before_proxy")

local up_scheme = api_ctx.upstream_scheme
if up_scheme == "grpcs" or up_scheme == "grpc" then
stash_ngx_ctx()
return ngx.exec("@grpc_pass")
end

if api_ctx.dubbo_proxy_enabled then
stash_ngx_ctx()
return ngx.exec("@dubbo_pass")
end
end


function _M.http_access_phase()
local ngx_ctx = ngx.ctx

Expand Down Expand Up @@ -495,110 +604,7 @@ function _M.http_access_phase()
plugin.run_plugin("access", plugins, api_ctx)
end

local up_id = route.value.upstream_id

-- used for the traffic-split plugin
if api_ctx.upstream_id then
up_id = api_ctx.upstream_id
end

if up_id then
local upstream = apisix_upstream.get_by_id(up_id)
if not upstream then
if is_http then
return core.response.exit(502)
end

return ngx_exit(1)
end

api_ctx.matched_upstream = upstream

else
if route.has_domain then
local err
route, err = parse_domain_in_route(route)
if err then
core.log.error("failed to get resolved route: ", err)
return core.response.exit(500)
end

api_ctx.conf_version = route.modifiedIndex
api_ctx.matched_route = route
end

local route_val = route.value

api_ctx.matched_upstream = (route.dns_value and
route.dns_value.upstream)
or route_val.upstream
end

if api_ctx.matched_upstream and api_ctx.matched_upstream.tls and
api_ctx.matched_upstream.tls.client_cert_id then

local cert_id = api_ctx.matched_upstream.tls.client_cert_id
local upstream_ssl = router.router_ssl.get_by_id(cert_id)
if not upstream_ssl or upstream_ssl.type ~= "client" then
local err = upstream_ssl and
"ssl type should be 'client'" or
"ssl id [" .. cert_id .. "] not exits"
core.log.error("failed to get ssl cert: ", err)

if is_http then
return core.response.exit(502)
end

return ngx_exit(1)
end

core.log.info("matched ssl: ",
core.json.delay_encode(upstream_ssl, true))
api_ctx.upstream_ssl = upstream_ssl
end

if enable_websocket then
api_ctx.var.upstream_upgrade = api_ctx.var.http_upgrade
api_ctx.var.upstream_connection = api_ctx.var.http_connection
core.log.info("enabled websocket for route: ", route.value.id)
end

-- load balancer is not required by kafka upstream, so the upstream
-- node selection process is intercepted and left to kafka to
-- handle on its own
if api_ctx.matched_upstream and api_ctx.matched_upstream.scheme == "kafka" then
return pubsub_kafka.access(api_ctx)
end

local code, err = set_upstream(route, api_ctx)
if code then
core.log.error("failed to set upstream: ", err)
core.response.exit(code)
end

local server, err = load_balancer.pick_server(route, api_ctx)
if not server then
core.log.error("failed to pick server: ", err)
return core.response.exit(502)
end

api_ctx.picked_server = server

set_upstream_headers(api_ctx, server)

-- run the before_proxy method in access phase first to avoid always reinit request
common_phase("before_proxy")

local up_scheme = api_ctx.upstream_scheme
if up_scheme == "grpcs" or up_scheme == "grpc" then
stash_ngx_ctx()
return ngx.exec("@grpc_pass")
end

if api_ctx.dubbo_proxy_enabled then
stash_ngx_ctx()
return ngx.exec("@dubbo_pass")
end
_M.handle_upstream(api_ctx, route, enable_websocket)
end


Expand Down
Loading

0 comments on commit 3887162

Please sign in to comment.