From 7b9e2d764d5d925d2fd1250ebd669bc7e068e573 Mon Sep 17 00:00:00 2001 From: xinchenyuan Date: Tue, 9 Aug 2022 20:45:00 +0800 Subject: [PATCH] refa error & proto file load --- apisix/plugins/tencent-cloud-cls.lua | 9 +- apisix/plugins/tencent-cloud-cls/cls-sdk.lua | 109 +++++++++++++++++-- apisix/plugins/tencent-cloud-cls/cls.pb | 20 ---- apisix/plugins/tencent-cloud-cls/cls.proto | 50 --------- 4 files changed, 104 insertions(+), 84 deletions(-) delete mode 100644 apisix/plugins/tencent-cloud-cls/cls.pb delete mode 100644 apisix/plugins/tencent-cloud-cls/cls.proto diff --git a/apisix/plugins/tencent-cloud-cls.lua b/apisix/plugins/tencent-cloud-cls.lua index 2a6190b49e4fa..5c624cca3b444 100644 --- a/apisix/plugins/tencent-cloud-cls.lua +++ b/apisix/plugins/tencent-cloud-cls.lua @@ -19,6 +19,7 @@ local core = require("apisix.core") local log_util = require("apisix.utils.log-util") local bp_manager_mod = require("apisix.utils.batch-processor-manager") local cls_sdk = require("apisix.plugins.tencent-cloud-cls.cls-sdk") +local plugin = require("apisix.plugin") local math = math local ngx = ngx local pairs = pairs @@ -127,8 +128,12 @@ function _M.log(conf, ctx) end local process = function(entries) - return cls_sdk.send_to_cls(conf.secret_id, conf.secret_key, - conf.cls_host, conf.cls_topic, entries) + local sdk, err = cls_sdk.new(conf.cls_host, conf.cls_topic, conf.secret_id, conf.secret_key) + if err then + core.log.error("init sdk failed err:", err) + return false, err + end + return sdk:send_to_cls(entries) end batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, process) diff --git a/apisix/plugins/tencent-cloud-cls/cls-sdk.lua b/apisix/plugins/tencent-cloud-cls/cls-sdk.lua index 2a53abcefde6e..62d17b083c42b 100644 --- a/apisix/plugins/tencent-cloud-cls/cls-sdk.lua +++ b/apisix/plugins/tencent-cloud-cls/cls-sdk.lua @@ -16,8 +16,7 @@ -- local pb = require "pb" -local assert = assert -assert(pb.loadfile("apisix/plugins/tencent-cloud-cls/cls.pb")) +local protoc = require("protoc").new() local http = require("resty.http") local socket = require("socket") local str_util = require("resty.string") @@ -40,6 +39,8 @@ local ipairs = ipairs local pairs = pairs local type = type local tostring = tostring +local setmetatable = setmetatable +local pcall = pcall local MAX_SINGLE_VALUE_SIZE = 1 * 1024 * 1024 local MAX_LOG_GROUP_VALUE_SIZE = 5 * 1024 * 1024 -- 5MB @@ -112,7 +113,13 @@ local function sign(secret_id, secret_key) end -local function send_cls_request(host, topic, secret_id, secret_key, pb_data) +local function send_cls_request(host, topic, secret_id, secret_key, pb_obj) + local ok, pb_data = pcall(pb.encode, "cls.LogGroupList", pb_obj) + if not ok or not pb_data then + core.log.error("failed to encode LogGroupList, err: ", pb_data) + return false, pb_data + end + local http_new = http:new() http_new:set_timeouts(cls_conn_timeout, cls_send_timeout, cls_read_timeout) @@ -178,13 +185,88 @@ local function normalize_log(log) end -local function send_to_cls(secret_id, secret_key, host, topic_id, logs) +local _M = { version = 0.1 } +local mt = { __index = _M } + +local pb_state +local function init_pb_state() + pb.state(nil) + protoc.reload() + local cls_sdk_protoc = protoc.new() + if not cls_sdk_protoc.loaded["tencent-cloud-cls/cls.proto"] then + -- https://www.tencentcloud.com/document/product/614/42787 + local ok, err = pcall(cls_sdk_protoc.load, cls_sdk_protoc, [[ +package cls; + +message Log +{ + message Content + { + required string key = 1; // Key of each field group + required string value = 2; // Value of each field group + } + required int64 time = 1; // Unix timestamp + repeated Content contents = 2; // Multiple key-value pairs in one log +} + +message LogTag +{ + required string key = 1; + required string value = 2; +} + +message LogGroup +{ + repeated Log logs = 1; // Log array consisting of multiple logs + optional string contextFlow = 2; // This parameter does not take effect currently + optional string filename = 3; // Log filename + optional string source = 4; // Log source, which is generally the machine IP + repeated LogTag logTags = 5; +} + +message LogGroupList +{ + repeated LogGroup logGroupList = 1; // Log group list +} + ]], "tencent-cloud-cls/cls.proto") + if not ok then + cls_sdk_protoc:reset() + return "failed to load cls.proto: ".. err + end + end + pb_state = pb.state(nil) +end + + +function _M.new(host, topic, secret_id, secret_key) + if not pb_state then + local err = init_pb_state() + if err then + return nil, err + end + end + local self = { + host = host, + topic = topic, + secret_id = secret_id, + secret_key = secret_key, + } + return setmetatable(self, mt) +end + + +function _M.send_to_cls(self, logs) clear_tab(log_group_list) local now = ngx_now() * 1000 + -- recovery of stored pb_store + pb.state(pb_state) + local total_size = 0 local format_logs = new_tab(#logs, 0) - -- sums of all value in a LogGroup should be no more than 5MB + -- sums of all value in all LogGroup should be no more than 5MB + -- so send whenever size exceed max size + local group_list_start = 1 for i = 1, #logs, 1 do local contents, log_size = normalize_log(logs[i]) if log_size > MAX_LOG_GROUP_VALUE_SIZE then @@ -197,10 +279,14 @@ local function send_to_cls(secret_id, secret_key, host, topic_id, logs) logs = format_logs, source = host_ip, }) + local ok, err = send_cls_request(self.host, self.topic, + self.secret_id, self.secret_key, log_group_list_pb) + if not ok then + return false, err, group_list_start + end + group_list_start = i format_logs = new_tab(#logs - i, 0) total_size = 0 - local data = assert(pb.encode("cls.LogGroupList", log_group_list_pb)) - send_cls_request(host, topic_id, secret_id, secret_key, data) clear_tab(log_group_list) end insert_tab(format_logs, { @@ -214,10 +300,9 @@ local function send_to_cls(secret_id, secret_key, host, topic_id, logs) logs = format_logs, source = host_ip, }) - local data = assert(pb.encode("cls.LogGroupList", log_group_list_pb)) - return send_cls_request(host, topic_id, secret_id, secret_key, data) + local ok, err = send_cls_request(self.host, self.topic, self.secret_id, + self.secret_key, log_group_list_pb) + return ok, err, group_list_start end -return { - send_to_cls = send_to_cls -} +return _M diff --git a/apisix/plugins/tencent-cloud-cls/cls.pb b/apisix/plugins/tencent-cloud-cls/cls.pb deleted file mode 100644 index 41c84cb31e357..0000000000000 --- a/apisix/plugins/tencent-cloud-cls/cls.pb +++ /dev/null @@ -1,20 +0,0 @@ - - - cls.protocls"~ -Log -time (B0Rtime, -contents ( 2.cls.Log.ContentRcontents1 -Content -key ( Rkey -value ( Rvalue"0 -LogTag -key ( Rkey -value ( Rvalue" -LogGroup -logs ( 2.cls.LogRlogs - contextFlow ( R contextFlow -filename ( Rfilename -source ( Rsource% -logTags ( 2 .cls.LogTagRlogTags"A - LogGroupList1 - logGroupList ( 2 .cls.LogGroupR logGroupList \ No newline at end of file diff --git a/apisix/plugins/tencent-cloud-cls/cls.proto b/apisix/plugins/tencent-cloud-cls/cls.proto deleted file mode 100644 index 0029be11ba585..0000000000000 --- a/apisix/plugins/tencent-cloud-cls/cls.proto +++ /dev/null @@ -1,50 +0,0 @@ -// -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -// https://cloud.tencent.com/document/product/614/59470 -package cls; - -message Log -{ - message Content - { - required string key = 1; // 每组字段的 key - required string value = 2; // 每组字段的 value - } - required int64 time = 1; // 时间戳,UNIX时间格式 - repeated Content contents = 2; // 一条日志里的多个kv组合 -} - -message LogTag -{ - required string key = 1; - required string value = 2; -} - -message LogGroup -{ - repeated Log logs = 1; // 多条日志合成的日志数组 - optional string contextFlow = 2; // 目前暂无效用 - optional string filename = 3; // 日志文件名 - optional string source = 4; // 日志来源,一般使用机器IP - repeated LogTag logTags = 5; -} - -message LogGroupList -{ - repeated LogGroup logGroupList = 1; // 日志组列表 -}