Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batchprocessor): support partial consumption of batch entries #6203

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 112 additions & 95 deletions apisix/plugins/datadog.lua
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ local ngx = ngx
local udp = ngx.socket.udp
local format = string.format
local concat = table.concat
local ipairs = ipairs
local tostring = tostring

local plugin_name = "datadog"
Expand Down Expand Up @@ -64,13 +63,15 @@ local _M = {
metadata_schema = metadata_schema,
}


-- Validate a plugin configuration table.
-- conf: the configuration to validate.
-- schema_type: when equal to core.schema.TYPE_METADATA the metadata
--              schema is used; any other value validates against the
--              regular plugin schema.
-- Returns the result of core.schema.check (true, or false plus error).
function _M.check_schema(conf, schema_type)
    local target_schema = schema
    if schema_type == core.schema.TYPE_METADATA then
        target_schema = metadata_schema
    end
    return core.schema.check(target_schema, conf)
end


local function generate_tag(entry, const_tags)
local tags
if const_tags and #const_tags > 0 then
Expand Down Expand Up @@ -108,127 +109,143 @@ local function generate_tag(entry, const_tags)
end


function _M.log(conf, ctx)
local entry = fetch_log(ngx, {})
entry.balancer_ip = ctx.balancer_ip or ""
entry.scheme = ctx.upstream_scheme or ""
local function send_metric_over_udp(entry, metadata)
local err_msg
local sock = udp()
local host, port = metadata.value.host, metadata.value.port

-- if prefer_name is set, fetch the service/route name. If the name is nil, fall back to id.
if conf.prefer_name then
if entry.service_id and entry.service_id ~= "" then
local svc = service_fetch(entry.service_id)
local ok, err = sock:setpeername(host, port)
if not ok then
return false, "failed to connect to UDP server: host[" .. host
.. "] port[" .. tostring(port) .. "] err: " .. err
end

if svc and svc.value.name ~= "" then
entry.service_id = svc.value.name
end
end
-- Generate prefix & suffix according dogstatsd udp data format.
local suffix = generate_tag(entry, metadata.value.constant_tags)
local prefix = metadata.value.namespace
if prefix ~= "" then
prefix = prefix .. "."
end

if ctx.route_name and ctx.route_name ~= "" then
entry.route_id = ctx.route_name
end
-- request counter
ok, err = sock:send(format("%s:%s|%s%s", prefix .. "request.counter", 1, "c", suffix))
if not ok then
err_msg = "error sending request.counter: " .. err
core.log.error("failed to report request count to dogstatsd server: host[" .. host
.. "] port[" .. tostring(port) .. "] err: " .. err)
end

if batch_processor_manager:add_entry(conf, entry) then
return
-- request latency histogram
ok, err = sock:send(format("%s:%s|%s%s", prefix .. "request.latency",
entry.latency, "h", suffix))
if not ok then
err_msg = "error sending request.latency: " .. err
core.log.error("failed to report request latency to dogstatsd server: host["
.. host .. "] port[" .. tostring(port) .. "] err: " .. err)
end

-- Generate a function to be executed by the batch processor
local func = function(entries, batch_max_size)
-- Fetching metadata details
local metadata = plugin.plugin_metadata(plugin_name)
if not metadata then
core.log.info("received nil metadata: using metadata defaults: ",
core.json.delay_encode(defaults, true))
metadata = {}
metadata.value = defaults
-- upstream latency
if entry.upstream_latency then
ok, err = sock:send(format("%s:%s|%s%s", prefix .. "upstream.latency",
entry.upstream_latency, "h", suffix))
if not ok then
err_msg = "error sending upstream.latency: " .. err
core.log.error("failed to report upstream latency to dogstatsd server: host["
.. host .. "] port[" .. tostring(port) .. "] err: " .. err)
end
end

-- Creating a udp socket
local sock = udp()
local host, port = metadata.value.host, metadata.value.port
core.log.info("sending batch metrics to dogstatsd: ", host, ":", port)
-- apisix_latency
ok, err = sock:send(format("%s:%s|%s%s", prefix .. "apisix.latency",
entry.apisix_latency, "h", suffix))
if not ok then
err_msg = "error sending apisix.latency: " .. err
core.log.error("failed to report apisix latency to dogstatsd server: host[" .. host
.. "] port[" .. tostring(port) .. "] err: " .. err)
end

local ok, err = sock:setpeername(host, port)
-- request body size timer
ok, err = sock:send(format("%s:%s|%s%s", prefix .. "ingress.size",
entry.request.size, "ms", suffix))
if not ok then
err_msg = "error sending ingress.size: " .. err
core.log.error("failed to report req body size to dogstatsd server: host[" .. host
.. "] port[" .. tostring(port) .. "] err: " .. err)
end

if not ok then
return false, "failed to connect to UDP server: host[" .. host
.. "] port[" .. tostring(port) .. "] err: " .. err
end
-- response body size timer
ok, err = sock:send(format("%s:%s|%s%s", prefix .. "egress.size",
entry.response.size, "ms", suffix))
if not ok then
err_msg = "error sending egress.size: " .. err
core.log.error("failed to report response body size to dogstatsd server: host["
.. host .. "] port[" .. tostring(port) .. "] err: " .. err)
end

-- Generate prefix & suffix according dogstatsd udp data format.
local prefix = metadata.value.namespace
if prefix ~= "" then
prefix = prefix .. "."
end
ok, err = sock:close()
if not ok then
core.log.error("failed to close the UDP connection, host[",
host, "] port[", port, "] ", err)
end

core.log.info("datadog batch_entry: ", core.json.delay_encode(entries, true))
for _, entry in ipairs(entries) do
local suffix = generate_tag(entry, metadata.value.constant_tags)
if not err_msg then
return true
end

-- request counter
local ok, err = sock:send(format("%s:%s|%s%s", prefix ..
"request.counter", 1, "c", suffix))
if not ok then
core.log.error("failed to report request count to dogstatsd server: host[" .. host
.. "] port[" .. tostring(port) .. "] err: " .. err)
end
return false, err_msg
end


-- request latency histogram
local ok, err = sock:send(format("%s:%s|%s%s", prefix ..
"request.latency", entry.latency, "h", suffix))
if not ok then
core.log.error("failed to report request latency to dogstatsd server: host["
.. host .. "] port[" .. tostring(port) .. "] err: " .. err)
end
local function push_metrics(entries)
-- Fetching metadata details
local metadata = plugin.plugin_metadata(plugin_name)
core.log.info("metadata: ", core.json.delay_encode(metadata))

-- upstream latency
if entry.upstream_latency then
local ok, err = sock:send(format("%s:%s|%s%s", prefix ..
"upstream.latency", entry.upstream_latency, "h", suffix))
if not ok then
core.log.error("failed to report upstream latency to dogstatsd server: host["
.. host .. "] port[" .. tostring(port) .. "] err: " .. err)
end
end
if not metadata then
core.log.info("received nil metadata: using metadata defaults: ",
core.json.delay_encode(defaults, true))
metadata = {}
metadata.value = defaults
end
core.log.info("sending batch metrics to dogstatsd: ", metadata.value.host,
":", metadata.value.port)

-- apisix_latency
local ok, err = sock:send(format("%s:%s|%s%s", prefix ..
"apisix.latency", entry.apisix_latency, "h", suffix))
if not ok then
core.log.error("failed to report apisix latency to dogstatsd server: host[" .. host
.. "] port[" .. tostring(port) .. "] err: " .. err)
end
for i = 1, #entries do
local ok, err = send_metric_over_udp(entries[i], metadata)
if not ok then
return false, err, i
end
end

return true
end

-- request body size timer
local ok, err = sock:send(format("%s:%s|%s%s", prefix ..
"ingress.size", entry.request.size, "ms", suffix))
if not ok then
core.log.error("failed to report req body size to dogstatsd server: host[" .. host
.. "] port[" .. tostring(port) .. "] err: " .. err)
end

-- response body size timer
local ok, err = sock:send(format("%s:%s|%s%s", prefix ..
"egress.size", entry.response.size, "ms", suffix))
if not ok then
core.log.error("failed to report response body size to dogstatsd server: host["
.. host .. "] port[" .. tostring(port) .. "] err: " .. err)
function _M.log(conf, ctx)
local entry = fetch_log(ngx, {})
entry.balancer_ip = ctx.balancer_ip or ""
entry.scheme = ctx.upstream_scheme or ""

-- if prefer_name is set, fetch the service/route name. If the name is nil, fall back to id.
if conf.prefer_name then
if entry.service_id and entry.service_id ~= "" then
local svc = service_fetch(entry.service_id)

if svc and svc.value.name ~= "" then
entry.service_id = svc.value.name
end
end

-- Releasing the UDP socket descriptor
ok, err = sock:close()
if not ok then
core.log.error("failed to close the UDP connection, host[",
host, "] port[", port, "] ", err)
if ctx.route_name and ctx.route_name ~= "" then
entry.route_id = ctx.route_name
end
end

-- Returning at the end and ensuring the resource has been released.
return true
if batch_processor_manager:add_entry(conf, entry) then
return
end

batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, push_metrics)
end

return _M
2 changes: 1 addition & 1 deletion apisix/plugins/loggly.lua
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ local function handle_log(entries)
for i = 1, #entries do
local ok, err = send_data_over_udp(entries[i], metadata)
if not ok then
return false, err
return false, err, i
end
end
else
Expand Down
29 changes: 25 additions & 4 deletions apisix/utils/batch-processor.lua
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,38 @@ local function set_metrics(self, count)
end


-- Return a new array holding the elements of `batch` from index `n`
-- (defaulting to 1 when `n` is nil) through the last element.
-- The original table is left untouched; elements are copied by reference.
local function slice_batch(batch, n)
    local rest = {}
    local first = n or 1
    for i = first, #batch do
        rest[i - first + 1] = batch[i]
    end
    return rest
end


function execute_func(premature, self, batch)
if premature then
return
end

local ok, err = self.func(batch.entries, self.batch_max_size)
-- In case of "err" and a valid "first_fail" batch processor considers, all first_fail-1
-- entries have been successfully consumed and hence reschedule the job for entries with
-- index first_fail to #entries based on the current retry policy.
local ok, err, first_fail = self.func(batch.entries, self.batch_max_size)
if not ok then
core.log.error("Batch Processor[", self.name,
"] failed to process entries: ", err)
if first_fail then
core.log.error("Batch Processor[", self.name, "] failed to process entries [",
#batch.entries + 1 - first_fail, "/", #batch.entries ,"]: ", err)
batch.entries = slice_batch(batch.entries, first_fail)
else
core.log.error("Batch Processor[", self.name,
"] failed to process entries: ", err)
end

batch.retry_count = batch.retry_count + 1
if batch.retry_count <= self.max_retry_count then
if batch.retry_count <= self.max_retry_count and #batch.entries > 0 then
schedule_func_exec(self, self.retry_delay,
batch)
else
Expand Down
7 changes: 5 additions & 2 deletions docs/en/latest/batch-processor.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ function _M.log(conf, ctx)
-- serialize to json array core.json.encode(entries)
-- process/send data
return true
-- return false, err_msg if failed
-- return false, err_msg, first_fail if failed
-- first_fail (optional) indicates that the first first_fail-1 entries were processed
-- successfully and that the error occurred while processing entries[first_fail]. The batch
-- processor will then retry only the entries with index >= first_fail, per the retry policy.
end
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
end
Expand Down Expand Up @@ -120,7 +123,7 @@ local err
local func = function(entries)
...
return true
-- return false, err_msg if failed
-- return false, err_msg, first_fail if failed
end
log_buffer, err = batch_processor:new(func, config_bat)

Expand Down
7 changes: 5 additions & 2 deletions docs/zh/latest/batch-processor.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ function _M.log(conf, ctx)
-- serialize to json array core.json.encode(entries)
-- process/send data
return true
-- return false, err_msg if failed
-- return false, err_msg, first_fail if failed
-- first_fail (optional) indicates that the first first_fail-1 entries were processed
-- successfully and that the error occurred while processing entries[first_fail]. The batch
-- processor will then retry only the entries with index >= first_fail, per the retry policy.
end
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
end
Expand Down Expand Up @@ -118,7 +121,7 @@ local err
local func = function(entries)
...
return true
-- return false, err_msg if failed
-- return false, err_msg, first_fail if failed
end
log_buffer, err = batch_processor:new(func, config_bat)

Expand Down
Loading