feat: support send error-log to kafka brokers (#8693)
Fixes #8678
ronething authored Jan 31, 2023
1 parent 5a3a808 commit a5dc4c3
Showing 4 changed files with 377 additions and 4 deletions.
115 changes: 114 additions & 1 deletion apisix/plugins/error-log-logger.lua
@@ -21,6 +21,7 @@ local batch_processor = require("apisix.utils.batch-processor")
local plugin = require("apisix.plugin")
local timers = require("apisix.timers")
local http = require("resty.http")
local producer = require("resty.kafka.producer")
local plugin_name = "error-log-logger"
local table = core.table
local schema_def = core.schema
@@ -32,6 +33,9 @@ local string = require("string")
local lrucache = core.lrucache.new({
ttl = 300, count = 32
})
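-- a separate cache for Kafka producer instances: a new plugin metadata
-- version (modifiedIndex) or the 300s ttl transparently rebuilds the producer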
local kafka_prod_lrucache = core.lrucache.new({
ttl = 300, count = 32
})


local metadata_schema = {
@@ -66,6 +70,62 @@ local metadata_schema = {
},
required = {"endpoint_addr", "user", "password", "database", "logtable"}
},
kafka = {
type = "object",
properties = {
brokers = {
type = "array",
minItems = 1,
items = {
type = "object",
properties = {
host = {
type = "string",
description = "the host of kafka broker",
},
port = {
type = "integer",
minimum = 1,
maximum = 65535,
description = "the port of kafka broker",
},
sasl_config = {
type = "object",
description = "sasl config",
properties = {
mechanism = {
type = "string",
default = "PLAIN",
enum = {"PLAIN"},
},
user = { type = "string", description = "user" },
password = { type = "string", description = "password" },
},
required = {"user", "password"},
},
},
required = {"host", "port"},
},
uniqueItems = true,
},
kafka_topic = {type = "string"},
producer_type = {
type = "string",
default = "async",
enum = {"async", "sync"},
},
required_acks = {
type = "integer",
default = 1,
enum = { 0, 1, -1 },
},
key = {type = "string"},
-- in lua-resty-kafka, cluster_name is defined as number
-- see https://github.com/doujiang24/lua-resty-kafka#new-1
cluster_name = {type = "integer", minimum = 1, default = 1},
},
required = {"brokers", "kafka_topic"},
},
name = {type = "string", default = plugin_name},
level = {type = "string", default = "WARN", enum = {"STDERR", "EMERG", "ALERT", "CRIT",
"ERR", "ERROR", "WARN", "NOTICE", "INFO", "DEBUG"}},
@@ -81,6 +141,7 @@ local metadata_schema = {
{required = {"skywalking"}},
{required = {"tcp"}},
{required = {"clickhouse"}},
{required = {"kafka"}},
-- for compatible with old schema
{required = {"host", "port"}}
},
@@ -285,11 +346,63 @@ local function update_filter(value)
end


local function create_producer(broker_list, broker_config, cluster_name)
core.log.info("create new kafka producer instance")
return producer:new(broker_list, broker_config, cluster_name)
end


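-- send a batch of buffered error-log entries to the configured Kafka brokers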
local function send_to_kafka(log_message)
-- avoid races on the global config
local metadata = plugin.plugin_metadata(plugin_name)
if not (metadata and metadata.value and metadata.modifiedIndex) then
return false, "please set the correct plugin_metadata for " .. plugin_name
end
local config, err = lrucache(plugin_name, metadata.modifiedIndex, update_filter, metadata.value)
if not config then
return false, "get config failed: " .. err
end

core.log.info("sending a batch logs to kafka brokers: ",
core.json.delay_encode(config.kafka.brokers))

local broker_config = {}
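-- config.timeout is in seconds; lua-resty-kafka expects milliseconds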
broker_config["request_timeout"] = config.timeout * 1000
broker_config["producer_type"] = config.kafka.producer_type
broker_config["required_acks"] = config.kafka.required_acks

-- reuse producer via kafka_prod_lrucache to avoid unbalanced partitions of messages in kafka
local prod, err = kafka_prod_lrucache(plugin_name, metadata.modifiedIndex,
create_producer, config.kafka.brokers, broker_config,
config.kafka.cluster_name)
if not prod then
return false, "get kafka producer failed: " .. err
end
core.log.info("kafka cluster name ", config.kafka.cluster_name, ", broker_list[1] port ",
prod.client.broker_list[1].port)

local ok
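-- the log buffer stores each message followed by a separator entry,
-- so step by 2 and send only the messages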
for i = 1, #log_message, 2 do
ok, err = prod:send(config.kafka.kafka_topic,
config.kafka.key, core.json.encode(log_message[i]))
if not ok then
return false, "failed to send data to Kafka topic: " .. err ..
", brokers: " .. core.json.encode(config.kafka.brokers)
end
core.log.info("send data to kafka: ", core.json.delay_encode(log_message[i]))
end

return true
end


local function send(data)
if config.skywalking then
return send_to_skywalking(data)
elseif config.clickhouse then
return send_to_clickhouse(data)
elseif config.kafka then
return send_to_kafka(data)
end
return send_to_tcp_server(data)
end
@@ -307,7 +420,7 @@ local function process()
core.log.warn("set log filter failed for ", err)
return
end
if not (config.tcp or config.skywalking or config.clickhouse) then
if not (config.tcp or config.skywalking or config.clickhouse or config.kafka) then
config.tcp = {
host = config.host,
port = config.port,
36 changes: 35 additions & 1 deletion docs/en/latest/plugins/error-log-logger.md
@@ -28,7 +28,7 @@ description: This document contains information about the Apache APISIX error-lo

## Description

The `error-log-logger` Plugin is used to push APISIX's error logs (`error.log`) to TCP, [Apache SkyWalking](https://skywalking.apache.org/), or ClickHouse servers. You can also set the error log level to send the logs to server.
The `error-log-logger` Plugin is used to push APISIX's error logs (`error.log`) to TCP, [Apache SkyWalking](https://skywalking.apache.org/), Apache Kafka, or ClickHouse servers. You can also set the error log level to filter which logs are sent to the server.

It might take some time to receive the log data. It will be automatically sent after the timer function in the [batch processor](../batch-processor.md) expires.

@@ -48,6 +48,18 @@ It might take some time to receive the log data. It will be automatically sent a
| clickhouse.password | String | False | | | ClickHouse password. |
| clickhouse.database | String | False | | | Name of the database to store the logs. |
| clickhouse.logtable | String | False | | | Table name to store the logs. |
| kafka.brokers | array | True | | | List of Kafka brokers (nodes). |
| kafka.brokers.host | string | True | | | The host of the Kafka broker, e.g., `192.168.1.1`. |
| kafka.brokers.port | integer | True | | [1, 65535] | The port of the Kafka broker. |
| kafka.brokers.sasl_config | object | False | | | The SASL config of the Kafka broker. |
| kafka.brokers.sasl_config.mechanism | string | False | "PLAIN" | ["PLAIN"] | The mechanism of the SASL config. |
| kafka.brokers.sasl_config.user | string | True | | | The user of the SASL config. If sasl_config exists, it's required. |
| kafka.brokers.sasl_config.password | string | True | | | The password of the SASL config. If sasl_config exists, it's required. |
| kafka.kafka_topic | string | True | | | Target topic to push the logs to. |
| kafka.producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. |
| kafka.required_acks | integer | False | 1 | [0, 1, -1] | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka `acks` attribute. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. |
| kafka.key | string | False | | | Key used for allocating partitions for messages. |
| kafka.cluster_name | integer | False | 1 | [1,...] | Name of the cluster. Used when there are two or more Kafka clusters. Only works if the `producer_type` attribute is set to `async`. |
| timeout | integer | False | 3 | [1,...] | Timeout (in seconds) for the upstream to connect and send data. |
| keepalive | integer | False | 30 | [1,...] | Time in seconds to keep the connection alive after sending data. |
| level | string | False | WARN | ["STDERR", "EMERG", "ALERT", "CRIT", "ERR", "ERROR", "WARN", "NOTICE", "INFO", "DEBUG"] | Log level to filter the error logs. `ERR` is same as `ERROR`. |
@@ -118,6 +130,28 @@ curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/error-log-logger -H 'X-A
}'
```

### Configuring Kafka server

To send the error logs to Kafka, you can configure the Plugin as shown below:

```shell
curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/error-log-logger \
-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"kafka":{
"brokers":[
{
"host":"127.0.0.1",
"port":9092
}
],
"kafka_topic":"test2"
},
"level":"ERROR",
"inactive_timeout":1
}'
```
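
If your brokers require authentication, each broker entry can carry a `sasl_config` (only the `PLAIN` mechanism is supported). A minimal sketch, assuming SASL/PLAIN is enabled on the broker; the user and password below are placeholders:

```shell
curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/error-log-logger \
-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "kafka":{
        "brokers":[
            {
                "host":"127.0.0.1",
                "port":9092,
                "sasl_config":{
                    "mechanism":"PLAIN",
                    "user":"admin",
                    "password":"admin-secret"
                }
            }
        ],
        "kafka_topic":"test2"
    },
    "level":"ERROR",
    "inactive_timeout":1
}'
```

You can then check that the logs arrive with a console consumer (assuming a local Kafka installation):

```shell
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test2
```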

## Disable Plugin

To disable the Plugin, you can remove it from your configuration file (`conf/config.yaml`):