Skip to content

Commit

Permalink
feat(batchqueue): increase default maximum queue size and remove glob…
Browse files Browse the repository at this point in the history
…al configuration parameter (#10271)

We don't want to support this parameter going forward as we're going
to re-implement the queue system.  This change hard-codes the queue limit
to a value that would be considered "large enough".

We added an undocumented environment variable serves to help users who require
a different limit on the maximum size of queues.  A proper solution is
under development and this undocumented environment variable will only
exist until the reimplementation has been released

Co-authored-by: Datong Sun <datong.sun@konghq.com>
  • Loading branch information
hanshuebner and dndx authored Feb 10, 2023
1 parent 4205881 commit 989b23e
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 20 deletions.
12 changes: 1 addition & 11 deletions kong.conf.default
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# -----------------------
G# -----------------------
# Kong configuration file
# -----------------------
#
Expand Down Expand Up @@ -1809,13 +1809,3 @@
# behavior and explicitly instructs Kong which
# OpenResty installation to use.

#max_queued_batches = 100 # Maximum number of batches to keep on an internal
# plugin queue before dropping old batches. This is
# meant as a global, last-resort control to prevent
# queues from consuming infinite memory. When batches
# are being dropped, an error message
# "exceeded max_queued_batches (%d), dropping oldest"
# will be logged. The error message will also include
# a string that identifies the plugin causing the
# problem. Queues are used by the http-log, statsd,
# opentelemetry and datadog plugins.
2 changes: 0 additions & 2 deletions kong/conf_loader/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -577,8 +577,6 @@ local CONF_PARSERS = {

proxy_server = { typ = "string" },
proxy_server_ssl_verify = { typ = "boolean" },

max_queued_batches = { typ = "number" },
}


Expand Down
2 changes: 0 additions & 2 deletions kong/templates/kong_defaults.lua
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,4 @@ opentelemetry_tracing = off
opentelemetry_tracing_sampling_rate = 1.0
tracing_instrumentations = off
tracing_sampling_rate = 1.0
max_queued_batches = 100
]]
28 changes: 23 additions & 5 deletions kong/tools/batch_queue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
-- batch_max_size = 1000, -- max number of entries that can be queued before they are queued for processing
-- process_delay = 1, -- in seconds, how often the current batch is closed & queued
-- flush_timeout = 2, -- in seconds, how much time passes without activity before the current batch is closed and queued
-- max_queued_batches = 100, -- max number of batches that can be queued before the oldest batch is dropped when a new one is queued
-- max_queued_batches = 10000, -- max number of batches that can be queued before the oldest batch is dropped when a new one is queued
-- }
-- )
--
Expand Down Expand Up @@ -81,6 +81,24 @@ local WARN = ngx.WARN
local RETRY_MAX_DELAY = 60


local function getenv_number(name, default)
local s = os.getenv(name)
if s then
local n = tonumber(s)
if n ~= nil then
return n
end

ngx.log(ERR, "cannot parse environment variable " .. name .. " as number, returning default")
end

return default
end


local DEFAULT_MAX_QUEUED_BATCHES = getenv_number("KONG_MAX_QUEUED_BATCHES", 10000)


local Queue = {}


Expand Down Expand Up @@ -220,12 +238,12 @@ end
-- @param opts table, optionally including
-- `retry_count`, `flush_timeout`, `batch_max_size` and `process_delay`
-- @return table: a Queue object.
function Queue.new(name, process, opts)
function Queue.new(name, handler, opts)
opts = opts or {}

assert(type(name) == "string",
"arg #1 (name) must be a string")
assert(type(process) == "function",
assert(type(handler) == "function",
"arg #2 (process) must be a function")
assert(type(opts) == "table",
"arg #3 (opts) must be a table")
Expand All @@ -242,14 +260,14 @@ function Queue.new(name, process, opts)

local self = {
name = name,
process = process,
process = handler,

-- flush timeout in milliseconds
flush_timeout = opts.flush_timeout and opts.flush_timeout * 1000 or 2000,
retry_count = opts.retry_count or 0,
batch_max_size = opts.batch_max_size or 1000,
process_delay = opts.process_delay or 1,
max_queued_batches = opts.max_queued_batches or (kong.configuration and kong.configuration.max_queued_batches) or 100,
max_queued_batches = opts.max_queued_batches or DEFAULT_MAX_QUEUED_BATCHES,

retry_delay = 1,

Expand Down

1 comment on commit 989b23e

@khcp-gha-bot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bazel Build

Docker image available kong/kong:989b23e4af138162cf1109c6b5f37d79084a5bc1
Artifacts available https://github.com/Kong/kong/actions/runs/4145546168

Please sign in to comment.