Kafka Logger. #1312

Merged
merged 4 commits on Mar 25, 2020
8 changes: 8 additions & 0 deletions .travis/linux_openresty_runner.sh
@@ -34,6 +34,14 @@ before_install() {
sudo cpanm --notest Test::Nginx >build.log 2>&1 || (cat build.log && exit 1)
docker pull redis:3.0-alpine
docker run --rm -itd -p 6379:6379 --name apisix_redis redis:3.0-alpine
# spin up a Kafka cluster for tests (1 ZooKeeper and 1 Kafka instance)
docker pull bitnami/zookeeper:3.6.0
docker pull bitnami/kafka:latest
docker network create kafka-net --driver bridge
docker run --name zookeeper-server -d -p 2181:2181 --network kafka-net -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:3.6.0
docker run --name kafka-server1 -d --network kafka-net -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -p 9092:9092 -e KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true bitnami/kafka:latest
sleep 5
docker exec -it kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 1 --topic test2
}

do_install() {
8 changes: 8 additions & 0 deletions .travis/linux_tengine_runner.sh
@@ -34,6 +34,14 @@ before_install() {
sudo cpanm --notest Test::Nginx >build.log 2>&1 || (cat build.log && exit 1)
docker pull redis:3.0-alpine
docker run --rm -itd -p 6379:6379 --name apisix_redis redis:3.0-alpine
# spin up a Kafka cluster for tests (1 ZooKeeper and 1 Kafka instance)
docker pull bitnami/zookeeper:3.6.0
docker pull bitnami/kafka:latest
docker network create kafka-net --driver bridge
docker run --name zookeeper-server -d -p 2181:2181 --network kafka-net -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:3.6.0
docker run --name kafka-server1 -d --network kafka-net -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -p 9092:9092 -e KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true bitnami/kafka:latest
sleep 5
docker exec -it kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 1 --topic test2
}

tengine_install() {
1 change: 1 addition & 0 deletions conf/config.yaml
@@ -143,6 +143,7 @@ plugins: # plugin list
- proxy-cache
- tcp-logger
- proxy-mirror
- kafka-logger

stream_plugins:
- mqtt-proxy
5 changes: 3 additions & 2 deletions doc/README.md
@@ -59,9 +59,10 @@ Plugins
* [response-rewrite](plugins/response-rewrite.md): Set customized response status code, body and header to the client.
* [fault-injection](plugins/fault-injection.md): The specified response body, response code, and response time can be returned, which provides processing capabilities in different failure scenarios, such as service failure, service overload, and high service delay.
* [proxy-cache](plugins/proxy-cache.md): Provides the ability to cache upstream response data.
* [tcp-logger](plugins/tcp-logger.md): Log requests to TCP servers
* [udp-logger](plugins/udp-logger.md): Log requests to UDP servers
* [tcp-logger](plugins/tcp-logger.md): Log requests to TCP servers.
* [udp-logger](plugins/udp-logger.md): Log requests to UDP servers.
* [proxy-mirror](plugins/proxy-mirror.md): Provides the ability to mirror client requests.
* [kafka-logger](plugins/kafka-logger.md): Log requests to external Kafka servers.

Deploy to the Cloud
=======
2 changes: 1 addition & 1 deletion doc/README_CN.md
@@ -63,4 +63,4 @@ Reference document
* [proxy-mirror](plugins/proxy-mirror-cn.md): The proxy mirror plugin provides the ability to mirror client requests.
* [udp-logger](plugins/udp-logger.md): Log requests to UDP servers.
* [tcp-logger](plugins/tcp-logger.md): Log requests to TCP servers.

* [kafka-logger](plugins/kafka-logger-cn.md): Log requests to external Kafka servers.
130 changes: 130 additions & 0 deletions doc/plugins/kafka-logger-cn.md
@@ -0,0 +1,130 @@
<!--
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
-->

# Summary
- [**Name**](#name)
- [**Attributes**](#attributes)
- [**Info**](#info)
- [**How To Enable**](#how-to-enable)
- [**Test Plugin**](#test-plugin)
- [**Disable Plugin**](#disable-plugin)

## Name

`kafka-logger` is a plugin that works as a Kafka client driver for the ngx_lua nginx module.

It provides the ability to send request log data as JSON objects to an external Kafka cluster.

## Attributes

|Name |Requirement |Description|
|--------- |--------|-----------|
| broker_list |required| The Kafka brokers to push to, given as `host: port` pairs (see the sample below).|
| kafka_topic |required| Target topic to push the data to.|
| timeout |optional| Timeout for sending data to the upstream Kafka server.|
| async |optional| Boolean value to control whether to perform async push.|
| key |required| Key for the message.|
| max_retry |optional| Number of retries.|
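
For reference, a configuration that fills in the optional attributes alongside the required ones might look like the sketch below; the concrete values for `timeout`, `async`, and `max_retry` are illustrative placeholders, not the plugin's documented defaults:

```json
{
    "broker_list": {
        "127.0.0.1": 9092
    },
    "kafka_topic": "test2",
    "key": "key1",
    "timeout": 3,
    "async": false,
    "max_retry": 3
}
```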

## Info

Difference between async and sync data push:

1. In the sync model

In case of success, it returns the offset (**cdata: LL**) of the current broker and partition.
In case of errors, it returns `nil` with a string describing the error.

2. In the async model

The message is first written to a buffer.
The buffer is sent to the Kafka server when it exceeds `batch_num`,
or is flushed every `flush_time` interval.

In case of success, it returns `true`.
In case of errors, it returns `nil` with a string describing the error (`buffer overflow`).
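
To opt in to the async model, set `async` to `true` in the plugin configuration, as in this minimal sketch (`batch_num` and `flush_time` are internal buffer parameters of the underlying Kafka client, mentioned above only to explain the flush behavior):

```json
{
    "broker_list": {
        "127.0.0.1": 9092
    },
    "kafka_topic": "test2",
    "key": "key1",
    "async": true
}
```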

##### Sample broker list

This plugin supports pushing to more than one broker at a time. Specify the brokers of the external Kafka servers as in the sample below to make use of this feature.

```json
{
"127.0.0.1":9092,
"127.0.0.1":9093
}
```

## How To Enable

1. Here is an example of how to enable the kafka-logger plugin for a specific route.

```shell
curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "plugins": {
        "kafka-logger": {
            "broker_list" : {
                "127.0.0.1": 9092
            },
            "kafka_topic" : "test2",
            "key" : "key1"
        }
    },
    "upstream": {
        "nodes": {
            "127.0.0.1:1980": 1
        },
        "type": "roundrobin"
    },
    "uri": "/hello"
}'
```

## Test Plugin

* success:

```shell
$ curl -i http://127.0.0.1:9080/hello
HTTP/1.1 200 OK
...
hello, world
```
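
To confirm that the logged request actually reached Kafka, you can read the topic back with the console consumer. This assumes the single-node Bitnami setup from the CI scripts above (container `kafka-server1`, topic `test2`):

```shell
docker exec -it kafka-server1 \
    /opt/bitnami/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 --topic test2 --from-beginning
```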

## Disable Plugin

To disable the `kafka-logger` plugin, simply remove the corresponding JSON configuration from the plugin configuration. No restart of the service is required; the change takes effect immediately:

```shell
$ curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "methods": ["GET"],
    "uri": "/hello",
    "plugins": {},
    "upstream": {
        "type": "roundrobin",
        "nodes": {
            "127.0.0.1:1980": 1
        }
    }
}'
```
134 changes: 134 additions & 0 deletions doc/plugins/kafka-logger.md
@@ -0,0 +1,134 @@
<!--
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
-->

# Summary
- [**Name**](#name)
- [**Attributes**](#attributes)
- [**Info**](#info)
- [**How To Enable**](#how-to-enable)
- [**Test Plugin**](#test-plugin)
- [**Disable Plugin**](#disable-plugin)


## Name

`kafka-logger` is a plugin which works as a Kafka client driver for the ngx_lua nginx module.

It provides the ability to send request log data as JSON objects to an external Kafka cluster.

## Attributes

|Name |Requirement |Description|
|--------- |--------|-----------|
| broker_list |required| The Kafka brokers to push to, given as `host: port` pairs (see the sample below).|
| kafka_topic |required| Target topic to push the data to.|
| timeout |optional| Timeout for sending data to the upstream Kafka server.|
| async |optional| Boolean value to control whether to perform async push.|
| key |required| Key for the message.|
| max_retry |optional| Number of retries.|
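
For reference, a plugin configuration that sets the optional attributes as well as the required ones might look like the following sketch; the concrete values for `timeout`, `async`, and `max_retry` here are placeholders rather than the plugin's documented defaults:

```json
{
    "broker_list": {
        "127.0.0.1": 9092
    },
    "kafka_topic": "test2",
    "key": "key1",
    "timeout": 3,
    "async": false,
    "max_retry": 3
}
```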

## Info

Difference between async and sync data push:

1. In the sync model

In case of success, it returns the offset (**cdata: LL**) of the current broker and partition.
In case of errors, it returns `nil` with a string describing the error.

2. In the async model

The message is first written to a buffer.
The buffer is sent to the Kafka server when it exceeds `batch_num`,
or is flushed every `flush_time` interval.

In case of success, it returns `true`.
In case of errors, it returns `nil` with a string describing the error (`buffer overflow`).
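
To opt in to the async model, set `async` to `true` in the plugin configuration, as in this minimal sketch (`batch_num` and `flush_time` are internal buffer parameters of the underlying Kafka client, mentioned above only to explain the flush behavior):

```json
{
    "broker_list": {
        "127.0.0.1": 9092
    },
    "kafka_topic": "test2",
    "key": "key1",
    "async": true
}
```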

##### Sample broker list

This plugin supports pushing to more than one broker at a time. Specify the brokers of the external Kafka servers as in the sample below to make use of this feature.

```json
{
"127.0.0.1":9092,
"127.0.0.1":9093
}
```

## How To Enable

1. Here is an example of how to enable the kafka-logger plugin for a specific route.

```shell
curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "plugins": {
        "kafka-logger": {
            "broker_list" : {
                "127.0.0.1": 9092
            },
            "kafka_topic" : "test2",
            "key" : "key1"
        }
    },
    "upstream": {
        "nodes": {
            "127.0.0.1:1980": 1
        },
        "type": "roundrobin"
    },
    "uri": "/hello"
}'
```

## Test Plugin

* success:

```shell
$ curl -i http://127.0.0.1:9080/hello
HTTP/1.1 200 OK
...
hello, world
```
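
To verify that the log entry reached Kafka, read the topic back with the console consumer; this assumes the single-node Bitnami setup used in the CI scripts above (container `kafka-server1`, topic `test2`):

```shell
docker exec -it kafka-server1 \
    /opt/bitnami/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 --topic test2 --from-beginning
```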

## Disable Plugin

To disable the `kafka-logger` plugin, simply remove the corresponding JSON configuration from the plugin configuration. No restart of the service is required; the change takes effect immediately:

```shell
$ curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "methods": ["GET"],
    "uri": "/hello",
    "plugins": {},
    "upstream": {
        "type": "roundrobin",
        "nodes": {
            "127.0.0.1:1980": 1
        }
    }
}'
```