From 613c19efdc1646de487b4f819fcea2b85c2a4a27 Mon Sep 17 00:00:00 2001 From: windyrjc Date: Sun, 14 Nov 2021 11:31:46 +0800 Subject: [PATCH 01/10] feat(kafka-logger) kafka logger supports logging request body (#5343) --- apisix/plugins/kafka-logger.lua | 9 +++ apisix/utils/log-util.lua | 34 ++++++++-- docs/en/latest/plugins/kafka-logger.md | 1 + docs/zh/latest/plugins/kafka-logger.md | 1 + t/plugin/kafka-logger.t | 94 ++++++++++++++++++++++++++ 5 files changed, 132 insertions(+), 7 deletions(-) diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index b680bd4c36f6..daac79bf865b 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -74,6 +74,15 @@ local schema = { inactive_timeout = {type = "integer", minimum = 1, default = 5}, batch_max_size = {type = "integer", minimum = 1, default = 1000}, include_req_body = {type = "boolean", default = false}, + request_body_expr = { + type = "array", + items = { + type = "array", + items = { + type = "string" + } + } + }, -- in lua-resty-kafka, cluster_name is defined as number -- see https://github.com/doujiang24/lua-resty-kafka#new-1 cluster_name = {type = "integer", minimum = 1, default = 1}, diff --git a/apisix/utils/log-util.lua b/apisix/utils/log-util.lua index 10620d1b7387..c96e52c7d91b 100644 --- a/apisix/utils/log-util.lua +++ b/apisix/utils/log-util.lua @@ -15,6 +15,7 @@ -- limitations under the License. -- local core = require("apisix.core") +local expr = require("resty.expr.v1") local ngx = ngx local pairs = pairs local str_byte = string.byte @@ -119,13 +120,32 @@ local function get_full_log(ngx, conf) } if conf.include_req_body then - local body = req_get_body_data() - if body then - log.request.body = body - else - local body_file = ngx.req.get_body_file() - if body_file then - log.request.body_file = body_file + + local log_request_body = true + + if conf.request_body_expr then + local request_expr, err = expr.new(conf.request_body_expr) + if not request_expr then + core.log.error('generate log expr err ' .. err) + end + conf.request_expr = request_expr + + local result = conf.request_expr:eval(ctx.var) + + if not result then + log_request_body = false + end + end + + if log_request_body then + local body = req_get_body_data() + if body then + log.request.body = body + else + local body_file = ngx.req.get_body_file() + if body_file then + log.request.body_file = body_file + end end end end diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md index f19a693a4f38..1043f8a11d9f 100644 --- a/docs/en/latest/plugins/kafka-logger.md +++ b/docs/en/latest/plugins/kafka-logger.md @@ -58,6 +58,7 @@ For more info on Batch-Processor in Apache APISIX please refer. | retry_delay | integer | optional | 1 | [0,...] | Number of seconds the process execution should be delayed if the execution fails. | | include_req_body | boolean | optional | false | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. | | cluster_name | integer | optional | 1 | [0,...] | the name of the cluster. When there are two or more kafka clusters, you can specify different names. And this only works with async producer_type.| +| request_body_expr | object | optional | | | Whether to logging request body,based on [lua-resty-expr](https://github.com/api7/lua-resty-expr). | ### examples of meta_format diff --git a/docs/zh/latest/plugins/kafka-logger.md b/docs/zh/latest/plugins/kafka-logger.md index 75cadf18aaec..3efb5cb6b528 100644 --- a/docs/zh/latest/plugins/kafka-logger.md +++ b/docs/zh/latest/plugins/kafka-logger.md @@ -58,6 +58,7 @@ title: kafka-logger | retry_delay | integer | 可选 | 1 | [0,...] | 如果执行失败,则应延迟执行流程的秒数。 | | include_req_body | boolean | 可选 | false | [false, true] | 是否包括请求 body。false: 表示不包含请求的 body ; true: 表示包含请求的 body 。| | cluster_name | integer | 可选 | 1 | [0,...] | kafka 集群的名称。当有两个或多个 kafka 集群时,可以指定不同的名称。只适用于 producer_type 是 async 模式。| +| request_body_expr | array | 可选 | | | 是否采集请求body,基于[lua-resty-expr](https://github.com/api7/lua-resty-expr)。 | ### meta_format 参考示例 diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 51876d30658a..7ece5ee80fc0 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -1114,3 +1114,97 @@ GET /t --- error_log_like eval qr/create new kafka producer instance, brokers: \[\{"port":9092,"host":"127.0.0.127"}]/ qr/failed to send data to Kafka topic: .*, brokers: \{"127.0.0.127":9092}/ + + +=== TEST 26: set route(id: 1,include_req_body = true,request_body_expr = array) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "kafka-logger": { + "broker_list" : + { + "127.0.0.1":9092 + }, + "kafka_topic" : "test1", + "key" : "key1", + "timeout" : 1, + "include_req_body": true, + "request_body_expr": [ + [ + "remote_addr", + "==", + "127.0.0.1" + ] + ], + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]], + [[{ + "node": { + "value": { + "plugins": { + "kafka-logger": { + "broker_list" : + { + "127.0.0.1":9092 + }, + "kafka_topic" : "test1", + "key" : "key1", + "timeout" : 1, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }, + "key": "/apisix/routes/1" + }, + "action": "set" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } + +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + +=== TEST 27: hit route, report log to kafka +--- request +POST /hello +abcdef +--- response_body +hello world +--- no_error_log +[error] +--- error_log_like eval +qr/send data to kafka: \{.*"body":"abcdef"/ +--- wait: 2 +--- wait: 2 + From acb54e3fb7252539a6f3e007ec0d4197a300f139 Mon Sep 17 00:00:00 2001 From: windyrjc Date: Sun, 14 Nov 2021 20:06:49 +0800 Subject: [PATCH 02/10] reindex --- t/plugin/kafka-logger.t | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 7ece5ee80fc0..525f7d1bf7d2 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -1116,6 +1116,7 @@ qr/create new kafka producer instance, brokers: \[\{"port":9092,"host":"127.0.0. qr/failed to send data to Kafka topic: .*, brokers: \{"127.0.0.127":9092}/ + === TEST 26: set route(id: 1,include_req_body = true,request_body_expr = array) --- config location /t { @@ -1195,6 +1196,7 @@ passed [error] + === TEST 27: hit route, report log to kafka --- request POST /hello @@ -1207,4 +1209,3 @@ hello world qr/send data to kafka: \{.*"body":"abcdef"/ --- wait: 2 --- wait: 2 - From fb04109a13c5eadb41275345e297eedf78721444 Mon Sep 17 00:00:00 2001 From: windyrjc Date: Sun, 14 Nov 2021 21:02:39 +0800 Subject: [PATCH 03/10] optimize --- apisix/plugins/kafka-logger.lua | 2 +- apisix/utils/log-util.lua | 14 +++++++++----- t/plugin/kafka-logger.t | 5 ++--- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index daac79bf865b..9881c48531a3 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -74,7 +74,7 @@ local schema = { inactive_timeout = {type = "integer", minimum = 1, default = 5}, batch_max_size = {type = "integer", minimum = 1, default = 1000}, include_req_body = {type = "boolean", default = false}, - request_body_expr = { + include_req_body_expr = { type = "array", items = { type = "array", diff --git a/apisix/utils/log-util.lua b/apisix/utils/log-util.lua index c96e52c7d91b..9216ba50cb97 100644 --- a/apisix/utils/log-util.lua +++ b/apisix/utils/log-util.lua @@ -123,12 +123,16 @@ local function get_full_log(ngx, conf) local log_request_body = true - if conf.request_body_expr then - local request_expr, err = expr.new(conf.request_body_expr) - if not request_expr then - core.log.error('generate log expr err ' .. err) + if conf.include_req_body_expr then + + if not conf.request_expr then + local request_expr, err = expr.new(conf.include_req_body_expr) + if not request_expr then + core.log.error('generate log expr err ' .. err) + return log + end + conf.request_expr = request_expr end - conf.request_expr = request_expr local result = conf.request_expr:eval(ctx.var) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 525f7d1bf7d2..94c0df82f252 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -1131,7 +1131,7 @@ qr/failed to send data to Kafka topic: .*, brokers: \{"127.0.0.127":9092}/ { "127.0.0.1":9092 }, - "kafka_topic" : "test1", + "kafka_topic" : "test", "key" : "key1", "timeout" : 1, "include_req_body": true, @@ -1162,7 +1162,7 @@ qr/failed to send data to Kafka topic: .*, brokers: \{"127.0.0.127":9092}/ { "127.0.0.1":9092 }, - "kafka_topic" : "test1", + "kafka_topic" : "test", "key" : "key1", "timeout" : 1, "batch_max_size": 1 @@ -1208,4 +1208,3 @@ hello world --- error_log_like eval qr/send data to kafka: \{.*"body":"abcdef"/ --- wait: 2 ---- wait: 2 From d922ffd5641a36346b3d1ebfeca524b0326c9d6e Mon Sep 17 00:00:00 2001 From: windyrjc Date: Sun, 14 Nov 2021 22:30:27 +0800 Subject: [PATCH 04/10] change test kafka topic test2 --- t/plugin/kafka-logger.t | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 94c0df82f252..be44b187de69 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -1131,7 +1131,7 @@ qr/failed to send data to Kafka topic: .*, brokers: \{"127.0.0.127":9092}/ { "127.0.0.1":9092 }, - "kafka_topic" : "test", + "kafka_topic" : "test2", "key" : "key1", "timeout" : 1, "include_req_body": true, @@ -1162,7 +1162,7 @@ qr/failed to send data to Kafka topic: .*, brokers: \{"127.0.0.127":9092}/ { "127.0.0.1":9092 }, - "kafka_topic" : "test", + "kafka_topic" : "test2", "key" : "key1", "timeout" : 1, "batch_max_size": 1 From 3e867eae04962f4799dc6685d8ceb1f41307267b Mon Sep 17 00:00:00 2001 From: windyrjc Date: Mon, 15 Nov 2021 11:09:28 +0800 Subject: [PATCH 05/10] optimize --- docs/en/latest/plugins/kafka-logger.md | 2 +- docs/zh/latest/plugins/kafka-logger.md | 2 +- t/plugin/kafka-logger.t | 108 ++++++++++++++++++++++++- 3 files changed, 108 insertions(+), 4 deletions(-) diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md index 1043f8a11d9f..ebf5c7a8e4e7 100644 --- a/docs/en/latest/plugins/kafka-logger.md +++ b/docs/en/latest/plugins/kafka-logger.md @@ -57,8 +57,8 @@ For more info on Batch-Processor in Apache APISIX please refer. | max_retry_count | integer | optional | 0 | [0,...] | Maximum number of retries before removing from the processing pipe line. | | retry_delay | integer | optional | 1 | [0,...] | Number of seconds the process execution should be delayed if the execution fails. | | include_req_body | boolean | optional | false | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. | +| include_req_body_expr | array | optional | | | Whether to logging request body,based on [lua-resty-expr](https://github.com/api7/lua-resty-expr), this option require to turn on `include_req_body` option. | | cluster_name | integer | optional | 1 | [0,...] | the name of the cluster. When there are two or more kafka clusters, you can specify different names. And this only works with async producer_type.| -| request_body_expr | object | optional | | | Whether to logging request body,based on [lua-resty-expr](https://github.com/api7/lua-resty-expr). | ### examples of meta_format diff --git a/docs/zh/latest/plugins/kafka-logger.md b/docs/zh/latest/plugins/kafka-logger.md index 3efb5cb6b528..76f0e8d4ca0c 100644 --- a/docs/zh/latest/plugins/kafka-logger.md +++ b/docs/zh/latest/plugins/kafka-logger.md @@ -57,8 +57,8 @@ title: kafka-logger | max_retry_count | integer | 可选 | 0 | [0,...] | 从处理管道中移除之前的最大重试次数。 | | retry_delay | integer | 可选 | 1 | [0,...] | 如果执行失败,则应延迟执行流程的秒数。 | | include_req_body | boolean | 可选 | false | [false, true] | 是否包括请求 body。false: 表示不包含请求的 body ; true: 表示包含请求的 body 。| +| include_req_body_expr | array | 可选 | | | 是否采集请求body,基于[lua-resty-expr](https://github.com/api7/lua-resty-expr)。 该选项需要开启 `include_req_body`| | cluster_name | integer | 可选 | 1 | [0,...] | kafka 集群的名称。当有两个或多个 kafka 集群时,可以指定不同的名称。只适用于 producer_type 是 async 模式。| -| request_body_expr | array | 可选 | | | 是否采集请求body,基于[lua-resty-expr](https://github.com/api7/lua-resty-expr)。 | ### meta_format 参考示例 diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index be44b187de69..50f48afd7d46 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -1117,7 +1117,7 @@ qr/failed to send data to Kafka topic: .*, brokers: \{"127.0.0.127":9092}/ -=== TEST 26: set route(id: 1,include_req_body = true,request_body_expr = array) +=== TEST 26: set route(id: 1,include_req_body = true,include_req_body_expr = array) --- config location /t { content_by_lua_block { @@ -1135,7 +1135,7 @@ qr/failed to send data to Kafka topic: .*, brokers: \{"127.0.0.127":9092}/ "key" : "key1", "timeout" : 1, "include_req_body": true, - "request_body_expr": [ + "include_req_body_expr": [ [ "remote_addr", "==", @@ -1208,3 +1208,107 @@ hello world --- error_log_like eval qr/send data to kafka: \{.*"body":"abcdef"/ --- wait: 2 + + + +=== TEST 28: hit route, not trigger request_body_expr rule +--- request +GET /hello +--- response_body +hello world +--- no_error_log eval +qr/send data to kafka: \{.*"body":"abcdef"/ +--- wait: 2 + + + +=== TEST 29: set route(id: 1,include_req_body = true,include_req_body_expr = array) eval false +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "kafka-logger": { + "broker_list" : + { + "127.0.0.1":9092 + }, + "kafka_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "include_req_body": true, + "include_req_body_expr": [ + [ + "remote_addr", + "==", + "0.0.0.0" + ] + ], + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]], + [[{ + "node": { + "value": { + "plugins": { + "kafka-logger": { + "broker_list" : + { + "127.0.0.1":9092 + }, + "kafka_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }, + "key": "/apisix/routes/1" + }, + "action": "set" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } + +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] +--- wait: 2 + + + +=== TEST 30: hit route,not trigger request_body_expr rule eval false +--- request +POST /hello +abcdef +--- response_body +hello world +--- no_error_log eval +qr/send data to kafka: \{.*"body":"abcdef"/ +--- wait: 2 From f69147131b5c9c22f13f0f5d56764901f4222b9c Mon Sep 17 00:00:00 2001 From: windyrjc Date: Tue, 16 Nov 2021 09:53:08 +0800 Subject: [PATCH 06/10] optimize test --- t/plugin/kafka-logger.t | 135 +++------------------------------------- 1 file changed, 7 insertions(+), 128 deletions(-) diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 50f48afd7d46..7a66eaa7ff95 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -1137,9 +1137,9 @@ qr/failed to send data to Kafka topic: .*, brokers: \{"127.0.0.127":9092}/ "include_req_body": true, "include_req_body_expr": [ [ - "remote_addr", + "arg_name", "==", - "127.0.0.1" + "qwerty" ] ], "batch_max_size": 1 @@ -1152,33 +1152,6 @@ qr/failed to send data to Kafka topic: .*, brokers: \{"127.0.0.127":9092}/ "type": "roundrobin" }, "uri": "/hello" - }]], - [[{ - "node": { - "value": { - "plugins": { - "kafka-logger": { - "broker_list" : - { - "127.0.0.1":9092 - }, - "kafka_topic" : "test2", - "key" : "key1", - "timeout" : 1, - "batch_max_size": 1 - } - }, - "upstream": { - "nodes": { - "127.0.0.1:1980": 1 - }, - "type": "roundrobin" - }, - "uri": "/hello" - }, - "key": "/apisix/routes/1" - }, - "action": "set" }]] ) if code >= 300 then @@ -1196,116 +1169,22 @@ passed [error] - -=== TEST 27: hit route, report log to kafka +=== TEST 27: hit route, expr eval success --- request -POST /hello +POST /hello?name=qwerty abcdef --- response_body hello world --- no_error_log [error] ---- error_log_like eval -qr/send data to kafka: \{.*"body":"abcdef"/ ---- wait: 2 - - - -=== TEST 28: hit route, not trigger request_body_expr rule ---- request -GET /hello ---- response_body -hello world ---- no_error_log eval +--- error_log eval qr/send data to kafka: \{.*"body":"abcdef"/ --- wait: 2 - -=== TEST 29: set route(id: 1,include_req_body = true,include_req_body_expr = array) eval false ---- config - location /t { - content_by_lua_block { - local t = require("lib.test_admin").test - local code, body = t('/apisix/admin/routes/1', - ngx.HTTP_PUT, - [[{ - "plugins": { - "kafka-logger": { - "broker_list" : - { - "127.0.0.1":9092 - }, - "kafka_topic" : "test2", - "key" : "key1", - "timeout" : 1, - "include_req_body": true, - "include_req_body_expr": [ - [ - "remote_addr", - "==", - "0.0.0.0" - ] - ], - "batch_max_size": 1 - } - }, - "upstream": { - "nodes": { - "127.0.0.1:1980": 1 - }, - "type": "roundrobin" - }, - "uri": "/hello" - }]], - [[{ - "node": { - "value": { - "plugins": { - "kafka-logger": { - "broker_list" : - { - "127.0.0.1":9092 - }, - "kafka_topic" : "test2", - "key" : "key1", - "timeout" : 1, - "batch_max_size": 1 - } - }, - "upstream": { - "nodes": { - "127.0.0.1:1980": 1 - }, - "type": "roundrobin" - }, - "uri": "/hello" - }, - "key": "/apisix/routes/1" - }, - "action": "set" - }]] - ) - if code >= 300 then - ngx.status = code - end - ngx.say(body) - } - } - ---- request -GET /t ---- response_body -passed ---- no_error_log -[error] ---- wait: 2 - - - -=== TEST 30: hit route,not trigger request_body_expr rule eval false +=== TEST 28: hit route,expr eval fail --- request -POST /hello +POST /hello?name=zcxv abcdef --- response_body hello world From d82854c8779dc557f903c6accea7c7f66c02d3e3 Mon Sep 17 00:00:00 2001 From: windyrjc Date: Tue, 16 Nov 2021 14:38:36 +0800 Subject: [PATCH 07/10] lint --- docs/en/latest/plugins/kafka-logger.md | 2 +- docs/zh/latest/plugins/kafka-logger.md | 2 +- t/plugin/kafka-logger.t | 2 ++ 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md index ebf5c7a8e4e7..fd6fc2495f0f 100644 --- a/docs/en/latest/plugins/kafka-logger.md +++ b/docs/en/latest/plugins/kafka-logger.md @@ -57,7 +57,7 @@ For more info on Batch-Processor in Apache APISIX please refer. | max_retry_count | integer | optional | 0 | [0,...] | Maximum number of retries before removing from the processing pipe line. | | retry_delay | integer | optional | 1 | [0,...] | Number of seconds the process execution should be delayed if the execution fails. | | include_req_body | boolean | optional | false | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. | -| include_req_body_expr | array | optional | | | Whether to logging request body,based on [lua-resty-expr](https://github.com/api7/lua-resty-expr), this option require to turn on `include_req_body` option. | +| include_req_body_expr | array | optional | | | Whether to logging request body, based on [lua-resty-expr](https://github.com/api7/lua-resty-expr), this option require to turn on `include_req_body` option. | | cluster_name | integer | optional | 1 | [0,...] | the name of the cluster. When there are two or more kafka clusters, you can specify different names. And this only works with async producer_type.| ### examples of meta_format diff --git a/docs/zh/latest/plugins/kafka-logger.md b/docs/zh/latest/plugins/kafka-logger.md index 76f0e8d4ca0c..fc2204fb31f8 100644 --- a/docs/zh/latest/plugins/kafka-logger.md +++ b/docs/zh/latest/plugins/kafka-logger.md @@ -57,7 +57,7 @@ title: kafka-logger | max_retry_count | integer | 可选 | 0 | [0,...] | 从处理管道中移除之前的最大重试次数。 | | retry_delay | integer | 可选 | 1 | [0,...] | 如果执行失败,则应延迟执行流程的秒数。 | | include_req_body | boolean | 可选 | false | [false, true] | 是否包括请求 body。false: 表示不包含请求的 body ; true: 表示包含请求的 body 。| -| include_req_body_expr | array | 可选 | | | 是否采集请求body,基于[lua-resty-expr](https://github.com/api7/lua-resty-expr)。 该选项需要开启 `include_req_body`| +| include_req_body_expr | array | 可选 | | | 是否采集请求body, 基于[lua-resty-expr](https://github.com/api7/lua-resty-expr)。 该选项需要开启 `include_req_body`| | cluster_name | integer | 可选 | 1 | [0,...] | kafka 集群的名称。当有两个或多个 kafka 集群时,可以指定不同的名称。只适用于 producer_type 是 async 模式。| ### meta_format 参考示例 diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 7a66eaa7ff95..c7551bafb3c3 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -1169,6 +1169,7 @@ passed [error] + === TEST 27: hit route, expr eval success --- request POST /hello?name=qwerty @@ -1182,6 +1183,7 @@ qr/send data to kafka: \{.*"body":"abcdef"/ --- wait: 2 + === TEST 28: hit route,expr eval fail --- request POST /hello?name=zcxv From a017631732b66e1e533d6cb3eda764f2b7cb9a73 Mon Sep 17 00:00:00 2001 From: windyrjc Date: Tue, 16 Nov 2021 18:20:07 +0800 Subject: [PATCH 08/10] optimize --- apisix/plugins/kafka-logger.lua | 10 ++++++++++ t/plugin/kafka-logger.t | 4 ++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index 9881c48531a3..85aa1c47899a 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -19,6 +19,7 @@ local log_util = require("apisix.utils.log-util") local producer = require ("resty.kafka.producer") local batch_processor = require("apisix.utils.batch-processor") local plugin = require("apisix.plugin") +local expr = require("resty.expr.v1") local math = math local pairs = pairs @@ -76,6 +77,7 @@ local schema = { include_req_body = {type = "boolean", default = false}, include_req_body_expr = { type = "array", + minItems = 1, items = { type = "array", items = { @@ -107,6 +109,14 @@ local _M = { function _M.check_schema(conf, schema_type) + + if conf.vars then + local ok, err = expr.new(conf.vars) + if not ok then + return nil, {error_msg = "failed to validate the 'vars' expression: " .. err} + end + end + if schema_type == core.schema.TYPE_METADATA then return core.schema.check(metadata_schema, conf) end diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index c7551bafb3c3..5094910f37ea 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -1124,7 +1124,7 @@ qr/failed to send data to Kafka topic: .*, brokers: \{"127.0.0.127":9092}/ local t = require("lib.test_admin").test local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT, - [[{ + [=[{ "plugins": { "kafka-logger": { "broker_list" : @@ -1152,7 +1152,7 @@ qr/failed to send data to Kafka topic: .*, brokers: \{"127.0.0.127":9092}/ "type": "roundrobin" }, "uri": "/hello" - }]] + }]=] ) if code >= 300 then ngx.status = code From 7ea1b172628eb778f527f686118641c3303b57b3 Mon Sep 17 00:00:00 2001 From: yundian Date: Wed, 17 Nov 2021 19:10:38 +0800 Subject: [PATCH 09/10] optimize --- apisix/plugins/kafka-logger.lua | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index 85aa1c47899a..e6543099846e 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -110,10 +110,10 @@ local _M = { function _M.check_schema(conf, schema_type) - if conf.vars then - local ok, err = expr.new(conf.vars) + if conf.include_req_body_expr then + local ok, err = expr.new(conf.include_req_body_expr) if not ok then - return nil, {error_msg = "failed to validate the 'vars' expression: " .. err} + return nil, {error_msg = "failed to validate the 'include_req_body_expr' expression: " .. err} end end From d31a5ab5ea271a6fdf468c78dd5c595e92d480b2 Mon Sep 17 00:00:00 2001 From: yundian Date: Wed, 17 Nov 2021 21:54:16 +0800 Subject: [PATCH 10/10] lint --- apisix/plugins/kafka-logger.lua | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index e6543099846e..f045c3958e15 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -113,7 +113,8 @@ function _M.check_schema(conf, schema_type) if conf.include_req_body_expr then local ok, err = expr.new(conf.include_req_body_expr) if not ok then - return nil, {error_msg = "failed to validate the 'include_req_body_expr' expression: " .. err} + return nil, + {error_msg = "failed to validate the 'include_req_body_expr' expression: " .. err} end end