Skip to content

Commit

Permalink
optimize: use buffer for plugin syslog. (#1551)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ayeshmantha Perera authored May 8, 2020
1 parent a328fcc commit fa59f20
Showing 1 changed file with 71 additions and 13 deletions.
84 changes: 71 additions & 13 deletions apisix/plugins/syslog.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,28 @@
--
local core = require("apisix.core")
local log_util = require("apisix.utils.log-util")
local batch_processor = require("apisix.utils.batch-processor")
local logger_socket = require("resty.logger.socket")
local plugin_name = "syslog"
local ngx = ngx
local buffers = {}

local schema = {
type = "object",
properties = {
host = {type = "string"},
port = {type = "integer"},
name = {type = "string", default = "sys logger"},
flush_limit = {type = "integer", minimum = 1, default = 4096},
drop_limit = {type = "integer", default = 1048576},
timeout = {type = "integer", minimum = 1, default = 3},
sock_type = {type = "string", default = "tcp"},
max_retry_times = {type = "integer", minimum = 1, default = 3},
retry_interval = {type = "integer", minimum = 10, default = 100},
max_retry_times = {type = "integer", minimum = 1, default = 1},
retry_interval = {type = "integer", minimum = 0, default = 1},
pool_size = {type = "integer", minimum = 5, default = 5},
tls = {type = "boolean", default = false},
batch_max_size = {type = "integer", minimum = 1, default = 1000},
buffer_duration = {type = "integer", minimum = 1, default = 60},
},
required = {"host", "port"}
}
Expand All @@ -59,14 +64,9 @@ function _M.flush_syslog(logger)
end
end

-- log phase in APISIX
function _M.log(conf)
local entry = log_util.get_full_log(ngx)

if not entry.route_id then
core.log.error("failed to obtain the route id for sys logger")
return
end
local function send_syslog_data(conf, log_message)
local err_msg
local res = true

-- fetch api_ctx
local api_ctx = ngx.ctx.api_ctx
Expand All @@ -91,14 +91,72 @@ function _M.log(conf)
})

if not logger then
core.log.error("failed when initiating the sys logger processor", err)
res = false
err_msg = "failed when initiating the sys logger processor".. err
end

-- reuse the logger object
local ok, err = logger:log(core.json.encode(entry))
local ok, err = logger:log(core.json.encode(log_message))
if not ok then
core.log.error("failed to log message", err)
res = false
err_msg = "failed to log message" .. err
end

return res, err_msg
end

-- log phase in APISIX
function _M.log(conf)
local entry = log_util.get_full_log(ngx)

if not entry.route_id then
core.log.error("failed to obtain the route id for sys logger")
return
end

local log_buffer = buffers[entry.route_id]

if log_buffer then
log_buffer:push(entry)
return
end

-- Generate a function to be executed by the batch processor
local func = function(entries, batch_max_size)
local data, err
if batch_max_size == 1 then
data, err = core.json.encode(entries[1]) -- encode as single {}
else
data, err = core.json.encode(entries) -- encode as array [{}]
end

if not data then
return false, 'error occurred while encoding the data: ' .. err
end

return send_syslog_data(conf, data)
end

local config = {
name = conf.name,
retry_delay = conf.retry_interval,
batch_max_size = conf.batch_max_size,
max_retry_count = conf.max_retry_times,
buffer_duration = conf.buffer_duration,
inactive_timeout = conf.timeout,
}

local err
log_buffer, err = batch_processor:new(func, config)

if not log_buffer then
core.log.error("error when creating the batch processor: ", err)
return
end

buffers[entry.route_id] = log_buffer
log_buffer:push(entry)

end

return _M

0 comments on commit fa59f20

Please sign in to comment.