Skip to content

Commit

Permalink
feat: support set broker array in kafka-logger plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
starsz committed Sep 27, 2022
1 parent a624339 commit f3a2f76
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 15 deletions.
10 changes: 10 additions & 0 deletions apisix/core/table.lua
Original file line number Diff line number Diff line change
Expand Up @@ -272,4 +272,14 @@ function _M.pick(obj, attrs)
end


function _M.is_array(t)
local count = 0
for k, v in pairs(t) do
count = count + 1
end

return #t == count
end


return _M
55 changes: 40 additions & 15 deletions apisix/plugins/kafka-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ local log_util = require("apisix.utils.log-util")
local producer = require ("resty.kafka.producer")
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
local plugin = require("apisix.plugin")
local table = require("apisix.core.table")

local math = math
local pairs = pairs
Expand All @@ -40,15 +41,33 @@ local schema = {
enum = {"default", "origin"},
},
broker_list = {
type = "object",
minProperties = 1,
patternProperties = {
[".*"] = {
description = "the port of kafka broker",
type = "integer",
minimum = 1,
maximum = 65535,
},
{
oneOf = {
{
type = "object",
minProperties = 1,
patternProperties = {
[".*"] = {
description = "the port of kafka broker",
type = "integer",
minimum = 1,
maximum = 65535,
},
},
},
{
type = "array",
minItems = 1,
items = {
type = "object",
properties = {
host = {type = "string"},
port = {type = "number"},
},
},
uniqueItems = true,
},
}
},
},
kafka_topic = {type = "string"},
Expand Down Expand Up @@ -202,12 +221,18 @@ function _M.log(conf, ctx)
local broker_list = core.table.new(core.table.nkeys(conf.broker_list), 0)
local broker_config = {}

for host, port in pairs(conf.broker_list) do
local broker = {
host = host,
port = port
}
core.table.insert(broker_list, broker)
if table.is_array(conf.broker_list) then
for _, broker in pairs(conf.broker_list) do
core.table.insert(broker_list, broker)
end
else
for host, port in pairs(conf.broker_list) do
local broker = {
host = host,
port = port,
}
core.table.insert(broker_list, broker)
end
end

broker_config["request_timeout"] = conf.timeout * 1000
Expand Down

0 comments on commit f3a2f76

Please sign in to comment.