diff --git a/Makefile b/Makefile
index 45936959c45c..49468dc57e41 100644
--- a/Makefile
+++ b/Makefile
@@ -339,6 +339,9 @@ install: runtime
 	$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/plugins/syslog
 	$(ENV_INSTALL) apisix/plugins/syslog/*.lua $(ENV_INST_LUADIR)/apisix/plugins/syslog/
 
+	$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/plugins/tencent-cloud-cls
+	$(ENV_INSTALL) apisix/plugins/tencent-cloud-cls/*.lua $(ENV_INST_LUADIR)/apisix/plugins/tencent-cloud-cls/
+
 	$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/pubsub
 	$(ENV_INSTALL) apisix/pubsub/*.lua $(ENV_INST_LUADIR)/apisix/pubsub/
 
diff --git a/apisix/plugins/tencent-cloud-cls.lua b/apisix/plugins/tencent-cloud-cls.lua
new file mode 100644
index 000000000000..b0726e607eae
--- /dev/null
+++ b/apisix/plugins/tencent-cloud-cls.lua
@@ -0,0 +1,148 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local core = require("apisix.core")
+local log_util = require("apisix.utils.log-util")
+local bp_manager_mod = require("apisix.utils.batch-processor-manager")
+local cls_sdk = require("apisix.plugins.tencent-cloud-cls.cls-sdk")
+local plugin = require("apisix.plugin")
+local math = math
+local ngx = ngx
+local pairs = pairs
+
+
+local plugin_name = "tencent-cloud-cls"
+local batch_processor_manager = bp_manager_mod.new(plugin_name)
+local schema = {
+    type = "object",
+    properties = {
+        cls_host = { type = "string" },
+        cls_topic = { type = "string" },
+        secret_id = { type = "string" },
+        secret_key = { type = "string" },
+        sample_ratio = {
+            type = "number",
+            minimum = 0.00001,
+            maximum = 1,
+            default = 1
+        },
+        include_req_body = { type = "boolean", default = false },
+        include_resp_body = { type = "boolean", default = false },
+        global_tag = { type = "object" },
+    },
+    required = { "cls_host", "cls_topic", "secret_id", "secret_key" }
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 397,
+    name = plugin_name,
+    schema = batch_processor_manager:wrap_schema(schema),
+    metadata_schema = metadata_schema,
+}
+
+
+-- Validate either the plugin metadata (when schema_type is TYPE_METADATA)
+-- or a plugin configuration, including the shared log-util checks.
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+
+    local ok, err = core.schema.check(schema, conf)
+    if not ok then
+        return nil, err
+    end
+    return log_util.check_log_schema(conf)
+end
+
+
+-- Decide once per request whether it is sampled; the result is stored in
+-- ctx.cls_sample and consulted by the body_filter and log phases.
+function _M.access(conf, ctx)
+    ctx.cls_sample = false
+    if conf.sample_ratio == 1 or math.random() < conf.sample_ratio then
+        core.log.debug("cls sampled")
+        ctx.cls_sample = true
+        return
+    end
+end
+
+
+-- Only sampled requests are handed to log_util.collect_body.
+function _M.body_filter(conf, ctx)
+    if ctx.cls_sample then
+        log_util.collect_body(conf, ctx)
+    end
+end
+
+
+-- Build the log entry for a sampled request and queue it on the batch
+-- processor; batches are delivered through the CLS SDK.
+function _M.log(conf, ctx)
+    -- sample if set
+    if not ctx.cls_sample then
+        core.log.debug("cls not sampled, skip log")
+        return
+    end
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+
+    local entry
+
+    if metadata and metadata.value.log_format
+        and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        core.log.debug("using custom format log")
+        entry = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        entry = log_util.get_full_log(ngx, conf)
+    end
+
+    if conf.global_tag then
+        for k, v in pairs(conf.global_tag) do
+            entry[k] = v
+        end
+    end
+
+    if batch_processor_manager:add_entry(conf, entry) then
+        return
+    end
+
+    local process = function(entries)
+        local sdk, err = cls_sdk.new(conf.cls_host, conf.cls_topic, conf.secret_id, conf.secret_key)
+        if err then
+            core.log.error("init sdk failed err:", err)
+            return false, err
+        end
+        return sdk:send_to_cls(entries)
+    end
+
+    batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, process)
+end
+
+
+return _M
diff --git a/apisix/plugins/tencent-cloud-cls/cls-sdk.lua b/apisix/plugins/tencent-cloud-cls/cls-sdk.lua
new file mode 100644
index 000000000000..d2b6e8ad4525
--- /dev/null
+++ b/apisix/plugins/tencent-cloud-cls/cls-sdk.lua
@@ -0,0 +1,355 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local pb = require "pb"
+local protoc = require("protoc").new()
+local http = require("resty.http")
+local socket = require("socket")
+local str_util = require("resty.string")
+local core = require("apisix.core")
+local core_gethostname = require("apisix.core.utils").gethostname
+local json = core.json
+local json_encode = json.encode
+local ngx = ngx
+local ngx_time = ngx.time
+local ngx_now = ngx.now
+local ngx_sha1_bin = ngx.sha1_bin
+local ngx_hmac_sha1 = ngx.hmac_sha1
+local fmt = string.format
+local table = table
+local concat_tab = table.concat
+local clear_tab = table.clear
+local new_tab = table.new
+local insert_tab = table.insert
+local ipairs = ipairs
+local pairs = pairs
+local type = type
+local tostring = tostring
+local setmetatable = setmetatable
+local pcall = pcall
+
+-- api doc https://www.tencentcloud.com/document/product/614/16873
+local MAX_SINGLE_VALUE_SIZE = 1 * 1024 * 1024
+local MAX_LOG_GROUP_VALUE_SIZE = 5 * 1024 * 1024 -- 5MB
+
+local cls_api_path = "/structuredlog"
+local auth_expire_time = 60
+local cls_conn_timeout = 1000
+local cls_read_timeout = 10000
+local cls_send_timeout = 10000
+
+local headers_cache = {}
+local params_cache = {
+    ssl_verify = false,
+    headers = headers_cache,
+}
+
+
+-- Resolve `hostname` to the list of its IP addresses.
+-- Returns the list, or nil plus an error message when the lookup fails
+-- (luasocket then returns the error string in place of the result table).
+local function get_ip(hostname)
+    local _, resolved = socket.dns.toip(hostname)
+    if type(resolved) ~= "table" or type(resolved.ip) ~= "table" then
+        return nil, "failed to resolve " .. tostring(hostname) .. ": " .. tostring(resolved)
+    end
+
+    local ip_list = {}
+    for _, v in ipairs(resolved.ip) do
+        insert_tab(ip_list, v)
+    end
+    return ip_list
+end
+
+-- IP of this host, sent as the optional `source` of every log group.
+-- A failed lookup must not break loading of the module, so the field is
+-- simply left out (nil) in that case.
+local host_ip
+do
+    local ip_list, err = get_ip(core_gethostname())
+    if ip_list and ip_list[1] then
+        host_ip = tostring(ip_list[1])
+    else
+        core.log.warn("failed to get host ip, log source is omitted: ",
+                      err or "no address found")
+    end
+end
+local log_group_list = {}
+local log_group_list_pb = {
+    logGroupList = log_group_list,
+}
+
+
+-- Hex-encoded SHA-1 digest of msg.
+local function sha1(msg)
+    return str_util.to_hex(ngx_sha1_bin(msg))
+end
+
+
+-- Hex-encoded HMAC-SHA1 of msg, keyed with key.
+local function sha1_hmac(key, msg)
+    return str_util.to_hex(ngx_hmac_sha1(key, msg))
+end
+
+
+-- sign algorithm https://cloud.tencent.com/document/product/614/12445
+-- Build the Authorization header value for a POST to cls_api_path; the
+-- sign/key time window spans auth_expire_time seconds starting from now.
+local function sign(secret_id, secret_key)
+    local method = "post"
+    local format_params = ""
+    local format_headers = ""
+    local sign_algorithm = "sha1"
+    local http_request_info = fmt("%s\n%s\n%s\n%s\n",
+                                  method, cls_api_path, format_params, format_headers)
+    local cur_time = ngx_time()
+    local sign_time = fmt("%d;%d", cur_time, cur_time + auth_expire_time)
+    local string_to_sign = fmt("%s\n%s\n%s\n", sign_algorithm, sign_time, sha1(http_request_info))
+
+    local sign_key = sha1_hmac(secret_key, sign_time)
+    local signature = sha1_hmac(sign_key, string_to_sign)
+
+    local arr = {
+        "q-sign-algorithm=sha1",
+        "q-ak=" .. secret_id,
+        "q-sign-time=" .. sign_time,
+        "q-key-time=" .. sign_time,
+        "q-header-list=",
+        "q-url-param-list=",
+        "q-signature=" .. signature,
+    }
+
+    return concat_tab(arr, '&')
+end
+
+
+-- normalized log data for CLS API
+-- Returns the log as a list of { key, value } string fields together with
+-- the summed length of all keys and values (plus a small constant).
+local function normalize_log(log)
+    local normalized_log = {}
+    local log_size = 4 -- empty obj alignment
+    for k, v in pairs(log) do
+        local v_type = type(v)
+        local field = { key = k, value = "" }
+        if v_type == "string" then
+            field["value"] = v
+        elseif v_type == "number" then
+            field["value"] = tostring(v)
+        elseif v_type == "table" then
+            field["value"] = json_encode(v)
+        else
+            field["value"] = tostring(v)
+            core.log.warn("unexpected type " .. v_type .. " for field " .. k)
+        end
+        if #field.value > MAX_SINGLE_VALUE_SIZE then
+            core.log.warn(field.key, " value size over ", MAX_SINGLE_VALUE_SIZE, " , truncated")
+            field.value = field.value:sub(1, MAX_SINGLE_VALUE_SIZE)
+        end
+        insert_tab(normalized_log, field)
+        log_size = log_size + #field.key + #field.value
+    end
+    return normalized_log, log_size
+end
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+-- Load the CLS proto definitions and keep the resulting pb state in pb_state,
+-- restoring the previously active state. Returns an error message on failure.
+local function init_pb_state()
+    local old_pb_state = pb.state(nil)
+    protoc.reload()
+    local cls_sdk_protoc = protoc.new()
+    -- proto file in https://www.tencentcloud.com/document/product/614/42787
+    local ok, err = pcall(cls_sdk_protoc.load, cls_sdk_protoc, [[
+package cls;
+
+message Log
+{
+  message Content
+  {
+    required string key   = 1; // Key of each field group
+    required string value = 2; // Value of each field group
+  }
+  required int64   time     = 1; // Unix timestamp
+  repeated Content contents = 2; // Multiple key-value pairs in one log
+}
+
+message LogTag
+{
+  required string key       = 1;
+  required string value     = 2;
+}
+
+message LogGroup
+{
+  repeated Log    logs        = 1; // Log array consisting of multiple logs
+  optional string contextFlow = 2; // This parameter does not take effect currently
+  optional string filename    = 3; // Log filename
+  optional string source      = 4; // Log source, which is generally the machine IP
+  repeated LogTag logTags     = 5;
+}
+
+message LogGroupList
+{
+  repeated LogGroup logGroupList = 1; // Log group list
+}
+    ]], "tencent-cloud-cls/cls.proto")
+    if not ok then
+        cls_sdk_protoc:reset()
+        pb.state(old_pb_state)
+        return "failed to load cls.proto: ".. err
+    end
+    pb_state = pb.state(old_pb_state)
+end
+
+
+-- Create an SDK object for the given CLS host/topic and credentials.
+-- Returns nil plus an error message if the protobuf schema cannot be loaded.
+function _M.new(host, topic, secret_id, secret_key)
+    if not pb_state then
+        local err = init_pb_state()
+        if err then
+            return nil, err
+        end
+    end
+    local self = {
+        host = host,
+        topic = topic,
+        secret_id = secret_id,
+        secret_key = secret_key,
+    }
+    return setmetatable(self, mt)
+end
+
+
+-- Send a single HTTP request with the CLS timeouts applied.
+local function do_request_uri(uri, params)
+    local client = http:new()
+    client:set_timeouts(cls_conn_timeout, cls_send_timeout, cls_read_timeout)
+    local res, err = client:request_uri(uri, params)
+    client:close()
+    return res, err
+end
+
+
+-- Encode pb_obj as a cls.LogGroupList and upload it to the topic.
+-- Returns true on success and on non-retryable HTTP errors (which are only
+-- logged); returns false plus an error message in every other case.
+function _M.send_cls_request(self, pb_obj)
+    -- recovery of stored pb_store
+    local old_pb_state = pb.state(pb_state)
+    local ok, pb_data = pcall(pb.encode, "cls.LogGroupList", pb_obj)
+    pb_state = pb.state(old_pb_state)
+    if not ok or not pb_data then
+        core.log.error("failed to encode LogGroupList, err: ", pb_data)
+        return false, pb_data
+    end
+
+    clear_tab(headers_cache)
+    headers_cache["Host"] = self.host
+    headers_cache["Content-Type"] = "application/x-protobuf"
+    headers_cache["Authorization"] = sign(self.secret_id, self.secret_key)
+
+    -- TODO: support lz4/zstd compress
+    params_cache.method = "POST"
+    params_cache.body = pb_data
+
+    local cls_url = "http://" .. self.host .. cls_api_path .. "?topic_id=" .. self.topic
+    core.log.debug("CLS request URL: ", cls_url)
+
+    local res, err = do_request_uri(cls_url, params_cache)
+    if not res then
+        return false, err
+    end
+
+    if res.status ~= 200 then
+        err = fmt("got wrong status: %s, headers: %s, body, %s",
+                  res.status, json_encode(res.headers), res.body)
+        -- 413, 404, 401, 403 are not retryable
+        if res.status == 413 or res.status == 404 or res.status == 401 or res.status == 403 then
+            core.log.error(err, ", not retryable")
+            return true
+        end
+
+        return false, err
+    end
+
+    core.log.debug("CLS report success")
+    return true
+end
+
+
+-- Upload a batch of log entries, splitting it into several requests so that
+-- each one stays within MAX_LOG_GROUP_VALUE_SIZE.
+-- Returns ok, err and the index of the first entry of the last group that
+-- was attempted.
+function _M.send_to_cls(self, logs)
+    clear_tab(log_group_list)
+    local now = ngx_now() * 1000
+
+    local total_size = 0
+    local format_logs = new_tab(#logs, 0)
+    -- sums of all value in all LogGroup should be no more than 5MB
+    -- so send whenever size exceed max size
+    local group_list_start = 1
+    for i = 1, #logs, 1 do
+        local contents, log_size = normalize_log(logs[i])
+        if log_size > MAX_LOG_GROUP_VALUE_SIZE then
+            core.log.error("size of log is over 5MB, dropped")
+            goto continue
+        end
+        total_size = total_size + log_size
+        if total_size > MAX_LOG_GROUP_VALUE_SIZE then
+            insert_tab(log_group_list, {
+                logs = format_logs,
+                source = host_ip,
+            })
+            local ok, err = self:send_cls_request(log_group_list_pb)
+            if not ok then
+                return false, err, group_list_start
+            end
+            group_list_start = i
+            format_logs = new_tab(#logs - i + 1, 0)
+            -- the current log opens the new group, so count its size
+            total_size = log_size
+            clear_tab(log_group_list)
+        end
+        insert_tab(format_logs, {
+            time = now,
+            contents = contents,
+        })
+        :: continue ::
+    end
+
+    -- every remaining log was dropped (or logs is empty): nothing to send
+    if #format_logs == 0 then
+        return true, nil, group_list_start
+    end
+
+    insert_tab(log_group_list, {
+        logs = format_logs,
+        source = host_ip,
+    })
+    local ok, err = self:send_cls_request(log_group_list_pb)
+    return ok, err, group_list_start
+end
+
+return _M
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index 9e42add2bcdf..bb13ba332676 100755
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -461,6 +461,7 @@ plugins:                          # plugin list (sorted by priority)
   - 
udp-logger                       # priority: 400
   - file-logger                      # priority: 399
   - clickhouse-logger                # priority: 398
+  - tencent-cloud-cls                # priority: 397
   #- log-rotate                      # priority: 100
   # <- recommend to use priority (0, 100) for your custom plugins
   - example-plugin                   # priority: 0
diff --git a/t/admin/plugins.t b/t/admin/plugins.t
index 6c89c2bb0858..4271392ae3e1 100644
--- a/t/admin/plugins.t
+++ b/t/admin/plugins.t
@@ -122,6 +122,7 @@ syslog
 udp-logger
 file-logger
 clickhouse-logger
+tencent-cloud-cls
 example-plugin
 aws-lambda
 azure-functions
diff --git a/t/plugin/tencent-cloud-cls.t b/t/plugin/tencent-cloud-cls.t
new file mode 100644
index 000000000000..14006bbd7e9f
--- /dev/null
+++ b/t/plugin/tencent-cloud-cls.t
@@ -0,0 +1,330 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+log_level('debug');
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
+        $block->set_value("no_error_log", "[error]");  # default when a block sets neither section
+    }
+
+    if (!defined $block->request) {
+        $block->set_value("request", "GET /t");  # default request for blocks without one
+    }
+
+    my $http_config = $block->http_config // <<_EOC_;
+    server {
+        listen 10420;
+        location /structuredlog {
+            content_by_lua_block {
+                ngx.req.read_body()
+                local data = ngx.req.get_body_data()
+                local headers = ngx.req.get_headers()
+                ngx.log(ngx.WARN, "tencent-cloud-cls body: ", data)
+                for k, v in pairs(headers) do
+                    ngx.log(ngx.WARN, "tencent-cloud-cls headers: " .. k .. ":" .. v)
+                end
+                ngx.say("ok")
+            }
+        }
+    }
+    server {
+        listen 10421;
+        location /structuredlog {
+            content_by_lua_block {
+                ngx.exit(500)
+            }
+        }
+    }
+_EOC_
+
+    $block->set_value("http_config", $http_config);  # block's own http_config, else the two mock servers above
+});
+
+run_tests;
+
+__DATA__
+
+=== TEST 1: schema check
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.tencent-cloud-cls")
+            local ok, err = plugin.check_schema({
+                cls_host = "ap-guangzhou.cls.tencentyun.com",
+                cls_topic = "143b5d70-139b-4aec-b54e-bb97756916de",
+                secret_id = "secret_id",
+                secret_key = "secret_key",
+            })
+            if not ok then
+                ngx.say(err)
+            end
+
+            ngx.say("done")
+        }
+    }
+--- response_body
+done
+
+
+
+=== TEST 2: cls config missing
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.tencent-cloud-cls")
+            local ok, err = plugin.check_schema({
+                cls_host = "ap-guangzhou.cls.tencentyun.com",
+                cls_topic = "143b5d70-139b-4aec-b54e-bb97756916de",
+                secret_id = "secret_id",
+            })
+            if not ok then
+                ngx.say(err)
+            end
+
+            ngx.say("done")
+        }
+    }
+--- response_body
+property "secret_key" is required
+done
+
+
+
+=== TEST 3: add plugin for incorrect server
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "tencent-cloud-cls": {
+                                "cls_host": "127.0.0.1:10421",
+                                "cls_topic": "143b5d70-139b-4aec-b54e-bb97756916de",
+                                "secret_id": "secret_id",
+                                "secret_key": "secret_key",
+                                "batch_max_size": 1,
+                                "max_retry_count": 1,
+                                "retry_delay": 2,
+                                "buffer_duration": 2,
+                                "inactive_timeout": 2
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1982": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/opentracing"
+                }]]
+                )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 4: incorrect server
+--- request
+GET /opentracing
+--- response_body
+opentracing
+--- error_log
+Batch Processor[tencent-cloud-cls] failed to process entries [1/1]: got wrong status: 500
+--- wait: 0.5
+
+
+
+=== TEST 5: add plugin
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "tencent-cloud-cls": {
+                                "cls_host": "127.0.0.1:10420",
+                                "cls_topic": "143b5d70-139b-4aec-b54e-bb97756916de",
+                                "secret_id": "secret_id",
+                                "secret_key": "secret_key",
+                                "batch_max_size": 1,
+                                "max_retry_count": 1,
+                                "retry_delay": 2,
+                                "buffer_duration": 2,
+                                "inactive_timeout": 2
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1982": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/opentracing"
+                }]]
+                )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 6: access local server
+--- request
+GET /opentracing
+--- response_body
+opentracing
+--- error_log
+Batch Processor[tencent-cloud-cls] successfully processed the entries
+--- wait: 0.5
+
+
+
+=== TEST 7: verify request
+--- extra_init_by_lua
+    local cls = require("apisix.plugins.tencent-cloud-cls.cls-sdk")
+    cls.send_to_cls = function(self, logs)
+        if (#logs ~= 1) then
+            ngx.log(ngx.ERR, "unexpected logs length: ", #logs)
+            return
+        end
+        return true
+    end
+--- request
+GET /opentracing
+--- response_body
+opentracing
+--- error_log
+Batch Processor[tencent-cloud-cls] successfully processed the entries
+--- wait: 0.5
+
+
+
+=== TEST 8: verify cls api request
+--- extra_init_by_lua
+    local cls = require("apisix.plugins.tencent-cloud-cls.cls-sdk")
+    cls.send_cls_request = function(self, pb_obj)
+        if (#pb_obj.logGroupList ~= 1) then
+            ngx.log(ngx.ERR, "unexpected logGroupList length: ", #pb_obj.logGroupList)
+            return false
+        end
+        local log_group = pb_obj.logGroupList[1]
+        if #log_group.logs ~= 1 then
+            ngx.log(ngx.ERR, "unexpected logs length: ", #log_group.logs)
+            return false
+        end
+        local log = log_group.logs[1]
+        if #log.contents == 0 then
+            ngx.log(ngx.ERR, "unexpected contents length: ", #log.contents)
+            return false
+        end
+        return true
+    end
+--- request
+GET /opentracing
+--- response_body
+opentracing
+
+
+
+=== TEST 9: plugin metadata
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.tencent-cloud-cls")
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/plugin_metadata/tencent-cloud-cls',
+                ngx.HTTP_PUT,
+                [[{
+                    "log_format": {
+                        "host": "$host",
+                        "@timestamp": "$time_iso8601",
+                        "client_ip": "$remote_addr"
+                    }
+                }]]
+                )
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 10: log use log_format
+--- extra_init_by_lua
+    local cls = require("apisix.plugins.tencent-cloud-cls.cls-sdk")
+    cls.send_cls_request = function(self, pb_obj)
+        if (#pb_obj.logGroupList ~= 1) then
+            ngx.log(ngx.ERR, "unexpected logGroupList length: ", #pb_obj.logGroupList)
+            return false
+        end
+        local log_group = pb_obj.logGroupList[1]
+        if #log_group.logs ~= 1 then
+            ngx.log(ngx.ERR, "unexpected logs length: ", #log_group.logs)
+            return false
+        end
+        local log = log_group.logs[1]
+        if #log.contents == 0 then
+            ngx.log(ngx.ERR, "unexpected contents length: ", #log.contents)
+            return false
+        end
+        local has_host, has_timestamp, has_client_ip = false, false, false
+        for i, tag in ipairs(log.contents) do
+            if tag.key == "host" then
+                has_host = true
+            end
+            if tag.key == "@timestamp" then
+                has_timestamp = true
+            end
+            if tag.key == "client_ip" then
+                has_client_ip = true
+            end
+        end
+        if not(has_host and has_timestamp and has_client_ip) then
+            return false
+        end
+        return true
+    end
+--- request
+GET /opentracing
+--- response_body
+opentracing
+--- wait: 0.5