From f3a2f76d2109a20b18b6ce9b652a747be9f5b933 Mon Sep 17 00:00:00 2001 From: starsz Date: Tue, 27 Sep 2022 18:17:29 +0800 Subject: [PATCH] feat: support set broker array in kafka-logger plugin --- apisix/core/table.lua | 10 ++++++ apisix/plugins/kafka-logger.lua | 55 ++++++++++++++++++++++++--------- 2 files changed, 50 insertions(+), 15 deletions(-) diff --git a/apisix/core/table.lua b/apisix/core/table.lua index b307cc25d544b..1a02badf3c086 100644 --- a/apisix/core/table.lua +++ b/apisix/core/table.lua @@ -272,4 +272,14 @@ function _M.pick(obj, attrs) end +function _M.is_array(t) + local count = 0 + for k, v in pairs(t) do + count = count + 1 + end + + return #t == count +end + + return _M diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index cb43ae3db24be..0dcf4c15cd25d 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -19,6 +19,7 @@ local log_util = require("apisix.utils.log-util") local producer = require ("resty.kafka.producer") local bp_manager_mod = require("apisix.utils.batch-processor-manager") local plugin = require("apisix.plugin") +local table = require("apisix.core.table") local math = math local pairs = pairs @@ -40,15 +41,33 @@ local schema = { enum = {"default", "origin"}, }, broker_list = { - type = "object", - minProperties = 1, - patternProperties = { - [".*"] = { - description = "the port of kafka broker", - type = "integer", - minimum = 1, - maximum = 65535, - }, + { + oneOf = { + { + type = "object", + minProperties = 1, + patternProperties = { + [".*"] = { + description = "the port of kafka broker", + type = "integer", + minimum = 1, + maximum = 65535, + }, + }, + }, + { + type = "array", + minItems = 1, + items = { + type = "object", + properties = { + host = {type = "string"}, + port = {type = "number"}, + }, + }, + uniqueItems = true, + }, + } }, }, kafka_topic = {type = "string"}, @@ -202,12 +221,18 @@ function _M.log(conf, ctx) local broker_list = core.table.new(core.table.nkeys(conf.broker_list), 0) local broker_config = {} - for host, port in pairs(conf.broker_list) do - local broker = { - host = host, - port = port - } - core.table.insert(broker_list, broker) + if table.is_array(conf.broker_list) then + for _, broker in pairs(conf.broker_list) do + core.table.insert(broker_list, broker) + end + else + for host, port in pairs(conf.broker_list) do + local broker = { + host = host, + port = port, + } + core.table.insert(broker_list, broker) + end end broker_config["request_timeout"] = conf.timeout * 1000