From 52cc863f44bc33267d320255fdf3755bae411166 Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Fri, 13 May 2022 14:15:42 +0800 Subject: [PATCH 1/6] feat(xrpc): register variable rpc_time --- apisix/stream/xrpc/runner.lua | 17 +- docs/en/latest/apisix-variable.md | 1 + docs/en/latest/xrpc.md | 41 ++++ .../stream/xrpc/protocols/pingpong/init.lua | 9 + t/xrpc/pingpong2.t | 20 +- t/xrpc/pingpong3.t | 193 ++++++++++++++++++ 6 files changed, 269 insertions(+), 12 deletions(-) create mode 100644 t/xrpc/pingpong3.t diff --git a/apisix/stream/xrpc/runner.lua b/apisix/stream/xrpc/runner.lua index 8c082a1966af..1fd89b2eb1f5 100644 --- a/apisix/stream/xrpc/runner.lua +++ b/apisix/stream/xrpc/runner.lua @@ -27,6 +27,19 @@ local pcall = pcall local ipairs = ipairs local tostring = tostring + +core.ctx.register_var("rpc_time", function(ctx) + local curr_ctx = ctx.xrpc_session and ctx.xrpc_session._curr_ctx + + if not curr_ctx then + core.log.warn("can't find current context by id: ", curr_ctx_id) + return nil + end + --use second as the unit, like the request_time + local time = (curr_ctx._rpc_end_time * 1000 - curr_ctx._rpc_start_time * 1000) / 1000 + return time +end) + local logger_expr_cache = core.lrucache.new({ ttl = 300, count = 1024 }) @@ -70,7 +83,7 @@ local function filter_logger(ctx, logger) core.log.error("failed to validate the 'filter' expression: ", err) return false end - return filter_expr:eval(ctx) + return filter_expr:eval(ctx.var) end @@ -91,7 +104,7 @@ end local function finialize_req(protocol, session, ctx) ctx._rpc_end_time = ngx_now() - + session._curr_ctx = ctx local loggers = session.route.protocol.logger if loggers and #loggers > 0 then for _, logger in ipairs(loggers) do diff --git a/docs/en/latest/apisix-variable.md b/docs/en/latest/apisix-variable.md index d6254b82048b..c9281731bee6 100644 --- a/docs/en/latest/apisix-variable.md +++ b/docs/en/latest/apisix-variable.md @@ -39,5 +39,6 @@ List in alphabetical order: | route_name | core | name of `route` | | | service_id | core | id of `service` | | | service_name | core | name of `service` | | +| rpc_time | xRPC | time spent at the rpc request level | | You can also [register your own variable](./plugin-develop.md#register-custom-variable). diff --git a/docs/en/latest/xrpc.md b/docs/en/latest/xrpc.md index 1fb2a515933a..9ad486e83063 100644 --- a/docs/en/latest/xrpc.md +++ b/docs/en/latest/xrpc.md @@ -131,6 +131,47 @@ One specifies the `superior_id`, whose corresponding value is the ID of another For example, for the Dubbo RPC protocol, the subordinate route is matched based on the service_name and other parameters configured in the route and the actual service_name brought in the request. If the match is successful, the configuration above the subordinate route is used, otherwise the configuration of the superior route is still used. In the above example, if the match for route 2 is successful, it will be forwarded to upstream 2; otherwise, it will still be forwarded to upstream 1. +### Log Reporting + +xRPC supports logging-related functions. You can use this feature to filter requests that require attention, such as high latency, excessive transfer content, etc. + +Each logger item configuration parameter will contain + +- name: the Logger plugin name, +- filter: the prerequisites for the execution of the logger plugin(e.g., request processing time exceeding a given value), +- conf: the configuration of the logger plugin itself. + + The following configuration is an example: + +```json +{ + ... + "protocol": { + "name": "redis", + "logger": { + { + "name": "syslog", + "filter": [ + ["rpc_time", ">=", 0.01] + ], + "conf": { + "host": "127.0.0.1", + "port": 8125, + } + } + } + } +} +``` + +This configuration means that when the `rpc_time` is greater than 0.01 seconds, xPRC reports the request log to the log server via the `syslog` plugin. `conf` is the configuration of the logging server required by the `syslog` plugin. + +Unlike standard TCP proxies, which only execute a logger when the connection is closed, xRPC's executed logger at the end of each 'request'. + +The protocol itself defines the granularity of the specific request, and the xRPC extension code implements the request's granularity. + +For example, in the Redis protocol, the execution of a command is considered a request. + ## How to write your own protocol Assuming that your protocol is named `my_proto`, you need to create a directory that can be introduced by `require "apisix.stream.xrpc.protocols.my_proto"`. diff --git a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua index 013725832480..fd6d31d3333c 100644 --- a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua +++ b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua @@ -29,6 +29,15 @@ local DONE = ngx.DONE local str_byte = string.byte +core.ctx.register_var("rpc_len", function(ctx) + local curr_ctx = ctx.xrpc_session and ctx.xrpc_session._curr_ctx + if not curr_ctx then + core.log.warn("can't find current context by id: ", curr_ctx_id) + return nil + end + return curr_ctx.len +end) + local _M = {} local router_version local router diff --git a/t/xrpc/pingpong2.t b/t/xrpc/pingpong2.t index 93c57bd27576..cdcf367c2f09 100644 --- a/t/xrpc/pingpong2.t +++ b/t/xrpc/pingpong2.t @@ -220,7 +220,7 @@ failed to validate the 'filter' expression: rule too short { name = "syslog", filter = { - {"len", ">", 10} + {"rpc_len", ">", 10} }, conf = {} } @@ -272,7 +272,7 @@ log filter: syslog filter result: true { name = "syslog", filter = { - {"len", "<", 10} + {"rpc_len", "<", 10} }, conf = {} } @@ -324,8 +324,8 @@ log filter: syslog filter result: false { name = "syslog", filter = { - {"len", ">", 12}, - {"len", "<", 14} + {"rpc_len", ">", 12}, + {"rpc_len", "<", 14} }, conf = {} } @@ -377,8 +377,8 @@ log filter: syslog filter result: true { name = "syslog", filter = { - {"len", "<", 10}, - {"len", ">", 12} + {"rpc_len", "<", 10}, + {"rpc_len", ">", 12} }, conf = {} } @@ -516,7 +516,7 @@ qr/message received:.*\"client_ip\\"\:\\"127.0.0.1\\"/ { name = "syslog", filter = { - {"len", ">", 10} + {"rpc_len", ">", 10} }, conf = { host = "127.0.0.1", @@ -576,7 +576,7 @@ qr/message received:.*\"client_ip\\"\:\\"127.0.0.1\\"/ { name = "syslog", filter = { - {"len", ">", 10} + {"rpc_len", ">", 10} }, conf = { host = "127.0.0.1", @@ -650,7 +650,7 @@ unlock with key xrpc-pingpong-logger#table { name = "syslog", filter = { - {"len", ">", 10} + {"rpc_len", ">", 10} }, conf = { host = "127.0.0.1", @@ -698,7 +698,7 @@ unlock with key xrpc-pingpong-logger#table { name = "syslog", filter = { - {"len", ">", 10} + {"rpc_len", ">", 10} }, conf = { host = "127.0.0.1", diff --git a/t/xrpc/pingpong3.t b/t/xrpc/pingpong3.t new file mode 100644 index 000000000000..c6d98810d656 --- /dev/null +++ b/t/xrpc/pingpong3.t @@ -0,0 +1,193 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX; + +my $nginx_binary = $ENV{'TEST_NGINX_BINARY'} || 'nginx'; +my $version = eval { `$nginx_binary -V 2>&1` }; + +if ($version !~ m/\/apisix-nginx-module/) { + plan(skip_all => "apisix-nginx-module not installed"); +} else { + plan('no_plan'); +} + +add_block_preprocessor(sub { + my ($block) = @_; + + if (!$block->extra_yaml_config) { + my $extra_yaml_config = <<_EOC_; +xrpc: + protocols: + - name: pingpong +_EOC_ + $block->set_value("extra_yaml_config", $extra_yaml_config); + } + + my $config = $block->config // <<_EOC_; + location /t { + content_by_lua_block { + ngx.req.read_body() + local sock = ngx.socket.tcp() + sock:settimeout(1000) + local ok, err = sock:connect("127.0.0.1", 1985) + if not ok then + ngx.log(ngx.ERR, "failed to connect: ", err) + return ngx.exit(503) + end + + local bytes, err = sock:send(ngx.req.get_body_data()) + if not bytes then + ngx.log(ngx.ERR, "send stream request error: ", err) + return ngx.exit(503) + end + while true do + local data, err = sock:receiveany(4096) + if not data then + sock:close() + break + end + ngx.print(data) + end + } + } +_EOC_ + + $block->set_value("config", $config); + + my $stream_upstream_code = $block->stream_upstream_code // <<_EOC_; + local sock = ngx.req.socket(true) + sock:settimeout(10) + while true do + local data = sock:receiveany(4096) + if not data then + return + end + sock:send(data) + end +_EOC_ + + $block->set_value("stream_upstream_code", $stream_upstream_code); + + if ((!defined $block->error_log) && (!defined $block->no_error_log)) { + $block->set_value("no_error_log", "[error]\nRPC is not finished"); + } + + if (!defined $block->extra_stream_config) { + my $stream_config = <<_EOC_; + server { + listen 8125 udp; + content_by_lua_block { + require("lib.mock_layer4").dogstatsd() + } + } +_EOC_ + $block->set_value("extra_stream_config", $stream_config); + } + + $block; +}); + +run_tests; + +__DATA__ + +=== TEST 1: set custom log format +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/plugin_metadata/syslog', + ngx.HTTP_PUT, + [[{ + "log_format": { + "rpc_time": "$rpc_time" + } + }]] + ) + + if code >= 300 then + ngx.status = code + ngx.say(body) + return + end + + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed + + + +=== TEST 2: use vae rpc_time +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + { + protocol = { + name = "pingpong", + logger = { + { + name = "syslog", + filter = { + {"rpc_time", ">=", 0} + }, + conf = { + host = "127.0.0.1", + port = 8125, + sock_type = "udp", + batch_max_size = 1, + flush_limit = 1 + } + } + } + }, + upstream = { + nodes = { + ["127.0.0.1:1995"] = 1 + }, + type = "roundrobin" + } + } + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed + + + +=== TEST 3: verify the data received by the log server +--- request eval +"POST /t +" . +"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" +--- stream_conf_enable +--- wait: 0.5 +--- error_log eval +qr/message received:.*\"rpc_time\\"\:(0.\d+|0)\}\"/ From 5f7d5195b29bce994f4b3d6043c42641092ad7fa Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Fri, 13 May 2022 14:17:16 +0800 Subject: [PATCH 2/6] resolve code review --- apisix/stream/xrpc/runner.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apisix/stream/xrpc/runner.lua b/apisix/stream/xrpc/runner.lua index 1fd89b2eb1f5..b652775b2626 100644 --- a/apisix/stream/xrpc/runner.lua +++ b/apisix/stream/xrpc/runner.lua @@ -36,7 +36,7 @@ core.ctx.register_var("rpc_time", function(ctx) return nil end --use second as the unit, like the request_time - local time = (curr_ctx._rpc_end_time * 1000 - curr_ctx._rpc_start_time * 1000) / 1000 + local time = curr_ctx._rpc_end_time - curr_ctx._rpc_start_time return time end) From 8dbe602b02b8bc841fd09387935523230be8783d Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Fri, 13 May 2022 14:25:35 +0800 Subject: [PATCH 3/6] fix code lint --- apisix/stream/xrpc/runner.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apisix/stream/xrpc/runner.lua b/apisix/stream/xrpc/runner.lua index b652775b2626..298c50ad9303 100644 --- a/apisix/stream/xrpc/runner.lua +++ b/apisix/stream/xrpc/runner.lua @@ -32,7 +32,7 @@ core.ctx.register_var("rpc_time", function(ctx) local curr_ctx = ctx.xrpc_session and ctx.xrpc_session._curr_ctx if not curr_ctx then - core.log.warn("can't find current context by id: ", curr_ctx_id) + core.log.warn("can't find current context") return nil end --use second as the unit, like the request_time From 3fe9de768d68ea741dd9e3f31ea41bbf6b24053a Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Fri, 13 May 2022 14:25:55 +0800 Subject: [PATCH 4/6] fix code lint --- t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua index fd6d31d3333c..28fa4abc3592 100644 --- a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua +++ b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua @@ -32,7 +32,7 @@ local str_byte = string.byte core.ctx.register_var("rpc_len", function(ctx) local curr_ctx = ctx.xrpc_session and ctx.xrpc_session._curr_ctx if not curr_ctx then - core.log.warn("can't find current context by id: ", curr_ctx_id) + core.log.warn("can't find current context") return nil end return curr_ctx.len From 585683d7f71333420c930acb82811917b6d7fdc2 Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Fri, 13 May 2022 23:46:42 +0800 Subject: [PATCH 5/6] resolve code review --- apisix/core/ctx.lua | 6 ++++-- apisix/stream/xrpc/runner.lua | 10 +--------- t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua | 7 +------ 3 files changed, 6 insertions(+), 17 deletions(-) diff --git a/apisix/core/ctx.lua b/apisix/core/ctx.lua index df6e430ce396..5bf3daa5743e 100644 --- a/apisix/core/ctx.lua +++ b/apisix/core/ctx.lua @@ -278,11 +278,12 @@ do else local getter = apisix_var_names[key] if getter then + local ctx = t._ctx if getter == true then - val = ngx.ctx.api_ctx and ngx.ctx.api_ctx[key] + val = ctx and ctx[key] else -- the getter is registered by ctx.register_var - val = getter(ngx.ctx.api_ctx) + val = getter(ctx) end else @@ -341,6 +342,7 @@ function _M.set_vars_meta(ctx) end var._request = get_request() + var._ctx = ctx setmetatable(var, mt) ctx.var = var end diff --git a/apisix/stream/xrpc/runner.lua b/apisix/stream/xrpc/runner.lua index 298c50ad9303..f88d8ac859fd 100644 --- a/apisix/stream/xrpc/runner.lua +++ b/apisix/stream/xrpc/runner.lua @@ -29,15 +29,7 @@ local tostring = tostring core.ctx.register_var("rpc_time", function(ctx) - local curr_ctx = ctx.xrpc_session and ctx.xrpc_session._curr_ctx - - if not curr_ctx then - core.log.warn("can't find current context") - return nil - end - --use second as the unit, like the request_time - local time = curr_ctx._rpc_end_time - curr_ctx._rpc_start_time - return time + return ctx._rpc_end_time - ctx._rpc_start_time end) local logger_expr_cache = core.lrucache.new({ diff --git a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua index 28fa4abc3592..3ea0c7eaf3a1 100644 --- a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua +++ b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua @@ -30,12 +30,7 @@ local str_byte = string.byte core.ctx.register_var("rpc_len", function(ctx) - local curr_ctx = ctx.xrpc_session and ctx.xrpc_session._curr_ctx - if not curr_ctx then - core.log.warn("can't find current context") - return nil - end - return curr_ctx.len + return ctx.len end) local _M = {} From 2ad0418e038a1d37955af7d543c5617bac68a71d Mon Sep 17 00:00:00 2001 From: tzssangglass Date: Mon, 16 May 2022 09:52:58 +0800 Subject: [PATCH 6/6] resolve core review --- apisix/stream/xrpc/runner.lua | 1 - 1 file changed, 1 deletion(-) diff --git a/apisix/stream/xrpc/runner.lua b/apisix/stream/xrpc/runner.lua index f88d8ac859fd..072f7051567e 100644 --- a/apisix/stream/xrpc/runner.lua +++ b/apisix/stream/xrpc/runner.lua @@ -96,7 +96,6 @@ end local function finialize_req(protocol, session, ctx) ctx._rpc_end_time = ngx_now() - session._curr_ctx = ctx local loggers = session.route.protocol.logger if loggers and #loggers > 0 then for _, logger in ipairs(loggers) do