diff --git a/.travis/linux_openresty_runner.sh b/.travis/linux_openresty_runner.sh
index cff10b86257a..30ef044eeeaf 100755
--- a/.travis/linux_openresty_runner.sh
+++ b/.travis/linux_openresty_runner.sh
@@ -34,6 +34,14 @@ before_install() {
     sudo cpanm --notest Test::Nginx >build.log 2>&1 || (cat build.log && exit 1)
     docker pull redis:3.0-alpine
     docker run --rm -itd -p 6379:6379 --name apisix_redis redis:3.0-alpine
+    # spin up a Kafka cluster for tests (1 zookeeper and 1 kafka instance)
+    docker pull bitnami/zookeeper:3.6.0
+    docker pull bitnami/kafka:latest
+    docker network create kafka-net --driver bridge
+    docker run --name zookeeper-server -d -p 2181:2181 --network kafka-net -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:3.6.0
+    docker run --name kafka-server1 -d --network kafka-net -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -p 9092:9092 -e KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true bitnami/kafka:latest
+    sleep 5
+    docker exec -it kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 1 --topic test2
 }
 
 do_install() {
diff --git a/.travis/linux_tengine_runner.sh b/.travis/linux_tengine_runner.sh
index 1a356f9208f3..d14cec25d5dd 100755
--- a/.travis/linux_tengine_runner.sh
+++ b/.travis/linux_tengine_runner.sh
@@ -34,6 +34,14 @@ before_install() {
     sudo cpanm --notest Test::Nginx >build.log 2>&1 || (cat build.log && exit 1)
     docker pull redis:3.0-alpine
    docker run --rm -itd -p 6379:6379 --name apisix_redis redis:3.0-alpine
+    # spin up a Kafka cluster for tests (1 zookeeper and 1 kafka instance)
+    docker pull bitnami/zookeeper:3.6.0
+    docker pull bitnami/kafka:latest
+    docker network create kafka-net --driver bridge
+    docker run --name zookeeper-server -d -p 2181:2181 --network kafka-net -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:3.6.0
+    docker run --name kafka-server1 -d --network kafka-net -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -p 9092:9092 -e KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true bitnami/kafka:latest
+    sleep 5
+    docker exec -it kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 1 --topic test2
 }
 
 tengine_install() {
diff --git a/conf/config.yaml b/conf/config.yaml
index 65f79da91151..f02134bb812f 100644
--- a/conf/config.yaml
+++ b/conf/config.yaml
@@ -143,6 +143,7 @@ plugins: # plugin list
   - proxy-cache
   - tcp-logger
   - proxy-mirror
+  - kafka-logger
 
 stream_plugins:
   - mqtt-proxy
diff --git a/doc/README.md b/doc/README.md
index 47c79bca9644..ad83c5b06182 100644
--- a/doc/README.md
+++ b/doc/README.md
@@ -59,9 +59,10 @@ Plugins
 * [response-rewrite](plugins/response-rewrite.md): Set customized response status code, body and header to the client.
 * [fault-injection](plugins/fault-injection.md): The specified response body, response code, and response time can be returned, which provides processing capabilities in different failure scenarios, such as service failure, service overload, and high service delay.
 * [proxy-cache](plugins/proxy-cache.md): Provides the ability to cache upstream response data.
-* [tcp-logger](plugins/tcp-logger.md): Log requests to TCP servers
-* [udp-logger](plugins/udp-logger.md): Log requests to UDP servers
+* [tcp-logger](plugins/tcp-logger.md): Log requests to TCP servers.
+* [udp-logger](plugins/udp-logger.md): Log requests to UDP servers.
 * [proxy-mirror](plugins/proxy-mirror.md): Provides the ability to mirror client requests.
+* [kafka-logger](plugins/kafka-logger.md): Log requests to external Kafka servers.
 
 Deploy to the Cloud
 =======
diff --git a/doc/README_CN.md b/doc/README_CN.md
index b2081d25d2ec..f063486f5450 100644
--- a/doc/README_CN.md
+++ b/doc/README_CN.md
@@ -63,4 +63,4 @@ Reference document
 * [proxy-mirror](plugins/proxy-mirror-cn.md):代理镜像插件提供镜像客户端请求的能力。
 * [udp-logger](plugins/udp-logger.md): 将请求记录到UDP服务器
 * [tcp-logger](plugins/tcp-logger.md): 将请求记录到TCP服务器
-
+* [kafka-logger](plugins/kafka-logger-cn.md): 将请求记录到外部Kafka服务器。
diff --git a/doc/plugins/kafka-logger-cn.md b/doc/plugins/kafka-logger-cn.md
new file mode 100644
index 000000000000..5afce2dd4479
--- /dev/null
+++ b/doc/plugins/kafka-logger-cn.md
@@ -0,0 +1,130 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Summary
+- [**定义**](#name)
+- [**属性列表**](#attributes)
+- [**信息**](#info)
+- [**如何开启**](#how-to-enable)
+- [**测试插件**](#test-plugin)
+- [**禁用插件**](#disable-plugin)
+
+## 定义
+
+`kafka-logger` 是一个插件,可用作 ngx_lua nginx 模块的 Kafka 客户端驱动。
+
+它能够将请求日志以 JSON 对象的形式推送到外部 Kafka 集群。
+
+## 属性列表
+
+|属性名称 |必选项 |描述|
+|--------- |--------|-----------|
+| broker_list |必要的| Kafka broker 列表,以 broker 主机名为键、端口为值的对象。|
+| kafka_topic |必要的| 要推送数据的目标 topic。|
+| timeout |可选的| 发送数据的超时时间,单位为毫秒,默认 2000。|
+| async |可选的| 布尔值,用于控制是否执行异步推送,默认 false。|
+| key |必要的| 消息的 key。|
+| max_retry |可选的| 最大重试次数,默认 3。|
+
+## 信息
+
+异步与同步数据推送的区别如下,列表后附有一段示例代码。
+
+1. 同步模式
+
+    如果成功,返回当前 broker 和分区的偏移量(`cdata: LL`)。
+    如果发生错误,返回 `nil`,并附带描述错误的字符串。
+
+2. 异步模式
+
+    消息会先写入缓冲区。
+    当缓冲区中的消息超过 `batch_num`,或每隔 `flush_time` 刷新缓冲区时,
+    消息才会被发送到 Kafka 服务器。
+
+    如果成功,返回 `true`。
+    如果出现错误,返回 `nil`,并附带描述错误的字符串(`buffer overflow`,即缓冲区溢出)。
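+
+下面是一段示意性代码(仅为示意,并非本插件的实现),直接使用本插件所依赖的
+`lua-resty-kafka` 库演示两种模式的行为;其中 broker 地址、topic 与消息内容均为占位值:
+
+```lua
+local producer = require("resty.kafka.producer")
+
+local broker_list = {
+    { host = "127.0.0.1", port = 9092 },
+}
+
+-- 同步模式:send() 阻塞,成功时返回分区偏移量(cdata: LL)
+local sync_prod = producer:new(broker_list)
+local offset, err = sync_prod:send("test2", "key1", "sync message")
+if not offset then
+    ngx.log(ngx.ERR, "sync send failed: ", err)
+end
+
+-- 异步模式:send() 只写入缓冲区;当缓冲区超过 batch_num 条消息,
+-- 或每隔 flush_time(毫秒)刷新时才真正发送
+local async_prod = producer:new(broker_list, {
+    producer_type = "async",
+    batch_num = 200,
+    flush_time = 1000,
+})
+local ok, err2 = async_prod:send("test2", "key1", "async message")
+if not ok then
+    ngx.log(ngx.ERR, "async send failed: ", err2) -- 例如 "buffer overflow"
+end
+```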
+
+##### broker 列表示例
+
+此插件支持一次推送到多个 broker。如下例所示,指定外部 Kafka 服务器的 broker 即可启用该功能。
+
+```json
+{
+    "127.0.0.1":9092,
+    "127.0.0.2":9092
+}
+```
+
+## 如何开启
+
+1. 下面的示例展示了如何为指定路由开启 kafka-logger 插件。
+
+```shell
+curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
+{
+    "plugins": {
+       "kafka-logger": {
+           "broker_list" :
+             {
+               "127.0.0.1":9092
+             },
+           "kafka_topic" : "test2",
+           "key" : "key1"
+       }
+    },
+    "upstream": {
+       "nodes": {
+           "127.0.0.1:1980": 1
+       },
+       "type": "roundrobin"
+    },
+    "uri": "/hello"
+}'
+```
+
+## 测试插件
+
+* 成功:
+
+```shell
+$ curl -i http://127.0.0.1:9080/hello
+HTTP/1.1 200 OK
+...
+hello, world
+```
+
+## 禁用插件
+
+禁用 `kafka-logger` 插件非常简单:只需从插件配置中删除相应的 JSON 配置即可。
+无需重启服务,修改会立即生效:
+
+```shell
+$ curl http://127.0.0.1:2379/apisix/admin/routes/1 -X PUT -d value='
+{
+    "methods": ["GET"],
+    "uri": "/hello",
+    "plugins": {},
+    "upstream": {
+        "type": "roundrobin",
+        "nodes": {
+            "127.0.0.1:1980": 1
+        }
+    }
+}'
+```
diff --git a/doc/plugins/kafka-logger.md b/doc/plugins/kafka-logger.md
new file mode 100644
index 000000000000..7bdc53306bea
--- /dev/null
+++ b/doc/plugins/kafka-logger.md
@@ -0,0 +1,134 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Summary
+- [**Name**](#name)
+- [**Attributes**](#attributes)
+- [**Info**](#info)
+- [**How To Enable**](#how-to-enable)
+- [**Test Plugin**](#test-plugin)
+- [**Disable Plugin**](#disable-plugin)
+
+## Name
+
+`kafka-logger` is a plugin which works as a Kafka client driver for the ngx_lua nginx module.
+
+It provides the ability to push request log data as JSON objects to external Kafka clusters.
+
+## Attributes
+
+|Name |Requirement |Description|
+|--------- |--------|-----------|
+| broker_list |required| A map of Kafka brokers, with the broker host as the key and the port as the value.|
+| kafka_topic |required| Target topic to push the data to.|
+| timeout |optional| Timeout for sending data, in milliseconds; defaults to 2000.|
+| async |optional| Boolean value to control whether to perform async push; defaults to false.|
+| key |required| Key for the message.|
+| max_retry |optional| Maximum number of retries; defaults to 3.|
+
+## Info
+
+The async and the sync data push modes differ as follows; a short sketch follows the list.
+
+1. In the sync model
+
+    In case of success, it returns the offset (`cdata: LL`) of the current broker and partition.
+    In case of errors, it returns `nil` with a string describing the error.
+
+2. In the async model
+
+    The message is first written to a buffer.
+    The buffer is sent to the Kafka server when it holds more than `batch_num` messages,
+    or is flushed every `flush_time` interval.
+
+    In case of success, it returns `true`.
+    In case of errors, it returns `nil` with a string describing the error (`buffer overflow`).
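+
+As a purely illustrative sketch (not the plugin's implementation), the snippet below drives
+`lua-resty-kafka`, the library this plugin depends on, in both modes; the broker address,
+topic, and messages are placeholder values:
+
+```lua
+local producer = require("resty.kafka.producer")
+
+local broker_list = {
+    { host = "127.0.0.1", port = 9092 },
+}
+
+-- sync mode: send() blocks and returns the partition offset (cdata: LL)
+local sync_prod = producer:new(broker_list)
+local offset, err = sync_prod:send("test2", "key1", "sync message")
+if not offset then
+    ngx.log(ngx.ERR, "sync send failed: ", err)
+end
+
+-- async mode: send() only buffers the message; the buffer is flushed once it
+-- holds more than batch_num messages, or every flush_time milliseconds
+local async_prod = producer:new(broker_list, {
+    producer_type = "async",
+    batch_num = 200,
+    flush_time = 1000,
+})
+local ok, err2 = async_prod:send("test2", "key1", "async message")
+if not ok then
+    ngx.log(ngx.ERR, "async send failed: ", err2) -- e.g. "buffer overflow"
+end
+```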
+
+##### Sample broker list
+
+This plugin supports pushing to more than one broker at a time. Specify the brokers of the external Kafka servers as in the
+sample below to enable this functionality.
+
+```json
+{
+    "127.0.0.1":9092,
+    "127.0.0.2":9092
+}
+```
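+
+For reference, here is a minimal sketch of how such a host-to-port map can be turned into the
+array of `{host = ..., port = ...}` tables that `resty.kafka.producer` expects; the helper name
+is hypothetical, but the conversion mirrors what the plugin does internally:
+
+```lua
+-- convert a { host = port } map into the broker array resty.kafka expects
+local function to_broker_array(broker_map)
+    local brokers = {}
+    for host, port in pairs(broker_map) do
+        if type(host) == "string" and type(port) == "number" then
+            brokers[#brokers + 1] = { host = host, port = port }
+        end
+    end
+    return brokers
+end
+
+-- usage: to_broker_array({ ["127.0.0.1"] = 9092 })
+-- yields { { host = "127.0.0.1", port = 9092 } }
+```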
+
+## How To Enable
+
+1. Here is an example of how to enable the kafka-logger plugin for a specific route.
+
+```shell
+curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
+{
+    "plugins": {
+       "kafka-logger": {
+           "broker_list" :
+             {
+               "127.0.0.1":9092
+             },
+           "kafka_topic" : "test2",
+           "key" : "key1"
+       }
+    },
+    "upstream": {
+       "nodes": {
+           "127.0.0.1:1980": 1
+       },
+       "type": "roundrobin"
+    },
+    "uri": "/hello"
+}'
+```
+
+## Test Plugin
+
+* success:
+
+```shell
+$ curl -i http://127.0.0.1:9080/hello
+HTTP/1.1 200 OK
+...
+hello, world
+```
+
+## Disable Plugin
+
+Disabling the `kafka-logger` plugin is simple: just remove the corresponding JSON configuration
+from the plugin configuration. There is no need to restart the service;
+the change takes effect immediately:
+
+```shell
+$ curl http://127.0.0.1:2379/apisix/admin/routes/1 -X PUT -d value='
+{
+    "methods": ["GET"],
+    "uri": "/hello",
+    "plugins": {},
+    "upstream": {
+        "type": "roundrobin",
+        "nodes": {
+            "127.0.0.1:1980": 1
+        }
+    }
+}'
+```
diff --git a/lua/apisix/plugins/kafka-logger.lua b/lua/apisix/plugins/kafka-logger.lua
new file mode 100644
index 000000000000..8f89af263592
--- /dev/null
+++ b/lua/apisix/plugins/kafka-logger.lua
@@ -0,0 +1,104 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local log_util = require("apisix.utils.log-util")
+local producer = require("resty.kafka.producer")
+local pairs = pairs
+local type = type
+local table = table
+
+local plugin_name = "kafka-logger"
+local ngx = ngx
+
+local timer_at = ngx.timer.at
+
+local schema = {
+    type = "object",
+    properties = {
+        broker_list = {
+            type = "object"
+        },
+        timeout = {   -- timeout in milliseconds
+            type = "integer", minimum = 1, default = 2000
+        },
+        kafka_topic = {type = "string"},
+        async = {type = "boolean", default = false},
+        key = {type = "string"},
+        max_retry = {type = "integer", minimum = 0, default = 3},
+    },
+    required = {"broker_list", "kafka_topic", "key"}
+}
+
+local _M = {
+    version = 0.1,
+    priority = 403,
+    name = plugin_name,
+    schema = schema,
+}
+
+function _M.check_schema(conf)
+    return core.schema.check(schema, conf)
+end
+
+local function log(premature, conf, log_message)
+    if premature then
+        return
+    end
+
+    if core.table.nkeys(conf.broker_list) == 0 then
+        core.log.error("failed to identify the broker specified")
+        return
+    end
+
+    local broker_list = {}
+    local broker_config = {}
+
+    -- convert the configured host -> port map into the array of
+    -- {host = ..., port = ...} tables that resty.kafka.producer expects
+    for host, port in pairs(conf.broker_list) do
+        if type(host) == 'string'
+           and type(port) == 'number' then
+
+            local broker = {
+                host = host, port = port
+            }
+            table.insert(broker_list, broker)
+        end
+    end
+
+    broker_config["request_timeout"] = conf.timeout
+    broker_config["max_retry"] = conf.max_retry
+
+    -- async producers queue logs in a buffer and push them in batches
+    -- once the buffer fills up
+    if conf.async then
+        broker_config["producer_type"] = "async"
+    end
+
+    local prod, err = producer:new(broker_list, broker_config)
+    if err then
+        core.log.error("failed to create kafka producer instance, error: ", err)
+        return
+    end
+
+    local ok, err = prod:send(conf.kafka_topic, conf.key, log_message)
+    if not ok then
+        core.log.error("failed to send data to Kafka topic, error: ", err)
+    end
+end
+
+function _M.log(conf)
+    -- defer the actual push to a 0-delay timer so the log phase never blocks
+    return timer_at(0, log, conf, core.json.encode(log_util.get_full_log(ngx)))
+end
+
+return _M
diff --git a/rockspec/apisix-master-0.rockspec b/rockspec/apisix-master-0.rockspec
index 7a3479417151..98424c6b314f 100644
--- a/rockspec/apisix-master-0.rockspec
+++ b/rockspec/apisix-master-0.rockspec
@@ -48,6 +48,7 @@ dependencies = {
     "lua-resty-prometheus = 1.0",
     "jsonschema = 0.8",
     "lua-resty-ipmatcher = 0.6",
+    "lua-resty-kafka = 0.07",
 }
 
 build = {
diff --git a/t/admin/plugins.t b/t/admin/plugins.t
index b8771c711328..ee4e0ab3a223 100644
--- a/t/admin/plugins.t
+++ b/t/admin/plugins.t
@@ -30,7 +30,7 @@ __DATA__
 --- request
 GET /apisix/admin/plugins/list
 --- response_body_like eval
-qr/\["limit-req","limit-count","limit-conn","key-auth","basic-auth","prometheus","node-status","jwt-auth","zipkin","ip-restriction","grpc-transcode","serverless-pre-function","serverless-post-function","openid-connect","proxy-rewrite","redirect","response-rewrite","fault-injection","udp-logger","wolf-rbac","proxy-cache","tcp-logger","proxy-mirror"\]/
+qr/\["limit-req","limit-count","limit-conn","key-auth","basic-auth","prometheus","node-status","jwt-auth","zipkin","ip-restriction","grpc-transcode","serverless-pre-function","serverless-post-function","openid-connect","proxy-rewrite","redirect","response-rewrite","fault-injection","udp-logger","wolf-rbac","proxy-cache","tcp-logger","proxy-mirror","kafka-logger"\]/
 --- no_error_log
 [error]
diff --git a/t/debug/debug-mode.t b/t/debug/debug-mode.t
index d5a6c1d1035a..fae59849b24b 100644
--- a/t/debug/debug-mode.t
+++ b/t/debug/debug-mode.t
@@ -75,6 +75,7 @@ loaded plugin and sort by priority: 899 name: response-rewrite
 loaded plugin and sort by priority: 506 name: grpc-transcode
 loaded plugin and sort by priority: 500 name: prometheus
 loaded plugin and sort by priority: 405 name: tcp-logger
+loaded plugin and sort by priority: 403 name: kafka-logger
 loaded plugin and sort by priority: 400 name: udp-logger
 loaded plugin and sort by priority: 0 name: example-plugin
 loaded plugin and sort by priority: -1000 name: zipkin
diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t
new file mode 100644
index 000000000000..e9344d693252
--- /dev/null
+++ b/t/plugin/kafka-logger.t
@@ -0,0 +1,251 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+run_tests;
+
+__DATA__
+
+=== TEST 1: sanity
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.kafka-logger")
+            local ok, err = plugin.check_schema({
+                kafka_topic = "test",
+                key = "key1",
+                broker_list = {
+                    ["127.0.0.1"] = 3
+                }
+            })
+            if not ok then
+                ngx.say(err)
+            end
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- no_error_log
+[error]
+
+
+
+=== TEST 2: missing broker list
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.kafka-logger")
+            local ok, err = plugin.check_schema({kafka_topic = "test", key = "key1"})
+            if not ok then
+                ngx.say(err)
+            end
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+property "broker_list" is required
+done
+--- no_error_log
+[error]
+
+
+
+=== TEST 3: wrong type: timeout passed as a string
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.kafka-logger")
+            local ok, err = plugin.check_schema({
+                broker_list = {
+                    ["127.0.0.1"] = 3000
+                },
+                timeout = "10",
+                kafka_topic = "test",
+                key = "key1"
+            })
+            if not ok then
+                ngx.say(err)
+            end
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+property "timeout" validation failed: wrong type: expected integer, got string
+done
+--- no_error_log
+[error]
+
+
+
+=== TEST 4: add plugin
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                ngx.HTTP_PUT,
+                [[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "broker_list" :
+                                  {
+                                    "127.0.0.1":9092
+                                  },
+                                "kafka_topic" : "test2",
+                                "key" : "key1"
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]],
+                [[{
+                    "node": {
+                        "value": {
+                            "plugins": {
+                                "kafka-logger": {
+                                    "broker_list" :
+                                      {
+                                        "127.0.0.1":9092
+                                      },
+                                    "kafka_topic" : "test2",
+                                    "key" : "key1"
+                                }
+                            },
+                            "upstream": {
+                                "nodes": {
+                                    "127.0.0.1:1980": 1
+                                },
+                                "type": "roundrobin"
+                            },
+                            "uri": "/hello"
+                        },
+                        "key": "/apisix/routes/1"
+                    },
+                    "action": "set"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 5: access
+--- request
+GET /hello
+--- response_body
+hello world
+--- no_error_log
+[error]
+--- wait: 0.2
+
+
+
+=== TEST 6: error log
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                ngx.HTTP_PUT,
+                [[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "broker_list" :
+                                  {
+                                    "127.0.0.1":9092,
+                                    "127.0.0.1":9093
+                                  },
+                                "kafka_topic" : "test2",
+                                "key" : "key1"
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]],
+                [[{
+                    "node": {
+                        "value": {
+                            "plugins": {
+                                "kafka-logger": {
+                                    "broker_list" :
+                                      {
+                                        "127.0.0.1":9092,
+                                        "127.0.0.1":9093
+                                      },
+                                    "kafka_topic" : "test2",
+                                    "key" : "key1"
+                                }
+                            },
+                            "upstream": {
+                                "nodes": {
+                                    "127.0.0.1:1980": 1
+                                },
+                                "type": "roundrobin"
+                            },
+                            "uri": "/hello"
+                        },
+                        "key": "/apisix/routes/1"
+                    },
+                    "action": "set"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+            local http = require "resty.http"
+            local httpc = http.new()
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+            local res, err = httpc:request_uri(uri, {method = "GET"})
+        }
+    }
+--- request
+GET /t
+--- error_log
+failed to send data to Kafka topic
+[error]
+--- wait: 0.2