From 1381ad7d1edc10bdb6e0a990428d192539b6fcbf Mon Sep 17 00:00:00 2001
From: sshniro
Date: Fri, 27 Mar 2020 16:55:30 +0100
Subject: [PATCH 1/3] Updating kafka logger to use the batch processor util

---
 doc/plugins/kafka-logger.md         | 32 +++++++-----
 lua/apisix/plugins/kafka-logger.lua | 78 ++++++++++++++++++++++-------
 t/plugin/kafka-logger.t             |  8 +--
 3 files changed, 84 insertions(+), 34 deletions(-)

diff --git a/doc/plugins/kafka-logger.md b/doc/plugins/kafka-logger.md
index 7bdc53306bea..83393088f6d1 100644
--- a/doc/plugins/kafka-logger.md
+++ b/doc/plugins/kafka-logger.md
@@ -34,14 +34,19 @@ This will provide the ability to send Log data requests as JSON objects to exter
 
 ## Attributes
 
-|Name          |Requirement  |Description|
-|---------     |--------|-----------|
-| broker_list  |required| An array of Kafka brokers.|
-| kafka_topic  |required| Target topic to push data.|
-| timeout      |optional|Timeout for the upstream to send data.|
-| async        |optional|Boolean value to control whether to perform async push.|
-| key          |required|Key for the message.|
-| max_retry    |optional|No of retries|
+|Name             |Requirement |Description|
+|---------------- |----------- |-----------|
+|broker_list      |required    |An array of Kafka brokers.|
+|kafka_topic      |required    |Target topic to push data.|
+|timeout          |optional    |Timeout in seconds for the upstream to send data; default is 3.|
+|async            |optional    |Boolean value to control whether to perform async push.|
+|key              |required    |Key for the message.|
+|name             |optional    |A unique identifier to identify the batch processor.|
+|batch_max_size   |optional    |Maximum size of each batch; default is 1000.|
+|inactive_timeout |optional    |Maximum idle time in seconds after which the buffer is flushed; default is 5.|
+|buffer_duration  |optional    |Maximum age in seconds of the oldest entry in a batch before the batch must be processed; default is 60.|
+|max_retry_count  |optional    |Maximum number of retries before removing the entry from the processing pipeline; default is 0.|
+|retry_delay      |optional    |Number of seconds to delay processing when an execution fails; default is 1.|
 
 ## Info
 
@@ -75,7 +80,7 @@ sample to take effect of this functionality.
 
 ## How To Enable
 
-1. Here is an examle on how to enable kafka-logger plugin for a specific route.
+The following is an example of how to enable the kafka-logger for a specific route.
 
 ```shell
 curl http://127.0.0.1:9080/apisix/admin/consumers -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
@@ -88,7 +93,9 @@ curl http://127.0.0.1:9080/apisix/admin/consumers -H 'X-API-KEY: edd1c9f034335f1
         "127.0.0.1":9092
       },
       "kafka_topic" : "test2",
-      "key" : "key1"
+      "key" : "key1",
+      "batch_max_size": 1,
+      "name": "kafka logger"
     }
   },
   "upstream": {
@@ -114,9 +121,8 @@ hello, world
 
 ## Disable Plugin
 
-When you want to disable the `kafka-logger` plugin, it is very simple,
- you can delete the corresponding json configuration in the plugin configuration,
- no need to restart the service, it will take effect immediately:
+Remove the corresponding JSON configuration from the plugin configuration to disable the `kafka-logger`.
+APISIX plugins are hot-reloaded, so there is no need to restart APISIX.
 
 ```shell
 $ curl http://127.0.0.1:2379/apisix/admin/routes/1 -X PUT -d value='
diff --git a/lua/apisix/plugins/kafka-logger.lua b/lua/apisix/plugins/kafka-logger.lua
index 8f89af263592..db72dd75fe3d 100644
--- a/lua/apisix/plugins/kafka-logger.lua
+++ b/lua/apisix/plugins/kafka-logger.lua
@@ -17,14 +17,13 @@
 local core     = require("apisix.core")
 local log_util = require("apisix.utils.log-util")
 local producer = require ("resty.kafka.producer")
+local batch_processor = require("apisix.utils.batch-processor")
 local pairs    = pairs
 local type     = type
 local table    = table
-
 local plugin_name = "kafka-logger"
 local ngx = ngx
-
-local timer_at = ngx.timer.at
+local buffers = {}
 
 local schema = {
     type = "object",
@@ -32,13 +31,16 @@ local schema = {
         broker_list = {
             type = "object"
         },
-        timeout = {   -- timeout in milliseconds
-            type = "integer", minimum = 1, default= 2000
-        },
         kafka_topic = {type = "string"},
         async = {type = "boolean", default = false},
         key = {type = "string"},
-        max_retry = {type = "integer", minimum = 0 , default = 3},
+        timeout = {type = "integer", minimum = 1, default = 3},  -- in seconds
+        name = {type = "string", default = "kafka logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
     },
     required = {"broker_list", "kafka_topic", "key"}
 }
@@ -50,15 +52,13 @@ local _M = {
     schema = schema,
 }
 
+
 function _M.check_schema(conf)
     return core.schema.check(schema, conf)
 end
 
-local function log(premature, conf, log_message)
-    if premature then
-        return
-    end
 
+local function send_kafka_data(conf, log_message)
     if core.table.nkeys(conf.broker_list) == 0 then
         core.log.error("failed to identify the broker specified")
     end
@@ -68,7 +68,7 @@ local function log(premature, conf, log_message)
 
     for host, port in pairs(conf.broker_list) do
         if type(host) == 'string'
-            and type(port) == 'number' then
+           and type(port) == 'number' then
             local broker = {
                 host = host,
                 port = port
@@ -77,8 +77,8 @@ local function log(premature, conf, log_message)
         end
     end
 
-    broker_config["request_timeout"] = conf.timeout
-    broker_config["max_retry"] = conf.max_retry
+    broker_config["request_timeout"] = conf.timeout * 1000
+    broker_config["max_retry"] = conf.max_retry_count
 
     --Async producers will queue logs and push them when the buffer exceeds.
     if conf.async then
@@ -87,18 +87,60 @@ local function log(premature, conf, log_message)
 
     local prod, err = producer:new(broker_list,broker_config)
     if err then
-        core.log.error("failed to identify the broker specified", err)
-        return
+        return nil, "failed to identify the broker specified: " .. err
     end
 
     local ok, err = prod:send(conf.kafka_topic, conf.key, log_message)
     if not ok then
-        core.log.error("failed to send data to Kafka topic", err)
+        return nil, "failed to send data to Kafka topic: " .. err
     end
 end
 
+
 function _M.log(conf)
-    return timer_at(0, log, conf, core.json.encode(log_util.get_full_log(ngx)))
+    local entry = log_util.get_full_log(ngx)
+
+    if not entry.route_id then
+        core.log.error("failed to obtain the route id for udp logger")
+        return
+    end
+
+    local log_buffer = buffers[entry.route_id]
+
+    -- If a logger is not present for the route, create one
+    if not log_buffer then
+        -- Generate a function to be executed by the batch processor
+        local func = function(entries, batch_max_size)
+            local data
+            if batch_max_size == 1 then
+                data = core.json.encode(entries[1]) -- encode as single {}
+            else
+                data = core.json.encode(entries) -- encode as array [{}]
+            end
+            return send_kafka_data(conf, data)
+        end
+
+        local config = {
+            name = conf.name,
+            retry_delay = conf.retry_delay,
+            batch_max_size = conf.batch_max_size,
+            max_retry_count = conf.max_retry_count,
+            buffer_duration = conf.buffer_duration,
+            inactive_timeout = conf.inactive_timeout,
+        }
+
+        local err
+        log_buffer, err = batch_processor:new(func, config)
+
+        if not log_buffer then
+            core.log.err("error when creating the batch processor: " .. err)
+            return
+        end
+
+        buffers[entry.route_id] = log_buffer
+    end
+
+    log_buffer:push(entry)
 end
 
 return _M
diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t
index e9344d693252..4dfbe40fba93 100644
--- a/t/plugin/kafka-logger.t
+++ b/t/plugin/kafka-logger.t
@@ -195,7 +195,8 @@ hello world
                         "127.0.0.1":9093
                     },
                     "kafka_topic" : "test2",
-                    "key" : "key1"
+                    "key" : "key1",
+                    "batch_max_size": 1
                 }
             },
             "upstream": {
@@ -217,7 +218,8 @@ hello world
                         "127.0.0.1":9093
                     },
                     "kafka_topic" : "test2",
-                    "key" : "key1"
+                    "key" : "key1",
+                    "batch_max_size": 1
                 }
             },
             "upstream": {
@@ -248,4 +250,4 @@ GET /t
 --- error_log
 failed to send data to Kafka topic
 [error]
---- wait: 0.2
+--- wait: 1

From 301276730d8390f8f68199b5e6d9b97592665e2b Mon Sep 17 00:00:00 2001
From: sshniro
Date: Tue, 31 Mar 2020 14:51:56 +0200
Subject: [PATCH 2/3] Using the internal buffer to buffer requests instead of
 the kafka lib

---
 lua/apisix/plugins/kafka-logger.lua | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/lua/apisix/plugins/kafka-logger.lua b/lua/apisix/plugins/kafka-logger.lua
index db72dd75fe3d..0844a175c82b 100644
--- a/lua/apisix/plugins/kafka-logger.lua
+++ b/lua/apisix/plugins/kafka-logger.lua
@@ -32,7 +32,6 @@ local schema = {
             type = "object"
         },
         kafka_topic = {type = "string"},
-        async = {type = "boolean", default = false},
         key = {type = "string"},
         timeout = {type = "integer", minimum = 1, default = 3},  -- in seconds
         name = {type = "string", default = "kafka logger"},
@@ -80,11 +79,6 @@ local function send_kafka_data(conf, log_message)
     broker_config["request_timeout"] = conf.timeout * 1000
     broker_config["max_retry"] = conf.max_retry_count
 
-    --Async producers will queue logs and push them when the buffer exceeds.
-    if conf.async then
-        broker_config["producer_type"] = "async"
-    end
-
     local prod, err = producer:new(broker_list,broker_config)
     if err then
         return nil, "failed to identify the broker specified: " .. err

From 62d89bc36b96fc94d5daf83ccdf312a1a21a5838 Mon Sep 17 00:00:00 2001
From: sshniro
Date: Thu, 2 Apr 2020 22:35:49 +0200
Subject: [PATCH 3/3] Updating the kafka logger

---
 lua/apisix/plugins/kafka-logger.lua | 63 ++++++++++++++++-------------
 1 file changed, 34 insertions(+), 29 deletions(-)

diff --git a/lua/apisix/plugins/kafka-logger.lua b/lua/apisix/plugins/kafka-logger.lua
index 0844a175c82b..a9050b9d6080 100644
--- a/lua/apisix/plugins/kafka-logger.lua
+++ b/lua/apisix/plugins/kafka-logger.lua
@@ -77,7 +77,6 @@ local function send_kafka_data(conf, log_message)
     end
 
     broker_config["request_timeout"] = conf.timeout * 1000
-    broker_config["max_retry"] = conf.max_retry_count
 
     local prod, err = producer:new(broker_list,broker_config)
     if err then
@@ -95,45 +94,51 @@ function _M.log(conf)
     local entry = log_util.get_full_log(ngx)
 
     if not entry.route_id then
-        core.log.error("failed to obtain the route id for udp logger")
+        core.log.error("failed to obtain the route id for kafka logger")
         return
     end
 
     local log_buffer = buffers[entry.route_id]
 
-    -- If a logger is not present for the route, create one
-    if not log_buffer then
-        -- Generate a function to be executed by the batch processor
-        local func = function(entries, batch_max_size)
-            local data
-            if batch_max_size == 1 then
-                data = core.json.encode(entries[1]) -- encode as single {}
-            else
-                data = core.json.encode(entries) -- encode as array [{}]
-            end
+    if log_buffer then
+        log_buffer:push(entry)
+        return
+    end
+
+    -- Generate a function to be executed by the batch processor
+    local func = function(entries, batch_max_size)
+        local data, err
+        if batch_max_size == 1 then
+            data, err = core.json.encode(entries[1]) -- encode as single {}
+        else
+            data, err = core.json.encode(entries) -- encode as array [{}]
         end
 
-        local config = {
-            name = conf.name,
-            retry_delay = conf.retry_delay,
-            batch_max_size = conf.batch_max_size,
-            max_retry_count = conf.max_retry_count,
-            buffer_duration = conf.buffer_duration,
-            inactive_timeout = conf.inactive_timeout,
-        }
-
-        local err
-        log_buffer, err = batch_processor:new(func, config)
-
-        if not log_buffer then
-            core.log.err("error when creating the batch processor: " .. err)
-            return
-        end
-
-        buffers[entry.route_id] = log_buffer
+        if not data then
+            return false, 'error occurred while encoding the data: ' .. err
+        end
+
+        return send_kafka_data(conf, data)
+    end
+
+    local config = {
+        name = conf.name,
+        retry_delay = conf.retry_delay,
+        batch_max_size = conf.batch_max_size,
+        max_retry_count = conf.max_retry_count,
+        buffer_duration = conf.buffer_duration,
+        inactive_timeout = conf.inactive_timeout,
+    }
+
+    local err
+    log_buffer, err = batch_processor:new(func, config)
+
+    if not log_buffer then
+        core.log.error("error when creating the batch processor: ", err)
+        return
     end
 
+    buffers[entry.route_id] = log_buffer
    log_buffer:push(entry)
 end
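
A note for reviewers trying the change by hand: below is a minimal, self-contained sketch of the batch-processor contract this series moves the plugin onto. It assumes only the API visible in the diffs (`batch_processor:new(func, config)` and `log_buffer:push(entry)`); `mock_send` is a hypothetical stand-in for `send_kafka_data`, and the sample entries are made up. Because the processor flushes batches via `ngx.timer`, the sketch must run inside an OpenResty/APISIX worker.

```lua
-- Hedged sketch of the batch-processor usage introduced by this series.
-- Assumes only the API shown in the diffs: batch_processor:new(func, config)
-- and log_buffer:push(entry). Must run inside an OpenResty/APISIX worker.
local core            = require("apisix.core")
local batch_processor = require("apisix.utils.batch-processor")

-- Hypothetical stand-in for send_kafka_data: return true on success, or
-- nil/false plus an error string to trigger the retry logic.
local function mock_send(data)
    core.log.warn("would send to Kafka: ", data)
    return true
end

-- Same encoding rule as the plugin: a single entry is encoded as one
-- JSON object, larger batches as a JSON array.
local function flush(entries, batch_max_size)
    local data, err
    if batch_max_size == 1 then
        data, err = core.json.encode(entries[1])
    else
        data, err = core.json.encode(entries)
    end
    if not data then
        return false, "error occurred while encoding the data: " .. err
    end
    return mock_send(data)
end

local log_buffer, err = batch_processor:new(flush, {
    name = "kafka logger",   -- identifier, mirrors conf.name
    batch_max_size = 2,      -- flush once two entries are buffered
    inactive_timeout = 5,    -- or after 5 seconds of inactivity
    buffer_duration = 60,    -- oldest entry may wait at most 60 seconds
    max_retry_count = 0,
    retry_delay = 1,
})
if not log_buffer then
    core.log.error("error when creating the batch processor: ", err)
    return
end

-- Two pushes fill the batch (batch_max_size = 2) and trigger one flush().
log_buffer:push({route_id = "1", uri = "/hello"})
log_buffer:push({route_id = "1", uri = "/hello"})
```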