Skip to content

Commit

Permalink
Revert "fix(*): prevent queues from growing without bounds (#10046) (#10254)"
Browse files Browse the repository at this point in the history

This reverts commit 1a869fc.
  • Loading branch information
AndyZhang0707 committed Sep 18, 2023
1 parent 92b946c commit 9b41c75
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 105 deletions.
4 changes: 0 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,6 @@

- **HTTP Log**: fix internal error during validating the schema if http_endpoint contains
userinfo but headers is empty [#9574](https://github.com/Kong/kong/pull/9574)
- Update the batch queues module so that queues no longer grow without bounds if
their consumers fail to process the entries. Instead, old batches are now dropped
and an error is logged.
[#10247](https://github.com/Kong/kong/pull/10247)

##### CLI

Expand Down
11 changes: 0 additions & 11 deletions kong.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -1544,14 +1544,3 @@
# **Warning**: Certain variables, when made
# available, may create opportunities to
# escape the sandbox.

#max_queued_batches = 100 # Maximum number of batches to keep on an internal
# plugin queue before dropping old batches. This is
# meant as a global, last-resort control to prevent
# queues from consuming infinite memory. When batches
# are being dropped, an error message
# "exceeded max_queued_batches (%d), dropping oldest"
# will be logged. The error message will also include
# a string that identifies the plugin causing the
# problem. Queues are used by the http-log, statsd,
# opentelemetry and datadog plugins.
2 changes: 0 additions & 2 deletions kong/conf_loader/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -657,8 +657,6 @@ local CONF_INFERENCES = {
untrusted_lua = { enum = { "on", "off", "sandbox" } },
untrusted_lua_sandbox_requires = { typ = "array" },
untrusted_lua_sandbox_environment = { typ = "array" },

max_queued_batches = { typ = "number" },
}


Expand Down
2 changes: 1 addition & 1 deletion kong/plugins/http-log/handler.lua
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ function HttpLogHandler:log(conf)
}

local err
q, err = BatchQueue.new("http-log", process, opts)
q, err = BatchQueue.new(process, opts)
if not q then
kong.log.err("could not create queue: ", err)
return
Expand Down
2 changes: 0 additions & 2 deletions kong/templates/kong_defaults.lua
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,4 @@ pluginserver_names = NONE
untrusted_lua = sandbox
untrusted_lua_sandbox_requires =
untrusted_lua_sandbox_environment =
max_queued_batches = 100
]]
89 changes: 34 additions & 55 deletions kong/tools/batch_queue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@
-- end
--
-- local q = BatchQueue.new(
-- name, -- name of the queue for identification purposes in the log
-- process, -- function used to "process/consume" values from the queue
-- { -- Opts table with control values. Defaults shown:
-- retry_count = 0, -- number of times to retry processing
-- batch_max_size = 1000, -- max number of entries that can be queued before they are queued for processing
-- process_delay = 1, -- in seconds, how often the current batch is closed & queued
-- flush_timeout = 2, -- in seconds, how much time passes without activity before the current batch is closed and queued
-- max_queued_batches = 100, -- max number of batches that can be queued before the oldest batch is dropped when a new one is queued
-- retry_count = 0, -- number of times to retry processing
-- batch_max_size = 1000, -- max number of entries that can be queued before they are queued for processing
-- process_delay = 1, -- in seconds, how often the current batch is closed & queued
-- flush_timeout = 2, -- in seconds, how much time passes without activity before the current batch is closed and queued
-- }
-- )
--
Expand Down Expand Up @@ -70,9 +68,11 @@ local timer_at = ngx.timer.at
local remove = table.remove
local type = type
local huge = math.huge
local fmt = string.format
local min = math.min
local now = ngx.now
local ERR = ngx.ERR
local ngx_log = ngx.log
local DEBUG = ngx.DEBUG
local WARN = ngx.WARN

Expand Down Expand Up @@ -100,10 +100,10 @@ local process
-- Schedule a delayed flush of the queue.
-- Creates an ngx timer that will run `flush` after `flush_timeout`
-- (stored in milliseconds, hence the /1000 conversion to seconds for
-- ngx.timer.at) and marks the queue so no second timer is created.
-- NOTE(review): this span is a flattened diff — each logging statement
-- appears twice, once in the old `self:log(...)` form and once in the
-- reverted `ngx_log(...)` form; only one of each pair exists in the real
-- source file. Confirm against the repository before treating this as
-- runnable code.
-- @param self Queue
local function schedule_flush(self)
local ok, err = timer_at(self.flush_timeout/1000, flush, self)
if not ok then
self:log(ERR, "failed to create delayed flush timer: %s", err)
ngx_log(ERR, "failed to create delayed flush timer: ", err)
return
end
--self:log(DEBUG, "delayed timer created")
--ngx_log(DEBUG, "delayed timer created")
-- remember that a flush timer is pending so callers don't schedule another
self.flush_scheduled = true
end

Expand All @@ -113,10 +113,10 @@ end
-- @param self Queue
-- @param batch: table with `entries` and `retries` counter
-- @param delay number: timer delay in seconds
local function schedule_process(self, delay)
local ok, err = timer_at(delay, process, self)
local function schedule_process(self, batch, delay)
local ok, err = timer_at(delay, process, self, batch)
if not ok then
self:log(ERR, "failed to create process timer: %s", err)
ngx_log(ERR, "failed to create process timer: ", err)
return
end
self.process_scheduled = true
Expand Down Expand Up @@ -147,13 +147,13 @@ flush = function(premature, self)

if get_now() - self.last_t < self.flush_timeout then
-- flushing reported: we had activity
self:log(DEBUG, "[flush] queue had activity, delaying flush")
ngx_log(DEBUG, "[flush] queue had activity, delaying flush")
schedule_flush(self)
return
end

-- no activity and timeout reached
self:log(DEBUG, "[flush] queue had no activity, flushing triggered by flush_timeout")
ngx_log(DEBUG, "[flush] queue had no activity, flushing triggered by flush_timeout")
self:flush()
self.flush_scheduled = false
end
Expand All @@ -165,40 +165,38 @@ end
-- @param self Queue
-- @param batch: table with `entries` and `retries` counter
-- @return nothing
process = function(premature, self)
process = function(premature, self, batch)
if premature then
return
end

local batch = self.batch_queue[1]
if not batch then
self:log(WARN, "queue process called but no batches to be processed")
return
end

local next_retry_delay

local ok, err = self.process(batch.entries)
if ok then -- success, reset retry delays
self.retry_delay = 1
next_retry_delay = 0
remove(self.batch_queue, 1)

else
batch.retries = batch.retries + 1
if batch.retries < self.retry_count then
self:log(WARN, "failed to process entries: %s", tostring(err))
ngx_log(WARN, "failed to process entries: ", tostring(err))
-- queue our data for processing again, at the end of the queue
self.batch_queue[#self.batch_queue + 1] = batch
else
self:log(ERR, "entry batch was already tried %d times, dropping it", batch.retries)
remove(self.batch_queue, 1)
ngx_log(ERR, fmt("entry batch was already tried %d times, dropping it",
batch.retries))
end

self.retry_delay = self.retry_delay + 1
next_retry_delay = min(RETRY_MAX_DELAY, self.retry_delay * self.retry_delay)
end

if #self.batch_queue > 0 then -- more to process?
self:log(DEBUG, "processing oldest data, %d still queued", #self.batch_queue)
schedule_process(self, next_retry_delay)
ngx_log(DEBUG, fmt("processing oldest data, %d still queued",
#self.batch_queue - 1))
local oldest_batch = remove(self.batch_queue, 1)
schedule_process(self, oldest_batch, next_retry_delay)
return
end

Expand All @@ -220,15 +218,13 @@ end
-- @param opts table, optionally including
-- `retry_count`, `flush_timeout`, `batch_max_size` and `process_delay`
-- @return table: a Queue object.
function Queue.new(name, process, opts)
function Queue.new(process, opts)
opts = opts or {}

assert(type(name) == "string",
"arg #1 (name) must be a string")
assert(type(process) == "function",
"arg #2 (process) must be a function")
"arg #1 (process) must be a function")
assert(type(opts) == "table",
"arg #3 (opts) must be a table")
"arg #2 (opts) must be a table")
assert(opts.retry_count == nil or type(opts.retry_count) == "number",
"retry_count must be a number")
assert(opts.flush_timeout == nil or type(opts.flush_timeout) == "number",
Expand All @@ -237,19 +233,15 @@ function Queue.new(name, process, opts)
"batch_max_size must be a number")
assert(opts.process_delay == nil or type(opts.batch_max_size) == "number",
"process_delay must be a number")
assert(opts.max_queued_batches == nil or type(opts.max_queued_batches) == "number",
"max_queued_batches must be a number")

local self = {
name = name,
process = process,

-- flush timeout in milliseconds
flush_timeout = opts.flush_timeout and opts.flush_timeout * 1000 or 2000,
retry_count = opts.retry_count or 0,
batch_max_size = opts.batch_max_size or 1000,
process_delay = opts.process_delay or 1,
max_queued_batches = opts.max_queued_batches or (kong.configuration and kong.configuration.max_queued_batches) or 100,

retry_delay = 1,

Expand All @@ -266,17 +258,6 @@ function Queue.new(name, process, opts)
end


-------------------------------------------------------------------------------
-- Log a message that includes the name of the queue for identification
-- purposes.
-- @param self Queue
-- @param level: log level constant (e.g. ngx.ERR, ngx.DEBUG)
-- @param formatstring: string.format-style format string; the queue name and
--   ": " are prepended before formatting
-- @param ...: arguments for the format string
function Queue:log(level, formatstring, ...)
  -- Forward the varargs directly: the original `unpack({...})` allocated a
  -- throwaway table on every log call and, in Lua 5.1, may truncate the
  -- argument list at a nil hole; passing `...` preserves every argument.
  return ngx.log(level, string.format(self.name .. ": " .. formatstring, ...))
end


-------------------------------------------------------------------------------
-- Add data to the queue
-- @param entry the value included in the queue. It can be any Lua value besides nil.
Expand All @@ -288,8 +269,8 @@ function Queue:add(entry)

if self.batch_max_size == 1 then
-- no batching
self.batch_queue = { { entries = { entry }, retries = 0 } }
schedule_process(self, 0)
local batch = { entries = { entry }, retries = 0 }
schedule_process(self, batch, 0)
return true
end

Expand Down Expand Up @@ -323,12 +304,8 @@ function Queue:flush()

-- Queue the current batch, if it has at least 1 entry
if current_batch_size > 0 then
self:log(DEBUG, "queueing batch for processing (%d entries)", current_batch_size)
ngx_log(DEBUG, "queueing batch for processing (", current_batch_size, " entries)")

while #self.batch_queue >= self.max_queued_batches do
self:log(ERR, "exceeded max_queued_batches (%d), dropping oldest", self.max_queued_batches)
remove(self.batch_queue, 1)
end
self.batch_queue[#self.batch_queue + 1] = self.current_batch
self.current_batch = { entries = {}, retries = 0 }
end
Expand All @@ -337,8 +314,10 @@ function Queue:flush()
-- in the future. This will keep calling itself in the future until
-- the queue is empty
if #self.batch_queue > 0 and not self.process_scheduled then
self:log(DEBUG, "processing oldest entry, %d still queued", #self.batch_queue)
schedule_process(self, self.process_delay)
ngx_log(DEBUG, fmt("processing oldest entry, %d still queued",
#self.batch_queue - 1))
local oldest_batch = remove(self.batch_queue, 1)
schedule_process(self, oldest_batch, self.process_delay)
end

return true
Expand Down
30 changes: 0 additions & 30 deletions spec/01-unit/27-batch_queue_spec.lua

This file was deleted.

0 comments on commit 9b41c75

Please sign in to comment.