Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kafka-logger): supports logging request body #5501

Merged
merged 10 commits into from
Nov 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions apisix/plugins/kafka-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ local log_util = require("apisix.utils.log-util")
local producer = require ("resty.kafka.producer")
local batch_processor = require("apisix.utils.batch-processor")
local plugin = require("apisix.plugin")
local expr = require("resty.expr.v1")

local math = math
local pairs = pairs
Expand Down Expand Up @@ -74,6 +75,16 @@ local schema = {
inactive_timeout = {type = "integer", minimum = 1, default = 5},
batch_max_size = {type = "integer", minimum = 1, default = 1000},
include_req_body = {type = "boolean", default = false},
include_req_body_expr = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before we can merge it, could you add a check for the expr in the check_schema? Like:

if conf.vars then

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

type = "array",
minItems = 1,
items = {
type = "array",
items = {
type = "string"
}
}
},
-- in lua-resty-kafka, cluster_name is defined as number
-- see https://github.com/doujiang24/lua-resty-kafka#new-1
cluster_name = {type = "integer", minimum = 1, default = 1},
Expand All @@ -98,6 +109,15 @@ local _M = {


function _M.check_schema(conf, schema_type)

if conf.include_req_body_expr then
local ok, err = expr.new(conf.include_req_body_expr)
if not ok then
return nil,
{error_msg = "failed to validate the 'include_req_body_expr' expression: " .. err}
end
end

if schema_type == core.schema.TYPE_METADATA then
return core.schema.check(metadata_schema, conf)
end
Expand Down
38 changes: 31 additions & 7 deletions apisix/utils/log-util.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
-- limitations under the License.
--
local core = require("apisix.core")
local expr = require("resty.expr.v1")
local ngx = ngx
local pairs = pairs
local str_byte = string.byte
Expand Down Expand Up @@ -119,13 +120,36 @@ local function get_full_log(ngx, conf)
}

if conf.include_req_body then
spacewander marked this conversation as resolved.
Show resolved Hide resolved
local body = req_get_body_data()
if body then
log.request.body = body
else
local body_file = ngx.req.get_body_file()
if body_file then
log.request.body_file = body_file

local log_request_body = true

if conf.include_req_body_expr then

if not conf.request_expr then
local request_expr, err = expr.new(conf.include_req_body_expr)
if not request_expr then
core.log.error('generate log expr err ' .. err)
return log
end
conf.request_expr = request_expr
end

local result = conf.request_expr:eval(ctx.var)

if not result then
log_request_body = false
end
end

if log_request_body then
local body = req_get_body_data()
if body then
log.request.body = body
else
local body_file = ngx.req.get_body_file()
if body_file then
log.request.body_file = body_file
end
end
end
end
Expand Down
1 change: 1 addition & 0 deletions docs/en/latest/plugins/kafka-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ For more info on Batch-Processor in Apache APISIX please refer.
| max_retry_count | integer | optional | 0 | [0,...] | Maximum number of retries before removing from the processing pipe line. |
| retry_delay | integer | optional | 1 | [0,...] | Number of seconds the process execution should be delayed if the execution fails. |
| include_req_body | boolean | optional | false | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. |
| include_req_body_expr | array | optional | | | Whether to logging request body, based on [lua-resty-expr](https://github.com/api7/lua-resty-expr), this option require to turn on `include_req_body` option. |
| cluster_name | integer | optional | 1 | [0,...] | the name of the cluster. When there are two or more kafka clusters, you can specify different names. And this only works with async producer_type.|

### examples of meta_format
Expand Down
1 change: 1 addition & 0 deletions docs/zh/latest/plugins/kafka-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ title: kafka-logger
| max_retry_count | integer | 可选 | 0 | [0,...] | 从处理管道中移除之前的最大重试次数。 |
| retry_delay | integer | 可选 | 1 | [0,...] | 如果执行失败,则应延迟执行流程的秒数。 |
| include_req_body | boolean | 可选 | false | [false, true] | 是否包括请求 body。false: 表示不包含请求的 body ; true: 表示包含请求的 body 。|
| include_req_body_expr | array | 可选 | | | 是否采集请求body, 基于[lua-resty-expr](https://github.com/api7/lua-resty-expr)。 该选项需要开启 `include_req_body`|
| cluster_name | integer | 可选 | 1 | [0,...] | kafka 集群的名称。当有两个或多个 kafka 集群时,可以指定不同的名称。只适用于 producer_type 是 async 模式。|

### meta_format 参考示例
Expand Down
79 changes: 79 additions & 0 deletions t/plugin/kafka-logger.t
Original file line number Diff line number Diff line change
Expand Up @@ -1114,3 +1114,82 @@ GET /t
--- error_log_like eval
qr/create new kafka producer instance, brokers: \[\{"port":9092,"host":"127.0.0.127"}]/
qr/failed to send data to Kafka topic: .*, brokers: \{"127.0.0.127":9092}/



=== TEST 26: set route(id: 1,include_req_body = true,include_req_body_expr = array)
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/1',
ngx.HTTP_PUT,
[=[{
"plugins": {
"kafka-logger": {
"broker_list" :
{
"127.0.0.1":9092
},
"kafka_topic" : "test2",
"key" : "key1",
"timeout" : 1,
"include_req_body": true,
"include_req_body_expr": [
[
"arg_name",
"==",
"qwerty"
]
],
"batch_max_size": 1
}
},
"upstream": {
"nodes": {
"127.0.0.1:1980": 1
},
"type": "roundrobin"
},
"uri": "/hello"
}]=]
)
if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}

--- request
GET /t
--- response_body
passed
--- no_error_log
[error]



=== TEST 27: hit route, expr eval success
--- request
POST /hello?name=qwerty
abcdef
--- response_body
hello world
--- no_error_log
[error]
--- error_log eval
qr/send data to kafka: \{.*"body":"abcdef"/
--- wait: 2



=== TEST 28: hit route,expr eval fail
--- request
POST /hello?name=zcxv
abcdef
--- response_body
hello world
--- no_error_log eval
qr/send data to kafka: \{.*"body":"abcdef"/
--- wait: 2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate --- wait?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done