From 92e5f1ecb596dcb0a6ddd31adf5eaa3f48d98230 Mon Sep 17 00:00:00 2001 From: Akayeshmantha Date: Wed, 6 May 2020 11:36:40 +0200 Subject: [PATCH 1/4] buffer log requests. --- apisix/plugins/syslog.lua | 29 +++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/apisix/plugins/syslog.lua b/apisix/plugins/syslog.lua index bbd53158b34b..7028b749f5c6 100644 --- a/apisix/plugins/syslog.lua +++ b/apisix/plugins/syslog.lua @@ -29,10 +29,12 @@ local schema = { drop_limit = {type = "integer", default = 1048576}, timeout = {type = "integer", minimum = 1, default = 3}, sock_type = {type = "string", default = "tcp"}, - max_retry_times = {type = "integer", minimum = 1, default = 3}, - retry_interval = {type = "integer", minimum = 10, default = 100}, + max_retry_times = {type = "integer", minimum = 1, default = 1}, + retry_interval = {type = "integer", minimum = 0, default = 1}, pool_size = {type = "integer", minimum = 5, default = 5}, tls = {type = "boolean", default = false}, + batch_max_size = {type = "integer", minimum = 1, default = 1000}, + buffer_duration = {type = "integer", minimum = 1, default = 60}, }, required = {"host", "port"} } @@ -68,6 +70,13 @@ function _M.log(conf) return end + local log_buffer = buffers[entry.route_id] + + if log_buffer then + log_buffer:push(entry) + return + end + -- fetch api_ctx local api_ctx = ngx.ctx.api_ctx if not api_ctx then @@ -75,6 +84,22 @@ function _M.log(conf) return core.response.exit(500) end + -- Generate a function to be executed by the batch processor + local func = function(entries, batch_max_size) + local data, err + if batch_max_size == 1 then + data, err = core.json.encode(entries[1]) -- encode as single {} + else + data, err = core.json.encode(entries) -- encode as array [{}] + end + + if not data then + return false, 'error occurred while encoding the data: ' .. err + end + + return send_kafka_data(conf, data) + end + -- fetch it from lrucache local logger, err = lrucache(api_ctx.conf_type .. "#" .. api_ctx.conf_id, api_ctx.conf_version, logger_socket.new, logger_socket, { From da3d53955da11beeadffcecb39aaac2341f5da7c Mon Sep 17 00:00:00 2001 From: Akayeshmantha Date: Wed, 6 May 2020 12:01:04 +0200 Subject: [PATCH 2/4] finalize batch request. --- apisix/plugins/syslog.lua | 94 +++++++++++++++++++++++++++------------ 1 file changed, 65 insertions(+), 29 deletions(-) diff --git a/apisix/plugins/syslog.lua b/apisix/plugins/syslog.lua index 7028b749f5c6..d0c3a1d0c9ff 100644 --- a/apisix/plugins/syslog.lua +++ b/apisix/plugins/syslog.lua @@ -16,15 +16,21 @@ -- local core = require("apisix.core") local log_util = require("apisix.utils.log-util") +local batch_processor = require("apisix.utils.batch-processor") local logger_socket = require("resty.logger.socket") +local pairs = pairs +local type = type +local table = table local plugin_name = "syslog" local ngx = ngx +local buffers = {} local schema = { type = "object", properties = { host = {type = "string"}, port = {type = "integer"}, + name = {type = "string", default = "sys logger"}, flush_limit = {type = "integer", minimum = 1, default = 4096}, drop_limit = {type = "integer", default = 1048576}, timeout = {type = "integer", minimum = 1, default = 3}, @@ -61,6 +67,47 @@ function _M.flush_syslog(logger) end end +local function send_syslog_data(conf, log_message) + local err_msg + local res = true + + -- fetch api_ctx + local api_ctx = ngx.ctx.api_ctx + if not api_ctx then + core.log.error("invalid api_ctx cannot proceed with sys logger plugin") + return core.response.exit(500) + end + + -- fetch it from lrucache + local logger, err = lrucache(api_ctx.conf_type .. "#" .. api_ctx.conf_id, api_ctx.conf_version, + logger_socket.new, logger_socket, { + host = conf.host, + port = conf.port, + flush_limit = conf.flush_limit, + drop_limit = conf.drop_limit, + timeout = conf.timeout, + sock_type = conf.sock_type, + max_retry_times = conf.max_retry_times, + retry_interval = conf.retry_interval, + pool_size = conf.pool_size, + tls = conf.tls, + }) + + if not logger then + res = false + err_msg = "failed when initiating the sys logger processor".. err + end + + -- reuse the logger object + local ok, err = logger:log(core.json.encode(entry)) + if not ok then + res = false + err_msg = "failed to log message" .. err + end + + return res, err_msg +end + -- log phase in APISIX function _M.log(conf) local entry = log_util.get_full_log(ngx) @@ -77,13 +124,6 @@ function _M.log(conf) return end - -- fetch api_ctx - local api_ctx = ngx.ctx.api_ctx - if not api_ctx then - core.log.error("invalid api_ctx cannot proceed with sys logger plugin") - return core.response.exit(500) - end - -- Generate a function to be executed by the batch processor local func = function(entries, batch_max_size) local data, err @@ -97,33 +137,29 @@ function _M.log(conf) return false, 'error occurred while encoding the data: ' .. err end - return send_kafka_data(conf, data) + return send_syslog_data(conf, data) end - -- fetch it from lrucache - local logger, err = lrucache(api_ctx.conf_type .. "#" .. api_ctx.conf_id, api_ctx.conf_version, - logger_socket.new, logger_socket, { - host = conf.host, - port = conf.port, - flush_limit = conf.flush_limit, - drop_limit = conf.drop_limit, - timeout = conf.timeout, - sock_type = conf.sock_type, - max_retry_times = conf.max_retry_times, - retry_interval = conf.retry_interval, - pool_size = conf.pool_size, - tls = conf.tls, - }) + local config = { + name = conf.name, + retry_delay = conf.retry_interval, + batch_max_size = conf.batch_max_size, + max_retry_count = conf.max_retry_times, + buffer_duration = conf.buffer_duration, + inactive_timeout = conf.timeout, + } - if not logger then - core.log.error("failed when initiating the sys logger processor", err) - end + local err + log_buffer, err = batch_processor:new(func, config) - -- reuse the logger object - local ok, err = logger:log(core.json.encode(entry)) - if not ok then - core.log.error("failed to log message", err) + if not log_buffer then + core.log.error("error when creating the batch processor: ", err) + return end + + buffers[entry.route_id] = log_buffer + log_buffer:push(entry) + end return _M From 275570e8a8f16a446e04205bfa9c505cf081741b Mon Sep 17 00:00:00 2001 From: Akayeshmantha Date: Wed, 6 May 2020 12:05:12 +0200 Subject: [PATCH 3/4] finalize batch request. --- apisix/plugins/syslog.lua | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/apisix/plugins/syslog.lua b/apisix/plugins/syslog.lua index d0c3a1d0c9ff..4d1bae78a8ab 100644 --- a/apisix/plugins/syslog.lua +++ b/apisix/plugins/syslog.lua @@ -80,18 +80,18 @@ local function send_syslog_data(conf, log_message) -- fetch it from lrucache local logger, err = lrucache(api_ctx.conf_type .. "#" .. api_ctx.conf_id, api_ctx.conf_version, - logger_socket.new, logger_socket, { - host = conf.host, - port = conf.port, - flush_limit = conf.flush_limit, - drop_limit = conf.drop_limit, - timeout = conf.timeout, - sock_type = conf.sock_type, - max_retry_times = conf.max_retry_times, - retry_interval = conf.retry_interval, - pool_size = conf.pool_size, - tls = conf.tls, - }) + logger_socket.new, logger_socket, { + host = conf.host, + port = conf.port, + flush_limit = conf.flush_limit, + drop_limit = conf.drop_limit, + timeout = conf.timeout, + sock_type = conf.sock_type, + max_retry_times = conf.max_retry_times, + retry_interval = conf.retry_interval, + pool_size = conf.pool_size, + tls = conf.tls, + }) if not logger then res = false From 9fc1ba001b944d31f930d3b4d9733e613304130b Mon Sep 17 00:00:00 2001 From: Akayeshmantha Date: Thu, 7 May 2020 11:44:08 +0200 Subject: [PATCH 4/4] fix the linting issues. --- apisix/plugins/syslog.lua | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/apisix/plugins/syslog.lua b/apisix/plugins/syslog.lua index 4d1bae78a8ab..270b719b1823 100644 --- a/apisix/plugins/syslog.lua +++ b/apisix/plugins/syslog.lua @@ -18,9 +18,6 @@ local core = require("apisix.core") local log_util = require("apisix.utils.log-util") local batch_processor = require("apisix.utils.batch-processor") local logger_socket = require("resty.logger.socket") -local pairs = pairs -local type = type -local table = table local plugin_name = "syslog" local ngx = ngx local buffers = {} @@ -99,7 +96,7 @@ local function send_syslog_data(conf, log_message) end -- reuse the logger object - local ok, err = logger:log(core.json.encode(entry)) + local ok, err = logger:log(core.json.encode(log_message)) if not ok then res = false err_msg = "failed to log message" .. err