Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: plugin grpc-transcode supports grpc deadline #1149

Merged
merged 3 commits into from
Feb 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 37 additions & 3 deletions lua/apisix/plugins/grpc-transcode.lua
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,19 @@ local schema = {
type = "object",
properties = {
proto_id = schema_def.id_schema,
service = { type = "string" },
method = { type = "string" },
service = {
description = "the grpc service name",
type = "string"
},
method = {
description = "the method name in the grpc service.",
type = "string"
},
deadline = {
description = "deadline for grpc, millisecond",
type = "number",
default = 0
},
pb_option = { type = "array",
items = { type="string", anyOf = pb_option_def },
minItems = 1,
Expand All @@ -63,6 +74,17 @@ local schema = {
additionalProperties = true }
}

local status_rel = {
["3"] = 400,
["4"] = 504,
["5"] = 404,
["7"] = 403,
["11"] = 416,
["12"] = 501,
["13"] = 500,
["14"] = 503,
}

local _M = {
version = 0.1,
priority = 506,
Expand Down Expand Up @@ -102,7 +124,7 @@ function _M.access(conf, ctx)
end

local ok, err = request(proto_obj, conf.service,
conf.method, conf.pb_option)
conf.method, conf.pb_option, conf.deadline)
if not ok then
core.log.error("transform request error: ", err)
return
Expand All @@ -119,6 +141,18 @@ function _M.header_filter(conf, ctx)

ngx.header["Content-Type"] = "application/json"
ngx.header["Trailer"] = {"grpc-status", "grpc-message"}

local headers = ngx.resp.get_headers()
if headers["grpc-status"] ~= nil and headers["grpc-status"] ~= "0" then
local http_status = status_rel[headers["grpc-status"]]
if http_status ~= nil then
ngx.status = http_status
else
ngx.status = 599
end
return
end

end


Expand Down
9 changes: 8 additions & 1 deletion lua/apisix/plugins/grpc-transcode/request.lua
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ local ngx = ngx
local string = string
local table = table
local ipairs = ipairs
local tonumber = tonumber

return function (proto, service, method, pb_option, default_values)
return function (proto, service, method, pb_option, deadline, default_values)
core.log.info("proto: ", core.json.delay_encode(proto, true))
local m = util.find_method(proto, service, method)
if not m then
Expand Down Expand Up @@ -62,5 +63,11 @@ return function (proto, service, method, pb_option, default_values)
ngx.req.set_uri("/" .. service .. "/" .. method, false)
ngx.req.set_uri_args({})
ngx.req.set_body_data(message)

local dl = tonumber(deadline)
if dl~= nil and dl > 0 then
ngx.req.set_header("grpc-timeout", dl .. "m")
end

return true
end
44 changes: 22 additions & 22 deletions lua/apisix/plugins/grpc-transcode/response.lua
Original file line number Diff line number Diff line change
Expand Up @@ -41,33 +41,33 @@ return function(proto, service, method, pb_option)
ngx.arg[1] = nil
end

if not eof then
return
end

ngx.ctx.buffered = nil
local buffer = table.concat(buffered)
if not ngx.req.get_headers()["X-Grpc-Web"] then
buffer = string.sub(buffer, 6)
end
if eof then
ngx.ctx.buffered = nil
local buffer = table.concat(buffered)
if not ngx.req.get_headers()["X-Grpc-Web"] then
buffer = string.sub(buffer, 6)
end

if pb_option then
for _, opt in ipairs(pb_option) do
pb.option(opt)
if pb_option then
for _, opt in ipairs(pb_option) do
pb.option(opt)
end
end
end

local decoded = pb.decode(m.output_type, buffer)
if not decoded then
ngx.arg[1] = "failed to decode response data by protobuf"
return
end
local decoded = pb.decode(m.output_type, buffer)
if not decoded then
ngx.arg[1] = "failed to decode response data by protobuf"
return "failed to decode response data by protobuf"
end

local response, err = core.json.encode(decoded)
if not response then
core.log.error("failed to call json_encode data: ", err)
response = "failed to json_encode response body"
end

local response, err = core.json.encode(decoded)
if not response then
core.log.error("failed to call json_encode data: ", err)
response = "failed to json_encode response body"
ngx.arg[1] = response
end

ngx.arg[1] = response
end
105 changes: 105 additions & 0 deletions t/plugin/grpc-transcode.t
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ Connection refused) while connecting to upstream
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
rpc Plus (PlusRequest) returns (PlusReply) {}
rpc SayHelloAfterDelay (HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
string name = 1;
}
Expand Down Expand Up @@ -324,3 +326,106 @@ GET /grpc_plus?a=1&b=2251799813685260
qr/\{"result":"#2251799813685261"\}/
--- no_error_log
[error]



=== TEST 11: set route3 deadline nodelay
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/3',
ngx.HTTP_PUT,
[[{
"methods": ["GET"],
"uri": "/grpc_deadline",
"service_protocol": "grpc",
"plugins": {
"grpc-transcode": {
"proto_id": "1",
"service": "helloworld.Greeter",
"method": "SayHello",
"deadline": 500
}
},
"upstream": {
"type": "roundrobin",
"nodes": {
"127.0.0.1:50051": 1
}
}
}]]
)
if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- request
GET /t
--- response_body
passed
--- no_error_log
[error]



=== TEST 12: hit route
--- request
GET /grpc_deadline?name=apisix
--- response_body eval
qr/\{"message":"Hello apisix"\}/
--- no_error_log
[error]



=== TEST 13: set route4 deadline delay
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/4',
ngx.HTTP_PUT,
[[{
"methods": ["GET"],
"uri": "/grpc_delay",
"service_protocol": "grpc",
"plugins": {
"grpc-transcode": {
"proto_id": "1",
"service": "helloworld.Greeter",
"method": "SayHelloAfterDelay",
"deadline": 500
}
},
"upstream": {
"type": "roundrobin",
"nodes": {
"127.0.0.1:50051": 1
}
}
}]]
)
if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- request
GET /t
--- response_body
passed
--- no_error_log
[error]



=== TEST 14: hit route
--- request
GET /grpc_delay?name=apisix
--- error_code: 504