Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kafka-logger): supports logging request body #5501

Merged
merged 10 commits into from
Nov 18, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions apisix/plugins/kafka-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ local schema = {
inactive_timeout = {type = "integer", minimum = 1, default = 5},
batch_max_size = {type = "integer", minimum = 1, default = 1000},
include_req_body = {type = "boolean", default = false},
include_req_body_expr = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before we can merge it, could you add a check for the expr in the check_schema? Like:

if conf.vars then

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

type = "array",
items = {
type = "array",
items = {
type = "string"
}
}
},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
include_req_body_expr = {
type = "array",
items = {
type = "array",
items = {
type = "string"
}
}
},
include_req_body_expr = {
type = "array",
minItems = 1,
items = {
type = "array",
items = {
type = "string"
}
}
},

-- in lua-resty-kafka, cluster_name is defined as number
-- see https://github.com/doujiang24/lua-resty-kafka#new-1
cluster_name = {type = "integer", minimum = 1, default = 1},
Expand Down
38 changes: 31 additions & 7 deletions apisix/utils/log-util.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
-- limitations under the License.
--
local core = require("apisix.core")
local expr = require("resty.expr.v1")
local ngx = ngx
local pairs = pairs
local str_byte = string.byte
Expand Down Expand Up @@ -119,13 +120,36 @@ local function get_full_log(ngx, conf)
}

if conf.include_req_body then
spacewander marked this conversation as resolved.
Show resolved Hide resolved
local body = req_get_body_data()
if body then
log.request.body = body
else
local body_file = ngx.req.get_body_file()
if body_file then
log.request.body_file = body_file

local log_request_body = true

if conf.include_req_body_expr then

if not conf.request_expr then
local request_expr, err = expr.new(conf.include_req_body_expr)
if not request_expr then
core.log.error('generate log expr err ' .. err)
return log
end
conf.request_expr = request_expr
end

local result = conf.request_expr:eval(ctx.var)

if not result then
log_request_body = false
end
end

if log_request_body then
local body = req_get_body_data()
if body then
log.request.body = body
else
local body_file = ngx.req.get_body_file()
if body_file then
log.request.body_file = body_file
end
end
end
end
Expand Down
1 change: 1 addition & 0 deletions docs/en/latest/plugins/kafka-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ For more info on Batch-Processor in Apache APISIX please refer.
| retry_delay | integer | optional | 1 | [0,...] | Number of seconds the process execution should be delayed if the execution fails. |
| include_req_body | boolean | optional | false | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. |
| cluster_name | integer | optional | 1 | [0,...] | the name of the cluster. When there are two or more kafka clusters, you can specify different names. And this only works with async producer_type.|
| request_body_expr | object | optional | | | Whether to logging request body,based on [lua-resty-expr](https://github.com/api7/lua-resty-expr). |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to move it under include_req_body and explain its relation to include_req_body.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


### examples of meta_format

Expand Down
1 change: 1 addition & 0 deletions docs/zh/latest/plugins/kafka-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ title: kafka-logger
| retry_delay | integer | 可选 | 1 | [0,...] | 如果执行失败,则应延迟执行流程的秒数。 |
| include_req_body | boolean | 可选 | false | [false, true] | 是否包括请求 body。false: 表示不包含请求的 body ; true: 表示包含请求的 body 。|
| cluster_name | integer | 可选 | 1 | [0,...] | kafka 集群的名称。当有两个或多个 kafka 集群时,可以指定不同的名称。只适用于 producer_type 是 async 模式。|
| request_body_expr | array | 可选 | | | 是否采集请求body,基于[lua-resty-expr](https://github.com/api7/lua-resty-expr)。 |

### meta_format 参考示例

Expand Down
94 changes: 94 additions & 0 deletions t/plugin/kafka-logger.t
Original file line number Diff line number Diff line change
Expand Up @@ -1114,3 +1114,97 @@ GET /t
--- error_log_like eval
qr/create new kafka producer instance, brokers: \[\{"port":9092,"host":"127.0.0.127"}]/
qr/failed to send data to Kafka topic: .*, brokers: \{"127.0.0.127":9092}/



=== TEST 26: set route(id: 1,include_req_body = true,request_body_expr = array)
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/1',
ngx.HTTP_PUT,
[[{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
[[{
[=[{

"plugins": {
"kafka-logger": {
"broker_list" :
{
"127.0.0.1":9092
},
"kafka_topic" : "test2",
"key" : "key1",
"timeout" : 1,
"include_req_body": true,
"request_body_expr": [
[
"remote_addr",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to use a test case that can eval to false.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do it with arg_xx. There is no need to use a separate rule.

Copy link
Contributor Author

@windyrjc windyrjc Nov 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not understand what this means, please be more specific?, thank you. 😁

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use arg_blah == 1 in the expression, and hit it with "POST /hello?blah=1" and "POST /hello?blah=2".

Copy link
Contributor Author

@windyrjc windyrjc Nov 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm using ctx.var to eval the expr and ctx.var doesn't directly hold args. it only has args table. If you want to achieve this, i need to eval the expr by ctx.var.args

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean to use $arg_ variable. You can refer to

"vars": [["arg_name", "==", "James"]]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean to use $arg_ variable. You can refer to

"vars": [["arg_name", "==", "James"]]

done ,thank you very much

"==",
"127.0.0.1"
]
],
"batch_max_size": 1
}
},
"upstream": {
"nodes": {
"127.0.0.1:1980": 1
},
"type": "roundrobin"
},
"uri": "/hello"
}]],
[[{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the part which is not relative to the test, from L1156 to L1182

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

"node": {
"value": {
"plugins": {
"kafka-logger": {
"broker_list" :
{
"127.0.0.1":9092
},
"kafka_topic" : "test2",
"key" : "key1",
"timeout" : 1,
"batch_max_size": 1
}
},
"upstream": {
"nodes": {
"127.0.0.1:1980": 1
},
"type": "roundrobin"
},
"uri": "/hello"
},
"key": "/apisix/routes/1"
},
"action": "set"
}]]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
}]]
}]=]

)
if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}

--- request
GET /t
--- response_body
passed
--- no_error_log
[error]



=== TEST 27: hit route, report log to kafka
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a test that expr is evaluated to false

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

--- request
POST /hello
abcdef
--- response_body
hello world
--- no_error_log
[error]
--- error_log_like eval
qr/send data to kafka: \{.*"body":"abcdef"/
--- wait: 2