diff --git a/.travis/linux_runner.sh b/.travis/linux_runner.sh index 8c0d1e6b518c..dde269df03fe 100755 --- a/.travis/linux_runner.sh +++ b/.travis/linux_runner.sh @@ -23,23 +23,39 @@ before_install() { do_install() { wget -qO - https://openresty.org/package/pubkey.gpg | sudo apt-key add - + sudo apt-get -y update --fix-missing sudo apt-get -y install software-properties-common sudo add-apt-repository -y "deb http://openresty.org/package/ubuntu $(lsb_release -sc) main" sudo apt-get update sudo apt-get install openresty-debug + sudo add-apt-repository -y ppa:longsleep/golang-backports + sudo apt-get update + sudo apt-get install golang + + export GO111MOUDULE=on + export_or_prefix sudo luarocks make --lua-dir=${OPENRESTY_PREFIX}luajit rockspec/apisix-dev-1.0-0.rockspec --tree=deps --only-deps --local sudo luarocks install --lua-dir=${OPENRESTY_PREFIX}luajit lua-resty-libr3 --tree=deps --local - git clone https://github.com/openresty/test-nginx.git test-nginx + git clone https://github.com/membphis/test-nginx.git test-nginx + + git clone https://github.com/nic-chen/grpc_server_example.git grpc_server_example + + cd grpc_server_example/ + go build -o grpc_server_example main.go + cd .. } script() { export_or_prefix export PATH=$OPENRESTY_PREFIX/nginx/sbin:$OPENRESTY_PREFIX/luajit/bin:$OPENRESTY_PREFIX/bin:$PATH sudo service etcd start + + ./grpc_server_example/grpc_server_example & + ./bin/apisix help ./bin/apisix init ./bin/apisix init_etcd diff --git a/.travis/osx_runner.sh b/.travis/osx_runner.sh index 54c587ca509a..1f02b4490df0 100755 --- a/.travis/osx_runner.sh +++ b/.travis/osx_runner.sh @@ -18,6 +18,8 @@ export_or_prefix() { before_install() { HOMEBREW_NO_AUTO_UPDATE=1 brew install perl cpanminus etcd luarocks openresty/brew/openresty-debug + brew upgrade go + export GO111MOUDULE=on sudo cpanm --notest Test::Nginx >build.log 2>&1 || (cat build.log && exit 1) export_or_prefix luarocks install --lua-dir=${OPENRESTY_PREFIX}/luajit luacov-coveralls --local --tree=deps @@ -29,7 +31,12 @@ do_install() { make dev make dev_r3 - git clone https://github.com/openresty/test-nginx.git test-nginx + git clone https://github.com/membphis/test-nginx.git test-nginx + git clone https://github.com/nic-chen/grpc_server_example.git grpc_server_example + + cd grpc_server_example/ + go build -o grpc_server_example main.go + cd .. } script() { @@ -38,6 +45,9 @@ script() { luarocks install luacheck brew services start etcd + + ./grpc_server_example/grpc_server_example & + make help make init sudo make run diff --git a/COPYRIGHT b/COPYRIGHT index d0290c406358..1f1293eff277 100644 --- a/COPYRIGHT +++ b/COPYRIGHT @@ -307,3 +307,58 @@ IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. %%%%%%%%% + +lua-resty-grpc-gateway +https://github.com/ysugimoto/lua-resty-grpc-gateway + +MIT License + +Copyright (c) 2019 Yoshiaki Sugimoto + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + +%%%%%%%%% + +lua-protobuf +https://github.com/starwing/lua-protobuf + +MIT License + +Copyright (c) 2019 Yoshiaki Sugimoto + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + + + diff --git a/Makefile b/Makefile index 16f5a6afe5db..806ef87a5b30 100644 --- a/Makefile +++ b/Makefile @@ -52,7 +52,8 @@ check: lua/apisix/admin/*.lua \ lua/apisix/core/*.lua \ lua/apisix/http/*.lua \ - lua/apisix/plugins/*.lua > \ + lua/apisix/plugins/*.lua \ + lua/apisix/plugins/grpc-proxy/*.lua > \ /tmp/check.log 2>&1 || (cat /tmp/check.log && exit 1) diff --git a/README.md b/README.md index 8ab43d2c8ed9..2e21dee9ebe4 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,7 @@ For more detailed information, see the [White Paper](https://www.iresty.com/down - **[Limit-concurrency](doc/plugins/limit-conn.md)** - **OpenTracing: [Zipkin](doc/plugins/zipkin.md)** - **Monitoring and Metrics**: [Prometheus](doc/plugins/prometheus.md) +- **[gRPC-Proxy](doc/plugins/grpc-proxy.md)**:REST <-> gRPC proxying. - **Custom plugins**: Allows hooking of common phases, such as `rewrite`, `access`, `header filer`, `body filter` and `log`, also allows to hook the `balancer` stage. - **Dashboard**: Built-in dashboard to control APISIX. - **CLI**: start\stop\reload APISIX through the command line. diff --git a/README_CN.md b/README_CN.md index e815dfbffe7c..55b6679f42f2 100644 --- a/README_CN.md +++ b/README_CN.md @@ -41,6 +41,7 @@ APISIX 通过插件机制,提供动态负载平衡、身份验证、限流限 - **[限制并发](doc/plugins/limit-conn-cn.md)** - **OpenTracing: [Zipkin](doc/plugins/zipkin.md)** - **监控和指标**: [Prometheus](doc/plugins/prometheus-cn.md) +- **[gRPC-Proxy](doc/plugins/grpc-proxy-cn.md)**:REST <-> gRPC proxying. - **自定义插件**: 允许挂载常见阶段,例如`rewrite`,`access`,`header filer`,`body filter`和`log`,还允许挂载 `balancer` 阶段。 - **控制台**: 内置控制台来操作 APISIX 集群。 - **CLI**: 使用命令行来启动、关闭和重启 APISIX。 diff --git a/bin/apisix b/bin/apisix index c8920860a438..e9b238e1c70a 100755 --- a/bin/apisix +++ b/bin/apisix @@ -128,7 +128,7 @@ http { apisix.http_balancer_phase() } - keepalive 32; + keepalive 320; } init_by_lua_block { @@ -231,6 +231,33 @@ http { apisix.http_header_filter_phase() } + body_filter_by_lua_block { + apisix.http_body_filter_phase() + } + + log_by_lua_block { + apisix.http_log_phase() + } + } + + location @grpc_pass { + + access_by_lua_block { + apisix.grpc_access_phase() + } + + grpc_set_header Content-Type application/grpc; + grpc_socket_keepalive on; + grpc_pass grpc://apisix_backend; + + header_filter_by_lua_block { + apisix.http_header_filter_phase() + } + + body_filter_by_lua_block { + apisix.http_body_filter_phase() + } + log_by_lua_block { apisix.http_log_phase() } diff --git a/conf/config.yaml b/conf/config.yaml index 888a61a02a66..75ca77c198e2 100644 --- a/conf/config.yaml +++ b/conf/config.yaml @@ -35,3 +35,4 @@ plugins: # plugin list - jwt-auth - zipkin - ip-restriction + - grpc-proxy diff --git a/conf/nginx.conf b/conf/nginx.conf index 1c536a465fad..40fe263db597 100644 --- a/conf/nginx.conf +++ b/conf/nginx.conf @@ -62,7 +62,7 @@ http { apisix.http_balancer_phase() } - keepalive 32; + keepalive 320; } init_by_lua_block { @@ -141,9 +141,37 @@ http { apisix.http_header_filter_phase() } + body_filter_by_lua_block { + apisix.http_body_filter_phase() + } + log_by_lua_block { apisix.http_log_phase() } } + + location @grpc_pass { + + access_by_lua_block { + apisix.grpc_access_phase() + } + + grpc_set_header Content-Type application/grpc; + grpc_socket_keepalive on; + grpc_pass grpc://apisix_backend; + + header_filter_by_lua_block { + apisix.http_header_filter_phase() + } + + body_filter_by_lua_block { + apisix.http_body_filter_phase() + } + + log_by_lua_block { + apisix.http_log_phase() + } + } + } } diff --git a/doc/plugins/grpc-proxy-cn.md b/doc/plugins/grpc-proxy-cn.md new file mode 100644 index 000000000000..921354ddf05d --- /dev/null +++ b/doc/plugins/grpc-proxy-cn.md @@ -0,0 +1,90 @@ +[English](grpc-proxy.md) +# grpc-proxy + +HTTP(s) -> APISIX -> gRPC server + +### Proto + +#### 参数 +* `content`: `.proto` 文件的内容 + +#### 添加proto + +路径中最后的数字,会被用作 proto 的 id 做唯一标识,比如下面示例的 proto `id` 是 `1` : + +```shell +curl http://127.0.0.1:9080/apisix/admin/proto/1 -X PUT -d ' +{ + "content" : "syntax = \"proto3\"; + package helloworld; + service Greeter { + rpc SayHello (HelloRequest) returns (HelloReply) {} + } + message HelloRequest { + string name = 1; + } + message HelloReply { + string message = 1; + }" +}' +``` + +### 参数 + +* `proto_id`: `.proto`内容的id. +* `service`: grpc服务名. +* `method`: grpc服务中要调用的方法名. + + + +### 示例 + +#### 使用 grpc-proxy 插件 + +在指定 route 中,代理 grpc 服务接口: + +* 注意: 这个 route 的属性`service_protocal` 必须设置为 `grpc` +* 例子所代理的 grpc 服务可参考:[grpc_server_example](https://github.com/nic-chen/grpc_server_example) + +```shell +curl http://127.0.0.1:9080/apisix/admin/routes/111 -X PUT -d ' +{ + "methods": ["GET"], + "uri": "/grpctest", + "service_protocol": "grpc", + "plugins": { + "grpc-proxy": { + "proto_id": "1", + "service": "helloworld.Greeter", + "method": "SayHello" + } + }, + "upstream": { + "type": "roundrobin", + "nodes": { + "127.0.0.1:50051": 1 + } + } +}' +``` + + +#### 测试 + +访问上面配置的 route: + +```shell +$ curl -i http://127.0.0.1:9080/grpctest +HTTP/1.1 200 OK +Date: Fri, 16 Aug 2019 11:55:36 GMT +Content-Type: application/json +Transfer-Encoding: chunked +Connection: keep-alive +Server: APISIX web server +Proxy-Connection: keep-alive + +{"message":"Hello world"} +``` + +这表示已成功代理。 + diff --git a/doc/plugins/grpc-proxy.md b/doc/plugins/grpc-proxy.md new file mode 100644 index 000000000000..62b9adadc9ae --- /dev/null +++ b/doc/plugins/grpc-proxy.md @@ -0,0 +1,91 @@ +[中文](grpc-proxy-cn.md) +# grpc-proxy + +HTTP(s) -> APISIX -> gRPC server + +### Proto + +#### Parameters +* `content`: `.proto` file's content. + +#### Add a proto + +Here's an example, adding a proto which `id` is `1`: + +```shell +curl http://127.0.0.1:9080/apisix/admin/proto/1 -X PUT -d ' +{ + "content" : "syntax = \"proto3\"; + package helloworld; + service Greeter { + rpc SayHello (HelloRequest) returns (HelloReply) {} + } + message HelloRequest { + string name = 1; + } + message HelloReply { + string message = 1; + }" +}' +``` + +### Parameters + +* `proto_id`: `.proto` content id. +* `service`: the grpc service name. +* `method`: the method name of grpc service. + +### example + +#### enable plugin + +Here's an example, to enable the grpc-proxy plugin to specified route: + +* attention: the route's option `service_protocal` must be `grpc` +* the grpc server example:[grpc_server_example](https://github.com/nic-chen/grpc_server_example) + +```shell +curl http://127.0.0.1:9080/apisix/admin/routes/111 -X PUT -d ' +{ + "methods": ["GET"], + "uri": "/grpctest", + "service_protocol": "grpc", + "plugins": { + "grpc-proxy": { + "proto_id": "1", + "service": "helloworld.Greeter", + "method": "SayHello" + } + }, + "upstream": { + "type": "roundrobin", + "nodes": { + "127.0.0.1:50051": 1 + } + } +}' +``` + + +#### test plugin + +The above configuration proxy : +```shell +curl -i http://127.0.0.1:9080/grpctest +``` + +response: +``` +HTTP/1.1 200 OK +Date: Fri, 16 Aug 2019 11:55:36 GMT +Content-Type: application/json +Transfer-Encoding: chunked +Connection: keep-alive +Server: APISIX web server +Proxy-Connection: keep-alive + +{"message":"Hello world"} +``` + +This means that the proxying is working. + diff --git a/lua/apisix.lua b/lua/apisix.lua index ecb6533b5d03..522a3333838a 100644 --- a/lua/apisix.lua +++ b/lua/apisix.lua @@ -142,7 +142,7 @@ function _M.http_access_phase() local ngx_ctx = ngx.ctx local api_ctx = ngx_ctx.api_ctx - if api_ctx == nil then + if not api_ctx then api_ctx = core.tablepool.fetch("api_ctx", 0, 32) ngx_ctx.api_ctx = api_ctx end @@ -159,6 +159,70 @@ function _M.http_access_phase() return core.response.exit(404) end + -- + if route.value.service_protocol == "grpc" then + return ngx.exec("@grpc_pass") + end + + if route.value.service_id then + -- core.log.info("matched route: ", core.json.delay_encode(route.value)) + local service = service_fetch(route.value.service_id) + if not service then + core.log.error("failed to fetch service configuration by ", + "id: ", route.value.service_id) + return core.response.exit(404) + end + + local changed + route, changed = plugin.merge_service_route(service, route) + api_ctx.matched_route = route + + if changed then + api_ctx.conf_type = "route&service" + api_ctx.conf_version = route.modifiedIndex .. "&" + .. service.modifiedIndex + api_ctx.conf_id = route.value.id .. "&" + .. service.value.id + else + api_ctx.conf_type = "service" + api_ctx.conf_version = service.modifiedIndex + api_ctx.conf_id = service.value.id + end + + else + api_ctx.conf_type = "route" + api_ctx.conf_version = route.modifiedIndex + api_ctx.conf_id = route.value.id + end + + local plugins = core.tablepool.fetch("plugins", 32, 0) + api_ctx.plugins = plugin.filter(route, plugins) + + run_plugin("rewrite", plugins, api_ctx) + run_plugin("access", plugins, api_ctx) +end + +function _M.grpc_access_phase() + local ngx_ctx = ngx.ctx + local api_ctx = ngx_ctx.api_ctx + + if not api_ctx then + api_ctx = core.tablepool.fetch("api_ctx", 0, 32) + ngx_ctx.api_ctx = api_ctx + end + + core.ctx.set_vars_meta(api_ctx) + + router.router_http.match(api_ctx) + + core.log.info("route: ", + core.json.delay_encode(api_ctx.matched_route, true)) + + local route = api_ctx.matched_route + if not route then + return core.response.exit(404) + end + if route.value.service_id then -- core.log.info("matched route: ", core.json.delay_encode(route.value)) local service = service_fetch(route.value.service_id) @@ -198,10 +262,14 @@ function _M.http_access_phase() end + function _M.http_header_filter_phase() run_plugin("header_filter") end +function _M.http_body_filter_phase() + run_plugin("body_filter") +end function _M.http_log_phase() local api_ctx = run_plugin("log") diff --git a/lua/apisix/admin/init.lua b/lua/apisix/admin/init.lua index da8e82cd3fad..c83168b2f0d8 100644 --- a/lua/apisix/admin/init.lua +++ b/lua/apisix/admin/init.lua @@ -17,6 +17,7 @@ local resources = { schema = require("apisix.admin.schema"), ssl = require("apisix.admin.ssl"), plugins = require("apisix.admin.plugins"), + proto = require("apisix.admin.proto"), } diff --git a/lua/apisix/admin/proto.lua b/lua/apisix/admin/proto.lua new file mode 100644 index 000000000000..d9fcabd1f153 --- /dev/null +++ b/lua/apisix/admin/proto.lua @@ -0,0 +1,153 @@ +local type = type +local ipairs = ipairs +local core = require("apisix.core") +local get_routes = require("apisix.http.router").http_routes +local get_services = require("apisix.http.service").services +local tostring = tostring + + +local _M = { + version = 0.1, +} + + +local function check_conf(id, conf, need_id) + if not conf then + return nil, {error_msg = "missing configurations"} + end + + id = id or conf.id + if need_id and not id then + return nil, {error_msg = "missing proto id"} + end + + if not need_id and id then + return nil, {error_msg = "wrong proto id, do not need it"} + end + + if need_id and conf.id and tostring(conf.id) ~= tostring(id) then + return nil, {error_msg = "wrong proto id"} + end + + core.log.info("schema: ", core.json.delay_encode(core.schema.proto)) + core.log.info("conf : ", core.json.delay_encode(conf)) + local ok, err = core.schema.check(core.schema.proto, conf) + if not ok then + return nil, {error_msg = "invalid configuration: " .. err} + end + + return need_id and id or true +end + + +function _M.put(id, conf) + local id, err = check_conf(id, conf, true) + if not id then + return 400, err + end + + local key = "/proto/" .. id + local res, err = core.etcd.set(key, conf) + if not res then + core.log.error("failed to put proto[", key, "]: ", err) + return 500, {error_msg = err} + end + + return res.status, res.body +end + + +function _M.get(id) + local key = "/proto" + if id then + key = key .. "/" .. id + end + + local res, err = core.etcd.get(key) + if not res then + core.log.error("failed to get proto[", key, "]: ", err) + return 500, {error_msg = err} + end + + return res.status, res.body +end + + +function _M.post(id, conf) + local id, err = check_conf(id, conf, false) + if not id then + return 400, err + end + + local key = "/proto" + -- core.log.info("key: ", key) + local res, err = core.etcd.push("/proto", conf) + if not res then + core.log.error("failed to post proto[", key, "]: ", err) + return 500, {error_msg = err} + end + + return res.status, res.body +end + +function _M.check_proto_used(plugins, deleting, ptype, pid) + + core.log.info("plugins1: ", core.json.delay_encode(plugins, true)) + + if plugins then + if type(plugins) == "table" and plugins["grpc-proxy"] + and plugins["grpc-proxy"].proto_id + and tostring(plugins["grpc-proxy"].proto_id) == deleting then + return 400, {error_msg = "can not delete this proto," + .. ptype .. " [" .. pid + .. "] is still using it now"} + end + end +end + +function _M.delete(id) + if not id then + return 400, {error_msg = "missing proto id"} + end + + local routes, routes_ver = get_routes() + + core.log.info("routes: ", core.json.delay_encode(routes, true)) + core.log.info("routes_ver: ", routes_ver) + + if routes_ver and routes then + for _, route in ipairs(routes) do + if type(route) == "table" and route.value + and route.value.plugins then + return _M.check_proto_used(route.value.plugins, id, "route", route.value.id) + end + end + end + + local services, services_ver = get_services() + + core.log.info("services: ", core.json.delay_encode(services, true)) + core.log.info("services_ver: ", services_ver) + + if services_ver and services then + for _, service in ipairs(services) do + if type(service) == "table" and service.value + and service.value.plugins then + return _M.check_proto_used(service.value.plugins, id, "service", service.value.id) + end + end + end + + local key = "/proto/" .. id + -- core.log.info("key: ", key) + local res, err = core.etcd.delete(key) + if not res then + core.log.error("failed to delete proto[", key, "]: ", err) + return 500, {error_msg = err} + end + + return res.status, res.body +end + + +return _M diff --git a/lua/apisix/admin/routes.lua b/lua/apisix/admin/routes.lua index 56df1e1fd952..630feaec0d23 100644 --- a/lua/apisix/admin/routes.lua +++ b/lua/apisix/admin/routes.lua @@ -27,7 +27,7 @@ local function check_conf(id, conf, need_id) return nil, {error_msg = "wrong route id"} end - core.log.info("schema: ", core.json.delay_encode(core.schema.route)) + core.log.info("schema: ", core.schema.route) core.log.info("conf : ", core.json.delay_encode(conf)) local ok, err = core.schema.check(core.schema.route, conf) if not ok then diff --git a/lua/apisix/core/schema.lua b/lua/apisix/core/schema.lua index efcea4c764af..de1d15219892 100644 --- a/lua/apisix/core/schema.lua +++ b/lua/apisix/core/schema.lua @@ -1,4 +1,5 @@ local json = require('rapidjson') +local cjson = require('cjson.safe') local schema_validator = json.SchemaValidator local schema_doc = json.SchemaDocument local json_doc = json.Document @@ -244,7 +245,7 @@ local upstream_schema = { } -_M.route = [[{ +local route = [[{ "type": "object", "properties": { "methods": { @@ -257,6 +258,9 @@ _M.route = [[{ }, "uniqueItems": true }, + "service_protocol": { + "enum": [ "grpc", "http" ] + }, "desc": {"type": "string", "maxLength": 256}, "plugins": ]] .. json.encode(plugins_schema) .. [[, "upstream": ]] .. json.encode(upstream_schema) .. [[, @@ -288,6 +292,13 @@ _M.route = [[{ ], "additionalProperties": false }]] +do + local route_t, err = cjson.decode(route) + if err then + error("invalid route: " .. route) + end + _M.route = cjson.encode(route_t) +end _M.service = { @@ -345,4 +356,16 @@ _M.ssl = { } +_M.proto = { + type = "object", + properties = { + content = { + type = "string", minLength = 1, maxLength = 4096 + } + }, + required = {"content"}, + additionalProperties = false, +} + + return _M diff --git a/lua/apisix/http/router.lua b/lua/apisix/http/router.lua index f6316099527d..8e3ef0328f35 100644 --- a/lua/apisix/http/router.lua +++ b/lua/apisix/http/router.lua @@ -10,6 +10,7 @@ function _M.init_worker() local conf = local_conf() local router_http_name = "r3_uri" local router_ssl_name = "r3_sni" + if conf and conf.apisix and conf.apisix.router then router_http_name = conf.apisix.router.http or router_http_name router_ssl_name = conf.apisix.router.ssl or router_ssl_name diff --git a/lua/apisix/plugins/grpc-proxy.lua b/lua/apisix/plugins/grpc-proxy.lua new file mode 100644 index 000000000000..3e9c82ba941e --- /dev/null +++ b/lua/apisix/plugins/grpc-proxy.lua @@ -0,0 +1,83 @@ +local ngx = ngx +local core = require("apisix.core") +local plugin_name = "grpc-proxy" +local proto = require("apisix.plugins.grpc-proxy.proto") +local request = require("apisix.plugins.grpc-proxy.request") +local response = require("apisix.plugins.grpc-proxy.response") + + +local schema = { + type = "object", + additionalProperties = true +} + + +local _M = { + version = 0.1, + priority = 506, + name = plugin_name, + schema = schema, +} + + +function _M.init() + proto.init() +end + + +function _M.check_schema(conf) + local ok, err = core.schema.check(schema, conf) + if not ok then + return false, err + end + + return true +end + + +function _M.access(conf, ctx) + core.log.info("conf: ", core.json.delay_encode(conf)) + + local proto_id = conf.proto_id + if not proto_id then + core.log.error("proto id miss: ", proto_id) + return + end + + local proto_obj, err = proto.fetch(proto_id) + if err then + core.log.error("proto load error: ", err) + return + end + + local ok, err = request(proto_obj, conf.service, conf.method) + if not ok then + core.log.error("trasnform request error: ", err) + return + end + + ctx.proto_obj = proto_obj +end + + +function _M.header_filter(conf, ctx) + ngx.header["Content-Type"] = "application/json" + ngx.header["Trailer"] = {"grpc-status", "grpc-message"} +end + + +function _M.body_filter(conf, ctx) + local proto_obj = ctx.proto_obj + if not proto_obj then + return + end + + local err = response(proto_obj, conf.service, conf.method) + if err then + core.log.error("trasnform response error: ", err) + return + end +end + + +return _M diff --git a/lua/apisix/plugins/grpc-proxy/proto.lua b/lua/apisix/plugins/grpc-proxy/proto.lua new file mode 100644 index 000000000000..ea0c01951cd4 --- /dev/null +++ b/lua/apisix/plugins/grpc-proxy/proto.lua @@ -0,0 +1,64 @@ +local core = require("apisix.core") +local protoc = require("protoc") +local ipairs = ipairs +local protos + + +local lrucache_proto = core.lrucache.new({ + ttl = 300, count = 100 +}) + + +local function create_proto_obj(proto_id) + if protos.values == nil then + return nil + end + + local content + for _, proto in ipairs(protos.values) do + if proto_id == proto.value.id then + content = proto.value.content + break + end + end + + if not content then + return nil, "failed to find proto by id: " .. proto_id + end + + local _p = protoc.new() + local res = _p:load(content) + + if not res or not _p.loaded then + return nil, "failed to load proto content" + end + + + return _p.loaded +end + + +local _M = {version = 0.1} + + +function _M.fetch(proto_id) + return lrucache_proto(proto_id, protos.conf_version, + create_proto_obj, proto_id) +end + + +function _M.init() + local err + protos, err = core.config.new("/proto", { + automatic = true, + item_schema = core.schema.proto + }) + if not protos then + core.log.error("failed to create etcd instance for fetching protos: ", + err) + return + end +end + + +return _M diff --git a/lua/apisix/plugins/grpc-proxy/request.lua b/lua/apisix/plugins/grpc-proxy/request.lua new file mode 100644 index 000000000000..9e52ac6cb9e2 --- /dev/null +++ b/lua/apisix/plugins/grpc-proxy/request.lua @@ -0,0 +1,38 @@ + +local util = require("apisix.plugins.grpc-proxy.util") +local core = require("apisix.core") +local pb = require("pb") +local bit = require("bit") +local ngx = ngx +local string = string +local table = table + + +return function (proto, service, method, default_values) + core.log.info("proto: ", core.json.delay_encode(proto, true)) + local m = util.find_method(proto, service, method) + if not m then + return false, "Undefined service method: " .. service .. "/" .. method + .. " end" + end + + ngx.req.read_body() + local encoded = pb.encode(m.input_type, + util.map_message(m.input_type, default_values or {})) + local size = string.len(encoded) + local prefix = { + string.char(0), + string.char(bit.band(bit.rshift(size, 24), 0xFF)), + string.char(bit.band(bit.rshift(size, 16), 0xFF)), + string.char(bit.band(bit.rshift(size, 8), 0xFF)), + string.char(bit.band(size, 0xFF)) + } + + local message = table.concat(prefix, "") .. encoded + + ngx.req.set_method(ngx.HTTP_POST) + ngx.req.set_uri("/" .. service .. "/" .. method, false) + ngx.req.set_uri_args({}) + ngx.req.set_body_data(message) + return true +end diff --git a/lua/apisix/plugins/grpc-proxy/response.lua b/lua/apisix/plugins/grpc-proxy/response.lua new file mode 100644 index 000000000000..7440ee94c732 --- /dev/null +++ b/lua/apisix/plugins/grpc-proxy/response.lua @@ -0,0 +1,41 @@ +local util = require("apisix.plugins.grpc-proxy.util") +local core = require("apisix.core") +local pb = require("pb") +local ngx = ngx +local string = string +local table = table + + +return function(proto, service, method) + local m = util.find_method(proto, service, method) + if not m then + return false, "2.Undefined service method: " .. service .. "/" .. method + .. " end." + end + + local chunk, eof = ngx.arg[1], ngx.arg[2] + local buffered = ngx.ctx.buffered + if not buffered then + buffered = {} + ngx.ctx.buffered = buffered + end + + if chunk ~= "" then + core.table.insert(buffered, chunk) + ngx.arg[1] = nil + end + + if not eof then + return + end + + ngx.ctx.buffered = nil + local buffer = table.concat(buffered) + if not ngx.req.get_headers()["X-Grpc-Web"] then + buffer = string.sub(buffer, 6) + end + + local decoded = pb.decode(m.output_type, buffer) + local response = core.json.encode(decoded) + ngx.arg[1] = response +end diff --git a/lua/apisix/plugins/grpc-proxy/util.lua b/lua/apisix/plugins/grpc-proxy/util.lua new file mode 100644 index 000000000000..d8f64000797d --- /dev/null +++ b/lua/apisix/plugins/grpc-proxy/util.lua @@ -0,0 +1,84 @@ +local json = require("apisix.core.json") +local pb = require("pb") +local ngx = ngx +local pairs = pairs +local ipairs = ipairs +local string = string +local tonumber = tonumber +local type = type + + +local _M = {version = 0.1} + + +function _M.find_method(protos, service, method) + for k, loaded in pairs(protos) do + if type(loaded) == 'table' then + local package = loaded.package + for _, s in ipairs(loaded.service or {}) do + if package .. "." .. s.name == service then + for _, m in ipairs(s.method) do + if m.name == method then + return m + end + end + end + end + end + end + + return nil +end + + +local function get_from_request(name, kind) + local request_table + if ngx.req.get_method() == "POST" then + if string.find(ngx.req.get_headers()["Content-Type"] or "", + "application/json", true) then + request_table = json.decode(ngx.req.get_body_data()) + else + request_table = ngx.req.get_post_args() + end + else + request_table = ngx.req.get_uri_args() + end + + local prefix = kind:sub(1, 3) + if prefix == "str" then + return request_table[name] or nil + end + + if prefix == "int" then + if request_table[name] then + return tonumber(request_table[name]) + end + end + + return nil +end + + +function _M.map_message(field, default_values) + if not pb.type(field) then + return nil, "Field " .. field .. " is not defined" + end + + local request = {} + local sub, err + for name, _, field_type in pb.fields(field) do + if field_type:sub(1, 1) == "." then + sub, err = _M.map_message(field_type, default_values) + if err then + return nil, err + end + request[name] = sub + else + request[name] = get_from_request(name, field_type) or default_values[name] or nil + end + end + return request +end + + +return _M diff --git a/rockspec/apisix-dev-1.0-0.rockspec b/rockspec/apisix-dev-1.0-0.rockspec index 7cda3a8f3c47..488374e9709b 100644 --- a/rockspec/apisix-dev-1.0-0.rockspec +++ b/rockspec/apisix-dev-1.0-0.rockspec @@ -28,6 +28,7 @@ dependencies = { "opentracing-openresty = 0.1", "lua-resty-radixtree = 0.4", "lua-resty-iputils = 0.3.0-1", + "lua-protobuf = 0.3.1", } build = { diff --git a/t/APISix.pm b/t/APISix.pm index 71093a234286..960fdb43513b 100644 --- a/t/APISix.pm +++ b/t/APISix.pm @@ -8,6 +8,7 @@ repeat_each(1); log_level('info'); no_long_string(); no_shuffle(); +worker_connections(128); my $pwd = cwd(); @@ -157,6 +158,32 @@ _EOC_ apisix.http_header_filter_phase() } + body_filter_by_lua_block { + apisix.http_body_filter_phase() + } + + log_by_lua_block { + apisix.http_log_phase() + } + } + + location \@grpc_pass { + access_by_lua_block { + apisix.grpc_access_phase() + } + + grpc_set_header Content-Type application/grpc; + grpc_socket_keepalive on; + grpc_pass grpc://apisix_backend; + + header_filter_by_lua_block { + apisix.http_header_filter_phase() + } + + body_filter_by_lua_block { + apisix.http_body_filter_phase() + } + log_by_lua_block { apisix.http_log_phase() } diff --git a/t/admin/plugins.t b/t/admin/plugins.t index 45f1cc046b00..62a414f53a38 100644 --- a/t/admin/plugins.t +++ b/t/admin/plugins.t @@ -14,6 +14,6 @@ __DATA__ --- request GET /apisix/admin/plugins/list --- response_body_like eval -qr/\["limit-req","limit-count","limit-conn","key-auth","prometheus","node-status","jwt-auth","zipkin","ip-restriction"\]/ +qr/\["limit-req","limit-count","limit-conn","key-auth","prometheus","node-status","jwt-auth","zipkin","ip-restriction","grpc-proxy"\]/ --- no_error_log [error] diff --git a/t/admin/schema.t b/t/admin/schema.t index 4a017b03d340..3f50b5385dfb 100644 --- a/t/admin/schema.t +++ b/t/admin/schema.t @@ -14,7 +14,7 @@ __DATA__ --- request GET /apisix/admin/schema/route --- response_body eval -qr/"plugins": \{"type":"object"}/ +qr/"plugins":\{"type":"object"}/ --- no_error_log [error] diff --git a/t/debug-mode.t b/t/debug-mode.t index 16c48720a55c..1d0ca71735b0 100644 --- a/t/debug-mode.t +++ b/t/debug-mode.t @@ -46,6 +46,7 @@ loaded plugin and sort by priority: 1003 name: limit-conn loaded plugin and sort by priority: 1002 name: limit-count loaded plugin and sort by priority: 1001 name: limit-req loaded plugin and sort by priority: 1000 name: node-status +loaded plugin and sort by priority: 506 name: grpc-proxy loaded plugin and sort by priority: 500 name: prometheus loaded plugin and sort by priority: 0 name: example-plugin loaded plugin and sort by priority: -1000 name: zipkin diff --git a/t/plugin/grpc-proxy.t b/t/plugin/grpc-proxy.t new file mode 100644 index 000000000000..7409ca248061 --- /dev/null +++ b/t/plugin/grpc-proxy.t @@ -0,0 +1,149 @@ +BEGIN { + if ($ENV{TEST_NGINX_CHECK_LEAK}) { + $SkipReason = "unavailable for the hup tests"; + + } else { + $ENV{TEST_NGINX_USE_HUP} = 1; + undef $ENV{TEST_NGINX_USE_STAP}; + } +} + +use t::APISix 'no_plan'; + +repeat_each(1); +no_long_string(); +no_shuffle(); +no_root_location(); +log_level('debug'); + +run_tests; + +__DATA__ + +=== TEST 1: set proto(id: 1) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/proto/1', + ngx.HTTP_PUT, + [[{ + "content" : "syntax = \"proto3\"; + package helloworld; + service Greeter { + rpc SayHello (HelloRequest) returns (HelloReply) {} + } + message HelloRequest { + string name = 1; + } + message HelloReply { + string message = 1; + }" + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 2: set routes(id: 1) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "methods": ["GET"], + "uri": "/grpctest", + "service_protocol": "grpc", + "plugins": { + "grpc-proxy": { + "proto_id": "1", + "service": "helloworld.Greeter", + "method": "SayHello" + } + }, + "upstream": { + "type": "roundrobin", + "nodes": { + "127.0.0.1:50051": 1 + } + } + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + +=== TEST 3: hit route +--- request +GET /grpctest +--- response_body eval +qr/\{"message":"Hello "\}/ +--- no_error_log +[error] + + +=== TEST 4: wrong service protocol +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "methods": ["GET"], + "uri": "/grpctest", + "service_protocol": "asf", + "plugins": { + "grpc-proxy": { + "proto_id": "1", + "service": "helloworld.Greeter", + "method": "SayHello" + } + }, + "upstream": { + "type": "roundrobin", + "nodes": { + "127.0.0.1:50051": 1 + } + } + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- error_code: 400 +--- no_error_log +[error] + +