Updating kafka logger to use the batch processor util #1358

Merged (4 commits, Apr 29, 2020)
89 changes: 65 additions & 24 deletions apisix/plugins/kafka-logger.lua
@@ -17,28 +17,29 @@
local core = require("apisix.core")
local log_util = require("apisix.utils.log-util")
local producer = require ("resty.kafka.producer")
local batch_processor = require("apisix.utils.batch-processor")
local pairs = pairs
local type = type
local table = table

local plugin_name = "kafka-logger"
local ngx = ngx

local timer_at = ngx.timer.at
local buffers = {}

local schema = {
type = "object",
properties = {
broker_list = {
type = "object"
},
timeout = { -- timeout in milliseconds
type = "integer", minimum = 1, default= 2000
},
kafka_topic = {type = "string"},
async = {type = "boolean", default = false},
key = {type = "string"},
max_retry = {type = "integer", minimum = 0 , default = 3},
timeout = {type = "integer", minimum = 1, default = 3},
name = {type = "string", default = "kafka logger"},
max_retry_count = {type = "integer", minimum = 0, default = 0},
retry_delay = {type = "integer", minimum = 0, default = 1},
buffer_duration = {type = "integer", minimum = 1, default = 60},
inactive_timeout = {type = "integer", minimum = 1, default = 5},
batch_max_size = {type = "integer", minimum = 1, default = 1000},
},
required = {"broker_list", "kafka_topic", "key"}
}
@@ -50,15 +51,13 @@ local _M = {
schema = schema,
}


function _M.check_schema(conf)
return core.schema.check(schema, conf)
end

local function log(premature, conf, log_message)
if premature then
return
end

local function send_kafka_data(conf, log_message)
if core.table.nkeys(conf.broker_list) == 0 then
core.log.error("failed to identify the broker specified")
end
@@ -68,7 +67,7 @@ local function log(premature, conf, log_message)

for host, port in pairs(conf.broker_list) do
if type(host) == 'string'
and type(port) == 'number' then
and type(port) == 'number' then

local broker = {
host = host, port = port
@@ -77,28 +76,70 @@
end
end

broker_config["request_timeout"] = conf.timeout
broker_config["max_retry"] = conf.max_retry

--Async producers will queue logs and push them when the buffer exceeds.
if conf.async then
broker_config["producer_type"] = "async"
end
broker_config["request_timeout"] = conf.timeout * 1000

local prod, err = producer:new(broker_list,broker_config)
if err then
core.log.error("failed to identify the broker specified", err)
return
return nil, "failed to identify the broker specified: " .. err
end

local ok, err = prod:send(conf.kafka_topic, conf.key, log_message)
if not ok then
core.log.error("failed to send data to Kafka topic", err)
return nil, "failed to send data to Kafka topic" .. err
end
end


function _M.log(conf)
return timer_at(0, log, conf, core.json.encode(log_util.get_full_log(ngx)))
local entry = log_util.get_full_log(ngx)

if not entry.route_id then
core.log.error("failed to obtain the route id for kafka logger")
return
end

local log_buffer = buffers[entry.route_id]

if log_buffer then
log_buffer:push(entry)
return
end

-- Generate a function to be executed by the batch processor
local func = function(entries, batch_max_size)
local data, err
if batch_max_size == 1 then
data, err = core.json.encode(entries[1]) -- encode as single {}
else
data, err = core.json.encode(entries) -- encode as array [{}]
end

if not data then
return false, 'error occurred while encoding the data: ' .. err
end

return send_kafka_data(conf, data)
end

local config = {
name = conf.name,
retry_delay = conf.retry_delay,
batch_max_size = conf.batch_max_size,
max_retry_count = conf.max_retry_count,
buffer_duration = conf.buffer_duration,
inactive_timeout = conf.inactive_timeout,
}

local err
log_buffer, err = batch_processor:new(func, config)

if not log_buffer then
core.log.error("error when creating the batch processor: ", err)
return
end

buffers[entry.route_id] = log_buffer
Member commented:
I have one question:

If there are no more new logs to send later, how do we release the log_buffer object?
If a log_buffer becomes useless, how do we delete it from buffers? If we never delete it, buffers will keep growing.

We also need to control the maximum number of pending timers; the default is 1024 per worker process.

https://github.com/openresty/lua-nginx-module#lua_max_pending_timers

@sshniro (Member Author) commented on Apr 22, 2020:
Yes, @membphis, I was also thinking about this problem.

For the memory problem:
We need to write a timer function that runs across all active loggers, finds stale buffer objects, and removes them. Maybe it can run every 30 minutes? (A hedged sketch of this idea follows this comment.)

For the timer problem:
The buffer has an inactive timeout by default. If we don't send any logs for 30 seconds, by default it will flush the existing logs and will not create any additional timers. Because of this, we won't be creating timers for inactive buffers; only active buffers keep a timer running.

If we agree on the memory optimization technique, I can send the fix in another PR.

A practical bottleneck would be a user configuring something like 1024 routes with a logger and actively using all of them.
But I assume that is not a practical scenario. To handle it, we might have to use a global logger instead of a route-based logger.
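
A minimal, hypothetical sketch of the cleanup timer proposed above (not part of this PR). The `last_used` field on the processor and the 30-minute interval are assumptions for illustration only; the real batch-processor module may track activity differently:

```lua
-- Hypothetical sketch: periodic cleanup of stale log buffers.
-- Assumes it runs inside kafka-logger.lua, where `buffers`, `core` and `ngx`
-- are already in scope, and assumes each processor records a `last_used`
-- timestamp when entries are pushed (an assumption, not an existing field).
local MAX_IDLE = 30 * 60  -- seconds; "maybe it can run every 30 mins"

local function remove_stale_buffers(premature)
    if premature then
        return
    end

    local now = ngx.time()
    for route_id, log_buffer in pairs(buffers) do
        if now - (log_buffer.last_used or now) > MAX_IDLE then
            -- drop the reference so the processor can be garbage collected
            buffers[route_id] = nil
        end
    end
end

local ok, err = ngx.timer.every(MAX_IDLE, remove_stale_buffers)
if not ok then
    core.log.error("failed to create the stale-buffer cleanup timer: ", err)
end
```

With this approach an idle route's buffer is eventually released, addressing the memory-growth concern, while the timer limit is unaffected because inactive buffers already stop scheduling timers.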

@sshniro (Member Author) commented on Apr 22, 2020:
I have created an issue to remove stale objects from memory:
#1494

The following issue covers creating a global logger, to avoid having more than 1024 loggers running concurrently:
#1492

@sshniro (Member Author) commented:
@membphis any update on this?

log_buffer:push(entry)
end

return _M
32 changes: 19 additions & 13 deletions doc/plugins/kafka-logger.md
@@ -34,14 +34,19 @@ This will provide the ability to send Log data requests as JSON objects to exter

## Attributes

|Name |Requirement |Description|
|--------- |--------|-----------|
| broker_list |required| An array of Kafka brokers.|
| kafka_topic |required| Target topic to push data.|
| timeout |optional|Timeout for the upstream to send data.|
| async |optional|Boolean value to control whether to perform async push.|
| key |required|Key for the message.|
| max_retry |optional|No of retries|
|Name |Requirement |Description|
|--------- |-------- |-----------|
| broker_list |required | An array of Kafka brokers.|
| kafka_topic |required | Target topic to push data.|
| timeout |optional |Timeout for the upstream to send data.|
| async |optional |Boolean value to control whether to perform async push.|
| key |required |Key for the message.|
|name             |optional |A unique identifier to identify the batch processor|
|batch_max_size   |optional |Maximum size of each batch; default is 1000|
|inactive_timeout |optional |Maximum inactive time in seconds before the buffer is flushed; default is 5|
|buffer_duration  |optional |Maximum age in seconds of the oldest entry in a batch before the batch must be processed; default is 60|
|max_retry_count  |optional |Maximum number of retries before removing the batch from the processing pipeline; default is 0|
|retry_delay      |optional |Number of seconds to delay processing if execution fails; default is 1|

## Info

@@ -75,7 +80,7 @@ sample to take effect of this functionality.

## How To Enable

1. Here is an examle on how to enable kafka-logger plugin for a specific route.
The following is an example of how to enable the kafka-logger for a specific route.

```shell
curl http://127.0.0.1:9080/apisix/admin/consumers -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
@@ -88,7 +93,9 @@ curl http://127.0.0.1:9080/apisix/admin/consumers -H 'X-API-KEY: edd1c9f034335f1
"127.0.0.1":9092
},
"kafka_topic" : "test2",
"key" : "key1"
"key" : "key1",
"batch_max_size": 1,
"name": "kafka logger"
}
},
"upstream": {
@@ -114,9 +121,8 @@ hello, world

## Disable Plugin

When you want to disable the `kafka-logger` plugin, it is very simple,
you can delete the corresponding json configuration in the plugin configuration,
no need to restart the service, it will take effect immediately:
Remove the corresponding JSON configuration from the plugin configuration to disable the `kafka-logger`.
APISIX plugins are hot-reloaded, so there is no need to restart APISIX.

```shell
$ curl http://127.0.0.1:2379/apisix/admin/routes/1 -X PUT -d value='
8 changes: 5 additions & 3 deletions t/plugin/kafka-logger.t
@@ -195,7 +195,8 @@ hello world
"127.0.0.1":9093
},
"kafka_topic" : "test2",
"key" : "key1"
"key" : "key1",
"batch_max_size": 1
}
},
"upstream": {
@@ -217,7 +218,8 @@ hello world
"127.0.0.1":9093
},
"kafka_topic" : "test2",
"key" : "key1"
"key" : "key1",
"batch_max_size": 1
}
},
"upstream": {
@@ -248,4 +250,4 @@
--- error_log
failed to send data to Kafka topic
[error]
--- wait: 0.2
--- wait: 1