diff --git a/apisix/plugins/tencent-cloud-cls.lua b/apisix/plugins/tencent-cloud-cls.lua new file mode 100644 index 0000000000000..e63091f7dad31 --- /dev/null +++ b/apisix/plugins/tencent-cloud-cls.lua @@ -0,0 +1,69 @@ +local core = require("apisix.core") +local log_util = require("apisix.utils.log-util") +local bp_manager_mod = require("apisix.utils.batch-processor-manager") +local cls_sdk = require("apisix.plugins.tencent-cloud-cls.cls-sdk") +local random = math.random +local ngx = ngx +math.randomseed(ngx.time() + ngx.worker.pid()) + +local plugin_name = "tencent-cloud-cls" +local batch_processor_manager = bp_manager_mod.new(plugin_name) +local schema = { + type = "object", + properties = { + cls_host = { type = "string" }, + cls_topic = { type = "string" }, + -- https://console.cloud.tencent.com/capi + secret_id = { type = "string" }, + secret_key = { type = "string" }, + sample_rate = { type = "integer", minimum = 1, maximum = 100, default = 100 }, + include_req_body = { type = "boolean", default = false }, + include_resp_body = { type = "boolean", default = false }, + }, + required = { "cls_host", "cls_topic", "secret_id", "secret_key" } +} + +local _M = { + version = 0.1, + priority = 413, + name = plugin_name, + schema = batch_processor_manager:wrap_schema(schema), +} + +function _M.check_schema(conf) + return core.schema.check(schema, conf) +end + +function _M.body_filter(conf, ctx) + -- sample if set + if conf.sample_rate < 100 and random(1, 100) > conf.sample_rate then + core.log.debug("not sampled") + return + end + log_util.collect_body(conf, ctx) + ctx.cls_sample = true +end + +function _M.log(conf, ctx) + -- sample if set + if ctx.cls_sample == nil then + core.log.debug("not sampled") + return + end + local entry = log_util.get_full_log(ngx, conf) + if not entry.route_id then + entry.route_id = "no-matched" + end + + if batch_processor_manager:add_entry(conf, entry) then + return + end + + local process = function(entries) + return cls_sdk.send_to_cls(conf.secret_id, conf.secret_key, conf.cls_host, conf.cls_topic, entries) + end + + batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, process) +end + +return _M diff --git a/apisix/plugins/tencent-cloud-cls/cls-sdk.lua b/apisix/plugins/tencent-cloud-cls/cls-sdk.lua new file mode 100644 index 0000000000000..7b65fe037e4c2 --- /dev/null +++ b/apisix/plugins/tencent-cloud-cls/cls-sdk.lua @@ -0,0 +1,193 @@ +-- https://cloud.tencent.com/document/api/614/16873 +local pb = require "pb" +assert(pb.loadfile("apisix/plugins/tencent-cloud-cls/cls.pb")) +local http = require("resty.http") +local socket = require("socket") +local str_util = require("resty.string") +local core = require("apisix.core") +local json = core.json +local json_encode = json.encode + +local ngx = ngx +local ngx_time = ngx.time +local ngx_now = ngx.now +local ngx_sha1_bin = ngx.sha1_bin +local ngx_hmac_sha1 = ngx.hmac_sha1 + +local fmt = string.format +local concat_tab = table.concat +local clear_tab = table.clear +local new_tab = table.new +local insert_tab = table.insert + +local MAX_SINGLE_VALUE_SIZE = 1 * 1024 * 1024 +local MAX_LOG_GROUP_VALUE_SIZE = 5 * 1024 * 1024 -- 5MB + +local cls_api_path = "/structuredlog" +local auth_expire_time = 60 +local cls_conn_timeout = 1000 +local cls_read_timeout = 10000 +local cls_send_timeout = 10000 + +local headers_cache = {} +local params_cache = { + ssl_verify = false, + headers = headers_cache, +} + +local function get_ip(hostname) + local _, resolved = socket.dns.toip(hostname) + local ListTab = {} + for _, v in ipairs(resolved.ip) do + table.insert(ListTab, v) + end + return ListTab +end + +local host_ip = tostring(unpack(get_ip(socket.dns.gethostname()))) +local log_group_list = {} +local log_group_list_pb = { + logGroupList = log_group_list, +} + +local function sha1(msg) + return str_util.to_hex(ngx_sha1_bin(msg)) +end + +local function sha1_hmac(key, msg) + return str_util.to_hex(ngx_hmac_sha1(key, msg)) +end + +-- sign algorithm https://cloud.tencent.com/document/product/614/12445 +local function sign(secret_id, secret_key) + local method = "post" + local format_params = "" + local format_headers = "" + local sign_algorithm = "sha1" + local http_request_info = fmt("%s\n%s\n%s\n%s\n", method, cls_api_path, format_params, format_headers) + local cur_time = ngx_time() + local sign_time = fmt("%d;%d", cur_time, cur_time + auth_expire_time) + local string_to_sign = fmt("%s\n%s\n%s\n", sign_algorithm, sign_time, sha1(http_request_info)) + + local sign_key = sha1_hmac(secret_key, sign_time) + local signature = sha1_hmac(sign_key, string_to_sign) + + local arr = { + "q-sign-algorithm=sha1", + "q-ak=" .. secret_id, + "q-sign-time=" .. sign_time, + "q-key-time=" .. sign_time, + "q-header-list=", + "q-url-param-list=", + "q-signature=" .. signature, + } + + return concat_tab(arr, '&') +end + +local function send_cls_request(host, topic, secret_id, secret_key, pb_data) + local http_new = http:new() + http_new:set_timeouts(cls_conn_timeout, cls_send_timeout, cls_read_timeout) + + clear_tab(headers_cache) + headers_cache["Host"] = host + headers_cache["Content-Type"] = "application/x-protobuf" + headers_cache["Authorization"] = sign(secret_id, secret_key, cls_api_path) + + -- TODO: support lz4/zstd compress + params_cache.method = "POST" + params_cache.body = pb_data + + local cls_url = "http://" .. host .. cls_api_path .. "?topic_id=" .. topic + core.log.debug("CLS request URL: ", cls_url) + + local res, err = http_new:request_uri(cls_url, params_cache) + if not res then + return false, err + end + + if res.status ~= 200 then + err = fmt("got wrong status: %s, headers: %s, body, %s", res.status, json.encode(res.headers), res.body) + -- 413, 404, 401, 403 are not retryable + if res.status == 413 or res.status == 404 or res.status == 401 or res.status == 403 then + core.log.err(err, ", not retryable") + return true + end + + return false, err + end + + core.log.debug("CLS report success") + return true +end + +-- normalized log data for CLS API +local function normalize_log(log) + local normalized_log = {} + local log_size = 4 -- empty obj alignment + for k, v in pairs(log) do + local v_type = type(v) + local field = { key = k, value = "" } + if v_type == "string" then + field["value"] = v + elseif v_type == "number" then + field["value"] = tostring(v) + elseif v_type == "table" then + field["value"] = json_encode(v) + else + field["value"] = tostring(v) + core.log.warn("unexpected type " .. v_type .. " for field " .. k) + end + if #field.value > MAX_SINGLE_VALUE_SIZE then + core.log.warn(field.key, " value size over ", MAX_SINGLE_VALUE_SIZE, " , truncated") + field.value = field.value:sub(1, MAX_SINGLE_VALUE_SIZE) + end + insert_tab(normalized_log, field) + log_size = log_size + #field.key + #field.value + end + return normalized_log, log_size +end + +local function send_to_cls(secret_id, secret_key, host, topic_id, logs) + clear_tab(log_group_list) + local now = ngx_now() * 1000 + + local total_size = 0 + local format_logs = new_tab(#logs, 0) + -- sums of all value in a LogGroup should be no more than 5MB + for i = 1, #logs, 1 do + local contents, log_size = normalize_log(logs[i]) + if log_size > MAX_LOG_GROUP_VALUE_SIZE then + core.log.error("size of log is over 5MB, dropped") + goto continue + end + total_size = total_size + log_size + if total_size > MAX_LOG_GROUP_VALUE_SIZE then + insert_tab(log_group_list, { + logs = format_logs, + source = host_ip, + }) + format_logs = new_tab(#logs - i, 0) + total_size = 0 + local data = assert(pb.encode("cls.LogGroupList", log_group_list_pb)) + send_cls_request(host, topic_id, secret_id, secret_key, data) + clear_tab(log_group_list) + end + insert_tab(format_logs, { + time = now, + contents = contents, + }) + :: continue :: + end + + insert_tab(log_group_list, { + logs = format_logs, + source = host_ip, + }) + local data = assert(pb.encode("cls.LogGroupList", log_group_list_pb)) + return send_cls_request(host, topic_id, secret_id, secret_key, data) +end + +return { + send_to_cls = send_to_cls +} diff --git a/apisix/plugins/tencent-cloud-cls/cls.pb b/apisix/plugins/tencent-cloud-cls/cls.pb new file mode 100644 index 0000000000000..41c84cb31e357 --- /dev/null +++ b/apisix/plugins/tencent-cloud-cls/cls.pb @@ -0,0 +1,20 @@ + +­ + cls.protocls"~ +Log +time (B0Rtime, +contents ( 2.cls.Log.ContentRcontents1 +Content +key ( Rkey +value ( Rvalue"0 +LogTag +key ( Rkey +value ( Rvalue"¥ +LogGroup +logs ( 2.cls.LogRlogs + contextFlow ( R contextFlow +filename ( Rfilename +source ( Rsource% +logTags ( 2 .cls.LogTagRlogTags"A + LogGroupList1 + logGroupList ( 2 .cls.LogGroupR logGroupList \ No newline at end of file diff --git a/apisix/plugins/tencent-cloud-cls/cls.proto b/apisix/plugins/tencent-cloud-cls/cls.proto new file mode 100644 index 0000000000000..ffa3f6bc24d0d --- /dev/null +++ b/apisix/plugins/tencent-cloud-cls/cls.proto @@ -0,0 +1,34 @@ +syntax = "proto2"; + +package cls; + +message Log +{ + required int64 time = 1 [jstype = JS_STRING]; + message Content + { + required string key = 1; + required string value = 2; + } + repeated Content contents = 2; +} + +message LogTag +{ + required string key = 1; + required string value = 2; +} + +message LogGroup +{ + repeated Log logs = 1; + optional string contextFlow = 2; + optional string filename = 3; + optional string source = 4; + repeated LogTag logTags = 5; +} + +message LogGroupList +{ + repeated LogGroup logGroupList = 1; +} diff --git a/rockspec/apisix-master-0.rockspec b/rockspec/apisix-master-0.rockspec index bb0b39e294e6b..398ea87d839a9 100644 --- a/rockspec/apisix-master-0.rockspec +++ b/rockspec/apisix-master-0.rockspec @@ -77,7 +77,7 @@ dependencies = { "net-url = 0.9-1", "xml2lua = 1.5-2", "nanoid = 0.1-1", - "lua-resty-mediador = 0.1.2-1" + "lua-resty-mediador = 0.1.2-1", } build = {