Skip to content

Commit

Permalink
feat(kafka-logger): supports logging request body (#5501)
Browse files Browse the repository at this point in the history
Co-authored-by: windyrjc <windyrjc@formail.com>
Co-authored-by: yundian <yundian@mogu.com>
  • Loading branch information
3 people authored Nov 18, 2021
1 parent 2c12b36 commit 5394dce
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 7 deletions.
20 changes: 20 additions & 0 deletions apisix/plugins/kafka-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ local log_util = require("apisix.utils.log-util")
local producer = require ("resty.kafka.producer")
local batch_processor = require("apisix.utils.batch-processor")
local plugin = require("apisix.plugin")
local expr = require("resty.expr.v1")

local math = math
local pairs = pairs
Expand Down Expand Up @@ -74,6 +75,16 @@ local schema = {
inactive_timeout = {type = "integer", minimum = 1, default = 5},
batch_max_size = {type = "integer", minimum = 1, default = 1000},
include_req_body = {type = "boolean", default = false},
include_req_body_expr = {
type = "array",
minItems = 1,
items = {
type = "array",
items = {
type = "string"
}
}
},
-- in lua-resty-kafka, cluster_name is defined as number
-- see https://github.com/doujiang24/lua-resty-kafka#new-1
cluster_name = {type = "integer", minimum = 1, default = 1},
Expand All @@ -98,6 +109,15 @@ local _M = {


function _M.check_schema(conf, schema_type)

if conf.include_req_body_expr then
local ok, err = expr.new(conf.include_req_body_expr)
if not ok then
return nil,
{error_msg = "failed to validate the 'include_req_body_expr' expression: " .. err}
end
end

if schema_type == core.schema.TYPE_METADATA then
return core.schema.check(metadata_schema, conf)
end
Expand Down
38 changes: 31 additions & 7 deletions apisix/utils/log-util.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
-- limitations under the License.
--
local core = require("apisix.core")
local expr = require("resty.expr.v1")
local ngx = ngx
local pairs = pairs
local str_byte = string.byte
Expand Down Expand Up @@ -119,13 +120,36 @@ local function get_full_log(ngx, conf)
}

if conf.include_req_body then
local body = req_get_body_data()
if body then
log.request.body = body
else
local body_file = ngx.req.get_body_file()
if body_file then
log.request.body_file = body_file

local log_request_body = true

if conf.include_req_body_expr then

if not conf.request_expr then
local request_expr, err = expr.new(conf.include_req_body_expr)
if not request_expr then
core.log.error('generate log expr err ' .. err)
return log
end
conf.request_expr = request_expr
end

local result = conf.request_expr:eval(ctx.var)

if not result then
log_request_body = false
end
end

if log_request_body then
local body = req_get_body_data()
if body then
log.request.body = body
else
local body_file = ngx.req.get_body_file()
if body_file then
log.request.body_file = body_file
end
end
end
end
Expand Down
1 change: 1 addition & 0 deletions docs/en/latest/plugins/kafka-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ For more info on Batch-Processor in Apache APISIX please refer.
| max_retry_count | integer | optional | 0 | [0,...] | Maximum number of retries before removing from the processing pipe line. |
| retry_delay | integer | optional | 1 | [0,...] | Number of seconds the process execution should be delayed if the execution fails. |
| include_req_body | boolean | optional | false | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. |
| include_req_body_expr | array | optional | | | Whether to logging request body, based on [lua-resty-expr](https://github.com/api7/lua-resty-expr), this option require to turn on `include_req_body` option. |
| cluster_name | integer | optional | 1 | [0,...] | the name of the cluster. When there are two or more kafka clusters, you can specify different names. And this only works with async producer_type.|

### examples of meta_format
Expand Down
1 change: 1 addition & 0 deletions docs/zh/latest/plugins/kafka-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ title: kafka-logger
| max_retry_count | integer | 可选 | 0 | [0,...] | 从处理管道中移除之前的最大重试次数。 |
| retry_delay | integer | 可选 | 1 | [0,...] | 如果执行失败,则应延迟执行流程的秒数。 |
| include_req_body | boolean | 可选 | false | [false, true] | 是否包括请求 body。false: 表示不包含请求的 body ; true: 表示包含请求的 body 。|
| include_req_body_expr | array | 可选 | | | 是否采集请求body, 基于[lua-resty-expr](https://github.com/api7/lua-resty-expr)。 该选项需要开启 `include_req_body`|
| cluster_name | integer | 可选 | 1 | [0,...] | kafka 集群的名称。当有两个或多个 kafka 集群时,可以指定不同的名称。只适用于 producer_type 是 async 模式。|

### meta_format 参考示例
Expand Down
79 changes: 79 additions & 0 deletions t/plugin/kafka-logger.t
Original file line number Diff line number Diff line change
Expand Up @@ -1114,3 +1114,82 @@ GET /t
--- error_log_like eval
qr/create new kafka producer instance, brokers: \[\{"port":9092,"host":"127.0.0.127"}]/
qr/failed to send data to Kafka topic: .*, brokers: \{"127.0.0.127":9092}/



=== TEST 26: set route(id: 1,include_req_body = true,include_req_body_expr = array)
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/1',
ngx.HTTP_PUT,
[=[{
"plugins": {
"kafka-logger": {
"broker_list" :
{
"127.0.0.1":9092
},
"kafka_topic" : "test2",
"key" : "key1",
"timeout" : 1,
"include_req_body": true,
"include_req_body_expr": [
[
"arg_name",
"==",
"qwerty"
]
],
"batch_max_size": 1
}
},
"upstream": {
"nodes": {
"127.0.0.1:1980": 1
},
"type": "roundrobin"
},
"uri": "/hello"
}]=]
)
if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}

--- request
GET /t
--- response_body
passed
--- no_error_log
[error]



=== TEST 27: hit route, expr eval success
--- request
POST /hello?name=qwerty
abcdef
--- response_body
hello world
--- no_error_log
[error]
--- error_log eval
qr/send data to kafka: \{.*"body":"abcdef"/
--- wait: 2



=== TEST 28: hit route,expr eval fail
--- request
POST /hello?name=zcxv
abcdef
--- response_body
hello world
--- no_error_log eval
qr/send data to kafka: \{.*"body":"abcdef"/
--- wait: 2

0 comments on commit 5394dce

Please sign in to comment.