From 49762bcd7509448340ea0d405e0230e99ae0a8f5 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Tue, 7 Dec 2021 09:14:36 +0800 Subject: [PATCH] feat: rocketmq logger (#5653) --- README.md | 2 +- apisix/plugins/rocketmq-logger.lua | 256 +++++ ci/linux-ci-init-service.sh | 6 + ci/pod/docker-compose.yml | 24 + conf/config-default.yaml | 1 + docs/en/latest/config.json | 1 + docs/en/latest/plugins/rocketmq-logger.md | 234 +++++ docs/zh/latest/README.md | 2 +- docs/zh/latest/config.json | 1 + docs/zh/latest/plugins/rocketmq-logger.md | 229 +++++ rockspec/apisix-master-0.rockspec | 1 + t/admin/plugins.t | 1 + t/debug/debug-mode.t | 1 + t/plugin/rocketmq-logger-log-format.t | 121 +++ t/plugin/rocketmq-logger.t | 1098 +++++++++++++++++++++ 15 files changed, 1976 insertions(+), 2 deletions(-) create mode 100644 apisix/plugins/rocketmq-logger.lua create mode 100644 docs/en/latest/plugins/rocketmq-logger.md create mode 100644 docs/zh/latest/plugins/rocketmq-logger.md create mode 100644 t/plugin/rocketmq-logger-log-format.t create mode 100644 t/plugin/rocketmq-logger.t diff --git a/README.md b/README.md index 1591e542230e..b3c8909bcc00 100644 --- a/README.md +++ b/README.md @@ -134,7 +134,7 @@ A/B testing, canary release, blue-green deployment, limit rate, defense against - High performance: The single-core QPS reaches 18k with an average delay of fewer than 0.2 milliseconds. - [Fault Injection](docs/en/latest/plugins/fault-injection.md) - [REST Admin API](docs/en/latest/admin-api.md): Using the REST Admin API to control Apache APISIX, which only allows 127.0.0.1 access by default, you can modify the `allow_admin` field in `conf/config.yaml` to specify a list of IPs that are allowed to call the Admin API. Also, note that the Admin API uses key auth to verify the identity of the caller. **The `admin_key` field in `conf/config.yaml` needs to be modified before deployment to ensure security**. - - External Loggers: Export access logs to external log management tools. ([HTTP Logger](docs/en/latest/plugins/http-logger.md), [TCP Logger](docs/en/latest/plugins/tcp-logger.md), [Kafka Logger](docs/en/latest/plugins/kafka-logger.md), [UDP Logger](docs/en/latest/plugins/udp-logger.md), [Google Cloud Logging](docs/en/latest/plugins/google-cloud-logging.md)) + - External Loggers: Export access logs to external log management tools. ([HTTP Logger](docs/en/latest/plugins/http-logger.md), [TCP Logger](docs/en/latest/plugins/tcp-logger.md), [Kafka Logger](docs/en/latest/plugins/kafka-logger.md), [UDP Logger](docs/en/latest/plugins/udp-logger.md), [Google Cloud Logging](docs/en/latest/plugins/google-cloud-logging.md), [RocketMQ Logger](docs/en/latest/plugins/rocketmq-logger.md)) - [Datadog](docs/en/latest/plugins/datadog.md): push custom metrics to the DogStatsD server, comes bundled with [Datadog agent](https://docs.datadoghq.com/agent/), over the UDP protocol. DogStatsD basically is an implementation of StatsD protocol which collects the custom metrics for Apache APISIX agent, aggregates it into a single data point and sends it to the configured Datadog server. - [Helm charts](https://github.com/apache/apisix-helm-chart) diff --git a/apisix/plugins/rocketmq-logger.lua b/apisix/plugins/rocketmq-logger.lua new file mode 100644 index 000000000000..bf7adc5724e1 --- /dev/null +++ b/apisix/plugins/rocketmq-logger.lua @@ -0,0 +1,256 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local core = require("apisix.core") +local log_util = require("apisix.utils.log-util") +local producer = require ("resty.rocketmq.producer") +local acl_rpchook = require("resty.rocketmq.acl_rpchook") +local batch_processor = require("apisix.utils.batch-processor") +local plugin = require("apisix.plugin") + +local type = type +local pairs = pairs +local plugin_name = "rocketmq-logger" +local stale_timer_running = false +local ngx = ngx +local timer_at = ngx.timer.at +local buffers = {} + +local lrucache = core.lrucache.new({ + type = "plugin", +}) + +local schema = { + type = "object", + properties = { + meta_format = { + type = "string", + default = "default", + enum = {"default", "origin"}, + }, + nameserver_list = { + type = "array", + minItems = 1, + items = { + type = "string" + } + }, + topic = {type = "string"}, + key = {type = "string"}, + tag = {type = "string"}, + timeout = {type = "integer", minimum = 1, default = 3}, + use_tls = {type = "boolean", default = false}, + access_key = {type = "string", default = ""}, + secret_key = {type = "string", default = ""}, + name = {type = "string", default = "rocketmq logger"}, + max_retry_count = {type = "integer", minimum = 0, default = 0}, + retry_delay = {type = "integer", minimum = 0, default = 1}, + buffer_duration = {type = "integer", minimum = 1, default = 60}, + inactive_timeout = {type = "integer", minimum = 1, default = 5}, + include_req_body = {type = "boolean", default = false}, + include_req_body_expr = { + type = "array", + minItems = 1, + items = { + type = "array", + items = { + type = "string" + } + } + }, + include_resp_body = {type = "boolean", default = false}, + include_resp_body_expr = { + type = "array", + minItems = 1, + items = { + type = "array", + items = { + type = "string" + } + } + }, + }, + required = {"nameserver_list", "topic"} +} + +local metadata_schema = { + type = "object", + properties = { + log_format = log_util.metadata_schema_log_format, + }, +} + +local _M = { + version = 0.1, + priority = 402, + name = plugin_name, + schema = schema, + metadata_schema = metadata_schema, +} + + +function _M.check_schema(conf, schema_type) + if schema_type == core.schema.TYPE_METADATA then + return core.schema.check(metadata_schema, conf) + end + + local ok, err = core.schema.check(schema, conf) + if not ok then + return nil, err + end + return log_util.check_log_schema(conf) +end + + +-- remove stale objects from the memory after timer expires +local function remove_stale_objects(premature) + if premature then + return + end + + for key, batch in pairs(buffers) do + if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then + core.log.warn("removing batch processor stale object, conf: ", + core.json.delay_encode(key)) + buffers[key] = nil + end + end + + stale_timer_running = false +end + + +local function create_producer(nameserver_list, producer_config) + core.log.info("create new rocketmq producer instance") + local prod = producer.new(nameserver_list, "apisixLogProducer") + if producer_config.use_tls then + prod:setUseTLS(true) + end + if producer_config.access_key ~= '' then + local aclHook = acl_rpchook.new(producer_config.access_key, producer_config.secret_key) + prod:addRPCHook(aclHook) + end + prod:setTimeout(producer_config.timeout) + return prod +end + + +local function send_rocketmq_data(conf, log_message, prod) + local result, err = prod:send(conf.topic, log_message, conf.tag, conf.key) + if not result then + return false, "failed to send data to rocketmq topic: " .. err .. + ", nameserver_list: " .. core.json.encode(conf.nameserver_list) + end + + core.log.info("queue: ", result.sendResult.messageQueue.queueId) + + return true +end + + +function _M.body_filter(conf, ctx) + log_util.collect_body(conf, ctx) +end + + +function _M.log(conf, ctx) + local entry + if conf.meta_format == "origin" then + entry = log_util.get_req_original(ctx, conf) + else + local metadata = plugin.plugin_metadata(plugin_name) + core.log.info("metadata: ", core.json.delay_encode(metadata)) + if metadata and metadata.value.log_format + and core.table.nkeys(metadata.value.log_format) > 0 + then + entry = log_util.get_custom_format_log(ctx, metadata.value.log_format) + core.log.info("custom log format entry: ", core.json.delay_encode(entry)) + else + entry = log_util.get_full_log(ngx, conf) + core.log.info("full log entry: ", core.json.delay_encode(entry)) + end + end + + if not stale_timer_running then + -- run the timer every 30 mins if any log is present + timer_at(1800, remove_stale_objects) + stale_timer_running = true + end + + local log_buffer = buffers[conf] + if log_buffer then + log_buffer:push(entry) + return + end + + -- reuse producer via lrucache to avoid unbalanced partitions of messages in rocketmq + local producer_config = { + timeout = conf.timeout * 1000, + use_tls = conf.use_tls, + access_key = conf.access_key, + secret_key = conf.secret_key, + } + + local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, create_producer, + conf.nameserver_list, producer_config) + if err then + return nil, "failed to create the rocketmq producer: " .. err + end + core.log.info("rocketmq nameserver_list[1] port ", + prod.client.nameservers[1].port) + -- Generate a function to be executed by the batch processor + local func = function(entries, batch_max_size) + local data, err + if batch_max_size == 1 then + data = entries[1] + if type(data) ~= "string" then + data, err = core.json.encode(data) -- encode as single {} + end + else + data, err = core.json.encode(entries) -- encode as array [{}] + end + + if not data then + return false, 'error occurred while encoding the data: ' .. err + end + + core.log.info("send data to rocketmq: ", data) + return send_rocketmq_data(conf, data, prod) + end + + local config = { + name = conf.name, + retry_delay = conf.retry_delay, + batch_max_size = conf.batch_max_size, + max_retry_count = conf.max_retry_count, + buffer_duration = conf.buffer_duration, + inactive_timeout = conf.inactive_timeout, + } + + local err + log_buffer, err = batch_processor:new(func, config) + + if not log_buffer then + core.log.error("error when creating the batch processor: ", err) + return + end + + buffers[conf] = log_buffer + log_buffer:push(entry) +end + + +return _M diff --git a/ci/linux-ci-init-service.sh b/ci/linux-ci-init-service.sh index 83144b7b3b30..0c3ff5d03096 100755 --- a/ci/linux-ci-init-service.sh +++ b/ci/linux-ci-init-service.sh @@ -25,3 +25,9 @@ docker pull openwhisk/action-nodejs-v14:nightly docker run --rm -d --name openwhisk -p 3233:3233 -p 3232:3232 -v /var/run/docker.sock:/var/run/docker.sock openwhisk/standalone:nightly docker exec -i openwhisk waitready docker exec -i openwhisk bash -c "wsk action update test <(echo 'function main(args){return {\"hello\":args.name || \"test\"}}') --kind nodejs:14" + +docker exec -i rmqnamesrv rm /home/rocketmq/rocketmq-4.6.0/conf/tools.yml +docker exec -i rmqnamesrv /home/rocketmq/rocketmq-4.6.0/bin/mqadmin updateTopic -n rocketmq_namesrv:9876 -t test -c DefaultCluster +docker exec -i rmqnamesrv /home/rocketmq/rocketmq-4.6.0/bin/mqadmin updateTopic -n rocketmq_namesrv:9876 -t test2 -c DefaultCluster +docker exec -i rmqnamesrv /home/rocketmq/rocketmq-4.6.0/bin/mqadmin updateTopic -n rocketmq_namesrv:9876 -t test3 -c DefaultCluster +docker exec -i rmqnamesrv /home/rocketmq/rocketmq-4.6.0/bin/mqadmin updateTopic -n rocketmq_namesrv:9876 -t test4 -c DefaultCluster diff --git a/ci/pod/docker-compose.yml b/ci/pod/docker-compose.yml index c71ab63d6dc6..2dedaf9dff80 100644 --- a/ci/pod/docker-compose.yml +++ b/ci/pod/docker-compose.yml @@ -355,6 +355,29 @@ services: networks: apisix_net: + rocketmq_namesrv: + image: apacherocketmq/rocketmq:4.6.0 + container_name: rmqnamesrv + restart: unless-stopped + ports: + - "9876:9876" + command: sh mqnamesrv + networks: + rocketmq_net: + + rocketmq_broker: + image: apacherocketmq/rocketmq:4.6.0 + container_name: rmqbroker + restart: unless-stopped + ports: + - "10909:10909" + - "10911:10911" + - "10912:10912" + depends_on: + - rocketmq_namesrv + command: sh mqbroker -n rocketmq_namesrv:9876 -c ../conf/broker.conf + networks: + rocketmq_net: networks: apisix_net: @@ -362,3 +385,4 @@ networks: kafka_net: nacos_net: skywalk_net: + rocketmq_net: diff --git a/conf/config-default.yaml b/conf/config-default.yaml index 7cf130b256cb..7be15f9267b0 100644 --- a/conf/config-default.yaml +++ b/conf/config-default.yaml @@ -349,6 +349,7 @@ plugins: # plugin list (sorted by priority) - sls-logger # priority: 406 - tcp-logger # priority: 405 - kafka-logger # priority: 403 + - rocketmq-logger # priority: 402 - syslog # priority: 401 - udp-logger # priority: 400 #- log-rotate # priority: 100 diff --git a/docs/en/latest/config.json b/docs/en/latest/config.json index 8a775e099b29..72f027f0e91f 100644 --- a/docs/en/latest/config.json +++ b/docs/en/latest/config.json @@ -116,6 +116,7 @@ "plugins/skywalking-logger", "plugins/tcp-logger", "plugins/kafka-logger", + "plugins/rocketmq-logger", "plugins/udp-logger", "plugins/syslog", "plugins/log-rotate", diff --git a/docs/en/latest/plugins/rocketmq-logger.md b/docs/en/latest/plugins/rocketmq-logger.md new file mode 100644 index 000000000000..f17968c4dd51 --- /dev/null +++ b/docs/en/latest/plugins/rocketmq-logger.md @@ -0,0 +1,234 @@ +--- +title: rocketmq-logger +--- + + + +## Summary + +- [**Name**](#name) +- [**Attributes**](#attributes) +- [**Info**](#info) +- [**How To Enable**](#how-to-enable) +- [**Test Plugin**](#test-plugin) +- [**Disable Plugin**](#disable-plugin) + +## Name + +`rocketmq-logger` is a plugin which provides the ability to push requests log data as JSON objects to your external rocketmq clusters. + + In case if you did not receive the log data don't worry give it some time it will automatically send the logs after the timer function expires in our Batch Processor. + +For more info on Batch-Processor in Apache APISIX please refer. +[Batch-Processor](../batch-processor.md) + +## Attributes + +| Name | Type | Requirement | Default | Valid | Description | +| ---------------- | ------- | ----------- | -------------- | ------- | ---------------------------------------------------------------------------------------- | +| nameserver_list | object | required | | | An array of rocketmq nameservers. | +| topic | string | required | | | Target topic to push data. | +| key | string | optional | | | Keys of messages to send. | +| tag | string | optional | | | Tags of messages to send. | +| timeout | integer | optional | 3 | [1,...] | Timeout for the upstream to send data. | +| use_tls | boolean | optional | false | | Whether to open TLS | +| access_key | string | optional | "" | | access key for ACL, empty string means disable ACL. | +| secret_key | string | optional | "" | | secret key for ACL. | +| name | string | optional | "rocketmq logger" | | A unique identifier to identity the batch processor. | +| meta_format | enum | optional | "default" | ["default","origin"] | `default`: collect the request information with default JSON way. `origin`: collect the request information with original HTTP request. [example](#examples-of-meta_format)| +| batch_max_size | integer | optional | 1000 | [1,...] | Set the maximum number of logs sent in each batch. When the number of logs reaches the set maximum, all logs will be automatically pushed to the `rocketmq` service. | +| inactive_timeout | integer | optional | 5 | [1,...] | The maximum time to refresh the buffer (in seconds). When the maximum refresh time is reached, all logs will be automatically pushed to the `rocketmq` service regardless of whether the number of logs in the buffer reaches the set maximum number. | +| buffer_duration | integer | optional | 60 | [1,...] | Maximum age in seconds of the oldest entry in a batch before the batch must be processed.| +| max_retry_count | integer | optional | 0 | [0,...] | Maximum number of retries before removing from the processing pipe line. | +| retry_delay | integer | optional | 1 | [0,...] | Number of seconds the process execution should be delayed if the execution fails. | +| include_req_body | boolean | optional | false | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. Note: if the request body is too big to be kept in the memory, it can't be logged due to Nginx's limitation. | +| include_req_body_expr | array | optional | | | When `include_req_body` is true, control the behavior based on the result of the [lua-resty-expr](https://github.com/api7/lua-resty-expr) expression. If present, only log the request body when the result is true. | +| include_resp_body| boolean | optional | false | [false, true] | Whether to include the response body. The response body is included if and only if it is `true`. | +| include_resp_body_expr | array | optional | | | When `include_resp_body` is true, control the behavior based on the result of the [lua-resty-expr](https://github.com/api7/lua-resty-expr) expression. If present, only log the response body when the result is true. | + +### examples of meta_format + +- **default**: + +```json + { + "upstream": "127.0.0.1:1980", + "start_time": 1619414294760, + "client_ip": "127.0.0.1", + "service_id": "", + "route_id": "1", + "request": { + "querystring": { + "ab": "cd" + }, + "size": 90, + "uri": "/hello?ab=cd", + "url": "http://localhost:1984/hello?ab=cd", + "headers": { + "host": "localhost", + "content-length": "6", + "connection": "close" + }, + "body": "abcdef", + "method": "GET" + }, + "response": { + "headers": { + "connection": "close", + "content-type": "text/plain; charset=utf-8", + "date": "Mon, 26 Apr 2021 05:18:14 GMT", + "server": "APISIX/2.5", + "transfer-encoding": "chunked" + }, + "size": 190, + "status": 200 + }, + "server": { + "hostname": "localhost", + "version": "2.5" + }, + "latency": 0 + } +``` + +- **origin**: + +```http + GET /hello?ab=cd HTTP/1.1 + host: localhost + content-length: 6 + connection: close + + abcdef +``` + +## Info + +The `message` will write to the buffer first. +It will send to the rocketmq server when the buffer exceed the `batch_max_size`, +or every `buffer_duration` flush the buffer. + +In case of success, returns `true`. +In case of errors, returns `nil` with a string describing the error (`buffer overflow`). + +### Sample Nameserver list + +Specify the nameservers of the external rocketmq servers as below sample. + +```json +[ + "127.0.0.1:9876", + "127.0.0.2:9876" +] +``` + +## How To Enable + +The following is an example on how to enable the rocketmq-logger for a specific route. + +```shell +curl http://127.0.0.1:9080/apisix/admin/routes/5 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' +{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", + "batch_max_size": 1, + "name": "rocketmq logger" + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" +}' +``` + +## Test Plugin + +success: + +```shell +$ curl -i http://127.0.0.1:9080/hello +HTTP/1.1 200 OK +... +hello, world +``` + +## Metadata + +| Name | Type | Requirement | Default | Valid | Description | +| ---------------- | ------- | ----------- | ------------- | ------- | ---------------------------------------------------------------------------------------- | +| log_format | object | optional | {"host": "$host", "@timestamp": "$time_iso8601", "client_ip": "$remote_addr"} | | Log format declared as key value pair in JSON format. Only string is supported in the `value` part. If the value starts with `$`, it means to get `APISIX` variables or [Nginx variable](http://nginx.org/en/docs/varindex.html). | + + Note that **the metadata configuration is applied in global scope**, which means it will take effect on all Route or Service which use rocketmq-logger plugin. + +**APISIX Variables** + +| Variable Name | Description | Usage Example | +|------------------|-------------------------|----------------| +| route_id | id of `route` | $route_id | +| route_name | name of `route` | $route_name | +| service_id | id of `service` | $service_id | +| service_name | name of `service` | $service_name | +| consumer_name | username of `consumer` | $consumer_name | + +### Example + +```shell +curl http://127.0.0.1:9080/apisix/admin/plugin_metadata/rocketmq-logger -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' +{ + "log_format": { + "host": "$host", + "@timestamp": "$time_iso8601", + "client_ip": "$remote_addr" + } +}' +``` + +It is expected to see some logs like that: + +```shell +{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"} +{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"} +``` + +## Disable Plugin + +Remove the corresponding json configuration in the plugin configuration to disable the `rocketmq-logger`. +APISIX plugins are hot-reloaded, therefore no need to restart APISIX. + +```shell +$ curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' +{ + "methods": ["GET"], + "uri": "/hello", + "plugins": {}, + "upstream": { + "type": "roundrobin", + "nodes": { + "127.0.0.1:1980": 1 + } + } +}' +``` diff --git a/docs/zh/latest/README.md b/docs/zh/latest/README.md index c8a3a654cafa..d46eb94a851b 100644 --- a/docs/zh/latest/README.md +++ b/docs/zh/latest/README.md @@ -135,7 +135,7 @@ A/B 测试、金丝雀发布(灰度发布)、蓝绿部署、限流限速、抵 - 高性能:在单核上 QPS 可以达到 18k,同时延迟只有 0.2 毫秒。 - [故障注入](plugins/fault-injection.md) - [REST Admin API](admin-api.md): 使用 REST Admin API 来控制 Apache APISIX,默认只允许 127.0.0.1 访问,你可以修改 `conf/config.yaml` 中的 `allow_admin` 字段,指定允许调用 Admin API 的 IP 列表。同时需要注意的是,Admin API 使用 key auth 来校验调用者身份,**在部署前需要修改 `conf/config.yaml` 中的 `admin_key` 字段,来保证安全。** - - 外部日志记录器:将访问日志导出到外部日志管理工具。([HTTP Logger](plugins/http-logger.md), [TCP Logger](plugins/tcp-logger.md), [Kafka Logger](plugins/kafka-logger.md), [UDP Logger](plugins/udp-logger.md), [Google Cloud Logging](plugins/google-cloud-logging.md)) + - 外部日志记录器:将访问日志导出到外部日志管理工具。([HTTP Logger](plugins/http-logger.md), [TCP Logger](plugins/tcp-logger.md), [Kafka Logger](plugins/kafka-logger.md), [UDP Logger](plugins/udp-logger.md), [Google Cloud Logging](plugins/google-cloud-logging.md), [RocketMQ Logger](plugins/rocketmq-logger.md)) - [Helm charts](https://github.com/apache/apisix-helm-chart) - **高度可扩展** diff --git a/docs/zh/latest/config.json b/docs/zh/latest/config.json index 9a2eef66dd11..cd914b7a4700 100644 --- a/docs/zh/latest/config.json +++ b/docs/zh/latest/config.json @@ -114,6 +114,7 @@ "plugins/skywalking-logger", "plugins/tcp-logger", "plugins/kafka-logger", + "plugins/rocketmq-logger", "plugins/udp-logger", "plugins/syslog", "plugins/log-rotate", diff --git a/docs/zh/latest/plugins/rocketmq-logger.md b/docs/zh/latest/plugins/rocketmq-logger.md new file mode 100644 index 000000000000..f61c0b4acf9a --- /dev/null +++ b/docs/zh/latest/plugins/rocketmq-logger.md @@ -0,0 +1,229 @@ +--- +title: rocketmq-logger +--- + + + +## 目录 + +- [**简介**](#简介) +- [**属性**](#属性) +- [**工作原理**](#工作原理) +- [**如何启用**](#如何启用) +- [**测试插件**](#测试插件) +- [**禁用插件**](#禁用插件) + +## 简介 + +`rocketmq-logger` 插件可以将接口请求日志以 JSON 的形式推送给外部 rocketmq 集群。 + +如果在短时间内没有收到日志数据,请放心,它会在我们的批处理处理器中的计时器功能到期后自动发送日志。 + +有关 Apache APISIX 中 Batch-Processor 的更多信息,请参考。 +[Batch-Processor](../batch-processor.md) + +## 属性 + +| 名称 | 类型 | 必选项 | 默认值 | 有效值 | 描述 | +| ---------------- | ------- | ------ | -------------- | ------- | ------------------------------------------------ | +| nameserver_list | object | 必须 | | | 要推送的 rocketmq 的 nameserver 列表。 | +| topic | string | 必须 | | | 要推送的 topic。 | +| key | string | 可选 | | | 发送消息的keys。 | +| tag | string | 可选 | | | 发送消息的tags。 | +| timeout | integer | 可选 | 3 | [1,...] | 发送数据的超时时间。 | +| use_tls | boolean | 可选 | false | | 是否开启TLS加密。 | +| access_key | string | 可选 | "" | | ACL认证的access key,空字符串表示不开启ACL。 | +| secret_key | string | 可选 | "" | | ACL认证的secret key。 | +| name | string | 可选 | "rocketmq logger" | | batch processor 的唯一标识。 | +| meta_format | enum | 可选 | "default" | ["default","origin"] | `default`:获取请求信息以默认的 JSON 编码方式。`origin`:获取请求信息以 HTTP 原始请求方式。[具体示例](#meta_format-参考示例)| +| batch_max_size | integer | 可选 | 1000 | [1,...] | 设置每批发送日志的最大条数,当日志条数达到设置的最大值时,会自动推送全部日志到 `rocketmq` 服务。| +| inactive_timeout | integer | 可选 | 5 | [1,...] | 刷新缓冲区的最大时间(以秒为单位),当达到最大的刷新时间时,无论缓冲区中的日志数量是否达到设置的最大条数,也会自动将全部日志推送到 `rocketmq` 服务。 | +| buffer_duration | integer | 可选 | 60 | [1,...] | 必须先处理批次中最旧条目的最长期限(以秒为单位)。 | +| max_retry_count | integer | 可选 | 0 | [0,...] | 从处理管道中移除之前的最大重试次数。 | +| retry_delay | integer | 可选 | 1 | [0,...] | 如果执行失败,则应延迟执行流程的秒数。 | +| include_req_body | boolean | 可选 | false | [false, true] | 是否包括请求 body。false: 表示不包含请求的 body ;true: 表示包含请求的 body。注意:如果请求 body 没办法完全放在内存中,由于 Nginx 的限制,我们没有办法把它记录下来。| +| include_req_body_expr | array | 可选 | | | 当 `include_req_body` 开启时, 基于 [lua-resty-expr](https://github.com/api7/lua-resty-expr) 表达式的结果进行记录。如果该选项存在,只有在表达式为真的时候才会记录请求 body。 | +| include_resp_body| boolean | 可选 | false | [false, true] | 是否包括响应体。包含响应体,当为`true`。 | +| include_resp_body_expr | array | 可选 | | | 是否采集响体, 基于[lua-resty-expr](https://github.com/api7/lua-resty-expr)。 该选项需要开启 `include_resp_body`| + +### meta_format 参考示例 + +- **default**: + +```json + { + "upstream": "127.0.0.1:1980", + "start_time": 1619414294760, + "client_ip": "127.0.0.1", + "service_id": "", + "route_id": "1", + "request": { + "querystring": { + "ab": "cd" + }, + "size": 90, + "uri": "/hello?ab=cd", + "url": "http://localhost:1984/hello?ab=cd", + "headers": { + "host": "localhost", + "content-length": "6", + "connection": "close" + }, + "body": "abcdef", + "method": "GET" + }, + "response": { + "headers": { + "connection": "close", + "content-type": "text/plain; charset=utf-8", + "date": "Mon, 26 Apr 2021 05:18:14 GMT", + "server": "APISIX/2.5", + "transfer-encoding": "chunked" + }, + "size": 190, + "status": 200 + }, + "server": { + "hostname": "localhost", + "version": "2.5" + }, + "latency": 0 + } +``` + +- **origin**: + +```http + GET /hello?ab=cd HTTP/1.1 + host: localhost + content-length: 6 + connection: close + + abcdef +``` + +## 工作原理 + +消息将首先写入缓冲区。 +当缓冲区超过 `batch_max_size` 时,它将发送到 rocketmq 服务器, +或每个 `buffer_duration` 刷新缓冲区。 + +如果成功,则返回 `true`。 +如果出现错误,则返回 `nil`,并带有描述错误的字符串(`buffer overflow`)。 + +### Nameserver 列表 + +配置多个nameserver地址如下: + +```json +[ + "127.0.0.1:9876", + "127.0.0.2:9876" +] +``` + +## 如何启用 + +1. 为特定路由启用 rocketmq-logger 插件。 + +```shell +curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' +{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" +}' +``` + +## 测试插件 + + 成功 + +```shell +$ curl -i http://127.0.0.1:9080/hello +HTTP/1.1 200 OK +... +hello, world +``` + +## 插件元数据设置 + +| 名称 | 类型 | 必选项 | 默认值 | 有效值 | 描述 | +| ---------------- | ------- | ------ | ------------- | ------- | ------------------------------------------------ | +| log_format | object | 可选 | {"host": "$host", "@timestamp": "$time_iso8601", "client_ip": "$remote_addr"} | | 以 JSON 格式的键值对来声明日志格式。对于值部分,仅支持字符串。如果是以 `$` 开头,则表明是要获取 __APISIX__ 变量或 [Nginx 内置变量](http://nginx.org/en/docs/varindex.html)。特别的,**该设置是全局生效的**,意味着指定 log_format 后,将对所有绑定 http-logger 的 Route 或 Service 生效。 | + +**APISIX 变量** + +| 变量名 | 描述 | 使用示例 | +|------------------|-------------------------|----------------| +| route_id | `route` 的 id | $route_id | +| route_name | `route` 的 name | $route_name | +| service_id | `service` 的 id | $service_id | +| service_name | `service` 的 name | $service_name | +| consumer_name | `consumer` 的 username | $consumer_name | + +### 设置日志格式示例 + +```shell +curl http://127.0.0.1:9080/apisix/admin/plugin_metadata/rocketmq-logger -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' +{ + "log_format": { + "host": "$host", + "@timestamp": "$time_iso8601", + "client_ip": "$remote_addr" + } +}' +``` + +在日志收集处,将得到类似下面的日志: + +```shell +{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"} +{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"} +``` + +## 禁用插件 + +当您要禁用 `rocketmq-logger` 插件时,这很简单,您可以在插件配置中删除相应的 json 配置,无需重新启动服务,它将立即生效: + +```shell +$ curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' +{ + "methods": ["GET"], + "uri": "/hello", + "plugins": {}, + "upstream": { + "type": "roundrobin", + "nodes": { + "127.0.0.1:1980": 1 + } + } +}' +``` diff --git a/rockspec/apisix-master-0.rockspec b/rockspec/apisix-master-0.rockspec index 56b1f59ffe50..444471be4919 100644 --- a/rockspec/apisix-master-0.rockspec +++ b/rockspec/apisix-master-0.rockspec @@ -72,6 +72,7 @@ dependencies = { "api7-snowflake = 2.0-1", "inspect == 3.1.1", "lualdap = 1.2.6-1", + "lua-resty-rocketmq = 0.2.1-1", } build = { diff --git a/t/admin/plugins.t b/t/admin/plugins.t index dbe585997b08..4bc6c0d1ee48 100644 --- a/t/admin/plugins.t +++ b/t/admin/plugins.t @@ -106,6 +106,7 @@ google-cloud-logging sls-logger tcp-logger kafka-logger +rocketmq-logger syslog udp-logger example-plugin diff --git a/t/debug/debug-mode.t b/t/debug/debug-mode.t index 2f38dcbf5151..026569558546 100644 --- a/t/debug/debug-mode.t +++ b/t/debug/debug-mode.t @@ -81,6 +81,7 @@ loaded plugin and sort by priority: 410 name: http-logger loaded plugin and sort by priority: 406 name: sls-logger loaded plugin and sort by priority: 405 name: tcp-logger loaded plugin and sort by priority: 403 name: kafka-logger +loaded plugin and sort by priority: 402 name: rocketmq-logger loaded plugin and sort by priority: 401 name: syslog loaded plugin and sort by priority: 400 name: udp-logger loaded plugin and sort by priority: 0 name: example-plugin diff --git a/t/plugin/rocketmq-logger-log-format.t b/t/plugin/rocketmq-logger-log-format.t new file mode 100644 index 000000000000..b3a364b14c9c --- /dev/null +++ b/t/plugin/rocketmq-logger-log-format.t @@ -0,0 +1,121 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX 'no_plan'; + +log_level('info'); +repeat_each(1); +no_long_string(); +no_root_location(); + +run_tests; + +__DATA__ + +=== TEST 1: add plugin metadata +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/plugin_metadata/rocketmq-logger', + ngx.HTTP_PUT, + [[{ + "log_format": { + "host": "$host", + "@timestamp": "$time_iso8601", + "client_ip": "$remote_addr" + } + }]], + [[{ + "node": { + "value": { + "log_format": { + "host": "$host", + "@timestamp": "$time_iso8601", + "client_ip": "$remote_addr" + } + } + }, + "action": "set" + }]] + ) + + ngx.status = code + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 2: set route(id: 1), batch_max_size=1 +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", + "key" : "key1", + "tag" : "tag1", + "timeout" : 1, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 3: hit route and report rocketmq logger +--- request +GET /hello +--- response_body +hello world +--- wait: 0.5 +--- no_error_log +[error] +--- error_log eval +qr/send data to rocketmq: \{.*"host":"localhost"/ diff --git a/t/plugin/rocketmq-logger.t b/t/plugin/rocketmq-logger.t new file mode 100644 index 000000000000..0ac81229a0cb --- /dev/null +++ b/t/plugin/rocketmq-logger.t @@ -0,0 +1,1098 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX 'no_plan'; + +repeat_each(1); +no_long_string(); +no_root_location(); +run_tests; + +__DATA__ + +=== TEST 1: sanity +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.rocketmq-logger") + local ok, err = plugin.check_schema({ + topic = "test", + key = "key1", + nameserver_list = { + "127.0.0.1:3" + } + }) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- request +GET /t +--- response_body +done +--- no_error_log +[error] + + + +=== TEST 2: missing nameserver list +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.rocketmq-logger") + local ok, err = plugin.check_schema({topic = "test", key= "key1"}) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- request +GET /t +--- response_body +property "nameserver_list" is required +done +--- no_error_log +[error] + + + +=== TEST 3: wrong type of string +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.rocketmq-logger") + local ok, err = plugin.check_schema({ + nameserver_list = { + "127.0.0.1:3000" + }, + timeout = "10", + topic ="test", + key= "key1" + }) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- request +GET /t +--- response_body +property "timeout" validation failed: wrong type: expected integer, got string +done +--- no_error_log +[error] + + + +=== TEST 4: set route(id: 1) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", + "key" : "key1", + "timeout" : 1, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]], + [[{ + "node": { + "value": { + "plugins": { + "rocketmq-logger": { + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", + "key" : "key1", + "timeout" : 1, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }, + "key": "/apisix/routes/1" + }, + "action": "set" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 5: access +--- request +GET /hello +--- response_body +hello world +--- no_error_log +[error] +--- wait: 2 + + + +=== TEST 6: unavailable nameserver +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : [ "127.0.0.1:9877" ], + "topic" : "test2", + "producer_type": "sync", + "key" : "key1", + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]], + [[{ + "node": { + "value": { + "plugins": { + "rocketmq-logger": { + "nameserver_list" : [ "127.0.0.1:9877" ], + "topic" : "test2", + "producer_type": "sync", + "key" : "key1", + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }, + "key": "/apisix/routes/1" + }, + "action": "set" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + local http = require "resty.http" + local httpc = http.new() + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local res, err = httpc:request_uri(uri, {method = "GET"}) + } + } +--- request +GET /t +--- error_log +failed to send data to rocketmq topic +[error] +--- wait: 1 + + + +=== TEST 7: set route(meta_format = origin, include_req_body = true) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", + "key" : "key1", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": true, + "meta_format": "origin" + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 8: hit route, report log to rocketmq +--- request +GET /hello?ab=cd +abcdef +--- response_body +hello world +--- no_error_log +[error] +--- error_log +send data to rocketmq: GET /hello?ab=cd HTTP/1.1 +host: localhost +content-length: 6 +connection: close + +abcdef +--- wait: 2 + + + +=== TEST 9: set route(meta_format = origin, include_req_body = false) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", + "key" : "key1", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false, + "meta_format": "origin" + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 10: hit route, report log to rocketmq +--- request +GET /hello?ab=cd +abcdef +--- response_body +hello world +--- no_error_log +[error] +--- error_log +send data to rocketmq: GET /hello?ab=cd HTTP/1.1 +host: localhost +content-length: 6 +connection: close +--- wait: 2 + + + +=== TEST 11: set route(meta_format = default) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", + "key" : "key1", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 12: hit route, report log to rocketmq +--- request +GET /hello?ab=cd +abcdef +--- response_body +hello world +--- no_error_log +[error] +--- error_log_like eval +qr/send data to rocketmq: \{.*"upstream":"127.0.0.1:1980"/ +--- wait: 2 + + + +=== TEST 13: set route(id: 1), missing key field +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", + "timeout" : 1, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]], + [[{ + "node": { + "value": { + "plugins": { + "rocketmq-logger": { + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", + "timeout" : 1, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }, + "key": "/apisix/routes/1" + }, + "action": "set" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 14: access, test key field is optional +--- request +GET /hello +--- response_body +hello world +--- no_error_log +[error] +--- wait: 2 + + + +=== TEST 15: set route(meta_format = default), missing key field +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 16: hit route, report log to rocketmq +--- request +GET /hello?ab=cd +abcdef +--- response_body +hello world +--- no_error_log +[error] +--- error_log_like eval +qr/send data to rocketmq: \{.*"upstream":"127.0.0.1:1980"/ +--- wait: 2 + + + +=== TEST 17: use the topic with 3 partitions +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test3", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 18: report log to rocketmq by different partitions +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test3", + "producer_type": "sync", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + } + } +--- request +GET /t +--- timeout: 5s +--- ignore_response +--- no_error_log +[error] +--- error_log eval +[qr/queue: 1/, +qr/queue: 0/, +qr/queue: 2/] + + + +=== TEST 19: report log to rocketmq by different partitions in async mode +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test3", + "producer_type": "async", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + } + } +--- request +GET /t +--- timeout: 5s +--- ignore_response +--- no_error_log +[error] +--- error_log eval +[qr/queue: 1/, +qr/queue: 0/, +qr/queue: 2/] + + + +=== TEST 20: update the nameserver_list, generate different rocketmq producers +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + ngx.sleep(0.5) + + if code >= 300 then + ngx.status = code + ngx.say("fail") + return + end + + code, body = t('/apisix/admin/routes/1/plugins', + ngx.HTTP_PATCH, + [[{ + "rocketmq-logger": { + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false + } + }]] + ) + + if code >= 300 then + ngx.status = code + ngx.say("fail") + return + end + + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + + code, body = t('/apisix/admin/routes/1/plugins', + ngx.HTTP_PATCH, + [[{ + "rocketmq-logger": { + "nameserver_list" : [ "127.0.0.1:19876" ], + "topic" : "test4", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false + } + }]] + ) + + if code >= 300 then + ngx.status = code + ngx.say("fail") + return + end + + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + + ngx.sleep(2) + ngx.say("passed") + } + } +--- request +GET /t +--- timeout: 10 +--- response +passed +--- wait: 5 +--- error_log +phase_func(): rocketmq nameserver_list[1] port 9876 +phase_func(): rocketmq nameserver_list[1] port 19876 +--- no_error_log eval +qr/not found topic/ + + + +=== TEST 21: use the topic that does not exist on rocketmq(even if rocketmq allows auto create topics, first time push messages to rocketmq would got this error) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1/plugins', + ngx.HTTP_PATCH, + [[{ + "rocketmq-logger": { + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "undefined_topic", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false + } + }]] + ) + + if code >= 300 then + ngx.status = code + ngx.say("fail") + return + end + + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + + ngx.sleep(2) + ngx.say("passed") + } + } +--- request +GET /t +--- timeout: 5 +--- response +passed +--- error_log eval +qr/getTopicRouteInfoFromNameserver return TOPIC_NOT_EXIST, No topic route info in name server for the topic: undefined_topic/ + + + +=== TEST 22: rocketmq nameserver list info in log +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", + "producer_type": "sync", + "key" : "key1", + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + local http = require "resty.http" + local httpc = http.new() + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local res, err = httpc:request_uri(uri, {method = "GET"}) + } + } +--- request +GET /t +--- error_log_like eval +qr/create new rocketmq producer instance, nameserver_list: \[\{"port":9876,"host":"127.0.0.127"}]/ +qr/failed to send data to rocketmq topic: .*, nameserver_list: \{"127.0.0.127":9876}/ + + + +=== TEST 23: delete plugin metadata, tests would fail if run rocketmq-logger-log-format.t and plugin metadata is added +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/plugin_metadata/rocketmq-logger', + ngx.HTTP_DELETE, + nil, + [[{"action": "delete"}]]) + } + } +--- request +GET /t +--- response_body + +--- no_error_log +[error] + + + +=== TEST 24: set route(id: 1,include_req_body = true,include_req_body_expr = array) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [=[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", + "key" : "key1", + "timeout" : 1, + "include_req_body": true, + "include_req_body_expr": [ + [ + "arg_name", + "==", + "qwerty" + ] + ], + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]=] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } + +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 25: hit route, expr eval success +--- request +POST /hello?name=qwerty +abcdef +--- response_body +hello world +--- no_error_log +[error] +--- error_log eval +qr/send data to rocketmq: \{.*"body":"abcdef"/ +--- wait: 2 + + + +=== TEST 26: hit route,expr eval fail +--- request +POST /hello?name=zcxv +abcdef +--- response_body +hello world +--- no_error_log eval +qr/send data to rocketmq: \{.*"body":"abcdef"/ +--- wait: 2 + + + +=== TEST 27: check log schema(include_req_body) +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.rocketmq-logger") + local ok, err = plugin.check_schema({ + topic = "test", + key = "key1", + nameserver_list = { + "127.0.0.1:3" + }, + include_req_body = true, + include_req_body_expr = { + {"bar", "<>", "foo"} + } + }) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- request +GET /t +--- response_body +failed to validate the 'include_req_body_expr' expression: invalid operator '<>' +done +--- no_error_log +[error] + + + +=== TEST 28: check log schema(include_resp_body) +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.rocketmq-logger") + local ok, err = plugin.check_schema({ + topic = "test", + key = "key1", + nameserver_list = { + "127.0.0.1:3" + }, + include_resp_body = true, + include_resp_body_expr = { + {"bar", "", "foo"} + } + }) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- request +GET /t +--- response_body +failed to validate the 'include_resp_body_expr' expression: invalid operator '' +done +--- no_error_log +[error] + + + +=== TEST 29: set route(id: 1,include_resp_body = true,include_resp_body_expr = array) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [=[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", + "key" : "key1", + "timeout" : 1, + "include_resp_body": true, + "include_resp_body_expr": [ + [ + "arg_name", + "==", + "qwerty" + ] + ], + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]=] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } + +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 30: hit route, expr eval success +--- request +POST /hello?name=qwerty +abcdef +--- response_body +hello world +--- no_error_log +[error] +--- error_log eval +qr/send data to rocketmq: \{.*"body":"hello world\\n"/ +--- wait: 2 + + + +=== TEST 31: hit route,expr eval fail +--- request +POST /hello?name=zcxv +abcdef +--- response_body +hello world +--- no_error_log eval +qr/send data to rocketmq: \{.*"body":"hello world\\n"/ +--- wait: 2