Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(xrpc): register variable rpc_time #7040

Merged
merged 7 commits into from
May 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions apisix/core/ctx.lua
Original file line number Diff line number Diff line change
Expand Up @@ -278,11 +278,12 @@ do
else
local getter = apisix_var_names[key]
if getter then
local ctx = t._ctx
if getter == true then
val = ngx.ctx.api_ctx and ngx.ctx.api_ctx[key]
val = ctx and ctx[key]
else
-- the getter is registered by ctx.register_var
val = getter(ngx.ctx.api_ctx)
val = getter(ctx)
end

else
Expand Down Expand Up @@ -341,6 +342,7 @@ function _M.set_vars_meta(ctx)
end

var._request = get_request()
var._ctx = ctx
setmetatable(var, mt)
ctx.var = var
end
Expand Down
8 changes: 6 additions & 2 deletions apisix/stream/xrpc/runner.lua
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ local pcall = pcall
local ipairs = ipairs
local tostring = tostring


core.ctx.register_var("rpc_time", function(ctx)
return ctx._rpc_end_time - ctx._rpc_start_time
end)

local logger_expr_cache = core.lrucache.new({
ttl = 300, count = 1024
})
Expand Down Expand Up @@ -72,7 +77,7 @@ local function filter_logger(ctx, logger)
core.log.error("failed to validate the 'filter' expression: ", err)
return false
end
return filter_expr:eval(ctx)
return filter_expr:eval(ctx.var)
end


Expand All @@ -93,7 +98,6 @@ end

local function finialize_req(protocol, session, ctx)
ctx._rpc_end_time = ngx_now()

local loggers = session.route.protocol.logger
if loggers and #loggers > 0 then
for _, logger in ipairs(loggers) do
Expand Down
1 change: 1 addition & 0 deletions docs/en/latest/apisix-variable.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,6 @@ List in alphabetical order:
| route_name | core | name of `route` | |
| service_id | core | id of `service` | |
| service_name | core | name of `service` | |
| rpc_time | xRPC | time spent at the rpc request level | |

You can also [register your own variable](./plugin-develop.md#register-custom-variable).
41 changes: 41 additions & 0 deletions docs/en/latest/xrpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,47 @@ One specifies the `superior_id`, whose corresponding value is the ID of another

For example, for the Dubbo RPC protocol, the subordinate route is matched based on the service_name and other parameters configured in the route and the actual service_name brought in the request. If the match is successful, the configuration above the subordinate route is used, otherwise the configuration of the superior route is still used. In the above example, if the match for route 2 is successful, it will be forwarded to upstream 2; otherwise, it will still be forwarded to upstream 1.

### Log Reporting

xRPC supports logging-related functions. You can use this feature to filter requests that require attention, such as high latency, excessive transfer content, etc.

Each logger item configuration parameter will contain

- name: the Logger plugin name,
- filter: the prerequisites for the execution of the logger plugin(e.g., request processing time exceeding a given value),
- conf: the configuration of the logger plugin itself.

The following configuration is an example:

```json
{
...
"protocol": {
"name": "redis",
"logger": {
{
"name": "syslog",
"filter": [
["rpc_time", ">=", 0.01]
],
"conf": {
"host": "127.0.0.1",
"port": 8125,
}
}
}
}
}
```

This configuration means that when the `rpc_time` is greater than 0.01 seconds, xPRC reports the request log to the log server via the `syslog` plugin. `conf` is the configuration of the logging server required by the `syslog` plugin.

Unlike standard TCP proxies, which only execute a logger when the connection is closed, xRPC's executed logger at the end of each 'request'.

The protocol itself defines the granularity of the specific request, and the xRPC extension code implements the request's granularity.

For example, in the Redis protocol, the execution of a command is considered a request.

## How to write your own protocol

Assuming that your protocol is named `my_proto`, you need to create a directory that can be introduced by `require "apisix.stream.xrpc.protocols.my_proto"`.
Expand Down
4 changes: 4 additions & 0 deletions t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ local DONE = ngx.DONE
local str_byte = string.byte


core.ctx.register_var("rpc_len", function(ctx)
return ctx.len
end)

local _M = {}
local router_version
local router
Expand Down
20 changes: 10 additions & 10 deletions t/xrpc/pingpong2.t
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ failed to validate the 'filter' expression: rule too short
{
name = "syslog",
filter = {
{"len", ">", 10}
{"rpc_len", ">", 10}
},
conf = {}
}
Expand Down Expand Up @@ -272,7 +272,7 @@ log filter: syslog filter result: true
{
name = "syslog",
filter = {
{"len", "<", 10}
{"rpc_len", "<", 10}
},
conf = {}
}
Expand Down Expand Up @@ -324,8 +324,8 @@ log filter: syslog filter result: false
{
name = "syslog",
filter = {
{"len", ">", 12},
{"len", "<", 14}
{"rpc_len", ">", 12},
{"rpc_len", "<", 14}
},
conf = {}
}
Expand Down Expand Up @@ -377,8 +377,8 @@ log filter: syslog filter result: true
{
name = "syslog",
filter = {
{"len", "<", 10},
{"len", ">", 12}
{"rpc_len", "<", 10},
{"rpc_len", ">", 12}
},
conf = {}
}
Expand Down Expand Up @@ -516,7 +516,7 @@ qr/message received:.*\"client_ip\\"\:\\"127.0.0.1\\"/
{
name = "syslog",
filter = {
{"len", ">", 10}
{"rpc_len", ">", 10}
},
conf = {
host = "127.0.0.1",
Expand Down Expand Up @@ -576,7 +576,7 @@ qr/message received:.*\"client_ip\\"\:\\"127.0.0.1\\"/
{
name = "syslog",
filter = {
{"len", ">", 10}
{"rpc_len", ">", 10}
},
conf = {
host = "127.0.0.1",
Expand Down Expand Up @@ -650,7 +650,7 @@ unlock with key xrpc-pingpong-logger#table
{
name = "syslog",
filter = {
{"len", ">", 10}
{"rpc_len", ">", 10}
},
conf = {
host = "127.0.0.1",
Expand Down Expand Up @@ -698,7 +698,7 @@ unlock with key xrpc-pingpong-logger#table
{
name = "syslog",
filter = {
{"len", ">", 10}
{"rpc_len", ">", 10}
},
conf = {
host = "127.0.0.1",
Expand Down
193 changes: 193 additions & 0 deletions t/xrpc/pingpong3.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
use t::APISIX;

my $nginx_binary = $ENV{'TEST_NGINX_BINARY'} || 'nginx';
my $version = eval { `$nginx_binary -V 2>&1` };

if ($version !~ m/\/apisix-nginx-module/) {
plan(skip_all => "apisix-nginx-module not installed");
} else {
plan('no_plan');
}

add_block_preprocessor(sub {
my ($block) = @_;

if (!$block->extra_yaml_config) {
my $extra_yaml_config = <<_EOC_;
xrpc:
protocols:
- name: pingpong
_EOC_
$block->set_value("extra_yaml_config", $extra_yaml_config);
}

my $config = $block->config // <<_EOC_;
location /t {
content_by_lua_block {
ngx.req.read_body()
local sock = ngx.socket.tcp()
sock:settimeout(1000)
local ok, err = sock:connect("127.0.0.1", 1985)
if not ok then
ngx.log(ngx.ERR, "failed to connect: ", err)
return ngx.exit(503)
end

local bytes, err = sock:send(ngx.req.get_body_data())
if not bytes then
ngx.log(ngx.ERR, "send stream request error: ", err)
return ngx.exit(503)
end
while true do
local data, err = sock:receiveany(4096)
if not data then
sock:close()
break
end
ngx.print(data)
end
}
}
_EOC_

$block->set_value("config", $config);

my $stream_upstream_code = $block->stream_upstream_code // <<_EOC_;
local sock = ngx.req.socket(true)
sock:settimeout(10)
while true do
local data = sock:receiveany(4096)
if not data then
return
end
sock:send(data)
end
_EOC_

$block->set_value("stream_upstream_code", $stream_upstream_code);

if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
$block->set_value("no_error_log", "[error]\nRPC is not finished");
}

if (!defined $block->extra_stream_config) {
my $stream_config = <<_EOC_;
server {
listen 8125 udp;
content_by_lua_block {
require("lib.mock_layer4").dogstatsd()
}
}
_EOC_
$block->set_value("extra_stream_config", $stream_config);
}

$block;
});

run_tests;

__DATA__

=== TEST 1: set custom log format
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/plugin_metadata/syslog',
ngx.HTTP_PUT,
[[{
"log_format": {
"rpc_time": "$rpc_time"
}
}]]
)

if code >= 300 then
ngx.status = code
ngx.say(body)
return
end

ngx.say(body)
}
}
--- request
GET /t
--- response_body
passed



=== TEST 2: use vae rpc_time
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/stream_routes/1',
ngx.HTTP_PUT,
{
protocol = {
name = "pingpong",
logger = {
{
name = "syslog",
filter = {
{"rpc_time", ">=", 0}
},
conf = {
host = "127.0.0.1",
port = 8125,
sock_type = "udp",
batch_max_size = 1,
flush_limit = 1
}
}
}
},
upstream = {
nodes = {
["127.0.0.1:1995"] = 1
},
type = "roundrobin"
}
}
)
if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- request
GET /t
--- response_body
passed



=== TEST 3: verify the data received by the log server
--- request eval
"POST /t
" .
"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"
--- stream_conf_enable
--- wait: 0.5
--- error_log eval
qr/message received:.*\"rpc_time\\"\:(0.\d+|0)\}\"/