Skip to content

Commit

Permalink
wip: feat: introduce queue.concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
vm-001 committed Aug 9, 2024
1 parent 86482ba commit 39645d8
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 129 deletions.
2 changes: 1 addition & 1 deletion changelog/unreleased/kong/http-log-concurrent-optimize.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
message: "**HTTP-Log**: Added a new configuration `no_queue` to indicates whether to disable the queue mechanism."
message: "**HTTP-Log**: Added a new configuration `queue.concurrency`"
type: feature
scope: Plugin
11 changes: 10 additions & 1 deletion kong/clustering/compat/removed_fields.lua
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,17 @@ return {
acl = {
"always_use_authenticated_groups",
},
opentelemetry = {
"queue.concurrency",
},
http_log = {
"no_queue"
"queue.concurrency",
},
statsd = {
"queue.concurrency",
},
datadog = {
"queue.concurrency",
},
},
}
18 changes: 1 addition & 17 deletions kong/plugins/http-log/handler.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ local tonumber = tonumber
local fmt = string.format
local pairs = pairs
local max = math.max
local timer_at = ngx.timer.at


local sandbox_opts = { env = { kong = kong, ngx = ngx } }
Expand Down Expand Up @@ -190,27 +189,12 @@ function HttpLogHandler:log(conf)

local queue_conf = Queue.get_plugin_params("http-log", conf, make_queue_name(conf))
kong.log.debug("Queue name automatically configured based on configuration parameters to: ", queue_conf.name)
local entry = cjson.encode(kong.log.serialize())

if conf.no_queue then
local queue = Queue.create(queue_conf, send_entries, conf)
local ok, err = timer_at(0, function(premature)
if premature then
return
end
queue:handle({ entry })
end)
if not ok then
kong.log.err("failed to create timer: ", err)
end
return
end

local ok, err = Queue.enqueue(
queue_conf,
send_entries,
conf,
entry
cjson.encode(kong.log.serialize())
)
if not ok then
kong.log.err("Failed to enqueue log entry to log server: ", err)
Expand Down
1 change: 0 additions & 1 deletion kong/plugins/http-log/schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ return {
}},
{ queue = typedefs.queue },
{ custom_fields_by_lua = typedefs.lua_code },
{ no_queue = { description = "Indicates whether to disable the queue mechanism.", type = "boolean", default = false, required = true }, },
},
custom_validator = function(config)
-- check no double userinfo + authorization header
Expand Down
200 changes: 101 additions & 99 deletions kong/tools/queue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ local math_min = math.min
local now = ngx.now
local sleep = ngx.sleep
local null = ngx.null
local timer_at = ngx.timer.at


local Queue = {}
Expand Down Expand Up @@ -203,14 +204,6 @@ end
-- @param opts table, requires `name`, optionally includes `retry_count`, `max_coalescing_delay` and `max_batch_size`
-- @return table: a Queue object.
local function get_or_create_queue(queue_conf, handler, handler_conf)
assert(type(queue_conf) == "table",
"arg #1 (queue_conf) must be a table")
assert(type(handler) == "function",
"arg #2 (handler) must be a function")
assert(handler_conf == nil or type(handler_conf) == "table",
"arg #3 (handler_conf) must be a table or nil")
assert(type(queue_conf.name) == "string",
"arg #1 (queue_conf) must include a name")

local name = assert(queue_conf.name)
local key = _make_queue_key(name)
Expand All @@ -223,70 +216,7 @@ local function get_or_create_queue(queue_conf, handler, handler_conf)
return queue
end

queue = Queue.create(queue_conf, handler, handler_conf)

kong.timer:named_at("queue " .. key, 0, function(_, q)
while q:count() > 0 do
q:log_debug("processing queue")
q:process_once()
end
q:log_debug("done processing queue")
queues[key] = nil
end, queue)

queues[key] = queue

queue:log_debug("queue created")

return queue
end

function Queue.create(queue_conf, handler, handler_conf)
assert(type(queue_conf) == "table",
"arg #1 (queue_conf) must be a table")
assert(type(handler) == "function",
"arg #2 (handler) must be a function")
assert(handler_conf == nil or type(handler_conf) == "table",
"arg #3 (handler_conf) must be a table or nil")
assert(type(queue_conf.name) == "string",
"arg #1 (queue_conf) must include a name")


assert(
type(queue_conf.max_batch_size) == "number",
"arg #1 (queue_conf) max_batch_size must be a number"
)
assert(
type(queue_conf.max_coalescing_delay) == "number",
"arg #1 (queue_conf) max_coalescing_delay must be a number"
)
assert(
type(queue_conf.max_entries) == "number",
"arg #1 (queue_conf) max_entries must be a number"
)
assert(
type(queue_conf.max_retry_time) == "number",
"arg #1 (queue_conf) max_retry_time must be a number"
)
assert(
type(queue_conf.initial_retry_delay) == "number",
"arg #1 (queue_conf) initial_retry_delay must be a number"
)
assert(
type(queue_conf.max_retry_delay) == "number",
"arg #1 (queue_conf) max_retry_delay must be a number"
)

local max_bytes_type = type(queue_conf.max_bytes)
assert(
max_bytes_type == "nil" or max_bytes_type == "number",
"arg #1 (queue_conf) max_bytes must be a number or nil"
)

local name = assert(queue_conf.name)
local key = _make_queue_key(name)

local queue = {
queue = {
-- Queue parameters from the enqueue call
name = name,
key = key,
Expand All @@ -309,7 +239,21 @@ function Queue.create(queue_conf, handler, handler_conf)
queue[option] = value
end

return setmetatable(queue, Queue_mt)
queue = setmetatable(queue, Queue_mt)

kong.timer:named_at("queue " .. key, 0, function(_, q)
while q:count() > 0 do
q:process_once()
end
q:log_debug("done processing queue")
queues[key] = nil
end, queue)

queues[key] = queue

queue:log_debug("queue created")

return queue
end


Expand Down Expand Up @@ -370,31 +314,7 @@ function Queue.can_enqueue(queue_conf, entry)
return _can_enqueue(queue, entry)
end


-- Delete the frontmost entry from the queue and adjust the current utilization variables.
function Queue:delete_frontmost_entry()
if self.max_bytes then
-- If max_bytes is set, reduce the currently queued byte count by the
self.bytes_queued = self.bytes_queued - #self.entries[self.front]
end
self.entries[self.front] = nil
self.front = self.front + 1
if self.front == self.back then
self.front = 1
self.back = 1
end
end


-- Drop the oldest entry, adjusting the semaphore value in the process. This is
-- called when the queue runs out of space and needs to make space.
function Queue:drop_oldest_entry()
assert(self.semaphore:count() > 0)
self.semaphore:wait(0)
self:delete_frontmost_entry()
end

function Queue:handle(entries)
local function handle(self, entries)
local entry_count = #entries

local start_time = now()
Expand Down Expand Up @@ -434,6 +354,30 @@ function Queue:handle(entries)
end


-- Delete the frontmost entry from the queue and adjust the current utilization variables.
function Queue:delete_frontmost_entry()
if self.max_bytes then
-- If max_bytes is set, reduce the currently queued byte count by the
self.bytes_queued = self.bytes_queued - #self.entries[self.front]
end
self.entries[self.front] = nil
self.front = self.front + 1
if self.front == self.back then
self.front = 1
self.back = 1
end
end


-- Drop the oldest entry, adjusting the semaphore value in the process. This is
-- called when the queue runs out of space and needs to make space.
function Queue:drop_oldest_entry()
assert(self.semaphore:count() > 0)
self.semaphore:wait(0)
self:delete_frontmost_entry()
end


-- Process one batch of entries from the queue. Returns truthy if entries were processed, falsy if there was an
-- error or no items were on the queue to be processed.
function Queue:process_once()
Expand Down Expand Up @@ -482,7 +426,7 @@ function Queue:process_once()
self.already_dropped_entries = false
end

self:handle(batch)
handle(self, batch)
end


Expand Down Expand Up @@ -567,6 +511,21 @@ local function enqueue(self, entry)
return nil, "entry must be a non-nil Lua value"
end


if self.concurrency == 0 then
local ok, err = timer_at(0, function(premature)
if premature then
return
end
handle(self, { entry })
end)
if not ok then
return nil, "failed to crete timer: " .. err
end
return true
end


if self:count() >= self.max_entries * CAPACITY_WARNING_THRESHOLD then
if not self.warned then
self:log_warn('queue at %s%% capacity', CAPACITY_WARNING_THRESHOLD * 100)
Expand Down Expand Up @@ -635,6 +594,49 @@ end


function Queue.enqueue(queue_conf, handler, handler_conf, value)

assert(type(queue_conf) == "table",
"arg #1 (queue_conf) must be a table")
assert(type(handler) == "function",
"arg #2 (handler) must be a function")
assert(handler_conf == nil or type(handler_conf) == "table",
"arg #3 (handler_conf) must be a table or nil")
assert(type(queue_conf.name) == "string",
"arg #1 (queue_conf) must include a name")

assert(
type(queue_conf.max_batch_size) == "number",
"arg #1 (queue_conf) max_batch_size must be a number"
)
assert(
type(queue_conf.max_coalescing_delay) == "number",
"arg #1 (queue_conf) max_coalescing_delay must be a number"
)
assert(
type(queue_conf.max_entries) == "number",
"arg #1 (queue_conf) max_entries must be a number"
)
assert(
type(queue_conf.max_retry_time) == "number",
"arg #1 (queue_conf) max_retry_time must be a number"
)
assert(
type(queue_conf.initial_retry_delay) == "number",
"arg #1 (queue_conf) initial_retry_delay must be a number"
)
assert(
type(queue_conf.max_retry_delay) == "number",
"arg #1 (queue_conf) max_retry_delay must be a number"
)

local max_bytes_type = type(queue_conf.max_bytes)
assert(
max_bytes_type == "nil" or max_bytes_type == "number",
"arg #1 (queue_conf) max_bytes must be a number or nil"
)

-- TODO: assert concurrency

local queue = get_or_create_queue(queue_conf, handler, handler_conf)
return enqueue(queue, value)
end
Expand Down
8 changes: 8 additions & 0 deletions kong/tools/queue_schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,13 @@ return Schema.define {
between = { 0.001, 1000000 }, -- effectively unlimited maximum
description = "Maximum time in seconds between retries, caps exponential backoff."
} },
{ concurrency = {
type = "integer",
default = 1,
between = { 0, 1 },
required = true,
description = "TBD11111111"
} },

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ describe("declarative config: process_auto_fields", function()
max_retry_delay = 60,
max_retry_time = 60,
},
no_queue = false,
}
},
}
Expand Down Expand Up @@ -238,7 +237,6 @@ describe("declarative config: process_auto_fields", function()
max_retry_delay = 60,
max_retry_time = 60,
},
no_queue = false,
}
},
}
Expand Down Expand Up @@ -356,7 +354,6 @@ describe("declarative config: process_auto_fields", function()
max_retry_delay = 60,
max_retry_time = 60,
},
no_queue = false,
}
},
}
Expand Down Expand Up @@ -678,7 +675,6 @@ describe("declarative config: process_auto_fields", function()
max_retry_delay = 60,
max_retry_time = 60,
},
no_queue = false,
}
}
}
Expand Down
Loading

0 comments on commit 39645d8

Please sign in to comment.