Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kafka-logger): add cluster name support #4876

Merged
merged 6 commits into from
Aug 26, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions apisix/plugins/kafka-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ local timer_at = ngx.timer.at
local ngx = ngx
local buffers = {}


local lrucache = core.lrucache.new({
type = "plugin",
})
Expand Down Expand Up @@ -60,7 +59,10 @@ local schema = {
buffer_duration = {type = "integer", minimum = 1, default = 60},
inactive_timeout = {type = "integer", minimum = 1, default = 5},
batch_max_size = {type = "integer", minimum = 1, default = 1000},
include_req_body = {type = "boolean", default = false}
include_req_body = {type = "boolean", default = false},
-- in lua-resty-kafka, cluster_name is defined as number
-- see https://github.com/doujiang24/lua-resty-kafka#new-1
cluster_name = {type = "integer", minimum = 1, default = 1},
},
required = {"broker_list", "kafka_topic"}
}
Expand Down Expand Up @@ -134,9 +136,9 @@ local function remove_stale_objects(premature)
end


local function create_producer(broker_list, broker_config)
local function create_producer(broker_list, broker_config, cluster_name)
core.log.info("create new kafka producer instance")
return producer:new(broker_list, broker_config)
return producer:new(broker_list, broker_config, cluster_name)
end


Expand Down Expand Up @@ -209,7 +211,9 @@ function _M.log(conf, ctx)
broker_config["producer_type"] = conf.producer_type

local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, create_producer,
broker_list, broker_config)
broker_list, broker_config, conf.cluster_name)
core.log.info("kafka cluster name ", conf.cluster_name, ", broker_list[1] port ",
prod.client.broker_list[1].port)
if err then
return nil, "failed to identify the broker specified: " .. err
end
Expand Down
13 changes: 8 additions & 5 deletions ci/install-ext-services-via-docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@ docker run --rm -itd -p 6379:6379 --name apisix_redis redis:3.0-alpine
docker run --rm -itd -e HTTP_PORT=8888 -e HTTPS_PORT=9999 -p 8888:8888 -p 9999:9999 mendhak/http-https-echo
# Runs Keycloak version 10.0.2 with inbuilt policies for unit tests
docker run --rm -itd -e KEYCLOAK_USER=admin -e KEYCLOAK_PASSWORD=123456 -p 8090:8080 -p 8443:8443 sshniro/keycloak-apisix:1.0.0
# spin up kafka cluster for tests (1 zookeper and 1 kafka instance)
# spin up kafka cluster for tests (2 zookeper and 2 kafka instance)
docker network create kafka-net --driver bridge
docker run --name zookeeper-server -d -p 2181:2181 --network kafka-net -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:3.6.0
docker run --name kafka-server1 -d --network kafka-net -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 -p 9092:9092 -e KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true bitnami/kafka:latest
docker run --name zookeeper-server1 -d -p 2181:2181 --network kafka-net -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:3.6.0
docker run --name zookeeper-server2 -d -p 12181:2181 --network kafka-net -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:3.6.0
docker run --name kafka-server1 -d --network kafka-net -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server1:2181 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 -p 9092:9092 -e KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true bitnami/kafka:latest
docker run --name kafka-server2 -d --network kafka-net -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server2:2181 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 -p 19092:9092 -e KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true bitnami/kafka:latest
docker run --name eureka -d -p 8761:8761 --env ENVIRONMENT=apisix --env spring.application.name=apisix-eureka --env server.port=8761 --env eureka.instance.ip-address=127.0.0.1 --env eureka.client.registerWithEureka=true --env eureka.client.fetchRegistry=false --env eureka.client.serviceUrl.defaultZone=http://127.0.0.1:8761/eureka/ bitinit/eureka
sleep 5
docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 1 --topic test2
docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 3 --topic test3
docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 1 --topic test2
docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 3 --topic test3
docker exec -i kafka-server2 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server2:2181 --replication-factor 1 --partitions 1 --topic test4

# start skywalking
docker run --rm --name skywalking -d -p 1234:1234 -p 11800:11800 -p 12800:12800 apache/skywalking-oap-server:8.3.0-es6
Expand Down
1 change: 1 addition & 0 deletions docs/en/latest/plugins/kafka-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ For more info on Batch-Processor in Apache APISIX please refer.
| max_retry_count | integer | optional | 0 | [0,...] | Maximum number of retries before removing from the processing pipe line. |
| retry_delay | integer | optional | 1 | [0,...] | Number of seconds the process execution should be delayed if the execution fails. |
| include_req_body | boolean | optional | false | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. |
| cluster_name | integer | optional | 1 | [0,...] | the name of the cluster. When there are two or more kafka clusters, you can specify different names. And this only works with async producer_type.|

### examples of meta_format

Expand Down
1 change: 1 addition & 0 deletions docs/zh/latest/plugins/kafka-logger.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ title: kafka-logger
| max_retry_count | integer | 可选 | 0 | [0,...] | 从处理管道中移除之前的最大重试次数。 |
| retry_delay | integer | 可选 | 1 | [0,...] | 如果执行失败,则应延迟执行流程的秒数。 |
| include_req_body | boolean | 可选 | false | [false, true] | 是否包括请求 body。false: 表示不包含请求的 body ; true: 表示包含请求的 body 。|
| cluster_name | integer | 可选 | 1 | [0,...] | kafka 集群的名称。当有两个或多个 kafka 集群时,可以指定不同的名称。只适用于 producer_type 是 async 模式。|

### meta_format 参考示例

Expand Down
142 changes: 142 additions & 0 deletions t/plugin/kafka-logger.t
Original file line number Diff line number Diff line change
Expand Up @@ -722,3 +722,145 @@ GET /t
[qr/partition_id: 1/,
qr/partition_id: 0/,
qr/partition_id: 2/]



=== TEST 20: update the broker_list and cluster_name, generate different kafka producers
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/1',
ngx.HTTP_PUT,
[[{
"upstream": {
"nodes": {
"127.0.0.1:1980": 1
},
"type": "roundrobin"
},
"uri": "/hello"
}]]
)
ngx.sleep(0.5)

if code >= 300 then
ngx.status = code
ngx.say("fail")
return
end

code, body = t('/apisix/admin/global_rules/1',
ngx.HTTP_PUT,
[[{
"plugins": {
"kafka-logger": {
"broker_list" : {
"127.0.0.1": 9092
},
"kafka_topic" : "test2",
"timeout" : 1,
"batch_max_size": 1,
"include_req_body": false,
"cluster_name": 1
}
}
}]]
)

if code >= 300 then
ngx.status = code
ngx.say("fail")
return
end

t('/hello',ngx.HTTP_GET)
ngx.sleep(0.5)

code, body = t('/apisix/admin/global_rules/1',
ngx.HTTP_PUT,
[[{
"plugins": {
"kafka-logger": {
"broker_list" : {
"127.0.0.1": 19092
},
"kafka_topic" : "test4",
"timeout" : 1,
"batch_max_size": 1,
"include_req_body": false,
"cluster_name": 2
}
}
}]]
)

if code >= 300 then
ngx.status = code
ngx.say("fail")
return
end

t('/hello',ngx.HTTP_GET)
ngx.sleep(0.5)

ngx.sleep(2)
ngx.say("passed")
}
}
--- request
GET /t
--- timeout: 10
--- response
passed
--- wait: 5
--- error_log
phase_func(): kafka cluster name 1, broker_list[1] port 9092
phase_func(): kafka cluster name 2, broker_list[1] port 19092
--- no_error_log eval
qr/not found topic/



=== TEST 21: use the topic that does not exist on kafka(even if kafka allows auto create topics, first time push messages to kafka would got this error)
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/global_rules/1',
ngx.HTTP_PUT,
[[{
"plugins": {
"kafka-logger": {
"broker_list" : {
"127.0.0.1": 9092
},
"kafka_topic" : "undefined_topic",
"timeout" : 1,
"batch_max_size": 1,
"include_req_body": false
}
}
}]]
)

if code >= 300 then
ngx.status = code
ngx.say("fail")
return
end

t('/hello',ngx.HTTP_GET)
ngx.sleep(0.5)

ngx.sleep(2)
ngx.say("passed")
}
}
--- request
GET /t
--- timeout: 5
--- response
passed
--- error_log eval
qr/not found topic, retryable: true, topic: undefined_topic, partition_id: -1/