Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: rocketmq logger #5653

Merged
merged 24 commits into from
Dec 7, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ A/B testing, canary release, blue-green deployment, limit rate, defense against
- High performance: The single-core QPS reaches 18k with an average delay of fewer than 0.2 milliseconds.
- [Fault Injection](docs/en/latest/plugins/fault-injection.md)
- [REST Admin API](docs/en/latest/admin-api.md): Using the REST Admin API to control Apache APISIX, which only allows 127.0.0.1 access by default, you can modify the `allow_admin` field in `conf/config.yaml` to specify a list of IPs that are allowed to call the Admin API. Also, note that the Admin API uses key auth to verify the identity of the caller. **The `admin_key` field in `conf/config.yaml` needs to be modified before deployment to ensure security**.
- External Loggers: Export access logs to external log management tools. ([HTTP Logger](docs/en/latest/plugins/http-logger.md), [TCP Logger](docs/en/latest/plugins/tcp-logger.md), [Kafka Logger](docs/en/latest/plugins/kafka-logger.md), [UDP Logger](docs/en/latest/plugins/udp-logger.md), [Google Cloud Logging](docs/en/latest/plugins/google-cloud-logging.md))
- External Loggers: Export access logs to external log management tools. ([HTTP Logger](docs/en/latest/plugins/http-logger.md), [TCP Logger](docs/en/latest/plugins/tcp-logger.md), [Kafka Logger](docs/en/latest/plugins/kafka-logger.md), [UDP Logger](docs/en/latest/plugins/udp-logger.md), [Google Cloud Logging](docs/en/latest/plugins/google-cloud-logging.md), [RocketMQ Logger](docs/en/latest/plugins/rocketmq-logger.md))
- [Datadog](docs/en/latest/plugins/datadog.md): push custom metrics to the DogStatsD server, comes bundled with [Datadog agent](https://docs.datadoghq.com/agent/), over the UDP protocol. DogStatsD basically is an implementation of StatsD protocol which collects the custom metrics for Apache APISIX agent, aggregates it into a single data point and sends it to the configured Datadog server.
- [Helm charts](https://github.com/apache/apisix-helm-chart)

Expand Down
271 changes: 271 additions & 0 deletions apisix/plugins/rocketmq-logger.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local log_util = require("apisix.utils.log-util")
local producer = require ("resty.rocketmq.producer")
local acl_rpchook = require("resty.rocketmq.acl_rpchook")
local batch_processor = require("apisix.utils.batch-processor")
local plugin = require("apisix.plugin")

local pairs = pairs
local type = type
local ipairs = ipairs
local plugin_name = "rocketmq-logger"
local stale_timer_running = false
local timer_at = ngx.timer.at
local ngx = ngx
local buffers = {}
yuz10 marked this conversation as resolved.
Show resolved Hide resolved

local lrucache = core.lrucache.new({
type = "plugin",
})

local schema = {
type = "object",
properties = {
meta_format = {
type = "string",
default = "default",
enum = {"default", "origin"},
},
nameserver_list = {
type = "object",
minProperties = 1,
patternProperties = {
yuz10 marked this conversation as resolved.
Show resolved Hide resolved
[".*"] = {
description = "the port of rocketmq nameserver",
type = "integer",
minimum = 1,
maximum = 65535,
},
},
},
rocketmq_topic = {type = "string"},
yuz10 marked this conversation as resolved.
Show resolved Hide resolved
key = {type = "string"},
tag = {type = "string"},
timeout = {type = "integer", minimum = 1, default = 3},
use_tls = {type = "boolean", default = false},
access_key = {type = "string", default = ""},
secret_key = {type = "string", default = ""},
name = {type = "string", default = "rocketmq logger"},
max_retry_count = {type = "integer", minimum = 0, default = 0},
retry_delay = {type = "integer", minimum = 0, default = 1},
buffer_duration = {type = "integer", minimum = 1, default = 60},
inactive_timeout = {type = "integer", minimum = 1, default = 5},
include_req_body = {type = "boolean", default = false},
include_req_body_expr = {
type = "array",
minItems = 1,
items = {
type = "array",
items = {
type = "string"
}
}
},
include_resp_body = {type = "boolean", default = false},
include_resp_body_expr = {
type = "array",
minItems = 1,
items = {
type = "array",
items = {
type = "string"
}
}
},
},
required = {"nameserver_list", "rocketmq_topic"}
}

local metadata_schema = {
type = "object",
properties = {
log_format = log_util.metadata_schema_log_format,
},
}

local _M = {
version = 0.1,
priority = 402,
name = plugin_name,
schema = schema,
metadata_schema = metadata_schema,
}


function _M.check_schema(conf, schema_type)
if schema_type == core.schema.TYPE_METADATA then
return core.schema.check(metadata_schema, conf)
end

local ok, err = core.schema.check(schema, conf)
if not ok then
return nil, err
end
return log_util.check_log_schema(conf)
end


-- remove stale objects from the memory after timer expires
local function remove_stale_objects(premature)
if premature then
return
end

for key, batch in ipairs(buffers) do
yuz10 marked this conversation as resolved.
Show resolved Hide resolved
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
core.log.warn("removing batch processor stale object, conf: ",
core.json.delay_encode(key))
buffers[key] = nil
end
end

stale_timer_running = false
end


local function create_producer(nameserver_list, producer_config)
core.log.info("create new rocketmq producer instance")
local prod = producer.new(nameserver_list, "apisixLogProducer")
tzssangglass marked this conversation as resolved.
Show resolved Hide resolved
if producer_config.use_tls then
prod:setUseTLS(true)
end
if producer_config.access_key ~= '' then
local aclHook = acl_rpchook.new(producer_config.access_key, producer_config.secret_key)
prod:addRPCHook(aclHook)
end
prod:setTimeout(producer_config.timeout)
return prod
end


local function send_rocketmq_data(conf, log_message, prod)
local result, err = prod:send(conf.rocketmq_topic, log_message, conf.tag, conf.key)
if not result then
return false, "failed to send data to rocketmq topic: " .. err ..
", nameserver_list: " .. core.json.encode(conf.nameserver_list)
Comment on lines +154 to +155
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get the nameserver for the current error?

Copy link
Member Author

@yuz10 yuz10 Dec 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I dont understand what you mean. the nameserver is add to err and test case will verify the error log if send fail. such as https://github.com/yuz10/apisix/blob/master/t/plugin/rocketmq-logger.t#L862

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we have more nameservers in nameserver_list, one of them is unavailable and this message is sent to the unavailable nameserver, can the log here only print out the unavailable nameserver?

Ok, if this is not convenient to do in this PR, we can optimise it later.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which nameserver is error can be returned from the lua-resty-rocketmq library, We can create an issue there.

end

core.log.info("queue: ", result.sendResult.messageQueue.queueId)

return true
end


function _M.body_filter(conf, ctx)
log_util.collect_body(conf, ctx)
end


function _M.log(conf, ctx)
local entry
if conf.meta_format == "origin" then
entry = log_util.get_req_original(ctx, conf)
-- core.log.info("origin entry: ", entry)
yuz10 marked this conversation as resolved.
Show resolved Hide resolved

else
local metadata = plugin.plugin_metadata(plugin_name)
core.log.info("metadata: ", core.json.delay_encode(metadata))
if metadata and metadata.value.log_format
and core.table.nkeys(metadata.value.log_format) > 0
then
entry = log_util.get_custom_format_log(ctx, metadata.value.log_format)
core.log.info("custom log format entry: ", core.json.delay_encode(entry))
else
entry = log_util.get_full_log(ngx, conf)
core.log.info("full log entry: ", core.json.delay_encode(entry))
end
end

if not stale_timer_running then
-- run the timer every 30 mins if any log is present
timer_at(1800, remove_stale_objects)
stale_timer_running = true
end

local log_buffer = buffers[conf]
if log_buffer then
log_buffer:push(entry)
return
end

-- reuse producer via lrucache to avoid unbalanced partitions of messages in rocketmq
local nameserver_list = core.table.new(core.table.nkeys(conf.nameserver_list), 0)

for host, port in pairs(conf.nameserver_list) do
local nameserver = host .. ':' .. port
core.table.insert(nameserver_list, nameserver)
end

local producer_config = {
timeout = conf.timeout * 1000,
use_tls = conf.use_tls,
access_key = conf.access_key,
secret_key = conf.secret_key,
}

local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, create_producer,
nameserver_list, producer_config)
if err then
return nil, "failed to create the rocketmq producer: " .. err
end
core.log.info("rocketmq nameserver_list[1] port ",
prod.client.nameservers[1].port)
-- Generate a function to be executed by the batch processor
local func = function(entries, batch_max_size)
local data, err
if batch_max_size == 1 then
data = entries[1]
if type(data) ~= "string" then
data, err = core.json.encode(data) -- encode as single {}
end
else
data, err = core.json.encode(entries) -- encode as array [{}]
end

if not data then
return false, 'error occurred while encoding the data: ' .. err
end

core.log.info("send data to rocketmq: ", data)
return send_rocketmq_data(conf, data, prod)
end

local config = {
name = conf.name,
retry_delay = conf.retry_delay,
batch_max_size = conf.batch_max_size,
max_retry_count = conf.max_retry_count,
buffer_duration = conf.buffer_duration,
inactive_timeout = conf.inactive_timeout,
}

local err
log_buffer, err = batch_processor:new(func, config)

if not log_buffer then
core.log.error("error when creating the batch processor: ", err)
return
end

buffers[conf] = log_buffer
log_buffer:push(entry)
end


return _M
6 changes: 6 additions & 0 deletions ci/linux-ci-init-service.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,9 @@ docker pull openwhisk/action-nodejs-v14:nightly
docker run --rm -d --name openwhisk -p 3233:3233 -p 3232:3232 -v /var/run/docker.sock:/var/run/docker.sock openwhisk/standalone:nightly
docker exec -i openwhisk waitready
docker exec -i openwhisk bash -c "wsk action update test <(echo 'function main(args){return {\"hello\":args.name || \"test\"}}') --kind nodejs:14"

docker exec -i rmqnamesrv rm /home/rocketmq/rocketmq-4.6.0/conf/tools.yml
docker exec -i rmqnamesrv /home/rocketmq/rocketmq-4.6.0/bin/mqadmin updateTopic -n namesrv:9876 -t test -c DefaultCluster
docker exec -i rmqnamesrv /home/rocketmq/rocketmq-4.6.0/bin/mqadmin updateTopic -n namesrv:9876 -t test2 -c DefaultCluster
docker exec -i rmqnamesrv /home/rocketmq/rocketmq-4.6.0/bin/mqadmin updateTopic -n namesrv:9876 -t test3 -c DefaultCluster
docker exec -i rmqnamesrv /home/rocketmq/rocketmq-4.6.0/bin/mqadmin updateTopic -n namesrv:9876 -t test4 -c DefaultCluster
24 changes: 24 additions & 0 deletions ci/pod/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -355,10 +355,34 @@ services:
networks:
apisix_net:

namesrv:
yuz10 marked this conversation as resolved.
Show resolved Hide resolved
image: apacherocketmq/rocketmq:4.6.0
container_name: rmqnamesrv
restart: unless-stopped
ports:
- "9876:9876"
command: sh mqnamesrv
networks:
rocketmq_net:

broker:
yuz10 marked this conversation as resolved.
Show resolved Hide resolved
image: apacherocketmq/rocketmq:4.6.0
container_name: rmqbroker
restart: unless-stopped
ports:
- "10909:10909"
- "10911:10911"
- "10912:10912"
depends_on:
- namesrv
command: sh mqbroker -n namesrv:9876 -c ../conf/broker.conf
networks:
rocketmq_net:

networks:
apisix_net:
consul_net:
kafka_net:
nacos_net:
skywalk_net:
rocketmq_net:
1 change: 1 addition & 0 deletions conf/config-default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ plugins: # plugin list (sorted by priority)
- sls-logger # priority: 406
- tcp-logger # priority: 405
- kafka-logger # priority: 403
- rocketmq-logger # priority: 402
- syslog # priority: 401
- udp-logger # priority: 400
#- log-rotate # priority: 100
Expand Down
1 change: 1 addition & 0 deletions docs/en/latest/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
"plugins/skywalking-logger",
"plugins/tcp-logger",
"plugins/kafka-logger",
"plugins/rocketmq-logger",
"plugins/udp-logger",
"plugins/syslog",
"plugins/log-rotate",
Expand Down
Loading