diff --git a/changelog/unreleased/kong/http-log-concurrent-optimize.yml b/changelog/unreleased/kong/http-log-concurrent-optimize.yml index e6ca208f6e2e..71e5de84e64f 100644 --- a/changelog/unreleased/kong/http-log-concurrent-optimize.yml +++ b/changelog/unreleased/kong/http-log-concurrent-optimize.yml @@ -1,3 +1,3 @@ -message: "**HTTP-Log**: Added a new configuration `no_queue` to indicates whether to disable the queue mechanism." +message: "**HTTP-Log**: Added a new configuration `queue.concurrency`" type: feature scope: Plugin diff --git a/kong/clustering/compat/removed_fields.lua b/kong/clustering/compat/removed_fields.lua index 7f5f1fadb49f..053462b36f4b 100644 --- a/kong/clustering/compat/removed_fields.lua +++ b/kong/clustering/compat/removed_fields.lua @@ -210,8 +210,17 @@ return { acl = { "always_use_authenticated_groups", }, + opentelemetry = { + "queue.concurrency", + }, http_log = { - "no_queue" + "queue.concurrency", + }, + statsd = { + "queue.concurrency", + }, + datadog = { + "queue.concurrency", }, }, } diff --git a/kong/plugins/http-log/handler.lua b/kong/plugins/http-log/handler.lua index c267ddc7de3c..fd1d0cd48eeb 100644 --- a/kong/plugins/http-log/handler.lua +++ b/kong/plugins/http-log/handler.lua @@ -14,7 +14,6 @@ local tonumber = tonumber local fmt = string.format local pairs = pairs local max = math.max -local timer_at = ngx.timer.at local sandbox_opts = { env = { kong = kong, ngx = ngx } } @@ -190,27 +189,12 @@ function HttpLogHandler:log(conf) local queue_conf = Queue.get_plugin_params("http-log", conf, make_queue_name(conf)) kong.log.debug("Queue name automatically configured based on configuration parameters to: ", queue_conf.name) - local entry = cjson.encode(kong.log.serialize()) - - if conf.no_queue then - local queue = Queue.create(queue_conf, send_entries, conf) - local ok, err = timer_at(0, function(premature) - if premature then - return - end - queue:handle({ entry }) - end) - if not ok then - kong.log.err("failed to create timer: ", err) - end - return - end local ok, err = Queue.enqueue( queue_conf, send_entries, conf, - entry + cjson.encode(kong.log.serialize()) ) if not ok then kong.log.err("Failed to enqueue log entry to log server: ", err) diff --git a/kong/plugins/http-log/schema.lua b/kong/plugins/http-log/schema.lua index 8db7219ef607..430761a5ed41 100644 --- a/kong/plugins/http-log/schema.lua +++ b/kong/plugins/http-log/schema.lua @@ -59,7 +59,6 @@ return { }}, { queue = typedefs.queue }, { custom_fields_by_lua = typedefs.lua_code }, - { no_queue = { description = "Indicates whether to disable the queue mechanism.", type = "boolean", default = false, required = true }, }, }, custom_validator = function(config) -- check no double userinfo + authorization header diff --git a/kong/tools/queue.lua b/kong/tools/queue.lua index 8f45ca24ee6b..37a95ff39c7a 100644 --- a/kong/tools/queue.lua +++ b/kong/tools/queue.lua @@ -76,6 +76,7 @@ local math_min = math.min local now = ngx.now local sleep = ngx.sleep local null = ngx.null +local timer_at = ngx.timer.at local Queue = {} @@ -203,14 +204,6 @@ end -- @param opts table, requires `name`, optionally includes `retry_count`, `max_coalescing_delay` and `max_batch_size` -- @return table: a Queue object. local function get_or_create_queue(queue_conf, handler, handler_conf) - assert(type(queue_conf) == "table", - "arg #1 (queue_conf) must be a table") - assert(type(handler) == "function", - "arg #2 (handler) must be a function") - assert(handler_conf == nil or type(handler_conf) == "table", - "arg #3 (handler_conf) must be a table or nil") - assert(type(queue_conf.name) == "string", - "arg #1 (queue_conf) must include a name") local name = assert(queue_conf.name) local key = _make_queue_key(name) @@ -223,70 +216,7 @@ local function get_or_create_queue(queue_conf, handler, handler_conf) return queue end - queue = Queue.create(queue_conf, handler, handler_conf) - - kong.timer:named_at("queue " .. key, 0, function(_, q) - while q:count() > 0 do - q:log_debug("processing queue") - q:process_once() - end - q:log_debug("done processing queue") - queues[key] = nil - end, queue) - - queues[key] = queue - - queue:log_debug("queue created") - - return queue -end - -function Queue.create(queue_conf, handler, handler_conf) - assert(type(queue_conf) == "table", - "arg #1 (queue_conf) must be a table") - assert(type(handler) == "function", - "arg #2 (handler) must be a function") - assert(handler_conf == nil or type(handler_conf) == "table", - "arg #3 (handler_conf) must be a table or nil") - assert(type(queue_conf.name) == "string", - "arg #1 (queue_conf) must include a name") - - - assert( - type(queue_conf.max_batch_size) == "number", - "arg #1 (queue_conf) max_batch_size must be a number" - ) - assert( - type(queue_conf.max_coalescing_delay) == "number", - "arg #1 (queue_conf) max_coalescing_delay must be a number" - ) - assert( - type(queue_conf.max_entries) == "number", - "arg #1 (queue_conf) max_entries must be a number" - ) - assert( - type(queue_conf.max_retry_time) == "number", - "arg #1 (queue_conf) max_retry_time must be a number" - ) - assert( - type(queue_conf.initial_retry_delay) == "number", - "arg #1 (queue_conf) initial_retry_delay must be a number" - ) - assert( - type(queue_conf.max_retry_delay) == "number", - "arg #1 (queue_conf) max_retry_delay must be a number" - ) - - local max_bytes_type = type(queue_conf.max_bytes) - assert( - max_bytes_type == "nil" or max_bytes_type == "number", - "arg #1 (queue_conf) max_bytes must be a number or nil" - ) - - local name = assert(queue_conf.name) - local key = _make_queue_key(name) - - local queue = { + queue = { -- Queue parameters from the enqueue call name = name, key = key, @@ -309,7 +239,22 @@ function Queue.create(queue_conf, handler, handler_conf) queue[option] = value end - return setmetatable(queue, Queue_mt) + queue = setmetatable(queue, Queue_mt) + + kong.timer:named_at("queue " .. key, 0, function(_, q) + while q:count() > 0 do + q:log_debug("processing queue") + q:process_once() + end + q:log_debug("done processing queue") + queues[key] = nil + end, queue) + + queues[key] = queue + + queue:log_debug("queue created") + + return queue end @@ -370,31 +315,7 @@ function Queue.can_enqueue(queue_conf, entry) return _can_enqueue(queue, entry) end - --- Delete the frontmost entry from the queue and adjust the current utilization variables. -function Queue:delete_frontmost_entry() - if self.max_bytes then - -- If max_bytes is set, reduce the currently queued byte count by the - self.bytes_queued = self.bytes_queued - #self.entries[self.front] - end - self.entries[self.front] = nil - self.front = self.front + 1 - if self.front == self.back then - self.front = 1 - self.back = 1 - end -end - - --- Drop the oldest entry, adjusting the semaphore value in the process. This is --- called when the queue runs out of space and needs to make space. -function Queue:drop_oldest_entry() - assert(self.semaphore:count() > 0) - self.semaphore:wait(0) - self:delete_frontmost_entry() -end - -function Queue:handle(entries) +local function handle(self, entries) local entry_count = #entries local start_time = now() @@ -434,6 +355,30 @@ function Queue:handle(entries) end +-- Delete the frontmost entry from the queue and adjust the current utilization variables. +function Queue:delete_frontmost_entry() + if self.max_bytes then + -- If max_bytes is set, reduce the currently queued byte count by the + self.bytes_queued = self.bytes_queued - #self.entries[self.front] + end + self.entries[self.front] = nil + self.front = self.front + 1 + if self.front == self.back then + self.front = 1 + self.back = 1 + end +end + + +-- Drop the oldest entry, adjusting the semaphore value in the process. This is +-- called when the queue runs out of space and needs to make space. +function Queue:drop_oldest_entry() + assert(self.semaphore:count() > 0) + self.semaphore:wait(0) + self:delete_frontmost_entry() +end + + -- Process one batch of entries from the queue. Returns truthy if entries were processed, falsy if there was an -- error or no items were on the queue to be processed. function Queue:process_once() @@ -482,7 +427,7 @@ function Queue:process_once() self.already_dropped_entries = false end - self:handle(batch) + handle(self, batch) end @@ -567,6 +512,21 @@ local function enqueue(self, entry) return nil, "entry must be a non-nil Lua value" end + + if self.concurrency == 0 then + local ok, err = timer_at(0, function(premature) + if premature then + return + end + handle(self, { entry }) + end) + if not ok then + return nil, "failed to crete timer: " .. err + end + return true + end + + if self:count() >= self.max_entries * CAPACITY_WARNING_THRESHOLD then if not self.warned then self:log_warn('queue at %s%% capacity', CAPACITY_WARNING_THRESHOLD * 100) @@ -635,6 +595,49 @@ end function Queue.enqueue(queue_conf, handler, handler_conf, value) + + assert(type(queue_conf) == "table", + "arg #1 (queue_conf) must be a table") + assert(type(handler) == "function", + "arg #2 (handler) must be a function") + assert(handler_conf == nil or type(handler_conf) == "table", + "arg #3 (handler_conf) must be a table or nil") + assert(type(queue_conf.name) == "string", + "arg #1 (queue_conf) must include a name") + + assert( + type(queue_conf.max_batch_size) == "number", + "arg #1 (queue_conf) max_batch_size must be a number" + ) + assert( + type(queue_conf.max_coalescing_delay) == "number", + "arg #1 (queue_conf) max_coalescing_delay must be a number" + ) + assert( + type(queue_conf.max_entries) == "number", + "arg #1 (queue_conf) max_entries must be a number" + ) + assert( + type(queue_conf.max_retry_time) == "number", + "arg #1 (queue_conf) max_retry_time must be a number" + ) + assert( + type(queue_conf.initial_retry_delay) == "number", + "arg #1 (queue_conf) initial_retry_delay must be a number" + ) + assert( + type(queue_conf.max_retry_delay) == "number", + "arg #1 (queue_conf) max_retry_delay must be a number" + ) + + local max_bytes_type = type(queue_conf.max_bytes) + assert( + max_bytes_type == "nil" or max_bytes_type == "number", + "arg #1 (queue_conf) max_bytes must be a number or nil" + ) + + -- TODO: assert concurrency + local queue = get_or_create_queue(queue_conf, handler, handler_conf) return enqueue(queue, value) end diff --git a/kong/tools/queue_schema.lua b/kong/tools/queue_schema.lua index 94132ed21b5f..01720297f07a 100644 --- a/kong/tools/queue_schema.lua +++ b/kong/tools/queue_schema.lua @@ -49,5 +49,13 @@ return Schema.define { between = { 0.001, 1000000 }, -- effectively unlimited maximum description = "Maximum time in seconds between retries, caps exponential backoff." } }, + { concurrency = { + type = "integer", + default = 1, + between = { 0, 1 }, + required = true, + description = "TBD11111111" + } }, + } } diff --git a/spec/01-unit/01-db/01-schema/11-declarative_config/02-process_auto_fields_spec.lua b/spec/01-unit/01-db/01-schema/11-declarative_config/02-process_auto_fields_spec.lua index 38ee81a544ac..f12359a8aa5a 100644 --- a/spec/01-unit/01-db/01-schema/11-declarative_config/02-process_auto_fields_spec.lua +++ b/spec/01-unit/01-db/01-schema/11-declarative_config/02-process_auto_fields_spec.lua @@ -180,7 +180,6 @@ describe("declarative config: process_auto_fields", function() max_retry_delay = 60, max_retry_time = 60, }, - no_queue = false, } }, } @@ -238,7 +237,6 @@ describe("declarative config: process_auto_fields", function() max_retry_delay = 60, max_retry_time = 60, }, - no_queue = false, } }, } @@ -356,7 +354,6 @@ describe("declarative config: process_auto_fields", function() max_retry_delay = 60, max_retry_time = 60, }, - no_queue = false, } }, } @@ -678,7 +675,6 @@ describe("declarative config: process_auto_fields", function() max_retry_delay = 60, max_retry_time = 60, }, - no_queue = false, } } } diff --git a/spec/01-unit/01-db/01-schema/11-declarative_config/03-flatten_spec.lua b/spec/01-unit/01-db/01-schema/11-declarative_config/03-flatten_spec.lua index 2df84438e2d8..a869822e847e 100644 --- a/spec/01-unit/01-db/01-schema/11-declarative_config/03-flatten_spec.lua +++ b/spec/01-unit/01-db/01-schema/11-declarative_config/03-flatten_spec.lua @@ -303,7 +303,6 @@ describe("declarative config: flatten", function() max_retry_time = 60, max_bytes = null, }, - no_queue = false, } }, { @@ -411,7 +410,6 @@ describe("declarative config: flatten", function() max_retry_time = 60, max_bytes = null, }, - no_queue = false, }, consumer = { id = "UUID" @@ -613,8 +611,7 @@ describe("declarative config: flatten", function() max_retry_delay = 60, max_retry_time = 60, max_bytes = null, - }, - no_queue = false, + } }, consumer = null, created_at = 1234567890, @@ -1132,7 +1129,6 @@ describe("declarative config: flatten", function() max_retry_time = 60, max_bytes = null, }, - no_queue = false, }, consumer = null, created_at = 1234567890, diff --git a/spec/03-plugins/03-http-log/01-log_spec.lua b/spec/03-plugins/03-http-log/01-log_spec.lua index c7a1a8996193..2175e31a1f48 100644 --- a/spec/03-plugins/03-http-log/01-log_spec.lua +++ b/spec/03-plugins/03-http-log/01-log_spec.lua @@ -351,7 +351,9 @@ for _, strategy in helpers.each_strategy() do .. ":" .. helpers.mock_upstream_port .. "/post_log/http", - no_queue = true, + queue = { + concurrency = 0, + }, } }