Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add brokers field to support set broker with the same host in kafka-logger plugin #7999

Merged
merged 9 commits into from
Oct 8, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 42 additions & 8 deletions apisix/plugins/kafka-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ local plugin = require("apisix.plugin")

local math = math
local pairs = pairs
local ipairs = ipairs
local type = type
local plugin_name = "kafka-logger"
local batch_processor_manager = bp_manager_mod.new("kafka logger")
Expand Down Expand Up @@ -51,6 +52,26 @@ local schema = {
},
},
},
brokers = {
tokers marked this conversation as resolved.
Show resolved Hide resolved
type = "array",
minItems = 1,
items = {
type = "object",
properties = {
host = {
type = "string",
description = "the host of kafka broker",
},
port = {
type = "integer",
minimum = 1,
maximum = 65535,
description = "the port of kafka broker",
},
},
},
uniqueItems = true,
},
kafka_topic = {type = "string"},
producer_type = {
type = "string",
Expand Down Expand Up @@ -89,7 +110,10 @@ local schema = {
producer_max_buffering = {type = "integer", minimum = 1, default = 50000},
producer_time_linger = {type = "integer", minimum = 1, default = 1}
},
required = {"broker_list", "kafka_topic"}
oneOf = {
{ required = {"broker_list", "kafka_topic"},},
{ required = {"brokers", "kafka_topic"},},
}
}

local metadata_schema = {
Expand Down Expand Up @@ -199,15 +223,24 @@ function _M.log(conf, ctx)
end

-- reuse producer via lrucache to avoid unbalanced partitions of messages in kafka
local broker_list = core.table.new(core.table.nkeys(conf.broker_list), 0)
local length = conf.broker_list and core.table.nkeys(conf.broker_list) or #conf.brokers
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just FYI: if we don't need to rewrite the content of table, we can use core.table.clone instead of alloc + insert in loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me update it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the remind. Improved.

local broker_list = core.table.new(length, 0)
local broker_config = {}

for host, port in pairs(conf.broker_list) do
local broker = {
host = host,
port = port
}
core.table.insert(broker_list, broker)
if conf.broker_list then
for host, port in pairs(conf.broker_list) do
local broker = {
host = host,
port = port
}
core.table.insert(broker_list, broker)
end
end

    if conf.brokers then
for _, broker in ipairs(conf.brokers) do
core.table.insert(broker_list, broker)
end
end

broker_config["request_timeout"] = conf.timeout * 1000
Expand Down Expand Up @@ -251,3 +284,4 @@ end


return _M

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the empty line.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

28 changes: 19 additions & 9 deletions docs/en/latest/plugins/kafka-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ It might take some time to receive the log data. It will be automatically sent a

| Name | Type | Required | Default | Valid values | Description |
| ---------------------- | ------- | -------- | -------------- | --------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| broker_list | object | True | | | List of Kafka brokers (nodes). |
| broker_list | deprecated | True | | | Use `brokers` instead. List of Kafka brokers (nodes). |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to put the "deprecated" in the Description instead of the Type?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

| brokers | array | True | | | List of Kafka brokers (nodes). |
| brokers.host | string | True | | | The host of Kafka broker |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
| brokers.host | string | True | | | The host of Kafka broker |
| brokers.host | string | True | | | The host of Kafka broker, e.g., `192.168.1.1`. |

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

| brokers.port | integer | True | | [1, 65535] | The port of Kafka broker |
| kafka_topic | string | True | | | Target topic to push the logs for organisation. |
| producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. |
| required_acks | integer | False | 1 | [0, 1, -1] | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka `acks` attribute. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. |
Expand Down Expand Up @@ -164,10 +167,12 @@ curl http://127.0.0.1:9180/apisix/admin/routes/5 -H 'X-API-KEY: edd1c9f034335f13
{
"plugins": {
"kafka-logger": {
"broker_list" :
"brokers" : [
{
"127.0.0.1":9092
},
"host" :"127.0.0.1",
"port" : 9092
}
],
"kafka_topic" : "test2",
"key" : "key1",
"batch_max_size": 1,
Expand All @@ -187,11 +192,16 @@ curl http://127.0.0.1:9180/apisix/admin/routes/5 -H 'X-API-KEY: edd1c9f034335f13
This Plugin also supports pushing to more than one broker at a time. You can specify multiple brokers in the Plugin configuration as shown below:

```json
"broker_list" :
{
"127.0.0.1":9092,
"127.0.0.1":9093
},
"brokers" : [
{
"host" :"127.0.0.1",
"port" : 9092
},
{
"host" :"127.0.0.1",
"port" : 9093
}
],
```

## Example usage
Expand Down
2 changes: 1 addition & 1 deletion t/plugin/kafka-logger.t
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ done
}
}
--- response_body
property "broker_list" is required
value should match only one schema, but matches none
done


Expand Down
110 changes: 110 additions & 0 deletions t/plugin/kafka-logger2.t
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,50 @@ qr/not found topic, retryable: true, topic: undefined_topic, partition_id: -1/
key= "key1",
},
},
{
input = {
brokers = {
},
kafka_topic = "test",
tokers marked this conversation as resolved.
Show resolved Hide resolved
key = "key1",
},
},
{
input = {
brokers = {
{
host = "127.0.0.1",
port = "9093",
},
},
kafka_topic = "test",
key = "key1",
},
},
{
input = {
brokers = {
{
host = "127.0.0.1",
port = 0,
},
},
kafka_topic = "test",
key = "key1",
},
},
{
input = {
brokers = {
{
host = "127.0.0.1",
port = 65536,
},
},
kafka_topic = "test",
key = "key1",
},
},
}

local plugin = require("apisix.plugins.kafka-logger")
Expand All @@ -361,6 +405,10 @@ property "broker_list" validation failed: expect object to have at least 1 prope
property "broker_list" validation failed: failed to validate 127.0.0.1 (matching ".*"): wrong type: expected integer, got string
property "broker_list" validation failed: failed to validate 127.0.0.1 (matching ".*"): expected 0 to be at least 1
property "broker_list" validation failed: failed to validate 127.0.0.1 (matching ".*"): expected 65536 to be at most 65535
property "brokers" validation failed: expect array to have at least 1 items
property "brokers" validation failed: failed to validate item 1: property "port" validation failed: wrong type: expected integer, got string
property "brokers" validation failed: failed to validate item 1: property "port" validation failed: expected 0 to be at least 1
property "brokers" validation failed: failed to validate item 1: property "port" validation failed: expected 65536 to be at most 65535



Expand Down Expand Up @@ -715,3 +763,65 @@ hello world
[qr/send data to kafka: \{.*"body":"abcdef"/,
qr/send data to kafka: \{.*"body":"hello world\\n"/]
--- wait: 2



=== TEST 20: update route(id: 1,include_req_body = true,include_req_body_expr = array)
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/1',
ngx.HTTP_PUT,
[=[{
"plugins": {
"kafka-logger": {
"brokers" :
[{
"host":"127.0.0.1",
"port": 9092
}],
"kafka_topic" : "test2",
"key" : "key1",
"timeout" : 1,
"include_req_body": true,
"include_req_body_expr": [
[
"arg_name",
"==",
"qwerty"
]
],
"batch_max_size": 1
}
},
"upstream": {
"nodes": {
"127.0.0.1:1980": 1
},
"type": "roundrobin"
},
"uri": "/hello"
}]=]
)
if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}

--- response_body
passed



=== TEST 21: hit route, expr eval success
--- request
POST /hello?name=qwerty
abcdef
--- response_body
hello world
--- error_log eval
qr/send data to kafka: \{.*"body":"abcdef"/
--- wait: 2