Skip to content

Commit

Permalink
Revert "Revert "fix(*): prevent queues from growing without bounds (#…
Browse files Browse the repository at this point in the history
…10046) (#10058)""

This reverts commit 2eca27a.
  • Loading branch information
AndyZhang0707 authored and windmgc committed Dec 11, 2023
1 parent 2350ba2 commit 0cff934
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 35 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,18 @@

- Fixed a bug where internal redirects (i.e. those produced by the error_page directive) could interfere with worker process handling the request when buffered proxying is being used.

## Unreleased

### Fixes

##### Plugins

- Update the batch queues module so that queues no longer grow without bounds if
their consumers fail to process the entries. Instead, old batches are now dropped
and an error is logged.
[#10046](https://github.com/Kong/kong/pull/10046)


## [2.8.3]

> Released 2022/11/02
Expand Down
2 changes: 2 additions & 0 deletions kong/conf_loader/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,8 @@ local CONF_INFERENCES = {
untrusted_lua = { enum = { "on", "off", "sandbox" } },
untrusted_lua_sandbox_requires = { typ = "array" },
untrusted_lua_sandbox_environment = { typ = "array" },

max_queued_batches = { typ = "number" },
}


Expand Down
2 changes: 1 addition & 1 deletion kong/plugins/http-log/handler.lua
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ function HttpLogHandler:log(conf)
}

local err
q, err = BatchQueue.new(process, opts)
q, err = BatchQueue.new("http-log", process, opts)
if not q then
kong.log.err("could not create queue: ", err)
return
Expand Down
89 changes: 55 additions & 34 deletions kong/tools/batch_queue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
-- end
--
-- local q = BatchQueue.new(
-- name, -- name of the queue for identification purposes in the log
-- process, -- function used to "process/consume" values from the queue
-- { -- Opts table with control values. Defaults shown:
-- retry_count = 0, -- number of times to retry processing
-- batch_max_size = 1000, -- max number of entries that can be queued before they are queued for processing
-- process_delay = 1, -- in seconds, how often the current batch is closed & queued
-- flush_timeout = 2, -- in seconds, how much time passes without activity before the current batch is closed and queued
-- retry_count = 0, -- number of times to retry processing
-- batch_max_size = 1000, -- max number of entries in a batch; when a batch is full, it is closed and queued for processing
-- process_delay = 1, -- in seconds, how often the current batch is closed & queued
-- flush_timeout = 2, -- in seconds, how much time passes without activity before the current batch is closed and queued
-- max_queued_batches = 100, -- max number of batches that can be queued before the oldest batch is dropped when a new one is queued
-- }
-- )
--
Expand Down Expand Up @@ -68,11 +70,9 @@ local timer_at = ngx.timer.at
local remove = table.remove
local type = type
local huge = math.huge
local fmt = string.format
local min = math.min
local now = ngx.now
local ERR = ngx.ERR
local ngx_log = ngx.log
local DEBUG = ngx.DEBUG
local WARN = ngx.WARN

Expand Down Expand Up @@ -100,10 +100,10 @@ local process
local function schedule_flush(self)
local ok, err = timer_at(self.flush_timeout/1000, flush, self)
if not ok then
ngx_log(ERR, "failed to create delayed flush timer: ", err)
self:log(ERR, "failed to create delayed flush timer: %s", err)
return
end
--ngx_log(DEBUG, "delayed timer created")
--self:log(DEBUG, "delayed timer created")
self.flush_scheduled = true
end

Expand All @@ -113,10 +113,10 @@ end
-- @param self Queue
-- @param batch: table with `entries` and `retries` counter
-- @param delay number: timer delay in seconds
local function schedule_process(self, batch, delay)
local ok, err = timer_at(delay, process, self, batch)
local function schedule_process(self, delay)
local ok, err = timer_at(delay, process, self)
if not ok then
ngx_log(ERR, "failed to create process timer: ", err)
self:log(ERR, "failed to create process timer: %s", err)
return
end
self.process_scheduled = true
Expand Down Expand Up @@ -147,13 +147,13 @@ flush = function(premature, self)

if get_now() - self.last_t < self.flush_timeout then
-- flushing reported: we had activity
ngx_log(DEBUG, "[flush] queue had activity, delaying flush")
self:log(DEBUG, "[flush] queue had activity, delaying flush")
schedule_flush(self)
return
end

-- no activity and timeout reached
ngx_log(DEBUG, "[flush] queue had no activity, flushing triggered by flush_timeout")
self:log(DEBUG, "[flush] queue had no activity, flushing triggered by flush_timeout")
self:flush()
self.flush_scheduled = false
end
Expand All @@ -165,38 +165,40 @@ end
-- @param self Queue
-- @param batch: table with `entries` and `retries` counter
-- @return nothing
process = function(premature, self, batch)
process = function(premature, self)
if premature then
return
end

local batch = self.batch_queue[1]
if not batch then
self:log(WARN, "queue process called but no batches to be processed")
return
end

local next_retry_delay

local ok, err = self.process(batch.entries)
if ok then -- success, reset retry delays
self.retry_delay = 1
next_retry_delay = 0

remove(self.batch_queue, 1)
else
batch.retries = batch.retries + 1
if batch.retries < self.retry_count then
ngx_log(WARN, "failed to process entries: ", tostring(err))
-- queue our data for processing again, at the end of the queue
self.batch_queue[#self.batch_queue + 1] = batch
self:log(WARN, "failed to process entries: %s", tostring(err))
else
ngx_log(ERR, fmt("entry batch was already tried %d times, dropping it",
batch.retries))
self:log(ERR, "entry batch was already tried %d times, dropping it", batch.retries)
remove(self.batch_queue, 1)
end

self.retry_delay = self.retry_delay + 1
next_retry_delay = min(RETRY_MAX_DELAY, self.retry_delay * self.retry_delay)
end

if #self.batch_queue > 0 then -- more to process?
ngx_log(DEBUG, fmt("processing oldest data, %d still queued",
#self.batch_queue - 1))
local oldest_batch = remove(self.batch_queue, 1)
schedule_process(self, oldest_batch, next_retry_delay)
self:log(DEBUG, "processing oldest data, %d still queued", #self.batch_queue)
schedule_process(self, next_retry_delay)
return
end

Expand All @@ -218,13 +220,15 @@ end
-- @param opts table, optionally including
-- `retry_count`, `flush_timeout`, `batch_max_size` and `process_delay`
-- @return table: a Queue object.
function Queue.new(process, opts)
function Queue.new(name, process, opts)
opts = opts or {}

assert(type(name) == "string",
"arg #1 (name) must be a string")
assert(type(process) == "function",
"arg #1 (process) must be a function")
"arg #2 (process) must be a function")
assert(type(opts) == "table",
"arg #2 (opts) must be a table")
"arg #3 (opts) must be a table")
assert(opts.retry_count == nil or type(opts.retry_count) == "number",
"retry_count must be a number")
assert(opts.flush_timeout == nil or type(opts.flush_timeout) == "number",
Expand All @@ -233,15 +237,19 @@ function Queue.new(process, opts)
"batch_max_size must be a number")
assert(opts.process_delay == nil or type(opts.batch_max_size) == "number",
"process_delay must be a number")
assert(opts.max_queued_batches == nil or type(opts.max_queued_batches) == "number",
"max_queued_batches must be a number")

local self = {
name = name,
process = process,

-- flush timeout in milliseconds
flush_timeout = opts.flush_timeout and opts.flush_timeout * 1000 or 2000,
retry_count = opts.retry_count or 0,
batch_max_size = opts.batch_max_size or 1000,
process_delay = opts.process_delay or 1,
max_queued_batches = opts.max_queued_batches or (kong.configuration and kong.configuration.max_queued_batches) or 100,

retry_delay = 1,

Expand All @@ -258,6 +266,17 @@ function Queue.new(process, opts)
end


-------------------------------------------------------------------------------
-- Log a message that includes the name of the queue for identification purposes
-- @param self Queue
-- @param level: log level
-- @param formatstring: format string, will get the queue name and ": " prepended
-- @param ...: formatter arguments
function Queue:log(level, formatstring, ...)
  -- Pass the varargs straight through to string.format: the previous
  -- unpack({...}) form allocated a throwaway table on every log call and
  -- could silently drop arguments after an embedded nil, because #t is
  -- undefined for sequences with holes in Lua 5.1/LuaJIT.
  return ngx.log(level, string.format(self.name .. ": " .. formatstring, ...))
end


-------------------------------------------------------------------------------
-- Add data to the queue
-- @param entry the value included in the queue. It can be any Lua value besides nil.
Expand All @@ -269,8 +288,8 @@ function Queue:add(entry)

if self.batch_max_size == 1 then
-- no batching
local batch = { entries = { entry }, retries = 0 }
schedule_process(self, batch, 0)
self.batch_queue = { { entries = { entry }, retries = 0 } }
schedule_process(self, 0)
return true
end

Expand Down Expand Up @@ -304,8 +323,12 @@ function Queue:flush()

-- Queue the current batch, if it has at least 1 entry
if current_batch_size > 0 then
ngx_log(DEBUG, "queueing batch for processing (", current_batch_size, " entries)")
self:log(DEBUG, "queueing batch for processing (%d entries)", current_batch_size)

while #self.batch_queue >= self.max_queued_batches do
self:log(ERR, "exceeded max_queued_batches (%d), dropping oldest", self.max_queued_batches)
remove(self.batch_queue, 1)
end
self.batch_queue[#self.batch_queue + 1] = self.current_batch
self.current_batch = { entries = {}, retries = 0 }
end
Expand All @@ -314,10 +337,8 @@ function Queue:flush()
-- in the future. This will keep calling itself in the future until
-- the queue is empty
if #self.batch_queue > 0 and not self.process_scheduled then
ngx_log(DEBUG, fmt("processing oldest entry, %d still queued",
#self.batch_queue - 1))
local oldest_batch = remove(self.batch_queue, 1)
schedule_process(self, oldest_batch, self.process_delay)
self:log(DEBUG, "processing oldest entry, %d still queued", #self.batch_queue)
schedule_process(self, self.process_delay)
end

return true
Expand Down
33 changes: 33 additions & 0 deletions spec/01-unit/27-batch_queue_spec.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@

local BatchQueue = require "kong.tools.batch_queue"
local helpers = require "spec.helpers"

describe("batch queue", function()

  it("observes the limit parameter", function()
    -- Consumer that records how many entries it has seen in total and the
    -- most recent one, so the assertions can tell which batches survived.
    local processed_count = 0
    local newest_entry
    local consume = function(entries)
      processed_count = processed_count + #entries
      newest_entry = entries[#entries]
      return true
    end

    local queue = BatchQueue.new("batch-queue-unit-test", consume, {
      max_queued_batches = 2,
      batch_max_size = 100,
      process_delay = 0,
    })

    -- Queue three single-entry batches; with max_queued_batches = 2 the
    -- oldest batch (entry 1) must be dropped before processing.
    for entry = 1, 3 do
      queue:add(entry)
      queue:flush()
    end

    -- Wait until the queue has drained completely.
    helpers.wait_until(function()
      ngx.sleep(.1)
      return #queue.batch_queue == 0
    end, 1)

    assert.equal(2, processed_count)
    assert.equal(3, newest_entry)
  end)
end)

0 comments on commit 0cff934

Please sign in to comment.