Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add brokers field to support set broker with the same host in kafka-logger plugin #7999

Merged
merged 9 commits into from
Oct 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 35 additions & 8 deletions apisix/plugins/kafka-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ local schema = {
default = "default",
enum = {"default", "origin"},
},
-- deprecated, use "brokers" instead
broker_list = {
type = "object",
minProperties = 1,
Expand All @@ -51,6 +52,27 @@ local schema = {
},
},
},
brokers = {
tokers marked this conversation as resolved.
Show resolved Hide resolved
type = "array",
minItems = 1,
items = {
type = "object",
properties = {
host = {
type = "string",
description = "the host of kafka broker",
},
port = {
type = "integer",
minimum = 1,
maximum = 65535,
description = "the port of kafka broker",
},
},
required = {"host", "port"},
},
uniqueItems = true,
},
kafka_topic = {type = "string"},
producer_type = {
type = "string",
Expand Down Expand Up @@ -89,7 +111,10 @@ local schema = {
producer_max_buffering = {type = "integer", minimum = 1, default = 50000},
producer_time_linger = {type = "integer", minimum = 1, default = 1}
},
required = {"broker_list", "kafka_topic"}
oneOf = {
{ required = {"broker_list", "kafka_topic"},},
{ required = {"brokers", "kafka_topic"},},
}
}

local metadata_schema = {
Expand Down Expand Up @@ -199,15 +224,17 @@ function _M.log(conf, ctx)
end

-- reuse producer via lrucache to avoid unbalanced partitions of messages in kafka
local broker_list = core.table.new(core.table.nkeys(conf.broker_list), 0)
local broker_list = core.table.clone(conf.brokers or {})
local broker_config = {}

for host, port in pairs(conf.broker_list) do
local broker = {
host = host,
port = port
}
core.table.insert(broker_list, broker)
if conf.broker_list then
for host, port in pairs(conf.broker_list) do
local broker = {
host = host,
port = port
}
core.table.insert(broker_list, broker)
end
end

broker_config["request_timeout"] = conf.timeout * 1000
Expand Down
28 changes: 19 additions & 9 deletions docs/en/latest/plugins/kafka-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ It might take some time to receive the log data. It will be automatically sent a

| Name | Type | Required | Default | Valid values | Description |
| ---------------------- | ------- | -------- | -------------- | --------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| broker_list | object | True | | | List of Kafka brokers (nodes). |
| broker_list | object | True | | | Deprecated, use `brokers` instead. List of Kafka brokers (nodes). |
| brokers | array | True | | | List of Kafka brokers (nodes). |
| brokers.host | string | True | | | The host of Kafka broker, e.g., `192.168.1.1`. |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to update the Chinese version?
CC @SylviaBABY

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to update the Chinese version? CC @SylviaBABY

I think we need to update the CN version. But you can merge this first. I'll update the CN version depending on this part later

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me update the Chinese version.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

| brokers.port | integer | True | | [1, 65535] | The port of Kafka broker. |
| kafka_topic | string | True | | | Target topic to push the logs for organisation. |
| producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. |
| required_acks | integer | False | 1 | [0, 1, -1] | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka `acks` attribute. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. |
Expand Down Expand Up @@ -164,10 +167,12 @@ curl http://127.0.0.1:9180/apisix/admin/routes/5 -H 'X-API-KEY: edd1c9f034335f13
{
"plugins": {
"kafka-logger": {
"broker_list" :
"brokers" : [
{
"127.0.0.1":9092
},
"host" :"127.0.0.1",
"port" : 9092
}
],
"kafka_topic" : "test2",
"key" : "key1",
"batch_max_size": 1,
Expand All @@ -187,11 +192,16 @@ curl http://127.0.0.1:9180/apisix/admin/routes/5 -H 'X-API-KEY: edd1c9f034335f13
This Plugin also supports pushing to more than one broker at a time. You can specify multiple brokers in the Plugin configuration as shown below:

```json
"broker_list" :
{
"127.0.0.1":9092,
"127.0.0.1":9093
},
"brokers" : [
{
"host" :"127.0.0.1",
"port" : 9092
},
{
"host" :"127.0.0.1",
"port" : 9093
}
],
```

## Example usage
Expand Down
29 changes: 20 additions & 9 deletions docs/zh/latest/plugins/kafka-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ description: API 网关 Apache APISIX 的 kafka-logger 插件用于将日志作

| 名称 | 类型 | 必选项 | 默认值 | 有效值 | 描述 |
| ---------------------- | ------- | ------ | -------------- | --------------------- | ------------------------------------------------ |
| broker_list | object | 是 | | | 需要推送的 Kafka 的 broker 列表。 |
| broker_list | object | 是 | | | 已废弃,现使用 `brokers` 属性代替。原指需要推送的 Kafka 的 broker 列表。 |
| brokers | array | 是 | | | 需要推送的 Kafka 的 broker 列表。 |
| brokers.host | string | 是 | | | Kafka broker 的节点 host 配置,例如 `192.168.1.1`。 |
| brokers.port | integer | 是 | | | Kafka broker 的节点端口配置。 |
| kafka_topic | string | 是 | | | 需要推送的 topic。 |
| producer_type | string | 否 | async | ["async", "sync"] | 生产者发送消息的模式。 |
| required_acks | integer | 否 | 1 | [0, 1, -1] | 生产者在确认一个请求发送完成之前需要收到的反馈信息的数量。该参数是为了保证发送请求的可靠性。该属性的配置与 Kafka `acks` 属性相同,具体配置请参考 [Apache Kafka 文档](https://kafka.apache.org/documentation/#producerconfigs_acks)。 |
Expand Down Expand Up @@ -162,10 +165,12 @@ curl http://127.0.0.1:9180/apisix/admin/routes/1 \
{
"plugins": {
"kafka-logger": {
"broker_list" :
{
"127.0.0.1":9092
},
"brokers" : [
{
"host": "127.0.0.1",
"port": 9092
}
],
"kafka_topic" : "test2",
"key" : "key1"
}
Expand All @@ -183,10 +188,16 @@ curl http://127.0.0.1:9180/apisix/admin/routes/1 \
该插件还支持一次推送到多个 Broker,示例如下:

```json
{
"127.0.0.1":9092,
"127.0.0.1":9093
}
"brokers" : [
{
"host" :"127.0.0.1",
"port" : 9092
},
{
"host" :"127.0.0.1",
"port" : 9093
}
],
```

## 测试插件
Expand Down
2 changes: 1 addition & 1 deletion t/plugin/kafka-logger.t
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ done
}
}
--- response_body
property "broker_list" is required
value should match only one schema, but matches none
done


Expand Down
134 changes: 134 additions & 0 deletions t/plugin/kafka-logger2.t
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,72 @@ qr/not found topic, retryable: true, topic: undefined_topic, partition_id: -1/
key= "key1",
},
},
{
input = {
brokers = {
},
kafka_topic = "test",
tokers marked this conversation as resolved.
Show resolved Hide resolved
key = "key1",
},
},
{
input = {
brokers = {
{
host = "127.0.0.1",
}
},
kafka_topic = "test",
key = "key1",
},
},
{
input = {
brokers = {
{
port = 9092,
}
},
kafka_topic = "test",
key = "key1",
},
},
{
input = {
brokers = {
{
host = "127.0.0.1",
port = "9093",
},
},
kafka_topic = "test",
key = "key1",
},
},
{
input = {
brokers = {
{
host = "127.0.0.1",
port = 0,
},
},
kafka_topic = "test",
key = "key1",
},
},
{
input = {
brokers = {
{
host = "127.0.0.1",
port = 65536,
},
},
kafka_topic = "test",
key = "key1",
},
},
}

local plugin = require("apisix.plugins.kafka-logger")
Expand All @@ -361,6 +427,12 @@ property "broker_list" validation failed: expect object to have at least 1 prope
property "broker_list" validation failed: failed to validate 127.0.0.1 (matching ".*"): wrong type: expected integer, got string
property "broker_list" validation failed: failed to validate 127.0.0.1 (matching ".*"): expected 0 to be at least 1
property "broker_list" validation failed: failed to validate 127.0.0.1 (matching ".*"): expected 65536 to be at most 65535
property "brokers" validation failed: expect array to have at least 1 items
property "brokers" validation failed: failed to validate item 1: property "port" is required
property "brokers" validation failed: failed to validate item 1: property "host" is required
property "brokers" validation failed: failed to validate item 1: property "port" validation failed: wrong type: expected integer, got string
property "brokers" validation failed: failed to validate item 1: property "port" validation failed: expected 0 to be at least 1
property "brokers" validation failed: failed to validate item 1: property "port" validation failed: expected 65536 to be at most 65535



Expand Down Expand Up @@ -715,3 +787,65 @@ hello world
[qr/send data to kafka: \{.*"body":"abcdef"/,
qr/send data to kafka: \{.*"body":"hello world\\n"/]
--- wait: 2



=== TEST 20: update route(id: 1,include_req_body = true,include_req_body_expr = array)
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/1',
ngx.HTTP_PUT,
[=[{
"plugins": {
"kafka-logger": {
"brokers" :
[{
"host":"127.0.0.1",
"port": 9092
}],
"kafka_topic" : "test2",
"key" : "key1",
"timeout" : 1,
"include_req_body": true,
"include_req_body_expr": [
[
"arg_name",
"==",
"qwerty"
]
],
"batch_max_size": 1
}
},
"upstream": {
"nodes": {
"127.0.0.1:1980": 1
},
"type": "roundrobin"
},
"uri": "/hello"
}]=]
)
if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}

--- response_body
passed



=== TEST 21: hit route, expr eval success
--- request
POST /hello?name=qwerty
abcdef
--- response_body
hello world
--- error_log eval
qr/send data to kafka: \{.*"body":"abcdef"/
--- wait: 2