diff --git a/changelog/unreleased/kong/feat-queue-concurrency-limit.yml b/changelog/unreleased/kong/feat-queue-concurrency-limit.yml new file mode 100644 index 000000000000..57ffc3c621f3 --- /dev/null +++ b/changelog/unreleased/kong/feat-queue-concurrency-limit.yml @@ -0,0 +1,5 @@ +message: | + Added a new configuration `concurrency_limit` (integer, defaults to 1) for Queue to specify the number of delivery timers. + Note that setting `concurrency_limit` to `-1` means no limit at all, and each HTTP log entry will create an individual timer for sending. +type: feature +scope: Core diff --git a/kong/clustering/compat/removed_fields.lua b/kong/clustering/compat/removed_fields.lua index 5c1b7404fe8a..94a33aac1526 100644 --- a/kong/clustering/compat/removed_fields.lua +++ b/kong/clustering/compat/removed_fields.lua @@ -166,6 +166,7 @@ return { opentelemetry = { "traces_endpoint", "logs_endpoint", + "queue.concurrency_limit", }, ai_proxy = { "max_request_body_size", @@ -210,5 +211,17 @@ return { acl = { "always_use_authenticated_groups", }, + http_log = { + "queue.concurrency_limit", + }, + statsd = { + "queue.concurrency_limit", + }, + datadog = { + "queue.concurrency_limit", + }, + zipkin = { + "queue.concurrency_limit", + }, }, } diff --git a/kong/tools/queue.lua b/kong/tools/queue.lua index dc6a8fb6a108..37284862b48e 100644 --- a/kong/tools/queue.lua +++ b/kong/tools/queue.lua @@ -240,16 +240,18 @@ local function get_or_create_queue(queue_conf, handler, handler_conf) queue = setmetatable(queue, Queue_mt) - kong.timer:named_at("queue " .. key, 0, function(_, q) - while q:count() > 0 do - q:log_debug("processing queue") - q:process_once() - end - q:log_debug("done processing queue") - queues[key] = nil - end, queue) + if queue.concurrency_limit == 1 then + kong.timer:named_at("queue " .. 
key, 0, function(_, q) + while q:count() > 0 do + q:log_debug("processing queue") + q:process_once() + end + q:log_debug("done processing queue") + queues[key] = nil + end, queue) + queues[key] = queue + end - queues[key] = queue queue:log_debug("queue created") @@ -314,6 +316,45 @@ function Queue.can_enqueue(queue_conf, entry) return _can_enqueue(queue, entry) end +local function handle(self, entries) + local entry_count = #entries + + local start_time = now() + local retry_count = 0 + while true do + self:log_debug("passing %d entries to handler", entry_count) + local status, ok, err = pcall(self.handler, self.handler_conf, entries) + if status and ok == true then + self:log_debug("handler processed %d entries successfully", entry_count) + break + end + + if not status then + -- protected call failed, ok is the error message + err = ok + end + + self:log_warn("handler could not process entries: %s", tostring(err or "no error details returned by handler")) + + if not err then + self:log_err("handler returned falsy value but no error information") + end + + if (now() - start_time) > self.max_retry_time then + self:log_err( + "could not send entries due to max_retry_time exceeded. %d queue entries were lost", + entry_count) + break + end + + -- Delay before retrying. The delay time is calculated by multiplying the configured initial_retry_delay with + -- 2 to the power of the number of retries, creating an exponential increase over the course of each retry. + -- The maximum time between retries is capped by the max_retry_delay configuration parameter. + sleep(math_min(self.max_retry_delay, 2 ^ retry_count * self.initial_retry_delay)) + retry_count = retry_count + 1 + end +end + -- Delete the frontmost entry from the queue and adjust the current utilization variables. 
function Queue:delete_frontmost_entry() @@ -387,41 +428,7 @@ function Queue:process_once() self.already_dropped_entries = false end - local start_time = now() - local retry_count = 0 - while true do - self:log_debug("passing %d entries to handler", entry_count) - local status - status, ok, err = pcall(self.handler, self.handler_conf, batch) - if status and ok == true then - self:log_debug("handler processed %d entries successfully", entry_count) - break - end - - if not status then - -- protected call failed, ok is the error message - err = ok - end - - self:log_warn("handler could not process entries: %s", tostring(err or "no error details returned by handler")) - - if not err then - self:log_err("handler returned falsy value but no error information") - end - - if (now() - start_time) > self.max_retry_time then - self:log_err( - "could not send entries, giving up after %d retries. %d queue entries were lost", - retry_count, entry_count) - break - end - - -- Delay before retrying. The delay time is calculated by multiplying the configured initial_retry_delay with - -- 2 to the power of the number of retries, creating an exponential increase over the course of each retry. - -- The maximum time between retries is capped by the max_retry_delay configuration parameter. - sleep(math_min(self.max_retry_delay, 2 ^ retry_count * self.initial_retry_delay)) - retry_count = retry_count + 1 - end + handle(self, batch) end @@ -506,6 +513,21 @@ local function enqueue(self, entry) return nil, "entry must be a non-nil Lua value" end + if self.concurrency_limit == -1 then -- unlimited concurrency + -- do not enqueue when concurrency_limit is unlimited + local ok, err = kong.timer:at(0, function(premature) + if premature then + return + end + handle(self, { entry }) + end) + if not ok then + return nil, "failed to create timer: " .. 
err + end + return true + end + + + if self:count() >= self.max_entries * CAPACITY_WARNING_THRESHOLD then if not self.warned then self:log_warn('queue at %s%% capacity', CAPACITY_WARNING_THRESHOLD * 100) @@ -615,6 +637,11 @@ function Queue.enqueue(queue_conf, handler, handler_conf, value) "arg #1 (queue_conf) max_bytes must be a number or nil" ) + assert( + type(queue_conf.concurrency_limit) == "number", + "arg #1 (queue_conf) concurrency_limit must be a number" + ) + local queue = get_or_create_queue(queue_conf, handler, handler_conf) return enqueue(queue, value) end diff --git a/kong/tools/queue_schema.lua b/kong/tools/queue_schema.lua index 94132ed21b5f..51d73981bd8b 100644 --- a/kong/tools/queue_schema.lua +++ b/kong/tools/queue_schema.lua @@ -49,5 +49,12 @@ return Schema.define { between = { 0.001, 1000000 }, -- effectively unlimited maximum description = "Maximum time in seconds between retries, caps exponential backoff." } }, + { concurrency_limit = { + type = "integer", + default = 1, + one_of = { -1, 1 }, + description = "The number of queue delivery timers. -1 indicates unlimited." 
+ } }, + } } diff --git a/spec/01-unit/01-db/01-schema/11-declarative_config/02-process_auto_fields_spec.lua b/spec/01-unit/01-db/01-schema/11-declarative_config/02-process_auto_fields_spec.lua index f12359a8aa5a..44660491190d 100644 --- a/spec/01-unit/01-db/01-schema/11-declarative_config/02-process_auto_fields_spec.lua +++ b/spec/01-unit/01-db/01-schema/11-declarative_config/02-process_auto_fields_spec.lua @@ -179,6 +179,7 @@ describe("declarative config: process_auto_fields", function() max_coalescing_delay = 1, max_retry_delay = 60, max_retry_time = 60, + concurrency_limit = 1, }, } }, @@ -236,6 +237,7 @@ describe("declarative config: process_auto_fields", function() max_coalescing_delay = 1, max_retry_delay = 60, max_retry_time = 60, + concurrency_limit = 1, }, } }, @@ -353,6 +355,7 @@ describe("declarative config: process_auto_fields", function() max_coalescing_delay = 1, max_retry_delay = 60, max_retry_time = 60, + concurrency_limit = 1, }, } }, @@ -674,6 +677,7 @@ describe("declarative config: process_auto_fields", function() max_coalescing_delay = 1, max_retry_delay = 60, max_retry_time = 60, + concurrency_limit = 1, }, } } diff --git a/spec/01-unit/01-db/01-schema/11-declarative_config/03-flatten_spec.lua b/spec/01-unit/01-db/01-schema/11-declarative_config/03-flatten_spec.lua index a869822e847e..da3beb80d698 100644 --- a/spec/01-unit/01-db/01-schema/11-declarative_config/03-flatten_spec.lua +++ b/spec/01-unit/01-db/01-schema/11-declarative_config/03-flatten_spec.lua @@ -302,6 +302,7 @@ describe("declarative config: flatten", function() max_retry_delay = 60, max_retry_time = 60, max_bytes = null, + concurrency_limit = 1, }, } }, @@ -409,6 +410,7 @@ describe("declarative config: flatten", function() max_retry_delay = 60, max_retry_time = 60, max_bytes = null, + concurrency_limit = 1, }, }, consumer = { @@ -611,7 +613,8 @@ describe("declarative config: flatten", function() max_retry_delay = 60, max_retry_time = 60, max_bytes = null, - } + concurrency_limit 
= 1, + }, }, consumer = null, created_at = 1234567890, @@ -1128,6 +1131,7 @@ describe("declarative config: flatten", function() max_retry_delay = 60, max_retry_time = 60, max_bytes = null, + concurrency_limit = 1, }, }, consumer = null, diff --git a/spec/01-unit/27-queue_spec.lua b/spec/01-unit/27-queue_spec.lua index 548be6b42bbe..5d9eeeea7e7f 100644 --- a/spec/01-unit/27-queue_spec.lua +++ b/spec/01-unit/27-queue_spec.lua @@ -786,7 +786,7 @@ describe("plugin queue", function() assert.equal("One", processed[1]) assert.equal("Three", processed[2]) assert.match_re(log_messages, 'WARN \\[\\] queue continue-processing: handler could not process entries: .*: hard error') - assert.match_re(log_messages, 'ERR \\[\\] queue continue-processing: could not send entries, giving up after \\d retries. 1 queue entries were lost') + assert.match_re(log_messages, 'ERR \\[\\] queue continue-processing: could not send entries due to max_retry_time exceeded. \\d queue entries were lost') end) it("sanity check for function Queue.is_full() & Queue.can_enqueue()", function() @@ -799,6 +799,7 @@ describe("plugin queue", function() max_retry_time = 60, initial_retry_delay = 1, max_retry_delay = 60, + concurrency_limit = 1, } local function enqueue(queue_conf, entry) @@ -836,6 +837,7 @@ describe("plugin queue", function() max_retry_time = 60, initial_retry_delay = 1, max_retry_delay = 60, + concurrency_limit = 1, } -- should be true if the queue does not exist @@ -861,6 +863,7 @@ describe("plugin queue", function() max_retry_time = 60, initial_retry_delay = 1, max_retry_delay = 60, + concurrency_limit = 1, } -- should be true if the queue does not exist diff --git a/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua b/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua index 9eecc8ec7a45..15ae94852017 100644 --- a/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua +++ b/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua @@ -256,6 +256,7 @@ 
describe("CP/DP config compat transformations #" .. strategy, function() expected_otel_prior_35.config.traces_endpoint = nil expected_otel_prior_35.config.logs_endpoint = nil expected_otel_prior_35.config.endpoint = "http://1.1.1.1:12345/v1/trace" + expected_otel_prior_35.config.queue.concurrency_limit = nil do_assert(uuid(), "3.4.0", expected_otel_prior_35) @@ -281,6 +282,7 @@ describe("CP/DP config compat transformations #" .. strategy, function() expected_otel_prior_34.config.traces_endpoint = nil expected_otel_prior_34.config.logs_endpoint = nil expected_otel_prior_34.config.endpoint = "http://1.1.1.1:12345/v1/trace" + expected_otel_prior_34.config.queue.concurrency_limit = nil do_assert(uuid(), "3.3.0", expected_otel_prior_34) -- cleanup @@ -307,6 +309,7 @@ describe("CP/DP config compat transformations #" .. strategy, function() expected_zipkin_prior_35.config.header_type = "preserve" expected_zipkin_prior_35.config.default_header_type = "b3" expected_zipkin_prior_35.config.propagation = nil + expected_zipkin_prior_35.config.queue.concurrency_limit = nil do_assert(uuid(), "3.4.0", expected_zipkin_prior_35) -- cleanup @@ -328,6 +331,7 @@ describe("CP/DP config compat transformations #" .. 
strategy, function() expected_zipkin_prior_34.config.header_type = "preserve" expected_zipkin_prior_34.config.default_header_type = "b3" expected_zipkin_prior_34.config.propagation = nil + expected_zipkin_prior_34.config.queue.concurrency_limit = nil do_assert(uuid(), "3.3.0", expected_zipkin_prior_34) -- cleanup diff --git a/spec/03-plugins/03-http-log/01-log_spec.lua b/spec/03-plugins/03-http-log/01-log_spec.lua index fb96cb03d38e..47136e42bd0a 100644 --- a/spec/03-plugins/03-http-log/01-log_spec.lua +++ b/spec/03-plugins/03-http-log/01-log_spec.lua @@ -338,6 +338,25 @@ for _, strategy in helpers.each_strategy() do } } + local route1_4 = bp.routes:insert { + hosts = { "no_queue.test" }, + service = service1 + } + + bp.plugins:insert { + route = { id = route1_4.id }, + name = "http-log", + config = { + http_endpoint = "http://" .. helpers.mock_upstream_host + .. ":" + .. helpers.mock_upstream_port + .. "/post_log/http", + queue = { + concurrency_limit = -1, + }, + } + } + helpers.setenv(vault_env_name, vault_env_value) assert(helpers.start_kong({ @@ -638,6 +657,20 @@ for _, strategy in helpers.each_strategy() do admin_client:close() end) + + it("should not use queue when queue.concurrency_limit is -1", function() + reset_log("http") + local res = proxy_client:get("/status/200", { + headers = { + ["Host"] = "no_queue.test" + } + }) + assert.res_status(200, res) + + local entries = get_log("http", 1) + assert.same("127.0.0.1", entries[1].client_ip) + assert.logfile().has.no.line("processing queue", true) -- should not use queue + end) end) -- test both with a single worker for a deterministic test,