From 063bbf2bf695c48f4cb62be29bb4e8b56101f394 Mon Sep 17 00:00:00 2001 From: Andy Zhang Date: Wed, 13 Sep 2023 17:27:19 +0800 Subject: [PATCH] Revert "fix(*): prevent queues from growing without bounds (#10046) (#10058)" This reverts commit 4f30dd3be435198f63acd8eb0cc9eb39a1c083c6. --- CHANGELOG.md | 12 ---- kong/conf_loader/init.lua | 2 - kong/plugins/http-log/handler.lua | 2 +- kong/tools/batch_queue.lua | 89 +++++++++++----------------- spec/01-unit/27-batch_queue_spec.lua | 33 ----------- 5 files changed, 35 insertions(+), 103 deletions(-) delete mode 100644 spec/01-unit/27-batch_queue_spec.lua diff --git a/CHANGELOG.md b/CHANGELOG.md index 1468a005ed30..26d1478a4935 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -66,18 +66,6 @@ - [0.9.9 and prior](#099---20170202) -## Unrelease - -### Fixes - -##### Plugins - -- Update the batch queues module so that queues no longer grow without bounds if - their consumers fail to process the entries. Instead, old batches are now dropped - and an error is logged. - [#10046](https://github.com/Kong/kong/pull/10046) - - ## [2.8.3] > Released 2022/11/02 diff --git a/kong/conf_loader/init.lua b/kong/conf_loader/init.lua index ed70bc1cf835..feb873d99db4 100644 --- a/kong/conf_loader/init.lua +++ b/kong/conf_loader/init.lua @@ -657,8 +657,6 @@ local CONF_INFERENCES = { untrusted_lua = { enum = { "on", "off", "sandbox" } }, untrusted_lua_sandbox_requires = { typ = "array" }, untrusted_lua_sandbox_environment = { typ = "array" }, - - max_queued_batches = { typ = "number" }, } diff --git a/kong/plugins/http-log/handler.lua b/kong/plugins/http-log/handler.lua index 2c4d130b97b4..ef82bf5bc147 100644 --- a/kong/plugins/http-log/handler.lua +++ b/kong/plugins/http-log/handler.lua @@ -170,7 +170,7 @@ function HttpLogHandler:log(conf) } local err - q, err = BatchQueue.new("http-log", process, opts) + q, err = BatchQueue.new(process, opts) if not q then kong.log.err("could not create queue: ", err) return diff --git a/kong/tools/batch_queue.lua b/kong/tools/batch_queue.lua index 92322905a224..8eaf5ae56ef3 100644 --- a/kong/tools/batch_queue.lua +++ b/kong/tools/batch_queue.lua @@ -24,14 +24,12 @@ -- end -- -- local q = BatchQueue.new( --- name, -- name of the queue for identification purposes in the log -- process, -- function used to "process/consume" values from the queue -- { -- Opts table with control values. 
Defaults shown: --- retry_count = 0, -- number of times to retry processing --- batch_max_size = 1000, -- max number of entries that can be queued before they are queued for processing --- process_delay = 1, -- in seconds, how often the current batch is closed & queued --- flush_timeout = 2, -- in seconds, how much time passes without activity before the current batch is closed and queued --- max_queued_batches = 100, -- max number of batches that can be queued before the oldest batch is dropped when a new one is queued +-- retry_count = 0, -- number of times to retry processing +-- batch_max_size = 1000, -- max number of entries that can be queued before they are queued for processing +-- process_delay = 1, -- in seconds, how often the current batch is closed & queued +-- flush_timeout = 2, -- in seconds, how much time passes without activity before the current batch is closed and queued -- } -- ) -- @@ -70,9 +68,11 @@ local timer_at = ngx.timer.at local remove = table.remove local type = type local huge = math.huge +local fmt = string.format local min = math.min local now = ngx.now local ERR = ngx.ERR +local ngx_log = ngx.log local DEBUG = ngx.DEBUG local WARN = ngx.WARN @@ -100,10 +100,10 @@ local process local function schedule_flush(self) local ok, err = timer_at(self.flush_timeout/1000, flush, self) if not ok then - self:log(ERR, "failed to create delayed flush timer: %s", err) + ngx_log(ERR, "failed to create delayed flush timer: ", err) return end - --self:log(DEBUG, "delayed timer created") + --ngx_log(DEBUG, "delayed timer created") self.flush_scheduled = true end @@ -113,10 +113,10 @@ end -- @param self Queue -- @param batch: table with `entries` and `retries` counter -- @param delay number: timer delay in seconds -local function schedule_process(self, delay) - local ok, err = timer_at(delay, process, self) +local function schedule_process(self, batch, delay) + local ok, err = timer_at(delay, process, self, batch) if not ok then - self:log(ERR, "failed to create process timer: %s", err) + ngx_log(ERR, "failed to create process timer: ", err) return end self.process_scheduled = true @@ -147,13 +147,13 @@ flush = function(premature, self) if get_now() - self.last_t < self.flush_timeout then -- flushing reported: we had activity - self:log(DEBUG, "[flush] queue had activity, delaying flush") + ngx_log(DEBUG, "[flush] queue had activity, delaying flush") schedule_flush(self) return end -- no activity and timeout reached - self:log(DEBUG, "[flush] queue had no activity, flushing triggered by flush_timeout") + ngx_log(DEBUG, "[flush] queue had no activity, flushing triggered by flush_timeout") self:flush() self.flush_scheduled = false end @@ -165,31 +165,27 @@ end -- @param self Queue -- @param batch: table with `entries` and `retries` counter -- @return nothing -process = function(premature, self) +process = function(premature, self, batch) if premature then return end - local batch = self.batch_queue[1] - if not batch then - self:log(WARN, "queue process called but no batches to be processed") - return - end - local next_retry_delay local ok, err = self.process(batch.entries) if ok then -- success, reset retry delays self.retry_delay = 1 next_retry_delay = 0 - remove(self.batch_queue, 1) + else batch.retries = batch.retries + 1 if batch.retries < self.retry_count then - self:log(WARN, "failed to process entries: %s", tostring(err)) + ngx_log(WARN, "failed to process entries: ", tostring(err)) + -- queue our data for processing again, at the end of the queue + 
self.batch_queue[#self.batch_queue + 1] = batch else - self:log(ERR, "entry batch was already tried %d times, dropping it", batch.retries) - remove(self.batch_queue, 1) + ngx_log(ERR, fmt("entry batch was already tried %d times, dropping it", + batch.retries)) end self.retry_delay = self.retry_delay + 1 @@ -197,8 +193,10 @@ process = function(premature, self) end if #self.batch_queue > 0 then -- more to process? - self:log(DEBUG, "processing oldest data, %d still queued", #self.batch_queue) - schedule_process(self, next_retry_delay) + ngx_log(DEBUG, fmt("processing oldest data, %d still queued", + #self.batch_queue - 1)) + local oldest_batch = remove(self.batch_queue, 1) + schedule_process(self, oldest_batch, next_retry_delay) return end @@ -220,15 +218,13 @@ end -- @param opts table, optionally including -- `retry_count`, `flush_timeout`, `batch_max_size` and `process_delay` -- @return table: a Queue object. -function Queue.new(name, process, opts) +function Queue.new(process, opts) opts = opts or {} - assert(type(name) == "string", - "arg #1 (name) must be a string") assert(type(process) == "function", - "arg #2 (process) must be a function") + "arg #1 (process) must be a function") assert(type(opts) == "table", - "arg #3 (opts) must be a table") + "arg #2 (opts) must be a table") assert(opts.retry_count == nil or type(opts.retry_count) == "number", "retry_count must be a number") assert(opts.flush_timeout == nil or type(opts.flush_timeout) == "number", @@ -237,11 +233,8 @@ function Queue.new(name, process, opts) "batch_max_size must be a number") assert(opts.process_delay == nil or type(opts.batch_max_size) == "number", "process_delay must be a number") - assert(opts.max_queued_batches == nil or type(opts.max_queued_batches) == "number", - "max_queued_batches must be a number") local self = { - name = name, process = process, -- flush timeout in milliseconds @@ -249,7 +242,6 @@ function Queue.new(name, process, opts) retry_count = opts.retry_count or 0, batch_max_size = opts.batch_max_size or 1000, process_delay = opts.process_delay or 1, - max_queued_batches = opts.max_queued_batches or (kong.configuration and kong.configuration.max_queued_batches) or 100, retry_delay = 1, @@ -266,17 +258,6 @@ function Queue.new(name, process, opts) end -------------------------------------------------------------------------------- --- Log a message that includes the name of the queue for identification purposes --- @param self Queue --- @param level: log level --- @param formatstring: format string, will get the queue name and ": " prepended --- @param ...: formatter arguments -function Queue:log(level, formatstring, ...) - return ngx.log(level, string.format(self.name .. ": " .. formatstring, unpack({...}))) -end - - ------------------------------------------------------------------------------- -- Add data to the queue -- @param entry the value included in the queue. It can be any Lua value besides nil. 
@@ -288,8 +269,8 @@ function Queue:add(entry) if self.batch_max_size == 1 then -- no batching - self.batch_queue = { { entries = { entry }, retries = 0 } } - schedule_process(self, 0) + local batch = { entries = { entry }, retries = 0 } + schedule_process(self, batch, 0) return true end @@ -323,12 +304,8 @@ function Queue:flush() -- Queue the current batch, if it has at least 1 entry if current_batch_size > 0 then - self:log(DEBUG, "queueing batch for processing (%d entries)", current_batch_size) + ngx_log(DEBUG, "queueing batch for processing (", current_batch_size, " entries)") - while #self.batch_queue >= self.max_queued_batches do - self:log(ERR, "exceeded max_queued_batches (%d), dropping oldest", self.max_queued_batches) - remove(self.batch_queue, 1) - end self.batch_queue[#self.batch_queue + 1] = self.current_batch self.current_batch = { entries = {}, retries = 0 } end @@ -337,8 +314,10 @@ function Queue:flush() -- in the future. This will keep calling itself in the future until -- the queue is empty if #self.batch_queue > 0 and not self.process_scheduled then - self:log(DEBUG, "processing oldest entry, %d still queued", #self.batch_queue) - schedule_process(self, self.process_delay) + ngx_log(DEBUG, fmt("processing oldest entry, %d still queued", + #self.batch_queue - 1)) + local oldest_batch = remove(self.batch_queue, 1) + schedule_process(self, oldest_batch, self.process_delay) end return true diff --git a/spec/01-unit/27-batch_queue_spec.lua b/spec/01-unit/27-batch_queue_spec.lua deleted file mode 100644 index 38b1edcb989d..000000000000 --- a/spec/01-unit/27-batch_queue_spec.lua +++ /dev/null @@ -1,33 +0,0 @@ - -local BatchQueue = require "kong.tools.batch_queue" -local helpers = require "spec.helpers" - -describe("batch queue", function() - - it("observes the limit parameter", function() - local count = 0 - local last - local function process(entries) - count = count + #entries - last = entries[#entries] - return true - end - - local q = BatchQueue.new("batch-queue-unit-test", process, {max_queued_batches=2, batch_max_size=100, process_delay=0}) - - q:add(1) - q:flush() - q:add(2) - q:flush() - q:add(3) - q:flush() - - helpers.wait_until(function() - ngx.sleep(.1) - return #q.batch_queue == 0 - end, 1) - - assert.equal(2, count) - assert.equal(3, last) - end) -end)
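
For reference, a minimal usage sketch against the reverted API, i.e. the two-argument `BatchQueue.new(process, opts)` with no queue name and no `max_queued_batches` bound. This is illustrative only: the `process` callback, option values, and log messages are assumptions drawn from the module's documentation comment and the `http-log` handler change in this patch, not part of the patch itself.

```lua
-- Hedged sketch (not from the patch): constructing and using a batch queue
-- with the reverted two-argument API.
local BatchQueue = require "kong.tools.batch_queue"

-- Consumer for a closed batch. Returning a truthy value marks the batch as
-- processed; returning nil plus an error message causes it to be retried up
-- to `retry_count` times before being dropped.
local function process(entries)
  for _, entry in ipairs(entries) do
    kong.log.debug("processing entry: ", tostring(entry))
  end
  return true
end

local q, err = BatchQueue.new(process, {
  retry_count    = 0,    -- number of times to retry a failed batch
  batch_max_size = 1000, -- entries per batch before it is queued for processing
  process_delay  = 1,    -- seconds; how often the current batch is closed & queued
  flush_timeout  = 2,    -- seconds of inactivity before the current batch is flushed
})
if not q then
  kong.log.err("could not create queue: ", err)
  return
end

q:add("some value")
```

Note that with this revert in place there is no upper bound on `#q.batch_queue`: if `process` keeps failing or cannot keep up, queued batches accumulate without limit, which is the behaviour the reverted change (#10046 / #10058) had addressed.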